From 9bc7c32bf6d9f9a48cd64a3f284dfce0dd15c292 Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@monax.io> Date: Mon, 8 Jan 2018 13:00:22 +0000 Subject: [PATCH] Events: restructure events, add EventData sum type in preparation for splitting up event payloads, add EventCache to drop dependency on Tendermint's EventCache. Signed-off-by: Silas Davis <silas@monax.io> --- event/cache.go | 52 +++++ event/cache_test.go | 71 ++++++ event/data.go | 136 +++++++++++ event/data_test.go | 64 +++++ event/emitter.go | 219 ++++++++++++++++++ event/events.go | 162 ------------- event/events_test.go | 78 ------- event/filters.go | 24 +- event/{event_cache.go => subscriptions.go} | 74 +++--- ...nt_cache_test.go => subscriptions_test.go} | 44 ++-- 10 files changed, 612 insertions(+), 312 deletions(-) create mode 100644 event/cache.go create mode 100644 event/cache_test.go create mode 100644 event/data.go create mode 100644 event/data_test.go create mode 100644 event/emitter.go delete mode 100644 event/events.go delete mode 100644 event/events_test.go rename event/{event_cache.go => subscriptions.go} (63%) rename event/{event_cache_test.go => subscriptions_test.go} (85%) diff --git a/event/cache.go b/event/cache.go new file mode 100644 index 00000000..7279c097 --- /dev/null +++ b/event/cache.go @@ -0,0 +1,52 @@ +package event + +// When exceeded we will trim the buffer's backing array capacity to avoid excessive +// allocation + +const maximumBufferCapacityToLengthRatio = 2 + +// An Cache buffers events for a Fireable +// All events are cached. Filtering happens on Flush +type Cache struct { + evsw Fireable + events []eventInfo +} + +var _ Fireable = &Cache{} + +// Create a new Cache with an EventSwitch as backend +func NewEventCache(evsw Fireable) *Cache { + return &Cache{ + evsw: evsw, + } +} + +// a cached event +type eventInfo struct { + event string + data AnyEventData +} + +// Cache an event to be fired upon finality. +func (evc *Cache) Fire(event string, eventData interface{}) { + // append to list (go will grow our backing array exponentially) + evc.events = append(evc.events, eventInfo{event: event, data: MapToAnyEventData(eventData)}) +} + +// Fire events by running evsw.Fire on all cached events. Blocks. +// Clears cached events +func (evc *Cache) Flush() { + for _, ei := range evc.events { + evc.evsw.Fire(ei.event, ei.data) + } + // Clear the buffer by re-slicing its length to zero + if cap(evc.events) > len(evc.events)*maximumBufferCapacityToLengthRatio { + // Trim the backing array capacity when it is more than double the length of the slice to avoid tying up memory + // after a spike in the number of events to buffer + evc.events = evc.events[:0:len(evc.events)] + } else { + // Re-slice the length to 0 to clear buffer but hang on to spare capacity in backing array that has been added + // in previous cache round + evc.events = evc.events[:0] + } +} diff --git a/event/cache_test.go b/event/cache_test.go new file mode 100644 index 00000000..713b15d6 --- /dev/null +++ b/event/cache_test.go @@ -0,0 +1,71 @@ +package event + +import ( + "testing" + + "github.com/hyperledger/burrow/logging/loggers" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventCache_Flush(t *testing.T) { + evts := NewEmitter(loggers.NewNoopInfoTraceLogger()) + evts.Subscribe("nothingness", "", func(data AnyEventData) { + // Check we are not initialising an empty buffer full of zeroed eventInfos in the Cache + require.FailNow(t, "We should never receive a message on this switch since none are fired") + }) + evc := NewEventCache(evts) + evc.Flush() + // Check after reset + evc.Flush() + fail := true + pass := false + evts.Subscribe("somethingness", "something", func(data AnyEventData) { + if fail { + require.FailNow(t, "Shouldn't see a message until flushed") + } + pass = true + }) + evc.Fire("something", AnyEventData{}) + evc.Fire("something", AnyEventData{}) + evc.Fire("something", AnyEventData{}) + fail = false + evc.Flush() + assert.True(t, pass) +} + +func TestEventCacheGrowth(t *testing.T) { + evc := NewEventCache(NewEmitter(loggers.NewNoopInfoTraceLogger())) + + fireNEvents(evc, 100) + c := cap(evc.events) + evc.Flush() + assert.Equal(t, c, cap(evc.events), "cache cap should remain the same after flushing events") + + fireNEvents(evc, c/maximumBufferCapacityToLengthRatio+1) + evc.Flush() + assert.Equal(t, c, cap(evc.events), "cache cap should remain the same after flushing more than half "+ + "the number of events as last time") + + fireNEvents(evc, c/maximumBufferCapacityToLengthRatio-1) + evc.Flush() + assert.True(t, c > cap(evc.events), "cache cap should drop after flushing fewer than half "+ + "the number of events as last time") + + fireNEvents(evc, c*2*maximumBufferCapacityToLengthRatio) + evc.Flush() + assert.True(t, c < cap(evc.events), "cache cap should grow after flushing more events than seen before") + + for numEvents := 100; numEvents >= 0; numEvents-- { + fireNEvents(evc, numEvents) + evc.Flush() + assert.True(t, cap(evc.events) <= maximumBufferCapacityToLengthRatio*numEvents, + "cap (%v) should be at most twice numEvents (%v)", cap(evc.events), numEvents) + } +} + +func fireNEvents(evc *Cache, n int) { + for i := 0; i < n; i++ { + evc.Fire("something", AnyEventData{}) + } +} diff --git a/event/data.go b/event/data.go new file mode 100644 index 00000000..199c5743 --- /dev/null +++ b/event/data.go @@ -0,0 +1,136 @@ +package event + +import ( + "fmt" + + exe_events "github.com/hyperledger/burrow/execution/events" + evm_events "github.com/hyperledger/burrow/execution/evm/events" + "github.com/tendermint/go-wire/data" + tm_types "github.com/tendermint/tendermint/types" +) + +// Oh for a real sum type + +// AnyEventData provides a single type for our multiplexed event categories of EVM events and Tendermint events +type AnyEventData struct { + TMEventData *tm_types.TMEventData `json:"tm_event_data,omitempty"` + BurrowEventData *EventData `json:"burrow_event_data,omitempty"` + Err *string `json:"error,omitempty"` +} + +type EventData struct { + EventDataInner `json:"unwrap"` +} + +type EventDataInner interface { +} + +func (ed EventData) Unwrap() EventDataInner { + return ed.EventDataInner +} + +func (ed EventData) MarshalJSON() ([]byte, error) { + return mapper.ToJSON(ed.EventDataInner) +} + +func (ed *EventData) UnmarshalJSON(data []byte) (err error) { + parsed, err := mapper.FromJSON(data) + if err == nil && parsed != nil { + ed.EventDataInner = parsed.(EventDataInner) + } + return err +} + +var mapper = data.NewMapper(EventData{}). + RegisterImplementation(exe_events.EventDataTx{}, "event_data_tx", biota()). + RegisterImplementation(evm_events.EventDataCall{}, "event_data_call", biota()). + RegisterImplementation(evm_events.EventDataLog{}, "event_data_log", biota()) + +// Get whichever element of the AnyEventData sum type that is not nil +func (aed AnyEventData) Get() interface{} { + if aed.TMEventData != nil { + return aed.TMEventData.Unwrap() + } + if aed.BurrowEventData != nil { + return aed.BurrowEventData.Unwrap() + } + if aed.Err != nil { + return *aed.Err + } + return nil +} + +// If this AnyEventData wraps an EventDataNewBlock then return a pointer to that value, else return nil +func (aed AnyEventData) EventDataNewBlock() *tm_types.EventDataNewBlock { + if aed.TMEventData != nil { + eventData, _ := aed.TMEventData.Unwrap().(tm_types.EventDataNewBlock) + return &eventData + } + return nil +} + +// If this AnyEventData wraps an EventDataLog then return a pointer to that value, else return nil +func (aed AnyEventData) EventDataLog() *evm_events.EventDataLog { + if aed.BurrowEventData != nil { + eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataLog) + return &eventData + } + return nil +} + +// If this AnyEventData wraps an EventDataCall then return a pointer to that value, else return nil +func (aed AnyEventData) EventDataCall() *evm_events.EventDataCall { + if aed.BurrowEventData != nil { + eventData, _ := aed.BurrowEventData.Unwrap().(evm_events.EventDataCall) + return &eventData + } + return nil +} + +// If this AnyEventData wraps an EventDataTx then return a pointer to that value, else return nil +func (aed AnyEventData) EventDataTx() *exe_events.EventDataTx { + if aed.BurrowEventData != nil { + eventData, _ := aed.BurrowEventData.Unwrap().(exe_events.EventDataTx) + return &eventData + } + return nil +} + +func (aed AnyEventData) Error() string { + if aed.Err == nil { + return "" + } + return *aed.Err +} + +// Map any supported event data element to our AnyEventData sum type +func MapToAnyEventData(eventData interface{}) AnyEventData { + switch ed := eventData.(type) { + case AnyEventData: + return ed + + case tm_types.TMEventData: + return AnyEventData{TMEventData: &ed} + + case EventData: + return AnyEventData{BurrowEventData: &ed} + + case EventDataInner: + return AnyEventData{BurrowEventData: &EventData{ + EventDataInner: ed, + }} + + default: + errStr := fmt.Sprintf("could not map event data of type %T to AnyEventData", eventData) + return AnyEventData{Err: &errStr} + } +} + +// Type byte helper +var nextByte byte = 1 + +func biota() (b byte) { + b = nextByte + nextByte++ + return +} diff --git a/event/data_test.go b/event/data_test.go new file mode 100644 index 00000000..26187967 --- /dev/null +++ b/event/data_test.go @@ -0,0 +1,64 @@ +package event + +import ( + "encoding/json" + "testing" + + acm "github.com/hyperledger/burrow/account" + exe_events "github.com/hyperledger/burrow/execution/events" + "github.com/hyperledger/burrow/txs" + "github.com/stretchr/testify/assert" + tm_types "github.com/tendermint/tendermint/types" +) + +func TestSerialiseTMEventData(t *testing.T) { + roundTripAnyEventData(t, AnyEventData{ + TMEventData: &tm_types.TMEventData{ + TMEventDataInner: tm_types.EventDataNewBlock{ + Block: &tm_types.Block{ + LastCommit: &tm_types.Commit{}, + Header: &tm_types.Header{ + ChainID: "ChainID-ChainEgo", + }, + Data: &tm_types.Data{}, + }, + }, + }, + }) + +} + +func TestSerialiseEVMEventData(t *testing.T) { + roundTripAnyEventData(t, AnyEventData{ + BurrowEventData: &EventData{ + EventDataInner: exe_events.EventDataTx{ + Tx: &txs.CallTx{ + Address: &acm.Address{1, 2, 2, 3}, + }, + Return: []byte{1, 2, 3}, + Exception: "Exception", + }, + }, + }) +} + +func TestSerialiseError(t *testing.T) { + s := "random error" + roundTripAnyEventData(t, AnyEventData{ + Err: &s, + }) +} + +func roundTripAnyEventData(t *testing.T, aed AnyEventData) { + bs, err := json.Marshal(aed) + assert.NoError(t, err) + + aedOut := new(AnyEventData) + err = json.Unmarshal(bs, aedOut) + assert.NoError(t, err) + + bsOut, err := json.Marshal(aedOut) + assert.NoError(t, err) + assert.Equal(t, string(bs), string(bsOut)) + +} diff --git a/event/emitter.go b/event/emitter.go new file mode 100644 index 00000000..c4b8f296 --- /dev/null +++ b/event/emitter.go @@ -0,0 +1,219 @@ +// Copyright 2017 Monax Industries Limited +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package event + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "strings" + + "github.com/hyperledger/burrow/logging" + "github.com/hyperledger/burrow/logging/structure" + logging_types "github.com/hyperledger/burrow/logging/types" + "github.com/tendermint/tmlibs/common" + go_events "github.com/tendermint/tmlibs/events" +) + +type Subscribable interface { + Subscribe(subId, event string, callback func(AnyEventData)) error + Unsubscribe(subId string) error +} + +type Fireable interface { + Fire(event string, data interface{}) +} + +type Emitter interface { + Fireable + go_events.EventSwitch + Subscribable +} + +// The events struct has methods for working with events. +type emitter struct { + // Bah, Service infects everything + *common.BaseService + eventSwitch go_events.EventSwitch + logger logging_types.InfoTraceLogger +} + +var _ Emitter = &emitter{} + +func NewEmitter(logger logging_types.InfoTraceLogger) *emitter { + return WrapEventSwitch(go_events.NewEventSwitch(), logger) +} + +func WrapEventSwitch(eventSwitch go_events.EventSwitch, logger logging_types.InfoTraceLogger) *emitter { + eventSwitch.Start() + return &emitter{ + BaseService: common.NewBaseService(nil, "BurrowEventEmitter", eventSwitch), + eventSwitch: eventSwitch, + logger: logger.With(structure.ComponentKey, "Events"), + } +} + +// Fireable +func (evts *emitter) Fire(event string, eventData interface{}) { + evts.eventSwitch.FireEvent(event, eventData) +} + +func (evts *emitter) FireEvent(event string, data go_events.EventData) { + evts.Fire(event, data) +} + +// EventSwitch +func (evts *emitter) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) { + evts.eventSwitch.AddListenerForEvent(listenerID, event, cb) +} + +func (evts *emitter) RemoveListenerForEvent(event string, listenerID string) { + evts.eventSwitch.RemoveListenerForEvent(event, listenerID) +} + +func (evts *emitter) RemoveListener(listenerID string) { + evts.eventSwitch.RemoveListener(listenerID) +} + +// Subscribe to an event. +func (evts *emitter) Subscribe(subId, event string, callback func(AnyEventData)) error { + logging.TraceMsg(evts.logger, "Subscribing to event", + structure.ScopeKey, "events.Subscribe", "subId", subId, "event", event) + evts.eventSwitch.AddListenerForEvent(subId, event, func(eventData go_events.EventData) { + if eventData == nil { + logging.TraceMsg(evts.logger, "Sent nil go-events EventData") + return + } + callback(MapToAnyEventData(eventData)) + }) + return nil +} + +// Un-subscribe from an event. +func (evts *emitter) Unsubscribe(subId string) error { + logging.TraceMsg(evts.logger, "Unsubscribing from event", + structure.ScopeKey, "events.Unsubscribe", "subId", subId) + evts.eventSwitch.RemoveListener(subId) + return nil +} + +// Provides an Emitter that wraps many underlying EventEmitters as a +// convenience for Subscribing and Unsubscribing on multiple EventEmitters at +// once +func Multiplex(events ...Emitter) *multiplexedEvents { + return &multiplexedEvents{ + BaseService: common.NewBaseService(nil, "BurrowMultiplexedEventEmitter", nil), + eventEmitters: events, + } +} + +type multiplexedEvents struct { + *common.BaseService + eventEmitters []Emitter +} + +var _ Emitter = &multiplexedEvents{} + +// Subscribe to an event. +func (multiEvents *multiplexedEvents) Subscribe(subId, event string, cb func(AnyEventData)) error { + for _, evts := range multiEvents.eventEmitters { + err := evts.Subscribe(subId, event, cb) + if err != nil { + return err + } + } + return nil +} + +func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error { + for _, evts := range multiEvents.eventEmitters { + err := evts.Unsubscribe(subId) + if err != nil { + return err + } + } + return nil +} + +func (multiEvents *multiplexedEvents) Fire(event string, eventData interface{}) { + for _, evts := range multiEvents.eventEmitters { + evts.Fire(event, eventData) + } +} + +func (multiEvents *multiplexedEvents) FireEvent(event string, eventData go_events.EventData) { + multiEvents.Fire(event, eventData) +} + +// EventSwitch +func (multiEvents *multiplexedEvents) AddListenerForEvent(listenerID, event string, cb go_events.EventCallback) { + for _, evts := range multiEvents.eventEmitters { + evts.AddListenerForEvent(listenerID, event, cb) + } +} + +func (multiEvents *multiplexedEvents) RemoveListenerForEvent(event string, listenerID string) { + for _, evts := range multiEvents.eventEmitters { + evts.RemoveListenerForEvent(event, listenerID) + } +} + +func (multiEvents *multiplexedEvents) RemoveListener(listenerID string) { + for _, evts := range multiEvents.eventEmitters { + evts.RemoveListener(listenerID) + } +} + +type noOpFireable struct { +} + +func (*noOpFireable) Fire(string, interface{}) { + +} + +func NewNoOpFireable() Fireable { + return &noOpFireable{} +} + +// *********************************** Events *********************************** + +// EventSubscribe +type EventSub struct { + SubId string `json:"sub_id"` +} + +// EventUnsubscribe +type EventUnsub struct { + Result bool `json:"result"` +} + +// EventPoll +type PollResponse struct { + Events []interface{} `json:"events"` +} + +// ************************************************************************************** +// Helper function + +func GenerateSubId() (string, error) { + b := make([]byte, 32) + _, err := rand.Read(b) + if err != nil { + return "", fmt.Errorf("could not generate random bytes for a subscription"+ + " id: %v", err) + } + rStr := hex.EncodeToString(b) + return strings.ToUpper(rStr), nil +} diff --git a/event/events.go b/event/events.go deleted file mode 100644 index f93b1b64..00000000 --- a/event/events.go +++ /dev/null @@ -1,162 +0,0 @@ -// Copyright 2017 Monax Industries Limited -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package event - -import ( - "crypto/rand" - "encoding/hex" - "strings" - - "fmt" - - "github.com/hyperledger/burrow/logging" - logging_types "github.com/hyperledger/burrow/logging/types" - "github.com/hyperledger/burrow/txs" - go_events "github.com/tendermint/go-events" - tm_types "github.com/tendermint/tendermint/types" -) - -// TODO: [Silas] this is a compatibility layer between our event types and -// TODO: go-events. Our ultimate plan is to replace go-events with our own pub-sub -// TODO: code that will better allow us to manage and multiplex events from different -// TODO: subsystems - -// Oh for a sum type -// We are using this as a marker interface for the -type anyEventData interface{} - -type EventEmitter interface { - Subscribe(subId, event string, callback func(txs.EventData)) error - Unsubscribe(subId string) error -} - -func NewEvents(eventSwitch go_events.EventSwitch, logger logging_types.InfoTraceLogger) *events { - return &events{eventSwitch: eventSwitch, logger: logging.WithScope(logger, "Events")} -} - -// Provides an EventEmitter that wraps many underlying EventEmitters as a -// convenience for Subscribing and Unsubscribing on multiple EventEmitters at -// once -func Multiplex(events ...EventEmitter) *multiplexedEvents { - return &multiplexedEvents{events} -} - -// The events struct has methods for working with events. -type events struct { - eventSwitch go_events.EventSwitch - logger logging_types.InfoTraceLogger -} - -// Subscribe to an event. -func (evts *events) Subscribe(subId, event string, - callback func(txs.EventData)) error { - cb := func(evt go_events.EventData) { - eventData, err := mapToOurEventData(evt) - if err != nil { - logging.InfoMsg(evts.logger, "Failed to map go-events EventData to our EventData", - "error", err, - "event", event) - } - callback(eventData) - } - evts.eventSwitch.AddListenerForEvent(subId, event, cb) - return nil -} - -// Un-subscribe from an event. -func (evts *events) Unsubscribe(subId string) error { - evts.eventSwitch.RemoveListener(subId) - return nil -} - -type multiplexedEvents struct { - eventEmitters []EventEmitter -} - -// Subscribe to an event. -func (multiEvents *multiplexedEvents) Subscribe(subId, event string, - callback func(txs.EventData)) error { - for _, eventEmitter := range multiEvents.eventEmitters { - err := eventEmitter.Subscribe(subId, event, callback) - if err != nil { - return err - } - } - - return nil -} - -// Un-subscribe from an event. -func (multiEvents *multiplexedEvents) Unsubscribe(subId string) error { - for _, eventEmitter := range multiEvents.eventEmitters { - err := eventEmitter.Unsubscribe(subId) - if err != nil { - return err - } - } - - return nil -} - -// *********************************** Events *********************************** - -// EventSubscribe -type EventSub struct { - SubId string `json:"sub_id"` -} - -// EventUnsubscribe -type EventUnsub struct { - Result bool `json:"result"` -} - -// EventPoll -type PollResponse struct { - Events []interface{} `json:"events"` -} - -// ************************************************************************************** -// Helper function - -func GenerateSubId() (string, error) { - b := make([]byte, 32) - _, err := rand.Read(b) - if err != nil { - return "", fmt.Errorf("Could not generate random bytes for a subscription"+ - " id: %v", err) - } - rStr := hex.EncodeToString(b) - return strings.ToUpper(rStr), nil -} - -func mapToOurEventData(eventData anyEventData) (txs.EventData, error) { - // TODO: [Silas] avoid this with a better event pub-sub system of our own - // TODO: that maybe involves a registry of events - switch eventData := eventData.(type) { - case txs.EventData: - return eventData, nil - case tm_types.EventDataNewBlock: - return txs.EventDataNewBlock{ - Block: eventData.Block, - }, nil - case tm_types.EventDataNewBlockHeader: - return txs.EventDataNewBlockHeader{ - Header: eventData.Header, - }, nil - default: - return nil, fmt.Errorf("EventData not recognised as known EventData: %v", - eventData) - } -} diff --git a/event/events_test.go b/event/events_test.go deleted file mode 100644 index b9305d79..00000000 --- a/event/events_test.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2017 Monax Industries Limited -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package event - -import ( - "testing" - - "sync" - "time" - - "github.com/hyperledger/burrow/txs" - "github.com/stretchr/testify/assert" -) - -func TestMultiplexedEvents(t *testing.T) { - emitter1 := newMockEventEmitter() - emitter2 := newMockEventEmitter() - emitter12 := Multiplex(emitter1, emitter2) - - eventData1 := make(map[txs.EventData]int) - eventData2 := make(map[txs.EventData]int) - eventData12 := make(map[txs.EventData]int) - - mutex1 := &sync.Mutex{} - mutex2 := &sync.Mutex{} - mutex12 := &sync.Mutex{} - - emitter12.Subscribe("Sub12", "Event12", func(eventData txs.EventData) { - mutex12.Lock() - eventData12[eventData] = 1 - mutex12.Unlock() - }) - emitter1.Subscribe("Sub1", "Event1", func(eventData txs.EventData) { - mutex1.Lock() - eventData1[eventData] = 1 - mutex1.Unlock() - }) - emitter2.Subscribe("Sub2", "Event2", func(eventData txs.EventData) { - mutex2.Lock() - eventData2[eventData] = 1 - mutex2.Unlock() - }) - - time.Sleep(4 * mockInterval) - - err := emitter12.Unsubscribe("Sub12") - assert.NoError(t, err) - err = emitter1.Unsubscribe("Sub2") - assert.NoError(t, err) - err = emitter2.Unsubscribe("Sub2") - assert.NoError(t, err) - - mutex1.Lock() - defer mutex1.Unlock() - mutex2.Lock() - defer mutex2.Unlock() - mutex12.Lock() - defer mutex12.Unlock() - assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub1", "Event1"}: 1}, - eventData1) - assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub2", "Event2"}: 1}, - eventData2) - assert.Equal(t, map[txs.EventData]int{mockEventData{"Sub12", "Event12"}: 1}, - eventData12) - -} diff --git a/event/filters.go b/event/filters.go index 0a1c7324..1cdd1533 100644 --- a/event/filters.go +++ b/event/filters.go @@ -141,15 +141,15 @@ func (this *FilterFactory) newSingleFilter(fd *FilterData) (ConfigurableFilter, // Some standard value parsing functions. -func ParseNumberValue(value string) (int64, error) { - var val int64 +func ParseNumberValue(value string) (uint64, error) { + var val uint64 // Check for wildcards. if value == "min" { - val = math.MinInt64 + val = 0 } else if value == "max" { - val = math.MaxInt64 + val = math.MaxUint64 } else { - tv, err := strconv.ParseInt(value, 10, 64) + tv, err := strconv.ParseUint(value, 10, 64) if err != nil { return 0, fmt.Errorf("Wrong value type.") @@ -161,29 +161,29 @@ func ParseNumberValue(value string) (int64, error) { // Some standard filtering functions. -func GetRangeFilter(op, fName string) (func(a, b int64) bool, error) { +func GetRangeFilter(op, fName string) (func(a, b uint64) bool, error) { if op == "==" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a == b }, nil } else if op == "!=" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a != b }, nil } else if op == "<=" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a <= b }, nil } else if op == ">=" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a >= b }, nil } else if op == "<" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a < b }, nil } else if op == ">" { - return func(a, b int64) bool { + return func(a, b uint64) bool { return a > b }, nil } else { diff --git a/event/event_cache.go b/event/subscriptions.go similarity index 63% rename from event/event_cache.go rename to event/subscriptions.go index 9342564b..8c1baecf 100644 --- a/event/event_cache.go +++ b/event/subscriptions.go @@ -18,8 +18,6 @@ import ( "fmt" "sync" "time" - - "github.com/hyperledger/burrow/txs" ) var ( @@ -27,15 +25,15 @@ var ( reaperThreshold = 10 * time.Second ) -type EventCache struct { +type SubscriptionsCache struct { mtx *sync.Mutex events []interface{} ts time.Time subId string } -func newEventCache() *EventCache { - return &EventCache{ +func newSubscriptionsCache() *SubscriptionsCache { + return &SubscriptionsCache{ &sync.Mutex{}, make([]interface{}, 0), time.Now(), @@ -43,40 +41,40 @@ func newEventCache() *EventCache { } } -func (this *EventCache) poll() []interface{} { - this.mtx.Lock() - defer this.mtx.Unlock() +func (subsCache *SubscriptionsCache) poll() []interface{} { + subsCache.mtx.Lock() + defer subsCache.mtx.Unlock() var evts []interface{} - if len(this.events) > 0 { - evts = this.events - this.events = []interface{}{} + if len(subsCache.events) > 0 { + evts = subsCache.events + subsCache.events = []interface{}{} } else { evts = []interface{}{} } - this.ts = time.Now() + subsCache.ts = time.Now() return evts } // Catches events that callers subscribe to and adds them to an array ready to be polled. -type EventSubscriptions struct { +type Subscriptions struct { mtx *sync.RWMutex - eventEmitter EventEmitter - subs map[string]*EventCache + subscribable Subscribable + subs map[string]*SubscriptionsCache reap bool } -func NewEventSubscriptions(eventEmitter EventEmitter) *EventSubscriptions { - es := &EventSubscriptions{ +func NewSubscriptions(subscribable Subscribable) *Subscriptions { + es := &Subscriptions{ mtx: &sync.RWMutex{}, - eventEmitter: eventEmitter, - subs: make(map[string]*EventCache), + subscribable: subscribable, + subs: make(map[string]*SubscriptionsCache), reap: true, } go reap(es) return es } -func reap(es *EventSubscriptions) { +func reap(es *Subscriptions) { if !es.reap { return } @@ -87,7 +85,7 @@ func reap(es *EventSubscriptions) { if time.Since(sub.ts) > reaperThreshold { // Seems like Go is ok with this.. delete(es.subs, id) - es.eventEmitter.Unsubscribe(id) + es.subscribable.Unsubscribe(id) } } go reap(es) @@ -97,47 +95,47 @@ func reap(es *EventSubscriptions) { // has to call func which involves acquiring a mutex lock, so might be // a delay - though a conflict is practically impossible, and if it does // happen it's for an insignificant amount of time (the time it takes to -// carry out EventCache.poll() ). -func (this *EventSubscriptions) Add(eventId string) (string, error) { +// carry out SubscriptionsCache.poll() ). +func (subs *Subscriptions) Add(eventId string) (string, error) { subId, errSID := GenerateSubId() if errSID != nil { return "", errSID } - cache := newEventCache() - errC := this.eventEmitter.Subscribe(subId, eventId, - func(evt txs.EventData) { + cache := newSubscriptionsCache() + errC := subs.subscribable.Subscribe(subId, eventId, + func(evt AnyEventData) { cache.mtx.Lock() defer cache.mtx.Unlock() cache.events = append(cache.events, evt) }) cache.subId = subId - this.mtx.Lock() - defer this.mtx.Unlock() - this.subs[subId] = cache + subs.mtx.Lock() + defer subs.mtx.Unlock() + subs.subs[subId] = cache if errC != nil { return "", errC } return subId, nil } -func (this *EventSubscriptions) Poll(subId string) ([]interface{}, error) { - this.mtx.RLock() - defer this.mtx.RUnlock() - sub, ok := this.subs[subId] +func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) { + subs.mtx.RLock() + defer subs.mtx.RUnlock() + sub, ok := subs.subs[subId] if !ok { return nil, fmt.Errorf("Subscription not active. ID: " + subId) } return sub.poll(), nil } -func (this *EventSubscriptions) Remove(subId string) error { - this.mtx.Lock() - defer this.mtx.Unlock() +func (subs *Subscriptions) Remove(subId string) error { + subs.mtx.Lock() + defer subs.mtx.Unlock() // TODO Check this. - _, ok := this.subs[subId] + _, ok := subs.subs[subId] if !ok { return fmt.Errorf("Subscription not active. ID: " + subId) } - delete(this.subs, subId) + delete(subs.subs, subId) return nil } diff --git a/event/event_cache_test.go b/event/subscriptions_test.go similarity index 85% rename from event/event_cache_test.go rename to event/subscriptions_test.go index 1d271b9c..c02bbb70 100644 --- a/event/event_cache_test.go +++ b/event/subscriptions_test.go @@ -18,12 +18,10 @@ import ( "encoding/hex" "fmt" "runtime" + "sync" "testing" "time" - "sync" - - "github.com/hyperledger/burrow/txs" "github.com/stretchr/testify/assert" ) @@ -32,7 +30,7 @@ var mockInterval = 20 * time.Millisecond type mockSub struct { subId string eventId string - f func(txs.EventData) + f func(AnyEventData) sdChan chan struct{} } @@ -41,10 +39,10 @@ type mockEventData struct { eventId string } -func (eventData mockEventData) AssertIsEventData() {} +func (eventData mockEventData) AssertIsEVMEventData() {} // A mock event -func newMockSub(subId, eventId string, f func(txs.EventData)) mockSub { +func newMockSub(subId, eventId string, f func(AnyEventData)) mockSub { return mockSub{subId, eventId, f, make(chan struct{})} } @@ -57,35 +55,37 @@ func newMockEventEmitter() *mockEventEmitter { return &mockEventEmitter{make(map[string]mockSub), &sync.Mutex{}} } -func (this *mockEventEmitter) Subscribe(subId, eventId string, callback func(txs.EventData)) error { - if _, ok := this.subs[subId]; ok { +func (mee *mockEventEmitter) Subscribe(subId, eventId string, callback func(AnyEventData)) error { + if _, ok := mee.subs[subId]; ok { return nil } me := newMockSub(subId, eventId, callback) - this.mutex.Lock() - this.subs[subId] = me - this.mutex.Unlock() + mee.mutex.Lock() + mee.subs[subId] = me + mee.mutex.Unlock() go func() { for { select { case <-me.sdChan: - this.mutex.Lock() - delete(this.subs, subId) - this.mutex.Unlock() + mee.mutex.Lock() + delete(mee.subs, subId) + mee.mutex.Unlock() return case <-time.After(mockInterval): - me.f(mockEventData{subId, eventId}) + me.f(AnyEventData{BurrowEventData: &EventData{ + EventDataInner: mockEventData{subId: subId, eventId: eventId}, + }}) } } }() return nil } -func (this *mockEventEmitter) Unsubscribe(subId string) error { - this.mutex.Lock() - sub, ok := this.subs[subId] - this.mutex.Unlock() +func (mee *mockEventEmitter) Unsubscribe(subId string) error { + mee.mutex.Lock() + sub, ok := mee.subs[subId] + mee.mutex.Unlock() if !ok { return nil } @@ -101,7 +101,7 @@ func TestSubReaping(t *testing.T) { reaperTimeout = 100 * time.Millisecond mee := newMockEventEmitter() - eSubs := NewEventSubscriptions(mee) + eSubs := NewSubscriptions(mee) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { @@ -146,7 +146,7 @@ func TestSubManualClose(t *testing.T) { reaperTimeout = 10000 * time.Millisecond mee := newMockEventEmitter() - eSubs := NewEventSubscriptions(mee) + eSubs := NewSubscriptions(mee) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { @@ -194,7 +194,7 @@ func TestSubFlooding(t *testing.T) { // Crank it up. Now pressure is 10 times higher on each sub. mockInterval = 1 * time.Millisecond mee := newMockEventEmitter() - eSubs := NewEventSubscriptions(mee) + eSubs := NewSubscriptions(mee) doneChan := make(chan error) go func() { for i := 0; i < NUM_SUBS; i++ { -- GitLab