diff --git a/client/websocket_client.go b/client/websocket_client.go index 9e6484265e646d72a4d345025c30d70260dc2657..c6b628ddb91560f949586138736ef9c6148d0ead 100644 --- a/client/websocket_client.go +++ b/client/websocket_client.go @@ -24,6 +24,7 @@ import ( "github.com/tendermint/go-wire" "github.com/hyperledger/burrow/logging" + tendermint_client "github.com/hyperledger/burrow/rpc/tendermint/client" ctypes "github.com/hyperledger/burrow/rpc/tendermint/core/types" "github.com/hyperledger/burrow/txs" ) @@ -50,14 +51,16 @@ type burrowNodeWebsocketClient struct { } // Subscribe to an eventid -func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Subscribe(eventid string) error { +func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Subscribe(eventId string) error { // TODO we can in the background listen to the subscription id and remember it to ease unsubscribing later. - return burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(eventid) + return tendermint_client.Subscribe(burrowNodeWebsocketClient.tendermintWebsocket, + eventId) } // Unsubscribe from an eventid func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) Unsubscribe(subscriptionId string) error { - return burrowNodeWebsocketClient.tendermintWebsocket.Unsubscribe(subscriptionId) + return tendermint_client.Unsubscribe(burrowNodeWebsocketClient.tendermintWebsocket, + subscriptionId) } // Returns a channel that will receive a confirmation with a result or the exception that @@ -73,10 +76,10 @@ func (burrowNodeWebsocketClient *burrowNodeWebsocketClient) WaitForConfirmation( var latestBlockHash []byte eid := txs.EventStringAccInput(inputAddr) - if err := burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(eid); err != nil { + if err := burrowNodeWebsocketClient.Subscribe(eid); err != nil { return nil, fmt.Errorf("Error subscribing to AccInput event (%s): %v", eid, err) } - if err := burrowNodeWebsocketClient.tendermintWebsocket.Subscribe(txs.EventStringNewBlock()); err != nil { + if err := burrowNodeWebsocketClient.Subscribe(txs.EventStringNewBlock()); err != nil { return nil, fmt.Errorf("Error subscribing to NewBlock event: %v", err) } // Read the incoming events diff --git a/definitions/tendermint_pipe.go b/definitions/tendermint_pipe.go index 600bfbd19232bf078a3c21d560403046f962abca..faf78788a7f74d1efad0ae81df90b3c2cc520617 100644 --- a/definitions/tendermint_pipe.go +++ b/definitions/tendermint_pipe.go @@ -31,7 +31,7 @@ type TendermintPipe interface { // Subscribe attempts to subscribe the listener identified by listenerId to // the event named event. The Event result is written to rpcResponseWriter // which must be non-blocking - Subscribe(event string, + Subscribe(eventId string, rpcResponseWriter func(result rpc_tm_types.BurrowResult)) (*rpc_tm_types.ResultSubscribe, error) Unsubscribe(subscriptionId string) (*rpc_tm_types.ResultUnsubscribe, error) diff --git a/manager/burrow-mint/pipe.go b/manager/burrow-mint/pipe.go index 77efde509bd5dcfcfa0c73a64de6f838c521d614..e81fa8a3c4a9e16202b8a107c317d87520d4bc13 100644 --- a/manager/burrow-mint/pipe.go +++ b/manager/burrow-mint/pipe.go @@ -246,24 +246,26 @@ func (pipe *burrowMintPipe) consensusAndManagerEvents() edb_event.EventEmitter { //------------------------------------------------------------------------------ // Implement definitions.TendermintPipe for burrowMintPipe -func (pipe *burrowMintPipe) Subscribe(event string, +func (pipe *burrowMintPipe) Subscribe(eventId string, rpcResponseWriter func(result rpc_tm_types.BurrowResult)) (*rpc_tm_types.ResultSubscribe, error) { subscriptionId, err := edb_event.GenerateSubId() if err != nil { return nil, err logging.InfoMsg(pipe.logger, "Subscribing to event", - "event", event, "subscriptionId", subscriptionId) + "eventId", eventId, "subscriptionId", subscriptionId) } - pipe.consensusAndManagerEvents().Subscribe(subscriptionId, event, + pipe.consensusAndManagerEvents().Subscribe(subscriptionId, eventId, func(eventData txs.EventData) { - result := rpc_tm_types.BurrowResult(&rpc_tm_types.ResultEvent{event, - txs.EventData(eventData)}) + result := rpc_tm_types.BurrowResult( + &rpc_tm_types.ResultEvent{ + Event: eventId, + Data: txs.EventData(eventData)}) // NOTE: EventSwitch callbacks must be nonblocking rpcResponseWriter(result) }) return &rpc_tm_types.ResultSubscribe{ SubscriptionId: subscriptionId, - Event: event, + Event: eventId, }, nil } diff --git a/rpc/tendermint/client/websocket_client.go b/rpc/tendermint/client/websocket_client.go new file mode 100644 index 0000000000000000000000000000000000000000..4749517b72febf7b62929fe4fa7b18c986567bdb --- /dev/null +++ b/rpc/tendermint/client/websocket_client.go @@ -0,0 +1,39 @@ +// Copyright 2017 Monax Industries Limited +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import "github.com/tendermint/go-rpc/types" + +type WebsocketClient interface { + WriteJSON(v interface{}) error +} + +func Subscribe(websocketClient WebsocketClient, eventId string) error { + return websocketClient.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: map[string]interface{}{"eventId": eventId}, + }) +} + +func Unsubscribe(websocketClient WebsocketClient, subscriptionId string) error { + return websocketClient.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: map[string]interface{}{"subscriptionId": subscriptionId}, + }) +} diff --git a/rpc/tendermint/core/routes.go b/rpc/tendermint/core/routes.go index 83573742edc1cfb31083dd3dd5fb6c51a8944b8f..b295b3a6de5d12b5b71dccb8b274f35aad7143c3 100644 --- a/rpc/tendermint/core/routes.go +++ b/rpc/tendermint/core/routes.go @@ -38,7 +38,7 @@ type TendermintRoutes struct { func (tmRoutes *TendermintRoutes) GetRoutes() map[string]*rpc.RPCFunc { var routes = map[string]*rpc.RPCFunc{ - "subscribe": rpc.NewWSRPCFunc(tmRoutes.Subscribe, "event"), + "subscribe": rpc.NewWSRPCFunc(tmRoutes.Subscribe, "eventId"), "unsubscribe": rpc.NewWSRPCFunc(tmRoutes.Unsubscribe, "subscriptionId"), "status": rpc.NewRPCFunc(tmRoutes.StatusResult, ""), "net_info": rpc.NewRPCFunc(tmRoutes.NetInfoResult, ""), @@ -66,7 +66,7 @@ func (tmRoutes *TendermintRoutes) GetRoutes() map[string]*rpc.RPCFunc { } func (tmRoutes *TendermintRoutes) Subscribe(wsCtx rpctypes.WSRPCContext, - event string) (ctypes.BurrowResult, error) { + eventId string) (ctypes.BurrowResult, error) { // NOTE: RPCResponses of subscribed events have id suffix "#event" // TODO: we really ought to allow multiple subscriptions from the same client address // to the same event. The code as it stands reflects the somewhat broken tendermint @@ -74,7 +74,7 @@ func (tmRoutes *TendermintRoutes) Subscribe(wsCtx rpctypes.WSRPCContext, // and return it in the result. This would require clients to hang on to a // subscription id if they wish to unsubscribe, but then again they can just // drop their connection - result, err := tmRoutes.tendermintPipe.Subscribe(event, + result, err := tmRoutes.tendermintPipe.Subscribe(eventId, func(result ctypes.BurrowResult) { wsCtx.GetRemoteAddr() // NOTE: EventSwitch callbacks must be nonblocking diff --git a/rpc/tendermint/test/websocket_client_test.go b/rpc/tendermint/test/websocket_client_test.go index 6523290cf08e4fdd225554db299b3a741e96e357..4bb2609269bfdbb85978cc15f23fbea6d2292f3b 100644 --- a/rpc/tendermint/test/websocket_client_test.go +++ b/rpc/tendermint/test/websocket_client_test.go @@ -260,11 +260,12 @@ func TestSubscribe(t *testing.T) { var subId string subscribe(t, wsc, txs.EventStringNewBlock()) - timeout := time.NewTimer(timeoutSeconds * time.Second) + // timeout to check subscription process is live + timeout := time.After(timeoutSeconds * time.Second) Subscribe: for { select { - case <-timeout.C: + case <-timeout: t.Fatal("Timed out waiting for subscription result") case bs := <-wsc.ResultsCh: @@ -277,12 +278,13 @@ Subscribe: } } - seenBlock := false - timeout = time.NewTimer(timeoutSeconds * time.Second) + blocksSeen := 0 for { select { - case <-timeout.C: - if !seenBlock { + // wait long enough to check we don't see another new block event even though + // a block will have come + case <-time.After(expectBlockInSeconds * time.Second): + if blocksSeen == 0 { t.Fatal("Timed out without seeing a NewBlock event") } return @@ -292,13 +294,13 @@ Subscribe: if ok { _, ok := resultEvent.Data.(txs.EventDataNewBlock) if ok { - if seenBlock { - // There's a mild race here, but when we enter we've just seen a block - // so we should be able to unsubscribe before we see another block + if blocksSeen > 1 { t.Fatal("Continued to see NewBlock event after unsubscribing") } else { - seenBlock = true - unsubscribe(t, wsc, subId) + if blocksSeen == 0 { + unsubscribe(t, wsc, subId) + } + blocksSeen++ } } } diff --git a/rpc/tendermint/test/websocket_helpers.go b/rpc/tendermint/test/websocket_helpers.go index acade844555176695120d5cde11a410b149760d3..1e013b5203dade9e0b76eac80e5f1361b88bac5b 100644 --- a/rpc/tendermint/test/websocket_helpers.go +++ b/rpc/tendermint/test/websocket_helpers.go @@ -30,7 +30,8 @@ import ( ) const ( - timeoutSeconds = 2 + timeoutSeconds = 2 + expectBlockInSeconds = timeoutSeconds * 2 ) //-------------------------------------------------------------------------------- @@ -48,14 +49,14 @@ func newWSClient() *rpcclient.WSClient { // subscribe to an event func subscribe(t *testing.T, wsc *rpcclient.WSClient, eventId string) { - if err := wsc.Subscribe(eventId); err != nil { + if err := burrow_client.Subscribe(wsc, eventId); err != nil { t.Fatal(err) } } func subscribeAndGetSubscriptionId(t *testing.T, wsc *rpcclient.WSClient, eventId string) string { - if err := wsc.Subscribe(eventId); err != nil { + if err := burrow_client.Subscribe(wsc, eventId); err != nil { t.Fatal(err) } @@ -75,7 +76,7 @@ func subscribeAndGetSubscriptionId(t *testing.T, wsc *rpcclient.WSClient, // unsubscribe from an event func unsubscribe(t *testing.T, wsc *rpcclient.WSClient, subscriptionId string) { - if err := wsc.Unsubscribe(subscriptionId); err != nil { + if err := burrow_client.Unsubscribe(wsc, subscriptionId); err != nil { t.Fatal(err) } }