diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000000000000000000000000000000000000..2c9e3d3e85206588fc897ab51ad862015b2fe16b --- /dev/null +++ b/event/event.go @@ -0,0 +1,241 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +// Package event implements an event multiplexer. +package event + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" +) + +// Event is a time-tagged notification pushed to subscribers. +type Event struct { + Time time.Time + Data interface{} +} + +// Subscription is implemented by event subscriptions. +type Subscription interface { + // Chan returns a channel that carries events. + // Implementations should return the same channel + // for any subsequent calls to Chan. + Chan() <-chan *Event + + // Unsubscribe stops delivery of events to a subscription. + // The event channel is closed. + // Unsubscribe can be called more than once. + Unsubscribe() +} + +// A TypeMux dispatches events to registered receivers. Receivers can be +// registered to handle events of certain type. Any operation +// called after mux is stopped will return ErrMuxClosed. +// +// The zero value is ready to use. +type TypeMux struct { + mutex sync.RWMutex + subm map[reflect.Type][]*muxsub + stopped bool +} + +// ErrMuxClosed is returned when Posting on a closed TypeMux. +var ErrMuxClosed = errors.New("event: mux closed") + +// Subscribe creates a subscription for events of the given types. The +// subscription's channel is closed when it is unsubscribed +// or the mux is closed. +func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { + sub := newsub(mux) + mux.mutex.Lock() + defer mux.mutex.Unlock() + if mux.stopped { + // set the status to closed so that calling Unsubscribe after this + // call will short curuit + sub.closed = true + close(sub.postC) + } else { + if mux.subm == nil { + mux.subm = make(map[reflect.Type][]*muxsub) + } + for _, t := range types { + rtyp := reflect.TypeOf(t) + oldsubs := mux.subm[rtyp] + if find(oldsubs, sub) != -1 { + panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) + } + subs := make([]*muxsub, len(oldsubs)+1) + copy(subs, oldsubs) + subs[len(oldsubs)] = sub + mux.subm[rtyp] = subs + } + } + return sub +} + +// Post sends an event to all receivers registered for the given type. +// It returns ErrMuxClosed if the mux has been stopped. +func (mux *TypeMux) Post(ev interface{}) error { + event := &Event{ + Time: time.Now(), + Data: ev, + } + rtyp := reflect.TypeOf(ev) + mux.mutex.RLock() + if mux.stopped { + mux.mutex.RUnlock() + return ErrMuxClosed + } + subs := mux.subm[rtyp] + mux.mutex.RUnlock() + for _, sub := range subs { + sub.deliver(event) + } + return nil +} + +// Stop closes a mux. The mux can no longer be used. +// Future Post calls will fail with ErrMuxClosed. +// Stop blocks until all current deliveries have finished. +func (mux *TypeMux) Stop() { + mux.mutex.Lock() + for _, subs := range mux.subm { + for _, sub := range subs { + sub.closewait() + } + } + mux.subm = nil + mux.stopped = true + mux.mutex.Unlock() +} + +func (mux *TypeMux) del(s *muxsub) { + mux.mutex.Lock() + for typ, subs := range mux.subm { + if pos := find(subs, s); pos >= 0 { + if len(subs) == 1 { + delete(mux.subm, typ) + } else { + mux.subm[typ] = posdelete(subs, pos) + } + } + } + s.mux.mutex.Unlock() +} + +func find(slice []*muxsub, item *muxsub) int { + for i, v := range slice { + if v == item { + return i + } + } + return -1 +} + +func posdelete(slice []*muxsub, pos int) []*muxsub { + news := make([]*muxsub, len(slice)-1) + copy(news[:pos], slice[:pos]) + copy(news[pos:], slice[pos+1:]) + return news +} + +type muxsub struct { + mux *TypeMux + created time.Time + closeMu sync.Mutex + closing chan struct{} + closed bool + + // these two are the same channel. they are stored separately so + // postC can be set to nil without affecting the return value of + // Chan. + postMu sync.RWMutex + readC <-chan *Event + postC chan<- *Event +} + +func newsub(mux *TypeMux) *muxsub { + c := make(chan *Event) + return &muxsub{ + mux: mux, + created: time.Now(), + readC: c, + postC: c, + closing: make(chan struct{}), + } +} + +func (s *muxsub) Chan() <-chan *Event { + return s.readC +} + +func (s *muxsub) Unsubscribe() { + s.mux.del(s) + s.closewait() +} + +func (s *muxsub) closewait() { + s.closeMu.Lock() + defer s.closeMu.Unlock() + if s.closed { + return + } + close(s.closing) + s.closed = true + + s.postMu.Lock() + close(s.postC) + s.postC = nil + s.postMu.Unlock() +} + +func (s *muxsub) deliver(event *Event) { + // Short circuit delivery if stale event + if s.created.After(event.Time) { + return + } + // Otherwise deliver the event + s.postMu.RLock() + defer s.postMu.RUnlock() + + select { + case s.postC <- event: + case <-s.closing: + } +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 0000000000000000000000000000000000000000..178eb4c8494e8f2e4a293aee2bf524562d2493f5 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,220 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +package event + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testEvent int + +func TestSubCloseUnsub(t *testing.T) { + // the point of this test is **not** to panic + var mux TypeMux + mux.Stop() + sub := mux.Subscribe(int(0)) + sub.Unsubscribe() +} + +func TestSub(t *testing.T) { + mux := new(TypeMux) + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + go func() { + if err := mux.Post(testEvent(5)); err != nil { + t.Errorf("Post returned unexpected error: %v", err) + } + }() + ev := <-sub.Chan() + + if ev.Data.(testEvent) != testEvent(5) { + t.Errorf("Got %v (%T), expected event %v (%T)", + ev, ev, testEvent(5), testEvent(5)) + } +} + +func TestMuxErrorAfterStop(t *testing.T) { + mux := new(TypeMux) + mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + if _, isopen := <-sub.Chan(); isopen { + t.Errorf("subscription channel was not closed") + } + if err := mux.Post(testEvent(0)); err != ErrMuxClosed { + t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed) + } +} + +func TestUnsubscribeUnblockPost(t *testing.T) { + mux := new(TypeMux) + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + unblocked := make(chan bool) + go func() { + mux.Post(testEvent(5)) + unblocked <- true + }() + + select { + case <-unblocked: + t.Errorf("Post returned before Unsubscribe") + default: + sub.Unsubscribe() + <-unblocked + } +} + +func TestSubscribeDuplicateType(t *testing.T) { + mux := new(TypeMux) + expected := "event: duplicate type event.testEvent in Subscribe" + + defer func() { + err := recover() + if err == nil { + t.Errorf("Subscribe didn't panic for duplicate type") + } else if err != expected { + t.Errorf("panic mismatch: got %#v, expected %#v", err, expected) + } + }() + mux.Subscribe(testEvent(1), testEvent(2)) +} + +func TestMuxConcurrent(t *testing.T) { + rand.Seed(time.Now().Unix()) + mux := new(TypeMux) + defer mux.Stop() + + recv := make(chan int) + poster := func() { + for { + err := mux.Post(testEvent(0)) + if err != nil { + return + } + } + } + sub := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + sub := mux.Subscribe(testEvent(0)) + <-sub.Chan() + sub.Unsubscribe() + recv <- i + } + + go poster() + go poster() + go poster() + nsubs := 1000 + for i := 0; i < nsubs; i++ { + go sub(i) + } + + // wait until everyone has been served + counts := make(map[int]int, nsubs) + for i := 0; i < nsubs; i++ { + counts[<-recv]++ + } + for i, count := range counts { + if count != 1 { + t.Errorf("receiver %d called %d times, expected only 1 call", i, count) + } + } +} + +func emptySubscriber(mux *TypeMux, types ...interface{}) { + s := mux.Subscribe(testEvent(0)) + go func() { + for _ = range s.Chan() { + } + }() +} + +func BenchmarkPost3(b *testing.B) { + var mux = new(TypeMux) + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } +} + +func BenchmarkPostConcurrent(b *testing.B) { + var mux = new(TypeMux) + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + var wg sync.WaitGroup + poster := func() { + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } + wg.Done() + } + wg.Add(5) + for i := 0; i < 5; i++ { + go poster() + } + wg.Wait() +} + +// for comparison +func BenchmarkChanSend(b *testing.B) { + c := make(chan interface{}) + closed := make(chan struct{}) + go func() { + for _ = range c { + } + }() + + for i := 0; i < b.N; i++ { + select { + case c <- i: + case <-closed: + } + } +} diff --git a/event/example_test.go b/event/example_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c390e4a6123611d22bff4ce6a674baa65328f1b7 --- /dev/null +++ b/event/example_test.go @@ -0,0 +1,78 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +package event + +import "fmt" + +func ExampleTypeMux() { + type someEvent struct{ I int } + type otherEvent struct{ S string } + type yetAnotherEvent struct{ X, Y int } + + var mux TypeMux + + // Start a subscriber. + done := make(chan struct{}) + sub := mux.Subscribe(someEvent{}, otherEvent{}) + go func() { + for event := range sub.Chan() { + fmt.Printf("Received: %#v\n", event.Data) + } + fmt.Println("done") + close(done) + }() + + // Post some events. + mux.Post(someEvent{5}) + mux.Post(yetAnotherEvent{X: 3, Y: 4}) + mux.Post(someEvent{6}) + mux.Post(otherEvent{"whoa"}) + + // Stop closes all subscription channels. + // The subscriber goroutine will print "done" + // and exit. + mux.Stop() + + // Wait for subscriber to return. + <-done + + // Output: + // Received: event.someEvent{I:5} + // Received: event.someEvent{I:6} + // Received: event.otherEvent{S:"whoa"} + // done +} diff --git a/event/filter/filter.go b/event/filter/filter.go new file mode 100644 index 0000000000000000000000000000000000000000..64b159e65147a846a3d7ad9cca08c0f5fc28503c --- /dev/null +++ b/event/filter/filter.go @@ -0,0 +1,115 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +// Package filter implements event filters. +package filter + +import "reflect" + +type Filter interface { + Compare(Filter) bool + Trigger(data interface{}) +} + +type FilterEvent struct { + filter Filter + data interface{} +} + +type Filters struct { + id int + watchers map[int]Filter + ch chan FilterEvent + + quit chan struct{} +} + +func New() *Filters { + return &Filters{ + ch: make(chan FilterEvent), + watchers: make(map[int]Filter), + quit: make(chan struct{}), + } +} + +func (self *Filters) Start() { + go self.loop() +} + +func (self *Filters) Stop() { + close(self.quit) +} + +func (self *Filters) Notify(filter Filter, data interface{}) { + self.ch <- FilterEvent{filter, data} +} + +func (self *Filters) Install(watcher Filter) int { + self.watchers[self.id] = watcher + self.id++ + + return self.id - 1 +} + +func (self *Filters) Uninstall(id int) { + delete(self.watchers, id) +} + +func (self *Filters) loop() { +out: + for { + select { + case <-self.quit: + break out + case event := <-self.ch: + for _, watcher := range self.watchers { + if reflect.TypeOf(watcher) == reflect.TypeOf(event.filter) { + if watcher.Compare(event.filter) { + watcher.Trigger(event.data) + } + } + } + } + } +} + +func (self *Filters) Match(a, b Filter) bool { + return reflect.TypeOf(a) == reflect.TypeOf(b) && a.Compare(b) +} + +func (self *Filters) Get(i int) Filter { + return self.watchers[i] +} diff --git a/event/filter/filter_test.go b/event/filter/filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cb1f5d2ca9d84ffa71d92764115365a77460e752 --- /dev/null +++ b/event/filter/filter_test.go @@ -0,0 +1,80 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +package filter + +import ( + "testing" + "time" +) + +// Simple test to check if baseline matching/mismatching filtering works. +func TestFilters(t *testing.T) { + fm := New() + fm.Start() + + // Register two filters to catch posted data + first := make(chan struct{}) + fm.Install(Generic{ + Str1: "hello", + Fn: func(data interface{}) { + first <- struct{}{} + }, + }) + second := make(chan struct{}) + fm.Install(Generic{ + Str1: "hello1", + Str2: "hello", + Fn: func(data interface{}) { + second <- struct{}{} + }, + }) + // Post an event that should only match the first filter + fm.Notify(Generic{Str1: "hello"}, true) + fm.Stop() + + // Ensure only the mathcing filters fire + select { + case <-first: + case <-time.After(100 * time.Millisecond): + t.Error("matching filter timed out") + } + select { + case <-second: + t.Error("mismatching filter fired") + case <-time.After(100 * time.Millisecond): + } +} diff --git a/event/filter/generic_filter.go b/event/filter/generic_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..8e421ac33398c734b514399fb178dffb085cf584 --- /dev/null +++ b/event/filter/generic_filter.go @@ -0,0 +1,68 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + + +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +// This code is ported over into Eris-RT from Go-Ethereum and further adapted, +// with many thanks to the Ethereum foundation and the Go-Ethereum team. + +package filter + +type Generic struct { + Str1, Str2, Str3 string + Data map[string]struct{} + + Fn func(data interface{}) +} + +// self = registered, f = incoming +func (self Generic) Compare(f Filter) bool { + var strMatch, dataMatch = true, true + + filter := f.(Generic) + if (len(self.Str1) > 0 && filter.Str1 != self.Str1) || + (len(self.Str2) > 0 && filter.Str2 != self.Str2) || + (len(self.Str3) > 0 && filter.Str3 != self.Str3) { + strMatch = false + } + + for k, _ := range self.Data { + if _, ok := filter.Data[k]; !ok { + return false + } + } + + return strMatch && dataMatch +} + +func (self Generic) Trigger(data interface{}) { + self.Fn(data) +}