diff --git a/client/client.go b/client/client.go index 42082a6e7eebc47ce689d15574f77c28e45ad818..2a9e260395d443eda89d34a258a90f23fb9c670a 100644 --- a/client/client.go +++ b/client/client.go @@ -86,7 +86,7 @@ func (erisNodeClient *ErisNodeClient) Broadcast(tx txs.Tx) (*txs.Receipt, error) return &receipt, nil } -func (erisNodeClient *ErisNodeClient) DeriveWebsocketClient() (nodeWsClient *NodeWebSocketClient, err error) { +func (erisNodeClient *ErisNodeClient) DeriveWebsocketClient() (nodeWsClient *NodeWebsocketClient, err error) { var wsAddr string nodeAddr := erisNodeClient.broadcastRPC if strings.HasPrefix(nodeAddr, "http://") { @@ -107,11 +107,13 @@ func (erisNodeClient *ErisNodeClient) DeriveWebsocketClient() (nodeWsClient *Nod "endpoint": "/websocket", }).Debug("Subscribing to websocket address") wsClient := rpcclient.NewWSClient(wsAddr, "/websocket") - // NOTE: Failure to start is reported over an error channel - wsClient.Start() - return &ErisNodeWebsocketClient{ + if _, err = wsClient.Start(); err != nil { + return nil, err + } + derivedErisNodeWebsocketClient := &ErisNodeWebsocketClient{ tendermintWebsocket: wsClient, - }, nil + } + return (*NodeWebsocketClient)(derivedErisNodeWebsocketClient), nil } //------------------------------------------------------------------------------------ diff --git a/client/websocket_client.go b/client/websocket_client.go index f3b388514531e66ecd9df5ee78ec7065814ed9b3..c7e02a1e124fcb6c9107123acd495057a24d3af1 100644 --- a/client/websocket_client.go +++ b/client/websocket_client.go @@ -21,8 +21,12 @@ import ( "fmt" "github.com/tendermint/go-rpc/client" + "github.com/tendermint/go-wire" + + log "github.com/eris-ltd/eris-logger" "github.com/eris-ltd/eris-bd/txs" + ctypes "github.com/eris-ltd/eris-db/rpc/tendermint/core/types" ) type Confirmation { @@ -42,20 +46,75 @@ type ErisNodeWebsocketClient struct { // Subscribe to an eventid func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Subscribe(eventid string) error { + // TODO we can in the background listen to the subscription id and remember it to ease unsubscribing later. return erisNodeWebsocketClient.tendermintWebsocket.Subscribe(eventid) } // Unsubscribe from an eventid -func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Unsubscribe(eventid string) error { - return erisNodeWebsocketClient.tendermintWebsocket.Unsubscribe(eventid) +func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Unsubscribe(subscriptionId string) error { + return erisNodeWebsocketClient.tendermintWebsocket.Unsubscribe(subscriptionId) } -// Returns a channel with a sign -func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Wait(eventid string) (chan Confirmation, error) { +// Returns a channel that will receive a confirmation with a result or the exception that +// has been confirmed; or an error is returned and the confirmation channel is nil. +func (erisNodeWebsocketClient *ErisNodeWebsocketClient) WaitForConfirmation(eventid string) (chan Confirmation, error) { // Setup the confirmation channel to be returned confirmationChannel := make(chan Confirmation, 1) var latestBlockHash []byte + eid := txs.EventStringAccInput(inputAddr) + + // Read the incoming events + go func() { + var err error + for { + resultBytes := <- erisNodeWebsocketClient.tendermintWebsocket.ResultsCh + result := new(ctypes.ErisDBResult) + if wire.readJSONPtr(result, r, &err); err != nil { + // keep calm and carry on + log.Errorf("eris-client - Failed to unmarshal json bytes for websocket event: %s", err) + continue + } + + event, ok := (*result).(*ctypes.ResultEvent) + if !ok { + // keep calm and carry on + log.Error("eris-client - Failed to cast to ResultEvent for websocket event") + continue + } + + blockData, ok := event.Data.(txs.EventDataNewBlock) + if ok { + latestBlockHash = blockData.Block.Hash() + log.WithFields(log.Field{ + "new block": blockData.Block, + "latest hash": latestBlockHash, + }).Debug("Registered new block") + continue + } + + // we don't accept events unless they came after a new block (ie. in) + if latestBlockHash == nil { + continue + } + + if event.Event != eid { + log.Warnf("Received unsolicited event! Got %s, expected %s\n", result.Event, eid) + continue + } + + data, ok := result.Data.(txs.EventDataTx) + if !ok { + // We are on the lookout for EventDataTx + confirmationChannel <- Confirmation{ + BlockHash: latestBlockHash, + Event: nil // or result.Data ? + Exception: fmt.Errorf("response error: expected result.Data to be *types.EventDataTx") + } + } + } + } + } func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Close() { @@ -66,8 +125,12 @@ func (erisNodeWebsocketClient *ErisNodeWebsocketClient) Close() { func (erisNodeWebsocketClient *ErisNodeWebsocketClient) assertNoErrors() error { if erisNodeWebsocketClient.tendermintWebsocket != nil { - + select { + case err := <-erisNodeWebsocketClient.tendermintWebsocket.ErrorCh: + return err + default: + return nil } else { - return fmt.Errorf("") + return fmt.Errorf("Eris-client") } } \ No newline at end of file