From 5b79d411476d0e50aa066f8d0d5a89202619ae31 Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@erisindustries.com> Date: Thu, 16 Feb 2017 14:33:22 +0000 Subject: [PATCH] Replace further instances of old style logging and improve structure --- core/core.go | 6 +- glide.lock | 2 + logging/loggers/shared_test.go | 4 +- manager/eris-mint/evm/log.go | 21 -- manager/eris-mint/state/execution.go | 318 ++++++++++++++------------- manager/eris-mint/state/log.go | 21 -- server/log.go | 21 -- server/server.go | 93 +++++--- server/websocket.go | 206 +++++++++-------- 9 files changed, 336 insertions(+), 356 deletions(-) delete mode 100644 manager/eris-mint/evm/log.go delete mode 100644 manager/eris-mint/state/log.go delete mode 100644 server/log.go diff --git a/core/core.go b/core/core.go index 144b4d4e..12ba4005 100644 --- a/core/core.go +++ b/core/core.go @@ -41,6 +41,7 @@ type Core struct { evsw events.EventSwitch pipe definitions.Pipe tendermintPipe definitions.TendermintPipe + logger logging_types.InfoTraceLogger } func NewCore(chainId string, @@ -73,6 +74,7 @@ func NewCore(chainId string, evsw: evsw, pipe: pipe, tendermintPipe: tendermintPipe, + logger: logger, }, nil } @@ -99,9 +101,9 @@ func (core *Core) NewGatewayV0(config *server.ServerConfig) (*server.ServeProces jsonServer := rpc_v0.NewJsonRpcServer(tmjs) restServer := rpc_v0.NewRestServer(codec, core.pipe, eventSubscriptions) wsServer := server.NewWebSocketServer(config.WebSocket.MaxWebSocketSessions, - tmwss) + tmwss, core.logger) // Create a server process. - proc, err := server.NewServeProcess(config, jsonServer, restServer, wsServer) + proc, err := server.NewServeProcess(config, core.logger, jsonServer, restServer, wsServer) if err != nil { return nil, fmt.Errorf("Failed to load gateway: %v", err) } diff --git a/glide.lock b/glide.lock index 2336fc21..7c043776 100644 --- a/glide.lock +++ b/glide.lock @@ -33,6 +33,8 @@ imports: version: 114ebc77443db9a153692233294e48bc7e184215 - name: github.com/fsnotify/fsnotify version: 30411dbcefb7a1da7e84f75530ad3abe4011b4f8 +- name: github.com/Graylog2/go-gelf + version: 5bfd5bbbfe86489017c268bf86539fcec7e28d8e - name: github.com/gin-gonic/gin version: f931d1ea80ae95a6fc739213cdd9399bd2967fb6 subpackages: diff --git a/logging/loggers/shared_test.go b/logging/loggers/shared_test.go index 56445b6e..86783f49 100644 --- a/logging/loggers/shared_test.go +++ b/logging/loggers/shared_test.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/go-kit/kit/log" + kitlog "github.com/go-kit/kit/log" ) const logLineTimeout time.Duration = time.Second @@ -50,7 +50,7 @@ func newTestLogger() *testLogger { func makeTestLogger(err error) *testLogger { cl := NewChannelLogger(100) logLineCh := make(chan ([]interface{})) - go cl.DrainForever(log.LoggerFunc(func(keyvals ...interface{}) error { + go cl.DrainForever(kitlog.LoggerFunc(func(keyvals ...interface{}) error { logLineCh <- keyvals return nil })) diff --git a/manager/eris-mint/evm/log.go b/manager/eris-mint/evm/log.go deleted file mode 100644 index 8b556519..00000000 --- a/manager/eris-mint/evm/log.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2017 Monax Industries Limited -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package vm - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "vm") diff --git a/manager/eris-mint/state/execution.go b/manager/eris-mint/state/execution.go index 80b5474f..147758e0 100644 --- a/manager/eris-mint/state/execution.go +++ b/manager/eris-mint/state/execution.go @@ -22,6 +22,7 @@ import ( acm "github.com/eris-ltd/eris-db/account" "github.com/eris-ltd/eris-db/common/sanity" core_types "github.com/eris-ltd/eris-db/core/types" + logging_types "github.com/eris-ltd/eris-db/logging/types" "github.com/eris-ltd/eris-db/manager/eris-mint/evm" ptypes "github.com/eris-ltd/eris-db/permission/types" // for GlobalPermissionAddress ... "github.com/eris-ltd/eris-db/txs" @@ -316,7 +317,8 @@ func adjustByOutputs(accounts map[string]*acm.Account, outs []*txs.TxOutput) { // If the tx is invalid, an error will be returned. // Unlike ExecBlock(), state will not be altered. -func ExecTx(blockCache *BlockCache, tx txs.Tx, runCall bool, evc events.Fireable) (err error) { +func ExecTx(blockCache *BlockCache, tx txs.Tx, runCall bool, evc events.Fireable, + logger logging_types.InfoTraceLogger) (err error) { // TODO: do something with fees fees := int64(0) @@ -515,7 +517,7 @@ func ExecTx(blockCache *BlockCache, tx txs.Tx, runCall bool, evc events.Fireable CALL_COMPLETE: // err may or may not be nil. - // Create a receipt from the ret and whether errored. + // Create a receipt from the ret and whether errored. log.Notice("VM call complete", "caller", caller, "callee", callee, "return", ret, "err", err) // Fire Events for sender and receiver @@ -665,182 +667,182 @@ func ExecTx(blockCache *BlockCache, tx txs.Tx, runCall bool, evc events.Fireable return nil - // Consensus related Txs inactivated for now - // TODO! - /* - case *txs.BondTx: - valInfo := blockCache.State().GetValidatorInfo(tx.PubKey.Address()) - if valInfo != nil { - // TODO: In the future, check that the validator wasn't destroyed, - // add funds, merge UnbondTo outputs, and unbond validator. - return errors.New("Adding coins to existing validators not yet supported") - } + // Consensus related Txs inactivated for now + // TODO! + /* + case *txs.BondTx: + valInfo := blockCache.State().GetValidatorInfo(tx.PubKey.Address()) + if valInfo != nil { + // TODO: In the future, check that the validator wasn't destroyed, + // add funds, merge UnbondTo outputs, and unbond validator. + return errors.New("Adding coins to existing validators not yet supported") + } - accounts, err := getInputs(blockCache, tx.Inputs) - if err != nil { - return err - } + accounts, err := getInputs(blockCache, tx.Inputs) + if err != nil { + return err + } - // add outputs to accounts map - // if any outputs don't exist, all inputs must have CreateAccount perm - // though outputs aren't created until unbonding/release time - canCreate := hasCreateAccountPermission(blockCache, accounts) - for _, out := range tx.UnbondTo { - acc := blockCache.GetAccount(out.Address) - if acc == nil && !canCreate { - return fmt.Errorf("At least one input does not have permission to create accounts") - } + // add outputs to accounts map + // if any outputs don't exist, all inputs must have CreateAccount perm + // though outputs aren't created until unbonding/release time + canCreate := hasCreateAccountPermission(blockCache, accounts) + for _, out := range tx.UnbondTo { + acc := blockCache.GetAccount(out.Address) + if acc == nil && !canCreate { + return fmt.Errorf("At least one input does not have permission to create accounts") } + } - bondAcc := blockCache.GetAccount(tx.PubKey.Address()) - if !hasBondPermission(blockCache, bondAcc) { - return fmt.Errorf("The bonder does not have permission to bond") - } + bondAcc := blockCache.GetAccount(tx.PubKey.Address()) + if !hasBondPermission(blockCache, bondAcc) { + return fmt.Errorf("The bonder does not have permission to bond") + } - if !hasBondOrSendPermission(blockCache, accounts) { - return fmt.Errorf("At least one input lacks permission to bond") - } + if !hasBondOrSendPermission(blockCache, accounts) { + return fmt.Errorf("At least one input lacks permission to bond") + } - signBytes := acm.SignBytes(_s.ChainID, tx) - inTotal, err := validateInputs(accounts, signBytes, tx.Inputs) - if err != nil { - return err - } - if !tx.PubKey.VerifyBytes(signBytes, tx.Signature) { - return txs.ErrTxInvalidSignature - } - outTotal, err := validateOutputs(tx.UnbondTo) - if err != nil { - return err - } - if outTotal > inTotal { - return txs.ErrTxInsufficientFunds - } - fee := inTotal - outTotal - fees += fee + signBytes := acm.SignBytes(_s.ChainID, tx) + inTotal, err := validateInputs(accounts, signBytes, tx.Inputs) + if err != nil { + return err + } + if !tx.PubKey.VerifyBytes(signBytes, tx.Signature) { + return txs.ErrTxInvalidSignature + } + outTotal, err := validateOutputs(tx.UnbondTo) + if err != nil { + return err + } + if outTotal > inTotal { + return txs.ErrTxInsufficientFunds + } + fee := inTotal - outTotal + fees += fee - // Good! Adjust accounts - adjustByInputs(accounts, tx.Inputs) - for _, acc := range accounts { - blockCache.UpdateAccount(acc) - } - // Add ValidatorInfo - _s.SetValidatorInfo(&txs.ValidatorInfo{ - Address: tx.PubKey.Address(), - PubKey: tx.PubKey, - UnbondTo: tx.UnbondTo, - FirstBondHeight: _s.LastBlockHeight + 1, - FirstBondAmount: outTotal, - }) - // Add Validator - added := _s.BondedValidators.Add(&txs.Validator{ - Address: tx.PubKey.Address(), - PubKey: tx.PubKey, - BondHeight: _s.LastBlockHeight + 1, - VotingPower: outTotal, - Accum: 0, - }) - if !added { - PanicCrisis("Failed to add validator") - } - if evc != nil { - // TODO: fire for all inputs - evc.FireEvent(txs.EventStringBond(), txs.EventDataTx{tx, nil, ""}) - } - return nil + // Good! Adjust accounts + adjustByInputs(accounts, tx.Inputs) + for _, acc := range accounts { + blockCache.UpdateAccount(acc) + } + // Add ValidatorInfo + _s.SetValidatorInfo(&txs.ValidatorInfo{ + Address: tx.PubKey.Address(), + PubKey: tx.PubKey, + UnbondTo: tx.UnbondTo, + FirstBondHeight: _s.LastBlockHeight + 1, + FirstBondAmount: outTotal, + }) + // Add Validator + added := _s.BondedValidators.Add(&txs.Validator{ + Address: tx.PubKey.Address(), + PubKey: tx.PubKey, + BondHeight: _s.LastBlockHeight + 1, + VotingPower: outTotal, + Accum: 0, + }) + if !added { + PanicCrisis("Failed to add validator") + } + if evc != nil { + // TODO: fire for all inputs + evc.FireEvent(txs.EventStringBond(), txs.EventDataTx{tx, nil, ""}) + } + return nil - case *txs.UnbondTx: - // The validator must be active - _, val := _s.BondedValidators.GetByAddress(tx.Address) - if val == nil { - return txs.ErrTxInvalidAddress - } + case *txs.UnbondTx: + // The validator must be active + _, val := _s.BondedValidators.GetByAddress(tx.Address) + if val == nil { + return txs.ErrTxInvalidAddress + } - // Verify the signature - signBytes := acm.SignBytes(_s.ChainID, tx) - if !val.PubKey.VerifyBytes(signBytes, tx.Signature) { - return txs.ErrTxInvalidSignature - } + // Verify the signature + signBytes := acm.SignBytes(_s.ChainID, tx) + if !val.PubKey.VerifyBytes(signBytes, tx.Signature) { + return txs.ErrTxInvalidSignature + } - // tx.Height must be greater than val.LastCommitHeight - if tx.Height <= val.LastCommitHeight { - return errors.New("Invalid unbond height") - } + // tx.Height must be greater than val.LastCommitHeight + if tx.Height <= val.LastCommitHeight { + return errors.New("Invalid unbond height") + } - // Good! - _s.unbondValidator(val) - if evc != nil { - evc.FireEvent(txs.EventStringUnbond(), txs.EventDataTx{tx, nil, ""}) - } - return nil + // Good! + _s.unbondValidator(val) + if evc != nil { + evc.FireEvent(txs.EventStringUnbond(), txs.EventDataTx{tx, nil, ""}) + } + return nil - case *txs.RebondTx: - // The validator must be inactive - _, val := _s.UnbondingValidators.GetByAddress(tx.Address) - if val == nil { - return txs.ErrTxInvalidAddress - } + case *txs.RebondTx: + // The validator must be inactive + _, val := _s.UnbondingValidators.GetByAddress(tx.Address) + if val == nil { + return txs.ErrTxInvalidAddress + } - // Verify the signature - signBytes := acm.SignBytes(_s.ChainID, tx) - if !val.PubKey.VerifyBytes(signBytes, tx.Signature) { - return txs.ErrTxInvalidSignature - } + // Verify the signature + signBytes := acm.SignBytes(_s.ChainID, tx) + if !val.PubKey.VerifyBytes(signBytes, tx.Signature) { + return txs.ErrTxInvalidSignature + } - // tx.Height must be in a suitable range - minRebondHeight := _s.LastBlockHeight - (validatorTimeoutBlocks / 2) - maxRebondHeight := _s.LastBlockHeight + 2 - if !((minRebondHeight <= tx.Height) && (tx.Height <= maxRebondHeight)) { - return errors.New(Fmt("Rebond height not in range. Expected %v <= %v <= %v", - minRebondHeight, tx.Height, maxRebondHeight)) - } + // tx.Height must be in a suitable range + minRebondHeight := _s.LastBlockHeight - (validatorTimeoutBlocks / 2) + maxRebondHeight := _s.LastBlockHeight + 2 + if !((minRebondHeight <= tx.Height) && (tx.Height <= maxRebondHeight)) { + return errors.New(Fmt("Rebond height not in range. Expected %v <= %v <= %v", + minRebondHeight, tx.Height, maxRebondHeight)) + } - // Good! - _s.rebondValidator(val) - if evc != nil { - evc.FireEvent(txs.EventStringRebond(), txs.EventDataTx{tx, nil, ""}) - } - return nil + // Good! + _s.rebondValidator(val) + if evc != nil { + evc.FireEvent(txs.EventStringRebond(), txs.EventDataTx{tx, nil, ""}) + } + return nil - case *txs.DupeoutTx: - // Verify the signatures - _, accused := _s.BondedValidators.GetByAddress(tx.Address) + case *txs.DupeoutTx: + // Verify the signatures + _, accused := _s.BondedValidators.GetByAddress(tx.Address) + if accused == nil { + _, accused = _s.UnbondingValidators.GetByAddress(tx.Address) if accused == nil { - _, accused = _s.UnbondingValidators.GetByAddress(tx.Address) - if accused == nil { - return txs.ErrTxInvalidAddress - } - } - voteASignBytes := acm.SignBytes(_s.ChainID, &tx.VoteA) - voteBSignBytes := acm.SignBytes(_s.ChainID, &tx.VoteB) - if !accused.PubKey.VerifyBytes(voteASignBytes, tx.VoteA.Signature) || - !accused.PubKey.VerifyBytes(voteBSignBytes, tx.VoteB.Signature) { - return txs.ErrTxInvalidSignature + return txs.ErrTxInvalidAddress } + } + voteASignBytes := acm.SignBytes(_s.ChainID, &tx.VoteA) + voteBSignBytes := acm.SignBytes(_s.ChainID, &tx.VoteB) + if !accused.PubKey.VerifyBytes(voteASignBytes, tx.VoteA.Signature) || + !accused.PubKey.VerifyBytes(voteBSignBytes, tx.VoteB.Signature) { + return txs.ErrTxInvalidSignature + } - // Verify equivocation - // TODO: in the future, just require one vote from a previous height that - // doesn't exist on this chain. - if tx.VoteA.Height != tx.VoteB.Height { - return errors.New("DupeoutTx heights don't match") - } - if tx.VoteA.Round != tx.VoteB.Round { - return errors.New("DupeoutTx rounds don't match") - } - if tx.VoteA.Type != tx.VoteB.Type { - return errors.New("DupeoutTx types don't match") - } - if bytes.Equal(tx.VoteA.BlockHash, tx.VoteB.BlockHash) { - return errors.New("DupeoutTx blockhashes shouldn't match") - } + // Verify equivocation + // TODO: in the future, just require one vote from a previous height that + // doesn't exist on this chain. + if tx.VoteA.Height != tx.VoteB.Height { + return errors.New("DupeoutTx heights don't match") + } + if tx.VoteA.Round != tx.VoteB.Round { + return errors.New("DupeoutTx rounds don't match") + } + if tx.VoteA.Type != tx.VoteB.Type { + return errors.New("DupeoutTx types don't match") + } + if bytes.Equal(tx.VoteA.BlockHash, tx.VoteB.BlockHash) { + return errors.New("DupeoutTx blockhashes shouldn't match") + } - // Good! (Bad validator!) - _s.destroyValidator(accused) - if evc != nil { - evc.FireEvent(txs.EventStringDupeout(), txs.EventDataTx{tx, nil, ""}) - } - return nil - */ + // Good! (Bad validator!) + _s.destroyValidator(accused) + if evc != nil { + evc.FireEvent(txs.EventStringDupeout(), txs.EventDataTx{tx, nil, ""}) + } + return nil + */ case *txs.PermissionsTx: var inAcc *acm.Account diff --git a/manager/eris-mint/state/log.go b/manager/eris-mint/state/log.go deleted file mode 100644 index d8e71b79..00000000 --- a/manager/eris-mint/state/log.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2017 Monax Industries Limited -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package state - -import ( - "github.com/tendermint/go-logger" -) - -var log = logger.New("module", "state") diff --git a/server/log.go b/server/log.go deleted file mode 100644 index e2bcb50b..00000000 --- a/server/log.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2017 Monax Industries Limited -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "github.com/tendermint/log15" -) - -var log = log15.New("module", "eris/server") diff --git a/server/server.go b/server/server.go index e5ace666..3c066f96 100644 --- a/server/server.go +++ b/server/server.go @@ -21,6 +21,8 @@ import ( "net/http" "time" + "github.com/eris-ltd/eris-db/logging" + logging_types "github.com/eris-ltd/eris-db/logging/types" "github.com/gin-gonic/gin" cors "github.com/tommy351/gin-cors" "gopkg.in/tylerb/graceful.v1" @@ -57,17 +59,18 @@ type ServeProcess struct { startListenChans []chan struct{} stopListenChans []chan struct{} srv *graceful.Server + logger logging_types.InfoTraceLogger } // Initializes all the servers and starts listening for connections. -func (this *ServeProcess) Start() error { +func (serveProcess *ServeProcess) Start() error { router := gin.New() - config := this.config + config := serveProcess.config ch := NewCORSMiddleware(config.CORS) - router.Use(gin.Recovery(), logHandler, contentTypeMW, ch) + router.Use(gin.Recovery(), logHandler(serveProcess.logger), contentTypeMW, ch) address := config.Bind.Address port := config.Bind.Port @@ -84,7 +87,7 @@ func (this *ServeProcess) Start() error { } // Start the servers/handlers. - for _, s := range this.servers { + for _, s := range serveProcess.servers { s.Start(config, router) } @@ -117,15 +120,18 @@ func (this *ServeProcess) Start() error { } else { lst = l } - this.srv = srv - log.Info("Server started.") - for _, c := range this.startListenChans { + serveProcess.srv = srv + logging.InfoMsg(serveProcess.logger, "Server started.", + "chain_id", serveProcess.config.ChainId, + "address", serveProcess.config.Bind.Address, + "port", serveProcess.config.Bind.Port) + for _, c := range serveProcess.startListenChans { c <- struct{}{} } // Start the serve routine. go func() { - this.srv.Serve(lst) - for _, s := range this.servers { + serveProcess.srv.Serve(lst) + for _, s := range serveProcess.servers { s.ShutDown() } }() @@ -133,16 +139,16 @@ func (this *ServeProcess) Start() error { // on the graceful Server. This happens when someone // calls 'Stop' on the process. go func() { - <-this.stopChan - log.Info("Close signal sent to server.") - this.srv.Stop(killTime) + <-serveProcess.stopChan + logging.InfoMsg(serveProcess.logger, "Close signal sent to server.") + serveProcess.srv.Stop(killTime) }() // Listen to the servers stop event. It is triggered when // the server has been fully shut down. go func() { - <-this.srv.StopChan() - log.Info("Server stop event fired. Good bye.") - for _, c := range this.stopListenChans { + <-serveProcess.srv.StopChan() + logging.InfoMsg(serveProcess.logger, "Server stop event fired. Good bye.") + for _, c := range serveProcess.stopListenChans { c <- struct{}{} } }() @@ -152,8 +158,8 @@ func (this *ServeProcess) Start() error { // Stop will release the port, process any remaining requests // up until the timeout duration is passed, at which point it // will abort them and shut down. -func (this *ServeProcess) Stop(timeout time.Duration) error { - for _, s := range this.servers { +func (serveProcess *ServeProcess) Stop(timeout time.Duration) error { + for _, s := range serveProcess.servers { s.ShutDown() } toChan := make(chan struct{}) @@ -164,8 +170,8 @@ func (this *ServeProcess) Stop(timeout time.Duration) error { }() } - lChan := this.StopEventChannel() - this.stopChan <- struct{}{} + lChan := serveProcess.StopEventChannel() + serveProcess.stopChan <- struct{}{} select { case <-lChan: return nil @@ -178,9 +184,9 @@ func (this *ServeProcess) Stop(timeout time.Duration) error { // is fired after the Start() function is called, and after // the server has started listening for incoming connections. // An error here . -func (this *ServeProcess) StartEventChannel() <-chan struct{} { +func (serveProcess *ServeProcess) StartEventChannel() <-chan struct{} { lChan := make(chan struct{}, 1) - this.startListenChans = append(this.startListenChans, lChan) + serveProcess.startListenChans = append(serveProcess.startListenChans, lChan) return lChan } @@ -189,15 +195,15 @@ func (this *ServeProcess) StartEventChannel() <-chan struct{} { // timeout has passed. When the timeout has passed it will wait // for confirmation from the http.Server, which normally takes // a very short time (milliseconds). -func (this *ServeProcess) StopEventChannel() <-chan struct{} { +func (serveProcess *ServeProcess) StopEventChannel() <-chan struct{} { lChan := make(chan struct{}, 1) - this.stopListenChans = append(this.stopListenChans, lChan) + serveProcess.stopListenChans = append(serveProcess.stopListenChans, lChan) return lChan } // Creates a new serve process. -func NewServeProcess(config *ServerConfig, servers ...Server) (*ServeProcess, - error) { +func NewServeProcess(config *ServerConfig, logger logging_types.InfoTraceLogger, + servers ...Server) (*ServeProcess, error) { var scfg ServerConfig if config == nil { return nil, fmt.Errorf("Nil passed as server configuration") @@ -208,26 +214,41 @@ func NewServeProcess(config *ServerConfig, servers ...Server) (*ServeProcess, stoppedChan := make(chan struct{}, 1) startListeners := make([]chan struct{}, 0) stopListeners := make([]chan struct{}, 0) - sp := &ServeProcess{&scfg, servers, stopChan, stoppedChan, startListeners, stopListeners, nil} + sp := &ServeProcess{ + config: &scfg, + servers: servers, + stopChan: stopChan, + stoppedChan: stoppedChan, + startListenChans: startListeners, + stopListenChans: stopListeners, + srv: nil, + logger: logging.WithScope(logger, "ServeProcess"), + } return sp, nil } // Used to enable log15 logging instead of the default Gin logging. // This is done mainly because we at Eris uses log15 in other components. -func logHandler(c *gin.Context) { - - path := c.Request.URL.Path +func logHandler(logger logging_types.InfoTraceLogger) gin.HandlerFunc { + logger = logging.WithScope(logger, "ginLogHandler") + return func(c *gin.Context) { - // Process request - c.Next() + path := c.Request.URL.Path - clientIP := c.ClientIP() - method := c.Request.Method - statusCode := c.Writer.Status() - comment := c.Errors.String() + // Process request + c.Next() - log.Info("[GIN] HTTP: "+clientIP, "Code", statusCode, "Method", method, "path", path, "error", comment) + clientIP := c.ClientIP() + method := c.Request.Method + statusCode := c.Writer.Status() + comment := c.Errors.String() + logger.Info("client_ip", clientIP, + "status_code", statusCode, + "method", method, + "path", path, + "error", comment) + } } func NewCORSMiddleware(options CORS) gin.HandlerFunc { diff --git a/server/websocket.go b/server/websocket.go index aff2f94c..ff422fea 100644 --- a/server/websocket.go +++ b/server/websocket.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "github.com/eris-ltd/eris-db/logging" + logging_types "github.com/eris-ltd/eris-db/logging/types" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) @@ -56,6 +58,7 @@ type WebSocketServer struct { sessionManager *SessionManager config *ServerConfig allOrigins bool + logger logging_types.InfoTraceLogger } // Create a new server. @@ -63,69 +66,72 @@ type WebSocketServer struct { // NOTE: This is not the total number of connections allowed - only those that are // upgraded to websockets. Requesting a websocket connection will fail with a 503 if // the server is at capacity. -func NewWebSocketServer(maxSessions uint16, service WebSocketService) *WebSocketServer { +func NewWebSocketServer(maxSessions uint16, service WebSocketService, + logger logging_types.InfoTraceLogger) *WebSocketServer { return &WebSocketServer{ maxSessions: maxSessions, - sessionManager: NewSessionManager(maxSessions, service), + sessionManager: NewSessionManager(maxSessions, service, logger), + logger: logging.WithScope(logger, "WebSocketServer"), } } // Start the server. Adds the handler to the router and sets everything up. -func (this *WebSocketServer) Start(config *ServerConfig, router *gin.Engine) { +func (wsServer *WebSocketServer) Start(config *ServerConfig, router *gin.Engine) { - this.config = config + wsServer.config = config - this.upgrader = websocket.Upgrader{ + wsServer.upgrader = websocket.Upgrader{ ReadBufferSize: int(config.WebSocket.ReadBufferSize), // TODO Will this be enough for massive "get blockchain" requests? WriteBufferSize: int(config.WebSocket.WriteBufferSize), } - this.upgrader.CheckOrigin = func(r *http.Request) bool { return true } - router.GET(config.WebSocket.WebSocketEndpoint, this.handleFunc) - this.running = true + wsServer.upgrader.CheckOrigin = func(r *http.Request) bool { return true } + router.GET(config.WebSocket.WebSocketEndpoint, wsServer.handleFunc) + wsServer.running = true } // Is the server currently running. -func (this *WebSocketServer) Running() bool { - return this.running +func (wsServer *WebSocketServer) Running() bool { + return wsServer.running } // Shut the server down. -func (this *WebSocketServer) ShutDown() { - this.sessionManager.Shutdown() - this.running = false +func (wsServer *WebSocketServer) ShutDown() { + wsServer.sessionManager.Shutdown() + wsServer.running = false } // Get the session-manager. -func (this *WebSocketServer) SessionManager() *SessionManager { - return this.sessionManager +func (wsServer *WebSocketServer) SessionManager() *SessionManager { + return wsServer.sessionManager } // Handler for websocket requests. -func (this *WebSocketServer) handleFunc(c *gin.Context) { +func (wsServer *WebSocketServer) handleFunc(c *gin.Context) { r := c.Request w := c.Writer // Upgrade to websocket. - wsConn, uErr := this.upgrader.Upgrade(w, r, nil) + wsConn, uErr := wsServer.upgrader.Upgrade(w, r, nil) if uErr != nil { - uErrStr := "Failed to upgrade to websocket connection: " + uErr.Error() - http.Error(w, uErrStr, 400) - log.Info(uErrStr) + errMsg := "Failed to upgrade to websocket connection" + http.Error(w, fmt.Sprintf("%s: %s", errMsg, uErr.Error()), 400) + logging.InfoMsg(wsServer.logger, errMsg, "error", uErr) return } - session, cErr := this.sessionManager.createSession(wsConn) + session, cErr := wsServer.sessionManager.createSession(wsConn) if cErr != nil { - cErrStr := "Failed to establish websocket connection: " + cErr.Error() - http.Error(w, cErrStr, 503) - log.Info(cErrStr) + errMsg := "Failed to establish websocket connection" + http.Error(w, fmt.Sprintf("%s: %s", errMsg, cErr.Error()), 503) + logging.InfoMsg(wsServer.logger, errMsg, "error", cErr) return } // Start the connection. - log.Info("New websocket connection.", "sessionId", session.id) + logging.InfoMsg(wsServer.logger, "New websocket connection", + "session_id", session.id) session.Open() } @@ -148,57 +154,59 @@ type WSSession struct { service WebSocketService opened bool closed bool + logger logging_types.InfoTraceLogger } // Write a text message to the client. -func (this *WSSession) Write(msg []byte) error { - if this.closed { - log.Warn("Attempting to write to closed session.", "sessionId", this.id) +func (wsSession *WSSession) Write(msg []byte) error { + if wsSession.closed { + logging.InfoMsg(wsSession.logger, "Attempting to write to closed session.") return fmt.Errorf("Session is closed") } - this.writeChan <- msg + wsSession.writeChan <- msg return nil } // Private. Helper for writing control messages. -func (this *WSSession) write(mt int, payload []byte) error { - this.wsConn.SetWriteDeadline(time.Now().Add(writeWait)) - return this.wsConn.WriteMessage(mt, payload) +func (wsSession *WSSession) write(mt int, payload []byte) error { + wsSession.wsConn.SetWriteDeadline(time.Now().Add(writeWait)) + return wsSession.wsConn.WriteMessage(mt, payload) } // Get the session id number. -func (this *WSSession) Id() uint { - return this.id +func (wsSession *WSSession) Id() uint { + return wsSession.id } // Starts the read and write pumps. Blocks on the former. // Notifies all the observers. -func (this *WSSession) Open() { - this.opened = true - this.sessionManager.notifyOpened(this) - go this.writePump() - this.readPump() +func (wsSession *WSSession) Open() { + wsSession.opened = true + wsSession.sessionManager.notifyOpened(wsSession) + go wsSession.writePump() + wsSession.readPump() } // Closes the net connection and cleans up. Notifies all the observers. -func (this *WSSession) Close() { - if !this.closed { - this.closed = true - this.wsConn.Close() - this.sessionManager.removeSession(this.id) - log.Info("Closing websocket connection.", "sessionId", this.id, "remaining", len(this.sessionManager.activeSessions)) - this.sessionManager.notifyClosed(this) +func (wsSession *WSSession) Close() { + if !wsSession.closed { + wsSession.closed = true + wsSession.wsConn.Close() + wsSession.sessionManager.removeSession(wsSession.id) + logging.InfoMsg(wsSession.logger, "Closing websocket connection.", + "remaining_active_sessions", len(wsSession.sessionManager.activeSessions)) + wsSession.sessionManager.notifyClosed(wsSession) } } // Has the session been opened? -func (this *WSSession) Opened() bool { - return this.opened +func (wsSession *WSSession) Opened() bool { + return wsSession.opened } // Has the session been closed? -func (this *WSSession) Closed() bool { - return this.closed +func (wsSession *WSSession) Closed() bool { + return wsSession.closed } // Pump debugging @@ -210,7 +218,7 @@ var wpm *sync.Mutex = &sync.Mutex{} */ // Read loop. Will terminate on a failed read. -func (this *WSSession) readPump() { +func (wsSession *WSSession) readPump() { /* rpm.Lock() rp++ @@ -223,38 +231,40 @@ func (this *WSSession) readPump() { rpm.Unlock() }() */ - this.wsConn.SetReadLimit(maxMessageSize) + wsSession.wsConn.SetReadLimit(maxMessageSize) // this.wsConn.SetReadDeadline(time.Now().Add(pongWait)) // this.wsConn.SetPongHandler(func(string) error { this.wsConn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { // Read - msgType, msg, err := this.wsConn.ReadMessage() + msgType, msg, err := wsSession.wsConn.ReadMessage() // Read error. if err != nil { // Socket could have been gracefully closed, so not really an error. - log.Info("Socket closed. Removing.", "error", err.Error()) - this.writeCloseChan <- struct{}{} + logging.InfoMsg(wsSession.logger, + "Socket closed. Removing.", "error", err) + wsSession.writeCloseChan <- struct{}{} return } if msgType != websocket.TextMessage { - log.Info("Receiving non text-message from client, closing.") - this.writeCloseChan <- struct{}{} + logging.InfoMsg(wsSession.logger, + "Receiving non text-message from client, closing.") + wsSession.writeCloseChan <- struct{}{} return } go func() { // Process the request. - this.service.Process(msg, this) + wsSession.service.Process(msg, wsSession) }() } } // Writes messages coming in on the write channel. Will terminate on failed writes, // if pings are not responded to, or if a message comes in on the write close channel. -func (this *WSSession) writePump() { +func (wsSession *WSSession) writePump() { /* wpm.Lock() wp++ @@ -271,23 +281,24 @@ func (this *WSSession) writePump() { defer func() { // ticker.Stop() - this.Close() + wsSession.Close() }() // Write loop. Blocks while waiting for data to come in over a channel. for { select { // Write request. - case msg := <-this.writeChan: + case msg := <-wsSession.writeChan: // Write the bytes to the socket. - err := this.wsConn.WriteMessage(websocket.TextMessage, msg) + err := wsSession.wsConn.WriteMessage(websocket.TextMessage, msg) if err != nil { // Could be due to the socket being closed so not really an error. - log.Info("Writing to socket failed. Closing.") + logging.InfoMsg(wsSession.logger, + "Writing to socket failed. Closing.") return } - case <-this.writeCloseChan: + case <-wsSession.writeCloseChan: return // Ticker run out. Time for another ping message. /* @@ -311,10 +322,12 @@ type SessionManager struct { service WebSocketService openEventChans []chan *WSSession closeEventChans []chan *WSSession + logger logging_types.InfoTraceLogger } // Create a new WebsocketManager. -func NewSessionManager(maxSessions uint16, wss WebSocketService) *SessionManager { +func NewSessionManager(maxSessions uint16, wss WebSocketService, + logger logging_types.InfoTraceLogger) *SessionManager { return &SessionManager{ maxSessions: maxSessions, activeSessions: make(map[uint]*WSSession), @@ -323,24 +336,25 @@ func NewSessionManager(maxSessions uint16, wss WebSocketService) *SessionManager service: wss, openEventChans: []chan *WSSession{}, closeEventChans: []chan *WSSession{}, + logger: logging.WithScope(logger, "SessionManager"), } } // TODO -func (this *SessionManager) Shutdown() { - this.activeSessions = nil +func (sessionManager *SessionManager) Shutdown() { + sessionManager.activeSessions = nil } // Add a listener to session open events. -func (this *SessionManager) SessionOpenEventChannel() <-chan *WSSession { +func (sessionManager *SessionManager) SessionOpenEventChannel() <-chan *WSSession { lChan := make(chan *WSSession, 1) - this.openEventChans = append(this.openEventChans, lChan) + sessionManager.openEventChans = append(sessionManager.openEventChans, lChan) return lChan } // Remove a listener from session open events. -func (this *SessionManager) RemoveSessionOpenEventChannel(lChan chan *WSSession) bool { - ec := this.openEventChans +func (sessionManager *SessionManager) RemoveSessionOpenEventChannel(lChan chan *WSSession) bool { + ec := sessionManager.openEventChans if len(ec) == 0 { return false } @@ -354,15 +368,15 @@ func (this *SessionManager) RemoveSessionOpenEventChannel(lChan chan *WSSession) } // Add a listener to session close events -func (this *SessionManager) SessionCloseEventChannel() <-chan *WSSession { +func (sessionManager *SessionManager) SessionCloseEventChannel() <-chan *WSSession { lChan := make(chan *WSSession, 1) - this.closeEventChans = append(this.closeEventChans, lChan) + sessionManager.closeEventChans = append(sessionManager.closeEventChans, lChan) return lChan } // Remove a listener from session close events. -func (this *SessionManager) RemoveSessionCloseEventChannel(lChan chan *WSSession) bool { - ec := this.closeEventChans +func (sessionManager *SessionManager) RemoveSessionCloseEventChannel(lChan chan *WSSession) bool { + ec := sessionManager.closeEventChans if len(ec) == 0 { return false } @@ -376,55 +390,57 @@ func (this *SessionManager) RemoveSessionCloseEventChannel(lChan chan *WSSession } // Used to notify all observers that a new session was opened. -func (this *SessionManager) notifyOpened(session *WSSession) { - for _, lChan := range this.openEventChans { +func (sessionManager *SessionManager) notifyOpened(session *WSSession) { + for _, lChan := range sessionManager.openEventChans { lChan <- session } } // Used to notify all observers that a new session was closed. -func (this *SessionManager) notifyClosed(session *WSSession) { - for _, lChan := range this.closeEventChans { +func (sessionManager *SessionManager) notifyClosed(session *WSSession) { + for _, lChan := range sessionManager.closeEventChans { lChan <- session } } // Creates a new session and adds it to the manager. -func (this *SessionManager) createSession(wsConn *websocket.Conn) (*WSSession, error) { +func (sessionManager *SessionManager) createSession(wsConn *websocket.Conn) (*WSSession, error) { // Check that the capacity hasn't been exceeded. - this.mtx.Lock() - defer this.mtx.Unlock() - if this.atCapacity() { + sessionManager.mtx.Lock() + defer sessionManager.mtx.Unlock() + if sessionManager.atCapacity() { return nil, fmt.Errorf("Already at capacity") } // Create and start - newId, _ := this.idPool.GetId() + newId, _ := sessionManager.idPool.GetId() conn := &WSSession{ - sessionManager: this, + sessionManager: sessionManager, id: newId, wsConn: wsConn, writeChan: make(chan []byte, maxMessageSize), writeCloseChan: make(chan struct{}), - service: this.service, + service: sessionManager.service, + logger: logging.WithScope(sessionManager.logger, "WSSession"). + With("session_id", newId), } - this.activeSessions[conn.id] = conn + sessionManager.activeSessions[conn.id] = conn return conn, nil } // Remove a session from the list. -func (this *SessionManager) removeSession(id uint) { - this.mtx.Lock() - defer this.mtx.Unlock() +func (sessionManager *SessionManager) removeSession(id uint) { + sessionManager.mtx.Lock() + defer sessionManager.mtx.Unlock() // Check that it exists. - _, ok := this.activeSessions[id] + _, ok := sessionManager.activeSessions[id] if ok { - delete(this.activeSessions, id) - this.idPool.ReleaseId(id) + delete(sessionManager.activeSessions, id) + sessionManager.idPool.ReleaseId(id) } } // True if the number of active connections is at the maximum. -func (this *SessionManager) atCapacity() bool { - return len(this.activeSessions) >= int(this.maxSessions) +func (sessionManager *SessionManager) atCapacity() bool { + return len(sessionManager.activeSessions) >= int(sessionManager.maxSessions) } -- GitLab