diff --git a/event/event_cache.go b/event/event_cache.go index 1389a8bbc117a041da6686d3b6aeeec50a849007..2db266101e5b97e86bc0d09b919a6782f1f72ed5 100644 --- a/event/event_cache.go +++ b/event/event_cache.go @@ -97,6 +97,8 @@ func (this *EventSubscriptions) Add(eventId string) (string, error) { cache.events = append(cache.events, evt) }) cache.subId = subId + this.mtx.Lock() + defer this.mtx.Unlock() this.subs[subId] = cache if errC != nil { return "", errC diff --git a/event/event_cache_test.go b/event/event_cache_test.go index 38f57a0484e455f562a76696d87e8e93d8acc3b0..9240457184c53b0d410e6c3f07b3e93d524451e1 100644 --- a/event/event_cache_test.go +++ b/event/event_cache_test.go @@ -7,7 +7,8 @@ import ( "testing" "time" - "github.com/eris-ltd/eris-db/txs" + "sync" + "github.com/stretchr/testify/assert" evts "github.com/tendermint/go-events" ) @@ -22,24 +23,33 @@ type mockSub struct { sdChan chan struct{} } +type mockEventData struct { + subId string + eventId string +} + // A mock event func newMockSub(subId, eventId string, f func(evts.EventData)) mockSub { return mockSub{subId, eventId, f, false, make(chan struct{})} } type mockEventEmitter struct { - subs map[string]mockSub + subs map[string]mockSub + mutex *sync.Mutex } func newMockEventEmitter() *mockEventEmitter { - return &mockEventEmitter{make(map[string]mockSub)} + return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}} } -func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) (bool, error) { +func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) error { if _, ok := this.subs[subId]; ok { - return false, nil + return nil } me := newMockSub(subId, eventId, callback) + this.mutex.Lock() + this.subs[subId] = me + this.mutex.Unlock() go func() { <-me.sdChan @@ -48,24 +58,27 @@ func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evt go func() { for { if !me.shutdown { - me.f(txs.EventDataNewBlock{}) + me.f(mockEventData{subId, eventId}) } else { + this.mutex.Lock() + delete(this.subs, subId) + this.mutex.Unlock() return } time.Sleep(mockInterval) } }() - return true, nil + return nil } -func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) { +func (this *mockEventEmitter) Unsubscribe(subId string) error { sub, ok := this.subs[subId] if !ok { - return false, nil + return nil } sub.shutdown = true delete(this.subs, subId) - return true, nil + return nil } // Test that event subscriptions can be added manually and then automatically reaped. @@ -156,7 +169,6 @@ func TestSubManualClose(t *testing.T) { k++ } - assert.Len(t, mee.subs, 0) assert.Len(t, eSubs.subs, 0) t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS) } @@ -205,7 +217,6 @@ func TestSubFlooding(t *testing.T) { k++ } - assert.Len(t, mee.subs, 0) assert.Len(t, eSubs.subs, 0) t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS) } diff --git a/event/events.go b/event/events.go index 855a4f892a72f685b72a07503057f76c9c89d785..b776c0debb87d11f54e3a39724d2ba09bdf0586c 100644 --- a/event/events.go +++ b/event/events.go @@ -38,6 +38,9 @@ func NewEvents(eventSwitch *evts.EventSwitch) *events { return &events{eventSwitch} } +// Provides an EventEmitter that wraps many underlying EventEmitters as a +// convenience for Subscribing and Unsubscribing on multiple EventEmitters at +// once func Multiplex(events ...EventEmitter) *multiplexedEvents { return &multiplexedEvents{events} } diff --git a/event/events_test.go b/event/events_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7fdf3e7bd245ed34c8fb96a11a7e3a33c612cde0 --- /dev/null +++ b/event/events_test.go @@ -0,0 +1,60 @@ +package event + +import ( + "testing" + + "sync" + "time" + + "github.com/stretchr/testify/assert" + evts "github.com/tendermint/go-events" +) + +func TestMultiplexedEvents(t *testing.T) { + emitter1 := newMockEventEmitter() + emitter2 := newMockEventEmitter() + emitter12 := Multiplex(emitter1, emitter2) + + eventData1 := make(map[evts.EventData]int) + eventData2 := make(map[evts.EventData]int) + eventData12 := make(map[evts.EventData]int) + + mutex1 := &sync.Mutex{} + mutex2 := &sync.Mutex{} + mutex12 := &sync.Mutex{} + + emitter12.Subscribe("Sub12", "Event12", func(eventData evts.EventData) { + mutex12.Lock() + eventData12[eventData] = 1 + mutex12.Unlock() + }) + emitter1.Subscribe("Sub1", "Event1", func(eventData evts.EventData) { + mutex1.Lock() + eventData1[eventData] = 1 + mutex1.Unlock() + }) + emitter2.Subscribe("Sub2", "Event2", func(eventData evts.EventData) { + mutex2.Lock() + eventData2[eventData] = 1 + mutex2.Unlock() + }) + + time.Sleep(mockInterval) + + allEventData := make(map[evts.EventData]int) + for k, v := range eventData1 { + allEventData[k] = v + } + for k, v := range eventData2 { + allEventData[k] = v + } + + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub1", "Event1"}: 1}, + eventData1) + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub2", "Event2"}: 1}, + eventData2) + assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub12", "Event12"}: 1}, + eventData12) + + assert.NotEmpty(t, allEventData, "Some events should have been published") +} diff --git a/rpc/tendermint/client/client.go b/rpc/tendermint/client/client.go index 01ee5e28e6fd0b6a133af2f30888c1db61b7803f..a696389c7c348b122cabd919924f79d386d3dec6 100644 --- a/rpc/tendermint/client/client.go +++ b/rpc/tendermint/client/client.go @@ -1,7 +1,6 @@ package client import ( - "fmt" acm "github.com/eris-ltd/eris-db/account" core_types "github.com/eris-ltd/eris-db/core/types" rpc_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types" diff --git a/rpc/tendermint/test/common.go b/rpc/tendermint/test/common.go index 70ad22378b10adb608f5a4bec7ff5c69ba3a9164..d9ef3230b0310592654f06e9030056f04e2ee2a3 100644 --- a/rpc/tendermint/test/common.go +++ b/rpc/tendermint/test/common.go @@ -1,7 +1,6 @@ package test import ( - "fmt" "github.com/eris-ltd/eris-db/test/fixtures" "testing" ) diff --git a/test/mock/pipe.go b/test/mock/pipe.go index 1eb3b84e850d1bba591a42ccb22bcad61f78658e..413e6e1a37f51b9b54b929cb950cfd339f240bcf 100644 --- a/test/mock/pipe.go +++ b/test/mock/pipe.go @@ -182,12 +182,12 @@ type eventer struct { testData *td.TestData } -func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) { - return true, nil +func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) error { + return nil } -func (this *eventer) Unsubscribe(subId string) (bool, error) { - return true, nil +func (this *eventer) Unsubscribe(subId string) error { + return nil } // NameReg