Skip to content
Snippets Groups Projects
Unverified Commit 3cabaeec authored by Benjamin Bollen's avatar Benjamin Bollen
Browse files

Read server config from Viper; move servers into core

parent 59ca4795
No related branches found
No related tags found
No related merge requests found
package core
import (
"encoding/json"
"net/http"
"github.com/gin-gonic/gin"
log "github.com/eris-ltd/eris-logger"
core_types "github.com/eris-ltd/eris-db/core/types"
definitions "github.com/eris-ltd/eris-db/definitions"
rpc "github.com/eris-ltd/eris-db/rpc"
server "github.com/eris-ltd/eris-db/server"
)
// Server used to handle JSON-RPC 2.0 requests. Implements server.Server
type JsonRpcServer struct {
service server.HttpService
running bool
}
// Create a new JsonRpcServer
func NewJsonRpcServer(service server.HttpService) *JsonRpcServer {
return &JsonRpcServer{service: service}
}
// Start adds the rpc path to the router.
func (this *JsonRpcServer) Start(jsonRpcEndpoint string, router *gin.Engine) {
router.POST(jsonRpcEndpoint, this.handleFunc)
this.running = true
}
// Is the server currently running?
func (this *JsonRpcServer) Running() bool {
return this.running
}
// Shut the server down. Does nothing.
func (this *JsonRpcServer) ShutDown() {
this.running = false
}
// Handler passes message on directly to the service which expects
// a normal http request and response writer.
func (this *JsonRpcServer) handleFunc(c *gin.Context) {
r := c.Request
w := c.Writer
this.service.Process(r, w)
}
// Used for ErisDb. Implements server.HttpService
type ErisDbJsonService struct {
codec rpc.Codec
pipe definitions.Pipe
eventSubs *EventSubscriptions
defaultHandlers map[string]RequestHandlerFunc
}
// Create a new JSON-RPC 2.0 service for erisdb (tendermint).
func NewErisDbJsonService(codec rpc.Codec, pipe definitions.Pipe,
eventSubs *EventSubscriptions) server.HttpService {
tmhttps := &ErisDbJsonService{codec: codec, pipe: pipe, eventSubs: eventSubs}
mtds := &ErisDbMethods{codec, pipe}
dhMap := mtds.getMethods()
// Events
dhMap[EVENT_SUBSCRIBE] = tmhttps.EventSubscribe
dhMap[EVENT_UNSUBSCRIBE] = tmhttps.EventUnsubscribe
dhMap[EVENT_POLL] = tmhttps.EventPoll
tmhttps.defaultHandlers = dhMap
return tmhttps
}
// Process a request.
func (this *ErisDbJsonService) Process(r *http.Request, w http.ResponseWriter) {
// Create new request object and unmarshal.
req := &rpc.RPCRequest{}
decoder := json.NewDecoder(r.Body)
errU := decoder.Decode(req)
// Error when decoding.
if errU != nil {
this.writeError("Failed to parse request: "+errU.Error(), "", rpc.PARSE_ERROR, w)
return
}
// Wrong protocol version.
if req.JSONRPC != "2.0" {
this.writeError("Wrong protocol version: "+req.JSONRPC, req.Id, rpc.INVALID_REQUEST, w)
return
}
mName := req.Method
if handler, ok := this.defaultHandlers[mName]; ok {
resp, errCode, err := handler(req, w)
if err != nil {
this.writeError(err.Error(), req.Id, errCode, w)
} else {
this.writeResponse(req.Id, resp, w)
}
} else {
this.writeError("Method not found: "+mName, req.Id, rpc.METHOD_NOT_FOUND, w)
}
}
// Helper for writing error responses.
func (this *ErisDbJsonService) writeError(msg, id string, code int, w http.ResponseWriter) {
response := rpc.NewRPCErrorResponse(id, code, msg)
err := this.codec.Encode(response, w)
// If there's an error here all bets are off.
if err != nil {
http.Error(w, "Failed to marshal standard error response: "+err.Error(), 500)
return
}
w.WriteHeader(200)
}
// Helper for writing responses.
func (this *ErisDbJsonService) writeResponse(id string, result interface{}, w http.ResponseWriter) {
log.Debug("Result: %v\n", result)
response := rpc.NewRPCResponse(id, result)
err := this.codec.Encode(response, w)
log.Debug("Response: %v\n", response)
if err != nil {
this.writeError("Internal error: "+err.Error(), id, rpc.INTERNAL_ERROR, w)
return
}
w.WriteHeader(200)
}
// *************************************** Events ************************************
// Subscribe to an event.
func (this *ErisDbJsonService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
param := &EventIdParam{}
err := json.Unmarshal(request.Params, param)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
eventId := param.EventId
subId, errC := this.eventSubs.add(eventId)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &core_types.EventSub{subId}, 0, nil
}
// Un-subscribe from an event.
func (this *ErisDbJsonService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
param := &SubIdParam{}
err := json.Unmarshal(request.Params, param)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
subId := param.SubId
result, errC := this.pipe.Events().Unsubscribe(subId)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &core_types.EventUnsub{result}, 0, nil
}
// Check subscription event cache for new data.
func (this *ErisDbJsonService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
param := &SubIdParam{}
err := json.Unmarshal(request.Params, param)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
subId := param.SubId
result, errC := this.eventSubs.poll(subId)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &core_types.PollResponse{result}, 0, nil
}
package core
import (
"encoding/hex"
"fmt"
"strconv"
"strings"
"github.com/gin-gonic/gin"
core_types "github.com/eris-ltd/eris-db/core/types"
definitions "github.com/eris-ltd/eris-db/definitions"
rpc "github.com/eris-ltd/eris-db/rpc"
server "github.com/eris-ltd/eris-db/server"
"github.com/eris-ltd/eris-db/txs"
"github.com/eris-ltd/eris-db/util"
)
// Provides a REST-like web-api. Implements server.Server
// TODO more routers. Also, start looking into how better status codes
// can be gotten.
type RestServer struct {
codec rpc.Codec
pipe definitions.Pipe
eventSubs *EventSubscriptions
running bool
}
// Create a new rest server.
func NewRestServer(codec rpc.Codec, pipe definitions.Pipe, eventSubs *EventSubscriptions) *RestServer {
return &RestServer{codec: codec, pipe: pipe, eventSubs: eventSubs}
}
// Starting the server means registering all the handlers with the router.
func (this *RestServer) Start(config *server.ServerConfig, router *gin.Engine) {
// Accounts
router.GET("/accounts", parseSearchQuery, this.handleAccounts)
router.GET("/accounts/:address", addressParam, this.handleAccount)
router.GET("/accounts/:address/storage", addressParam, this.handleStorage)
router.GET("/accounts/:address/storage/:key", addressParam, keyParam, this.handleStorageAt)
// Blockchain
router.GET("/blockchain", this.handleBlockchainInfo)
router.GET("/blockchain/chain_id", this.handleChainId)
router.GET("/blockchain/genesis_hash", this.handleGenesisHash)
router.GET("/blockchain/latest_block_height", this.handleLatestBlockHeight)
router.GET("/blockchain/latest_block", this.handleLatestBlock)
router.GET("/blockchain/blocks", parseSearchQuery, this.handleBlocks)
router.GET("/blockchain/block/:height", heightParam, this.handleBlock)
// Consensus
router.GET("/consensus", this.handleConsensusState)
router.GET("/consensus/validators", this.handleValidatorList)
// Events
router.POST("/event_subs", this.handleEventSubscribe)
router.GET("/event_subs/:id", this.handleEventPoll)
router.DELETE("/event_subs/:id", this.handleEventUnsubscribe)
// NameReg
router.GET("/namereg", parseSearchQuery, this.handleNameRegEntries)
router.GET("/namereg/:key", nameParam, this.handleNameRegEntry)
// Network
router.GET("/network", this.handleNetworkInfo)
router.GET("/network/client_version", this.handleClientVersion)
router.GET("/network/moniker", this.handleMoniker)
router.GET("/network/listening", this.handleListening)
router.GET("/network/listeners", this.handleListeners)
router.GET("/network/peers", this.handlePeers)
router.GET("/network/peers/:address", peerAddressParam, this.handlePeer)
// Tx related (TODO get txs has still not been implemented)
router.POST("/txpool", this.handleBroadcastTx)
router.GET("/txpool", this.handleUnconfirmedTxs)
// Code execution
router.POST("/calls", this.handleCall)
router.POST("/codecalls", this.handleCallCode)
// Unsafe
router.GET("/unsafe/pa_generator", this.handleGenPrivAcc)
router.POST("/unsafe/txpool", parseTxModifier, this.handleTransact)
router.POST("/unsafe/namereg/txpool", this.handleTransactNameReg)
router.POST("/unsafe/tx_signer", this.handleSignTx)
this.running = true
}
// Is the server currently running?
func (this *RestServer) Running() bool {
return this.running
}
// Shut the server down. Does nothing.
func (this *RestServer) ShutDown() {
this.running = false
}
// ********************************* Accounts *********************************
func (this *RestServer) handleGenPrivAcc(c *gin.Context) {
addr := &AddressParam{}
var acc interface{}
var err error
if addr.Address == nil || len(addr.Address) == 0 {
acc, err = this.pipe.Accounts().GenPrivAccount()
} else {
acc, err = this.pipe.Accounts().GenPrivAccountFromKey(addr.Address)
}
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(acc, c.Writer)
}
func (this *RestServer) handleAccounts(c *gin.Context) {
var filters []*core_types.FilterData
fs, exists := c.Get("filters")
if exists {
filters = fs.([]*core_types.FilterData)
}
accs, err := this.pipe.Accounts().Accounts(filters)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(accs, c.Writer)
}
func (this *RestServer) handleAccount(c *gin.Context) {
addr := c.MustGet("addrBts").([]byte)
acc, err := this.pipe.Accounts().Account(addr)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(acc, c.Writer)
}
func (this *RestServer) handleStorage(c *gin.Context) {
addr := c.MustGet("addrBts").([]byte)
s, err := this.pipe.Accounts().Storage(addr)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(s, c.Writer)
}
func (this *RestServer) handleStorageAt(c *gin.Context) {
addr := c.MustGet("addrBts").([]byte)
key := c.MustGet("keyBts").([]byte)
sa, err := this.pipe.Accounts().StorageAt(addr, key)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(sa, c.Writer)
}
// ********************************* Blockchain *********************************
func (this *RestServer) handleBlockchainInfo(c *gin.Context) {
bci, err := this.pipe.Blockchain().Info()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(bci, c.Writer)
}
func (this *RestServer) handleGenesisHash(c *gin.Context) {
gh, err := this.pipe.Blockchain().GenesisHash()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.GenesisHash{gh}, c.Writer)
}
func (this *RestServer) handleChainId(c *gin.Context) {
cId, err := this.pipe.Blockchain().ChainId()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.ChainId{cId}, c.Writer)
}
func (this *RestServer) handleLatestBlockHeight(c *gin.Context) {
lbh, err := this.pipe.Blockchain().LatestBlockHeight()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.LatestBlockHeight{lbh}, c.Writer)
}
func (this *RestServer) handleLatestBlock(c *gin.Context) {
lb, err := this.pipe.Blockchain().LatestBlock()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(lb, c.Writer)
}
func (this *RestServer) handleBlocks(c *gin.Context) {
var filters []*core_types.FilterData
fs, exists := c.Get("filters")
if exists {
filters = fs.([]*core_types.FilterData)
}
blocks, err := this.pipe.Blockchain().Blocks(filters)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(blocks, c.Writer)
}
func (this *RestServer) handleBlock(c *gin.Context) {
height := c.MustGet("height").(int)
block, err := this.pipe.Blockchain().Block(height)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(block, c.Writer)
}
// ********************************* Consensus *********************************
func (this *RestServer) handleConsensusState(c *gin.Context) {
cs, err := this.pipe.Consensus().State()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(cs, c.Writer)
}
func (this *RestServer) handleValidatorList(c *gin.Context) {
vl, err := this.pipe.Consensus().Validators()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(vl, c.Writer)
}
// ********************************* Events *********************************
func (this *RestServer) handleEventSubscribe(c *gin.Context) {
param := &EventIdParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
subId, err := this.eventSubs.add(param.EventId)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.EventSub{subId}, c.Writer)
}
func (this *RestServer) handleEventPoll(c *gin.Context) {
subId := c.MustGet("id").(string)
data, err := this.eventSubs.poll(subId)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.PollResponse{data}, c.Writer)
}
func (this *RestServer) handleEventUnsubscribe(c *gin.Context) {
subId := c.MustGet("id").(string)
err := this.eventSubs.remove(subId)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.EventUnsub{true}, c.Writer)
}
// ********************************* NameReg *********************************
func (this *RestServer) handleNameRegEntries(c *gin.Context) {
var filters []*core_types.FilterData
fs, exists := c.Get("filters")
if exists {
filters = fs.([]*core_types.FilterData)
}
entries, err := this.pipe.NameReg().Entries(filters)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(entries, c.Writer)
}
func (this *RestServer) handleNameRegEntry(c *gin.Context) {
name := c.MustGet("name").(string)
entry, err := this.pipe.NameReg().Entry(name)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(entry, c.Writer)
}
// ********************************* Network *********************************
func (this *RestServer) handleNetworkInfo(c *gin.Context) {
nInfo, err := this.pipe.Net().Info()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(nInfo, c.Writer)
}
func (this *RestServer) handleClientVersion(c *gin.Context) {
version, err := this.pipe.Net().ClientVersion()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.ClientVersion{version}, c.Writer)
}
func (this *RestServer) handleMoniker(c *gin.Context) {
moniker, err := this.pipe.Net().Moniker()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.Moniker{moniker}, c.Writer)
}
func (this *RestServer) handleListening(c *gin.Context) {
listening, err := this.pipe.Net().Listening()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.Listening{listening}, c.Writer)
}
func (this *RestServer) handleListeners(c *gin.Context) {
listeners, err := this.pipe.Net().Listeners()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(&core_types.Listeners{listeners}, c.Writer)
}
func (this *RestServer) handlePeers(c *gin.Context) {
peers, err := this.pipe.Net().Peers()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(peers, c.Writer)
}
func (this *RestServer) handlePeer(c *gin.Context) {
address := c.MustGet("address").(string)
peer, err := this.pipe.Net().Peer(address)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(peer, c.Writer)
}
// ********************************* Transactions *********************************
func (this *RestServer) handleBroadcastTx(c *gin.Context) {
param := &txs.CallTx{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
receipt, err := this.pipe.Transactor().BroadcastTx(param)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(receipt, c.Writer)
}
func (this *RestServer) handleUnconfirmedTxs(c *gin.Context) {
txs, err := this.pipe.Transactor().UnconfirmedTxs()
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(txs, c.Writer)
}
func (this *RestServer) handleCall(c *gin.Context) {
param := &CallParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
call, err := this.pipe.Transactor().Call(param.From, param.Address, param.Data)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(call, c.Writer)
}
func (this *RestServer) handleCallCode(c *gin.Context) {
param := &CallCodeParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
call, err := this.pipe.Transactor().CallCode(param.From, param.Code, param.Data)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(call, c.Writer)
}
func (this *RestServer) handleTransact(c *gin.Context) {
_, hold := c.Get("hold")
param := &TransactParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
if hold {
res, err := this.pipe.Transactor().TransactAndHold(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(res, c.Writer)
} else {
receipt, err := this.pipe.Transactor().Transact(param.PrivKey, param.Address, param.Data, param.GasLimit, param.Fee)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(receipt, c.Writer)
}
}
func (this *RestServer) handleTransactNameReg(c *gin.Context) {
param := &TransactNameRegParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
receipt, err := this.pipe.Transactor().TransactNameReg(param.PrivKey, param.Name, param.Data, param.Amount, param.Fee)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(receipt, c.Writer)
}
func (this *RestServer) handleSignTx(c *gin.Context) {
param := &SignTxParam{}
errD := this.codec.Decode(param, c.Request.Body)
if errD != nil {
c.AbortWithError(500, errD)
}
tx, err := this.pipe.Transactor().SignTx(param.Tx, param.PrivAccounts)
if err != nil {
c.AbortWithError(500, err)
}
c.Writer.WriteHeader(200)
this.codec.Encode(tx, c.Writer)
}
// ********************************* Middleware *********************************
func addressParam(c *gin.Context) {
addr := c.Param("address")
if !util.IsAddress(addr) {
c.AbortWithError(400, fmt.Errorf("Malformed address param: "+addr))
}
bts, _ := hex.DecodeString(addr)
c.Set("addrBts", bts)
c.Next()
}
func nameParam(c *gin.Context) {
name := c.Param("key")
c.Set("name", name)
c.Next()
}
func keyParam(c *gin.Context) {
key := c.Param("key")
bts, err := hex.DecodeString(key)
if err != nil {
c.AbortWithError(400, err)
}
c.Set("keyBts", bts)
c.Next()
}
func heightParam(c *gin.Context) {
h, err := strconv.Atoi(c.Param("height"))
if err != nil {
c.AbortWithError(400, err)
}
if h < 0 {
c.AbortWithError(400, fmt.Errorf("Negative number used as height."))
}
c.Set("height", h)
c.Next()
}
func subIdParam(c *gin.Context) {
subId := c.Param("id")
if len(subId) != 64 || !util.IsHex(subId) {
c.AbortWithError(400, fmt.Errorf("Malformed event id"))
}
c.Set("id", subId)
c.Next()
}
// TODO
func peerAddressParam(c *gin.Context) {
subId := c.Param("address")
c.Set("address", subId)
c.Next()
}
func parseTxModifier(c *gin.Context) {
hold := c.Query("hold")
if hold == "true" {
c.Set("hold", true)
} else if hold != "" {
if hold != "false" {
c.Writer.WriteHeader(400)
c.Writer.Write([]byte("tx hold must be either 'true' or 'false', found: " + hold))
c.Abort()
}
}
}
func parseSearchQuery(c *gin.Context) {
q := c.Query("q")
if q != "" {
data, err := _parseSearchQuery(q)
if err != nil {
c.Writer.WriteHeader(400)
c.Writer.Write([]byte(err.Error()))
c.Abort()
// c.AbortWithError(400, err)
return
}
c.Set("filters", data)
}
}
func _parseSearchQuery(queryString string) ([]*core_types.FilterData, error) {
if len(queryString) == 0 {
return nil, nil
}
filters := strings.Split(queryString, " ")
fdArr := []*core_types.FilterData{}
for _, f := range filters {
kv := strings.Split(f, ":")
if len(kv) != 2 {
return nil, fmt.Errorf("Malformed query. Missing ':' separator: " + f)
}
if kv[0] == "" {
return nil, fmt.Errorf("Malformed query. Field name missing: " + f)
}
fd, fd2, errTfd := toFilterData(kv[0], kv[1])
if errTfd != nil {
return nil, errTfd
}
fdArr = append(fdArr, fd)
if fd2 != nil {
fdArr = append(fdArr, fd2)
}
}
return fdArr, nil
}
// Parse the query statement and create . Two filter data in case of a range param.
func toFilterData(field, stmt string) (*core_types.FilterData, *core_types.FilterData, error) {
// In case statement is empty
if stmt == "" {
return &core_types.FilterData{field, "==", ""}, nil, nil
}
// Simple routine based on string splitting. TODO add quoted range query.
if stmt[0] == '>' || stmt[0] == '<' || stmt[0] == '=' || stmt[0] == '!' {
// This means a normal operator. If one character then stop, otherwise
// peek at next and check if it's a "=".
if len(stmt) == 1 {
return &core_types.FilterData{field, stmt[0:1], ""}, nil, nil
} else if stmt[1] == '=' {
return &core_types.FilterData{field, stmt[:2], stmt[2:]}, nil, nil
} else {
return &core_types.FilterData{field, stmt[0:1], stmt[1:]}, nil, nil
}
} else {
// Either we have a range query here or a malformed query.
rng := strings.Split(stmt, "..")
// This is for when there is no op, but the value is not empty.
if len(rng) == 1 {
return &core_types.FilterData{field, "==", stmt}, nil, nil
}
// The rest.
if len(rng) != 2 || rng[0] == "" || rng[1] == "" {
return nil, nil, fmt.Errorf("Malformed query statement: " + stmt)
}
var min string
var max string
if rng[0] == "*" {
min = "min"
} else {
min = rng[0]
}
if rng[1] == "*" {
max = "max"
} else {
max = rng[1]
}
return &core_types.FilterData{field, ">=", min}, &core_types.FilterData{field, "<=", max}, nil
}
return nil, nil, nil
}
package core
import (
"encoding/json"
"fmt"
"github.com/tendermint/go-events"
log "github.com/eris-ltd/eris-logger"
core_types "github.com/eris-ltd/eris-db/core/types"
definitions "github.com/eris-ltd/eris-db/definitions"
rpc "github.com/eris-ltd/eris-db/rpc"
server "github.com/eris-ltd/eris-db/server"
)
// Used for ErisDb. Implements WebSocketService.
type ErisDbWsService struct {
codec rpc.Codec
pipe definitions.Pipe
defaultHandlers map[string]RequestHandlerFunc
}
// Create a new websocket service.
func NewErisDbWsService(codec rpc.Codec,
pipe definitions.Pipe) server.WebSocketService {
tmwss := &ErisDbWsService{codec: codec, pipe: pipe}
mtds := &ErisDbMethods{codec, pipe}
dhMap := mtds.getMethods()
// Events
dhMap[EVENT_SUBSCRIBE] = tmwss.EventSubscribe
dhMap[EVENT_UNSUBSCRIBE] = tmwss.EventUnsubscribe
tmwss.defaultHandlers = dhMap
return tmwss
}
// Process a request.
func (this *ErisDbWsService) Process(msg []byte, session *server.WSSession) {
log.Debug("REQUEST: %s\n", string(msg))
// Create new request object and unmarshal.
req := &rpc.RPCRequest{}
errU := json.Unmarshal(msg, req)
// Error when unmarshaling.
if errU != nil {
this.writeError("Failed to parse request: "+errU.Error()+" . Raw: "+string(msg), "", rpc.PARSE_ERROR, session)
return
}
// Wrong protocol version.
if req.JSONRPC != "2.0" {
this.writeError("Wrong protocol version: "+req.JSONRPC, req.Id, rpc.INVALID_REQUEST, session)
return
}
mName := req.Method
if handler, ok := this.defaultHandlers[mName]; ok {
resp, errCode, err := handler(req, session)
if err != nil {
this.writeError(err.Error(), req.Id, errCode, session)
} else {
this.writeResponse(req.Id, resp, session)
}
} else {
this.writeError("Method not found: "+mName, req.Id, rpc.METHOD_NOT_FOUND, session)
}
}
// Convenience method for writing error responses.
func (this *ErisDbWsService) writeError(msg, id string, code int, session *server.WSSession) {
response := rpc.NewRPCErrorResponse(id, code, msg)
bts, err := this.codec.EncodeBytes(response)
// If there's an error here all bets are off.
if err != nil {
panic("Failed to marshal standard error response." + err.Error())
}
session.Write(bts)
}
// Convenience method for writing responses.
func (this *ErisDbWsService) writeResponse(id string, result interface{}, session *server.WSSession) error {
response := rpc.NewRPCResponse(id, result)
bts, err := this.codec.EncodeBytes(response)
log.Debug("RESPONSE: %v\n", response)
if err != nil {
this.writeError("Internal error: "+err.Error(), id, rpc.INTERNAL_ERROR, session)
return err
}
return session.Write(bts)
}
// *************************************** Events ************************************
func (this *ErisDbWsService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
session, ok := requester.(*server.WSSession)
if !ok {
return 0, rpc.INTERNAL_ERROR, fmt.Errorf("Passing wrong object to websocket events")
}
param := &EventIdParam{}
err := this.codec.DecodeBytes(param, request.Params)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
eventId := param.EventId
subId, errSID := generateSubId()
if errSID != nil {
return nil, rpc.INTERNAL_ERROR, errSID
}
callback := func(ret events.EventData) {
this.writeResponse(subId, ret, session)
}
_, errC := this.pipe.Events().Subscribe(subId, eventId, callback)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &core_types.EventSub{subId}, 0, nil
}
func (this *ErisDbWsService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
param := &EventIdParam{}
err := this.codec.DecodeBytes(param, request.Params)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
eventId := param.EventId
result, errC := this.pipe.Events().Unsubscribe(eventId)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &core_types.EventUnsub{result}, 0, nil
}
func (this *ErisDbWsService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
return nil, rpc.INTERNAL_ERROR, fmt.Errorf("Cannot poll with websockets")
}
// Copyright 2015, 2016 Eris Industries (UK) Ltd.
// This file is part of Eris-RT
// Eris-RT is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Eris-RT is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"fmt"
"math"
viper "github.com/spf13/viper"
)
type (
ServerConfig struct {
Bind Bind `toml:"bind"`
TLS TLS `toml:"TLS"`
CORS CORS `toml:"CORS"`
HTTP HTTP `toml:"HTTP"`
WebSocket WebSocket `toml:"web_socket"`
Logging Logging `toml:"logging"`
}
Bind struct {
Address string `toml:"address"`
Port uint16 `toml:"port"`
}
TLS struct {
TLS bool `toml:"tls"`
CertPath string `toml:"cert_path"`
KeyPath string `toml:"key_path"`
}
// Options stores configurations
CORS struct {
Enable bool `toml:"enable"`
AllowOrigins []string `toml:"allow_origins"`
AllowCredentials bool `toml:"allow_credentials"`
AllowMethods []string `toml:"allow_methods"`
AllowHeaders []string `toml:"allow_headers"`
ExposeHeaders []string `toml:"expose_headers"`
MaxAge uint64 `toml:"max_age"`
}
HTTP struct {
JsonRpcEndpoint string `toml:"json_rpc_endpoint"`
}
WebSocket struct {
WebSocketEndpoint string `toml:"websocket_endpoint"`
MaxWebSocketSessions uint16 `toml:"max_websocket_sessions"`
ReadBufferSize uint64 `toml:"read_buffer_size"`
WriteBufferSize uint64 `toml:"write_buffer_size"`
}
Logging struct {
ConsoleLogLevel string `toml:"console_log_level"`
FileLogLevel string `toml:"file_log_level"`
LogFile string `toml:"log_file"`
}
)
func ReadServerConfig(viper *viper.Viper) (*ServerConfig, error) {
// TODO: [ben] replace with a more elegant way of asserting
// the possible conversion to the type domain
// check domain range for bind.port
bindPortInt := viper.GetInt("bind.port")
var bindPortUint16 uint16 = 0
if bindPortInt >= 0 && bindPortInt <= math.MaxUint16 {
bindPortUint16 = uint16(bindPortInt)
} else {
return nil, fmt.Errorf("Failed to read binding port from configuration: %v",
bindPortInt)
}
// check domain range for cors.max_age
maxAge := viper.GetInt("cors.max_age")
var maxAgeUint64 uint64 = 0
if maxAge >= 0 {
maxAgeUint64 = uint64(maxAge)
} else {
return nil, fmt.Errorf("Failed to read maximum age for CORS: %v", maxAge)
}
// check domain range for websocket.max_sessions
maxWebsocketSessions := viper.GetInt("websocket.max_sessions")
var maxWebsocketSessionsUint16 uint16 = 0
if maxWebsocketSessions >= 0 && maxWebsocketSessions <= math.MaxUint16 {
maxWebsocketSessionsUint16 = uint16(maxWebsocketSessions)
} else {
return nil, fmt.Errorf("Failed to read maximum websocket sessions: %v",
maxWebsocketSessions)
}
// check domain range for websocket.read_buffer_size
readBufferSize := viper.GetInt("websocket.read_buffer_size")
var readBufferSizeUint64 uint64 = 0
if readBufferSize >= 0 {
readBufferSizeUint64 = uint64(readBufferSize)
} else {
return nil, fmt.Errorf("Failed to read websocket read buffer size: %v",
readBufferSize)
}
// check domain range for websocket.write_buffer_size
writeBufferSize := viper.GetInt("websocket.read_buffer_size")
var writeBufferSizeUint64 uint64 = 0
if writeBufferSize >= 0 {
writeBufferSizeUint64 = uint64(writeBufferSize)
} else {
return nil, fmt.Errorf("Failed to read websocket write buffer size: %v",
writeBufferSize)
}
return &ServerConfig {
Bind: Bind {
Address: viper.GetString("bind.address"),
Port: bindPortUint16,
},
TLS: TLS {
TLS: viper.GetBool("tls.tls"),
CertPath: viper.GetString("tls.cert_path"),
KeyPath: viper.GetString("tls.key_path"),
},
CORS: CORS {
Enable: viper.GetBool("cors.enable"),
AllowOrigins: viper.GetStringSlice("cors.allow_origins"),
AllowCredentials: viper.GetBool("cors.allow_credentials"),
AllowMethods: viper.GetStringSlice("cors.allow_methods"),
AllowHeaders: viper.GetStringSlice("cors.allow_headers"),
ExposeHeaders: viper.GetStringSlice("cors.expose_headers"),
MaxAge: maxAgeUint64,
},
HTTP: HTTP {
JsonRpcEndpoint: viper.GetString("http.json_rpc_endpoint"),
},
WebSocket: WebSocket {
WebSocketEndpoint: viper.GetString("websocket.endpoint"),
MaxWebSocketSessions: maxWebsocketSessionsUint16,
ReadBufferSize: readBufferSizeUint64,
WriteBufferSize: writeBufferSizeUint64,
},
Logging: Logging{
ConsoleLogLevel: viper.GetString("logging.console_log_level"),
FileLogLevel: viper.GetString("logging.file_log_level"),
LogFile: viper.GetString("logging.log_file"),
},
}, nil
}
......@@ -5,15 +5,13 @@ import (
"os"
"github.com/tendermint/log15"
cfg "github.com/eris-ltd/eris-db/config"
)
var rootHandler log15.Handler
// This is basically the same code as in tendermint. Initialize root
// and maybe later also track the loggers here.
func InitLogger(config *cfg.ServerConfig) {
func InitLogger(config *ServerConfig) {
consoleLogLevel := config.Logging.ConsoleLogLevel
......
......@@ -7,8 +7,6 @@ import (
"net/http"
"time"
cfg "github.com/eris-ltd/eris-db/config"
"github.com/gin-gonic/gin"
cors "github.com/tommy351/gin-cors"
"gopkg.in/tylerb/graceful.v1"
......@@ -24,7 +22,7 @@ type HttpService interface {
// A server serves a number of different http calls.
type Server interface {
Start(*cfg.ServerConfig, *gin.Engine)
Start(*ServerConfig, *gin.Engine)
Running() bool
ShutDown()
}
......@@ -38,7 +36,7 @@ type Server interface {
// 'Start()'. Stop event listeners can be added up to the point where
// the server is stopped and the event is fired.
type ServeProcess struct {
config *cfg.ServerConfig
config *ServerConfig
servers []Server
stopChan chan struct{}
stoppedChan chan struct{}
......@@ -186,10 +184,11 @@ func (this *ServeProcess) StopEventChannel() <-chan struct{} {
}
// Creates a new serve process.
func NewServeProcess(config *cfg.ServerConfig, servers ...Server) *ServeProcess {
var scfg cfg.ServerConfig
func NewServeProcess(config *ServerConfig, servers ...Server) (*ServeProcess,
error) {
var scfg ServerConfig
if config == nil {
scfg = cfg.DefaultServerConfig()
return nil, fmt.Errorf("Nil passed as server configuration")
} else {
scfg = *config
}
......@@ -198,7 +197,7 @@ func NewServeProcess(config *cfg.ServerConfig, servers ...Server) *ServeProcess
startListeners := make([]chan struct{}, 0)
stopListeners := make([]chan struct{}, 0)
sp := &ServeProcess{&scfg, servers, stopChan, stoppedChan, startListeners, stopListeners, nil}
return sp
return sp, nil
}
// Used to enable log15 logging instead of the default Gin logging.
......@@ -219,7 +218,7 @@ func logHandler(c *gin.Context) {
}
func NewCORSMiddleware(options cfg.CORS) gin.HandlerFunc {
func NewCORSMiddleware(options CORS) gin.HandlerFunc {
o := cors.Options{
AllowCredentials: options.AllowCredentials,
AllowHeaders: options.AllowHeaders,
......
......@@ -6,8 +6,6 @@ import (
"sync"
"time"
cfg "github.com/eris-ltd/eris-db/config"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
......@@ -42,7 +40,7 @@ type WebSocketServer struct {
running bool
maxSessions uint
sessionManager *SessionManager
config *cfg.ServerConfig
config *ServerConfig
allOrigins bool
}
......@@ -59,7 +57,7 @@ func NewWebSocketServer(maxSessions uint, service WebSocketService) *WebSocketSe
}
// Start the server. Adds the handler to the router and sets everything up.
func (this *WebSocketServer) Start(config *cfg.ServerConfig, router *gin.Engine) {
func (this *WebSocketServer) Start(config *ServerConfig, router *gin.Engine) {
this.config = config
......
......@@ -66,7 +66,26 @@ genesis_file = "genesis.json"
[servers]
[servers.bind]
address = ""
port = 1337
[servers.tls]
tls=false
cert_path=""
key_path=""
[servers.cors]
enable=false
allow_origins=[]
allow_credentials=false
allow_methods=[]
allow_headers=[]
expose_headers=[]
max_age=0
[servers.http]
json_rpc_endpoint="/rpc"
[servers.websocket]
enable = true
......@@ -75,6 +94,11 @@ genesis_file = "genesis.json"
read_buffer_size = 4096
write_buffer_size = 4096
[servers.logging]
console_log_level="info"
file_log_level="warn"
log_file=""
################################################################################
################################################################################
##
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment