Skip to content
Snippets Groups Projects
subscriptions.go 3.39 KiB
Newer Older
// Copyright 2017 Monax Industries Limited
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.


import (
	"fmt"
	"sync"
	"time"
)

var (
	// reaperTimeout is how long the reaper sleeps (time.Sleep) before each
	// sweep over the registered subscriptions.
	reaperTimeout   = 5 * time.Second
	// reaperThreshold is the maximum age of a subscription cache's ts
	// timestamp; during a sweep, entries older than this are deleted.
	reaperThreshold = 10 * time.Second
)

// SubscriptionsCache buffers the events received for a single subscription
// until they are collected through poll.
type SubscriptionsCache struct {
	// mtx guards events; both the subscription callback that appends events
	// and poll acquire it.
	mtx    *sync.Mutex
	// events holds the event payloads appended by the subscription callback.
	events []interface{}
	// ts is the timestamp the reaper compares against reaperThreshold to
	// decide whether this subscription should be dropped.
	ts     time.Time
	// subId is the generated subscription id under which this cache is
	// registered.
	subId  string
}

func newSubscriptionsCache() *SubscriptionsCache {
	return &SubscriptionsCache{
func (subsCache *SubscriptionsCache) poll() []interface{} {
	subsCache.mtx.Lock()
	defer subsCache.mtx.Unlock()
	if len(subsCache.events) > 0 {
		evts = subsCache.events
		subsCache.events = []interface{}{}
	return evts
}

// Catches events that callers subscribe to and adds them to an array ready to be polled.
	subscribable Subscribable
	subs         map[string]*SubscriptionsCache
func NewSubscriptions(subscribable Subscribable) *Subscriptions {
	es := &Subscriptions{
		subscribable: subscribable,
		subs:         make(map[string]*SubscriptionsCache),
	if !es.reap {
		return
	}
	time.Sleep(reaperTimeout)
	es.mtx.Lock()
	defer es.mtx.Unlock()
	for id, sub := range es.subs {
		if time.Since(sub.ts) > reaperThreshold {
			// Deleting entries while ranging over a map is safe in Go.
			delete(es.subs, id)
		}
	}
	go reap(es)
}

// Add a subscription and return the generated id. Note that the event
// dispatcher has to call a callback which acquires a mutex lock, so there
// might be a delay - though a conflict is practically impossible, and if it
// does happen it lasts an insignificant amount of time (the time it takes to
// carry out SubscriptionsCache.poll() ).
func (subs *Subscriptions) Add(eventId string) (string, error) {
	subId, errSID := GenerateSubscriptionID()
	cache := newSubscriptionsCache()
	errC := subs.subscribable.Subscribe(subId, eventId,
		func(evt AnyEventData) {
			cache.mtx.Lock()
			defer cache.mtx.Unlock()
			cache.events = append(cache.events, evt)
		})
	cache.subId = subId
	subs.mtx.Lock()
	defer subs.mtx.Unlock()
	subs.subs[subId] = cache
// Poll returns the events currently buffered for the subscription identified
// by subId, as produced by that subscription cache's poll method.
//
// It returns a nil slice and a non-nil error when no subscription is
// registered under subId. The read lock on the subscription table is held
// for the duration of the call.
func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) {
	subs.mtx.RLock()
	defer subs.mtx.RUnlock()
	sub, ok := subs.subs[subId]
	if !ok {
		// subId is caller-supplied: pass it as an argument instead of splicing
		// it into the format string, so a '%' in the id is printed verbatim
		// rather than being interpreted as a formatting verb.
		return nil, fmt.Errorf("Subscription not active. ID: %s", subId)
	}
	return sub.poll(), nil
}

func (subs *Subscriptions) Remove(subId string) error {
	subs.mtx.Lock()
	defer subs.mtx.Unlock()
	if !ok {
		return fmt.Errorf("Subscription not active. ID: " + subId)
	}