Newer
Older
ep "github.com/eris-ltd/eris-db/erisdb/pipe"
rpc "github.com/eris-ltd/eris-db/rpc"
"github.com/eris-ltd/eris-db/server"
"github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/types"
)
// Used for ErisDb. Implements WebSocketService.
type ErisDbWsService struct {
codec rpc.Codec
pipe ep.Pipe
defaultHandlers map[string]RequestHandlerFunc
}
// Create a new websocket service.
func NewErisDbWsService(codec rpc.Codec, pipe ep.Pipe) server.WebSocketService {
tmwss := &ErisDbWsService{codec: codec, pipe: pipe}
mtds := &ErisDbMethods{codec, pipe}
dhMap := mtds.getMethods()
// Events
dhMap[EVENT_SUBSCRIBE] = tmwss.EventSubscribe
dhMap[EVENT_UNSUBSCRIBE] = tmwss.EventUnsubscribe
tmwss.defaultHandlers = dhMap
return tmwss
}
// Process a request.
func (this *ErisDbWsService) Process(msg []byte, session *server.WSSession) {
// Create new request object and unmarshal.
req := &rpc.RPCRequest{}
this.writeError("Failed to parse request: "+errU.Error()+" . Raw: "+string(msg), "", rpc.PARSE_ERROR, session)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
return
}
// Wrong protocol version.
if req.JSONRPC != "2.0" {
this.writeError("Wrong protocol version: "+req.JSONRPC, req.Id, rpc.INVALID_REQUEST, session)
return
}
mName := req.Method
if handler, ok := this.defaultHandlers[mName]; ok {
resp, errCode, err := handler(req, session)
if err != nil {
this.writeError(err.Error(), req.Id, errCode, session)
} else {
this.writeResponse(req.Id, resp, session)
}
} else {
this.writeError("Method not found: "+mName, req.Id, rpc.METHOD_NOT_FOUND, session)
}
}
// Convenience method for writing error responses.
func (this *ErisDbWsService) writeError(msg, id string, code int, session *server.WSSession) {
response := rpc.NewRPCErrorResponse(id, code, msg)
bts, err := this.codec.EncodeBytes(response)
// If there's an error here all bets are off.
if err != nil {
panic("Failed to marshal standard error response." + err.Error())
}
session.Write(bts)
}
// Convenience method for writing responses.
func (this *ErisDbWsService) writeResponse(id string, result interface{}, session *server.WSSession) error {
response := rpc.NewRPCResponse(id, result)
bts, err := this.codec.EncodeBytes(response)
if err != nil {
this.writeError("Internal error: "+err.Error(), id, rpc.INTERNAL_ERROR, session)
}
// *************************************** Events ************************************
func (this *ErisDbWsService) EventSubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
session, ok := requester.(*server.WSSession)
if !ok {
return 0, rpc.INTERNAL_ERROR, fmt.Errorf("Passing wrong object to websocket events")
}
param := &EventIdParam{}
err := this.codec.DecodeBytes(param, request.Params)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
eventId := param.EventId
subId, errSID := generateSubId()
if errSID != nil {
return nil, rpc.INTERNAL_ERROR, errSID
}
writeErr := this.writeResponse(subId, ret, session)
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
}
_, errC := this.pipe.Events().Subscribe(subId, eventId, callback)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &ep.EventSub{subId}, 0, nil
}
func (this *ErisDbWsService) EventUnsubscribe(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
param := &EventIdParam{}
err := this.codec.DecodeBytes(param, request.Params)
if err != nil {
return nil, rpc.INVALID_PARAMS, err
}
eventId := param.EventId
result, errC := this.pipe.Events().Unsubscribe(eventId)
if errC != nil {
return nil, rpc.INTERNAL_ERROR, errC
}
return &ep.EventUnsub{result}, 0, nil
}
func (this *ErisDbWsService) EventPoll(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) {
return nil, rpc.INTERNAL_ERROR, fmt.Errorf("Cannot poll with websockets")
}