Newer
Older
Benjamin Bollen
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package core
import (
"fmt"
"sync"
"time"
"github.com/tendermint/go-events"
definitions "github.com/eris-ltd/eris-db/definitions"
)
// Tuning knobs for the background reaper (see reap).
var (
	// reaperTimeout is how long the reaper sleeps before each sweep over the
	// registered subscriptions.
	reaperTimeout = 5 * time.Second

	// reaperThreshold is the maximum time a subscription may go without being
	// polled; a sweep drops every subscription that exceeds it.
	reaperThreshold = 10 * time.Second
)
// EventCache buffers the events received for one subscription until they are
// collected with poll.
type EventCache struct {
	// mtx is held by poll while it drains events and refreshes ts.
	mtx *sync.Mutex
	// events holds everything buffered since the previous poll.
	events []interface{}
	// ts is the creation time or the time of the most recent poll,
	// whichever is later.
	ts time.Time
	// subId is the id of the subscription this cache belongs to; it is empty
	// until the owner assigns it.
	subId string
}

// newEventCache returns an empty cache stamped with the current time and with
// no subscription id assigned yet.
func newEventCache() *EventCache {
	return &EventCache{
		mtx:    new(sync.Mutex),
		events: []interface{}{},
		ts:     time.Now(),
		subId:  "",
	}
}

// poll returns every event buffered since the previous call and records the
// current time as the moment of the latest poll. The result is never nil: when
// nothing is pending an empty, non-nil slice is returned.
func (c *EventCache) poll() []interface{} {
	c.mtx.Lock()
	defer c.mtx.Unlock()

	drained := []interface{}{}
	if len(c.events) > 0 {
		// Hand the filled buffer to the caller and keep the fresh empty one.
		drained, c.events = c.events, drained
	}
	c.ts = time.Now()
	return drained
}
// EventSubscriptions catches the events that callers subscribe to and stores
// them, per subscription, in a cache that is ready to be polled.
type EventSubscriptions struct {
	// mtx protects subs; reap and remove hold it while they change the map.
	mtx *sync.Mutex
	// eventEmitter is the event source that subscriptions are registered with
	// and unsubscribed from.
	eventEmitter definitions.EventEmitter
	// subs maps a subscription id to the cache collecting its events.
	subs map[string]*EventCache
	// reap is checked by the reaper before every sweep; the reaper stops once
	// it is false.
	reap bool
}
// NewEventSubscriptions builds an empty subscription registry on top of
// eventEmitter and starts the background reaper for it in its own goroutine.
func NewEventSubscriptions(eventEmitter definitions.EventEmitter) *EventSubscriptions {
	registry := new(EventSubscriptions)
	registry.mtx = new(sync.Mutex)
	registry.eventEmitter = eventEmitter
	registry.subs = map[string]*EventCache{}
	registry.reap = true

	// The reaper keeps running for as long as registry.reap stays true.
	go reap(registry)
	return registry
}
// reap is the body of the background reaper. While es.reap is true it sleeps
// for reaperTimeout and then sweeps the registry, dropping every subscription
// that has not been polled for longer than reaperThreshold. It is meant to be
// started in its own goroutine and returns once es.reap is observed to be
// false.
//
// NOTE(review): es.reap is read without synchronisation and nothing in the
// visible code ever clears it — confirm how the reaper is expected to stop.
func reap(es *EventSubscriptions) {
	// A single long-lived loop replaces spawning a new goroutine per cycle.
	for es.reap {
		time.Sleep(reaperTimeout)
		reapExpiredSubscriptions(es)
	}
}

// reapExpiredSubscriptions performs one sweep: with the registry lock held it
// removes every subscription idle for longer than reaperThreshold and
// unsubscribes it from the event emitter.
func reapExpiredSubscriptions(es *EventSubscriptions) {
	es.mtx.Lock()
	defer es.mtx.Unlock()

	for id, sub := range es.subs {
		// EventCache.poll updates ts under the cache mutex, so the read has
		// to take the same lock to avoid a data race.
		sub.mtx.Lock()
		idle := time.Since(sub.ts)
		sub.mtx.Unlock()

		if idle <= reaperThreshold {
			continue
		}
		// Deleting the current key while ranging over a map is allowed in Go.
		delete(es.subs, id)
		// Whatever Unsubscribe returns is discarded, as before.
		es.eventEmitter.Unsubscribe(id)
	}
}
// add subscribes to eventId under a freshly generated subscription id and
// returns that id. Events delivered for the subscription are appended to a
// dedicated EventCache that can later be drained through poll.
//
// The emitter's callback has to acquire the cache mutex, so event delivery can
// be delayed while EventCache.poll holds that lock; the delay lasts only as
// long as a poll takes.
//
// An error is returned, together with an empty id, when the id cannot be
// generated or when the emitter rejects the subscription. In both cases
// nothing is registered.
func (this *EventSubscriptions) add(eventId string) (string, error) {
	subId, errSID := generateSubId()
	if errSID != nil {
		return "", errSID
	}

	cache := newEventCache()
	cache.subId = subId
	_, errC := this.eventEmitter.Subscribe(subId, eventId,
		func(evt events.EventData) {
			cache.mtx.Lock()
			defer cache.mtx.Unlock()
			cache.events = append(cache.events, evt)
		})
	if errC != nil {
		// Do not leave a cache in the registry for a subscription the
		// emitter never accepted.
		return "", errC
	}

	// The reaper and remove mutate subs under mtx; an unguarded write here
	// would be a concurrent map write.
	this.mtx.Lock()
	defer this.mtx.Unlock()
	this.subs[subId] = cache
	return subId, nil
}
// poll returns the events buffered for subscription subId since it was last
// polled and refreshes the subscription's last-polled time. It returns an
// error when no subscription with that id is registered.
func (this *EventSubscriptions) poll(subId string) ([]interface{}, error) {
	// The map is mutated by the reaper under mtx, so the lookup must hold the
	// same lock. It is released before draining so that a slow poll does not
	// block the whole registry.
	this.mtx.Lock()
	sub, ok := this.subs[subId]
	this.mtx.Unlock()

	if !ok {
		// subId goes through a verb so that a '%' in it cannot be
		// interpreted as a formatting directive.
		return nil, fmt.Errorf("Subscription not active. ID: %s", subId)
	}
	return sub.poll(), nil
}
// remove drops the subscription subId from the registry. It returns an error
// when no subscription with that id is registered.
//
// NOTE(review): unlike the reaper, this does not unsubscribe subId from the
// event emitter — confirm whether callers do that themselves.
func (this *EventSubscriptions) remove(subId string) error {
	this.mtx.Lock()
	defer this.mtx.Unlock()

	// TODO Check this.
	if _, ok := this.subs[subId]; !ok {
		// subId goes through a verb so that a '%' in it cannot be
		// interpreted as a formatting directive.
		return fmt.Errorf("Subscription not active. ID: %s", subId)
	}
	delete(this.subs, subId)
	return nil
}