diff --git a/core/json_service.go b/core/json_service.go new file mode 100644 index 0000000000000000000000000000000000000000..e385870fbbd9deb65f63cfcf75826354bf6b91a9 --- /dev/null +++ b/core/json_service.go @@ -0,0 +1,183 @@ +package core + +import ( + "encoding/json" + "net/http" + + "github.com/gin-gonic/gin" + + log "github.com/eris-ltd/eris-logger" + + core_types "github.com/eris-ltd/eris-db/core/types" + definitions "github.com/eris-ltd/eris-db/definitions" + rpc "github.com/eris-ltd/eris-db/rpc" + server "github.com/eris-ltd/eris-db/server" +) + +// Server used to handle JSON-RPC 2.0 requests. Implements server.Server +type JsonRpcServer struct { + service server.HttpService + running bool +} + +// Create a new JsonRpcServer +func NewJsonRpcServer(service server.HttpService) *JsonRpcServer { + return &JsonRpcServer{service: service} +} + +// Start adds the rpc path to the router. +func (this *JsonRpcServer) Start(jsonRpcEndpoint string, router *gin.Engine) { + router.POST(jsonRpcEndpoint, this.handleFunc) + this.running = true +} + +// Is the server currently running? +func (this *JsonRpcServer) Running() bool { + return this.running +} + +// Shut the server down. Does nothing. +func (this *JsonRpcServer) ShutDown() { + this.running = false +} + +// Handler passes message on directly to the service which expects +// a normal http request and response writer. +func (this *JsonRpcServer) handleFunc(c *gin.Context) { + r := c.Request + w := c.Writer + + this.service.Process(r, w) +} + +// Used for ErisDb. Implements server.HttpService +type ErisDbJsonService struct { + codec rpc.Codec + pipe definitions.Pipe + eventSubs *EventSubscriptions + defaultHandlers map[string]RequestHandlerFunc +} + +// Create a new JSON-RPC 2.0 service for erisdb (tendermint). +func NewErisDbJsonService(codec rpc.Codec, pipe definitions.Pipe, + eventSubs *EventSubscriptions) server.HttpService { + + tmhttps := &ErisDbJsonService{codec: codec, pipe: pipe, eventSubs: eventSubs} + mtds := &ErisDbMethods{codec, pipe} + + dhMap := mtds.getMethods() + // Events + dhMap[EVENT_SUBSCRIBE] = tmhttps.EventSubscribe + dhMap[EVENT_UNSUBSCRIBE] = tmhttps.EventUnsubscribe + dhMap[EVENT_POLL] = tmhttps.EventPoll + tmhttps.defaultHandlers = dhMap + return tmhttps +} + +// Process a request. +func (this *ErisDbJsonService) Process(r *http.Request, w http.ResponseWriter) { + + // Create new request object and unmarshal. + req := &rpc.RPCRequest{} + decoder := json.NewDecoder(r.Body) + errU := decoder.Decode(req) + + // Error when decoding. + if errU != nil { + this.writeError("Failed to parse request: "+errU.Error(), "", rpc.PARSE_ERROR, w) + return + } + + // Wrong protocol version. + if req.JSONRPC != "2.0" { + this.writeError("Wrong protocol version: "+req.JSONRPC, req.Id, rpc.INVALID_REQUEST, w) + return + } + + mName := req.Method + + if handler, ok := this.defaultHandlers[mName]; ok { + resp, errCode, err := handler(req, w) + if err != nil { + this.writeError(err.Error(), req.Id, errCode, w) + } else { + this.writeResponse(req.Id, resp, w) + } + } else { + this.writeError("Method not found: "+mName, req.Id, rpc.METHOD_NOT_FOUND, w) + } +} + +// Helper for writing error responses. +func (this *ErisDbJsonService) writeError(msg, id string, code int, w http.ResponseWriter) { + response := rpc.NewRPCErrorResponse(id, code, msg) + err := this.codec.Encode(response, w) + // If there's an error here all bets are off. + if err != nil { + http.Error(w, "Failed to marshal standard error response: "+err.Error(), 500) + return + } + w.WriteHeader(200) +} + +// Helper for writing responses. +func (this *ErisDbJsonService) writeResponse(id string, result interface{}, w http.ResponseWriter) { + log.Debug("Result: %v\n", result) + response := rpc.NewRPCResponse(id, result) + err := this.codec.Encode(response, w) + log.Debug("Response: %v\n", response) + if err != nil { + this.writeError("Internal error: "+err.Error(), id, rpc.INTERNAL_ERROR, w) + return + } + w.WriteHeader(200) +} + +// *************************************** Events ************************************ + +// Subscribe to an event. +func (this *ErisDbJsonService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &EventIdParam{} + err := json.Unmarshal(request.Params, param) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + eventId := param.EventId + subId, errC := this.eventSubs.add(eventId) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.EventSub{subId}, 0, nil +} + +// Un-subscribe from an event. +func (this *ErisDbJsonService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &SubIdParam{} + err := json.Unmarshal(request.Params, param) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + subId := param.SubId + + result, errC := this.pipe.Events().Unsubscribe(subId) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.EventUnsub{result}, 0, nil +} + +// Check subscription event cache for new data. +func (this *ErisDbJsonService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &SubIdParam{} + err := json.Unmarshal(request.Params, param) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + subId := param.SubId + + result, errC := this.eventSubs.poll(subId) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.PollResponse{result}, 0, nil +} diff --git a/core/restServer.go b/core/restServer.go new file mode 100644 index 0000000000000000000000000000000000000000..2875317c056978687cf95eefbd11c5ecb53663e0 --- /dev/null +++ b/core/restServer.go @@ -0,0 +1,638 @@ +package core + +import ( + "encoding/hex" + "fmt" + "strconv" + "strings" + + "github.com/gin-gonic/gin" + + core_types "github.com/eris-ltd/eris-db/core/types" + definitions "github.com/eris-ltd/eris-db/definitions" + rpc "github.com/eris-ltd/eris-db/rpc" + server "github.com/eris-ltd/eris-db/server" + "github.com/eris-ltd/eris-db/txs" + "github.com/eris-ltd/eris-db/util" +) + +// Provides a REST-like web-api. Implements server.Server +// TODO more routers. Also, start looking into how better status codes +// can be gotten. +type RestServer struct { + codec rpc.Codec + pipe definitions.Pipe + eventSubs *EventSubscriptions + running bool +} + +// Create a new rest server. +func NewRestServer(codec rpc.Codec, pipe definitions.Pipe, eventSubs *EventSubscriptions) *RestServer { + return &RestServer{codec: codec, pipe: pipe, eventSubs: eventSubs} +} + +// Starting the server means registering all the handlers with the router. +func (this *RestServer) Start(config *server.ServerConfig, router *gin.Engine) { + // Accounts + router.GET("/accounts", parseSearchQuery, this.handleAccounts) + router.GET("/accounts/:address", addressParam, this.handleAccount) + router.GET("/accounts/:address/storage", addressParam, this.handleStorage) + router.GET("/accounts/:address/storage/:key", addressParam, keyParam, this.handleStorageAt) + // Blockchain + router.GET("/blockchain", this.handleBlockchainInfo) + router.GET("/blockchain/chain_id", this.handleChainId) + router.GET("/blockchain/genesis_hash", this.handleGenesisHash) + router.GET("/blockchain/latest_block_height", this.handleLatestBlockHeight) + router.GET("/blockchain/latest_block", this.handleLatestBlock) + router.GET("/blockchain/blocks", parseSearchQuery, this.handleBlocks) + router.GET("/blockchain/block/:height", heightParam, this.handleBlock) + // Consensus + router.GET("/consensus", this.handleConsensusState) + router.GET("/consensus/validators", this.handleValidatorList) + // Events + router.POST("/event_subs", this.handleEventSubscribe) + router.GET("/event_subs/:id", this.handleEventPoll) + router.DELETE("/event_subs/:id", this.handleEventUnsubscribe) + // NameReg + router.GET("/namereg", parseSearchQuery, this.handleNameRegEntries) + router.GET("/namereg/:key", nameParam, this.handleNameRegEntry) + // Network + router.GET("/network", this.handleNetworkInfo) + router.GET("/network/client_version", this.handleClientVersion) + router.GET("/network/moniker", this.handleMoniker) + router.GET("/network/listening", this.handleListening) + router.GET("/network/listeners", this.handleListeners) + router.GET("/network/peers", this.handlePeers) + router.GET("/network/peers/:address", peerAddressParam, this.handlePeer) + // Tx related (TODO get txs has still not been implemented) + router.POST("/txpool", this.handleBroadcastTx) + router.GET("/txpool", this.handleUnconfirmedTxs) + // Code execution + router.POST("/calls", this.handleCall) + router.POST("/codecalls", this.handleCallCode) + // Unsafe + router.GET("/unsafe/pa_generator", this.handleGenPrivAcc) + router.POST("/unsafe/txpool", parseTxModifier, this.handleTransact) + router.POST("/unsafe/namereg/txpool", this.handleTransactNameReg) + router.POST("/unsafe/tx_signer", this.handleSignTx) + this.running = true +} + +// Is the server currently running? +func (this *RestServer) Running() bool { + return this.running +} + +// Shut the server down. Does nothing. +func (this *RestServer) ShutDown() { + this.running = false +} + +// ********************************* Accounts ********************************* + +func (this *RestServer) handleGenPrivAcc(c *gin.Context) { + addr := &AddressParam{} + + var acc interface{} + var err error + if addr.Address == nil || len(addr.Address) == 0 { + acc, err = this.pipe.Accounts().GenPrivAccount() + } else { + acc, err = this.pipe.Accounts().GenPrivAccountFromKey(addr.Address) + } + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(acc, c.Writer) +} + +func (this *RestServer) handleAccounts(c *gin.Context) { + var filters []*core_types.FilterData + fs, exists := c.Get("filters") + if exists { + filters = fs.([]*core_types.FilterData) + } + accs, err := this.pipe.Accounts().Accounts(filters) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(accs, c.Writer) +} + +func (this *RestServer) handleAccount(c *gin.Context) { + addr := c.MustGet("addrBts").([]byte) + acc, err := this.pipe.Accounts().Account(addr) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(acc, c.Writer) +} + +func (this *RestServer) handleStorage(c *gin.Context) { + addr := c.MustGet("addrBts").([]byte) + s, err := this.pipe.Accounts().Storage(addr) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(s, c.Writer) +} + +func (this *RestServer) handleStorageAt(c *gin.Context) { + addr := c.MustGet("addrBts").([]byte) + key := c.MustGet("keyBts").([]byte) + sa, err := this.pipe.Accounts().StorageAt(addr, key) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(sa, c.Writer) +} + +// ********************************* Blockchain ********************************* + +func (this *RestServer) handleBlockchainInfo(c *gin.Context) { + bci, err := this.pipe.Blockchain().Info() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(bci, c.Writer) +} + +func (this *RestServer) handleGenesisHash(c *gin.Context) { + gh, err := this.pipe.Blockchain().GenesisHash() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.GenesisHash{gh}, c.Writer) +} + +func (this *RestServer) handleChainId(c *gin.Context) { + cId, err := this.pipe.Blockchain().ChainId() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.ChainId{cId}, c.Writer) +} + +func (this *RestServer) handleLatestBlockHeight(c *gin.Context) { + lbh, err := this.pipe.Blockchain().LatestBlockHeight() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.LatestBlockHeight{lbh}, c.Writer) +} + +func (this *RestServer) handleLatestBlock(c *gin.Context) { + lb, err := this.pipe.Blockchain().LatestBlock() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(lb, c.Writer) +} + +func (this *RestServer) handleBlocks(c *gin.Context) { + var filters []*core_types.FilterData + fs, exists := c.Get("filters") + if exists { + filters = fs.([]*core_types.FilterData) + } + + blocks, err := this.pipe.Blockchain().Blocks(filters) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(blocks, c.Writer) +} + +func (this *RestServer) handleBlock(c *gin.Context) { + height := c.MustGet("height").(int) + block, err := this.pipe.Blockchain().Block(height) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(block, c.Writer) +} + +// ********************************* Consensus ********************************* +func (this *RestServer) handleConsensusState(c *gin.Context) { + + cs, err := this.pipe.Consensus().State() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(cs, c.Writer) +} + +func (this *RestServer) handleValidatorList(c *gin.Context) { + vl, err := this.pipe.Consensus().Validators() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(vl, c.Writer) +} + +// ********************************* Events ********************************* + +func (this *RestServer) handleEventSubscribe(c *gin.Context) { + param := &EventIdParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + subId, err := this.eventSubs.add(param.EventId) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.EventSub{subId}, c.Writer) +} + +func (this *RestServer) handleEventPoll(c *gin.Context) { + subId := c.MustGet("id").(string) + data, err := this.eventSubs.poll(subId) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.PollResponse{data}, c.Writer) +} + +func (this *RestServer) handleEventUnsubscribe(c *gin.Context) { + subId := c.MustGet("id").(string) + err := this.eventSubs.remove(subId) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.EventUnsub{true}, c.Writer) +} + +// ********************************* NameReg ********************************* + +func (this *RestServer) handleNameRegEntries(c *gin.Context) { + var filters []*core_types.FilterData + fs, exists := c.Get("filters") + if exists { + filters = fs.([]*core_types.FilterData) + } + entries, err := this.pipe.NameReg().Entries(filters) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(entries, c.Writer) +} + +func (this *RestServer) handleNameRegEntry(c *gin.Context) { + name := c.MustGet("name").(string) + entry, err := this.pipe.NameReg().Entry(name) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(entry, c.Writer) +} + +// ********************************* Network ********************************* + +func (this *RestServer) handleNetworkInfo(c *gin.Context) { + nInfo, err := this.pipe.Net().Info() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(nInfo, c.Writer) +} + +func (this *RestServer) handleClientVersion(c *gin.Context) { + version, err := this.pipe.Net().ClientVersion() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.ClientVersion{version}, c.Writer) +} + +func (this *RestServer) handleMoniker(c *gin.Context) { + moniker, err := this.pipe.Net().Moniker() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.Moniker{moniker}, c.Writer) +} + +func (this *RestServer) handleListening(c *gin.Context) { + listening, err := this.pipe.Net().Listening() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.Listening{listening}, c.Writer) +} + +func (this *RestServer) handleListeners(c *gin.Context) { + listeners, err := this.pipe.Net().Listeners() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(&core_types.Listeners{listeners}, c.Writer) +} + +func (this *RestServer) handlePeers(c *gin.Context) { + peers, err := this.pipe.Net().Peers() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(peers, c.Writer) +} + +func (this *RestServer) handlePeer(c *gin.Context) { + address := c.MustGet("address").(string) + peer, err := this.pipe.Net().Peer(address) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(peer, c.Writer) +} + +// ********************************* Transactions ********************************* + +func (this *RestServer) handleBroadcastTx(c *gin.Context) { + param := &txs.CallTx{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + receipt, err := this.pipe.Transactor().BroadcastTx(param) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(receipt, c.Writer) +} + +func (this *RestServer) handleUnconfirmedTxs(c *gin.Context) { + + txs, err := this.pipe.Transactor().UnconfirmedTxs() + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(txs, c.Writer) +} + +func (this *RestServer) handleCall(c *gin.Context) { + param := &CallParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + call, err := this.pipe.Transactor().Call(param.From, param.Address, param.Data) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(call, c.Writer) +} + +func (this *RestServer) handleCallCode(c *gin.Context) { + param := &CallCodeParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + call, err := this.pipe.Transactor().CallCode(param.From, param.Code, param.Data) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(call, c.Writer) +} + +func (this *RestServer) handleTransact(c *gin.Context) { + + _, hold := c.Get("hold") + + param := &TransactParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + if hold { + res, err := this.pipe.Transactor().TransactAndHold(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(res, c.Writer) + } else { + receipt, err := this.pipe.Transactor().Transact(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(receipt, c.Writer) + } +} + +func (this *RestServer) handleTransactNameReg(c *gin.Context) { + param := &TransactNameRegParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + receipt, err := this.pipe.Transactor().TransactNameReg(param.PrivKey, param.Name, param.Data, param.Amount, param.Fee) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(receipt, c.Writer) +} + +func (this *RestServer) handleSignTx(c *gin.Context) { + param := &SignTxParam{} + errD := this.codec.Decode(param, c.Request.Body) + if errD != nil { + c.AbortWithError(500, errD) + } + tx, err := this.pipe.Transactor().SignTx(param.Tx, param.PrivAccounts) + if err != nil { + c.AbortWithError(500, err) + } + c.Writer.WriteHeader(200) + this.codec.Encode(tx, c.Writer) +} + +// ********************************* Middleware ********************************* + +func addressParam(c *gin.Context) { + addr := c.Param("address") + if !util.IsAddress(addr) { + c.AbortWithError(400, fmt.Errorf("Malformed address param: "+addr)) + } + bts, _ := hex.DecodeString(addr) + c.Set("addrBts", bts) + c.Next() +} + +func nameParam(c *gin.Context) { + name := c.Param("key") + c.Set("name", name) + c.Next() +} + +func keyParam(c *gin.Context) { + key := c.Param("key") + bts, err := hex.DecodeString(key) + if err != nil { + c.AbortWithError(400, err) + } + c.Set("keyBts", bts) + c.Next() +} + +func heightParam(c *gin.Context) { + h, err := strconv.Atoi(c.Param("height")) + if err != nil { + c.AbortWithError(400, err) + } + if h < 0 { + c.AbortWithError(400, fmt.Errorf("Negative number used as height.")) + } + c.Set("height", h) + c.Next() +} + +func subIdParam(c *gin.Context) { + subId := c.Param("id") + if len(subId) != 64 || !util.IsHex(subId) { + c.AbortWithError(400, fmt.Errorf("Malformed event id")) + } + c.Set("id", subId) + c.Next() +} + +// TODO +func peerAddressParam(c *gin.Context) { + subId := c.Param("address") + c.Set("address", subId) + c.Next() +} + +func parseTxModifier(c *gin.Context) { + hold := c.Query("hold") + if hold == "true" { + c.Set("hold", true) + } else if hold != "" { + if hold != "false" { + c.Writer.WriteHeader(400) + c.Writer.Write([]byte("tx hold must be either 'true' or 'false', found: " + hold)) + c.Abort() + } + } +} + +func parseSearchQuery(c *gin.Context) { + q := c.Query("q") + if q != "" { + data, err := _parseSearchQuery(q) + if err != nil { + c.Writer.WriteHeader(400) + c.Writer.Write([]byte(err.Error())) + c.Abort() + // c.AbortWithError(400, err) + return + } + c.Set("filters", data) + } +} + +func _parseSearchQuery(queryString string) ([]*core_types.FilterData, error) { + if len(queryString) == 0 { + return nil, nil + } + filters := strings.Split(queryString, " ") + fdArr := []*core_types.FilterData{} + for _, f := range filters { + kv := strings.Split(f, ":") + if len(kv) != 2 { + return nil, fmt.Errorf("Malformed query. Missing ':' separator: " + f) + } + if kv[0] == "" { + return nil, fmt.Errorf("Malformed query. Field name missing: " + f) + } + + fd, fd2, errTfd := toFilterData(kv[0], kv[1]) + if errTfd != nil { + return nil, errTfd + } + fdArr = append(fdArr, fd) + if fd2 != nil { + fdArr = append(fdArr, fd2) + } + } + return fdArr, nil +} + +// Parse the query statement and create . Two filter data in case of a range param. +func toFilterData(field, stmt string) (*core_types.FilterData, *core_types.FilterData, error) { + // In case statement is empty + if stmt == "" { + return &core_types.FilterData{field, "==", ""}, nil, nil + } + // Simple routine based on string splitting. TODO add quoted range query. + if stmt[0] == '>' || stmt[0] == '<' || stmt[0] == '=' || stmt[0] == '!' { + // This means a normal operator. If one character then stop, otherwise + // peek at next and check if it's a "=". + + if len(stmt) == 1 { + return &core_types.FilterData{field, stmt[0:1], ""}, nil, nil + } else if stmt[1] == '=' { + return &core_types.FilterData{field, stmt[:2], stmt[2:]}, nil, nil + } else { + return &core_types.FilterData{field, stmt[0:1], stmt[1:]}, nil, nil + } + } else { + // Either we have a range query here or a malformed query. + rng := strings.Split(stmt, "..") + // This is for when there is no op, but the value is not empty. + if len(rng) == 1 { + return &core_types.FilterData{field, "==", stmt}, nil, nil + } + // The rest. + if len(rng) != 2 || rng[0] == "" || rng[1] == "" { + return nil, nil, fmt.Errorf("Malformed query statement: " + stmt) + } + var min string + var max string + if rng[0] == "*" { + min = "min" + } else { + min = rng[0] + } + if rng[1] == "*" { + max = "max" + } else { + max = rng[1] + } + return &core_types.FilterData{field, ">=", min}, &core_types.FilterData{field, "<=", max}, nil + } + return nil, nil, nil +} diff --git a/core/wsService.go b/core/wsService.go new file mode 100644 index 0000000000000000000000000000000000000000..0937a04cc5e06bc38b38b6333be9c2e374845c82 --- /dev/null +++ b/core/wsService.go @@ -0,0 +1,139 @@ +package core + +import ( + "encoding/json" + "fmt" + + "github.com/tendermint/go-events" + + log "github.com/eris-ltd/eris-logger" + + core_types "github.com/eris-ltd/eris-db/core/types" + definitions "github.com/eris-ltd/eris-db/definitions" + rpc "github.com/eris-ltd/eris-db/rpc" + server "github.com/eris-ltd/eris-db/server" +) + +// Used for ErisDb. Implements WebSocketService. +type ErisDbWsService struct { + codec rpc.Codec + pipe definitions.Pipe + defaultHandlers map[string]RequestHandlerFunc +} + +// Create a new websocket service. +func NewErisDbWsService(codec rpc.Codec, + pipe definitions.Pipe) server.WebSocketService { + tmwss := &ErisDbWsService{codec: codec, pipe: pipe} + mtds := &ErisDbMethods{codec, pipe} + + dhMap := mtds.getMethods() + // Events + dhMap[EVENT_SUBSCRIBE] = tmwss.EventSubscribe + dhMap[EVENT_UNSUBSCRIBE] = tmwss.EventUnsubscribe + tmwss.defaultHandlers = dhMap + return tmwss +} + +// Process a request. +func (this *ErisDbWsService) Process(msg []byte, session *server.WSSession) { + log.Debug("REQUEST: %s\n", string(msg)) + // Create new request object and unmarshal. + req := &rpc.RPCRequest{} + errU := json.Unmarshal(msg, req) + + // Error when unmarshaling. + if errU != nil { + this.writeError("Failed to parse request: "+errU.Error()+" . Raw: "+string(msg), "", rpc.PARSE_ERROR, session) + return + } + + // Wrong protocol version. + if req.JSONRPC != "2.0" { + this.writeError("Wrong protocol version: "+req.JSONRPC, req.Id, rpc.INVALID_REQUEST, session) + return + } + + mName := req.Method + + if handler, ok := this.defaultHandlers[mName]; ok { + resp, errCode, err := handler(req, session) + if err != nil { + this.writeError(err.Error(), req.Id, errCode, session) + } else { + this.writeResponse(req.Id, resp, session) + } + } else { + this.writeError("Method not found: "+mName, req.Id, rpc.METHOD_NOT_FOUND, session) + } +} + +// Convenience method for writing error responses. +func (this *ErisDbWsService) writeError(msg, id string, code int, session *server.WSSession) { + response := rpc.NewRPCErrorResponse(id, code, msg) + bts, err := this.codec.EncodeBytes(response) + // If there's an error here all bets are off. + if err != nil { + panic("Failed to marshal standard error response." + err.Error()) + } + session.Write(bts) +} + +// Convenience method for writing responses. +func (this *ErisDbWsService) writeResponse(id string, result interface{}, session *server.WSSession) error { + response := rpc.NewRPCResponse(id, result) + bts, err := this.codec.EncodeBytes(response) + log.Debug("RESPONSE: %v\n", response) + if err != nil { + this.writeError("Internal error: "+err.Error(), id, rpc.INTERNAL_ERROR, session) + return err + } + return session.Write(bts) +} + +// *************************************** Events ************************************ + +func (this *ErisDbWsService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + session, ok := requester.(*server.WSSession) + if !ok { + return 0, rpc.INTERNAL_ERROR, fmt.Errorf("Passing wrong object to websocket events") + } + param := &EventIdParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + eventId := param.EventId + subId, errSID := generateSubId() + if errSID != nil { + return nil, rpc.INTERNAL_ERROR, errSID + } + + callback := func(ret events.EventData) { + this.writeResponse(subId, ret, session) + } + _, errC := this.pipe.Events().Subscribe(subId, eventId, callback) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.EventSub{subId}, 0, nil +} + +func (this *ErisDbWsService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + param := &EventIdParam{} + err := this.codec.DecodeBytes(param, request.Params) + if err != nil { + return nil, rpc.INVALID_PARAMS, err + } + eventId := param.EventId + + result, errC := this.pipe.Events().Unsubscribe(eventId) + if errC != nil { + return nil, rpc.INTERNAL_ERROR, errC + } + return &core_types.EventUnsub{result}, 0, nil +} + +func (this *ErisDbWsService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { + return nil, rpc.INTERNAL_ERROR, fmt.Errorf("Cannot poll with websockets") +} diff --git a/server/config.go b/server/config.go new file mode 100644 index 0000000000000000000000000000000000000000..ac5734fe0499b9331cea9035b0cb4c53fb7673b1 --- /dev/null +++ b/server/config.go @@ -0,0 +1,160 @@ +// Copyright 2015, 2016 Eris Industries (UK) Ltd. +// This file is part of Eris-RT + +// Eris-RT is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Eris-RT is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>. + +package server + +import ( + "fmt" + "math" + + viper "github.com/spf13/viper" +) + +type ( + ServerConfig struct { + Bind Bind `toml:"bind"` + TLS TLS `toml:"TLS"` + CORS CORS `toml:"CORS"` + HTTP HTTP `toml:"HTTP"` + WebSocket WebSocket `toml:"web_socket"` + Logging Logging `toml:"logging"` + } + + Bind struct { + Address string `toml:"address"` + Port uint16 `toml:"port"` + } + + TLS struct { + TLS bool `toml:"tls"` + CertPath string `toml:"cert_path"` + KeyPath string `toml:"key_path"` + } + + // Options stores configurations + CORS struct { + Enable bool `toml:"enable"` + AllowOrigins []string `toml:"allow_origins"` + AllowCredentials bool `toml:"allow_credentials"` + AllowMethods []string `toml:"allow_methods"` + AllowHeaders []string `toml:"allow_headers"` + ExposeHeaders []string `toml:"expose_headers"` + MaxAge uint64 `toml:"max_age"` + } + + HTTP struct { + JsonRpcEndpoint string `toml:"json_rpc_endpoint"` + } + + WebSocket struct { + WebSocketEndpoint string `toml:"websocket_endpoint"` + MaxWebSocketSessions uint16 `toml:"max_websocket_sessions"` + ReadBufferSize uint64 `toml:"read_buffer_size"` + WriteBufferSize uint64 `toml:"write_buffer_size"` + } + + Logging struct { + ConsoleLogLevel string `toml:"console_log_level"` + FileLogLevel string `toml:"file_log_level"` + LogFile string `toml:"log_file"` + } +) + +func ReadServerConfig(viper *viper.Viper) (*ServerConfig, error) { + // TODO: [ben] replace with a more elegant way of asserting + // the possible conversion to the type domain + + // check domain range for bind.port + bindPortInt := viper.GetInt("bind.port") + var bindPortUint16 uint16 = 0 + if bindPortInt >= 0 && bindPortInt <= math.MaxUint16 { + bindPortUint16 = uint16(bindPortInt) + } else { + return nil, fmt.Errorf("Failed to read binding port from configuration: %v", + bindPortInt) + } + // check domain range for cors.max_age + maxAge := viper.GetInt("cors.max_age") + var maxAgeUint64 uint64 = 0 + if maxAge >= 0 { + maxAgeUint64 = uint64(maxAge) + } else { + return nil, fmt.Errorf("Failed to read maximum age for CORS: %v", maxAge) + } + // check domain range for websocket.max_sessions + maxWebsocketSessions := viper.GetInt("websocket.max_sessions") + var maxWebsocketSessionsUint16 uint16 = 0 + if maxWebsocketSessions >= 0 && maxWebsocketSessions <= math.MaxUint16 { + maxWebsocketSessionsUint16 = uint16(maxWebsocketSessions) + } else { + return nil, fmt.Errorf("Failed to read maximum websocket sessions: %v", + maxWebsocketSessions) + } + // check domain range for websocket.read_buffer_size + readBufferSize := viper.GetInt("websocket.read_buffer_size") + var readBufferSizeUint64 uint64 = 0 + if readBufferSize >= 0 { + readBufferSizeUint64 = uint64(readBufferSize) + } else { + return nil, fmt.Errorf("Failed to read websocket read buffer size: %v", + readBufferSize) + } + + // check domain range for websocket.write_buffer_size + writeBufferSize := viper.GetInt("websocket.read_buffer_size") + var writeBufferSizeUint64 uint64 = 0 + if writeBufferSize >= 0 { + writeBufferSizeUint64 = uint64(writeBufferSize) + } else { + return nil, fmt.Errorf("Failed to read websocket write buffer size: %v", + writeBufferSize) + } + + return &ServerConfig { + Bind: Bind { + Address: viper.GetString("bind.address"), + Port: bindPortUint16, + }, + TLS: TLS { + TLS: viper.GetBool("tls.tls"), + CertPath: viper.GetString("tls.cert_path"), + KeyPath: viper.GetString("tls.key_path"), + }, + CORS: CORS { + Enable: viper.GetBool("cors.enable"), + AllowOrigins: viper.GetStringSlice("cors.allow_origins"), + AllowCredentials: viper.GetBool("cors.allow_credentials"), + AllowMethods: viper.GetStringSlice("cors.allow_methods"), + AllowHeaders: viper.GetStringSlice("cors.allow_headers"), + ExposeHeaders: viper.GetStringSlice("cors.expose_headers"), + MaxAge: maxAgeUint64, + }, + HTTP: HTTP { + JsonRpcEndpoint: viper.GetString("http.json_rpc_endpoint"), + }, + WebSocket: WebSocket { + WebSocketEndpoint: viper.GetString("websocket.endpoint"), + MaxWebSocketSessions: maxWebsocketSessionsUint16, + ReadBufferSize: readBufferSizeUint64, + WriteBufferSize: writeBufferSizeUint64, + }, + Logging: Logging{ + ConsoleLogLevel: viper.GetString("logging.console_log_level"), + FileLogLevel: viper.GetString("logging.file_log_level"), + LogFile: viper.GetString("logging.log_file"), + }, + }, nil +} diff --git a/server/logging.go b/server/logging.go index 7c8b105d891b539dfcf3b8d3812519a4259064d4..173fecaf34371dea336d152b0d911efac4f7309d 100644 --- a/server/logging.go +++ b/server/logging.go @@ -5,15 +5,13 @@ import ( "os" "github.com/tendermint/log15" - - cfg "github.com/eris-ltd/eris-db/config" ) var rootHandler log15.Handler // This is basically the same code as in tendermint. Initialize root // and maybe later also track the loggers here. -func InitLogger(config *cfg.ServerConfig) { +func InitLogger(config *ServerConfig) { consoleLogLevel := config.Logging.ConsoleLogLevel diff --git a/server/server.go b/server/server.go index c7e3dc465364f66c7bf848f1c8e931d4b8c5e0fc..d95d1d4c5dc2984b1d0eade614866c9d9755ac72 100644 --- a/server/server.go +++ b/server/server.go @@ -7,8 +7,6 @@ import ( "net/http" "time" - cfg "github.com/eris-ltd/eris-db/config" - "github.com/gin-gonic/gin" cors "github.com/tommy351/gin-cors" "gopkg.in/tylerb/graceful.v1" @@ -24,7 +22,7 @@ type HttpService interface { // A server serves a number of different http calls. type Server interface { - Start(*cfg.ServerConfig, *gin.Engine) + Start(*ServerConfig, *gin.Engine) Running() bool ShutDown() } @@ -38,7 +36,7 @@ type Server interface { // 'Start()'. Stop event listeners can be added up to the point where // the server is stopped and the event is fired. type ServeProcess struct { - config *cfg.ServerConfig + config *ServerConfig servers []Server stopChan chan struct{} stoppedChan chan struct{} @@ -186,10 +184,11 @@ func (this *ServeProcess) StopEventChannel() <-chan struct{} { } // Creates a new serve process. -func NewServeProcess(config *cfg.ServerConfig, servers ...Server) *ServeProcess { - var scfg cfg.ServerConfig +func NewServeProcess(config *ServerConfig, servers ...Server) (*ServeProcess, + error) { + var scfg ServerConfig if config == nil { - scfg = cfg.DefaultServerConfig() + return nil, fmt.Errorf("Nil passed as server configuration") } else { scfg = *config } @@ -198,7 +197,7 @@ func NewServeProcess(config *cfg.ServerConfig, servers ...Server) *ServeProcess startListeners := make([]chan struct{}, 0) stopListeners := make([]chan struct{}, 0) sp := &ServeProcess{&scfg, servers, stopChan, stoppedChan, startListeners, stopListeners, nil} - return sp + return sp, nil } // Used to enable log15 logging instead of the default Gin logging. @@ -219,7 +218,7 @@ func logHandler(c *gin.Context) { } -func NewCORSMiddleware(options cfg.CORS) gin.HandlerFunc { +func NewCORSMiddleware(options CORS) gin.HandlerFunc { o := cors.Options{ AllowCredentials: options.AllowCredentials, AllowHeaders: options.AllowHeaders, diff --git a/server/websocket.go b/server/websocket.go index 2fde5ca59f2d75312edbc6d0838f26b505cdd2e1..766eeef5307cc2dc25f26e91d74624256e5f4530 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -6,8 +6,6 @@ import ( "sync" "time" - cfg "github.com/eris-ltd/eris-db/config" - "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -42,7 +40,7 @@ type WebSocketServer struct { running bool maxSessions uint sessionManager *SessionManager - config *cfg.ServerConfig + config *ServerConfig allOrigins bool } @@ -59,7 +57,7 @@ func NewWebSocketServer(maxSessions uint, service WebSocketService) *WebSocketSe } // Start the server. Adds the handler to the router and sets everything up. -func (this *WebSocketServer) Start(config *cfg.ServerConfig, router *gin.Engine) { +func (this *WebSocketServer) Start(config *ServerConfig, router *gin.Engine) { this.config = config diff --git a/server_config.toml b/server_config.toml index bd96bbce39f0f3f52f4217c61513b0726228979f..0c2793ac2654721a7ab50d071d3adddc5162c12d 100644 --- a/server_config.toml +++ b/server_config.toml @@ -66,7 +66,26 @@ genesis_file = "genesis.json" [servers] + [servers.bind] + address = "" + port = 1337 + + [servers.tls] + tls=false + cert_path="" + key_path="" + + [servers.cors] + enable=false + allow_origins=[] + allow_credentials=false + allow_methods=[] + allow_headers=[] + expose_headers=[] + max_age=0 + [servers.http] + json_rpc_endpoint="/rpc" [servers.websocket] enable = true @@ -75,6 +94,11 @@ genesis_file = "genesis.json" read_buffer_size = 4096 write_buffer_size = 4096 + [servers.logging] + console_log_level="info" + file_log_level="warn" + log_file="" + ################################################################################ ################################################################################ ##