diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index 8b25ca85ca5d180c456abde1ad61a2ce3f6329b9..d88f979d29697c01ce6146a9e6fbc7ea1d1a6ff9 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -32,6 +32,7 @@ import ( tendermint_types "github.com/tendermint/tendermint/types" tmsp_types "github.com/tendermint/tmsp/types" + edb_event "github.com/eris-ltd/eris-db/event" log "github.com/eris-ltd/eris-logger" config "github.com/eris-ltd/eris-db/config" @@ -198,6 +199,10 @@ func (this *TendermintNode) PublicValidatorKey() crypto.PubKey { return copyPublicValidatorKey } +func (this *TendermintNode) Events() edb_event.EventEmitter { + return edb_event.NewEvents(this.tmintNode.EventSwitch()) +} + func (this *TendermintNode) BroadcastTransaction(transaction []byte, callback func(*tmsp_types.Response)) error { return this.tmintNode.MempoolReactor().BroadcastTx(transaction, callback) diff --git a/definitions/consensus.go b/definitions/consensus.go index 42d1f4a2418ced8226e7f938ee3d35c9c8a9bcb6..3043095a8c89471fa80ac3d105c2113d60a38e2d 100644 --- a/definitions/consensus.go +++ b/definitions/consensus.go @@ -22,6 +22,7 @@ import ( tendermint_types "github.com/tendermint/tendermint/types" tmsp_types "github.com/tendermint/tmsp/types" + edb_event "github.com/eris-ltd/eris-db/event" rpc_tendermint_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types" ) @@ -46,6 +47,10 @@ type ConsensusEngine interface { // Memory pool BroadcastTransaction(transaction []byte, callback func(*tmsp_types.Response)) error + + // Events + // For consensus events like NewBlock + Events() edb_event.EventEmitter } // type Communicator interface { diff --git a/event/event_cache.go b/event/event_cache.go index 326cf9b1c56b7792e952c225f5d0beb23606e1ff..2db266101e5b97e86bc0d09b919a6782f1f72ed5 100644 --- a/event/event_cache.go +++ b/event/event_cache.go @@ -90,13 +90,15 @@ func (this *EventSubscriptions) Add(eventId string) (string, error) { return "", errSID } cache := newEventCache() - _, errC := this.eventEmitter.Subscribe(subId, eventId, + errC := this.eventEmitter.Subscribe(subId, eventId, func(evt evts.EventData) { cache.mtx.Lock() defer cache.mtx.Unlock() cache.events = append(cache.events, evt) }) cache.subId = subId + this.mtx.Lock() + defer this.mtx.Unlock() this.subs[subId] = cache if errC != nil { return "", errC diff --git a/event/event_cache_test.go b/event/event_cache_test.go index 38f57a0484e455f562a76696d87e8e93d8acc3b0..9240457184c53b0d410e6c3f07b3e93d524451e1 100644 --- a/event/event_cache_test.go +++ b/event/event_cache_test.go @@ -7,7 +7,8 @@ import ( "testing" "time" - "github.com/eris-ltd/eris-db/txs" + "sync" + "github.com/stretchr/testify/assert" evts "github.com/tendermint/go-events" ) @@ -22,24 +23,33 @@ type mockSub struct { sdChan chan struct{} } +type mockEventData struct { + subId string + eventId string +} + // A mock event func newMockSub(subId, eventId string, f func(evts.EventData)) mockSub { return mockSub{subId, eventId, f, false, make(chan struct{})} } type mockEventEmitter struct { - subs map[string]mockSub + subs map[string]mockSub + mutex *sync.Mutex } func newMockEventEmitter() *mockEventEmitter { - return &mockEventEmitter{make(map[string]mockSub)} + return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}} } -func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) (bool, error) { +func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) error { if _, ok := this.subs[subId]; ok { - return false, nil + return nil } me := newMockSub(subId, eventId, callback) + this.mutex.Lock() + this.subs[subId] = me + this.mutex.Unlock() go func() { <-me.sdChan @@ -48,24 +58,27 @@ func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evt go func() { for { if !me.shutdown { - me.f(txs.EventDataNewBlock{}) + me.f(mockEventData{subId, eventId}) } else { + this.mutex.Lock() + delete(this.subs, subId) + this.mutex.Unlock() return } time.Sleep(mockInterval) } }() - return true, nil + return nil } -func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) { +func (this *mockEventEmitter) Unsubscribe(subId string) error { sub, ok := this.subs[subId] if !ok { - return false, nil + return nil } sub.shutdown = true delete(this.subs, subId) - return true, nil + return nil } // Test that event subscriptions can be added manually and then automatically reaped. @@ -156,7 +169,6 @@ func TestSubManualClose(t *testing.T) { k++ } - assert.Len(t, mee.subs, 0) assert.Len(t, eSubs.subs, 0) t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS) } @@ -205,7 +217,6 @@ func TestSubFlooding(t *testing.T) { k++ } - assert.Len(t, mee.subs, 0) assert.Len(t, eSubs.subs, 0) t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS) } diff --git a/event/events.go b/event/events.go index dd5f47f6a41e9ee1ffbdcf0b1a96db2f113c7b84..b776c0debb87d11f54e3a39724d2ba09bdf0586c 100644 --- a/event/events.go +++ b/event/events.go @@ -30,8 +30,19 @@ import ( // that there is no need anymore for this poor wrapper. type EventEmitter interface { - Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) - Unsubscribe(subId string) (bool, error) + Subscribe(subId, event string, callback func(evts.EventData)) error + Unsubscribe(subId string) error +} + +func NewEvents(eventSwitch *evts.EventSwitch) *events { + return &events{eventSwitch} +} + +// Provides an EventEmitter that wraps many underlying EventEmitters as a +// convenience for Subscribing and Unsubscribing on multiple EventEmitters at +// once +func Multiplex(events ...EventEmitter) *multiplexedEvents { + return &multiplexedEvents{events} } // The events struct has methods for working with events. @@ -39,20 +50,44 @@ type events struct { eventSwitch *evts.EventSwitch } -func NewEvents(eventSwitch *evts.EventSwitch) *events { - return &events{eventSwitch} -} - // Subscribe to an event. -func (this *events) Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) { +func (this *events) Subscribe(subId, event string, callback func(evts.EventData)) error { this.eventSwitch.AddListenerForEvent(subId, event, callback) - return true, nil + return nil } // Un-subscribe from an event. -func (this *events) Unsubscribe(subId string) (bool, error) { +func (this *events) Unsubscribe(subId string) error { this.eventSwitch.RemoveListener(subId) - return true, nil + return nil +} + +type multiplexedEvents struct { + eventEmitters []EventEmitter +} + +// Subscribe to an event. +func (multiEvents *multiplexedEvents) Subscribe(subId, event string, callback func(evts.EventData)) error { + for _, eventEmitter := range multiEvents.eventEmitters { + err := eventEmitter.Subscribe(subId, event, callback) + if err != nil { + return err + } + } + + return nil +} + +// Un-subscribe from an event. +func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error { + for _, eventEmitter := range multiEvents.eventEmitters { + err := eventEmitter.Unsubscribe(subId) + if err != nil { + return err + } + } + + return nil } // *********************************** Events *********************************** diff --git a/event/events_test.go b/event/events_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7fdf3e7bd245ed34c8fb96a11a7e3a33c612cde0 --- /dev/null +++ b/event/events_test.go @@ -0,0 +1,60 @@ +package event + +import ( + "testing" + + "sync" + "time" + + "github.com/stretchr/testify/assert" + evts "github.com/tendermint/go-events" +) + +func TestMultiplexedEvents(t *testing.T) { + emitter1 := newMockEventEmitter() + emitter2 := newMockEventEmitter() + emitter12 := Multiplex(emitter1, emitter2) + + eventData1 := make(map[evts.EventData]int) + eventData2 := make(map[evts.EventData]int) + eventData12 := make(map[evts.EventData]int) + + mutex1 := &sync.Mutex{} + mutex2 := &sync.Mutex{} + mutex12 := &sync.Mutex{} + + emitter12.Subscribe("Sub12", "Event12", func(eventData evts.EventData) { + mutex12.Lock() + eventData12[eventData] = 1 + mutex12.Unlock() + }) + emitter1.Subscribe("Sub1", "Event1", func(eventData evts.EventData) { + mutex1.Lock() + eventData1[eventData] = 1 + mutex1.Unlock() + }) + emitter2.Subscribe("Sub2", "Event2", func(eventData evts.EventData) { + mutex2.Lock() + eventData2[eventData] = 1 + mutex2.Unlock() + }) + + time.Sleep(mockInterval) + + allEventData := make(map[evts.EventData]int) + for k, v := range eventData1 { + allEventData[k] = v + } + for k, v := range eventData2 { + allEventData[k] = v + } + + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub1", "Event1"}: 1}, + eventData1) + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub2", "Event2"}: 1}, + eventData2) + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub12", "Event12"}: 1}, + eventData12) + + assert.NotEmpty(t, allEventData, "Some events should have been published") +} diff --git a/manager/eris-mint/pipe.go b/manager/eris-mint/pipe.go index ceafeebe82dcc060faf9e7b3fe5e6cecb8242aaf..06dde2de3fa058778bf70e134eaa7f590e2d5ae5 100644 --- a/manager/eris-mint/pipe.go +++ b/manager/eris-mint/pipe.go @@ -44,9 +44,8 @@ import ( "github.com/eris-ltd/eris-db/txs" ) -type ErisMintPipe struct { +type erisMintPipe struct { erisMintState *state.State - eventSwitch *go_events.EventSwitch erisMint *ErisMint // Pipe implementations accounts *accounts @@ -63,16 +62,16 @@ type ErisMintPipe struct { genesisState *state.State } -// NOTE [ben] Compiler check to ensure ErisMintPipe successfully implements +// NOTE [ben] Compiler check to ensure erisMintPipe successfully implements // eris-db/definitions.Pipe -var _ definitions.Pipe = (*ErisMintPipe)(nil) +var _ definitions.Pipe = (*erisMintPipe)(nil) -// NOTE [ben] Compiler check to ensure ErisMintPipe successfully implements +// NOTE [ben] Compiler check to ensure erisMintPipe successfully implements // eris-db/definitions.erisTendermintPipe -var _ definitions.TendermintPipe = (*ErisMintPipe)(nil) +var _ definitions.TendermintPipe = (*erisMintPipe)(nil) func NewErisMintPipe(moduleConfig *config.ModuleConfig, - eventSwitch *go_events.EventSwitch) (*ErisMintPipe, error) { + eventSwitch *go_events.EventSwitch) (*erisMintPipe, error) { startedState, genesisDoc, err := startState(moduleConfig.DataDir, moduleConfig.Config.GetString("db_backend"), moduleConfig.GenesisFile, @@ -106,9 +105,8 @@ func NewErisMintPipe(moduleConfig *config.ModuleConfig, // consensus := newConsensus(erisdbApp) // net := newNetwork(erisdbApp) - return &ErisMintPipe{ + return &erisMintPipe{ erisMintState: startedState, - eventSwitch: eventSwitch, erisMint: erisMint, accounts: accounts, events: events, @@ -173,41 +171,41 @@ func startState(dataDir, backend, genesisFile, chainId string) (*state.State, } //------------------------------------------------------------------------------ -// Implement definitions.Pipe for ErisMintPipe +// Implement definitions.Pipe for erisMintPipe -func (pipe *ErisMintPipe) Accounts() definitions.Accounts { +func (pipe *erisMintPipe) Accounts() definitions.Accounts { return pipe.accounts } -func (pipe *ErisMintPipe) Blockchain() definitions.Blockchain { +func (pipe *erisMintPipe) Blockchain() definitions.Blockchain { return pipe.blockchain } -func (pipe *ErisMintPipe) Consensus() definitions.Consensus { +func (pipe *erisMintPipe) Consensus() definitions.Consensus { return pipe.consensus } -func (pipe *ErisMintPipe) Events() edb_event.EventEmitter { +func (pipe *erisMintPipe) Events() edb_event.EventEmitter { return pipe.events } -func (pipe *ErisMintPipe) NameReg() definitions.NameReg { +func (pipe *erisMintPipe) NameReg() definitions.NameReg { return pipe.namereg } -func (pipe *ErisMintPipe) Net() definitions.Net { +func (pipe *erisMintPipe) Net() definitions.Net { return pipe.network } -func (pipe *ErisMintPipe) Transactor() definitions.Transactor { +func (pipe *erisMintPipe) Transactor() definitions.Transactor { return pipe.transactor } -func (pipe *ErisMintPipe) GetApplication() manager_types.Application { +func (pipe *erisMintPipe) GetApplication() manager_types.Application { return pipe.erisMint } -func (pipe *ErisMintPipe) SetConsensusEngine( +func (pipe *erisMintPipe) SetConsensusEngine( consensus definitions.ConsensusEngine) error { if pipe.consensusEngine == nil { pipe.consensusEngine = consensus @@ -217,22 +215,28 @@ func (pipe *ErisMintPipe) SetConsensusEngine( return nil } -func (pipe *ErisMintPipe) GetConsensusEngine() definitions.ConsensusEngine { +func (pipe *erisMintPipe) GetConsensusEngine() definitions.ConsensusEngine { return pipe.consensusEngine } -func (pipe *ErisMintPipe) GetTendermintPipe() (definitions.TendermintPipe, +func (pipe *erisMintPipe) GetTendermintPipe() (definitions.TendermintPipe, error) { return definitions.TendermintPipe(pipe), nil } +func (pipe *erisMintPipe) consensusAndManagerEvents() edb_event.EventEmitter { + // NOTE: [Silas] We could initialise this lazily and use the cached instance, + // but for the time being that feels like a premature optimisation + return edb_event.Multiplex(pipe.events, pipe.consensusEngine.Events()) +} + //------------------------------------------------------------------------------ -// Implement definitions.TendermintPipe for ErisMintPipe -func (pipe *ErisMintPipe) Subscribe(listenerId, event string, +// Implement definitions.TendermintPipe for erisMintPipe +func (pipe *erisMintPipe) Subscribe(listenerId, event string, rpcResponseWriter func(result rpc_tm_types.ErisDBResult)) (*rpc_tm_types.ResultSubscribe, error) { log.WithFields(log.Fields{"listenerId": listenerId, "event": event}). Info("Subscribing to event") - pipe.events.Subscribe(subscriptionId(listenerId, event), event, + pipe.consensusAndManagerEvents().Subscribe(subscriptionId(listenerId, event), event, func(eventData go_events.EventData) { result := rpc_tm_types.ErisDBResult(&rpc_tm_types.ResultEvent{event, tm_types.TMEventData(eventData)}) @@ -242,11 +246,11 @@ func (pipe *ErisMintPipe) Subscribe(listenerId, event string, return &rpc_tm_types.ResultSubscribe{}, nil } -func (pipe *ErisMintPipe) Unsubscribe(listenerId, +func (pipe *erisMintPipe) Unsubscribe(listenerId, event string) (*rpc_tm_types.ResultUnsubscribe, error) { log.WithFields(log.Fields{"listenerId": listenerId, "event": event}). - Info("Unsubsribing from event") - pipe.events.Unsubscribe(subscriptionId(listenerId, event)) + Info("Unsubscribing from event") + pipe.consensusAndManagerEvents().Unsubscribe(subscriptionId(listenerId, event)) return &rpc_tm_types.ResultUnsubscribe{}, nil } @@ -254,7 +258,7 @@ func subscriptionId(listenerId, event string) string { return fmt.Sprintf("%s#%s", listenerId, event) } -func (pipe *ErisMintPipe) Status() (*rpc_tm_types.ResultStatus, error) { +func (pipe *erisMintPipe) Status() (*rpc_tm_types.ResultStatus, error) { memoryDatabase := db.NewMemDB() if pipe.genesisState == nil { pipe.genesisState = state.MakeGenesisState(memoryDatabase, pipe.genesisDoc) @@ -283,7 +287,7 @@ func (pipe *ErisMintPipe) Status() (*rpc_tm_types.ResultStatus, error) { LatestBlockTime: latestBlockTime}, nil } -func (pipe *ErisMintPipe) NetInfo() (*rpc_tm_types.ResultNetInfo, error) { +func (pipe *erisMintPipe) NetInfo() (*rpc_tm_types.ResultNetInfo, error) { listening := pipe.consensusEngine.IsListening() listeners := []string{} for _, listener := range pipe.consensusEngine.Listeners() { @@ -297,7 +301,7 @@ func (pipe *ErisMintPipe) NetInfo() (*rpc_tm_types.ResultNetInfo, error) { }, nil } -func (pipe *ErisMintPipe) Genesis() (*rpc_tm_types.ResultGenesis, error) { +func (pipe *erisMintPipe) Genesis() (*rpc_tm_types.ResultGenesis, error) { return &rpc_tm_types.ResultGenesis{ // TODO: [ben] sharing pointer to unmutated GenesisDoc, but is not immutable Genesis: pipe.genesisDoc, @@ -305,7 +309,7 @@ func (pipe *ErisMintPipe) Genesis() (*rpc_tm_types.ResultGenesis, error) { } // Accounts -func (pipe *ErisMintPipe) GetAccount(address []byte) (*rpc_tm_types.ResultGetAccount, +func (pipe *erisMintPipe) GetAccount(address []byte) (*rpc_tm_types.ResultGetAccount, error) { cache := pipe.erisMint.GetCheckCache() // cache := mempoolReactor.Mempool.GetCache() @@ -317,7 +321,7 @@ func (pipe *ErisMintPipe) GetAccount(address []byte) (*rpc_tm_types.ResultGetAcc return &rpc_tm_types.ResultGetAccount{account}, nil } -func (pipe *ErisMintPipe) ListAccounts() (*rpc_tm_types.ResultListAccounts, error) { +func (pipe *erisMintPipe) ListAccounts() (*rpc_tm_types.ResultListAccounts, error) { var blockHeight int var accounts []*account.Account state := pipe.erisMint.GetState() @@ -329,7 +333,7 @@ func (pipe *ErisMintPipe) ListAccounts() (*rpc_tm_types.ResultListAccounts, erro return &rpc_tm_types.ResultListAccounts{blockHeight, accounts}, nil } -func (pipe *ErisMintPipe) GetStorage(address, key []byte) (*rpc_tm_types.ResultGetStorage, +func (pipe *erisMintPipe) GetStorage(address, key []byte) (*rpc_tm_types.ResultGetStorage, error) { state := pipe.erisMint.GetState() // state := consensusState.GetState() @@ -349,7 +353,7 @@ func (pipe *ErisMintPipe) GetStorage(address, key []byte) (*rpc_tm_types.ResultG return &rpc_tm_types.ResultGetStorage{key, value}, nil } -func (pipe *ErisMintPipe) DumpStorage(address []byte) (*rpc_tm_types.ResultDumpStorage, +func (pipe *erisMintPipe) DumpStorage(address []byte) (*rpc_tm_types.ResultDumpStorage, error) { state := pipe.erisMint.GetState() account := state.GetAccount(address) @@ -368,7 +372,7 @@ func (pipe *ErisMintPipe) DumpStorage(address []byte) (*rpc_tm_types.ResultDumpS } // Call -func (pipe *ErisMintPipe) Call(fromAddress, toAddress, data []byte) (*rpc_tm_types.ResultCall, +func (pipe *erisMintPipe) Call(fromAddress, toAddress, data []byte) (*rpc_tm_types.ResultCall, error) { st := pipe.erisMint.GetState() cache := state.NewBlockCache(st) @@ -395,7 +399,7 @@ func (pipe *ErisMintPipe) Call(fromAddress, toAddress, data []byte) (*rpc_tm_typ return &rpc_tm_types.ResultCall{Return: ret}, nil } -func (pipe *ErisMintPipe) CallCode(fromAddress, code, data []byte) (*rpc_tm_types.ResultCall, +func (pipe *erisMintPipe) CallCode(fromAddress, code, data []byte) (*rpc_tm_types.ResultCall, error) { st := pipe.erisMint.GetState() cache := pipe.erisMint.GetCheckCache() @@ -421,7 +425,7 @@ func (pipe *ErisMintPipe) CallCode(fromAddress, code, data []byte) (*rpc_tm_type // TODO: [ben] deprecate as we should not allow unsafe behaviour // where a user is allowed to send a private key over the wire, // especially unencrypted. -func (pipe *ErisMintPipe) SignTransaction(tx txs.Tx, +func (pipe *erisMintPipe) SignTransaction(tx txs.Tx, privAccounts []*account.PrivAccount) (*rpc_tm_types.ResultSignTx, error) { @@ -461,7 +465,7 @@ func (pipe *ErisMintPipe) SignTransaction(tx txs.Tx, } // Name registry -func (pipe *ErisMintPipe) GetName(name string) (*rpc_tm_types.ResultGetName, error) { +func (pipe *erisMintPipe) GetName(name string) (*rpc_tm_types.ResultGetName, error) { currentState := pipe.erisMint.GetState() entry := currentState.GetNameRegEntry(name) if entry == nil { @@ -470,7 +474,7 @@ func (pipe *ErisMintPipe) GetName(name string) (*rpc_tm_types.ResultGetName, err return &rpc_tm_types.ResultGetName{entry}, nil } -func (pipe *ErisMintPipe) ListNames() (*rpc_tm_types.ResultListNames, error) { +func (pipe *erisMintPipe) ListNames() (*rpc_tm_types.ResultListNames, error) { var blockHeight int var names []*core_types.NameRegEntry currentState := pipe.erisMint.GetState() @@ -484,7 +488,7 @@ func (pipe *ErisMintPipe) ListNames() (*rpc_tm_types.ResultListNames, error) { // Memory pool // NOTE: txs must be signed -func (pipe *ErisMintPipe) BroadcastTxAsync(tx txs.Tx) ( +func (pipe *erisMintPipe) BroadcastTxAsync(tx txs.Tx) ( *rpc_tm_types.ResultBroadcastTx, error) { err := pipe.consensusEngine.BroadcastTransaction(txs.EncodeTx(tx), nil) if err != nil { @@ -493,7 +497,7 @@ func (pipe *ErisMintPipe) BroadcastTxAsync(tx txs.Tx) ( return &rpc_tm_types.ResultBroadcastTx{}, nil } -func (pipe *ErisMintPipe) BroadcastTxSync(tx txs.Tx) (*rpc_tm_types.ResultBroadcastTx, +func (pipe *erisMintPipe) BroadcastTxSync(tx txs.Tx) (*rpc_tm_types.ResultBroadcastTx, error) { responseChannel := make(chan *tmsp_types.Response, 1) err := pipe.consensusEngine.BroadcastTransaction(txs.EncodeTx(tx), @@ -539,7 +543,7 @@ func (pipe *ErisMintPipe) BroadcastTxSync(tx txs.Tx) (*rpc_tm_types.ResultBroadc // from the top of the range of blocks. // Passing 0 for maxHeight sets the upper height of the range to the current // blockchain height. -func (pipe *ErisMintPipe) BlockchainInfo(minHeight, maxHeight, +func (pipe *erisMintPipe) BlockchainInfo(minHeight, maxHeight, maxBlockLookback int) (*rpc_tm_types.ResultBlockchainInfo, error) { blockStore := pipe.blockchain.blockStore diff --git a/rpc/tendermint/client/client.go b/rpc/tendermint/client/client.go index 75731a20aa5b2cc560f85b7f434a41653dc5e015..a696389c7c348b122cabd919924f79d386d3dec6 100644 --- a/rpc/tendermint/client/client.go +++ b/rpc/tendermint/client/client.go @@ -1,7 +1,6 @@ package client import ( - "fmt" acm "github.com/eris-ltd/eris-db/account" core_types "github.com/eris-ltd/eris-db/core/types" rpc_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types" @@ -98,7 +97,6 @@ func BroadcastTx(client rpcclient.Client, receiptBytes := res.(*rpc_types.ResultBroadcastTx).Data receipt := txs.Receipt{} err = wire.ReadBinaryBytes(receiptBytes, &receipt) - fmt.Printf("rec: %#v\n", receipt) return receipt, err } diff --git a/rpc/tendermint/test/common.go b/rpc/tendermint/test/common.go index 148d9344c20e7e407faa4ddffbd75dd5196eb5a8..d9ef3230b0310592654f06e9030056f04e2ee2a3 100644 --- a/rpc/tendermint/test/common.go +++ b/rpc/tendermint/test/common.go @@ -1,7 +1,6 @@ package test import ( - "fmt" "github.com/eris-ltd/eris-db/test/fixtures" "testing" ) @@ -17,11 +16,7 @@ func TestWrapper(runner func() int) int { panic(err) } - saveNewPriv() - // start a node - - fmt.Println() ready := make(chan error) go newNode(ready) err = <-ready diff --git a/rpc/tendermint/test/shared.go b/rpc/tendermint/test/shared.go index 51f3f951effcb3cd0d198694c652210ffaab0860..13ab2570589fec8a1aee96b39e8c0b6c009fdfd7 100644 --- a/rpc/tendermint/test/shared.go +++ b/rpc/tendermint/test/shared.go @@ -75,6 +75,9 @@ func initGlobalVariables(ffs *fixtures.FileFixtures) error { return err } + // Set up priv_validator.json before we start tendermint (otherwise it will + // create its own one. + saveNewPriv() testCore, err = core.NewCore("testCore", consensusConfig, managerConfig) if err != nil { return err @@ -100,15 +103,21 @@ func makeUsers(n int) []*acm.PrivAccount { // create a new node and sleep forever func newNode(ready chan error) { - // Run the RPC server. - _, err := testCore.NewGatewayTendermint(config) - ready <- err + // TODO: we don't need to start a V0 gateway this was added for debugging, remove + serverProcess, err := testCore.NewGatewayV0(config) + if err != nil { + ready <- err + } - // Sleep forever - if err == nil { - //ch := make(chan struct{}) - //<-ch + err = serverProcess.Start() + if err != nil { + ready <- err } + + // Run the RPC servers + _, err = testCore.NewGatewayTendermint(config) + ready <- err + <-serverProcess.StopEventChannel() } func saveNewPriv() { diff --git a/rpc/tendermint/test/ws_helpers.go b/rpc/tendermint/test/ws_helpers.go index 95b5bd1604edb62a6ec1b00fa6967cd663694019..9e6bc5ae41fc44896f76fbde97ac1e12f4359d33 100644 --- a/rpc/tendermint/test/ws_helpers.go +++ b/rpc/tendermint/test/ws_helpers.go @@ -17,6 +17,10 @@ import ( "github.com/tendermint/go-wire" ) +const ( + timeoutSeconds = 5 +) + //-------------------------------------------------------------------------------- // Utilities for testing the websocket service @@ -81,7 +85,7 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo f() // wait for an event or timeout - timeout := time.NewTimer(10 * time.Second) + timeout := time.NewTimer(timeoutSeconds * time.Second) select { case <-timeout.C: if dieOnTimeout { diff --git a/rpc/v0/json_service.go b/rpc/v0/json_service.go index 822c5e7efc8c45717eced3728aac697b0d98c2e4..1e71bdc56b4b4b7888437a5e7c6d5f606c318233 100644 --- a/rpc/v0/json_service.go +++ b/rpc/v0/json_service.go @@ -164,11 +164,11 @@ func (this *ErisDbJsonService) EventUnsubscribe(request *rpc.RPCRequest, } subId := param.SubId - result, errC := this.pipe.Events().Unsubscribe(subId) + errC := this.pipe.Events().Unsubscribe(subId) if errC != nil { return nil, rpc.INTERNAL_ERROR, errC } - return &event.EventUnsub{result}, 0, nil + return &event.EventUnsub{true}, 0, nil } // Check subscription event cache for new data. diff --git a/rpc/v0/wsService.go b/rpc/v0/wsService.go index a95262bd7c5a80844c58cb96c11457c11bb5d884..93e691dc20496e67ea0993a1d4513bbbbd0d0900 100644 --- a/rpc/v0/wsService.go +++ b/rpc/v0/wsService.go @@ -112,7 +112,7 @@ func (this *ErisDbWsService) EventSubscribe(request *rpc.RPCRequest, requester i callback := func(ret events.EventData) { this.writeResponse(subId, ret, session) } - _, errC := this.pipe.Events().Subscribe(subId, eventId, callback) + errC := this.pipe.Events().Subscribe(subId, eventId, callback) if errC != nil { return nil, rpc.INTERNAL_ERROR, errC } @@ -127,11 +127,11 @@ func (this *ErisDbWsService) EventUnsubscribe(request *rpc.RPCRequest, requester } eventId := param.EventId - result, errC := this.pipe.Events().Unsubscribe(eventId) + errC := this.pipe.Events().Unsubscribe(eventId) if errC != nil { return nil, rpc.INTERNAL_ERROR, errC } - return &event.EventUnsub{result}, 0, nil + return &event.EventUnsub{true}, 0, nil } func (this *ErisDbWsService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { diff --git a/test/mock/pipe.go b/test/mock/pipe.go index 1eb3b84e850d1bba591a42ccb22bcad61f78658e..413e6e1a37f51b9b54b929cb950cfd339f240bcf 100644 --- a/test/mock/pipe.go +++ b/test/mock/pipe.go @@ -182,12 +182,12 @@ type eventer struct { testData *td.TestData } -func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) { - return true, nil +func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) error { + return nil } -func (this *eventer) Unsubscribe(subId string) (bool, error) { - return true, nil +func (this *eventer) Unsubscribe(subId string) error { + return nil } // NameReg