From adbd0bd7032a1bd651c5314fba914060530a3276 Mon Sep 17 00:00:00 2001
From: Silas Davis <silas@monax.io>
Date: Wed, 9 May 2018 22:02:31 +0100
Subject: [PATCH] Fix issues with out-of-order events Exposed by timeouts in
 waitForEvent helper

Signed-off-by: Silas Davis <silas@monax.io>
---
 event/emitter_test.go                       | 36 ++++++++
 rpc/tm/integration/client_test.go           | 33 ++++----
 rpc/tm/integration/websocket_client_test.go | 65 +++++++++------
 rpc/tm/integration/websocket_helpers.go     | 92 +++++++++++----------
 rpc/tm/methods.go                           |  2 +
 5 files changed, 146 insertions(+), 82 deletions(-)

diff --git a/event/emitter_test.go b/event/emitter_test.go
index c881cc53..d9709d6d 100644
--- a/event/emitter_test.go
+++ b/event/emitter_test.go
@@ -33,3 +33,39 @@ func TestEmitter(t *testing.T) {
 		t.Errorf("timed out before receiving message matching subscription query")
 	}
 }
+
+func TestOrdering(t *testing.T) {
+	em := NewEmitter(logging.NewNoopLogger())
+	ctx := context.Background()
+	out := make(chan interface{})
+
+	err := em.Subscribe(ctx, "TestOrdering1", NewQueryBuilder().AndEquals("foo", "bar"), out)
+	require.NoError(t, err)
+
+	err = em.Subscribe(ctx, "TestOrdering2", NewQueryBuilder().AndEquals("foo", "baz"), out)
+	require.NoError(t, err)
+
+	barTag := map[string]interface{}{"foo": "bar"}
+	bazTag := map[string]interface{}{"foo": "baz"}
+
+	msgs := [][]interface{}{
+		{"baz1", bazTag},
+		{"bar1", barTag},
+		{"bar2", barTag},
+		{"bar3", barTag},
+		{"baz2", bazTag},
+		{"baz3", bazTag},
+		{"bar4", barTag},
+	}
+
+	go func() {
+		for _, msg := range msgs {
+			em.Publish(ctx, msg[0], msg[1].(map[string]interface{}))
+		}
+		em.Publish(ctx, "stop", bazTag)
+	}()
+
+	for _, msg := range msgs {
+		assert.Equal(t, msg[0], <-out)
+	}
+}
diff --git a/rpc/tm/integration/client_test.go b/rpc/tm/integration/client_test.go
index 268953bc..d09ad176 100644
--- a/rpc/tm/integration/client_test.go
+++ b/rpc/tm/integration/client_test.go
@@ -49,13 +49,12 @@ func TestStatus(t *testing.T) {
 }
 
 func TestBroadcastTx(t *testing.T) {
-	wsc := newWSClient()
 	testWithAllClients(t, func(t *testing.T, clientName string, client tm_client.RPCClient) {
 		// Avoid duplicate Tx in mempool
 		amt := hashString(clientName) % 1000
 		toAddr := privateAccounts[1].Address()
 		tx := makeDefaultSendTxSigned(t, client, toAddr, amt)
-		receipt, err := broadcastTxAndWait(t, client, wsc, tx)
+		receipt, err := broadcastTxAndWait(t, client, tx)
 		require.NoError(t, err)
 		assert.False(t, receipt.CreatesContract, "This tx should not create a contract")
 		assert.NotEmpty(t, receipt.TxHash, "Failed to compute tx hash")
@@ -96,14 +95,12 @@ func TestGetStorage(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode.")
 	}
-	wsc := newWSClient()
-	defer stopWSClient(wsc)
 	testWithAllClients(t, func(t *testing.T, clientName string, client tm_client.RPCClient) {
 		amt, gasLim, fee := uint64(1100), uint64(1000), uint64(1000)
 		code := []byte{0x60, 0x5, 0x60, 0x1, 0x55}
 		// Call with nil address will create a contract
 		tx := makeDefaultCallTx(t, client, nil, code, amt, gasLim, fee)
-		receipt, err := broadcastTxAndWait(t, client, wsc, tx)
+		receipt, err := broadcastTxAndWait(t, client, tx)
 		assert.NoError(t, err)
 		assert.Equal(t, true, receipt.CreatesContract, "This transaction should"+
 			" create a contract")
@@ -150,14 +147,12 @@ func TestCallContract(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode.")
 	}
-	wsc := newWSClient()
-	defer stopWSClient(wsc)
 	testWithAllClients(t, func(t *testing.T, clientName string, client tm_client.RPCClient) {
 		// create the contract
 		amt, gasLim, fee := uint64(6969), uint64(1000), uint64(1000)
 		code, _, _ := simpleContract()
 		tx := makeDefaultCallTx(t, client, nil, code, amt, gasLim, fee)
-		receipt, err := broadcastTxAndWait(t, client, wsc, tx)
+		receipt, err := broadcastTxAndWait(t, client, tx)
 		assert.NoError(t, err)
 		if err != nil {
 			t.Fatalf("Problem broadcasting transaction: %v", err)
@@ -198,7 +193,7 @@ func TestNameReg(t *testing.T) {
 		// verify the name by both using the event and by checking get_name
 		subscribeAndWaitForNext(t, wsc, exe_events.EventStringNameReg(name),
 			func() {
-				broadcastTxAndWait(t, client, wsc, tx)
+				broadcastTx(t, client, tx)
 			},
 			func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) {
 
@@ -225,7 +220,7 @@ func TestNameReg(t *testing.T) {
 		amt = fee + numDesiredBlocks*txs.NameByteCostMultiplier*
 			txs.NameBlockCostMultiplier*txs.NameBaseCost(name, updatedData)
 		tx = makeDefaultNameTx(t, client, name, updatedData, amt, fee)
-		broadcastTxAndWait(t, client, wsc, tx)
+		broadcastTxAndWait(t, client, tx)
 		entry = getNameRegEntry(t, client, name)
 
 		assert.Equal(t, updatedData, entry.Data)
@@ -235,7 +230,7 @@ func TestNameReg(t *testing.T) {
 			getSequence(t, client, privateAccounts[1].Address())+1)
 		tx.Sign(genesisDoc.ChainID(), privateAccounts[1])
 
-		_, err := broadcastTxAndWait(t, client, wsc, tx)
+		_, err := tm_client.BroadcastTx(client, tx)
 		assert.Error(t, err, "Expected error when updating someone else's unexpired"+
 			" name registry entry")
 		if err != nil {
@@ -244,16 +239,24 @@ func TestNameReg(t *testing.T) {
 		}
 
 		// Wait a couple of blocks to make sure name registration expires
-		waitNBlocks(t, wsc, 3)
+		waitNBlocks(t, wsc, 5)
 
 		//now the entry should be expired, so we can update as non owner
 		const data2 = "this is not my beautiful house"
 		tx = txs.NewNameTxWithSequence(privateAccounts[1].PublicKey(), name, data2, amt, fee,
 			getSequence(t, client, privateAccounts[1].Address())+1)
 		tx.Sign(genesisDoc.ChainID(), privateAccounts[1])
-		_, err = broadcastTxAndWait(t, client, wsc, tx)
-		assert.NoError(t, err, "Should be able to update a previously expired name"+
-			" registry entry as a different address")
+
+		//_, err = tm_client.BroadcastTx(client, tx)
+		require.NoError(t, subscribeAndWaitForNext(t, wsc, exe_events.EventStringNameReg(name),
+			func() {
+				_, err = tm_client.BroadcastTx(client, tx)
+				assert.NoError(t, err, "Should be able to update a previously expired name"+
+					" registry entry as a different address")
+			},
+			func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) {
+				return true, nil
+			}))
 		entry = getNameRegEntry(t, client, name)
 		assert.Equal(t, data2, entry.Data)
 		assert.Equal(t, privateAccounts[1].Address(), entry.Owner)
diff --git a/rpc/tm/integration/websocket_client_test.go b/rpc/tm/integration/websocket_client_test.go
index 9d8cc707..00b3c723 100644
--- a/rpc/tm/integration/websocket_client_test.go
+++ b/rpc/tm/integration/websocket_client_test.go
@@ -108,13 +108,27 @@ func TestWSSend(t *testing.T) {
 		unsubscribe(t, wsc, subIdOutput)
 		stopWSClient(wsc)
 	}()
-	waitForEvent(t, wsc, eidInput, func() {
-		tx := makeDefaultSendTxSigned(t, jsonRpcClient, toAddr, amt)
-		broadcastTx(t, jsonRpcClient, tx)
-	}, unmarshalValidateSend(amt, toAddr))
 
-	waitForEvent(t, wsc, eidOutput, func() {},
-		unmarshalValidateSend(amt, toAddr))
+	tx := makeDefaultSendTxSigned(t, jsonRpcClient, toAddr, amt)
+	broadcastTx(t, jsonRpcClient, tx)
+
+	// Set of response IDs we expect
+	rids := map[string]struct{}{tm_client.EventResponseID(eidInput): {}, tm_client.EventResponseID(eidOutput): {}}
+
+	r := <-wsc.ResponsesCh
+	result, err := readResponse(r)
+	assert.NoError(t, err)
+	assert.NoError(t, unmarshalValidateSend(amt, toAddr, result))
+	delete(rids, r.ID)
+
+	r = <-wsc.ResponsesCh
+	result, err = readResponse(r)
+	assert.NoError(t, err)
+	assert.NoError(t, unmarshalValidateSend(amt, toAddr, result))
+	delete(rids, r.ID)
+
+	// Note currently we cannot guarantee order with pubsub event system
+	assert.Empty(t, rids, "Should receive input and output event")
 }
 
 // ensure events are only fired once for a given transaction
@@ -138,14 +152,12 @@ func TestWSDoubleFire(t *testing.T) {
 	}, func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) {
 		return true, nil
 	})
-	// but make sure we don't hear about it twice
-	err := waitForEvent(t, wsc, eid,
-		func() {},
-		func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) {
-			return false, nil
-		})
-	assert.True(t, err.Timeout(), "We should have timed out waiting for second"+
-		" %v event", eid)
+	select {
+	case <-wsc.ResponsesCh:
+		t.Fatal("Should time out waiting for second event")
+	case <-time.After(timeoutSeconds * time.Second):
+
+	}
 }
 
 // create a contract, wait for the event, and send it a msg, validate the return
@@ -163,11 +175,11 @@ func TestWSCallWait(t *testing.T) {
 		eid1 := exe_events.EventStringAccountInput(privateAccounts[0].Address())
 		subId1 := subscribeAndGetSubscriptionId(t, wsc, eid1)
 		// wait for the contract to be created
-		assert.False(t, waitForEvent(t, wsc, eid1, func() {
+		waitForEvent(t, wsc, eid1, func() {
 			tx := makeDefaultCallTx(t, jsonRpcClient, nil, code, amt, gasLim, fee)
 			receipt := broadcastTx(t, jsonRpcClient, tx)
 			contractAddr = receipt.ContractAddress
-		}, unmarshalValidateTx(amt, returnCode)).Timeout(), "waitForEvent timed out")
+		}, unmarshalValidateTx(amt, returnCode))
 
 		unsubscribe(t, wsc, subId1)
 
@@ -177,11 +189,11 @@ func TestWSCallWait(t *testing.T) {
 		subId2 := subscribeAndGetSubscriptionId(t, wsc, eid2)
 		// get the return value from a call
 		data := []byte{0x1}
-		assert.False(t, waitForEvent(t, wsc, eid2, func() {
+		waitForEvent(t, wsc, eid2, func() {
 			tx := makeDefaultCallTx(t, jsonRpcClient, &contractAddr, data, amt, gasLim, fee)
 			receipt := broadcastTx(t, jsonRpcClient, tx)
 			contractAddr = receipt.ContractAddress
-		}, unmarshalValidateTx(amt, returnVal)).Timeout(), "waitForEvent timed out")
+		}, unmarshalValidateTx(amt, returnVal))
 		unsubscribe(t, wsc, subId2)
 	}
 }
@@ -209,12 +221,12 @@ func TestWSCallNoWait(t *testing.T) {
 	defer unsubscribe(t, wsc, subId)
 
 	data := []byte{0x1}
-	assert.False(t, waitForEvent(t, wsc, eid, func() {
+	waitForEvent(t, wsc, eid, func() {
 		tx = txs.NewCallTxWithSequence(privateAccounts[0].PublicKey(), &contractAddr, data, amt, gasLim, fee,
 			sequence+3)
 		require.NoError(t, tx.Sign(genesisDoc.ChainID(), privateAccounts[0]))
 		broadcastTx(t, jsonRpcClient, tx)
-	}, unmarshalValidateTx(amt, returnVal)).Timeout(), "waitForEvent timed out")
+	}, unmarshalValidateTx(amt, returnVal))
 }
 
 // create two contracts, one of which calls the other
@@ -230,19 +242,22 @@ func TestWSCallCall(t *testing.T) {
 
 	// deploy the two contracts
 	tx := makeDefaultCallTx(t, jsonRpcClient, nil, code, amt, gasLim, fee)
-	receipt, err := broadcastTxAndWait(t, jsonRpcClient, wsc, tx)
-	require.NoError(t, err)
+	receipt := txs.GenerateReceipt(genesisDoc.ChainID(), tx)
 	contractAddr1 := receipt.ContractAddress
-
 	// subscribe to the new contracts
 	eid := evm_events.EventStringAccountCall(contractAddr1)
 	subId := subscribeAndGetSubscriptionId(t, wsc, eid)
 	defer unsubscribe(t, wsc, subId)
+
+	_, err := broadcastTxAndWait(t, jsonRpcClient, tx)
+	require.NoError(t, err)
+
 	// call contract2, which should call contract1, and wait for ev1
 	code, _, _ = simpleCallContract(contractAddr1)
 	tx = makeDefaultCallTx(t, jsonRpcClient, nil, code, amt, gasLim, fee)
-	receipt = broadcastTx(t, jsonRpcClient, tx)
-	contractAddr2 := receipt.ContractAddress
+	receipt2, err := broadcastTxAndWait(t, jsonRpcClient, tx)
+	require.NoError(t, err)
+	contractAddr2 := receipt2.ContractAddress
 
 	// let the contract get created first
 	waitForEvent(t, wsc, eid,
diff --git a/rpc/tm/integration/websocket_helpers.go b/rpc/tm/integration/websocket_helpers.go
index 11fbf9d9..9f0a4364 100644
--- a/rpc/tm/integration/websocket_helpers.go
+++ b/rpc/tm/integration/websocket_helpers.go
@@ -21,6 +21,7 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"runtime/debug"
 	"testing"
 	"time"
 
@@ -31,11 +32,12 @@ import (
 	"github.com/hyperledger/burrow/txs"
 	"github.com/stretchr/testify/require"
 	"github.com/tendermint/tendermint/rpc/lib/client"
+	"github.com/tendermint/tendermint/rpc/lib/types"
 	tm_types "github.com/tendermint/tendermint/types"
 )
 
 const (
-	timeoutSeconds       = 2
+	timeoutSeconds       = 4
 	expectBlockInSeconds = 2
 )
 
@@ -95,9 +97,9 @@ func unsubscribe(t *testing.T, wsc *rpcclient.WSClient, subscriptionId string) {
 }
 
 // broadcast transaction and wait for new block
-func broadcastTxAndWait(t *testing.T, client tm_client.RPCClient, wsc *rpcclient.WSClient,
-	tx txs.Tx) (*txs.Receipt, error) {
-
+func broadcastTxAndWait(t *testing.T, client tm_client.RPCClient, tx txs.Tx) (*txs.Receipt, error) {
+	wsc := newWSClient()
+	defer stopWSClient(wsc)
 	inputs := tx.GetInputs()
 	if len(inputs) == 0 {
 		t.Fatalf("cannot broadcastAndWait fot Tx with no inputs")
@@ -154,10 +156,12 @@ func runThenWaitForBlock(t *testing.T, wsc *rpcclient.WSClient, predicate blockP
 	subscribeAndWaitForNext(t, wsc, tm_types.EventNewBlock, runner, eventDataChecker)
 }
 
-func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string, runner func(), checker resultEventChecker) {
+func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string, runner func(),
+	checker resultEventChecker) error {
+
 	subId := subscribeAndGetSubscriptionId(t, wsc, event)
 	defer unsubscribe(t, wsc, subId)
-	waitForEvent(t, wsc, event, runner, checker)
+	return waitForEvent(t, wsc, event, runner, checker)
 }
 
 // waitForEvent executes runner that is expected to trigger events. It then
@@ -168,7 +172,8 @@ func subscribeAndWaitForNext(t *testing.T, wsc *rpcclient.WSClient, event string
 // waitForEvent will keep listening for new events. If an error is returned
 // waitForEvent will fail the test.
 func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner func(),
-	checker resultEventChecker) waitForEventResult {
+	checker resultEventChecker) error {
+
 	// go routine to wait for websocket msg
 	eventsCh := make(chan *rpc.ResultEvent)
 	shutdownEventsCh := make(chan bool, 1)
@@ -189,8 +194,7 @@ func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner
 					return
 				}
 				if r.ID == tm_client.EventResponseID(eventID) {
-					resultEvent := new(rpc.ResultEvent)
-					err := json.Unmarshal(r.Result, resultEvent)
+					resultEvent, err := readResponse(r)
 					if err != nil {
 						errCh <- err
 					} else {
@@ -210,54 +214,58 @@ func waitForEvent(t *testing.T, wsc *rpcclient.WSClient, eventID string, runner
 		select {
 		// wait for an event or timeout
 		case <-time.After(timeoutSeconds * time.Second):
-			return waitForEventResult{timeout: true}
+			t.Fatalf("waitForEvent timed out: %s", debug.Stack())
 		case eventData := <-eventsCh:
 			// run the check
 			stopWaiting, err := checker(eventID, eventData)
-			require.NoError(t, err)
+			if err != nil {
+				return err
+			}
 			if stopWaiting {
-				return waitForEventResult{}
+				return nil
 			}
 		case err := <-errCh:
-			t.Fatal(err)
+			return err
 		}
 	}
+	return nil
 }
 
-type waitForEventResult struct {
-	error
-	timeout bool
-}
-
-func (err waitForEventResult) Timeout() bool {
-	return err.timeout
+func readResponse(r rpctypes.RPCResponse) (*rpc.ResultEvent, error) {
+	if r.Error != nil {
+		return nil, r.Error
+	}
+	resultEvent := new(rpc.ResultEvent)
+	err := json.Unmarshal(r.Result, resultEvent)
+	if err != nil {
+		return nil, err
+	}
+	return resultEvent, nil
 }
 
 //--------------------------------------------------------------------------------
 
-func unmarshalValidateSend(amt uint64, toAddr acm.Address) resultEventChecker {
-	return func(eventID string, resultEvent *rpc.ResultEvent) (bool, error) {
-		data := resultEvent.EventDataTx
-		if data == nil {
-			return true, fmt.Errorf("event data %v is not EventDataTx", resultEvent)
-		}
-		if data.Exception != "" {
-			return true, fmt.Errorf(data.Exception)
-		}
-		tx := data.Tx.(*txs.SendTx)
-		if tx.Inputs[0].Address != privateAccounts[0].Address() {
-			return true, fmt.Errorf("senders do not match up! Got %s, expected %s", tx.Inputs[0].Address,
-				privateAccounts[0].Address())
-		}
-		if tx.Inputs[0].Amount != amt {
-			return true, fmt.Errorf("amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt)
-		}
-		if tx.Outputs[0].Address != toAddr {
-			return true, fmt.Errorf("receivers do not match up! Got %s, expected %s", tx.Outputs[0].Address,
-				privateAccounts[0].Address())
-		}
-		return true, nil
+func unmarshalValidateSend(amt uint64, toAddr acm.Address, resultEvent *rpc.ResultEvent) error {
+	data := resultEvent.EventDataTx
+	if data == nil {
+		return fmt.Errorf("event data %v is not EventDataTx", resultEvent)
+	}
+	if data.Exception != "" {
+		return fmt.Errorf(data.Exception)
+	}
+	tx := data.Tx.(*txs.SendTx)
+	if tx.Inputs[0].Address != privateAccounts[0].Address() {
+		return fmt.Errorf("senders do not match up! Got %s, expected %s", tx.Inputs[0].Address,
+			privateAccounts[0].Address())
+	}
+	if tx.Inputs[0].Amount != amt {
+		return fmt.Errorf("amt does not match up! Got %d, expected %d", tx.Inputs[0].Amount, amt)
+	}
+	if tx.Outputs[0].Address != toAddr {
+		return fmt.Errorf("receivers do not match up! Got %s, expected %s", tx.Outputs[0].Address,
+			privateAccounts[0].Address())
 	}
+	return nil
 }
 
 func unmarshalValidateTx(amt uint64, returnCode []byte) resultEventChecker {
diff --git a/rpc/tm/methods.go b/rpc/tm/methods.go
index 13f3445e..49e6696f 100644
--- a/rpc/tm/methods.go
+++ b/rpc/tm/methods.go
@@ -101,8 +101,10 @@ func GetRoutes(service *rpc.Service, logger *logging.Logger) map[string]*gorpc.R
 			if err != nil {
 				return nil, err
 			}
+
 			ctx, cancel := context.WithTimeout(context.Background(), SubscriptionTimeoutSeconds*time.Second)
 			defer cancel()
+
 			err = service.Subscribe(ctx, subscriptionID, eventID, func(resultEvent *rpc.ResultEvent) bool {
 				keepAlive := wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(
 					EventResponseID(wsCtx.Request.ID, eventID), resultEvent))
-- 
GitLab