Skip to content
Snippets Groups Projects
Unverified Commit e61ea42a authored by Silas Davis's avatar Silas Davis
Browse files

Added test for EventEmitter multiplexing, fixed a few things

parent 4d3c5ab9
No related branches found
No related tags found
No related merge requests found
......@@ -97,6 +97,8 @@ func (this *EventSubscriptions) Add(eventId string) (string, error) {
cache.events = append(cache.events, evt)
})
cache.subId = subId
this.mtx.Lock()
defer this.mtx.Unlock()
this.subs[subId] = cache
if errC != nil {
return "", errC
......
......@@ -7,7 +7,8 @@ import (
"testing"
"time"
"github.com/eris-ltd/eris-db/txs"
"sync"
"github.com/stretchr/testify/assert"
evts "github.com/tendermint/go-events"
)
......@@ -22,24 +23,33 @@ type mockSub struct {
sdChan chan struct{}
}
type mockEventData struct {
subId string
eventId string
}
// A mock event
func newMockSub(subId, eventId string, f func(evts.EventData)) mockSub {
return mockSub{subId, eventId, f, false, make(chan struct{})}
}
type mockEventEmitter struct {
subs map[string]mockSub
subs map[string]mockSub
mutex *sync.Mutex
}
func newMockEventEmitter() *mockEventEmitter {
return &mockEventEmitter{make(map[string]mockSub)}
return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}}
}
func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) (bool, error) {
func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) error {
if _, ok := this.subs[subId]; ok {
return false, nil
return nil
}
me := newMockSub(subId, eventId, callback)
this.mutex.Lock()
this.subs[subId] = me
this.mutex.Unlock()
go func() {
<-me.sdChan
......@@ -48,24 +58,27 @@ func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evt
go func() {
for {
if !me.shutdown {
me.f(txs.EventDataNewBlock{})
me.f(mockEventData{subId, eventId})
} else {
this.mutex.Lock()
delete(this.subs, subId)
this.mutex.Unlock()
return
}
time.Sleep(mockInterval)
}
}()
return true, nil
return nil
}
func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) {
func (this *mockEventEmitter) Unsubscribe(subId string) error {
sub, ok := this.subs[subId]
if !ok {
return false, nil
return nil
}
sub.shutdown = true
delete(this.subs, subId)
return true, nil
return nil
}
// Test that event subscriptions can be added manually and then automatically reaped.
......@@ -156,7 +169,6 @@ func TestSubManualClose(t *testing.T) {
k++
}
assert.Len(t, mee.subs, 0)
assert.Len(t, eSubs.subs, 0)
t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS)
}
......@@ -205,7 +217,6 @@ func TestSubFlooding(t *testing.T) {
k++
}
assert.Len(t, mee.subs, 0)
assert.Len(t, eSubs.subs, 0)
t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS)
}
......@@ -38,6 +38,9 @@ func NewEvents(eventSwitch *evts.EventSwitch) *events {
return &events{eventSwitch}
}
// Provides an EventEmitter that wraps many underlying EventEmitters as a
// convenience for Subscribing and Unsubscribing on multiple EventEmitters at
// once
func Multiplex(events ...EventEmitter) *multiplexedEvents {
return &multiplexedEvents{events}
}
......
package event
import (
"testing"
"sync"
"time"
"github.com/stretchr/testify/assert"
evts "github.com/tendermint/go-events"
)
func TestMultiplexedEvents(t *testing.T) {
emitter1 := newMockEventEmitter()
emitter2 := newMockEventEmitter()
emitter12 := Multiplex(emitter1, emitter2)
eventData1 := make(map[evts.EventData]int)
eventData2 := make(map[evts.EventData]int)
eventData12 := make(map[evts.EventData]int)
mutex1 := &sync.Mutex{}
mutex2 := &sync.Mutex{}
mutex12 := &sync.Mutex{}
emitter12.Subscribe("Sub12", "Event12", func(eventData evts.EventData) {
mutex12.Lock()
eventData12[eventData] = 1
mutex12.Unlock()
})
emitter1.Subscribe("Sub1", "Event1", func(eventData evts.EventData) {
mutex1.Lock()
eventData1[eventData] = 1
mutex1.Unlock()
})
emitter2.Subscribe("Sub2", "Event2", func(eventData evts.EventData) {
mutex2.Lock()
eventData2[eventData] = 1
mutex2.Unlock()
})
time.Sleep(mockInterval)
allEventData := make(map[evts.EventData]int)
for k, v := range eventData1 {
allEventData[k] = v
}
for k, v := range eventData2 {
allEventData[k] = v
}
assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub1", "Event1"}: 1},
eventData1)
assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub2", "Event2"}: 1},
eventData2)
assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub12", "Event12"}: 1},
eventData12)
assert.NotEmpty(t, allEventData, "Some events should have been published")
}
package client
import (
"fmt"
acm "github.com/eris-ltd/eris-db/account"
core_types "github.com/eris-ltd/eris-db/core/types"
rpc_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types"
......
package test
import (
"fmt"
"github.com/eris-ltd/eris-db/test/fixtures"
"testing"
)
......
......@@ -182,12 +182,12 @@ type eventer struct {
testData *td.TestData
}
func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) {
return true, nil
func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) error {
return nil
}
func (this *eventer) Unsubscribe(subId string) (bool, error) {
return true, nil
func (this *eventer) Unsubscribe(subId string) error {
return nil
}
// NameReg
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment