// event_cache.go

package erisdb

import (
	"fmt"
	"sync"
	"time"

	ep "github.com/eris-ltd/eris-db/erisdb/pipe"
)

var (
	// reaperTimeout is how long the reaper sleeps before each sweep of the
	// subscription map.
	reaperTimeout = 5 * time.Second
	// reaperThreshold is how long a subscription may go without being polled
	// before the reaper removes it.
	reaperThreshold = 10 * time.Second
)

// EventCache buffers the events received for a single subscription until
// they are collected with poll.
type EventCache struct {
	// mtx is held while events and ts are modified.
	mtx *sync.Mutex
	// events holds the events received since the last poll.
	events []interface{}
	// ts is the time the cache was created or last polled.
	ts time.Time
	// subId is the id of the subscription this cache belongs to.
	subId string
}

// newEventCache returns an empty cache with a fresh mutex, no subscription
// id, and its timestamp set to the current time.
func newEventCache() *EventCache {
	return &EventCache{
		mtx:    &sync.Mutex{},
		events: make([]interface{}, 0),
		ts:     time.Now(),
		subId:  "",
	}
}

func (this *EventCache) poll() []interface{} {
	this.mtx.Lock()
	defer this.mtx.Unlock()
	var evts []interface{}
	if len(this.events) > 0 {
		evts = this.events
		this.events = []interface{}{}
	} else {
Androlo's avatar
Androlo committed
		evts = []interface{}{}
Androlo's avatar
Androlo committed
	}
	this.ts = time.Now()
	return evts
}

// Catches events that callers subscribe to and adds them to an array ready to be polled.
type EventSubscriptions struct {
Androlo's avatar
Androlo committed
	mtx          *sync.Mutex
Androlo's avatar
Androlo committed
	eventEmitter ep.EventEmitter
Androlo's avatar
Androlo committed
	subs         map[string]*EventCache
	reap         bool
Androlo's avatar
Androlo committed
}

Androlo's avatar
Androlo committed
func NewEventSubscriptions(eventEmitter ep.EventEmitter) *EventSubscriptions {
Androlo's avatar
Androlo committed
	es := &EventSubscriptions{
Androlo's avatar
Androlo committed
		mtx:          &sync.Mutex{},
		eventEmitter: eventEmitter,
		subs:         make(map[string]*EventCache),
		reap:         true,
Androlo's avatar
Androlo committed
	}
	go reap(es)
	return es
}

func reap(es *EventSubscriptions) {
	if !es.reap {
		return
	}
Androlo's avatar
Androlo committed
	time.Sleep(reaperTimeout)
Androlo's avatar
Androlo committed
	es.mtx.Lock()
	defer es.mtx.Unlock()
	for id, sub := range es.subs {
Androlo's avatar
Androlo committed
		if time.Since(sub.ts) > reaperThreshold {
Androlo's avatar
Androlo committed
			// Seems like Go is ok with this..
			delete(es.subs, id)
Androlo's avatar
Androlo committed
			es.eventEmitter.Unsubscribe(id)
Androlo's avatar
Androlo committed
		}
	}
	go reap(es)
}

// Add a subscription and return the generated id. Note event dispatcher
// has to call func which involves aquiring a mutex lock, so might be
// a delay - though a conflict is practically impossible, and if it does
// happen it's for an insignificant amount of time (the time it takes to
// carry out EventCache.poll() ).
func (this *EventSubscriptions) add(eventId string) (string, error) {
	subId, errSID := generateSubId()
	if errSID != nil {
		return "", errSID
	}
	cache := newEventCache()
Androlo's avatar
Androlo committed
	_, errC := this.eventEmitter.Subscribe(subId, eventId,
Androlo's avatar
Androlo committed
		func(evt interface{}) {
			cache.mtx.Lock()
			defer cache.mtx.Unlock()
			cache.events = append(cache.events, evt)
		})
	cache.subId = subId
	this.subs[subId] = cache
	if errC != nil {
		return "", errC
	}
	return subId, nil
}

// poll returns the events cached for the given subscription since it was
// last polled, and empties that cache. It returns an error if no
// subscription with that id is registered.
func (this *EventSubscriptions) poll(subId string) ([]interface{}, error) {
	// The reaper goroutine mutates subs concurrently, so the lookup must
	// happen under the mutex. The lock is released before polling the cache,
	// which has its own mutex.
	this.mtx.Lock()
	sub, ok := this.subs[subId]
	this.mtx.Unlock()
	if !ok {
		// subId is passed as an argument rather than concatenated into the
		// format string, so a '%' in the id is printed verbatim.
		return nil, fmt.Errorf("Subscription not active. ID: %s", subId)
	}
	return sub.poll(), nil
}

// remove deletes the subscription with the given id from the set of active
// subscriptions. It returns an error if no such subscription is registered.
//
// NOTE(review): unlike reap, this does not call eventEmitter.Unsubscribe;
// confirm that callers unsubscribe from the emitter themselves.
func (this *EventSubscriptions) remove(subId string) error {
	this.mtx.Lock()
	defer this.mtx.Unlock()
	if _, ok := this.subs[subId]; !ok {
		// subId is passed as an argument rather than concatenated into the
		// format string, so a '%' in the id is printed verbatim.
		return fmt.Errorf("Subscription not active. ID: %s", subId)
	}
	delete(this.subs, subId)
	return nil
}