Newer
Older
Benjamin Bollen
committed
import (
"fmt"
"sync"
"time"
evts "github.com/tendermint/go-events"
Benjamin Bollen
committed
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
)
// reaperTimeout is how long reap sleeps before each sweep over the
// subscriptions.
var reaperTimeout = 5 * time.Second

// reaperThreshold is the age of a cache's last-poll timestamp beyond which
// reap removes the subscription.
var reaperThreshold = 10 * time.Second
// EventCache buffers the events received for one subscription until they are
// collected with poll.
type EventCache struct {
	// mtx guards events and ts.
	mtx *sync.Mutex
	// events holds everything appended since the last poll.
	events []interface{}
	// ts is set at construction and refreshed on every poll.
	ts time.Time
	// subId is the id of the subscription this cache belongs to; it starts
	// out empty.
	subId string
}

// newEventCache returns an empty cache with its own mutex, a non-nil empty
// event slice, a timestamp of "now" and an empty subscription id.
func newEventCache() *EventCache {
	cache := &EventCache{
		mtx:    &sync.Mutex{},
		events: make([]interface{}, 0),
		ts:     time.Now(),
		subId:  "",
	}
	return cache
}

// poll hands back every event buffered since the previous call and leaves the
// cache empty. The returned slice is never nil: when nothing is buffered a
// fresh empty slice is returned. Each call refreshes ts.
func (this *EventCache) poll() []interface{} {
	this.mtx.Lock()
	defer this.mtx.Unlock()

	this.ts = time.Now()

	pending := this.events
	if len(pending) == 0 {
		return []interface{}{}
	}
	// Give the buffered slice away and start over with a new one.
	this.events = []interface{}{}
	return pending
}
// Catches events that callers subscribe to and adds them to an array ready to be polled.
type EventSubscriptions struct {
mtx *sync.Mutex
Benjamin Bollen
committed
subs map[string]*EventCache
reap bool
}
func NewEventSubscriptions(eventEmitter EventEmitter) *EventSubscriptions {
Benjamin Bollen
committed
es := &EventSubscriptions{
mtx: &sync.Mutex{},
eventEmitter: eventEmitter,
subs: make(map[string]*EventCache),
reap: true,
}
go reap(es)
return es
}
// reap periodically evicts subscriptions that have not been polled recently.
//
// While es.reap is true it sleeps for reaperTimeout, then sweeps es.subs under
// es.mtx: every cache whose last-poll timestamp is older than reaperThreshold
// is deleted from the map and its id is unsubscribed from the event emitter.
// It blocks for as long as es.reap stays true, so run it in its own goroutine.
//
// NOTE(review): es.reap is read without synchronization, as before; nothing
// visible in this file changes it after construction.
func reap(es *EventSubscriptions) {
	// Loop in place rather than spawning a fresh goroutine after every sweep.
	for es.reap {
		time.Sleep(reaperTimeout)
		func() {
			es.mtx.Lock()
			defer es.mtx.Unlock()
			for id, sub := range es.subs {
				// ts is written by EventCache.poll under the cache's own
				// mutex, so take that mutex to read it.
				sub.mtx.Lock()
				idle := time.Since(sub.ts)
				sub.mtx.Unlock()
				if idle > reaperThreshold {
					// Deleting the current key while ranging over a map is
					// permitted by the Go spec.
					delete(es.subs, id)
					es.eventEmitter.Unsubscribe(id)
				}
			}
		}()
	}
}
// Add a subscription and return the generated id. Note event dispatcher
Silas Davis
committed
// has to call func which involves acquiring a mutex lock, so might be
Benjamin Bollen
committed
// a delay - though a conflict is practically impossible, and if it does
// happen it's for an insignificant amount of time (the time it takes to
// carry out EventCache.poll() ).
func (this *EventSubscriptions) Add(eventId string) (string, error) {
subId, errSID := GenerateSubId()
Benjamin Bollen
committed
if errSID != nil {
return "", errSID
}
cache := newEventCache()
errC := this.eventEmitter.Subscribe(subId, eventId,
Benjamin Bollen
committed
cache.mtx.Lock()
defer cache.mtx.Unlock()
cache.events = append(cache.events, evt)
})
cache.subId = subId
this.mtx.Lock()
defer this.mtx.Unlock()
Benjamin Bollen
committed
this.subs[subId] = cache
if errC != nil {
return "", errC
}
return subId, nil
}
func (this *EventSubscriptions) Poll(subId string) ([]interface{}, error) {
Benjamin Bollen
committed
sub, ok := this.subs[subId]
if !ok {
return nil, fmt.Errorf("Subscription not active. ID: " + subId)
}
return sub.poll(), nil
}
func (this *EventSubscriptions) Remove(subId string) error {
Benjamin Bollen
committed
this.mtx.Lock()
defer this.mtx.Unlock()
// TODO Check this.
_, ok := this.subs[subId]
if !ok {
return fmt.Errorf("Subscription not active. ID: " + subId)
}
delete(this.subs, subId)
return nil
}