diff --git a/core/core.go b/core/core.go index 9aca75859f34cfc470614debe8bd52647a4abf65..3f79d1fbbfc2bb2a2129c4413e88c9aa53d508e7 100644 --- a/core/core.go +++ b/core/core.go @@ -53,16 +53,11 @@ func NewCore(chainId string, consensusConfig *config.ModuleConfig, // pass the consensus engine into the pipe consensus.LoadConsensusEngineInPipe(consensusConfig, pipe) - - // [x] create state - // [x] from genesis - // [x] create event switch - // [x] give state and evsw to app - // [x] give app to consensus - // [x] create new Pipe - // [x] give consensus to pipe - // [ ] create servers - return &Core{}, fmt.Errorf("PLACEHOLDER") + return &Core{ + chainId: chainId, + evsw: evsw, + pipe: pipe, + }, nil } //------------------------------------------------------------------------------ @@ -71,3 +66,13 @@ func NewCore(chainId string, consensusConfig *config.ModuleConfig, // manager with a consensus engine. // TODO: [ben] before such Engine abstraction, // think about many-manager-to-one-consensus + +//------------------------------------------------------------------------------ +// Server functions +// NOTE: [ben] in phase 0 we exactly take over the full server architecture +// from Eris-DB and Tendermint; This is a draft and will be overhauled. + +// func (core *Core) StartGateway() { +// codec := &TCodec{} +// eventSubscriptions := +// } diff --git a/core/event_cache.go b/core/event_cache.go new file mode 100644 index 0000000000000000000000000000000000000000..f2e2e58a81d4b626ef708df6b67922c957ddba3a --- /dev/null +++ b/core/event_cache.go @@ -0,0 +1,127 @@ +package core + +import ( + "fmt" + "sync" + "time" + + "github.com/tendermint/go-events" + + definitions "github.com/eris-ltd/eris-db/definitions" +) + +var ( + reaperTimeout = 5 * time.Second + reaperThreshold = 10 * time.Second +) + +type EventCache struct { + mtx *sync.Mutex + events []interface{} + ts time.Time + subId string +} + +func newEventCache() *EventCache { + return &EventCache{ + &sync.Mutex{}, + make([]interface{}, 0), + time.Now(), + "", + } +} + +func (this *EventCache) poll() []interface{} { + this.mtx.Lock() + defer this.mtx.Unlock() + var evts []interface{} + if len(this.events) > 0 { + evts = this.events + this.events = []interface{}{} + } else { + evts = []interface{}{} + } + this.ts = time.Now() + return evts +} + +// Catches events that callers subscribe to and adds them to an array ready to be polled. +type EventSubscriptions struct { + mtx *sync.Mutex + eventEmitter definitions.EventEmitter + subs map[string]*EventCache + reap bool +} + +func NewEventSubscriptions(eventEmitter definitions.EventEmitter) *EventSubscriptions { + es := &EventSubscriptions{ + mtx: &sync.Mutex{}, + eventEmitter: eventEmitter, + subs: make(map[string]*EventCache), + reap: true, + } + go reap(es) + return es +} + +func reap(es *EventSubscriptions) { + if !es.reap { + return + } + time.Sleep(reaperTimeout) + es.mtx.Lock() + defer es.mtx.Unlock() + for id, sub := range es.subs { + if time.Since(sub.ts) > reaperThreshold { + // Seems like Go is ok with this.. + delete(es.subs, id) + es.eventEmitter.Unsubscribe(id) + } + } + go reap(es) +} + +// Add a subscription and return the generated id. Note event dispatcher +// has to call func which involves aquiring a mutex lock, so might be +// a delay - though a conflict is practically impossible, and if it does +// happen it's for an insignificant amount of time (the time it takes to +// carry out EventCache.poll() ). +func (this *EventSubscriptions) add(eventId string) (string, error) { + subId, errSID := generateSubId() + if errSID != nil { + return "", errSID + } + cache := newEventCache() + _, errC := this.eventEmitter.Subscribe(subId, eventId, + func(evt events.EventData) { + cache.mtx.Lock() + defer cache.mtx.Unlock() + cache.events = append(cache.events, evt) + }) + cache.subId = subId + this.subs[subId] = cache + if errC != nil { + return "", errC + } + return subId, nil +} + +func (this *EventSubscriptions) poll(subId string) ([]interface{}, error) { + sub, ok := this.subs[subId] + if !ok { + return nil, fmt.Errorf("Subscription not active. ID: " + subId) + } + return sub.poll(), nil +} + +func (this *EventSubscriptions) remove(subId string) error { + this.mtx.Lock() + defer this.mtx.Unlock() + // TODO Check this. + _, ok := this.subs[subId] + if !ok { + return fmt.Errorf("Subscription not active. ID: " + subId) + } + delete(this.subs, subId) + return nil +} diff --git a/core/event_cache_test.go b/core/event_cache_test.go new file mode 100644 index 0000000000000000000000000000000000000000..da0a483993b0ac7b73856a6fdf750a19cf3a003e --- /dev/null +++ b/core/event_cache_test.go @@ -0,0 +1,211 @@ +package core + +import ( + "encoding/hex" + "fmt" + "runtime" + "testing" + "time" + + "github.com/eris-ltd/eris-db/txs" + "github.com/stretchr/testify/assert" + "github.com/tendermint/go-events" +) + +var mockInterval = 10 * time.Millisecond + +type mockSub struct { + subId string + eventId string + f func(events.EventData) + shutdown bool + sdChan chan struct{} +} + +// A mock event +func newMockSub(subId, eventId string, f func(events.EventData)) mockSub { + return mockSub{subId, eventId, f, false, make(chan struct{})} +} + +type mockEventEmitter struct { + subs map[string]mockSub +} + +func newMockEventEmitter() *mockEventEmitter { + return &mockEventEmitter{make(map[string]mockSub)} +} + +func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(events.EventData)) (bool, error) { + if _, ok := this.subs[subId]; ok { + return false, nil + } + me := newMockSub(subId, eventId, callback) + + go func() { + <-me.sdChan + me.shutdown = true + }() + go func() { + for { + if !me.shutdown { + me.f(txs.EventDataNewBlock{}) + } else { + return + } + time.Sleep(mockInterval) + } + }() + return true, nil +} + +func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) { + sub, ok := this.subs[subId] + if !ok { + return false, nil + } + sub.shutdown = true + delete(this.subs, subId) + return true, nil +} + +// Test that event subscriptions can be added manually and then automatically reaped. +func TestSubReaping(t *testing.T) { + runtime.GOMAXPROCS(runtime.NumCPU()) + NUM_SUBS := 100 + reaperThreshold = 200 * time.Millisecond + reaperTimeout = 100 * time.Millisecond + + mee := newMockEventEmitter() + eSubs := NewEventSubscriptions(mee) + doneChan := make(chan error) + go func() { + for i := 0; i < NUM_SUBS; i++ { + time.Sleep(2 * time.Millisecond) + go func() { + id, err := eSubs.add("WeirdEvent") + if err != nil { + doneChan <- err + return + } + if len(id) != 64 { + doneChan <- fmt.Errorf("Id not of length 64") + return + } + _, err2 := hex.DecodeString(id) + if err2 != nil { + doneChan <- err2 + } + + doneChan <- nil + }() + } + }() + k := 0 + for k < NUM_SUBS { + err := <-doneChan + assert.NoError(t, err) + k++ + } + time.Sleep(1100 * time.Millisecond) + + assert.Len(t, mee.subs, 0) + assert.Len(t, eSubs.subs, 0) + t.Logf("Added %d subs that were all automatically reaped.", NUM_SUBS) +} + +// Test that event subscriptions can be added and removed manually. +func TestSubManualClose(t *testing.T) { + NUM_SUBS := 100 + // Keep the reaper out of this. + reaperThreshold = 10000 * time.Millisecond + reaperTimeout = 10000 * time.Millisecond + + mee := newMockEventEmitter() + eSubs := NewEventSubscriptions(mee) + doneChan := make(chan error) + go func() { + for i := 0; i < NUM_SUBS; i++ { + time.Sleep(2 * time.Millisecond) + go func() { + id, err := eSubs.add("WeirdEvent") + if err != nil { + doneChan <- err + return + } + if len(id) != 64 { + doneChan <- fmt.Errorf("Id not of length 64") + return + } + _, err2 := hex.DecodeString(id) + if err2 != nil { + doneChan <- err2 + } + time.Sleep(100 * time.Millisecond) + err3 := eSubs.remove(id) + if err3 != nil { + doneChan <- err3 + } + doneChan <- nil + }() + } + }() + k := 0 + for k < NUM_SUBS { + err := <-doneChan + assert.NoError(t, err) + k++ + } + + assert.Len(t, mee.subs, 0) + assert.Len(t, eSubs.subs, 0) + t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS) +} + +// Test that the system doesn't fail under high pressure. +func TestSubFlooding(t *testing.T) { + NUM_SUBS := 100 + // Keep the reaper out of this. + reaperThreshold = 10000 * time.Millisecond + reaperTimeout = 10000 * time.Millisecond + // Crank it up. Now pressure is 10 times higher on each sub. + mockInterval = 1 * time.Millisecond + mee := newMockEventEmitter() + eSubs := NewEventSubscriptions(mee) + doneChan := make(chan error) + go func() { + for i := 0; i < NUM_SUBS; i++ { + time.Sleep(1 * time.Millisecond) + go func() { + id, err := eSubs.add("WeirdEvent") + if err != nil { + doneChan <- err + return + } + if len(id) != 64 { + doneChan <- fmt.Errorf("Id not of length 64") + return + } + _, err2 := hex.DecodeString(id) + if err2 != nil { + doneChan <- err2 + } + time.Sleep(1000 * time.Millisecond) + err3 := eSubs.remove(id) + if err3 != nil { + doneChan <- err3 + } + doneChan <- nil + }() + } + }() + k := 0 + for k < NUM_SUBS { + err := <-doneChan + assert.NoError(t, err) + k++ + } + + assert.Len(t, mee.subs, 0) + assert.Len(t, eSubs.subs, 0) + t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS) +} diff --git a/core/methods.go b/core/methods.go new file mode 100644 index 0000000000000000000000000000000000000000..3dcecec07bc63e3f21b1392843ac3555a9763ebb --- /dev/null +++ b/core/methods.go @@ -0,0 +1,498 @@ +package core + +import ( + "crypto/rand" + "encoding/hex" + "strings" + + rpc "github.com/eris-ltd/eris-db/rpc" + + core_types "github.com/eris-ltd/eris-db/core/types" + definitions "github.com/eris-ltd/eris-db/definitions" + "github.com/eris-ltd/eris-db/txs" +) + +// TODO use the method name definition file. +const ( + SERVICE_NAME = "erisdb" + + GET_ACCOUNTS = SERVICE_NAME + ".getAccounts" // Accounts + GET_ACCOUNT = SERVICE_NAME + ".getAccount" + GET_STORAGE = SERVICE_NAME + ".getStorage" + GET_STORAGE_AT = SERVICE_NAME + ".getStorageAt" + GEN_PRIV_ACCOUNT = SERVICE_NAME + ".genPrivAccount" + GEN_PRIV_ACCOUNT_FROM_KEY = SERVICE_NAME + ".genPrivAccountFromKey" + GET_BLOCKCHAIN_INFO = SERVICE_NAME + ".getBlockchainInfo" // Blockchain + GET_GENESIS_HASH = SERVICE_NAME + ".getGenesisHash" + GET_LATEST_BLOCK_HEIGHT = SERVICE_NAME + ".getLatestBlockHeight" + GET_LATEST_BLOCK = SERVICE_NAME + ".getLatestBlock" + GET_BLOCKS = SERVICE_NAME + ".getBlocks" + GET_BLOCK = SERVICE_NAME + ".getBlock" + GET_CONSENSUS_STATE = SERVICE_NAME + ".getConsensusState" // Consensus + GET_VALIDATORS = SERVICE_NAME + ".getValidators" + GET_NETWORK_INFO = SERVICE_NAME + ".getNetworkInfo" // Net + GET_CLIENT_VERSION = SERVICE_NAME + ".getClientVersion" + GET_MONIKER = SERVICE_NAME + ".getMoniker" + GET_CHAIN_ID = SERVICE_NAME + ".getChainId" + IS_LISTENING = SERVICE_NAME + ".isListening" + GET_LISTENERS = SERVICE_NAME + ".getListeners" + GET_PEERS = SERVICE_NAME + ".getPeers" + GET_PEER = SERVICE_NAME + ".getPeer" + CALL = SERVICE_NAME + ".call" // Tx + CALL_CODE = SERVICE_NAME + ".callCode" + BROADCAST_TX = SERVICE_NAME + ".broadcastTx" + GET_UNCONFIRMED_TXS = SERVICE_NAME + ".getUnconfirmedTxs" + SIGN_TX = SERVICE_NAME + ".signTx" + TRANSACT = SERVICE_NAME + ".transact" + TRANSACT_AND_HOLD = SERVICE_NAME + ".transactAndHold" + TRANSACT_NAMEREG = SERVICE_NAME + ".transactNameReg" + EVENT_SUBSCRIBE = SERVICE_NAME + ".eventSubscribe" // Events + EVENT_UNSUBSCRIBE = SERVICE_NAME + ".eventUnsubscribe" + EVENT_POLL = SERVICE_NAME + ".eventPoll" + GET_NAMEREG_ENTRY = SERVICE_NAME + ".getNameRegEntry" // Namereg + GET_NAMEREG_ENTRIES = SERVICE_NAME + ".getNameRegEntries" +) + +// The rpc method handlers. +type ErisDbMethods struct { + codec rpc.Codec + pipe definitions.Pipe +} + +// Used to handle requests. interface{} param is a wildcard used for example with socket events. +type RequestHandlerFunc func(*rpc.RPCRequest, interface{}) (interface{}, int, error) + +// Private. Create a method name -> method handler map. +func (this *ErisDbMethods) getMethods() map[string]RequestHandlerFunc { + dhMap := make(map[string]RequestHandlerFunc) + // Accounts + dhMap[GET_ACCOUNTS] = this.Accounts + dhMap[GET_ACCOUNT] = this.Account + dhMap[GET_STORAGE] = this.AccountStorage + dhMap[GET_STORAGE_AT] = this.AccountStorageAt + dhMap[GEN_PRIV_ACCOUNT] = this.GenPrivAccount + dhMap[GEN_PRIV_ACCOUNT_FROM_KEY] = this.GenPrivAccountFromKey + // Blockchain + dhMap[GET_BLOCKCHAIN_INFO] = this.BlockchainInfo + dhMap[GET_GENESIS_HASH] = this.GenesisHash + dhMap[GET_LATEST_BLOCK_HEIGHT] = this.LatestBlockHeight + dhMap[GET_LATEST_BLOCK] = this.LatestBlock + dhMap[GET_BLOCKS] = this.Blocks + dhMap[GET_BLOCK] = this.Block + // Consensus + dhMap[GET_CONSENSUS_STATE] = this.ConsensusState + dhMap[GET_VALIDATORS] = this.Validators + // Network + dhMap[GET_NETWORK_INFO] = this.NetworkInfo + dhMap[GET_CLIENT_VERSION] = this.ClientVersion + dhMap[GET_MONIKER] = this.Moniker + dhMap[GET_CHAIN_ID] = this.ChainId + dhMap[IS_LISTENING] = this.Listening + dhMap[GET_LISTENERS] = this.Listeners + dhMap[GET_PEERS] = this.Peers + dhMap[GET_PEER] = this.Peer + // Txs + dhMap[CALL] = this.Call + dhMap[CALL_CODE] = this.CallCode + dhMap[BROADCAST_TX] = this.BroadcastTx + dhMap[GET_UNCONFIRMED_TXS] = this.UnconfirmedTxs + dhMap[SIGN_TX] = this.SignTx + dhMap[TRANSACT] = this.Transact + dhMap[TRANSACT_AND_HOLD] = this.TransactAndHold + dhMap[TRANSACT_NAMEREG] = this.TransactNameReg + // Namereg + dhMap[GET_NAMEREG_ENTRY] = this.NameRegEntry + dhMap[GET_NAMEREG_ENTRIES] = this.NameRegEntries + + return dhMap +} + +// TODO add some sanity checks on address params and such. +// Look into the reflection code in core, see what can be used. + +// *************************************** Accounts *************************************** + +func (this *ErisDbMethods) GenPrivAccount(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + pac, errC := this.pipe.Accounts().GenPrivAccount() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return pac, 0, nil +} + +func (this *ErisDbMethods) GenPrivAccountFromKey(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + + param := &PrivKeyParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + + privKey := param.PrivKey + pac, errC := this.pipe.Accounts().GenPrivAccountFromKey(privKey) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return pac, 0, nil +} + +func (this *ErisDbMethods) Account(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &AddressParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + address := param.Address + // TODO is address check? + account, errC := this.pipe.Accounts().Account(address) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return account, 0, nil +} + +func (this *ErisDbMethods) Accounts(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &AccountsParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + list, errC := this.pipe.Accounts().Accounts(param.Filters) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return list, 0, nil +} + +func (this *ErisDbMethods) AccountStorage(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &AddressParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + address := param.Address + storage, errC := this.pipe.Accounts().Storage(address) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return storage, 0, nil +} + +func (this *ErisDbMethods) AccountStorageAt(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &StorageAtParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + address := param.Address + key := param.Key + storageItem, errC := this.pipe.Accounts().StorageAt(address, key) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return storageItem, 0, nil +} + +// *************************************** Blockchain ************************************ + +func (this *ErisDbMethods) BlockchainInfo(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + status, errC := this.pipe.Blockchain().Info() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return status, 0, nil +} + +func (this *ErisDbMethods) ChainId(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + chainId, errC := this.pipe.Blockchain().ChainId() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.ChainId{chainId}, 0, nil +} + +func (this *ErisDbMethods) GenesisHash(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + hash, errC := this.pipe.Blockchain().GenesisHash() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.GenesisHash{hash}, 0, nil +} + +func (this *ErisDbMethods) LatestBlockHeight(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + height, errC := this.pipe.Blockchain().LatestBlockHeight() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.LatestBlockHeight{height}, 0, nil +} + +func (this *ErisDbMethods) LatestBlock(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + + block, errC := this.pipe.Blockchain().LatestBlock() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return block, 0, nil +} + +func (this *ErisDbMethods) Blocks(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &BlocksParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + blocks, errC := this.pipe.Blockchain().Blocks(param.Filters) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return blocks, 0, nil +} + +func (this *ErisDbMethods) Block(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &HeightParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + height := param.Height + block, errC := this.pipe.Blockchain().Block(height) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return block, 0, nil +} + +// *************************************** Consensus ************************************ + +func (this *ErisDbMethods) ConsensusState(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + state, errC := this.pipe.Consensus().State() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return state, 0, nil +} + +func (this *ErisDbMethods) Validators(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + validators, errC := this.pipe.Consensus().Validators() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return validators, 0, nil +} + +// *************************************** Net ************************************ + +func (this *ErisDbMethods) NetworkInfo(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + info, errC := this.pipe.Net().Info() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return info, 0, nil +} + +func (this *ErisDbMethods) ClientVersion(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + version, errC := this.pipe.Net().ClientVersion() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.ClientVersion{version}, 0, nil +} + +func (this *ErisDbMethods) Moniker(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + moniker, errC := this.pipe.Net().Moniker() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.Moniker{moniker}, 0, nil +} + +func (this *ErisDbMethods) Listening(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + listening, errC := this.pipe.Net().Listening() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.Listening{listening}, 0, nil +} + +func (this *ErisDbMethods) Listeners(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + listeners, errC := this.pipe.Net().Listeners() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.Listeners{listeners}, 0, nil +} + +func (this *ErisDbMethods) Peers(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + peers, errC := this.pipe.Net().Peers() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return peers, 0, nil +} + +func (this *ErisDbMethods) Peer(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &PeerParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + address := param.Address + peer, errC := this.pipe.Net().Peer(address) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return peer, 0, nil +} + +// *************************************** Txs ************************************ + +func (this *ErisDbMethods) Call(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &CallParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + from := param.From + to := param.Address + data := param.Data + call, errC := this.pipe.Transactor().Call(from, to, data) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return call, 0, nil +} + +func (this *ErisDbMethods) CallCode(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &CallCodeParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + from := param.From + code := param.Code + data := param.Data + call, errC := this.pipe.Transactor().CallCode(from, code, data) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return call, 0, nil +} + +func (this *ErisDbMethods) BroadcastTx(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &txs.CallTx{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + receipt, errC := this.pipe.Transactor().BroadcastTx(param) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return receipt, 0, nil +} + +func (this *ErisDbMethods) Transact(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &TransactParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + receipt, errC := this.pipe.Transactor().Transact(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return receipt, 0, nil +} + +func (this *ErisDbMethods) TransactAndHold(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &TransactParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + ce, errC := this.pipe.Transactor().TransactAndHold(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return ce, 0, nil +} + +func (this *ErisDbMethods) TransactNameReg(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &TransactNameRegParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + receipt, errC := this.pipe.Transactor().TransactNameReg(param.PrivKey, param.Name, param.Data, param.Amount, param.Fee) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return receipt, 0, nil +} + +func (this *ErisDbMethods) UnconfirmedTxs(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + txs, errC := this.pipe.Transactor().UnconfirmedTxs() + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return txs, 0, nil +} + +func (this *ErisDbMethods) SignTx(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &SignTxParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + tx := param.Tx + pAccs := param.PrivAccounts + txRet, errC := this.pipe.Transactor().SignTx(tx, pAccs) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return txRet, 0, nil +} + +// *************************************** Name Registry *************************************** + +func (this *ErisDbMethods) NameRegEntry(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &NameRegEntryParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + name := param.Name + // TODO is address check? + entry, errC := this.pipe.NameReg().Entry(name) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return entry, 0, nil +} + +func (this *ErisDbMethods) NameRegEntries(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &FilterListParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + list, errC := this.pipe.NameReg().Entries(param.Filters) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return list, 0, nil +} + +// ************************************************************************************** + +func generateSubId() (string, error) { + b := make([]byte, 32) + _, err := rand.Read(b) + if err != nil { + return "", err + } + rStr := hex.EncodeToString(b) + return strings.ToUpper(rStr), nil + +} diff --git a/core/params.go b/core/params.go new file mode 100644 index 0000000000000000000000000000000000000000..1549eaf94e6c01f203222b21f430b63325ded463 --- /dev/null +++ b/core/params.go @@ -0,0 +1,106 @@ +package core + +import ( + "github.com/eris-ltd/eris-db/account" + core_types "github.com/eris-ltd/eris-db/core/types" + "github.com/eris-ltd/eris-db/txs" +) + +type ( + + // Used to send an address. The address should be hex and properly formatted. + // TODO enforce. + AddressParam struct { + Address []byte `json:"address"` + } + + // Used to send an address + // TODO deprecate in favor of 'FilterListParam' + AccountsParam struct { + Filters []*core_types.FilterData `json:"filters"` + } + + // Used to send an address + FilterListParam struct { + Filters []*core_types.FilterData `json:"filters"` + } + + PrivKeyParam struct { + PrivKey []byte `json:"priv_key"` + } + + // StorageAt + StorageAtParam struct { + Address []byte `json:"address"` + Key []byte `json:"key"` + } + + // Get a block + HeightParam struct { + Height int `json:"height"` + } + + // Get a series of blocks + // TODO deprecate in favor of 'FilterListParam' + BlocksParam struct { + Filters []*core_types.FilterData `json:"filters"` + } + + // Event Id + EventIdParam struct { + EventId string `json:"event_id"` + } + + // Event Id + SubIdParam struct { + SubId string `json:"sub_id"` + } + + PeerParam struct { + Address string `json:"address"` + } + + // Used when doing calls + CallParam struct { + Address []byte `json:"address"` + From []byte `json:"from"` + Data []byte `json:"data"` + } + + // Used when doing code calls + CallCodeParam struct { + From []byte `json:"from"` + Code []byte `json:"code"` + Data []byte `json:"data"` + } + + // Used when signing a tx. Uses placeholders just like TxParam + SignTxParam struct { + Tx *txs.CallTx `json:"tx"` + PrivAccounts []*account.PrivAccount `json:"priv_accounts"` + } + + // Used when sending a transaction to be created and signed on the server + // (using the private key). This only uses the standard key type for now. + TransactParam struct { + PrivKey []byte `json:"priv_key"` + Data []byte `json:"data"` + Address []byte `json:"address"` + Fee int64 `json:"fee"` + GasLimit int64 `json:"gas_limit"` + } + + NameRegEntryParam struct { + Name string `json:"name"` + } + + // Used when sending a namereg transaction to be created and signed on the server + // (using the private key). This only uses the standard key type for now. + TransactNameRegParam struct { + PrivKey []byte `json:"priv_key"` + Name string `json:"name"` + Data string `json:"data"` + Fee int64 `json:"fee"` + Amount int64 `json:"amount"` + } +) diff --git a/core/types/codec.go b/core/types/codec.go new file mode 100644 index 0000000000000000000000000000000000000000..eff1ac4b61220934e374743ec0258156a442b2f7 --- /dev/null +++ b/core/types/codec.go @@ -0,0 +1,66 @@ +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +package types + +import ( + "io" + "io/ioutil" + + wire "github.com/tendermint/go-wire" + + rpc "github.com/eris-ltd/eris-db/rpc" +) + +// Codec that uses tendermints 'binary' package for JSON. +type TCodec struct { +} + +// Get a new codec. +func NewTCodec() rpc.Codec { + return &TCodec{} +} + +// Encode to an io.Writer. +func (this *TCodec) Encode(v interface{}, w io.Writer) error { + var err error + var n int + wire.WriteJSON(v, w, &n, &err) + return err +} + +// Encode to a byte array. +func (this *TCodec) EncodeBytes(v interface{}) ([]byte, error) { + return wire.JSONBytes(v), nil +} + +// Decode from an io.Reader. +func (this *TCodec) Decode(v interface{}, r io.Reader) error { + bts, errR := ioutil.ReadAll(r) + if errR != nil { + return errR + } + var err error + wire.ReadJSON(v, bts, &err) + return err +} + +// Decode from a byte array. +func (this *TCodec) DecodeBytes(v interface{}, bts []byte) error { + var err error + wire.ReadJSON(v, bts, &err) + return err +} diff --git a/definitions/consensus.go b/definitions/consensus.go new file mode 100644 index 0000000000000000000000000000000000000000..a4890f06f52082e581ef7aa31bf3dca213d6c21b --- /dev/null +++ b/definitions/consensus.go @@ -0,0 +1,34 @@ +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +package definitions + +// TODO: [ben] explore the value of abstracting the consensus into an interface +// currently we cut a corner here and suffices to have direct calls. + +// for now an empty interface +type ConsensusEngine interface { + +} + +// type Communicator interface { +// // Unicast() +// Broadcast() +// } +// +// type ConsensusModule interface { +// Start() +// } diff --git a/version/version.go b/version/version.go index 428b995ff5122b7021090aa7f2481ac0acc2029f..cad1310619701d40e99e308b58f2d3ac5f84ee60 100644 --- a/version/version.go +++ b/version/version.go @@ -23,6 +23,9 @@ import ( "fmt" ) +// IMPORTANT: this version number needs to be manually kept +// in sync at the bottom of this file for the deployment scripts to parse +// the version number. const ( // Client identifier to advertise over the network erisClientIdentifier = "eris-db"