diff --git a/event/event_cache.go b/event/event_cache.go index 99dc4f3fa98e65f9de8c9cc6fc8f98ea9632912f..9342564bc51d26c80a10dbf2fa099b81e5eb167c 100644 --- a/event/event_cache.go +++ b/event/event_cache.go @@ -59,7 +59,7 @@ func (this *EventCache) poll() []interface{} { // Catches events that callers subscribe to and adds them to an array ready to be polled. type EventSubscriptions struct { - mtx *sync.Mutex + mtx *sync.RWMutex eventEmitter EventEmitter subs map[string]*EventCache reap bool @@ -67,7 +67,7 @@ type EventSubscriptions struct { func NewEventSubscriptions(eventEmitter EventEmitter) *EventSubscriptions { es := &EventSubscriptions{ - mtx: &sync.Mutex{}, + mtx: &sync.RWMutex{}, eventEmitter: eventEmitter, subs: make(map[string]*EventCache), reap: true, @@ -121,6 +121,8 @@ func (this *EventSubscriptions) Add(eventId string) (string, error) { } func (this *EventSubscriptions) Poll(subId string) ([]interface{}, error) { + this.mtx.RLock() + defer this.mtx.RUnlock() sub, ok := this.subs[subId] if !ok { return nil, fmt.Errorf("Subscription not active. ID: " + subId) diff --git a/event/event_cache_test.go b/event/event_cache_test.go index bd1954f6a79359bba214e7c7c366b4f91a0fa73a..1d271b9cc6499b11edd45ac9f619486cbc3b8a35 100644 --- a/event/event_cache_test.go +++ b/event/event_cache_test.go @@ -27,14 +27,13 @@ import ( "github.com/stretchr/testify/assert" ) -var mockInterval = 40 * time.Millisecond +var mockInterval = 20 * time.Millisecond type mockSub struct { - subId string - eventId string - f func(txs.EventData) - shutdown bool - sdChan chan struct{} + subId string + eventId string + f func(txs.EventData) + sdChan chan struct{} } type mockEventData struct { @@ -46,7 +45,7 @@ func (eventData mockEventData) AssertIsEventData() {} // A mock event func newMockSub(subId, eventId string, f func(txs.EventData)) mockSub { - return mockSub{subId, eventId, f, false, make(chan struct{})} + return mockSub{subId, eventId, f, make(chan struct{})} } type mockEventEmitter struct { @@ -67,33 +66,30 @@ func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(txs this.subs[subId] = me this.mutex.Unlock() - go func() { - <-me.sdChan - me.shutdown = true - }() go func() { for { - if !me.shutdown { - me.f(mockEventData{subId, eventId}) - } else { + select { + case <-me.sdChan: this.mutex.Lock() delete(this.subs, subId) this.mutex.Unlock() return + case <-time.After(mockInterval): + me.f(mockEventData{subId, eventId}) } - time.Sleep(mockInterval) } }() return nil } func (this *mockEventEmitter) Unsubscribe(subId string) error { + this.mutex.Lock() sub, ok := this.subs[subId] + this.mutex.Unlock() if !ok { return nil } - sub.shutdown = true - delete(this.subs, subId) + sub.sdChan <- struct{}{} return nil } diff --git a/event/events_test.go b/event/events_test.go index a4dcf84cd2bedd1b0f01b079833661f0e385764c..9f373f46ee8d30a5c08cdd63a41e9d7c7f02fc95 100644 --- a/event/events_test.go +++ b/event/events_test.go @@ -53,16 +53,21 @@ func TestMultiplexedEvents(t *testing.T) { mutex2.Unlock() }) - time.Sleep(mockInterval) + time.Sleep(2 * mockInterval) - allEventData := make(map[txs.EventData]int) - for k, v := range eventData1 { - allEventData[k] = v - } - for k, v := range eventData2 { - allEventData[k] = v - } + err := emitter12.Unsubscribe("Sub12") + assert.NoError(t, err) + err = emitter1.Unsubscribe("Sub2") + assert.NoError(t, err) + err = emitter2.Unsubscribe("Sub2") + assert.NoError(t, err) + mutex1.Lock() + defer mutex1.Unlock() + mutex2.Lock() + defer mutex2.Unlock() + mutex12.Lock() + defer mutex12.Unlock() assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub1", "Event1"}: 1}, eventData1) assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub2", "Event2"}: 1}, @@ -70,5 +75,4 @@ func TestMultiplexedEvents(t *testing.T) { assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub12", "Event12"}: 1}, eventData12) - assert.NotEmpty(t, allEventData, "Some events should have been published") }