From e61ea42a8ff3da9eb6c5e527104aa8667b76f3b5 Mon Sep 17 00:00:00 2001
From: Silas Davis <silas@erisindustries.com>
Date: Thu, 11 Aug 2016 16:50:23 +0200
Subject: [PATCH] Added test for EventEmitter multiplexing, fixed a few things

---
 event/event_cache.go            |  2 ++
 event/event_cache_test.go       | 35 ++++++++++++-------
 event/events.go                 |  3 ++
 event/events_test.go            | 60 +++++++++++++++++++++++++++++++++
 rpc/tendermint/client/client.go |  1 -
 rpc/tendermint/test/common.go   |  1 -
 test/mock/pipe.go               |  8 ++---
 7 files changed, 92 insertions(+), 18 deletions(-)
 create mode 100644 event/events_test.go

diff --git a/event/event_cache.go b/event/event_cache.go
index 1389a8bb..2db26610 100644
--- a/event/event_cache.go
+++ b/event/event_cache.go
@@ -97,6 +97,8 @@ func (this *EventSubscriptions) Add(eventId string) (string, error) {
 			cache.events = append(cache.events, evt)
 		})
 	cache.subId = subId
+	this.mtx.Lock()
+	defer this.mtx.Unlock()
 	this.subs[subId] = cache
 	if errC != nil {
 		return "", errC
diff --git a/event/event_cache_test.go b/event/event_cache_test.go
index 38f57a04..92404571 100644
--- a/event/event_cache_test.go
+++ b/event/event_cache_test.go
@@ -7,7 +7,8 @@ import (
 	"testing"
 	"time"
 
-	"github.com/eris-ltd/eris-db/txs"
+	"sync"
+
 	"github.com/stretchr/testify/assert"
 	evts "github.com/tendermint/go-events"
 )
@@ -22,24 +23,33 @@ type mockSub struct {
 	sdChan   chan struct{}
 }
 
+type mockEventData struct {
+	subId   string
+	eventId string
+}
+
 // A mock event
 func newMockSub(subId, eventId string, f func(evts.EventData)) mockSub {
 	return mockSub{subId, eventId, f, false, make(chan struct{})}
 }
 
 type mockEventEmitter struct {
-	subs map[string]mockSub
+	subs  map[string]mockSub
+	mutex *sync.Mutex
 }
 
 func newMockEventEmitter() *mockEventEmitter {
-	return &mockEventEmitter{make(map[string]mockSub)}
+	return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}}
 }
 
-func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) (bool, error) {
+func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evts.EventData)) error {
 	if _, ok := this.subs[subId]; ok {
-		return false, nil
+		return nil
 	}
 	me := newMockSub(subId, eventId, callback)
+	this.mutex.Lock()
+	this.subs[subId] = me
+	this.mutex.Unlock()
 
 	go func() {
 		<-me.sdChan
@@ -48,24 +58,27 @@ func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(evt
 	go func() {
 		for {
 			if !me.shutdown {
-				me.f(txs.EventDataNewBlock{})
+				me.f(mockEventData{subId, eventId})
 			} else {
+				this.mutex.Lock()
+				delete(this.subs, subId)
+				this.mutex.Unlock()
 				return
 			}
 			time.Sleep(mockInterval)
 		}
 	}()
-	return true, nil
+	return nil
 }
 
-func (this *mockEventEmitter) Unsubscribe(subId string) (bool, error) {
+func (this *mockEventEmitter) Unsubscribe(subId string) error {
 	sub, ok := this.subs[subId]
 	if !ok {
-		return false, nil
+		return nil
 	}
 	sub.shutdown = true
 	delete(this.subs, subId)
-	return true, nil
+	return nil
 }
 
 // Test that event subscriptions can be added manually and then automatically reaped.
@@ -156,7 +169,6 @@ func TestSubManualClose(t *testing.T) {
 		k++
 	}
 
-	assert.Len(t, mee.subs, 0)
 	assert.Len(t, eSubs.subs, 0)
 	t.Logf("Added %d subs that were all closed down by unsubscribing.", NUM_SUBS)
 }
@@ -205,7 +217,6 @@ func TestSubFlooding(t *testing.T) {
 		k++
 	}
 
-	assert.Len(t, mee.subs, 0)
 	assert.Len(t, eSubs.subs, 0)
 	t.Logf("Added %d subs that all received 1000 events each. They were all closed down by unsubscribing.", NUM_SUBS)
 }
diff --git a/event/events.go b/event/events.go
index 855a4f89..b776c0de 100644
--- a/event/events.go
+++ b/event/events.go
@@ -38,6 +38,9 @@ func NewEvents(eventSwitch *evts.EventSwitch) *events {
 	return &events{eventSwitch}
 }
 
+// Provides an EventEmitter that wraps many underlying EventEmitters as a
+// convenience for Subscribing and Unsubscribing on multiple EventEmitters at
+// once
 func Multiplex(events ...EventEmitter) *multiplexedEvents {
 	return &multiplexedEvents{events}
 }
diff --git a/event/events_test.go b/event/events_test.go
new file mode 100644
index 00000000..7fdf3e7b
--- /dev/null
+++ b/event/events_test.go
@@ -0,0 +1,60 @@
+package event
+
+import (
+	"testing"
+
+	"sync"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	evts "github.com/tendermint/go-events"
+)
+
+func TestMultiplexedEvents(t *testing.T) {
+	emitter1 := newMockEventEmitter()
+	emitter2 := newMockEventEmitter()
+	emitter12 := Multiplex(emitter1, emitter2)
+
+	eventData1 := make(map[evts.EventData]int)
+	eventData2 := make(map[evts.EventData]int)
+	eventData12 := make(map[evts.EventData]int)
+
+	mutex1 := &sync.Mutex{}
+	mutex2 := &sync.Mutex{}
+	mutex12 := &sync.Mutex{}
+
+	emitter12.Subscribe("Sub12", "Event12", func(eventData evts.EventData) {
+		mutex12.Lock()
+		eventData12[eventData] = 1
+		mutex12.Unlock()
+	})
+	emitter1.Subscribe("Sub1", "Event1", func(eventData evts.EventData) {
+		mutex1.Lock()
+		eventData1[eventData] = 1
+		mutex1.Unlock()
+	})
+	emitter2.Subscribe("Sub2", "Event2", func(eventData evts.EventData) {
+		mutex2.Lock()
+		eventData2[eventData] = 1
+		mutex2.Unlock()
+	})
+
+	time.Sleep(mockInterval)
+
+	allEventData := make(map[evts.EventData]int)
+	for k, v := range eventData1 {
+		allEventData[k] = v
+	}
+	for k, v := range eventData2 {
+		allEventData[k] = v
+	}
+
+	assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub1", "Event1"}: 1},
+		eventData1)
+	assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub2", "Event2"}: 1},
+		eventData2)
+	assert.Equal(t, map[evts.EventData]int{mockEventData{"Sub12", "Event12"}: 1},
+		eventData12)
+
+	assert.NotEmpty(t, allEventData, "Some events should have been published")
+}
diff --git a/rpc/tendermint/client/client.go b/rpc/tendermint/client/client.go
index 01ee5e28..a696389c 100644
--- a/rpc/tendermint/client/client.go
+++ b/rpc/tendermint/client/client.go
@@ -1,7 +1,6 @@
 package client
 
 import (
-	"fmt"
 	acm "github.com/eris-ltd/eris-db/account"
 	core_types "github.com/eris-ltd/eris-db/core/types"
 	rpc_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types"
diff --git a/rpc/tendermint/test/common.go b/rpc/tendermint/test/common.go
index 70ad2237..d9ef3230 100644
--- a/rpc/tendermint/test/common.go
+++ b/rpc/tendermint/test/common.go
@@ -1,7 +1,6 @@
 package test
 
 import (
-	"fmt"
 	"github.com/eris-ltd/eris-db/test/fixtures"
 	"testing"
 )
diff --git a/test/mock/pipe.go b/test/mock/pipe.go
index 1eb3b84e..413e6e1a 100644
--- a/test/mock/pipe.go
+++ b/test/mock/pipe.go
@@ -182,12 +182,12 @@ type eventer struct {
 	testData *td.TestData
 }
 
-func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) (bool, error) {
-	return true, nil
+func (this *eventer) Subscribe(subId, event string, callback func(evts.EventData)) error {
+	return nil
 }
 
-func (this *eventer) Unsubscribe(subId string) (bool, error) {
-	return true, nil
+func (this *eventer) Unsubscribe(subId string) error {
+	return nil
 }
 
 // NameReg
-- 
GitLab