diff --git a/rpc/tm/lib/client/http_client.go b/rpc/tm/lib/client/http_client.go new file mode 100644 index 0000000000000000000000000000000000000000..46b07c48d6f6c892b0583c384ca5bf23c61454c8 --- /dev/null +++ b/rpc/tm/lib/client/http_client.go @@ -0,0 +1,195 @@ +package client + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net" + "net/http" + "net/url" + "reflect" + "strings" + + "github.com/hyperledger/burrow/rpc/tm/lib/types" + "github.com/pkg/errors" +) + +// HTTPClient is a common interface for JSONRPCClient and URIClient. +type HTTPClient interface { + Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) +} + +// TODO: Deprecate support for IP:PORT or /path/to/socket +func makeHTTPDialer(remoteAddr string) (string, func(string, string) (net.Conn, error)) { + parts := strings.SplitN(remoteAddr, "://", 2) + var protocol, address string + if len(parts) == 1 { + // default to tcp if nothing specified + protocol, address = "tcp", remoteAddr + } else if len(parts) == 2 { + protocol, address = parts[0], parts[1] + } else { + // return a invalid message + msg := fmt.Sprintf("Invalid addr: %s", remoteAddr) + return msg, func(_ string, _ string) (net.Conn, error) { + return nil, errors.New(msg) + } + } + // accept http as an alias for tcp + if protocol == "http" { + protocol = "tcp" + } + + // replace / with . for http requests (kvstore domain) + trimmedAddress := strings.Replace(address, "/", ".", -1) + return trimmedAddress, func(proto, addr string) (net.Conn, error) { + return net.Dial(protocol, address) + } +} + +// We overwrite the http.Client.Dial so we can do http over tcp or unix. +// remoteAddr should be fully featured (eg. with tcp:// or unix://) +func makeHTTPClient(remoteAddr string) (string, *http.Client) { + address, dialer := makeHTTPDialer(remoteAddr) + return "http://" + address, &http.Client{ + Transport: &http.Transport{ + Dial: dialer, + }, + } +} + +//------------------------------------------------------------------------------------ + +// JSONRPCClient takes params as a slice +type JSONRPCClient struct { + address string + client *http.Client +} + +// NewJSONRPCClient returns a JSONRPCClient pointed at the given address. +func NewJSONRPCClient(remote string) *JSONRPCClient { + address, client := makeHTTPClient(remote) + return &JSONRPCClient{ + address: address, + client: client, + } +} + +func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + request, err := types.MapToRequest("jsonrpc-client", method, params) + if err != nil { + return nil, err + } + requestBytes, err := json.Marshal(request) + if err != nil { + return nil, err + } + // log.Info(string(requestBytes)) + requestBuf := bytes.NewBuffer(requestBytes) + // log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes))) + httpResponse, err := c.client.Post(c.address, "text/json", requestBuf) + if err != nil { + return nil, err + } + defer httpResponse.Body.Close() // nolint: errcheck + + responseBytes, err := ioutil.ReadAll(httpResponse.Body) + if err != nil { + return nil, err + } + // log.Info(Fmt("RPC response: %v", string(responseBytes))) + return unmarshalResponseBytes(responseBytes, result) +} + +//------------------------------------------------------------- + +// URI takes params as a map +type URIClient struct { + address string + client *http.Client +} + +func NewURIClient(remote string) *URIClient { + address, client := makeHTTPClient(remote) + return &URIClient{ + address: address, + client: client, + } +} + +func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + values, err := argsToURLValues(params) + if err != nil { + return nil, err + } + // log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values)) + resp, err := c.client.PostForm(c.address+"/"+method, values) + if err != nil { + return nil, err + } + defer resp.Body.Close() // nolint: errcheck + + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytes(responseBytes, result) +} + +//------------------------------------------------ + +func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface{}, error) { + // Read response. If rpc/core/types is imported, the result will unmarshal + // into the correct type. + // log.Notice("response", "response", string(responseBytes)) + var err error + response := &types.RPCResponse{} + err = json.Unmarshal(responseBytes, response) + if err != nil { + return nil, errors.Errorf("Error unmarshalling rpc response: %v", err) + } + if response.Error != nil { + return nil, errors.Errorf("Response error: %v", response.Error) + } + // Unmarshal the RawMessage into the result. + err = json.Unmarshal(response.Result, result) + if err != nil { + return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err) + } + return result, nil +} + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + err := argsToJSON(args) + if err != nil { + return nil, err + } + for key, val := range args { + values.Set(key, val.(string)) + } + return values, nil +} + +func argsToJSON(args map[string]interface{}) error { + for k, v := range args { + rt := reflect.TypeOf(v) + isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8 + if isByteSlice { + bs := reflect.ValueOf(v).Bytes() + args[k] = fmt.Sprintf("0x%X", bs) + continue + } + + data, err := json.Marshal(v) + if err != nil { + return err + } + args[k] = string(data) + } + return nil +} diff --git a/rpc/tm/lib/client/ws_client.go b/rpc/tm/lib/client/ws_client.go new file mode 100644 index 0000000000000000000000000000000000000000..1527d9311cd736ae85ccb137c2880e6b6c2f3a8d --- /dev/null +++ b/rpc/tm/lib/client/ws_client.go @@ -0,0 +1,482 @@ +package client + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/hyperledger/burrow/rpc/tm/lib/types" + "github.com/pkg/errors" + "github.com/rcrowley/go-metrics" + cmn "github.com/tendermint/tmlibs/common" +) + +const ( + defaultMaxReconnectAttempts = 25 + defaultWriteWait = 0 + defaultReadWait = 0 + defaultPingPeriod = 0 +) + +// WSClient is a WebSocket client. The methods of WSClient are safe for use by +// multiple goroutines. +type WSClient struct { + cmn.BaseService + + conn *websocket.Conn + + Address string // IP:PORT or /path/to/socket + Endpoint string // /websocket/url/endpoint + Dialer func(string, string) (net.Conn, error) + + // Time between sending a ping and receiving a pong. See + // https://godoc.org/github.com/rcrowley/go-metrics#Timer. + PingPongLatencyTimer metrics.Timer + + // Single user facing channel to read RPCResponses from, closed only when the client is being stopped. + ResponsesCh chan types.RPCResponse + + // Callback, which will be called each time after successful reconnect. + onReconnect func() + + // internal channels + send chan types.RPCRequest // user requests + backlog chan types.RPCRequest // stores a single user request received during a conn failure + reconnectAfter chan error // reconnect requests + readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine + + wg sync.WaitGroup + + mtx sync.RWMutex + sentLastPingAt time.Time + reconnecting bool + + // Maximum reconnect attempts (0 or greater; default: 25). + maxReconnectAttempts int + + // Time allowed to write a message to the server. 0 means block until operation succeeds. + writeWait time.Duration + + // Time allowed to read the next message from the server. 0 means block until operation succeeds. + readWait time.Duration + + // Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent. + pingPeriod time.Duration +} + +// NewWSClient returns a new client. See the commentary on the func(*WSClient) +// functions for a detailed description of how to configure ping period and +// pong wait time. The endpoint argument must begin with a `/`. +func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient { + addr, dialer := makeHTTPDialer(remoteAddr) + c := &WSClient{ + Address: addr, + Dialer: dialer, + Endpoint: endpoint, + PingPongLatencyTimer: metrics.NewTimer(), + + maxReconnectAttempts: defaultMaxReconnectAttempts, + readWait: defaultReadWait, + writeWait: defaultWriteWait, + pingPeriod: defaultPingPeriod, + } + c.BaseService = *cmn.NewBaseService(nil, "WSClient", c) + for _, option := range options { + option(c) + } + return c +} + +// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error. +// It should only be used in the constructor and is not Goroutine-safe. +func MaxReconnectAttempts(max int) func(*WSClient) { + return func(c *WSClient) { + c.maxReconnectAttempts = max + } +} + +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor and is not Goroutine-safe. +func ReadWait(readWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.readWait = readWait + } +} + +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor and is not Goroutine-safe. +func WriteWait(writeWait time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.writeWait = writeWait + } +} + +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. +func PingPeriod(pingPeriod time.Duration) func(*WSClient) { + return func(c *WSClient) { + c.pingPeriod = pingPeriod + } +} + +// OnReconnect sets the callback, which will be called every time after +// successful reconnect. +func OnReconnect(cb func()) func(*WSClient) { + return func(c *WSClient) { + c.onReconnect = cb + } +} + +// String returns WS client full address. +func (c *WSClient) String() string { + return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) +} + +// OnStart implements cmn.Service by dialing a server and creating read and +// write routines. +func (c *WSClient) OnStart() error { + err := c.dial() + if err != nil { + return err + } + + c.ResponsesCh = make(chan types.RPCResponse) + + c.send = make(chan types.RPCRequest) + // 1 additional error may come from the read/write + // goroutine depending on which failed first. + c.reconnectAfter = make(chan error, 1) + // capacity for 1 request. a user won't be able to send more because the send + // channel is unbuffered. + c.backlog = make(chan types.RPCRequest, 1) + + c.startReadWriteRoutines() + go c.reconnectRoutine() + + return nil +} + +// OnStop implements cmn.Service. +func (c *WSClient) OnStop() {} + +// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit +// channel is closed. +func (c *WSClient) Stop() error { + if err := c.BaseService.Stop(); err != nil { + return err + } + // only close user-facing channels when we can't write to them + c.wg.Wait() + close(c.ResponsesCh) + + return nil +} + +// IsReconnecting returns true if the client is reconnecting right now. +func (c *WSClient) IsReconnecting() bool { + c.mtx.RLock() + defer c.mtx.RUnlock() + return c.reconnecting +} + +// IsActive returns true if the client is running and not reconnecting. +func (c *WSClient) IsActive() bool { + return c.IsRunning() && !c.IsReconnecting() +} + +// Send the given RPC request to the server. Results will be available on +// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or +// ctx.Done is closed. +func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error { + select { + case c.send <- request: + c.Logger.Info("sent a request", "req", request) + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Call the given method. See Send description. +func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error { + request, err := types.MapToRequest("ws-client", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +// CallWithArrayParams the given method with params in a form of array. See +// Send description. +func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error { + request, err := types.ArrayToRequest("ws-client", method, params) + if err != nil { + return err + } + return c.Send(ctx, request) +} + +/////////////////////////////////////////////////////////////////////////////// +// Private methods + +func (c *WSClient) dial() error { + dialer := &websocket.Dialer{ + NetDial: c.Dialer, + Proxy: http.ProxyFromEnvironment, + } + rHeader := http.Header{} + conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader) + if err != nil { + return err + } + c.conn = conn + return nil +} + +// reconnect tries to redial up to maxReconnectAttempts with exponential +// backoff. +func (c *WSClient) reconnect() error { + attempt := 0 + + c.mtx.Lock() + c.reconnecting = true + c.mtx.Unlock() + defer func() { + c.mtx.Lock() + c.reconnecting = false + c.mtx.Unlock() + }() + + for { + jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns) + backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second) + + c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration) + time.Sleep(backoffDuration) + + err := c.dial() + if err != nil { + c.Logger.Error("failed to redial", "err", err) + } else { + c.Logger.Info("reconnected") + if c.onReconnect != nil { + go c.onReconnect() + } + return nil + } + + attempt++ + + if attempt > c.maxReconnectAttempts { + return errors.Wrap(err, "reached maximum reconnect attempts") + } + } +} + +func (c *WSClient) startReadWriteRoutines() { + c.wg.Add(2) + c.readRoutineQuit = make(chan struct{}) + go c.readRoutine() + go c.writeRoutine() +} + +func (c *WSClient) processBacklog() error { + select { + case request := <-c.backlog: + if c.writeWait > 0 { + if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil { + c.Logger.Error("failed to set write deadline", "err", err) + } + } + if err := c.conn.WriteJSON(request); err != nil { + c.Logger.Error("failed to resend request", "err", err) + c.reconnectAfter <- err + // requeue request + c.backlog <- request + return err + } + c.Logger.Info("resend a request", "req", request) + default: + } + return nil +} + +func (c *WSClient) reconnectRoutine() { + for { + select { + case originalError := <-c.reconnectAfter: + // wait until writeRoutine and readRoutine finish + c.wg.Wait() + if err := c.reconnect(); err != nil { + c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError) + c.Stop() + return + } + // drain reconnectAfter + LOOP: + for { + select { + case <-c.reconnectAfter: + default: + break LOOP + } + } + err := c.processBacklog() + if err == nil { + c.startReadWriteRoutines() + } + + case <-c.Quit(): + return + } + } +} + +// The client ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *WSClient) writeRoutine() { + var ticker *time.Ticker + if c.pingPeriod > 0 { + // ticker with a predefined period + ticker = time.NewTicker(c.pingPeriod) + } else { + // ticker that never fires + ticker = &time.Ticker{C: make(<-chan time.Time)} + } + + defer func() { + ticker.Stop() + if err := c.conn.Close(); err != nil { + // ignore error; it will trigger in tests + // likely because it's closing an already closed connection + } + c.wg.Done() + }() + + for { + select { + case request := <-c.send: + if c.writeWait > 0 { + if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil { + c.Logger.Error("failed to set write deadline", "err", err) + } + } + if err := c.conn.WriteJSON(request); err != nil { + c.Logger.Error("failed to send request", "err", err) + c.reconnectAfter <- err + // add request to the backlog, so we don't lose it + c.backlog <- request + return + } + case <-ticker.C: + if c.writeWait > 0 { + if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil { + c.Logger.Error("failed to set write deadline", "err", err) + } + } + if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + c.Logger.Error("failed to write ping", "err", err) + c.reconnectAfter <- err + return + } + c.mtx.Lock() + c.sentLastPingAt = time.Now() + c.mtx.Unlock() + c.Logger.Debug("sent ping") + case <-c.readRoutineQuit: + return + case <-c.Quit(): + if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil { + c.Logger.Error("failed to write message", "err", err) + } + return + } + } +} + +// The client ensures that there is at most one reader to a connection by +// executing all reads from this goroutine. +func (c *WSClient) readRoutine() { + defer func() { + if err := c.conn.Close(); err != nil { + // ignore error; it will trigger in tests + // likely because it's closing an already closed connection + } + c.wg.Done() + }() + + c.conn.SetPongHandler(func(string) error { + // gather latency stats + c.mtx.RLock() + t := c.sentLastPingAt + c.mtx.RUnlock() + c.PingPongLatencyTimer.UpdateSince(t) + + c.Logger.Debug("got pong") + return nil + }) + + for { + // reset deadline for every message type (control or data) + if c.readWait > 0 { + if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil { + c.Logger.Error("failed to set read deadline", "err", err) + } + } + _, data, err := c.conn.ReadMessage() + if err != nil { + if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + return + } + + c.Logger.Error("failed to read response", "err", err) + close(c.readRoutineQuit) + c.reconnectAfter <- err + return + } + + var response types.RPCResponse + err = json.Unmarshal(data, &response) + if err != nil { + c.Logger.Error("failed to parse response", "err", err, "data", string(data)) + continue + } + c.Logger.Info("got response", "resp", response.Result) + // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking + // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop + // both readRoutine and writeRoutine + select { + case <-c.Quit(): + case c.ResponsesCh <- response: + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Predefined methods + +// Subscribe to a query. Note the server must have a "subscribe" route +// defined. +func (c *WSClient) Subscribe(ctx context.Context, query string) error { + params := map[string]interface{}{"query": query} + return c.Call(ctx, "subscribe", params) +} + +// Unsubscribe from a query. Note the server must have a "unsubscribe" route +// defined. +func (c *WSClient) Unsubscribe(ctx context.Context, query string) error { + params := map[string]interface{}{"query": query} + return c.Call(ctx, "unsubscribe", params) +} + +// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route +// defined. +func (c *WSClient) UnsubscribeAll(ctx context.Context) error { + params := map[string]interface{}{} + return c.Call(ctx, "unsubscribe_all", params) +} diff --git a/rpc/tm/lib/doc.go b/rpc/tm/lib/doc.go new file mode 100644 index 0000000000000000000000000000000000000000..2bc438593705184b5ba2a94e602a350ab69fe1cc --- /dev/null +++ b/rpc/tm/lib/doc.go @@ -0,0 +1,103 @@ +/* +HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets + +# Client Requests + +Suppose we want to expose the rpc function `HelloWorld(name string, num int)`. + +## GET (URI) + +As a GET request, it would have URI encoded parameters, and look like: + +``` +curl 'http://localhost:8008/hello_world?name="my_world"&num=5' +``` + +Note the `'` around the url, which is just so bash doesn't ignore the quotes in `"my_world"`. +This should also work: + +``` +curl http://localhost:8008/hello_world?name=\"my_world\"&num=5 +``` + +A GET request to `/` returns a list of available endpoints. +For those which take arguments, the arguments will be listed in order, with `_` where the actual value should be. + +## POST (JSONRPC) + +As a POST request, we use JSONRPC. For instance, the same request would have this as the body: + +``` +{ + "jsonrpc": "2.0", + "id": "anything", + "method": "hello_world", + "params": { + "name": "my_world", + "num": 5 + } +} +``` + +With the above saved in file `data.json`, we can make the request with + +``` +curl --data @data.json http://localhost:8008 +``` + +## WebSocket (JSONRPC) + +All requests are exposed over websocket in the same form as the POST JSONRPC. +Websocket connections are available at their own endpoint, typically `/websocket`, +though this is configurable when starting the server. + +# Server Definition + +Define some types and routes: + +``` +type ResultStatus struct { + Value string +} + +// Define some routes +var Routes = map[string]*rpcserver.RPCFunc{ + "status": rpcserver.NewRPCFunc(Status, "arg"), +} + +// an rpc function +func Status(v string) (*ResultStatus, error) { + return &ResultStatus{v}, nil +} + +``` + +Now start the server: + +``` +mux := http.NewServeMux() +rpcserver.RegisterRPCFuncs(mux, Routes) +wm := rpcserver.NewWebsocketManager(Routes) +mux.HandleFunc("/websocket", wm.WebsocketHandler) +logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)) +go func() { + _, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger) + if err != nil { + panic(err) + } +}() + +``` + +Note that unix sockets are supported as well (eg. `/path/to/socket` instead of `0.0.0.0:8008`) + +Now see all available endpoints by sending a GET request to `0.0.0.0:8008`. +Each route is available as a GET request, as a JSONRPCv2 POST request, and via JSONRPCv2 over websockets. + + +# Examples + +* [Tendermint](https://github.com/tendermint/tendermint/blob/master/rpc/core/routes.go) +* [tm-monitor](https://github.com/tendermint/tools/blob/master/tm-monitor/rpc.go) +*/ +package rpc diff --git a/rpc/tm/lib/rpc_test.go b/rpc/tm/lib/rpc_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f34b09f68f1c10e011565934847f796daf6920ff --- /dev/null +++ b/rpc/tm/lib/rpc_test.go @@ -0,0 +1,378 @@ +package rpc + +import ( + "bytes" + "context" + crand "crypto/rand" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "os" + "os/exec" + "testing" + "time" + + "github.com/go-kit/kit/log/term" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + + client "github.com/tendermint/tendermint/rpc/lib/client" + server "github.com/tendermint/tendermint/rpc/lib/server" + types "github.com/tendermint/tendermint/rpc/lib/types" +) + +// Client and Server should work over tcp or unix sockets +const ( + tcpAddr = "tcp://0.0.0.0:47768" + + unixSocket = "/tmp/rpc_test.sock" + unixAddr = "unix://" + unixSocket + + websocketEndpoint = "/websocket/endpoint" +) + +type ResultEcho struct { + Value string `json:"value"` +} + +type ResultEchoInt struct { + Value int `json:"value"` +} + +type ResultEchoBytes struct { + Value []byte `json:"value"` +} + +type ResultEchoDataBytes struct { + Value cmn.HexBytes `json:"value"` +} + +// Define some routes +var Routes = map[string]*server.RPCFunc{ + "echo": server.NewRPCFunc(EchoResult, "arg"), + "echo_ws": server.NewWSRPCFunc(EchoWSResult, "arg"), + "echo_bytes": server.NewRPCFunc(EchoBytesResult, "arg"), + "echo_data_bytes": server.NewRPCFunc(EchoDataBytesResult, "arg"), + "echo_int": server.NewRPCFunc(EchoIntResult, "arg"), +} + +// Amino codec required to encode/decode everything above. +var RoutesCdc = amino.NewCodec() + +func EchoResult(v string) (*ResultEcho, error) { + return &ResultEcho{v}, nil +} + +func EchoWSResult(wsCtx types.WSRPCContext, v string) (*ResultEcho, error) { + return &ResultEcho{v}, nil +} + +func EchoIntResult(v int) (*ResultEchoInt, error) { + return &ResultEchoInt{v}, nil +} + +func EchoBytesResult(v []byte) (*ResultEchoBytes, error) { + return &ResultEchoBytes{v}, nil +} + +func EchoDataBytesResult(v cmn.HexBytes) (*ResultEchoDataBytes, error) { + return &ResultEchoDataBytes{v}, nil +} + +func TestMain(m *testing.M) { + setup() + code := m.Run() + os.Exit(code) +} + +var colorFn = func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "socket" { + if keyvals[i+1] == "tcp" { + return term.FgBgColor{Fg: term.DarkBlue} + } else if keyvals[i+1] == "unix" { + return term.FgBgColor{Fg: term.DarkCyan} + } + } + } + return term.FgBgColor{} +} + +// launch unix and tcp servers +func setup() { + logger := log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn) + + cmd := exec.Command("rm", "-f", unixSocket) + err := cmd.Start() + if err != nil { + panic(err) + } + if err = cmd.Wait(); err != nil { + panic(err) + } + + tcpLogger := logger.With("socket", "tcp") + mux := http.NewServeMux() + server.RegisterRPCFuncs(mux, Routes, RoutesCdc, tcpLogger) + wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) + wm.SetLogger(tcpLogger) + mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) + go func() { + _, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger) + if err != nil { + panic(err) + } + }() + + unixLogger := logger.With("socket", "unix") + mux2 := http.NewServeMux() + server.RegisterRPCFuncs(mux2, Routes, RoutesCdc, unixLogger) + wm = server.NewWebsocketManager(Routes, RoutesCdc) + wm.SetLogger(unixLogger) + mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) + go func() { + _, err := server.StartHTTPServer(unixAddr, mux2, unixLogger) + if err != nil { + panic(err) + } + }() + + // wait for servers to start + time.Sleep(time.Second * 2) +} + +func echoViaHTTP(cl client.HTTPClient, val string) (string, error) { + params := map[string]interface{}{ + "arg": val, + } + result := new(ResultEcho) + if _, err := cl.Call("echo", params, result); err != nil { + return "", err + } + return result.Value, nil +} + +func echoIntViaHTTP(cl client.HTTPClient, val int) (int, error) { + params := map[string]interface{}{ + "arg": val, + } + result := new(ResultEchoInt) + if _, err := cl.Call("echo_int", params, result); err != nil { + return 0, err + } + return result.Value, nil +} + +func echoBytesViaHTTP(cl client.HTTPClient, bytes []byte) ([]byte, error) { + params := map[string]interface{}{ + "arg": bytes, + } + result := new(ResultEchoBytes) + if _, err := cl.Call("echo_bytes", params, result); err != nil { + return []byte{}, err + } + return result.Value, nil +} + +func echoDataBytesViaHTTP(cl client.HTTPClient, bytes cmn.HexBytes) (cmn.HexBytes, error) { + params := map[string]interface{}{ + "arg": bytes, + } + result := new(ResultEchoDataBytes) + if _, err := cl.Call("echo_data_bytes", params, result); err != nil { + return []byte{}, err + } + return result.Value, nil +} + +func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { + val := "acbd" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) + + val2 := randBytes(t) + got2, err := echoBytesViaHTTP(cl, val2) + require.Nil(t, err) + assert.Equal(t, got2, val2) + + val3 := cmn.HexBytes(randBytes(t)) + got3, err := echoDataBytesViaHTTP(cl, val3) + require.Nil(t, err) + assert.Equal(t, got3, val3) + + val4 := rand.Intn(10000) + got4, err := echoIntViaHTTP(cl, val4) + require.Nil(t, err) + assert.Equal(t, got4, val4) +} + +func echoViaWS(cl *client.WSClient, val string) (string, error) { + params := map[string]interface{}{ + "arg": val, + } + err := cl.Call(context.Background(), "echo", params) + if err != nil { + return "", err + } + + msg := <-cl.ResponsesCh + if msg.Error != nil { + return "", err + + } + result := new(ResultEcho) + err = json.Unmarshal(msg.Result, result) + if err != nil { + return "", nil + } + return result.Value, nil +} + +func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) { + params := map[string]interface{}{ + "arg": bytes, + } + err := cl.Call(context.Background(), "echo_bytes", params) + if err != nil { + return []byte{}, err + } + + msg := <-cl.ResponsesCh + if msg.Error != nil { + return []byte{}, msg.Error + + } + result := new(ResultEchoBytes) + err = json.Unmarshal(msg.Result, result) + if err != nil { + return []byte{}, nil + } + return result.Value, nil +} + +func testWithWSClient(t *testing.T, cl *client.WSClient) { + val := "acbd" + got, err := echoViaWS(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) + + val2 := randBytes(t) + got2, err := echoBytesViaWS(cl, val2) + require.Nil(t, err) + assert.Equal(t, got2, val2) +} + +//------------- + +func TestServersAndClientsBasic(t *testing.T) { + serverAddrs := [...]string{tcpAddr, unixAddr} + for _, addr := range serverAddrs { + cl1 := client.NewURIClient(addr) + fmt.Printf("=== testing server on %s using %v client", addr, cl1) + testWithHTTPClient(t, cl1) + + cl2 := client.NewJSONRPCClient(addr) + fmt.Printf("=== testing server on %s using %v client", addr, cl2) + testWithHTTPClient(t, cl2) + + cl3 := client.NewWSClient(addr, websocketEndpoint) + cl3.SetLogger(log.TestingLogger()) + err := cl3.Start() + require.Nil(t, err) + fmt.Printf("=== testing server on %s using %v client", addr, cl3) + testWithWSClient(t, cl3) + cl3.Stop() + } +} + +func TestHexStringArg(t *testing.T) { + cl := client.NewURIClient(tcpAddr) + // should NOT be handled as hex + val := "0xabc" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) +} + +func TestQuotedStringArg(t *testing.T) { + cl := client.NewURIClient(tcpAddr) + // should NOT be unquoted + val := "\"abc\"" + got, err := echoViaHTTP(cl, val) + require.Nil(t, err) + assert.Equal(t, got, val) +} + +func TestWSNewWSRPCFunc(t *testing.T) { + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) + err := cl.Start() + require.Nil(t, err) + defer cl.Stop() + + val := "acbd" + params := map[string]interface{}{ + "arg": val, + } + err = cl.Call(context.Background(), "echo_ws", params) + require.Nil(t, err) + + msg := <-cl.ResponsesCh + if msg.Error != nil { + t.Fatal(err) + } + result := new(ResultEcho) + err = json.Unmarshal(msg.Result, result) + require.Nil(t, err) + got := result.Value + assert.Equal(t, got, val) +} + +func TestWSHandlesArrayParams(t *testing.T) { + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) + err := cl.Start() + require.Nil(t, err) + defer cl.Stop() + + val := "acbd" + params := []interface{}{val} + err = cl.CallWithArrayParams(context.Background(), "echo_ws", params) + require.Nil(t, err) + + msg := <-cl.ResponsesCh + if msg.Error != nil { + t.Fatalf("%+v", err) + } + result := new(ResultEcho) + err = json.Unmarshal(msg.Result, result) + require.Nil(t, err) + got := result.Value + assert.Equal(t, got, val) +} + +// TestWSClientPingPong checks that a client & server exchange pings +// & pongs so connection stays alive. +func TestWSClientPingPong(t *testing.T) { + cl := client.NewWSClient(tcpAddr, websocketEndpoint) + cl.SetLogger(log.TestingLogger()) + err := cl.Start() + require.Nil(t, err) + defer cl.Stop() + + time.Sleep(6 * time.Second) +} + +func randBytes(t *testing.T) []byte { + n := rand.Intn(10) + 2 + buf := make([]byte, n) + _, err := crand.Read(buf) + require.Nil(t, err) + return bytes.Replace(buf, []byte("="), []byte{100}, -1) +} diff --git a/rpc/tm/lib/server/handlers.go b/rpc/tm/lib/server/handlers.go new file mode 100644 index 0000000000000000000000000000000000000000..8f9e103ed6820ac1a419dad80484d5c98670deb2 --- /dev/null +++ b/rpc/tm/lib/server/handlers.go @@ -0,0 +1,771 @@ +package server + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "runtime/debug" + "sort" + "strings" + "time" + + "github.com/gorilla/websocket" + "github.com/hyperledger/burrow/consensus/tendermint" + "github.com/hyperledger/burrow/logging" + "github.com/hyperledger/burrow/logging/structure" + "github.com/hyperledger/burrow/rpc/tm/lib/types" + "github.com/pkg/errors" + cmn "github.com/tendermint/tmlibs/common" +) + +// RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions. +// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse +func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger *logging.Logger) { + // HTTP endpoints + for funcName, rpcFunc := range funcMap { + mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc, logger)) + } + + // JSONRPC endpoints + mux.HandleFunc("/", makeJSONRPCHandler(funcMap, logger)) +} + +//------------------------------------- +// function introspection + +// RPCFunc contains the introspected type information for a function +type RPCFunc struct { + f reflect.Value // underlying rpc function + args []reflect.Type // type of each function arg + returns []reflect.Type // type of each return arg + argNames []string // name of each argument + ws bool // websocket only +} + +// NewRPCFunc wraps a function for introspection. +// f is the function, args are comma separated argument names +func NewRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, false) +} + +// NewWSRPCFunc wraps a function for introspection and use in the websockets. +func NewWSRPCFunc(f interface{}, args string) *RPCFunc { + return newRPCFunc(f, args, true) +} + +func newRPCFunc(f interface{}, args string, ws bool) *RPCFunc { + var argNames []string + if args != "" { + argNames = strings.Split(args, ",") + } + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: argNames, + ws: ws, + } +} + +// return a function's argument types +func funcArgTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumIn() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.In(i) + } + return typez +} + +// return a function's return types +func funcReturnTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumOut() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.Out(i) + } + return typez +} + +// function introspection +//----------------------------------------------------------------------------- +// rpc.json + +// jsonrpc calls grab the given method's function info and runs reflect.Call +func makeJSONRPCHandler(funcMap map[string]*RPCFunc, logger *logging.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInvalidRequestError("", errors.Wrap(err, "Error reading request body"))) + return + } + // if its an empty request (like from a browser), + // just display a list of functions + if len(b) == 0 { + writeListOfEndpoints(w, r, funcMap) + return + } + + var request types.RPCRequest + err = json.Unmarshal(b, &request) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCParseError("", errors.Wrap(err, "Error unmarshalling request"))) + return + } + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == "" { + logger.TraceMsg("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") + return + } + if len(r.URL.Path) > 1 { + WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path))) + return + } + rpcFunc := funcMap[request.Method] + if rpcFunc == nil || rpcFunc.ws { + WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID)) + return + } + var args []reflect.Value + if len(request.Params) > 0 { + args, err = jsonParamsToArgsRPC(rpcFunc, request.Params) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) + return + } + } + returns := rpcFunc.f.Call(args) + logger.InfoMsg("HTTP JSONRPC called", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInternalError(request.ID, err)) + return + } + WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse(request.ID, result)) + } +} + +func mapParamsToArgs(rpcFunc *RPCFunc, params map[string]json.RawMessage, argsOffset int) ([]reflect.Value, error) { + values := make([]reflect.Value, len(rpcFunc.argNames)) + for i, argName := range rpcFunc.argNames { + argType := rpcFunc.args[i+argsOffset] + + if p, ok := params[argName]; ok && p != nil && len(p) > 0 { + val := reflect.New(argType) + err := json.Unmarshal(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } else { // use default for that type + values[i] = reflect.Zero(argType) + } + } + + return values, nil +} + +func arrayParamsToArgs(rpcFunc *RPCFunc, params []json.RawMessage, argsOffset int) ([]reflect.Value, error) { + if len(rpcFunc.argNames) != len(params) { + return nil, errors.Errorf("Expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames), rpcFunc.argNames, len(params), params) + } + + values := make([]reflect.Value, len(params)) + for i, p := range params { + argType := rpcFunc.args[i+argsOffset] + val := reflect.New(argType) + err := json.Unmarshal(p, val.Interface()) + if err != nil { + return nil, err + } + values[i] = val.Elem() + } + return values, nil +} + +// `raw` is unparsed json (from json.RawMessage) encoding either a map or an array. +// `argsOffset` should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames). +// +// Example: +// rpcFunc.args = [rpctypes.WSRPCContext string] +// rpcFunc.argNames = ["arg"] +func jsonParamsToArgs(rpcFunc *RPCFunc, raw []byte, argsOffset int) ([]reflect.Value, error) { + + // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? + // First, try to get the map. + var m map[string]json.RawMessage + err := json.Unmarshal(raw, &m) + if err == nil { + return mapParamsToArgs(rpcFunc, m, argsOffset) + } + + // Otherwise, try an array. + var a []json.RawMessage + err = json.Unmarshal(raw, &a) + if err == nil { + return arrayParamsToArgs(rpcFunc, a, argsOffset) + } + + // Otherwise, bad format, we cannot parse + return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err) +} + +// Convert a []interface{} OR a map[string]interface{} to properly typed values +func jsonParamsToArgsRPC(rpcFunc *RPCFunc, params json.RawMessage) ([]reflect.Value, error) { + return jsonParamsToArgs(rpcFunc, params, 0) +} + +// Same as above, but with the first param the websocket connection +func jsonParamsToArgsWS(rpcFunc *RPCFunc, params json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) { + values, err := jsonParamsToArgs(rpcFunc, params, 1) + if err != nil { + return nil, err + } + return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil +} + +// rpc.json +//----------------------------------------------------------------------------- +// rpc.http + +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc, logger *logging.Logger) func(http.ResponseWriter, *http.Request) { + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError("")) + } + } + logger = logger.WithScope("makeHTTPHandler") + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + logger.TraceMsg("HTTP REST Handler received request", "request", r) + args, err := httpParamsToArgs(rpcFunc, r) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInvalidParamsError("", errors.Wrap(err, "Error converting http params to arguments"))) + return + } + returns := rpcFunc.f.Call(args) + logger.InfoMsg("HTTP REST", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, types.RPCInternalError("", err)) + return + } + WriteRPCResponseHTTP(w, types.NewRPCSuccessResponse("", result)) + } +} + +// Covert an http query to a list of properly typed values. +// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). +func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { + values := make([]reflect.Value, len(rpcFunc.args)) + + for i, name := range rpcFunc.argNames { + argType := rpcFunc.args[i] + + values[i] = reflect.Zero(argType) // set default for that type + + arg := GetParam(r, name) + // log.Notice("param to arg", "argType", argType, "name", name, "arg", arg) + + if "" == arg { + continue + } + + v, err, ok := nonJSONToArg(argType, arg) + if err != nil { + return nil, err + } + if ok { + values[i] = v + continue + } + + values[i], err = jsonStringToArg(argType, arg) + if err != nil { + return nil, err + } + } + + return values, nil +} + +func jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { + v := reflect.New(ty) + err := json.Unmarshal([]byte(arg), v.Interface()) + if err != nil { + return v, err + } + v = v.Elem() + return v, nil +} + +func nonJSONToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) { + isQuotedString := strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`) + isHexString := strings.HasPrefix(strings.ToLower(arg), "0x") + expectingString := ty.Kind() == reflect.String + expectingByteSlice := ty.Kind() == reflect.Slice && ty.Elem().Kind() == reflect.Uint8 + + if isHexString { + if !expectingString && !expectingByteSlice { + err := errors.Errorf("Got a hex string arg, but expected '%s'", + ty.Kind().String()) + return reflect.ValueOf(nil), err, false + } + + var value []byte + value, err := hex.DecodeString(arg[2:]) + if err != nil { + return reflect.ValueOf(nil), err, false + } + if ty.Kind() == reflect.String { + return reflect.ValueOf(string(value)), nil, true + } + return reflect.ValueOf([]byte(value)), nil, true + } + + if isQuotedString && expectingByteSlice { + v := reflect.New(reflect.TypeOf("")) + err := json.Unmarshal([]byte(arg), v.Interface()) + if err != nil { + return reflect.ValueOf(nil), err, false + } + v = v.Elem() + return reflect.ValueOf([]byte(v.String())), nil, true + } + + return reflect.ValueOf(nil), nil, false +} + +// rpc.http +//----------------------------------------------------------------------------- +// rpc.websocket + +const ( + defaultWSWriteChanCapacity = 1000 + defaultWSWriteWait = 10 * time.Second + defaultWSReadWait = 30 * time.Second + defaultWSPingPeriod = (defaultWSReadWait * 9) / 10 +) + +// A single websocket connection contains listener id, underlying ws +// connection, and the event switch for subscribing to events. +// +// In case of an error, the connection is stopped. +type wsConnection struct { + cmn.BaseService + + remoteAddr string + baseConn *websocket.Conn + writeChan chan types.RPCResponse + + funcMap map[string]*RPCFunc + + // write channel capacity + writeChanCapacity int + + // each write times out after this. + writeWait time.Duration + + // Connection times out if we haven't received *anything* in this long, not even pings. + readWait time.Duration + + // Send pings to server with this period. Must be less than readWait, but greater than zero. + pingPeriod time.Duration + + // object that is used to subscribe / unsubscribe from events + eventSub types.EventSubscriber +} + +// NewWSConnection wraps websocket.Conn. +// +// See the commentary on the func(*wsConnection) functions for a detailed +// description of how to configure ping period and pong wait time. NOTE: if the +// write buffer is full, pongs may be dropped, which may cause clients to +// disconnect. see https://github.com/gorilla/websocket/issues/97 +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, + logger *logging.Logger, options ...func(*wsConnection)) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + funcMap: funcMap, + writeWait: defaultWSWriteWait, + writeChanCapacity: defaultWSWriteChanCapacity, + readWait: defaultWSReadWait, + pingPeriod: defaultWSPingPeriod, + } + for _, option := range options { + option(wsc) + } + + wsc.BaseService = *cmn.NewBaseService(tendermint.NewLogger(logger.With("remote", baseConn.RemoteAddr())), + "wsConnection", wsc) + return wsc +} + +// EventSubscriber sets object that is used to subscribe / unsubscribe from +// events - not Goroutine-safe. If none given, default node's eventBus will be +// used. +func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.eventSub = eventSub + } +} + +// WriteWait sets the amount of time to wait before a websocket write times out. +// It should only be used in the constructor - not Goroutine-safe. +func WriteWait(writeWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeWait = writeWait + } +} + +// WriteChanCapacity sets the capacity of the websocket write channel. +// It should only be used in the constructor - not Goroutine-safe. +func WriteChanCapacity(cap int) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.writeChanCapacity = cap + } +} + +// ReadWait sets the amount of time to wait before a websocket read times out. +// It should only be used in the constructor - not Goroutine-safe. +func ReadWait(readWait time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.readWait = readWait + } +} + +// PingPeriod sets the duration for sending websocket pings. +// It should only be used in the constructor - not Goroutine-safe. +func PingPeriod(pingPeriod time.Duration) func(*wsConnection) { + return func(wsc *wsConnection) { + wsc.pingPeriod = pingPeriod + } +} + +// OnStart implements cmn.Service by starting the read and write routines. It +// blocks until the connection closes. +func (wsc *wsConnection) OnStart() error { + wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity) + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + // Write responses, BLOCKING. + wsc.writeRoutine() + + return nil +} + +// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions. +func (wsc *wsConnection) OnStop() { + // Both read and write loops close the websocket connection when they exit their loops. + // The writeChan is never closed, to allow WriteRPCResponse() to fail. + if wsc.eventSub != nil { + wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) + } +} + +// GetRemoteAddr returns the remote address of the underlying connection. +// It implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// GetEventSubscriber implements WSRPCConnection by returning event subscriber. +func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { + return wsc.eventSub +} + +// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. +// It implements WSRPCConnection. It is Goroutine-safe. +func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { + select { + case <-wsc.Quit(): + return + case wsc.writeChan <- resp: + } +} + +// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block. +// It implements WSRPCConnection. It is Goroutine-safe +func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool { + select { + case <-wsc.Quit(): + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *wsConnection) readRoutine() { + defer func() { + if r := recover(); r != nil { + err, ok := r.(error) + if !ok { + err = fmt.Errorf("WSJSONRPC: %v", r) + } + wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) + wsc.WriteRPCResponse(types.RPCInternalError("unknown", err)) + go wsc.readRoutine() + } else { + wsc.baseConn.Close() // nolint: errcheck + } + }() + + wsc.baseConn.SetPongHandler(func(m string) error { + return wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)) + }) + + for { + select { + case <-wsc.Quit(): + return + default: + // reset deadline for every type of message (control or data) + if err := wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait)); err != nil { + wsc.Logger.Error("failed to set read deadline", "err", err) + } + var in []byte + _, in, err := wsc.baseConn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + wsc.Logger.Info("Client closed the connection") + } else { + wsc.Logger.Error("Failed to read request", "err", err) + } + wsc.Stop() + return + } + + var request types.RPCRequest + err = json.Unmarshal(in, &request) + if err != nil { + wsc.WriteRPCResponse(types.RPCParseError("", errors.Wrap(err, "Error unmarshaling request"))) + continue + } + + // A Notification is a Request object without an "id" member. + // The Server MUST NOT reply to a Notification, including those that are within a batch request. + if request.ID == "" { + wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") + continue + } + + // Now, fetch the RPCFunc and execute it. + + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) + continue + } + var args []reflect.Value + if rpcFunc.ws { + wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc} + if len(request.Params) > 0 { + args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) + } + } else { + if len(request.Params) > 0 { + args, err = jsonParamsToArgsRPC(rpcFunc, request.Params) + } + } + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) + continue + } + returns := rpcFunc.f.Call(args) + + // TODO: Need to encode args/returns to string if we want to log them + wsc.Logger.Info("WSJSONRPC", "method", request.Method) + + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) + continue + } else { + wsc.WriteRPCResponse(types.NewRPCSuccessResponse(request.ID, result)) + continue + } + + } + } +} + +// receives on a write channel and writes out on the socket +func (wsc *wsConnection) writeRoutine() { + pingTicker := time.NewTicker(wsc.pingPeriod) + defer func() { + pingTicker.Stop() + if err := wsc.baseConn.Close(); err != nil { + wsc.Logger.Error("Error closing connection", "err", err) + } + }() + + // https://github.com/gorilla/websocket/issues/97 + pongs := make(chan string, 1) + wsc.baseConn.SetPingHandler(func(m string) error { + select { + case pongs <- m: + default: + } + return nil + }) + + for { + select { + case m := <-pongs: + err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m)) + if err != nil { + wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err) + } + case <-pingTicker.C: + err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{}) + if err != nil { + wsc.Logger.Error("Failed to write ping", "err", err) + wsc.Stop() + return + } + case msg := <-wsc.writeChan: + jsonBytes, err := json.MarshalIndent(msg, "", " ") + if err != nil { + wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) + } else { + if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { + wsc.Logger.Error("Failed to write response", "err", err) + wsc.Stop() + return + } + } + case <-wsc.Quit(): + return + } + } +} + +// All writes to the websocket must (re)set the write deadline. +// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) +func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { + if err := wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait)); err != nil { + return err + } + return wsc.baseConn.WriteMessage(msgType, msg) +} + +//---------------------------------------- + +// WebsocketManager is the main manager for all websocket connections. +// It holds the event switch and a map of functions for routing. +// NOTE: The websocket path is defined externally, e.g. in node/node.go +type WebsocketManager struct { + websocket.Upgrader + funcMap map[string]*RPCFunc + logger *logging.Logger + wsConnOptions []func(*wsConnection) +} + +// NewWebsocketManager returns a new WebsocketManager that routes according to +// the given funcMap and connects to the server with the given connection +// options. +func NewWebsocketManager(funcMap map[string]*RPCFunc, logger *logging.Logger, + wsConnOptions ...func(*wsConnection)) *WebsocketManager { + return &WebsocketManager{ + funcMap: funcMap, + Upgrader: websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // TODO ??? + return true + }, + }, + logger: logger, + wsConnOptions: wsConnOptions, + } +} + +// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection. +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + wm.logger.TraceMsg("Failed to upgrade to websocket connection", structure.ErrorKey, err) + return + } + + // register connection + con := NewWSConnection(wsConn, wm.funcMap, wm.logger, wm.wsConnOptions...) + wm.logger.InfoMsg("New websocket connection", "remote_address", con.remoteAddr) + err = con.Start() // Blocking + if err != nil { + wm.logger.TraceMsg("Error starting connection", structure.ErrorKey, err) + } +} + +// rpc.websocket +//----------------------------------------------------------------------------- + +// NOTE: assume returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { + errV := returns[1] + if errV.Interface() != nil { + return nil, errors.Errorf("%v", errV.Interface()) + } + rv := returns[0] + // the result is a registered interface, + // we need a pointer to it so we can marshal with type byte + rvp := reflect.New(rv.Type()) + rvp.Elem().Set(rv) + return rvp.Interface(), nil +} + +// writes a list of available rpc endpoints as an html page +func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { + noArgNames := []string{} + argNames := []string{} + for name, funcData := range funcMap { + if len(funcData.args) == 0 { + noArgNames = append(noArgNames, name) + } else { + argNames = append(argNames, name) + } + } + sort.Strings(noArgNames) + sort.Strings(argNames) + buf := new(bytes.Buffer) + buf.WriteString("<html><body>") + buf.WriteString("<br>Available endpoints:<br>") + + for _, name := range noArgNames { + link := fmt.Sprintf("//%s/%s", r.Host, name) + buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link)) + } + + buf.WriteString("<br>Endpoints that require arguments:<br>") + for _, name := range argNames { + link := fmt.Sprintf("//%s/%s?", r.Host, name) + funcData := funcMap[name] + for i, argName := range funcData.argNames { + link += argName + "=_" + if i < len(funcData.argNames)-1 { + link += "&" + } + } + buf.WriteString(fmt.Sprintf("<a href=\"%s\">%s</a></br>", link, link)) + } + buf.WriteString("</body></html>") + w.Header().Set("Content-Type", "text/html") + w.WriteHeader(200) + w.Write(buf.Bytes()) // nolint: errcheck +} diff --git a/rpc/tm/lib/server/handlers_test.go b/rpc/tm/lib/server/handlers_test.go new file mode 100644 index 0000000000000000000000000000000000000000..44bebb0ec7638352e2ad6d91b62e2fe0a5b47420 --- /dev/null +++ b/rpc/tm/lib/server/handlers_test.go @@ -0,0 +1,93 @@ +package server + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/hyperledger/burrow/logging" + "github.com/hyperledger/burrow/rpc/tm/lib/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func testMux() *http.ServeMux { + funcMap := map[string]*RPCFunc{ + "c": NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"), + } + mux := http.NewServeMux() + logger := logging.NewNoopLogger() + RegisterRPCFuncs(mux, funcMap, logger) + + return mux +} + +func statusOK(code int) bool { return code >= 200 && code <= 299 } + +// Ensure that nefarious/unintended inputs to `params` +// do not crash our RPC handlers. +// See Issue https://github.com/tendermint/tendermint/issues/708. +func TestRPCParams(t *testing.T) { + mux := testMux() + tests := []struct { + payload string + wantErr string + }{ + // bad + {`{"jsonrpc": "2.0", "id": "0"}`, "Method not found"}, + {`{"jsonrpc": "2.0", "method": "y", "id": "0"}`, "Method not found"}, + {`{"method": "c", "id": "0", "params": a}`, "invalid character"}, + {`{"method": "c", "id": "0", "params": ["a"]}`, "got 1"}, + {`{"method": "c", "id": "0", "params": ["a", "b"]}`, "of type int"}, + {`{"method": "c", "id": "0", "params": [1, 1]}`, "of type string"}, + + // good + {`{"jsonrpc": "2.0", "method": "c", "id": "0", "params": null}`, ""}, + {`{"method": "c", "id": "0", "params": {}}`, ""}, + {`{"method": "c", "id": "0", "params": ["a", 10]}`, ""}, + } + + for i, tt := range tests { + req, _ := http.NewRequest("POST", "http://localhost/", strings.NewReader(tt.payload)) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + res := rec.Result() + // Always expecting back a JSONRPCResponse + assert.True(t, statusOK(res.StatusCode), "#%d: should always return 2XX", i) + blob, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Errorf("#%d: err reading body: %v", i, err) + continue + } + + recv := new(types.RPCResponse) + assert.Nil(t, json.Unmarshal(blob, recv), "#%d: expecting successful parsing of an RPCResponse:\nblob: %s", i, blob) + assert.NotEqual(t, recv, new(types.RPCResponse), "#%d: not expecting a blank RPCResponse", i) + + if tt.wantErr == "" { + assert.Nil(t, recv.Error, "#%d: not expecting an error", i) + } else { + assert.True(t, recv.Error.Code < 0, "#%d: not expecting a positive JSONRPC code", i) + // The wanted error is either in the message or the data + assert.Contains(t, recv.Error.Message+recv.Error.Data, tt.wantErr, "#%d: expected substring", i) + } + } +} + +func TestRPCNotification(t *testing.T) { + mux := testMux() + body := strings.NewReader(`{"jsonrpc": "2.0"}`) + req, _ := http.NewRequest("POST", "http://localhost/", body) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + res := rec.Result() + + // Always expecting back a JSONRPCResponse + require.True(t, statusOK(res.StatusCode), "should always return 2XX") + blob, err := ioutil.ReadAll(res.Body) + require.Nil(t, err, "reading from the body should not give back an error") + require.Equal(t, len(blob), 0, "a notification SHOULD NOT be responded to by the server") +} diff --git a/rpc/tm/lib/server/http_params.go b/rpc/tm/lib/server/http_params.go new file mode 100644 index 0000000000000000000000000000000000000000..248e451b2d9fc5d8f61286cb39966a012a3b3122 --- /dev/null +++ b/rpc/tm/lib/server/http_params.go @@ -0,0 +1,90 @@ +package server + +import ( + "net/http" + "regexp" + "strconv" + + "github.com/pkg/errors" + "github.com/tmthrgd/go-hex" +) + +var ( + // Parts of regular expressions + atom = "[A-Z0-9!#$%&'*+\\-/=?^_`{|}~]+" + dotAtom = atom + `(?:\.` + atom + `)*` + domain = `[A-Z0-9.-]+\.[A-Z]{2,4}` + + RE_HEX = regexp.MustCompile(`^(?i)[a-f0-9]+$`) + RE_EMAIL = regexp.MustCompile(`^(?i)(` + dotAtom + `)@(` + dotAtom + `)$`) + RE_ADDRESS = regexp.MustCompile(`^(?i)[a-z0-9]{25,34}$`) + RE_HOST = regexp.MustCompile(`^(?i)(` + domain + `)$`) + + //RE_ID12 = regexp.MustCompile(`^[a-zA-Z0-9]{12}$`) +) + +func GetParam(r *http.Request, param string) string { + s := r.URL.Query().Get(param) + if s == "" { + s = r.FormValue(param) + } + return s +} + +func GetParamByteSlice(r *http.Request, param string) ([]byte, error) { + s := GetParam(r, param) + return hex.DecodeString(s) +} + +func GetParamInt64(r *http.Request, param string) (int64, error) { + s := GetParam(r, param) + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return i, nil +} + +func GetParamInt32(r *http.Request, param string) (int32, error) { + s := GetParam(r, param) + i, err := strconv.ParseInt(s, 10, 32) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return int32(i), nil +} + +func GetParamUint64(r *http.Request, param string) (uint64, error) { + s := GetParam(r, param) + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return i, nil +} + +func GetParamUint(r *http.Request, param string) (uint, error) { + s := GetParam(r, param) + i, err := strconv.ParseUint(s, 10, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return uint(i), nil +} + +func GetParamRegexp(r *http.Request, param string, re *regexp.Regexp) (string, error) { + s := GetParam(r, param) + if !re.MatchString(s) { + return "", errors.Errorf(param, "Did not match regular expression %v", re.String()) + } + return s, nil +} + +func GetParamFloat64(r *http.Request, param string) (float64, error) { + s := GetParam(r, param) + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0, errors.Errorf(param, err.Error()) + } + return f, nil +} diff --git a/rpc/tm/lib/server/http_server.go b/rpc/tm/lib/server/http_server.go new file mode 100644 index 0000000000000000000000000000000000000000..d16696d48b85bb3b06e3f570feb26bd251142b46 --- /dev/null +++ b/rpc/tm/lib/server/http_server.go @@ -0,0 +1,160 @@ +// Commons for HTTP handling +package server + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "net/http" + "runtime/debug" + "strings" + "time" + + "github.com/hyperledger/burrow/logging" + "github.com/hyperledger/burrow/logging/structure" + "github.com/hyperledger/burrow/rpc/tm/lib/types" + "github.com/pkg/errors" +) + +func StartHTTPServer(listenAddr string, handler http.Handler, logger *logging.Logger) (listener net.Listener, err error) { + var proto, addr string + parts := strings.SplitN(listenAddr, "://", 2) + if len(parts) != 2 { + return nil, errors.Errorf("Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", listenAddr) + } + proto, addr = parts[0], parts[1] + + logger.InfoMsg("Starting RPC HTTP server", "listen_address", listenAddr) + listener, err = net.Listen(proto, addr) + if err != nil { + return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) + } + + go func() { + err := http.Serve( + listener, + RecoverAndLogHandler(handler, logger), + ) + logger.TraceMsg("RPC HTTP server stopped", structure.ErrorKey, err) + }() + return listener, nil +} + +func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, + logger *logging.Logger) (listener net.Listener, err error) { + + var proto, addr string + parts := strings.SplitN(listenAddr, "://", 2) + if len(parts) != 2 { + return nil, errors.Errorf("Invalid listening address %s (use fully formed addresses, including the tcp:// or unix:// prefix)", listenAddr) + } + proto, addr = parts[0], parts[1] + + logger.InfoMsg("Starting RPC HTTPS server", "listen_address", listenAddr, "cert_file", certFile, + "key_file", keyFile) + listener, err = net.Listen(proto, addr) + if err != nil { + return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err) + } + + go func() { + err := http.ServeTLS( + listener, + RecoverAndLogHandler(handler, logger), + certFile, + keyFile, + ) + logger.TraceMsg("RPC HTTPS server stopped", structure.ErrorKey, err) + }() + return listener, nil +} + +func WriteRPCResponseHTTPError(w http.ResponseWriter, httpCode int, res types.RPCResponse) { + jsonBytes, err := json.MarshalIndent(res, "", " ") + if err != nil { + panic(err) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(httpCode) + w.Write(jsonBytes) // nolint: errcheck, gas +} + +func WriteRPCResponseHTTP(w http.ResponseWriter, res types.RPCResponse) { + jsonBytes, err := json.MarshalIndent(res, "", " ") + if err != nil { + panic(err) + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + w.Write(jsonBytes) // nolint: errcheck, gas +} + +//----------------------------------------------------------------------------- + +// Wraps an HTTP handler, adding error logging. +// If the inner function panics, the outer function recovers, logs, sends an +// HTTP 500 error response. +func RecoverAndLogHandler(handler http.Handler, logger *logging.Logger) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Wrap the ResponseWriter to remember the status + rww := &ResponseWriterWrapper{-1, w} + begin := time.Now() + + // Common headers + origin := r.Header.Get("Origin") + rww.Header().Set("Access-Control-Allow-Origin", origin) + rww.Header().Set("Access-Control-Allow-Credentials", "true") + rww.Header().Set("Access-Control-Expose-Headers", "X-Server-Time") + rww.Header().Set("X-Server-Time", fmt.Sprintf("%v", begin.Unix())) + + defer func() { + // Send a 500 error if a panic happens during a handler. + // Without this, Chrome & Firefox were retrying aborted ajax requests, + // at least to my localhost. + if e := recover(); e != nil { + + // If RPCResponse + if res, ok := e.(types.RPCResponse); ok { + WriteRPCResponseHTTP(rww, res) + } else { + // For the rest, + logger.TraceMsg("Panic in RPC HTTP handler", structure.ErrorKey, e, + "stack", string(debug.Stack())) + rww.WriteHeader(http.StatusInternalServerError) + WriteRPCResponseHTTP(rww, types.RPCInternalError("", e.(error))) + } + } + + // Finally, log. + durationMS := time.Since(begin).Nanoseconds() / 1000000 + if rww.Status == -1 { + rww.Status = 200 + } + logger.InfoMsg("Served RPC HTTP response", + "method", r.Method, "url", r.URL, + "status", rww.Status, "duration", durationMS, + "remote_address", r.RemoteAddr, + ) + }() + + handler.ServeHTTP(rww, r) + }) +} + +// Remember the status for logging +type ResponseWriterWrapper struct { + Status int + http.ResponseWriter +} + +func (w *ResponseWriterWrapper) WriteHeader(status int) { + w.Status = status + w.ResponseWriter.WriteHeader(status) +} + +// implements http.Hijacker +func (w *ResponseWriterWrapper) Hijack() (net.Conn, *bufio.ReadWriter, error) { + return w.ResponseWriter.(http.Hijacker).Hijack() +} diff --git a/rpc/tm/lib/server/parse_test.go b/rpc/tm/lib/server/parse_test.go new file mode 100644 index 0000000000000000000000000000000000000000..0b5550c23a06878f1a0a5523fbb7317a68f95d79 --- /dev/null +++ b/rpc/tm/lib/server/parse_test.go @@ -0,0 +1,174 @@ +package server + +import ( + "encoding/json" + "strconv" + "testing" + + "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/tmlibs/common" +) + +func TestParseJSONMap(t *testing.T) { + assert := assert.New(t) + + input := []byte(`{"value":"1234","height":22}`) + + // naive is float,string + var p1 map[string]interface{} + err := json.Unmarshal(input, &p1) + if assert.Nil(err) { + h, ok := p1["height"].(float64) + if assert.True(ok, "%#v", p1["height"]) { + assert.EqualValues(22, h) + } + v, ok := p1["value"].(string) + if assert.True(ok, "%#v", p1["value"]) { + assert.EqualValues("1234", v) + } + } + + // preloading map with values doesn't help + tmp := 0 + p2 := map[string]interface{}{ + "value": &cmn.HexBytes{}, + "height": &tmp, + } + err = json.Unmarshal(input, &p2) + if assert.Nil(err) { + h, ok := p2["height"].(float64) + if assert.True(ok, "%#v", p2["height"]) { + assert.EqualValues(22, h) + } + v, ok := p2["value"].(string) + if assert.True(ok, "%#v", p2["value"]) { + assert.EqualValues("1234", v) + } + } + + // preload here with *pointers* to the desired types + // struct has unknown types, but hard-coded keys + tmp = 0 + p3 := struct { + Value interface{} `json:"value"` + Height interface{} `json:"height"` + }{ + Height: &tmp, + Value: &cmn.HexBytes{}, + } + err = json.Unmarshal(input, &p3) + if assert.Nil(err) { + h, ok := p3.Height.(*int) + if assert.True(ok, "%#v", p3.Height) { + assert.Equal(22, *h) + } + v, ok := p3.Value.(*cmn.HexBytes) + if assert.True(ok, "%#v", p3.Value) { + assert.EqualValues([]byte{0x12, 0x34}, *v) + } + } + + // simplest solution, but hard-coded + p4 := struct { + Value cmn.HexBytes `json:"value"` + Height int `json:"height"` + }{} + err = json.Unmarshal(input, &p4) + if assert.Nil(err) { + assert.EqualValues(22, p4.Height) + assert.EqualValues([]byte{0x12, 0x34}, p4.Value) + } + + // so, let's use this trick... + // dynamic keys on map, and we can deserialize to the desired types + var p5 map[string]*json.RawMessage + err = json.Unmarshal(input, &p5) + if assert.Nil(err) { + var h int + err = json.Unmarshal(*p5["height"], &h) + if assert.Nil(err) { + assert.Equal(22, h) + } + + var v cmn.HexBytes + err = json.Unmarshal(*p5["value"], &v) + if assert.Nil(err) { + assert.Equal(cmn.HexBytes{0x12, 0x34}, v) + } + } +} + +func TestParseJSONArray(t *testing.T) { + assert := assert.New(t) + + input := []byte(`["1234",22]`) + + // naive is float,string + var p1 []interface{} + err := json.Unmarshal(input, &p1) + if assert.Nil(err) { + v, ok := p1[0].(string) + if assert.True(ok, "%#v", p1[0]) { + assert.EqualValues("1234", v) + } + h, ok := p1[1].(float64) + if assert.True(ok, "%#v", p1[1]) { + assert.EqualValues(22, h) + } + } + + // preloading map with values helps here (unlike map - p2 above) + tmp := 0 + p2 := []interface{}{&cmn.HexBytes{}, &tmp} + err = json.Unmarshal(input, &p2) + if assert.Nil(err) { + v, ok := p2[0].(*cmn.HexBytes) + if assert.True(ok, "%#v", p2[0]) { + assert.EqualValues([]byte{0x12, 0x34}, *v) + } + h, ok := p2[1].(*int) + if assert.True(ok, "%#v", p2[1]) { + assert.EqualValues(22, *h) + } + } +} + +func TestParseRPC(t *testing.T) { + assert := assert.New(t) + + demo := func(height int, name string) {} + call := NewRPCFunc(demo, "height,name") + + cases := []struct { + raw string + height int64 + name string + fail bool + }{ + // should parse + {`[7, "flew"]`, 7, "flew", false}, + {`{"name": "john", "height": 22}`, 22, "john", false}, + // defaults + {`{"name": "solo", "unused": "stuff"}`, 0, "solo", false}, + // should fail - wrong types/length + {`["flew", 7]`, 0, "", true}, + {`[7,"flew",100]`, 0, "", true}, + {`{"name": -12, "height": "fred"}`, 0, "", true}, + } + for idx, tc := range cases { + i := strconv.Itoa(idx) + data := []byte(tc.raw) + vals, err := jsonParamsToArgs(call, data, 0) + if tc.fail { + assert.NotNil(err, i) + } else { + assert.Nil(err, "%s: %+v", i, err) + if assert.Equal(2, len(vals), i) { + assert.Equal(tc.height, vals[0].Int(), i) + assert.Equal(tc.name, vals[1].String(), i) + } + } + + } + +} diff --git a/rpc/tm/lib/types/types.go b/rpc/tm/lib/types/types.go new file mode 100644 index 0000000000000000000000000000000000000000..42a7a219540c8305f51a940820d0e3870f54eceb --- /dev/null +++ b/rpc/tm/lib/types/types.go @@ -0,0 +1,183 @@ +package types + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/pkg/errors" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" +) + +//---------------------------------------- +// REQUEST + +type RPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID string `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} +} + +func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest { + return RPCRequest{ + JSONRPC: "2.0", + ID: id, + Method: method, + Params: params, + } +} + +func (req RPCRequest) String() string { + return fmt.Sprintf("[%s %s]", req.ID, req.Method) +} + +func MapToRequest(id string, method string, params map[string]interface{}) (RPCRequest, error) { + var params_ = make(map[string]json.RawMessage, len(params)) + for name, value := range params { + valueJSON, err := json.Marshal(value) + if err != nil { + return RPCRequest{}, err + } + params_[name] = valueJSON + } + payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + if err != nil { + return RPCRequest{}, err + } + request := NewRPCRequest(id, method, payload) + return request, nil +} + +func ArrayToRequest(id string, method string, params []interface{}) (RPCRequest, error) { + var params_ = make([]json.RawMessage, len(params)) + for i, value := range params { + valueJSON, err := json.Marshal(value) + if err != nil { + return RPCRequest{}, err + } + params_[i] = valueJSON + } + payload, err := json.Marshal(params_) // NOTE: Amino doesn't handle maps yet. + if err != nil { + return RPCRequest{}, err + } + request := NewRPCRequest(id, method, payload) + return request, nil +} + +//---------------------------------------- +// RESPONSE + +type RPCError struct { + Code int `json:"code"` + Message string `json:"message"` + Data string `json:"data,omitempty"` +} + +func (err RPCError) Error() string { + const baseFormat = "RPC error %v - %s" + if err.Data != "" { + return fmt.Sprintf(baseFormat+": %s", err.Code, err.Message, err.Data) + } + return fmt.Sprintf(baseFormat, err.Code, err.Message) +} + +type RPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID string `json:"id"` + Result json.RawMessage `json:"result,omitempty"` + Error *RPCError `json:"error,omitempty"` +} + +func NewRPCSuccessResponse(id string, res interface{}) RPCResponse { + var rawMsg json.RawMessage + + if res != nil { + var js []byte + js, err := json.Marshal(res) + if err != nil { + return RPCInternalError(id, errors.Wrap(err, "Error marshalling response")) + } + rawMsg = json.RawMessage(js) + } + + return RPCResponse{JSONRPC: "2.0", ID: id, Result: rawMsg} +} + +func NewRPCErrorResponse(id string, code int, msg string, data string) RPCResponse { + return RPCResponse{ + JSONRPC: "2.0", + ID: id, + Error: &RPCError{Code: code, Message: msg, Data: data}, + } +} + +func (resp RPCResponse) String() string { + if resp.Error == nil { + return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + } + return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) +} + +func RPCParseError(id string, err error) RPCResponse { + return NewRPCErrorResponse(id, -32700, "Parse error. Invalid JSON", err.Error()) +} + +func RPCInvalidRequestError(id string, err error) RPCResponse { + return NewRPCErrorResponse(id, -32600, "Invalid Request", err.Error()) +} + +func RPCMethodNotFoundError(id string) RPCResponse { + return NewRPCErrorResponse(id, -32601, "Method not found", "") +} + +func RPCInvalidParamsError(id string, err error) RPCResponse { + return NewRPCErrorResponse(id, -32602, "Invalid params", err.Error()) +} + +func RPCInternalError(id string, err error) RPCResponse { + return NewRPCErrorResponse(id, -32603, "Internal error", err.Error()) +} + +func RPCServerError(id string, err error) RPCResponse { + return NewRPCErrorResponse(id, -32000, "Server error", err.Error()) +} + +//---------------------------------------- + +// *wsConnection implements this interface. +type WSRPCConnection interface { + GetRemoteAddr() string + WriteRPCResponse(resp RPCResponse) + TryWriteRPCResponse(resp RPCResponse) bool + GetEventSubscriber() EventSubscriber +} + +// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber +type EventSubscriber interface { + Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error + Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error + UnsubscribeAll(ctx context.Context, subscriber string) error +} + +// websocket-only RPCFuncs take this as the first parameter. +type WSRPCContext struct { + Request RPCRequest + WSRPCConnection +} + +//---------------------------------------- +// SOCKETS +// +// Determine if its a unix or tcp socket. +// If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port +// TODO: deprecate +func SocketType(listenAddr string) string { + socketType := "unix" + if len(strings.Split(listenAddr, ":")) >= 2 { + socketType = "tcp" + } + return socketType +} diff --git a/rpc/tm/lib/types/types_test.go b/rpc/tm/lib/types/types_test.go new file mode 100644 index 0000000000000000000000000000000000000000..1227bbd149a8c6996d4acf6800d1566fd7ba53cc --- /dev/null +++ b/rpc/tm/lib/types/types_test.go @@ -0,0 +1,48 @@ +package types + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +type SampleResult struct { + Value string +} + +func TestResponses(t *testing.T) { + assert := assert.New(t) + + a := NewRPCSuccessResponse("1", &SampleResult{"hello"}) + b, _ := json.Marshal(a) + s := `{"jsonrpc":"2.0","id":"1","result":{"Value":"hello"}}` + assert.Equal(string(s), string(b)) + + d := RPCParseError("1", errors.New("Hello world")) + e, _ := json.Marshal(d) + f := `{"jsonrpc":"2.0","id":"1","error":{"code":-32700,"message":"Parse error. Invalid JSON","data":"Hello world"}}` + assert.Equal(string(f), string(e)) + + g := RPCMethodNotFoundError("2") + h, _ := json.Marshal(g) + i := `{"jsonrpc":"2.0","id":"2","error":{"code":-32601,"message":"Method not found"}}` + assert.Equal(string(h), string(i)) +} + +func TestRPCError(t *testing.T) { + assert.Equal(t, "RPC error 12 - Badness: One worse than a code 11", + fmt.Sprintf("%v", &RPCError{ + Code: 12, + Message: "Badness", + Data: "One worse than a code 11", + })) + + assert.Equal(t, "RPC error 12 - Badness", + fmt.Sprintf("%v", &RPCError{ + Code: 12, + Message: "Badness", + })) +}