diff --git a/rpc/v0/json_service.go b/rpc/v0/json_service.go index d075eeddcf586832a9d62e28301c2ea57fdf8530..08459b7049a497636c9c96b819d9b618ae424927 100644 --- a/rpc/v0/json_service.go +++ b/rpc/v0/json_service.go @@ -27,7 +27,7 @@ import ( // EventSubscribe type EventSub struct { - SubId string `json:"sub_id"` + SubId string `json:"subId"` } // EventUnsubscribe diff --git a/rpc/v0/json_service_test.go b/rpc/v0/json_service_test.go index 86dbad4cf294fa493ef07190beae2680345c64ec..23ec7620908b691326cff65c5c13095ea8cedb7d 100644 --- a/rpc/v0/json_service_test.go +++ b/rpc/v0/json_service_test.go @@ -13,3 +13,18 @@ // limitations under the License. package v0 + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestPollResponse(t *testing.T) { + pr := PollResponse{} + bs, err := json.Marshal(pr) + require.NoError(t, err) + fmt.Println(string(bs)) +} diff --git a/rpc/v0/subscriptions.go b/rpc/v0/subscriptions.go index 8bb91864790cfc3d08c6d790c5da96ca0ed2a2ec..74fff345ab2df6d98307bb5fa148b221434ac05f 100644 --- a/rpc/v0/subscriptions.go +++ b/rpc/v0/subscriptions.go @@ -25,10 +25,18 @@ import ( ) var ( - reaperTimeout = 5 * time.Second - reaperThreshold = 10 * time.Second + reaperPeriod = 5 * time.Second + reaperThreshold = 20 * time.Second ) +// Catches events that callers subscribe to and adds them to an array ready to be polled. +type Subscriptions struct { + mtx *sync.RWMutex + service *rpc.Service + subs map[string]*SubscriptionsCache + reap bool +} + type SubscriptionsCache struct { mtx *sync.Mutex events []interface{} @@ -36,6 +44,19 @@ type SubscriptionsCache struct { subId string } +func NewSubscriptions(service *rpc.Service) *Subscriptions { + es := &Subscriptions{ + mtx: &sync.RWMutex{}, + service: service, + subs: make(map[string]*SubscriptionsCache), + reap: true, + } + if es.reap { + go reap(es) + } + return es +} + func newSubscriptionsCache() *SubscriptionsCache { return &SubscriptionsCache{ &sync.Mutex{}, @@ -59,40 +80,19 @@ func (subsCache *SubscriptionsCache) poll() []interface{} { return evts } -// Catches events that callers subscribe to and adds them to an array ready to be polled. -type Subscriptions struct { - mtx *sync.RWMutex - service *rpc.Service - subs map[string]*SubscriptionsCache - reap bool -} - -func NewSubscriptions(service *rpc.Service) *Subscriptions { - es := &Subscriptions{ - mtx: &sync.RWMutex{}, - service: service, - subs: make(map[string]*SubscriptionsCache), - reap: true, - } - go reap(es) - return es -} - +// Remove old subscriptions not recently polled func reap(es *Subscriptions) { - if !es.reap { - return - } - time.Sleep(reaperTimeout) - es.mtx.Lock() - defer es.mtx.Unlock() - for id, sub := range es.subs { - if time.Since(sub.ts) > reaperThreshold { - // Seems like Go is ok with this.. - delete(es.subs, id) - es.service.Unsubscribe(context.Background(), id) + for { + time.Sleep(reaperPeriod) + for id, sub := range es.subs { + if time.Since(sub.ts) > reaperThreshold { + es.mtx.Lock() + delete(es.subs, id) + es.service.Unsubscribe(context.Background(), id) + es.mtx.Unlock() + } } } - go reap(es) } // Add a subscription and return the generated id. Note event dispatcher @@ -136,7 +136,6 @@ func (subs *Subscriptions) Poll(subId string) ([]interface{}, error) { func (subs *Subscriptions) Remove(subId string) error { subs.mtx.Lock() defer subs.mtx.Unlock() - // TODO Check this. _, ok := subs.subs[subId] if !ok { return fmt.Errorf("Subscription not active. ID: " + subId) diff --git a/rpc/v0/subscriptions_test.go b/rpc/v0/subscriptions_test.go index c3032dd6f861a394a2caa3388f51c2f89b83a7dd..96be266f7032972ec8b266c262c850284929506b 100644 --- a/rpc/v0/subscriptions_test.go +++ b/rpc/v0/subscriptions_test.go @@ -17,7 +17,6 @@ package v0 import ( "encoding/hex" "fmt" - "runtime" "testing" "time" @@ -31,10 +30,9 @@ var mockInterval = 20 * time.Millisecond // Test that event subscriptions can be added manually and then automatically reaped. func TestSubReaping(t *testing.T) { - runtime.GOMAXPROCS(runtime.NumCPU()) NUM_SUBS := 100 reaperThreshold = 200 * time.Millisecond - reaperTimeout = 100 * time.Millisecond + reaperPeriod = 100 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger()) eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, logging.NewNoopLogger())) @@ -78,7 +76,7 @@ func TestSubManualClose(t *testing.T) { NUM_SUBS := 100 // Keep the reaper out of this. reaperThreshold = 10000 * time.Millisecond - reaperTimeout = 10000 * time.Millisecond + reaperPeriod = 10000 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger()) eSubs := NewSubscriptions(rpc.NewSubscribableService(mee, logging.NewNoopLogger())) @@ -125,7 +123,7 @@ func TestSubFlooding(t *testing.T) { NUM_SUBS := 100 // Keep the reaper out of this. reaperThreshold = 10000 * time.Millisecond - reaperTimeout = 10000 * time.Millisecond + reaperPeriod = 10000 * time.Millisecond // Crank it up. Now pressure is 10 times higher on each sub. mockInterval = 1 * time.Millisecond mee := event.NewEmitter(logging.NewNoopLogger())