Newer
Older
// Copyright 2017 Monax Industries Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Benjamin Bollen
committed
import (
"fmt"
"sync"
"time"
)
var (
reaperTimeout = 5 * time.Second
reaperThreshold = 10 * time.Second
)
type SubscriptionsCache struct {
Benjamin Bollen
committed
mtx *sync.Mutex
events []interface{}
ts time.Time
subId string
}
func newSubscriptionsCache() *SubscriptionsCache {
return &SubscriptionsCache{
Benjamin Bollen
committed
&sync.Mutex{},
make([]interface{}, 0),
time.Now(),
"",
}
}
func (subsCache *SubscriptionsCache) poll() []interface{} {
subsCache.mtx.Lock()
defer subsCache.mtx.Unlock()
Benjamin Bollen
committed
var evts []interface{}
if len(subsCache.events) > 0 {
evts = subsCache.events
subsCache.events = []interface{}{}
Benjamin Bollen
committed
} else {
evts = []interface{}{}
}
subsCache.ts = time.Now()
Benjamin Bollen
committed
return evts
}
// Catches events that callers subscribe to and adds them to an array ready to be polled.
type Subscriptions struct {
mtx *sync.RWMutex
subscribable Subscribable
subs map[string]*SubscriptionsCache
Benjamin Bollen
committed
reap bool
}
func NewSubscriptions(subscribable Subscribable) *Subscriptions {
es := &Subscriptions{
mtx: &sync.RWMutex{},
subscribable: subscribable,
subs: make(map[string]*SubscriptionsCache),
Benjamin Bollen
committed
reap: true,
}
go reap(es)
return es
}
func reap(es *Subscriptions) {
Benjamin Bollen
committed
if !es.reap {
return
}
time.Sleep(reaperTimeout)
es.mtx.Lock()
defer es.mtx.Unlock()
for id, sub := range es.subs {
if time.Since(sub.ts) > reaperThreshold {
// Seems like Go is ok with this..
delete(es.subs, id)
es.subscribable.Unsubscribe(id)
Benjamin Bollen
committed
}
}
go reap(es)
}
// Add a subscription and return the generated id. Note event dispatcher
Silas Davis
committed
// has to call func which involves acquiring a mutex lock, so might be
Benjamin Bollen
committed
// a delay - though a conflict is practically impossible, and if it does
// happen it's for an insignificant amount of time (the time it takes to
// carry out SubscriptionsCache.poll() ).
func (subs *Subscriptions) Add(eventId string) (string, error) {
subId, errSID := GenerateSubscriptionID()
Benjamin Bollen
committed
if errSID != nil {
return "", errSID
}
cache := newSubscriptionsCache()
errC := subs.subscribable.Subscribe(subId, eventId,
func(evt AnyEventData) {
Benjamin Bollen
committed
cache.mtx.Lock()
defer cache.mtx.Unlock()
cache.events = append(cache.events, evt)
})
cache.subId = subId
subs.mtx.Lock()
defer subs.mtx.Unlock()
subs.subs[subId] = cache
Benjamin Bollen
committed
if errC != nil {
return "", errC
}
return subId, nil
}
func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) {
subs.mtx.RLock()
defer subs.mtx.RUnlock()
sub, ok := subs.subs[subId]
Benjamin Bollen
committed
if !ok {
return nil, fmt.Errorf("Subscription not active. ID: " + subId)
}
return sub.poll(), nil
}
func (subs *Subscriptions) Remove(subId string) error {
subs.mtx.Lock()
defer subs.mtx.Unlock()
Benjamin Bollen
committed
// TODO Check this.
_, ok := subs.subs[subId]
Benjamin Bollen
committed
if !ok {
return fmt.Errorf("Subscription not active. ID: " + subId)
}
delete(subs.subs, subId)
Benjamin Bollen
committed
return nil
}