From f632852bb6e032292b9cbadb427b03c807ddb018 Mon Sep 17 00:00:00 2001
From: Silas Davis <silas@erisindustries.com>
Date: Mon, 22 Aug 2016 12:17:50 +0100
Subject: [PATCH] Use a random subscription id as rpc/v0 does on rpc/tendermint

---
 definitions/tendermint_pipe.go         |  4 +-
 event/events.go                        |  3 +-
 manager/eris-mint/pipe.go              | 29 ++++++------
 rpc/tendermint/core/routes.go          | 10 ++---
 rpc/tendermint/core/types/responses.go |  3 ++
 rpc/tendermint/test/client_ws_test.go  | 47 ++++++++++---------
 rpc/tendermint/test/tests.go           | 62 ++++++++++++++++++++++++--
 rpc/tendermint/test/ws_helpers.go      | 42 ++++++++++++++---
 8 files changed, 148 insertions(+), 52 deletions(-)

diff --git a/definitions/tendermint_pipe.go b/definitions/tendermint_pipe.go
index 76e7a2a6..bffa94ad 100644
--- a/definitions/tendermint_pipe.go
+++ b/definitions/tendermint_pipe.go
@@ -33,9 +33,9 @@ type TendermintPipe interface {
 	// Subscribe attempts to subscribe the listener identified by listenerId to
 	// the event named event. The Event result is written to rpcResponseWriter
 	// which must be non-blocking
-	Subscribe(listenerId, event string,
+	Subscribe(event string,
 		rpcResponseWriter func(result rpc_tm_types.ErisDBResult)) (*rpc_tm_types.ResultSubscribe, error)
-	Unsubscribe(listenerId, event string) (*rpc_tm_types.ResultUnsubscribe, error)
+	Unsubscribe(subscriptionId string) (*rpc_tm_types.ResultUnsubscribe, error)
 
 	// Net
 	Status() (*rpc_tm_types.ResultStatus, error)
diff --git a/event/events.go b/event/events.go
index fab36968..08899c95 100644
--- a/event/events.go
+++ b/event/events.go
@@ -134,7 +134,8 @@ func GenerateSubId() (string, error) {
 	b := make([]byte, 32)
 	_, err := rand.Read(b)
 	if err != nil {
-		return "", err
+		return "", fmt.Errorf("Could not generate random bytes for a subscription" +
+				" id: %v", err)
 	}
 	rStr := hex.EncodeToString(b)
 	return strings.ToUpper(rStr), nil
diff --git a/manager/eris-mint/pipe.go b/manager/eris-mint/pipe.go
index 1c01c75b..3783a572 100644
--- a/manager/eris-mint/pipe.go
+++ b/manager/eris-mint/pipe.go
@@ -232,31 +232,34 @@ func (pipe *erisMintPipe) consensusAndManagerEvents() edb_event.EventEmitter {
 
 //------------------------------------------------------------------------------
 // Implement definitions.TendermintPipe for erisMintPipe
-func (pipe *erisMintPipe) Subscribe(listenerId, event string,
+func (pipe *erisMintPipe) Subscribe(event string,
 	rpcResponseWriter func(result rpc_tm_types.ErisDBResult)) (*rpc_tm_types.ResultSubscribe, error) {
-	log.WithFields(log.Fields{"listenerId": listenerId, "event": event}).
+	subscriptionId, err := edb_event.GenerateSubId()
+	if err != nil {
+		return nil, err
+	}
+
+	log.WithFields(log.Fields{"event": event, "subscriptionId": subscriptionId}).
 		Info("Subscribing to event")
 
-	pipe.consensusAndManagerEvents().Subscribe(subscriptionId(listenerId, event), event,
+	pipe.consensusAndManagerEvents().Subscribe(subscriptionId, event,
 		func(eventData txs.EventData) {
 			result := rpc_tm_types.ErisDBResult(&rpc_tm_types.ResultEvent{event,
 				txs.EventData(eventData)})
 			// NOTE: EventSwitch callbacks must be nonblocking
 			rpcResponseWriter(result)
 		})
-	return &rpc_tm_types.ResultSubscribe{}, nil
+	return &rpc_tm_types.ResultSubscribe{
+		SubscriptionId: subscriptionId,
+		Event: event,
+	}, nil
 }
 
-func (pipe *erisMintPipe) Unsubscribe(listenerId,
-	event string) (*rpc_tm_types.ResultUnsubscribe, error) {
-	log.WithFields(log.Fields{"listenerId": listenerId, "event": event}).
+func (pipe *erisMintPipe) Unsubscribe(subscriptionId string) (*rpc_tm_types.ResultUnsubscribe, error) {
+	log.WithFields(log.Fields{"subscriptionId": subscriptionId}).
 		Info("Unsubscribing from event")
-	pipe.consensusAndManagerEvents().Unsubscribe(subscriptionId(listenerId, event))
-	return &rpc_tm_types.ResultUnsubscribe{}, nil
-}
-
-func subscriptionId(listenerId, event string) string {
-	return fmt.Sprintf("%s#%s", listenerId, event)
+	pipe.consensusAndManagerEvents().Unsubscribe(subscriptionId)
+	return &rpc_tm_types.ResultUnsubscribe{SubscriptionId: subscriptionId}, nil
 }
 
 func (pipe *erisMintPipe) Status() (*rpc_tm_types.ResultStatus, error) {
diff --git a/rpc/tendermint/core/routes.go b/rpc/tendermint/core/routes.go
index 02ffadfb..7fc6b03f 100644
--- a/rpc/tendermint/core/routes.go
+++ b/rpc/tendermint/core/routes.go
@@ -25,7 +25,7 @@ type TendermintRoutes struct {
 func (tmRoutes *TendermintRoutes) GetRoutes() map[string]*rpc.RPCFunc {
 	var routes = map[string]*rpc.RPCFunc{
 		"subscribe":               rpc.NewWSRPCFunc(tmRoutes.Subscribe, "event"),
-		"unsubscribe":             rpc.NewWSRPCFunc(tmRoutes.Unsubscribe, "event"),
+		"unsubscribe":             rpc.NewWSRPCFunc(tmRoutes.Unsubscribe, "subscriptionId"),
 		"status":                  rpc.NewRPCFunc(tmRoutes.StatusResult, ""),
 		"net_info":                rpc.NewRPCFunc(tmRoutes.NetInfoResult, ""),
 		"genesis":                 rpc.NewRPCFunc(tmRoutes.GenesisResult, ""),
@@ -61,8 +61,9 @@ func (tmRoutes *TendermintRoutes) Subscribe(wsCtx rpctypes.WSRPCContext,
 	// and return it in the result. This would require clients to hang on to a
 	// subscription id if they wish to unsubscribe, but then again they can just
 	// drop their connection
-	result, err := tmRoutes.tendermintPipe.Subscribe(wsCtx.GetRemoteAddr(), event,
+	result, err := tmRoutes.tendermintPipe.Subscribe(event,
 		func(result ctypes.ErisDBResult) {
+			wsCtx.GetRemoteAddr()
 			// NOTE: EventSwitch callbacks must be nonblocking
 			wsCtx.TryWriteRPCResponse(
 				rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &result, ""))
@@ -75,9 +76,8 @@ func (tmRoutes *TendermintRoutes) Subscribe(wsCtx rpctypes.WSRPCContext,
 }
 
 func (tmRoutes *TendermintRoutes) Unsubscribe(wsCtx rpctypes.WSRPCContext,
-	event string) (ctypes.ErisDBResult, error) {
-	result, err := tmRoutes.tendermintPipe.Unsubscribe(wsCtx.GetRemoteAddr(),
-		event)
+	subscriptionId string) (ctypes.ErisDBResult, error) {
+	result, err := tmRoutes.tendermintPipe.Unsubscribe(subscriptionId)
 	if err != nil {
 		return nil, err
 	} else {
diff --git a/rpc/tendermint/core/types/responses.go b/rpc/tendermint/core/types/responses.go
index 179e26da..ed050c03 100644
--- a/rpc/tendermint/core/types/responses.go
+++ b/rpc/tendermint/core/types/responses.go
@@ -60,9 +60,12 @@ type ResultStatus struct {
 }
 
 type ResultSubscribe struct {
+	Event string `json:"event"`
+	SubscriptionId string `json:"subscription_id"`
 }
 
 type ResultUnsubscribe struct {
+	SubscriptionId string `json:"subscription_id"`
 }
 
 type ResultNetInfo struct {
diff --git a/rpc/tendermint/test/client_ws_test.go b/rpc/tendermint/test/client_ws_test.go
index 2f55ebf9..4c48c8a5 100644
--- a/rpc/tendermint/test/client_ws_test.go
+++ b/rpc/tendermint/test/client_ws_test.go
@@ -24,9 +24,9 @@ func TestWSConnect(t *testing.T) {
 func TestWSNewBlock(t *testing.T) {
 	wsc := newWSClient(t)
 	eid := txs.EventStringNewBlock()
-	subscribe(t, wsc, eid)
+	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer func() {
-		unsubscribe(t, wsc, eid)
+		unsubscribe(t, wsc, subId)
 		wsc.Stop()
 	}()
 	waitForEvent(t, wsc, eid, func() {},
@@ -43,9 +43,9 @@ func TestWSBlockchainGrowth(t *testing.T) {
 	}
 	wsc := newWSClient(t)
 	eid := txs.EventStringNewBlock()
-	subscribe(t, wsc, eid)
+	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer func() {
-		unsubscribe(t, wsc, eid)
+		unsubscribe(t, wsc, subId)
 		wsc.Stop()
 	}()
 	// listen for NewBlock, ensure height increases by 1
@@ -80,11 +80,11 @@ func TestWSSend(t *testing.T) {
 	wsc := newWSClient(t)
 	eidInput := txs.EventStringAccInput(user[0].Address)
 	eidOutput := txs.EventStringAccOutput(toAddr)
-	subscribe(t, wsc, eidInput)
-	subscribe(t, wsc, eidOutput)
+	subIdInput := subscribeAndGetSubscriptionId(t, wsc, eidInput)
+	subIdOutput := subscribeAndGetSubscriptionId(t, wsc, eidOutput)
 	defer func() {
-		unsubscribe(t, wsc, eidInput)
-		unsubscribe(t, wsc, eidOutput)
+		unsubscribe(t, wsc, subIdInput)
+		unsubscribe(t, wsc, subIdOutput)
 		wsc.Stop()
 	}()
 	waitForEvent(t, wsc, eidInput, func() {
@@ -103,9 +103,9 @@ func TestWSDoubleFire(t *testing.T) {
 	}
 	wsc := newWSClient(t)
 	eid := txs.EventStringAccInput(user[0].Address)
-	subscribe(t, wsc, eid)
+	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer func() {
-		unsubscribe(t, wsc, eid)
+		unsubscribe(t, wsc, subId)
 		wsc.Stop()
 	}()
 	amt := int64(100)
@@ -134,9 +134,9 @@ func TestWSCallWait(t *testing.T) {
 	}
 	wsc := newWSClient(t)
 	eid1 := txs.EventStringAccInput(user[0].Address)
-	subscribe(t, wsc, eid1)
+	subId1 := subscribeAndGetSubscriptionId(t, wsc, eid1)
 	defer func() {
-		unsubscribe(t, wsc, eid1)
+		unsubscribe(t, wsc, subId1)
 		wsc.Stop()
 	}()
 	amt, gasLim, fee := int64(10000), int64(1000), int64(1000)
@@ -152,9 +152,9 @@ func TestWSCallWait(t *testing.T) {
 	// susbscribe to the new contract
 	amt = int64(10001)
 	eid2 := txs.EventStringAccOutput(contractAddr)
-	subscribe(t, wsc, eid2)
+	subId2 := subscribeAndGetSubscriptionId(t, wsc, eid2)
 	defer func() {
-		unsubscribe(t, wsc, eid2)
+		unsubscribe(t, wsc, subId2)
 	}()
 	// get the return value from a call
 	data := []byte{0x1}
@@ -182,9 +182,9 @@ func TestWSCallNoWait(t *testing.T) {
 	// susbscribe to the new contract
 	amt = int64(10001)
 	eid := txs.EventStringAccOutput(contractAddr)
-	subscribe(t, wsc, eid)
+	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer func() {
-		unsubscribe(t, wsc, eid)
+		unsubscribe(t, wsc, subId)
 		wsc.Stop()
 	}()
 	// get the return value from a call
@@ -217,23 +217,28 @@ func TestWSCallCall(t *testing.T) {
 
 	// susbscribe to the new contracts
 	amt = int64(10001)
-	eid1 := txs.EventStringAccCall(contractAddr1)
-	subscribe(t, wsc, eid1)
+	eid := txs.EventStringAccCall(contractAddr1)
+	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer func() {
-		unsubscribe(t, wsc, eid1)
+		unsubscribe(t, wsc, subId)
 		wsc.Stop()
 	}()
 	// call contract2, which should call contract1, and wait for ev1
 
 	// let the contract get created first
-	waitForEvent(t, wsc, eid1, func() {
+	waitForEvent(t, wsc, eid, func() {
 	}, func(eid string, b txs.EventData) (bool, error) {
 		return true, nil
 	})
 	// call it
-	waitForEvent(t, wsc, eid1, func() {
+	waitForEvent(t, wsc, eid, func() {
 		tx := makeDefaultCallTx(t, wsTyp, contractAddr2, nil, amt, gasLim, fee)
 		broadcastTx(t, wsTyp, tx)
 		*txid = txs.TxHash(chainID, tx)
 	}, unmarshalValidateCall(user[0].Address, returnVal, txid))
 }
+
+func TestSubscribe(t *testing.T) {
+	testSubscribe(t)
+}
+
diff --git a/rpc/tendermint/test/tests.go b/rpc/tendermint/test/tests.go
index f1eac5cb..454b6024 100644
--- a/rpc/tendermint/test/tests.go
+++ b/rpc/tendermint/test/tests.go
@@ -6,9 +6,12 @@ import (
 	"testing"
 
 	edbcli "github.com/eris-ltd/eris-db/rpc/tendermint/client"
+	core_types "github.com/eris-ltd/eris-db/rpc/tendermint/core/types"
 	"github.com/eris-ltd/eris-db/txs"
-
 	"github.com/stretchr/testify/assert"
+
+	"time"
+
 	tm_common "github.com/tendermint/go-common"
 	"golang.org/x/crypto/ripemd160"
 )
@@ -207,7 +210,7 @@ func testNameReg(t *testing.T, typ string) {
 	assert.Equal(t, user[0].Address, entry.Owner)
 
 	// update the data as the owner, make sure still there
-	numDesiredBlocks = int64(4)
+	numDesiredBlocks = int64(3)
 	const updatedData = "these are amongst the things I wish to bestow upon the youth of generations come: a safe supply of honey, and a better money. For what else shall they need"
 	amt = fee + numDesiredBlocks*txs.NameByteCostMultiplier*txs.NameBlockCostMultiplier*txs.NameBaseCost(name, updatedData)
 	tx = makeDefaultNameTx(t, typ, name, updatedData, amt, fee)
@@ -218,7 +221,6 @@ func testNameReg(t *testing.T, typ string) {
 	assert.Equal(t, updatedData, entry.Data)
 
 	// try to update as non owner, should fail
-	//waitBlocks(t, wsc, 4)
 	tx = txs.NewNameTxWithNonce(user[1].PubKey, name, "never mind", amt, fee,
 		getNonce(t, typ, user[1].Address)+1)
 	tx.Sign(chainID, user[1])
@@ -251,7 +253,59 @@ func asEventDataTx(t *testing.T, eventData txs.EventData) txs.EventDataTx {
 	return eventDataTx
 }
 
-func doNothing(eventId string, eventData txs.EventData) (bool, error) {
+func doNothing(_ string, _ txs.EventData) (bool, error) {
 	// And ask waitForEvent to stop waiting
 	return true, nil
 }
+
+func testSubscribe(t *testing.T) {
+	var subId string
+	wsc := newWSClient(t)
+	subscribe(t, wsc, txs.EventStringNewBlock())
+
+	timeout := time.NewTimer(timeoutSeconds * time.Second)
+Subscribe:
+	for {
+		select {
+		case <-timeout.C:
+			t.Fatal("Timed out waiting for subscription result")
+
+		case bs := <-wsc.ResultsCh:
+			resultSubscribe, ok := readResult(t, bs).(*core_types.ResultSubscribe)
+			if ok {
+				assert.Equal(t, txs.EventStringNewBlock(), resultSubscribe.Event)
+				subId = resultSubscribe.SubscriptionId
+				break Subscribe
+			}
+		}
+	}
+
+	seenBlock := false
+	timeout = time.NewTimer(timeoutSeconds * time.Second)
+	for {
+		select {
+		case <-timeout.C:
+			if !seenBlock {
+				t.Fatal("Timed out without seeing a NewBlock event")
+			}
+			return
+
+		case bs := <-wsc.ResultsCh:
+			resultEvent, ok := readResult(t, bs).(*core_types.ResultEvent)
+			if ok {
+				_, ok := resultEvent.Data.(txs.EventDataNewBlock)
+				if ok {
+					if seenBlock {
+						// There's a mild race here, but when we enter we've just seen a block
+						// so we should be able to unsubscribe before we see another block
+						t.Fatal("Continued to see NewBlock event after unsubscribing")
+					} else {
+						seenBlock = true
+						unsubscribe(t, wsc, subId)
+					}
+				}
+			}
+		}
+	}
+}
+
diff --git a/rpc/tendermint/test/ws_helpers.go b/rpc/tendermint/test/ws_helpers.go
index bd694259..ed92bca1 100644
--- a/rpc/tendermint/test/ws_helpers.go
+++ b/rpc/tendermint/test/ws_helpers.go
@@ -36,15 +36,35 @@ func newWSClient(t *testing.T) *client.WSClient {
 }
 
 // subscribe to an event
-func subscribe(t *testing.T, wsc *client.WSClient, eventid string) {
-	if err := wsc.Subscribe(eventid); err != nil {
+func subscribe(t *testing.T, wsc *client.WSClient, eventId string) {
+	if err := wsc.Subscribe(eventId); err != nil {
 		t.Fatal(err)
 	}
 }
 
+func subscribeAndGetSubscriptionId(t *testing.T, wsc *client.WSClient,
+	eventId string) string {
+	if err := wsc.Subscribe(eventId); err != nil {
+		t.Fatal(err)
+	}
+
+	timeout := time.NewTimer(timeoutSeconds * time.Second)
+	for {
+		select {
+		case <-timeout.C:
+			t.Fatal("Timeout waiting for subscription result")
+		case bs := <-wsc.ResultsCh:
+			resultSubscribe, ok := readResult(t, bs).(*ctypes.ResultSubscribe)
+			if ok {
+				return resultSubscribe.SubscriptionId
+			}
+		}
+	}
+}
+
 // unsubscribe from an event
-func unsubscribe(t *testing.T, wsc *client.WSClient, eventid string) {
-	if err := wsc.Unsubscribe(eventid); err != nil {
+func unsubscribe(t *testing.T, wsc *client.WSClient, subscriptionId string) {
+	if err := wsc.Unsubscribe(subscriptionId); err != nil {
 		t.Fatal(err)
 	}
 }
@@ -93,13 +113,13 @@ func runThenWaitForBlock(t *testing.T, wsc *client.WSClient,
 func subscribeAndWaitForNext(t *testing.T, wsc *client.WSClient, event string,
 	runner func(),
 	eventDataChecker func(string, txs.EventData) (bool, error)) {
-	subscribe(t, wsc, event)
+	subId := subscribeAndGetSubscriptionId(t, wsc, event)
+	defer unsubscribe(t, wsc, subId)
 	waitForEvent(t,
 		wsc,
 		event,
 		runner,
 		eventDataChecker)
-	unsubscribe(t, wsc, event)
 }
 
 // waitForEvent executes runner that is expected to trigger events. It then
@@ -299,3 +319,13 @@ func UnmarshalEvent(b json.RawMessage) (string, events.EventData) {
 	}
 	return event.Event, event.Data
 }
+
+func readResult(t *testing.T, bs []byte) ctypes.ErisDBResult {
+	var err error
+	result := new(ctypes.ErisDBResult)
+	wire.ReadJSONPtr(result, bs, &err)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return *result
+}
-- 
GitLab