Newer
Older
// Copyright 2015, 2016 Eris Industries (UK) Ltd.
// This file is part of Eris-RT
// Eris-RT is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Eris-RT is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Eris-RT. If not, see <http://www.gnu.org/licenses/>.
// Taken originally from github.com/tendermint/tmsp/server.go
package tmsp
import (
"bufio"
"fmt"
"io"
"net"
"strings"
"sync"
. "github.com/tendermint/go-common"
Benjamin Bollen
committed
tmsp_types "github.com/tendermint/tmsp/types"
manager_types "github.com/eris-ltd/eris-db/manager/types"
)
// var maxNumberConnections = 2
type Server struct {
QuitService
proto string
addr string
listener net.Listener
appMtx sync.Mutex
Benjamin Bollen
committed
app manager_types.Application
Benjamin Bollen
committed
// NewServer creates a TMSP server for app and starts it immediately.
//
// protoAddr must have the form "proto://addr"; the two halves are handed to
// net.Listen when the service starts. A protoAddr without the "://" separator
// yields a nil server and an error. If starting the service fails, the server
// is still returned, together with the error from Start.
func NewServer(protoAddr string, app manager_types.Application) (*Server, error) {
	parts := strings.SplitN(protoAddr, "://", 2)
	if len(parts) != 2 {
		// Without this guard, indexing parts[1] below would panic.
		return nil, fmt.Errorf("invalid listen address %q: expected the form proto://addr", protoAddr)
	}
	proto, addr := parts[0], parts[1]

	s := &Server{
		proto:    proto,
		addr:     addr,
		listener: nil,
		app:      app,
	}
	s.QuitService = *NewQuitService(nil, "TMSPServer", s)
	_, err := s.Start() // Just start it
	return s, err
}
// OnStart runs the embedded QuitService's start hook, opens the listener on
// the configured protocol and address, and launches the accept loop in its
// own goroutine. It returns the error from net.Listen, if any; in that case
// no listener is stored and no accept loop is started.
func (s *Server) OnStart() error {
	s.QuitService.OnStart()

	listener, listenErr := net.Listen(s.proto, s.addr)
	if listenErr == nil {
		s.listener = listener
		go s.acceptConnectionsRoutine()
	}
	return listenErr
}
// OnStop runs the embedded QuitService's stop hook and closes the listener.
// acceptConnectionsRoutine treats an Accept error as a normal shutdown once
// the server is no longer running.
func (s *Server) OnStop() {
	s.QuitService.OnStop()

	// The listener is still nil when OnStart failed in net.Listen; calling
	// Close on the nil interface would panic.
	if s.listener == nil {
		return
	}
	if err := s.listener.Close(); err != nil {
		fmt.Printf("Error in closing listener: %v\n", err)
	}
}
func (s *Server) acceptConnectionsRoutine() {
// semaphore := make(chan struct{}, maxNumberConnections)
for {
// semaphore <- struct{}{}
// Accept a connection
fmt.Println("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
Exit("Failed to accept connection: " + err.Error())
} else {
fmt.Println("Accepted a new connection")
}
closeConn := make(chan error, 2) // Push to signal connection closed
Benjamin Bollen
committed
responses := make(chan *tmsp_types.Response, 1000) // A channel to buffer responses
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, responses, conn)
go func() {
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// <-semaphore
}()
}
}
// Read requests from conn and deal with them
Benjamin Bollen
committed
func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *tmsp_types.Response) {
var count int
var bufReader = bufio.NewReader(conn)
for {
Benjamin Bollen
committed
var req = &tmsp_types.Request{}
err := tmsp_types.ReadMessage(bufReader, req)
if err != nil {
if err == io.EOF {
closeConn <- fmt.Errorf("Connection closed by client")
} else {
closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
}
return
}
s.appMtx.Lock()
count++
s.handleRequest(req, responses)
s.appMtx.Unlock()
}
}
Benjamin Bollen
committed
func (s *Server) handleRequest(req *tmsp_types.Request, responses chan<- *tmsp_types.Response) {
switch r := req.Value.(type) {
case *tmsp_types.Request_Echo:
responses <- tmsp_types.ToResponseEcho(r.Echo.Message)
case *tmsp_types.Request_Flush:
responses <- tmsp_types.ToResponseFlush()
case *tmsp_types.Request_Info:
responses <- tmsp_types.ToResponseInfo(data)
case *tmsp_types.Request_SetOption:
so := r.SetOption
logStr := s.app.SetOption(so.Key, so.Value)
responses <- tmsp_types.ToResponseSetOption(logStr)
case *tmsp_types.Request_AppendTx:
res := s.app.AppendTx(r.AppendTx.Tx)
responses <- tmsp_types.ToResponseAppendTx(res.Code, res.Data, res.Log)
case *tmsp_types.Request_CheckTx:
res := s.app.CheckTx(r.CheckTx.Tx)
responses <- tmsp_types.ToResponseCheckTx(res.Code, res.Data, res.Log)
case *tmsp_types.Request_Commit:
responses <- tmsp_types.ToResponseCommit(res.Code, res.Data, res.Log)
case *tmsp_types.Request_Query:
res := s.app.Query(r.Query.Query)
responses <- tmsp_types.ToResponseQuery(res.Code, res.Data, res.Log)
case *tmsp_types.Request_InitChain:
if app, ok := s.app.(tmsp_types.BlockchainAware); ok {
app.InitChain(r.InitChain.Validators)
responses <- tmsp_types.ToResponseInitChain()
responses <- tmsp_types.ToResponseInitChain()
case *tmsp_types.Request_EndBlock:
if app, ok := s.app.(tmsp_types.BlockchainAware); ok {
validators := app.EndBlock(r.EndBlock.Height)
responses <- tmsp_types.ToResponseEndBlock(validators)
responses <- tmsp_types.ToResponseEndBlock(nil)
responses <- tmsp_types.ToResponseException("Unknown request")
}
}
// Pull responses from 'responses' and write them to conn.
Benjamin Bollen
committed
func (s *Server) handleResponses(closeConn chan error, responses <-chan *tmsp_types.Response, conn net.Conn) {
var count int
var bufWriter = bufio.NewWriter(conn)
for {
var res = <-responses
Benjamin Bollen
committed
err := tmsp_types.WriteMessage(res, bufWriter)
if err != nil {
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
return
}
if _, ok := res.Value.(*tmsp_types.Response_Flush); ok {
err = bufWriter.Flush()
if err != nil {
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
return
}
}
count++
}
}