From 94669ce5639bc02bbc498c535531e32899a3b7d3 Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@monax.io> Date: Fri, 11 May 2018 13:11:35 +0100 Subject: [PATCH] Cleanup some subscription code on V0 Increase reap threshold to 20 seconds Signed-off-by: Silas Davis <silas@monax.io> --- rpc/v0/json_service.go | 2 +- rpc/v0/json_service_test.go | 15 ++++++++ rpc/v0/subscriptions.go | 67 ++++++++++++++++++------------------ rpc/v0/subscriptions_test.go | 8 ++--- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/rpc/v0/json_service.go b/rpc/v0/json_service.go index d075eedd..08459b70 100644 --- a/rpc/v0/json_service.go +++ b/rpc/v0/json_service.go @@ -27,7 +27,7 @@ import ( // EventSubscribe type EventSub struct { - SubId string `json:"sub_id"` + SubId string `json:"subId"` } // EventUnsubscribe diff --git a/rpc/v0/json_service_test.go b/rpc/v0/json_service_test.go index 86dbad4c..23ec7620 100644 --- a/rpc/v0/json_service_test.go +++ b/rpc/v0/json_service_test.go @@ -13,3 +13,18 @@ // limitations under the License. package v0 + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPollResponse(t *testing.T) { + pr := PollResponse{} + bs, err := json.Marshal(pr) + require.NoError(t, err) + fmt.Println(string(bs)) +} diff --git a/rpc/v0/subscriptions.go b/rpc/v0/subscriptions.go index 8bb91864..74fff345 100644 --- a/rpc/v0/subscriptions.go +++ b/rpc/v0/subscriptions.go @@ -25,10 +25,18 @@ import ( ) var ( - reaperTimeout = 5 * time.Second - reaperThreshold = 10 * time.Second + reaperPeriod = 5 * time.Second + reaperThreshold = 20 * time.Second ) +// Catches events that callers subscribe to and adds them to an array ready to be polled. +type Subscriptions struct { + mtx *sync.RWMutex + service *rpc.Service + subs map[string]*SubscriptionsCache + reap bool +} + type SubscriptionsCache struct { mtx *sync.Mutex events []interface{} @@ -36,6 +44,19 @@ type SubscriptionsCache struct { subId string } +func NewSubscriptions(service *rpc.Service) *Subscriptions { + es := &Subscriptions{ + mtx: &sync.RWMutex{}, + service: service, + subs: make(map[string]*SubscriptionsCache), + reap: true, + } + if es.reap { + go reap(es) + } + return es +} + func newSubscriptionsCache() *SubscriptionsCache { return &SubscriptionsCache{ &sync.Mutex{}, @@ -59,40 +80,19 @@ func (subsCache *SubscriptionsCache) poll() []interface{} { return evts } -// Catches events that callers subscribe to and adds them to an array ready to be polled. -type Subscriptions struct { - mtx *sync.RWMutex - service *rpc.Service - subs map[string]*SubscriptionsCache - reap bool -} - -func NewSubscriptions(service *rpc.Service) *Subscriptions { - es := &Subscriptions{ - mtx: &sync.RWMutex{}, - service: service, - subs: make(map[string]*SubscriptionsCache), - reap: true, - } - go reap(es) - return es -} - +// Remove old subscriptions not recently polled func reap(es *Subscriptions) { - if !es.reap { - return - } - time.Sleep(reaperTimeout) - es.mtx.Lock() - defer es.mtx.Unlock() - for id, sub := range es.subs { - if time.Since(sub.ts) > reaperThreshold { - // Seems like Go is ok with this.. - delete(es.subs, id) - es.service.Unsubscribe(context.Background(), id) + for { + time.Sleep(reaperPeriod) + for id, sub := range es.subs { + if time.Since(sub.ts) > reaperThreshold { + es.mtx.Lock() + delete(es.subs, id) + es.service.Unsubscribe(context.Background(), id) + es.mtx.Unlock() + } } } - go reap(es) } // Add a subscription and return the generated id. Note event dispatcher @@ -136,7 +136,6 @@ func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) { func (subs *Subscriptions) Remove(subId string) error { subs.mtx.Lock() defer subs.mtx.Unlock() - // TODO Check this. _, ok := subs.subs[subId] if !ok { return fmt.Errorf("Subscription not active. ID: " + subId) diff --git a/rpc/v0/subscriptions_test.go b/rpc/v0/subscriptions_test.go index c3032dd6..96be266f 100644 --- a/rpc/v0/subscriptions_test.go +++ b/rpc/v0/subscriptions_test.go @@ -17,7 +17,6 @@ package v0 import ( "encoding/hex" "fmt" - "runtime" "testing" "time" @@ -31,10 +30,9 @@ var mockInterval = 20 * time.Millisecond // Test that event subscriptions can be added manually and then automatically reaped. func TestSubReaping(t *testing.T) { - runtime.GOMAXPROCS(runtime.NumCPU()) NUM_SUBS := 100 reaperThreshold = 200 * time.Millisecond - reaperTimeout = 100 * time.Millisecond + reaperPeriod = 100 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger()) eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, logging.NewNoopLogger())) @@ -78,7 +76,7 @@ func TestSubManualClose(t *testing.T) { NUM_SUBS := 100 // Keep the reaper out of this. reaperThreshold = 10000 * time.Millisecond - reaperTimeout = 10000 * time.Millisecond + reaperPeriod = 10000 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger()) eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, logging.NewNoopLogger())) @@ -125,7 +123,7 @@ func TestSubFlooding(t *testing.T) { NUM_SUBS := 100 // Keep the reaper out of this. reaperThreshold = 10000 * time.Millisecond - reaperTimeout = 10000 * time.Millisecond + reaperPeriod = 10000 * time.Millisecond // Crank it up. Now pressure is 10 times higher on each sub. mockInterval = 1 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger()) -- GitLab