Skip to content
Snippets Groups Projects
Unverified Commit 9bc7c32b authored by Silas Davis's avatar Silas Davis
Browse files

Events: restructure events, add EventData sum type in preparation for

splitting up event payloads, add EventCache to drop dependency on Tendermint's
EventCache.

Signed-off-by: default avatarSilas Davis <silas@monax.io>
parent 749b565f
No related branches found
No related tags found
No related merge requests found
package event
// When exceeded we will trim the buffer's backing array capacity to avoid excessive
// allocation
const maximumBufferCapacityToLengthRatio = 2
// An Cache buffers events for a Fireable
// All events are cached. Filtering happens on Flush
type Cache struct {
evsw Fireable
events []eventInfo
}
var _ Fireable = &Cache{}
// Create a new Cache with an EventSwitch as backend
func NewEventCache(evsw Fireable) *Cache {
return &Cache{
evsw: evsw,
}
}
// a cached event
type eventInfo struct {
event string
data AnyEventData
}
// Cache an event to be fired upon finality.
func (evc *Cache) Fire(event string, eventData interface{}) {
// append to list (go will grow our backing array exponentially)
evc.events = append(evc.events, eventInfo{event: event, data: MapToAnyEventData(eventData)})
}
// Fire events by running evsw.Fire on all cached events. Blocks.
// Clears cached events
func (evc *Cache) Flush() {
for _, ei := range evc.events {
evc.evsw.Fire(ei.event, ei.data)
}
// Clear the buffer by re-slicing its length to zero
if cap(evc.events) > len(evc.events)*maximumBufferCapacityToLengthRatio {
// Trim the backing array capacity when it is more than double the length of the slice to avoid tying up memory
// after a spike in the number of events to buffer
evc.events = evc.events[:0:len(evc.events)]
} else {
// Re-slice the length to 0 to clear buffer but hang on to spare capacity in backing array that has been added
// in previous cache round
evc.events = evc.events[:0]
}
}
package event
import (
"testing"
"github.com/hyperledger/burrow/logging/loggers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEventCache_Flush(t *testing.T) {
evts := NewEmitter(loggers.NewNoopInfoTraceLogger())
evts.Subscribe("nothingness", "", func(data AnyEventData) {
// Check we are not initialising an empty buffer full of zeroed eventInfos in the Cache
require.FailNow(t, "We should never receive a message on this switch since none are fired")
})
evc := NewEventCache(evts)
evc.Flush()
// Check after reset
evc.Flush()
fail := true
pass := false
evts.Subscribe("somethingness", "something", func(data AnyEventData) {
if fail {
require.FailNow(t, "Shouldn't see a message until flushed")
}
pass = true
})
evc.Fire("something", AnyEventData{})
evc.Fire("something", AnyEventData{})
evc.Fire("something", AnyEventData{})
fail = false
evc.Flush()
assert.True(t, pass)
}
func TestEventCacheGrowth(t *testing.T) {
evc := NewEventCache(NewEmitter(loggers.NewNoopInfoTraceLogger()))
fireNEvents(evc, 100)
c := cap(evc.events)
evc.Flush()
assert.Equal(t, c, cap(evc.events), "cache cap should remain the same after flushing events")
fireNEvents(evc, c/maximumBufferCapacityToLengthRatio+1)
evc.Flush()
assert.Equal(t, c, cap(evc.events), "cache cap should remain the same after flushing more than half "+
"the number of events as last time")
fireNEvents(evc, c/maximumBufferCapacityToLengthRatio-1)
evc.Flush()
assert.True(t, c > cap(evc.events), "cache cap should drop after flushing fewer than half "+
"the number of events as last time")
fireNEvents(evc, c*2*maximumBufferCapacityToLengthRatio)
evc.Flush()
assert.True(t, c < cap(evc.events), "cache cap should grow after flushing more events than seen before")
for numEvents := 100; numEvents >= 0; numEvents-- {
fireNEvents(evc, numEvents)
evc.Flush()
assert.True(t, cap(evc.events) <= maximumBufferCapacityToLengthRatio*numEvents,
"cap (%v) should be at most twice numEvents (%v)", cap(evc.events), numEvents)
}
}
func fireNEvents(evc *Cache, n int) {
for i := 0; i < n; i++ {
evc.Fire("something", AnyEventData{})
}
}
package event
import (
"fmt"
exe_events "github.com/hyperledger/burrow/execution/events"
evm_events "github.com/hyperledger/burrow/execution/evm/events"
"github.com/tendermint/go-wire/data"
tm_types "github.com/tendermint/tendermint/types"
)
// Oh for a real sum type
// AnyEventData provides a single type for our multiplexed event categories of EVM events and Tendermint events
type AnyEventData struct {
TMEventData *tm_types.TMEventData `json:"tm_event_data,omitempty"`
BurrowEventData *EventData `json:"burrow_event_data,omitempty"`
Err *string `json:"error,omitempty"`
}
type EventData struct {
EventDataInner `json:"unwrap"`
}
type EventDataInner interface {
}
func (ed EventData) Unwrap() EventDataInner {
return ed.EventDataInner
}
func (ed EventData) MarshalJSON() ([]byte, error) {
return mapper.ToJSON(ed.EventDataInner)
}
func (ed *EventData) UnmarshalJSON(data []byte) (err error) {
parsed, err := mapper.FromJSON(data)
if err == nil && parsed != nil {
ed.EventDataInner = parsed.(EventDataInner)
}
return err
}
var mapper = data.NewMapper(EventData{}).
RegisterImplementation(exe_events.EventDataTx{}, "event_data_tx", biota()).
RegisterImplementation(evm_events.EventDataCall{}, "event_data_call", biota()).
RegisterImplementation(evm_events.EventDataLog{}, "event_data_log", biota())
// Get whichever element of the AnyEventData sum type that is not nil
func (aed AnyEventData) Get() interface{} {
if aed.TMEventData != nil {
return aed.TMEventData.Unwrap()
}
if aed.BurrowEventData != nil {
return aed.BurrowEventData.Unwrap()
}
if aed.Err != nil {
return *aed.Err
}
return nil
}
// If this AnyEventData wraps an EventDataNewBlock then return a pointer to that value, else return nil
func (aed AnyEventData) EventDataNewBlock() *tm_types.EventDataNewBlock {
if aed.TMEventData != nil {
eventData, _ := aed.TMEventData.Unwrap().(tm_types.EventDataNewBlock)
return &eventData
}
return nil
}
// If this AnyEventData wraps an EventDataLog then return a pointer to that value, else return nil
func (aed AnyEventData) EventDataLog() *evm_events.EventDataLog {
if aed.BurrowEventData != nil {
eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataLog)
return &eventData
}
return nil
}
// If this AnyEventData wraps an EventDataCall then return a pointer to that value, else return nil
func (aed AnyEventData) EventDataCall() *evm_events.EventDataCall {
if aed.BurrowEventData != nil {
eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataCall)
return &eventData
}
return nil
}
// If this AnyEventData wraps an EventDataTx then return a pointer to that value, else return nil
func (aed AnyEventData) EventDataTx() *exe_events.EventDataTx {
if aed.BurrowEventData != nil {
eventData, _ := aed.BurrowEventData.Unwrap().(exe_events.EventDataTx)
return &eventData
}
return nil
}
func (aed AnyEventData) Error() string {
if aed.Err == nil {
return ""
}
return *aed.Err
}
// Map any supported event data element to our AnyEventData sum type
func MapToAnyEventData(eventData interface{}) AnyEventData {
switch ed := eventData.(type) {
case AnyEventData:
return ed
case tm_types.TMEventData:
return AnyEventData{TMEventData: &ed}
case EventData:
return AnyEventData{BurrowEventData: &ed}
case EventDataInner:
return AnyEventData{BurrowEventData: &EventData{
EventDataInner: ed,
}}
default:
errStr := fmt.Sprintf("could not map event data of type %T to AnyEventData", eventData)
return AnyEventData{Err: &errStr}
}
}
// Type byte helper
var nextByte byte = 1
func biota() (b byte) {
b = nextByte
nextByte++
return
}
package event
import (
"encoding/json"
"testing"
acm "github.com/hyperledger/burrow/account"
exe_events "github.com/hyperledger/burrow/execution/events"
"github.com/hyperledger/burrow/txs"
"github.com/stretchr/testify/assert"
tm_types "github.com/tendermint/tendermint/types"
)
func TestSerialiseTMEventData(t *testing.T) {
roundTripAnyEventData(t, AnyEventData{
TMEventData: &tm_types.TMEventData{
TMEventDataInner: tm_types.EventDataNewBlock{
Block: &tm_types.Block{
LastCommit: &tm_types.Commit{},
Header: &tm_types.Header{
ChainID: "ChainID-ChainEgo",
},
Data: &tm_types.Data{},
},
},
},
})
}
func TestSerialiseEVMEventData(t *testing.T) {
roundTripAnyEventData(t, AnyEventData{
BurrowEventData: &EventData{
EventDataInner: exe_events.EventDataTx{
Tx: &txs.CallTx{
Address: &acm.Address{1, 2, 2, 3},
},
Return: []byte{1, 2, 3},
Exception: "Exception",
},
},
})
}
func TestSerialiseError(t *testing.T) {
s := "random error"
roundTripAnyEventData(t, AnyEventData{
Err: &s,
})
}
func roundTripAnyEventData(t *testing.T, aed AnyEventData) {
bs, err := json.Marshal(aed)
assert.NoError(t, err)
aedOut := new(AnyEventData)
err = json.Unmarshal(bs, aedOut)
assert.NoError(t, err)
bsOut, err := json.Marshal(aedOut)
assert.NoError(t, err)
assert.Equal(t, string(bs), string(bsOut))
}
......@@ -17,99 +17,176 @@ package event
import (
"crypto/rand"
"encoding/hex"
"strings"
"fmt"
"strings"
"github.com/hyperledger/burrow/logging"
"github.com/hyperledger/burrow/logging/structure"
logging_types "github.com/hyperledger/burrow/logging/types"
"github.com/hyperledger/burrow/txs"
go_events "github.com/tendermint/go-events"
tm_types "github.com/tendermint/tendermint/types"
"github.com/tendermint/tmlibs/common"
go_events "github.com/tendermint/tmlibs/events"
)
// TODO: [Silas] this is a compatibility layer between our event types and
// TODO: go-events. Our ultimate plan is to replace go-events with our own pub-sub
// TODO: code that will better allow us to manage and multiplex events from different
// TODO: subsystems
// Oh for a sum type
// We are using this as a marker interface for the
type anyEventData interface{}
type EventEmitter interface {
Subscribe(subId, event string, callback func(txs.EventData)) error
type Subscribable interface {
Subscribe(subId, event string, callback func(AnyEventData)) error
Unsubscribe(subId string) error
}
func NewEvents(eventSwitch go_events.EventSwitch, logger logging_types.InfoTraceLogger) *events {
return &events{eventSwitch: eventSwitch, logger: logging.WithScope(logger, "Events")}
type Fireable interface {
Fire(event string, data interface{})
}
// Provides an EventEmitter that wraps many underlying EventEmitters as a
// convenience for Subscribing and Unsubscribing on multiple EventEmitters at
// once
func Multiplex(events ...EventEmitter) *multiplexedEvents {
return &multiplexedEvents{events}
type Emitter interface {
Fireable
go_events.EventSwitch
Subscribable
}
// The events struct has methods for working with events.
type events struct {
type emitter struct {
// Bah, Service infects everything
*common.BaseService
eventSwitch go_events.EventSwitch
logger logging_types.InfoTraceLogger
}
var _ Emitter = &emitter{}
func NewEmitter(logger logging_types.InfoTraceLogger) *emitter {
return WrapEventSwitch(go_events.NewEventSwitch(), logger)
}
func WrapEventSwitch(eventSwitch go_events.EventSwitch, logger logging_types.InfoTraceLogger) *emitter {
eventSwitch.Start()
return &emitter{
BaseService: common.NewBaseService(nil, "BurrowEventEmitter", eventSwitch),
eventSwitch: eventSwitch,
logger: logger.With(structure.ComponentKey, "Events"),
}
}
// Fireable
func (evts *emitter) Fire(event string, eventData interface{}) {
evts.eventSwitch.FireEvent(event, eventData)
}
func (evts *emitter) FireEvent(event string, data go_events.EventData) {
evts.Fire(event, data)
}
// EventSwitch
func (evts *emitter) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) {
evts.eventSwitch.AddListenerForEvent(listenerID, event, cb)
}
func (evts *emitter) RemoveListenerForEvent(event string, listenerID string) {
evts.eventSwitch.RemoveListenerForEvent(event, listenerID)
}
func (evts *emitter) RemoveListener(listenerID string) {
evts.eventSwitch.RemoveListener(listenerID)
}
// Subscribe to an event.
func (evts *events) Subscribe(subId, event string,
callback func(txs.EventData)) error {
cb := func(evt go_events.EventData) {
eventData, err := mapToOurEventData(evt)
if err != nil {
logging.InfoMsg(evts.logger, "Failed to map go-events EventData to our EventData",
"error", err,
"event", event)
func (evts *emitter) Subscribe(subId, event string, callback func(AnyEventData)) error {
logging.TraceMsg(evts.logger, "Subscribing to event",
structure.ScopeKey, "events.Subscribe", "subId", subId, "event", event)
evts.eventSwitch.AddListenerForEvent(subId, event, func(eventData go_events.EventData) {
if eventData == nil {
logging.TraceMsg(evts.logger, "Sent nil go-events EventData")
return
}
callback(eventData)
}
evts.eventSwitch.AddListenerForEvent(subId, event, cb)
callback(MapToAnyEventData(eventData))
})
return nil
}
// Un-subscribe from an event.
func (evts *events) Unsubscribe(subId string) error {
func (evts *emitter) Unsubscribe(subId string) error {
logging.TraceMsg(evts.logger, "Unsubscribing from event",
structure.ScopeKey, "events.Unsubscribe", "subId", subId)
evts.eventSwitch.RemoveListener(subId)
return nil
}
// Provides an Emitter that wraps many underlying EventEmitters as a
// convenience for Subscribing and Unsubscribing on multiple EventEmitters at
// once
func Multiplex(events ...Emitter) *multiplexedEvents {
return &multiplexedEvents{
BaseService: common.NewBaseService(nil, "BurrowMultiplexedEventEmitter", nil),
eventEmitters: events,
}
}
type multiplexedEvents struct {
eventEmitters []EventEmitter
*common.BaseService
eventEmitters []Emitter
}
var _ Emitter = &multiplexedEvents{}
// Subscribe to an event.
func (multiEvents *multiplexedEvents) Subscribe(subId, event string,
callback func(txs.EventData)) error {
for _, eventEmitter := range multiEvents.eventEmitters {
err := eventEmitter.Subscribe(subId, event, callback)
func (multiEvents *multiplexedEvents) Subscribe(subId, event string, cb func(AnyEventData)) error {
for _, evts := range multiEvents.eventEmitters {
err := evts.Subscribe(subId, event, cb)
if err != nil {
return err
}
}
return nil
}
// Un-subscribe from an event.
func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error {
for _, eventEmitter := range multiEvents.eventEmitters {
err := eventEmitter.Unsubscribe(subId)
for _, evts := range multiEvents.eventEmitters {
err := evts.Unsubscribe(subId)
if err != nil {
return err
}
}
return nil
}
func (multiEvents *multiplexedEvents) Fire(event string, eventData interface{}) {
for _, evts := range multiEvents.eventEmitters {
evts.Fire(event, eventData)
}
}
func (multiEvents *multiplexedEvents) FireEvent(event string, eventData go_events.EventData) {
multiEvents.Fire(event, eventData)
}
// EventSwitch
func (multiEvents *multiplexedEvents) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) {
for _, evts := range multiEvents.eventEmitters {
evts.AddListenerForEvent(listenerID, event, cb)
}
}
func (multiEvents *multiplexedEvents) RemoveListenerForEvent(event string, listenerID string) {
for _, evts := range multiEvents.eventEmitters {
evts.RemoveListenerForEvent(event, listenerID)
}
}
func (multiEvents *multiplexedEvents) RemoveListener(listenerID string) {
for _, evts := range multiEvents.eventEmitters {
evts.RemoveListener(listenerID)
}
}
type noOpFireable struct {
}
func (*noOpFireable) Fire(string, interface{}) {
}
func NewNoOpFireable() Fireable {
return &noOpFireable{}
}
// *********************************** Events ***********************************
// EventSubscribe
......@@ -134,29 +211,9 @@ func GenerateSubId() (string, error) {
b := make([]byte, 32)
_, err := rand.Read(b)
if err != nil {
return "", fmt.Errorf("Could not generate random bytes for a subscription"+
return "", fmt.Errorf("could not generate random bytes for a subscription"+
" id: %v", err)
}
rStr := hex.EncodeToString(b)
return strings.ToUpper(rStr), nil
}
func mapToOurEventData(eventData anyEventData) (txs.EventData, error) {
// TODO: [Silas] avoid this with a better event pub-sub system of our own
// TODO: that maybe involves a registry of events
switch eventData := eventData.(type) {
case txs.EventData:
return eventData, nil
case tm_types.EventDataNewBlock:
return txs.EventDataNewBlock{
Block: eventData.Block,
}, nil
case tm_types.EventDataNewBlockHeader:
return txs.EventDataNewBlockHeader{
Header: eventData.Header,
}, nil
default:
return nil, fmt.Errorf("EventData not recognised as known EventData: %v",
eventData)
}
}
// Copyright 2017 Monax Industries Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"testing"
"sync"
"time"
"github.com/hyperledger/burrow/txs"
"github.com/stretchr/testify/assert"
)
func TestMultiplexedEvents(t *testing.T) {
emitter1 := newMockEventEmitter()
emitter2 := newMockEventEmitter()
emitter12 := Multiplex(emitter1, emitter2)
eventData1 := make(map[txs.EventData]int)
eventData2 := make(map[txs.EventData]int)
eventData12 := make(map[txs.EventData]int)
mutex1 := &sync.Mutex{}
mutex2 := &sync.Mutex{}
mutex12 := &sync.Mutex{}
emitter12.Subscribe("Sub12", "Event12", func(eventData txs.EventData) {
mutex12.Lock()
eventData12[eventData] = 1
mutex12.Unlock()
})
emitter1.Subscribe("Sub1", "Event1", func(eventData txs.EventData) {
mutex1.Lock()
eventData1[eventData] = 1
mutex1.Unlock()
})
emitter2.Subscribe("Sub2", "Event2", func(eventData txs.EventData) {
mutex2.Lock()
eventData2[eventData] = 1
mutex2.Unlock()
})
time.Sleep(4 * mockInterval)
err := emitter12.Unsubscribe("Sub12")
assert.NoError(t, err)
err = emitter1.Unsubscribe("Sub2")
assert.NoError(t, err)
err = emitter2.Unsubscribe("Sub2")
assert.NoError(t, err)
mutex1.Lock()
defer mutex1.Unlock()
mutex2.Lock()
defer mutex2.Unlock()
mutex12.Lock()
defer mutex12.Unlock()
assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub1", "Event1"}: 1},
eventData1)
assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub2", "Event2"}: 1},
eventData2)
assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub12", "Event12"}: 1},
eventData12)
}
......@@ -141,15 +141,15 @@ func (this *FilterFactory) newSingleFilter(fd *FilterData) (ConfigurableFilter,
// Some standard value parsing functions.
func ParseNumberValue(value string) (int64, error) {
var val int64
func ParseNumberValue(value string) (uint64, error) {
var val uint64
// Check for wildcards.
if value == "min" {
val = math.MinInt64
val = 0
} else if value == "max" {
val = math.MaxInt64
val = math.MaxUint64
} else {
tv, err := strconv.ParseInt(value, 10, 64)
tv, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return 0, fmt.Errorf("Wrong value type.")
......@@ -161,29 +161,29 @@ func ParseNumberValue(value string) (int64, error) {
// Some standard filtering functions.
func GetRangeFilter(op, fName string) (func(a, b int64) bool, error) {
func GetRangeFilter(op, fName string) (func(a, b uint64) bool, error) {
if op == "==" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a == b
}, nil
} else if op == "!=" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a != b
}, nil
} else if op == "<=" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a <= b
}, nil
} else if op == ">=" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a >= b
}, nil
} else if op == "<" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a < b
}, nil
} else if op == ">" {
return func(a, b int64) bool {
return func(a, b uint64) bool {
return a > b
}, nil
} else {
......
......@@ -18,8 +18,6 @@ import (
"fmt"
"sync"
"time"
"github.com/hyperledger/burrow/txs"
)
var (
......@@ -27,15 +25,15 @@ var (
reaperThreshold = 10 * time.Second
)
type EventCache struct {
type SubscriptionsCache struct {
mtx *sync.Mutex
events []interface{}
ts time.Time
subId string
}
func newEventCache() *EventCache {
return &EventCache{
func newSubscriptionsCache() *SubscriptionsCache {
return &SubscriptionsCache{
&sync.Mutex{},
make([]interface{}, 0),
time.Now(),
......@@ -43,40 +41,40 @@ func newEventCache() *EventCache {
}
}
func (this *EventCache) poll() []interface{} {
this.mtx.Lock()
defer this.mtx.Unlock()
func (subsCache *SubscriptionsCache) poll() []interface{} {
subsCache.mtx.Lock()
defer subsCache.mtx.Unlock()
var evts []interface{}
if len(this.events) > 0 {
evts = this.events
this.events = []interface{}{}
if len(subsCache.events) > 0 {
evts = subsCache.events
subsCache.events = []interface{}{}
} else {
evts = []interface{}{}
}
this.ts = time.Now()
subsCache.ts = time.Now()
return evts
}
// Catches events that callers subscribe to and adds them to an array ready to be polled.
type EventSubscriptions struct {
type Subscriptions struct {
mtx *sync.RWMutex
eventEmitter EventEmitter
subs map[string]*EventCache
subscribable Subscribable
subs map[string]*SubscriptionsCache
reap bool
}
func NewEventSubscriptions(eventEmitter EventEmitter) *EventSubscriptions {
es := &EventSubscriptions{
func NewSubscriptions(subscribable Subscribable) *Subscriptions {
es := &Subscriptions{
mtx: &sync.RWMutex{},
eventEmitter: eventEmitter,
subs: make(map[string]*EventCache),
subscribable: subscribable,
subs: make(map[string]*SubscriptionsCache),
reap: true,
}
go reap(es)
return es
}
func reap(es *EventSubscriptions) {
func reap(es *Subscriptions) {
if !es.reap {
return
}
......@@ -87,7 +85,7 @@ func reap(es *EventSubscriptions) {
if time.Since(sub.ts) > reaperThreshold {
// Seems like Go is ok with this..
delete(es.subs, id)
es.eventEmitter.Unsubscribe(id)
es.subscribable.Unsubscribe(id)
}
}
go reap(es)
......@@ -97,47 +95,47 @@ func reap(es *EventSubscriptions) {
// has to call func which involves acquiring a mutex lock, so might be
// a delay - though a conflict is practically impossible, and if it does
// happen it's for an insignificant amount of time (the time it takes to
// carry out EventCache.poll() ).
func (this *EventSubscriptions) Add(eventId string) (string, error) {
// carry out SubscriptionsCache.poll() ).
func (subs *Subscriptions) Add(eventId string) (string, error) {
subId, errSID := GenerateSubId()
if errSID != nil {
return "", errSID
}
cache := newEventCache()
errC := this.eventEmitter.Subscribe(subId, eventId,
func(evt txs.EventData) {
cache := newSubscriptionsCache()
errC := subs.subscribable.Subscribe(subId, eventId,
func(evt AnyEventData) {
cache.mtx.Lock()
defer cache.mtx.Unlock()
cache.events = append(cache.events, evt)
})
cache.subId = subId
this.mtx.Lock()
defer this.mtx.Unlock()
this.subs[subId] = cache
subs.mtx.Lock()
defer subs.mtx.Unlock()
subs.subs[subId] = cache
if errC != nil {
return "", errC
}
return subId, nil
}
func (this *EventSubscriptions) Poll(subId string) ([]interface{}, error) {
this.mtx.RLock()
defer this.mtx.RUnlock()
sub, ok := this.subs[subId]
func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) {
subs.mtx.RLock()
defer subs.mtx.RUnlock()
sub, ok := subs.subs[subId]
if !ok {
return nil, fmt.Errorf("Subscription not active. ID: " + subId)
}
return sub.poll(), nil
}
func (this *EventSubscriptions) Remove(subId string) error {
this.mtx.Lock()
defer this.mtx.Unlock()
func (subs *Subscriptions) Remove(subId string) error {
subs.mtx.Lock()
defer subs.mtx.Unlock()
// TODO Check this.
_, ok := this.subs[subId]
_, ok := subs.subs[subId]
if !ok {
return fmt.Errorf("Subscription not active. ID: " + subId)
}
delete(this.subs, subId)
delete(subs.subs, subId)
return nil
}
......@@ -18,12 +18,10 @@ import (
"encoding/hex"
"fmt"
"runtime"
"sync"
"testing"
"time"
"sync"
"github.com/hyperledger/burrow/txs"
"github.com/stretchr/testify/assert"
)
......@@ -32,7 +30,7 @@ var mockInterval = 20 * time.Millisecond
type mockSub struct {
subId string
eventId string
f func(txs.EventData)
f func(AnyEventData)
sdChan chan struct{}
}
......@@ -41,10 +39,10 @@ type mockEventData struct {
eventId string
}
func (eventData mockEventData) AssertIsEventData() {}
func (eventData mockEventData) AssertIsEVMEventData() {}
// A mock event
func newMockSub(subId, eventId string, f func(txs.EventData)) mockSub {
func newMockSub(subId, eventId string, f func(AnyEventData)) mockSub {
return mockSub{subId, eventId, f, make(chan struct{})}
}
......@@ -57,35 +55,37 @@ func newMockEventEmitter() *mockEventEmitter {
return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}}
}
func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(txs.EventData)) error {
if _, ok := this.subs[subId]; ok {
func (mee *mockEventEmitter) Subscribe(subId, eventId string, callback func(AnyEventData)) error {
if _, ok := mee.subs[subId]; ok {
return nil
}
me := newMockSub(subId, eventId, callback)
this.mutex.Lock()
this.subs[subId] = me
this.mutex.Unlock()
mee.mutex.Lock()
mee.subs[subId] = me
mee.mutex.Unlock()
go func() {
for {
select {
case <-me.sdChan:
this.mutex.Lock()
delete(this.subs, subId)
this.mutex.Unlock()
mee.mutex.Lock()
delete(mee.subs, subId)
mee.mutex.Unlock()
return
case <-time.After(mockInterval):
me.f(mockEventData{subId, eventId})
me.f(AnyEventData{BurrowEventData: &EventData{
EventDataInner: mockEventData{subId: subId, eventId: eventId},
}})
}
}
}()
return nil
}
func (this *mockEventEmitter) Unsubscribe(subId string) error {
this.mutex.Lock()
sub, ok := this.subs[subId]
this.mutex.Unlock()
func (mee *mockEventEmitter) Unsubscribe(subId string) error {
mee.mutex.Lock()
sub, ok := mee.subs[subId]
mee.mutex.Unlock()
if !ok {
return nil
}
......@@ -101,7 +101,7 @@ func TestSubReaping(t *testing.T) {
reaperTimeout = 100 * time.Millisecond
mee := newMockEventEmitter()
eSubs := NewEventSubscriptions(mee)
eSubs := NewSubscriptions(mee)
doneChan := make(chan error)
go func() {
for i := 0; i < NUM_SUBS; i++ {
......@@ -146,7 +146,7 @@ func TestSubManualClose(t *testing.T) {
reaperTimeout = 10000 * time.Millisecond
mee := newMockEventEmitter()
eSubs := NewEventSubscriptions(mee)
eSubs := NewSubscriptions(mee)
doneChan := make(chan error)
go func() {
for i := 0; i < NUM_SUBS; i++ {
......@@ -194,7 +194,7 @@ func TestSubFlooding(t *testing.T) {
// Crank it up. Now pressure is 10 times higher on each sub.
mockInterval = 1 * time.Millisecond
mee := newMockEventEmitter()
eSubs := NewEventSubscriptions(mee)
eSubs := NewSubscriptions(mee)
doneChan := make(chan error)
go func() {
for i := 0; i < NUM_SUBS; i++ {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment