From 5a6a8d36a1e8fd1735752fc8b4a9d289cd144e23 Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@monax.io> Date: Thu, 15 Mar 2018 10:48:23 +0000 Subject: [PATCH] Reintroduce execution state saving to database and loading from database Signed-off-by: Silas Davis <silas@monax.io> --- .gitignore | 4 +- blockchain/blockchain.go | 98 +++++- consensus/tendermint/abci/app.go | 10 +- consensus/tendermint/query/node_view.go | 6 +- consensus/tendermint/tendermint.go | 54 ++- core/kernel.go | 104 ++++-- core/kernel_test.go | 74 ++++- event/emitter.go | 4 +- execution/execution.go | 15 +- execution/execution_test.go | 2 +- execution/state.go | 346 +++++++------------- execution/state_test.go | 30 +- genesis/deterministic_genesis.go | 6 +- server/server.go => process/process.go | 10 +- rpc/service.go | 1 + rpc/tm/integration/shared.go | 28 +- rpc/tm/integration/shared_test.go | 2 + rpc/tm/integration/websocket_client_test.go | 59 ++-- txs/go_wire_codec.go | 12 +- 19 files changed, 516 insertions(+), 349 deletions(-) rename server/server.go => process/process.go (78%) diff --git a/.gitignore b/.gitignore index 8dc689c1..062ac40a 100644 --- a/.gitignore +++ b/.gitignore @@ -13,4 +13,6 @@ debug .idea .vscode -.gopath_bos \ No newline at end of file +.gopath_bos +burrow.toml + diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index c0816017..6cda8da5 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -15,14 +15,21 @@ package blockchain import ( - "time" - + "bytes" + "encoding/json" + "fmt" "sync" + "time" acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/genesis" + "github.com/hyperledger/burrow/logging" + logging_types "github.com/hyperledger/burrow/logging/types" + dbm "github.com/tendermint/tmlibs/db" ) +var stateKey = []byte("BlockchainState") + // Immutable Root of blockchain type Root interface { // ChainID precomputed from GenesisDoc @@ -56,7 +63,7 @@ type Blockchain interface { type MutableBlockchain interface { Blockchain - CommitBlock(blockTime time.Time, blockHash, appHash []byte) + CommitBlock(blockTime time.Time, blockHash, appHash []byte) error } type root struct { @@ -74,6 +81,7 @@ type tip struct { type blockchain struct { sync.RWMutex + db dbm.DB *root *tip validators []acm.Validator @@ -84,8 +92,38 @@ var _ Tip = &blockchain{} var _ Blockchain = &blockchain{} var _ MutableBlockchain = &blockchain{} +type PersistedState struct { + AppHashAfterLastBlock []byte + LastBlockHeight uint64 + GenesisDoc genesis.GenesisDoc +} + +func LoadOrNewBlockchain(db dbm.DB, genesisDoc *genesis.GenesisDoc, + logger logging_types.InfoTraceLogger) (*blockchain, error) { + + logger = logging.WithScope(logger, "LoadOrNewBlockchain") + logging.InfoMsg(logger, "Trying to load blockchain state from database", + "database_key", stateKey) + blockchain, err := LoadBlockchain(db) + if err != nil { + return nil, fmt.Errorf("error loading blockchain state from database: %v", err) + } + if blockchain != nil { + dbHash := blockchain.genesisDoc.Hash() + argHash := genesisDoc.Hash() + if !bytes.Equal(dbHash, argHash) { + return nil, fmt.Errorf("GenesisDoc passed to LoadOrNewBlockchain has hash: 0x%X, which does not "+ + "match the one found in database: 0x%X", argHash, dbHash) + } + return blockchain, nil + } + + logging.InfoMsg(logger, "No existing blockchain state found in database, making new blockchain") + return NewBlockchain(db, genesisDoc), nil +} + // Pointer to blockchain state initialised from genesis -func NewBlockchain(genesisDoc *genesis.GenesisDoc) *blockchain { +func NewBlockchain(db dbm.DB, genesisDoc *genesis.GenesisDoc) *blockchain { var validators []acm.Validator for _, gv := range genesisDoc.Validators { validators = append(validators, acm.ConcreteValidator{ @@ -95,6 +133,7 @@ func NewBlockchain(genesisDoc *genesis.GenesisDoc) *blockchain { } root := NewRoot(genesisDoc) return &blockchain{ + db: db, root: root, tip: &tip{ lastBlockTime: root.genesisDoc.GenesisTime, @@ -104,6 +143,21 @@ func NewBlockchain(genesisDoc *genesis.GenesisDoc) *blockchain { } } +func LoadBlockchain(db dbm.DB) (*blockchain, error) { + buf := db.Get(stateKey) + if len(buf) == 0 { + return nil, nil + } + persistedState, err := Decode(buf) + if err != nil { + return nil, err + } + blockchain := NewBlockchain(db, &persistedState.GenesisDoc) + blockchain.lastBlockHeight = persistedState.LastBlockHeight + blockchain.appHashAfterLastBlock = persistedState.AppHashAfterLastBlock + return blockchain, nil +} + func NewRoot(genesisDoc *genesis.GenesisDoc) *root { return &root{ chainID: genesisDoc.ChainID(), @@ -122,13 +176,25 @@ func NewTip(lastBlockHeight uint64, lastBlockTime time.Time, lastBlockHash []byt } } -func (bc *blockchain) CommitBlock(blockTime time.Time, blockHash, appHash []byte) { +func (bc *blockchain) CommitBlock(blockTime time.Time, blockHash, appHash []byte) error { bc.Lock() defer bc.Unlock() bc.lastBlockHeight += 1 bc.lastBlockTime = blockTime bc.lastBlockHash = blockHash bc.appHashAfterLastBlock = appHash + return bc.save() +} + +func (bc *blockchain) save() error { + if bc.db != nil { + encodedState, err := bc.Encode() + if err != nil { + return err + } + bc.db.SetSync(stateKey, encodedState) + } + return nil } func (bc *blockchain) Root() Root { @@ -152,6 +218,28 @@ func (bc *blockchain) Validators() []acm.Validator { return vs } +func (bc *blockchain) Encode() ([]byte, error) { + persistedState := &PersistedState{ + GenesisDoc: bc.genesisDoc, + AppHashAfterLastBlock: bc.appHashAfterLastBlock, + LastBlockHeight: bc.lastBlockHeight, + } + encodedState, err := json.Marshal(persistedState) + if err != nil { + return nil, err + } + return encodedState, nil +} + +func Decode(encodedState []byte) (*PersistedState, error) { + persistedState := new(PersistedState) + err := json.Unmarshal(encodedState, persistedState) + if err != nil { + return nil, err + } + return persistedState, nil +} + func (r *root) ChainID() string { return r.chainID } diff --git a/consensus/tendermint/abci/app.go b/consensus/tendermint/abci/app.go index 74be6be9..9a3a1db2 100644 --- a/consensus/tendermint/abci/app.go +++ b/consensus/tendermint/abci/app.go @@ -175,7 +175,7 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { if err != nil { return abci_types.ResponseCommit{ Code: codes.CommitErrorCode, - Log: fmt.Sprintf("Could not commit block: %s", err), + Log: fmt.Sprintf("Could not commit transactions in block to execution state: %s", err), } } // Just kill the cache - it is badly implemented @@ -185,7 +185,13 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { app.checker.Reset() // Commit to our blockchain state - app.blockchain.CommitBlock(time.Unix(int64(app.block.Header.Time), 0), app.block.Hash, appHash) + err = app.blockchain.CommitBlock(time.Unix(int64(app.block.Header.Time), 0), app.block.Hash, appHash) + if err != nil { + return abci_types.ResponseCommit{ + Code: codes.CommitErrorCode, + Log: fmt.Sprintf("Could not commit block to blockchain state: %s", err), + } + } // Perform a sanity check our block height if app.blockchain.LastBlockHeight() != uint64(app.block.Header.Height) { diff --git a/consensus/tendermint/query/node_view.go b/consensus/tendermint/query/node_view.go index b0ecc2bd..2d33ea53 100644 --- a/consensus/tendermint/query/node_view.go +++ b/consensus/tendermint/query/node_view.go @@ -4,10 +4,10 @@ import ( "fmt" acm "github.com/hyperledger/burrow/account" + "github.com/hyperledger/burrow/consensus/tendermint" "github.com/hyperledger/burrow/txs" "github.com/tendermint/tendermint/consensus" ctypes "github.com/tendermint/tendermint/consensus/types" - "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" ) @@ -35,11 +35,11 @@ type NodeView interface { } type nodeView struct { - tmNode *node.Node + tmNode *tendermint.Node txDecoder txs.Decoder } -func NewNodeView(tmNode *node.Node, txDecoder txs.Decoder) NodeView { +func NewNodeView(tmNode *tendermint.Node, txDecoder txs.Decoder) NodeView { return &nodeView{ tmNode: tmNode, txDecoder: txDecoder, diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index 8f38c5be..26c61a73 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -3,8 +3,11 @@ package tendermint import ( "fmt" + "context" + bcm "github.com/hyperledger/burrow/blockchain" "github.com/hyperledger/burrow/consensus/tendermint/abci" + "github.com/hyperledger/burrow/event" "github.com/hyperledger/burrow/execution" "github.com/hyperledger/burrow/genesis" "github.com/hyperledger/burrow/logging/structure" @@ -15,8 +18,16 @@ import ( "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/proxy" tm_types "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" ) +type Node struct { + *node.Node + closers []interface { + Close() + } +} + func NewNode( conf *config.Config, privValidator tm_types.PrivValidator, @@ -24,23 +35,29 @@ func NewNode( blockchain bcm.MutableBlockchain, checker execution.BatchExecutor, committer execution.BatchCommitter, - logger logging_types.InfoTraceLogger) (*node.Node, error) { + logger logging_types.InfoTraceLogger) (*Node, error) { + var err error // disable Tendermint's RPC conf.RPC.ListenAddress = "" app := abci.NewApp(blockchain, checker, committer, logger) - return node.NewNode(conf, privValidator, + nde := &Node{} + nde.Node, err = node.NewNode(conf, privValidator, proxy.NewLocalClientCreator(app), func() (*tm_types.GenesisDoc, error) { return genesisDoc, nil }, - node.DefaultDBProvider, + nde.DBProvider, NewLogger(logger.WithPrefix(structure.ComponentKey, "Tendermint"). With(structure.ScopeKey, "tendermint.NewNode"))) + if err != nil { + return nil, err + } + return nde, nil } -func BroadcastTxAsyncFunc(validator *node.Node, txEncoder txs.Encoder) func(tx txs.Tx, +func BroadcastTxAsyncFunc(validator *Node, txEncoder txs.Encoder) func(tx txs.Tx, callback func(res *abci_types.Response)) error { return func(tx txs.Tx, callback func(res *abci_types.Response)) error { @@ -57,6 +74,19 @@ func BroadcastTxAsyncFunc(validator *node.Node, txEncoder txs.Encoder) func(tx t } } +// Since Tendermint doesn't close its DB connections +func (n *Node) DBProvider(ctx *node.DBContext) (dbm.DB, error) { + db := dbm.NewDB(ctx.ID, ctx.Config.DBBackend, ctx.Config.DBDir()) + n.closers = append(n.closers, db) + return db, nil +} + +func (n *Node) Close() { + for _, closer := range n.closers { + closer.Close() + } +} + func DeriveGenesisDoc(burrowGenesisDoc *genesis.GenesisDoc) *tm_types.GenesisDoc { validators := make([]tm_types.GenesisValidator, len(burrowGenesisDoc.Validators)) for i, validator := range burrowGenesisDoc.Validators { @@ -73,3 +103,19 @@ func DeriveGenesisDoc(burrowGenesisDoc *genesis.GenesisDoc) *tm_types.GenesisDoc AppHash: burrowGenesisDoc.Hash(), } } + +func SubscribeNewBlock(ctx context.Context, subscribable event.Subscribable, subscriber string, + ch chan<- *tm_types.EventDataNewBlock) error { + query := event.QueryForEventID(tm_types.EventNewBlock) + + return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool { + tmEventData, ok := message.(tm_types.TMEventData) + if ok { + eventDataNewBlock, ok := tmEventData.Unwrap().(tm_types.EventDataNewBlock) + if ok { + ch <- &eventDataNewBlock + } + } + return true + }) +} diff --git a/core/kernel.go b/core/kernel.go index 3562b730..9d52342b 100644 --- a/core/kernel.go +++ b/core/kernel.go @@ -23,6 +23,8 @@ import ( "syscall" "time" + "bytes" + bcm "github.com/hyperledger/burrow/blockchain" "github.com/hyperledger/burrow/consensus/tendermint" "github.com/hyperledger/burrow/consensus/tendermint/query" @@ -32,11 +34,11 @@ import ( "github.com/hyperledger/burrow/logging" "github.com/hyperledger/burrow/logging/structure" logging_types "github.com/hyperledger/burrow/logging/types" + "github.com/hyperledger/burrow/process" "github.com/hyperledger/burrow/rpc" "github.com/hyperledger/burrow/rpc/tm" "github.com/hyperledger/burrow/rpc/v0" v0_server "github.com/hyperledger/burrow/rpc/v0/server" - "github.com/hyperledger/burrow/server" "github.com/hyperledger/burrow/txs" tm_config "github.com/tendermint/tendermint/config" tm_types "github.com/tendermint/tendermint/types" @@ -48,28 +50,37 @@ const ServerShutdownTimeoutMilliseconds = 1000 // Kernel is the root structure of Burrow type Kernel struct { - emitter event.Emitter - service rpc.Service - serverLaunchers []server.Launcher - servers map[string]server.Server - logger logging_types.InfoTraceLogger - shutdownNotify chan struct{} - shutdownOnce sync.Once + // Expose these public-facing interfaces to allow programmatic extension of the Kernel by other projects + Emitter event.Emitter + Service rpc.Service + Launchers []process.Launcher + Logger logging_types.InfoTraceLogger + processes map[string]process.Process + shutdownNotify chan struct{} + shutdownOnce sync.Once } -func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesisDoc *genesis.GenesisDoc, tmConf *tm_config.Config, - rpcConfig *rpc.RPCConfig, logger logging_types.InfoTraceLogger) (*Kernel, error) { +func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesisDoc *genesis.GenesisDoc, + tmConf *tm_config.Config, rpcConfig *rpc.RPCConfig, logger logging_types.InfoTraceLogger) (*Kernel, error) { logger = logging.WithScope(logger, "NewKernel") stateDB := dbm.NewDB("burrow_state", dbm.GoLevelDBBackendStr, tmConf.DBDir()) - state, err := execution.MakeGenesisState(stateDB, genesisDoc) + state, err := execution.LoadOrMakeGenesisState(stateDB, genesisDoc, logger) + if err != nil { + return nil, fmt.Errorf("error making or loading genesis state: %v", err) + } + + blockchain, err := bcm.LoadOrNewBlockchain(stateDB, genesisDoc, logger) if err != nil { - return nil, fmt.Errorf("could not make genesis state: %v", err) + return nil, fmt.Errorf("error creating or loading blockchain state: %v", err) } - state.Save() - blockchain := bcm.NewBlockchain(genesisDoc) + // These should be in sync unless we are at the genesis block + if blockchain.LastBlockHeight() > 0 && !bytes.Equal(blockchain.AppHashAfterLastBlock(), state.LastSavedHash()) { + return nil, fmt.Errorf("blockchain app hash: 0x%X does not match execution state hash: 0x%X", + blockchain.AppHashAfterLastBlock(), state.LastSavedHash()) + } tmGenesisDoc := tendermint.DeriveGenesisDoc(genesisDoc) checker := execution.NewBatchChecker(state, tmGenesisDoc.ChainID, blockchain, logger) @@ -93,10 +104,20 @@ func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesi // which increments the creator's account Sequence and SendTxs service := rpc.NewService(ctx, state, state, emitter, blockchain, transactor, query.NewNodeView(tmNode, txCodec), logger) - launchers := []server.Launcher{ + launchers := []process.Launcher{ + { + Name: "Database", + Launch: func() (process.Process, error) { + // Just close database + return process.ShutdownFunc(func(ctx context.Context) error { + stateDB.Close() + return nil + }), nil + }, + }, { Name: "Tendermint", - Launch: func() (server.Server, error) { + Launch: func() (process.Process, error) { err := tmNode.Start() if err != nil { return nil, fmt.Errorf("error starting Tendermint node: %v", err) @@ -109,24 +130,37 @@ func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesi if err != nil { return nil, fmt.Errorf("could not subscribe to Tendermint events: %v", err) } - return server.ShutdownFunc(func(ctx context.Context) error { - return tmNode.Stop() + return process.ShutdownFunc(func(ctx context.Context) error { + err := tmNode.Stop() + if err != nil { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-tmNode.Quit: + logging.InfoMsg(logger, "Tendermint Node has quit, closing DB connections...") + // Close tendermint database connections using our wrapper + tmNode.Close() + return nil + } + return err }), nil }, }, { Name: "RPC/tm", - Launch: func() (server.Server, error) { + Launch: func() (process.Process, error) { listener, err := tm.StartServer(service, "/websocket", rpcConfig.TM.ListenAddress, emitter, logger) if err != nil { return nil, err } - return server.FromListeners(listener), nil + return process.FromListeners(listener), nil }, }, { Name: "RPC/V0", - Launch: func() (server.Server, error) { + Launch: func() (process.Process, error) { codec := v0.NewTCodec() jsonServer := v0.NewJSONServer(v0.NewJSONService(codec, service, logger)) websocketServer := v0_server.NewWebSocketServer(rpcConfig.V0.Server.WebSocket.MaxWebSocketSessions, @@ -146,24 +180,24 @@ func NewKernel(ctx context.Context, privValidator tm_types.PrivValidator, genesi } return &Kernel{ - emitter: emitter, - service: service, - serverLaunchers: launchers, - servers: make(map[string]server.Server), - logger: logger, - shutdownNotify: make(chan struct{}), + Emitter: emitter, + Service: service, + Launchers: launchers, + processes: make(map[string]process.Process), + Logger: logger, + shutdownNotify: make(chan struct{}), }, nil } // Boot the kernel starting Tendermint and RPC layers func (kern *Kernel) Boot() error { - for _, launcher := range kern.serverLaunchers { + for _, launcher := range kern.Launchers { srvr, err := launcher.Launch() if err != nil { return fmt.Errorf("error launching %s server: %v", launcher.Name, err) } - kern.servers[launcher.Name] = srvr + kern.processes[launcher.Name] = srvr } go kern.supervise() return nil @@ -182,7 +216,7 @@ func (kern *Kernel) supervise() { signals := make(chan os.Signal, 1) signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) sig := <-signals - logging.InfoMsg(kern.logger, fmt.Sprintf("Caught %v signal so shutting down", sig), + logging.InfoMsg(kern.Logger, fmt.Sprintf("Caught %v signal so shutting down", sig), "signal", sig.String()) kern.Shutdown(context.Background()) } @@ -190,15 +224,15 @@ func (kern *Kernel) supervise() { // Stop the kernel allowing for a graceful shutdown of components in order func (kern *Kernel) Shutdown(ctx context.Context) (err error) { kern.shutdownOnce.Do(func() { - logger := logging.WithScope(kern.logger, "Shutdown") + logger := logging.WithScope(kern.Logger, "Shutdown") logging.InfoMsg(logger, "Attempting graceful shutdown...") logging.InfoMsg(logger, "Shutting down servers") ctx, cancel := context.WithTimeout(ctx, ServerShutdownTimeoutMilliseconds*time.Millisecond) defer cancel() // Shutdown servers in reverse order to boot - for i := len(kern.serverLaunchers) - 1; i >= 0; i-- { - name := kern.serverLaunchers[i].Name - srvr, ok := kern.servers[name] + for i := len(kern.Launchers) - 1; i >= 0; i-- { + name := kern.Launchers[i].Name + srvr, ok := kern.processes[name] if ok { logging.InfoMsg(logger, "Shutting down server", "server_name", name) sErr := srvr.Shutdown(ctx) @@ -213,7 +247,7 @@ func (kern *Kernel) Shutdown(ctx context.Context) (err error) { } } logging.InfoMsg(logger, "Shutdown complete") - logging.Sync(kern.logger) + logging.Sync(kern.Logger) // We don't want to wait for them, but yielding for a cooldown Let other goroutines flush // potentially interesting final output (e.g. log messages) time.Sleep(time.Millisecond * CooldownMilliseconds) diff --git a/core/kernel_test.go b/core/kernel_test.go index 0ff68e0f..dd14a02a 100644 --- a/core/kernel_test.go +++ b/core/kernel_test.go @@ -5,12 +5,19 @@ import ( "os" "testing" + "time" + + "fmt" + + "github.com/hyperledger/burrow/consensus/tendermint" "github.com/hyperledger/burrow/consensus/tendermint/validator" "github.com/hyperledger/burrow/genesis" "github.com/hyperledger/burrow/logging/loggers" + logging_types "github.com/hyperledger/burrow/logging/types" "github.com/hyperledger/burrow/rpc" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" tm_config "github.com/tendermint/tendermint/config" + tm_types "github.com/tendermint/tendermint/types" ) const testDir = "./test_scratch/kernel_test" @@ -20,14 +27,65 @@ func TestBootThenShutdown(t *testing.T) { os.MkdirAll(testDir, 0777) os.Chdir(testDir) tmConf := tm_config.DefaultConfig() - //logger, _ := lifecycle.NewStdErrLogger() + //logger, _, _ := lifecycle.NewStdErrLogger() logger := loggers.NewNoopInfoTraceLogger() - genesisDoc, privateAccounts := genesis.NewDeterministicGenesis(123).GenesisDoc(1, true, 1000, 1, true, 1000) - privValidator := validator.NewPrivValidatorMemory(privateAccounts[0], privateAccounts[0]) + genesisDoc, _, privateValidators := genesis.NewDeterministicGenesis(123).GenesisDoc(1, true, 1000, 1, true, 1000) + privValidator := validator.NewPrivValidatorMemory(privateValidators[0], privateValidators[0]) + assert.NoError(t, bootWaitBlocksShutdown(privValidator, genesisDoc, tmConf, logger, nil)) +} + +func TestBootShutdownResume(t *testing.T) { + os.RemoveAll(testDir) + os.MkdirAll(testDir, 0777) + os.Chdir(testDir) + tmConf := tm_config.DefaultConfig() + //logger, _, _ := lifecycle.NewStdErrLogger() + logger := loggers.NewNoopInfoTraceLogger() + genesisDoc, _, privateValidators := genesis.NewDeterministicGenesis(123).GenesisDoc(1, true, 1000, 1, true, 1000) + privValidator := validator.NewPrivValidatorMemory(privateValidators[0], privateValidators[0]) + + i := int64(1) + // asserts we get a consecutive run of blocks + blockChecker := func(block *tm_types.EventDataNewBlock) { + assert.Equal(t, i, block.Block.Height) + i++ + } + // First run + assert.NoError(t, bootWaitBlocksShutdown(privValidator, genesisDoc, tmConf, logger, blockChecker)) + // Resume and check we pick up where we left off + assert.NoError(t, bootWaitBlocksShutdown(privValidator, genesisDoc, tmConf, logger, blockChecker)) + // Resuming with mismatched genesis should fail + genesisDoc.Salt = []byte("foo") + assert.Error(t, bootWaitBlocksShutdown(privValidator, genesisDoc, tmConf, logger, blockChecker)) +} + +func bootWaitBlocksShutdown(privValidator tm_types.PrivValidator, genesisDoc *genesis.GenesisDoc, + tmConf *tm_config.Config, logger logging_types.InfoTraceLogger, + blockChecker func(block *tm_types.EventDataNewBlock)) error { + kern, err := NewKernel(context.Background(), privValidator, genesisDoc, tmConf, rpc.DefaultRPCConfig(), logger) - require.NoError(t, err) + if err != nil { + return err + } + err = kern.Boot() - require.NoError(t, err) - err = kern.Shutdown(context.Background()) - require.NoError(t, err) + if err != nil { + return err + } + + ch := make(chan *tm_types.EventDataNewBlock) + tendermint.SubscribeNewBlock(context.Background(), kern.Emitter, "TestBootShutdownResume", ch) + for i := 0; i < 2; i++ { + select { + case <-time.After(2 * time.Second): + if err != nil { + return fmt.Errorf("timed out waiting for block") + } + case ednb := <-ch: + if blockChecker != nil { + blockChecker(ednb) + } + } + } + return kern.Shutdown(context.Background()) } diff --git a/event/emitter.go b/event/emitter.go index 73996b7c..d41d0fd1 100644 --- a/event/emitter.go +++ b/event/emitter.go @@ -23,7 +23,7 @@ import ( "github.com/hyperledger/burrow/logging/structure" logging_types "github.com/hyperledger/burrow/logging/types" - "github.com/hyperledger/burrow/server" + "github.com/hyperledger/burrow/process" "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/pubsub" ) @@ -45,7 +45,7 @@ type Publisher interface { type Emitter interface { Subscribable Publisher - server.Server + process.Process } // The events struct has methods for working with events. diff --git a/execution/execution.go b/execution/execution.go index 2f0c243f..0689e6c9 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -96,7 +96,7 @@ func newExecutor(runCall bool, blockCache: NewBlockCache(state), publisher: eventFireable, eventCache: event.NewEventCache(eventFireable), - logger: logger.With(structure.ComponentKey, "Execution"), + logger: logger.With(structure.ComponentKey, "Executor"), } } @@ -136,10 +136,13 @@ func (exe *executor) Commit() ([]byte, error) { // sync the cache exe.blockCache.Sync() // save state to disk - exe.state.Save() + err := exe.state.Save() + if err != nil { + return nil, err + } // flush events to listeners (XXX: note issue with blocking) exe.eventCache.Flush() - return exe.state.Hash(), nil + return exe.state.LastSavedHash(), nil } func (exe *executor) Reset() error { @@ -156,7 +159,10 @@ func (exe *executor) Execute(tx txs.Tx) (err error) { err = fmt.Errorf("recovered from panic in executor.Execute(%s): %v", tx.String(), r) } }() - logger := logging.WithScope(exe.logger, "executor.Execute(tx txs.Tx)") + txHash := txs.TxHash(exe.chainID, tx) + logger := logging.WithScope(exe.logger, "executor.Execute(tx txs.Tx)").With( + "run_call", exe.runCall, + "tx_hash", txHash) logging.TraceMsg(logger, "Executing transaction", "tx", tx.String()) // TODO: do something with fees fees := uint64(0) @@ -213,7 +219,6 @@ func (exe *executor) Execute(tx txs.Tx) (err error) { // if the exe.eventCache is nil, nothing will happen if exe.eventCache != nil { - txHash := txs.TxHash(exe.chainID, tx) for _, i := range tx.Inputs { events.PublishAccountInput(exe.eventCache, i.Address, txHash, tx, nil, "") } diff --git a/execution/execution_test.go b/execution/execution_test.go index 2362c3f4..b1df5684 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -121,7 +121,7 @@ func makeUsers(n int) []acm.PrivateAccount { } func makeExecutor(state *State) *executor { - return newExecutor(true, state, testChainID, bcm.NewBlockchain(testGenesisDoc), event.NewEmitter(logger), + return newExecutor(true, state, testChainID, bcm.NewBlockchain(nil, testGenesisDoc), event.NewEmitter(logger), logger) } diff --git a/execution/state.go b/execution/state.go index 6928480b..4564c146 100644 --- a/execution/state.go +++ b/execution/state.go @@ -16,6 +16,7 @@ package execution import ( "bytes" + "encoding/json" "fmt" "io" "sync" @@ -24,6 +25,8 @@ import ( acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/binary" "github.com/hyperledger/burrow/genesis" + "github.com/hyperledger/burrow/logging" + logging_types "github.com/hyperledger/burrow/logging/types" "github.com/hyperledger/burrow/permission" ptypes "github.com/hyperledger/burrow/permission" "github.com/hyperledger/burrow/txs" @@ -34,12 +37,8 @@ import ( ) var ( - stateKey = []byte("stateKey") - minBondAmount = int64(1) // TODO adjust - defaultAccountsCacheCapacity = 1000 // TODO adjust - unbondingPeriodBlocks = int(60 * 24 * 365) // TODO probably better to make it time based. - validatorTimeoutBlocks = int(10) // TODO adjust - maxLoadStateElementSize = 0 // no max + stateKey = []byte("ExecutionState") + defaultAccountsCacheCapacity = 1000 // TODO adjust ) // TODO @@ -50,39 +49,68 @@ const GasLimit = uint64(1000000) // NOTE: not goroutine-safe. type State struct { sync.RWMutex - db dbm.DB - // BondedValidators *types.ValidatorSet - // LastBondedValidators *types.ValidatorSet - // UnbondingValidators *types.ValidatorSet - accounts merkle.Tree // Shouldn't be accessed directly. - validatorInfos merkle.Tree // Shouldn't be accessed directly. - nameReg merkle.Tree // Shouldn't be accessed directly. + db dbm.DB + accounts merkle.Tree // Shouldn't be accessed directly. + nameReg merkle.Tree // Shouldn't be accessed directly. + lastSavedHash []byte + logger logging_types.InfoTraceLogger } // Implements account and blockchain state var _ acm.Updater = &State{} - var _ acm.StateIterable = &State{} - var _ acm.StateWriter = &State{} -func MakeGenesisState(db dbm.DB, genDoc *genesis.GenesisDoc) (*State, error) { - if len(genDoc.Validators) == 0 { +type PersistedState struct { + AccountsRootHash []byte + NameRegHash []byte +} + +func newState(db dbm.DB) *State { + return &State{ + db: db, + accounts: iavl.NewIAVLTree(defaultAccountsCacheCapacity, db), + nameReg: iavl.NewIAVLTree(0, db), + } +} + +func LoadOrMakeGenesisState(db dbm.DB, genesisDoc *genesis.GenesisDoc, + logger logging_types.InfoTraceLogger) (*State, error) { + + logger = logging.WithScope(logger, "LoadOrMakeGenesisState") + logging.InfoMsg(logger, "Trying to load execution state from database", + "database_key", stateKey) + state, err := LoadState(db) + if err != nil { + return nil, fmt.Errorf("error loading genesis state from database: %v", err) + } + if state != nil { + return state, nil + } + + logging.InfoMsg(logger, "No existing execution state found in database, making genesis state") + return MakeGenesisState(db, genesisDoc) +} + +// Make genesis state from GenesisDoc and save to DB +func MakeGenesisState(db dbm.DB, genesisDoc *genesis.GenesisDoc) (*State, error) { + if len(genesisDoc.Validators) == 0 { return nil, fmt.Errorf("the genesis file has no validators") } - if genDoc.GenesisTime.IsZero() { + state := newState(db) + + if genesisDoc.GenesisTime.IsZero() { // NOTE: [ben] change GenesisTime to requirement on v0.17 // GenesisTime needs to be deterministic across the chain // and should be required in the genesis file; // the requirement is not yet enforced when lacking set // time to 11/18/2016 @ 4:09am (UTC) - genDoc.GenesisTime = time.Unix(1479442162, 0) + genesisDoc.GenesisTime = time.Unix(1479442162, 0) } // Make accounts state tree - accounts := iavl.NewIAVLTree(defaultAccountsCacheCapacity, db) - for _, genAcc := range genDoc.Accounts { + for _, genAcc := range genesisDoc.Accounts { perm := genAcc.Permissions acc := &acm.ConcreteAccount{ Address: genAcc.Address, @@ -93,13 +121,13 @@ func MakeGenesisState(db dbm.DB, genDoc *genesis.GenesisDoc) (*State, error) { if err != nil { return nil, err } - accounts.Set(acc.Address.Bytes(), encodedAcc) + state.accounts.Set(acc.Address.Bytes(), encodedAcc) } // global permissions are saved as the 0 address // so they are included in the accounts tree globalPerms := ptypes.DefaultAccountPermissions - globalPerms = genDoc.GlobalPermissions + globalPerms = genesisDoc.GlobalPermissions // XXX: make sure the set bits are all true // Without it the HasPermission() functions will fail globalPerms.Base.SetBit = ptypes.AllPermFlags @@ -113,130 +141,99 @@ func MakeGenesisState(db dbm.DB, genDoc *genesis.GenesisDoc) (*State, error) { if err != nil { return nil, err } - accounts.Set(permsAcc.Address.Bytes(), encodedPermsAcc) - - // Make validatorInfos state tree && validators slice - /* - validatorInfos := merkle.NewIAVLTree(wire.BasicCodec, types.ValidatorInfoCodec, 0, db) - validators := make([]*types.Validator, len(genDoc.Validators)) - for i, val := range genDoc.Validators { - pubKey := val.PublicKey - address := pubKey.Address() - - // Make ValidatorInfo - valInfo := &types.ValidatorInfo{ - Address: address, - PublicKey: pubKey, - UnbondTo: make([]*types.TxOutput, len(val.UnbondTo)), - FirstBondHeight: 0, - FirstBondAmount: val.Amount, - } - for i, unbondTo := range val.UnbondTo { - valInfo.UnbondTo[i] = &types.TxOutput{ - Address: unbondTo.Address, - Amount: unbondTo.Amount, - } - } - validatorInfos.Set(address, valInfo) - - // Make validator - validators[i] = &types.Validator{ - Address: address, - PublicKey: pubKey, - VotingPower: val.Amount, - } - } - */ - - // Make namereg tree - nameReg := iavl.NewIAVLTree(0, db) - // TODO: add names, contracts to genesis.json + state.accounts.Set(permsAcc.Address.Bytes(), encodedPermsAcc) // IAVLTrees must be persisted before copy operations. - accounts.Save() - //validatorInfos.Save() - nameReg.Save() + err = state.Save() + if err != nil { + return nil, err + } + return state, nil - return &State{ - db: db, - //BondedValidators: types.NewValidatorSet(validators), - //LastBondedValidators: types.NewValidatorSet(nil), - //UnbondingValidators: types.NewValidatorSet(nil), - accounts: accounts, - //validatorInfos: validatorInfos, - nameReg: nameReg, - }, nil } +// Tries to load the execution state from DB, returns nil with no error if no state found func LoadState(db dbm.DB) (*State, error) { - s := &State{db: db} + state := newState(db) buf := db.Get(stateKey) if len(buf) == 0 { return nil, nil - } else { - r, n, err := bytes.NewReader(buf), new(int), new(error) - wire.ReadBinaryPtr(&s, r, 0, n, err) - if *err != nil { - // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED - return nil, fmt.Errorf("data has been corrupted or its spec has changed: %v", *err) - } } - return s, nil + persistedState, err := Decode(buf) + if err != nil { + return nil, err + } + state.accounts.Load(persistedState.AccountsRootHash) + state.nameReg.Load(persistedState.NameRegHash) + err = state.Save() + if err != nil { + return nil, err + } + return state, nil } -func (s *State) Save() { +func (s *State) Save() error { s.Lock() defer s.Unlock() s.accounts.Save() - //s.validatorInfos.Save() s.nameReg.Save() - s.db.SetSync(stateKey, wire.BinaryBytes(s)) + encodedState, err := s.Encode() + if err != nil { + return err + } + s.db.SetSync(stateKey, encodedState) + s.lastSavedHash = s.hash() + return nil +} + +func (s *State) LastSavedHash() []byte { + return s.lastSavedHash +} + +func (s *State) Encode() ([]byte, error) { + persistedState := &PersistedState{ + AccountsRootHash: s.accounts.Hash(), + NameRegHash: s.nameReg.Hash(), + } + encodedState, err := json.Marshal(persistedState) + if err != nil { + return nil, err + } + return encodedState, nil +} + +func Decode(encodedState []byte) (*PersistedState, error) { + persistedState := new(PersistedState) + err := json.Unmarshal(encodedState, persistedState) + if err != nil { + return nil, err + } + return persistedState, nil } // CONTRACT: // Copy() is a cheap way to take a snapshot, // as if State were copied by value. // TODO [Silas]: Kill this with fire it is totally broken - there is no safe way to copy IAVLTree while sharing database -func (s *State) Copy() *State { +func (s *State) copy() *State { return &State{ - db: s.db, - // BondedValidators: s.BondedValidators.Copy(), // TODO remove need for Copy() here. - // LastBondedValidators: s.LastBondedValidators.Copy(), // That is, make updates to the validator set - // UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. + db: s.db, accounts: s.accounts.Copy(), - //validatorInfos: s.validatorInfos.Copy(), - nameReg: s.nameReg.Copy(), + nameReg: s.nameReg.Copy(), } } -//func (s *State) Copy() *State { -// stateCopy := &State{ -// db: dbm.NewMemDB(), -// chainID: s.chainID, -// lastBlockHeight: s.lastBlockHeight, -// lastBlockAppHash: s.lastBlockAppHash, -// lastBlockTime: s.lastBlockTime, -// // BondedValidators: s.BondedValidators.Copy(), // TODO remove need for Copy() here. -// // LastBondedValidators: s.LastBondedValidators.Copy(), // That is, make updates to the validator set -// // UnbondingValidators: s.UnbondingValidators.Copy(), // copy the valSet lazily. -// accounts: copyTree(s.accounts), -// //validatorInfos: s.validatorInfos.Copy(), -// nameReg: copyTree(s.nameReg), -// evc: nil, -// } -// stateCopy.Save() -// return stateCopy -//} - -// Returns a hash that represents the state data, excluding Last* +// Computes the state hash, also computed on save where it is returned func (s *State) Hash() []byte { s.RLock() defer s.RUnlock() + return s.hash() +} + +// As Hash without lock +func (s *State) hash() []byte { return merkle.SimpleHashFromMap(map[string]interface{}{ - //"BondedValidators": s.BondedValidators, - //"UnbondingValidators": s.UnbondingValidators, - "Accounts": s.accounts.Hash(), - //"ValidatorInfos": s.validatorInfos, + "Accounts": s.accounts, "NameRegistry": s.nameReg, }) } @@ -253,6 +250,12 @@ func (s *State) GetAccount(address acm.Address) (acm.Account, error) { } func (s *State) UpdateAccount(account acm.Account) error { + // TODO: interop with StateCache by performing an update on the StorageRoot here if storage is dirty + // we need `dirtyStorage map[acm.Address]bool` + //if dirtyStorage[account] == true { + // s.accountStorage(account.Address()) + // := acm.AsMutableAccount(account).SetStorageRoot() + //} s.Lock() defer s.Unlock() encodedAccount, err := account.Encode() @@ -290,116 +293,6 @@ func (s *State) IterateAccounts(consumer func(acm.Account) (stop bool)) (stopped return } -// State.accounts -//------------------------------------- -// State.validators - -// XXX: now handled by tendermint core - -/* - -// The returned ValidatorInfo is a copy, so mutating it -// has no side effects. -func (s *State) GetValidatorInfo(address []byte) *types.ValidatorInfo { - _, valInfo := s.validatorInfos.Get(address) - if valInfo == nil { - return nil - } - return valInfo.(*types.ValidatorInfo).Copy() -} - -// Returns false if new, true if updated. -// The valInfo is copied before setting, so mutating it -// afterwards has no side effects. -func (s *State) SetValidatorInfo(valInfo *types.ValidatorInfo) (updated bool) { - return s.validatorInfos.Set(valInfo.Address, valInfo.Copy()) -} - -func (s *State) GetValidatorInfos() merkle.Tree { - return s.validatorInfos.Copy() -} - -func (s *State) unbondValidator(val *types.Validator) { - // Move validator to UnbondingValidators - val, removed := s.BondedValidators.Remove(val.Address) - if !removed { - PanicCrisis("Couldn't remove validator for unbonding") - } - val.UnbondHeight = s.lastBlockHeight + 1 - added := s.UnbondingValidators.Add(val) - if !added { - PanicCrisis("Couldn't add validator for unbonding") - } -} - -func (s *State) rebondValidator(val *types.Validator) { - // Move validator to BondingValidators - val, removed := s.UnbondingValidators.Remove(val.Address) - if !removed { - PanicCrisis("Couldn't remove validator for rebonding") - } - val.BondHeight = s.lastBlockHeight + 1 - added := s.BondedValidators.Add(val) - if !added { - PanicCrisis("Couldn't add validator for rebonding") - } -} - -func (s *State) releaseValidator(val *types.Validator) { - // Update validatorInfo - valInfo := s.GetValidatorInfo(val.Address) - if valInfo == nil { - PanicSanity("Couldn't find validatorInfo for release") - } - valInfo.ReleasedHeight = s.lastBlockHeight + 1 - s.SetValidatorInfo(valInfo) - - // Send coins back to UnbondTo outputs - accounts, err := getOrMakeOutputs(s, nil, valInfo.UnbondTo) - if err != nil { - PanicSanity("Couldn't get or make unbondTo accounts") - } - adjustByOutputs(accounts, valInfo.UnbondTo) - for _, acc := range accounts { - s.UpdateAccount(acc) - } - - // Remove validator from UnbondingValidators - _, removed := s.UnbondingValidators.Remove(val.Address) - if !removed { - PanicCrisis("Couldn't remove validator for release") - } -} - -func (s *State) destroyValidator(val *types.Validator) { - // Update validatorInfo - valInfo := s.GetValidatorInfo(val.Address) - if valInfo == nil { - PanicSanity("Couldn't find validatorInfo for release") - } - valInfo.DestroyedHeight = s.lastBlockHeight + 1 - valInfo.DestroyedAmount = val.VotingPower - s.SetValidatorInfo(valInfo) - - // Remove validator - _, removed := s.BondedValidators.Remove(val.Address) - if !removed { - _, removed := s.UnbondingValidators.Remove(val.Address) - if !removed { - PanicCrisis("Couldn't remove validator for destruction") - } - } - -} - -// Set the validator infos tree -func (s *State) SetValidatorInfos(validatorInfos merkle.Tree) { - s.validatorInfos = validatorInfos -} - -*/ - -// State.validators //------------------------------------- // State.storage @@ -434,6 +327,7 @@ func (s *State) GetStorage(address acm.Address, key binary.Word256) (binary.Word } func (s *State) SetStorage(address acm.Address, key, value binary.Word256) error { + // TODO: not sure this actually works - loading at old hash s.Lock() defer s.Unlock() storageTree, err := s.accountStorage(address) diff --git a/execution/state_test.go b/execution/state_test.go index d7ee60be..9a681bbe 100644 --- a/execution/state_test.go +++ b/execution/state_test.go @@ -37,7 +37,7 @@ import ( ) var deterministicGenesis = genesis.NewDeterministicGenesis(34059836243380576) -var testGenesisDoc, testPrivAccounts = deterministicGenesis. +var testGenesisDoc, testPrivAccounts, _ = deterministicGenesis. GenesisDoc(3, true, 1000, 1, true, 1000) var testChainID = testGenesisDoc.ChainID() @@ -52,7 +52,7 @@ func execTxWithStateAndBlockchain(state *State, tip bcm.Tip, tx txs.Tx) error { } func execTxWithState(state *State, tx txs.Tx) error { - return execTxWithStateAndBlockchain(state, bcm.NewBlockchain(testGenesisDoc), tx) + return execTxWithStateAndBlockchain(state, bcm.NewBlockchain(nil, testGenesisDoc), tx) } func commitNewBlock(state *State, blockchain bcm.MutableBlockchain) { @@ -70,7 +70,7 @@ func execTxWithStateNewBlock(state *State, blockchain bcm.MutableBlockchain, tx func makeGenesisState(numAccounts int, randBalance bool, minBalance uint64, numValidators int, randBonded bool, minBonded int64) (*State, []acm.PrivateAccount) { - testGenesisDoc, privAccounts := deterministicGenesis.GenesisDoc(numAccounts, randBalance, minBalance, + testGenesisDoc, privAccounts, _ := deterministicGenesis.GenesisDoc(numAccounts, randBalance, minBalance, numValidators, randBonded, minBonded) s0, err := MakeGenesisState(dbm.NewMemDB(), testGenesisDoc) if err != nil { @@ -104,9 +104,9 @@ func TestCopyState(t *testing.T) { } // Check hash of copy - s0Copy := s0.Copy() + s0Copy := s0.copy() assert.Equal(t, s0Hash, s0Copy.Hash(), "Expected state copy hash to be the same") - assert.Equal(t, s0Copy.Copy().Hash(), s0Copy.Hash(), "Expected COPY COPY COPY the same") + assert.Equal(t, s0Copy.copy().Hash(), s0Copy.Hash(), "Expected COPY COPY COPY the same") // Mutate the original; hash should change. acc0Address := privAccounts[0].Address() @@ -250,7 +250,7 @@ func TestTxSequence(t *testing.T) { tx.AddInputWithSequence(acc0PubKey, 1, sequence) tx.AddOutput(acc1.Address(), 1) tx.Inputs[0].Signature = acm.ChainSign(privAccounts[0], testChainID, tx) - stateCopy := state.Copy() + stateCopy := state.copy() err := execTxWithState(stateCopy, tx) if i == 1 { // Sequence is good. @@ -284,7 +284,7 @@ func TestNameTxs(t *testing.T) { state.Save() txs.MinNameRegistrationPeriod = 5 - blockchain := bcm.NewBlockchain(testGenesisDoc) + blockchain := bcm.NewBlockchain(nil, testGenesisDoc) startingBlock := blockchain.LastBlockHeight() // try some bad names. these should all fail @@ -486,7 +486,7 @@ func TestCreates(t *testing.T) { acc1 := getAccount(state, privAccounts[1].Address()) acc2 := getAccount(state, privAccounts[2].Address()) - state = state.Copy() + state = state.copy() newAcc1 := getAccount(state, acc1.Address()) newAcc1.SetCode(preFactoryCode) newAcc2 := getAccount(state, acc2.Address()) @@ -568,7 +568,7 @@ func TestContractSend(t *testing.T) { acc1 := getAccount(state, privAccounts[1].Address()) acc2 := getAccount(state, privAccounts[2].Address()) - state = state.Copy() + state = state.copy() newAcc1 := getAccount(state, acc1.Address()) newAcc1.SetCode(callerCode) state.UpdateAccount(newAcc1) @@ -613,7 +613,7 @@ func TestMerklePanic(t *testing.T) { state.Save() // SendTx. { - stateSendTx := state.Copy() + stateSendTx := state.copy() tx := &txs.SendTx{ Inputs: []*txs.TxInput{ { @@ -642,7 +642,7 @@ func TestMerklePanic(t *testing.T) { // CallTx. Just runs through it and checks the transfer. See vm, rpc tests for more { - stateCallTx := state.Copy() + stateCallTx := state.copy() newAcc1 := getAccount(stateCallTx, acc1.Address()) newAcc1.SetCode([]byte{0x60}) stateCallTx.UpdateAccount(newAcc1) @@ -680,7 +680,7 @@ func TestTxs(t *testing.T) { // SendTx. { - stateSendTx := state.Copy() + stateSendTx := state.copy() tx := &txs.SendTx{ Inputs: []*txs.TxInput{ { @@ -717,7 +717,7 @@ func TestTxs(t *testing.T) { // CallTx. Just runs through it and checks the transfer. See vm, rpc tests for more { - stateCallTx := state.Copy() + stateCallTx := state.copy() newAcc1 := getAccount(stateCallTx, acc1.Address()) newAcc1.SetCode([]byte{0x60}) stateCallTx.UpdateAccount(newAcc1) @@ -771,7 +771,7 @@ basis, and nodes can leave and rejoin the network at will, acc proof-of-work chain as proof of what happened while they were gone ` entryAmount := uint64(10000) - stateNameTx := state.Copy() + stateNameTx := state.copy() tx := &txs.NameTx{ Input: &txs.TxInput{ Address: acc0.Address(), @@ -890,7 +890,7 @@ func TestSelfDestruct(t *testing.T) { tx.Input.Signature = acm.ChainSign(privAccounts[0], testChainID, tx) // we use cache instead of execTxWithState so we can run the tx twice - exe := NewBatchCommitter(state, testChainID, bcm.NewBlockchain(testGenesisDoc), event.NewNoOpPublisher(), logger) + exe := NewBatchCommitter(state, testChainID, bcm.NewBlockchain(nil, testGenesisDoc), event.NewNoOpPublisher(), logger) if err := exe.Execute(tx); err != nil { t.Errorf("Got error in executing call transaction, %v", err) } diff --git a/genesis/deterministic_genesis.go b/genesis/deterministic_genesis.go index d511cc20..402a588b 100644 --- a/genesis/deterministic_genesis.go +++ b/genesis/deterministic_genesis.go @@ -21,7 +21,7 @@ func NewDeterministicGenesis(seed int64) *deterministicGenesis { } func (dg *deterministicGenesis) GenesisDoc(numAccounts int, randBalance bool, minBalance uint64, numValidators int, - randBonded bool, minBonded int64) (*GenesisDoc, []acm.PrivateAccount) { + randBonded bool, minBonded int64) (*GenesisDoc, []acm.PrivateAccount, []acm.PrivateAccount) { accounts := make([]Account, numAccounts) privAccounts := make([]acm.PrivateAccount, numAccounts) @@ -38,8 +38,10 @@ func (dg *deterministicGenesis) GenesisDoc(numAccounts int, randBalance bool, mi privAccounts[i] = privAccount } validators := make([]Validator, numValidators) + privValidators := make([]acm.PrivateAccount, numValidators) for i := 0; i < numValidators; i++ { validator := acm.GeneratePrivateAccountFromSecret(fmt.Sprintf("val_%v", i)) + privValidators[i] = validator validators[i] = Validator{ BasicAccount: BasicAccount{ Address: validator.Address(), @@ -59,7 +61,7 @@ func (dg *deterministicGenesis) GenesisDoc(numAccounts int, randBalance bool, mi GenesisTime: time.Unix(1506172037, 0), Accounts: accounts, Validators: validators, - }, privAccounts + }, privAccounts, privValidators } diff --git a/server/server.go b/process/process.go similarity index 78% rename from server/server.go rename to process/process.go index 08a70efb..960f5b7e 100644 --- a/server/server.go +++ b/process/process.go @@ -1,4 +1,4 @@ -package server +package process import ( "context" @@ -7,7 +7,7 @@ import ( ) // Copies the signature from http.Server's graceful shutdown method -type Server interface { +type Process interface { Shutdown(context.Context) error } @@ -19,7 +19,7 @@ func (sf ShutdownFunc) Shutdown(ctx context.Context) error { type Launcher struct { Name string - Launch func() (Server, error) + Launch func() (Process, error) } type listenersServer struct { @@ -27,8 +27,8 @@ type listenersServer struct { listeners map[net.Listener]struct{} } -// Providers a Server implementation from Listeners that are closed on shutdown -func FromListeners(listeners ...net.Listener) Server { +// Providers a Process implementation from Listeners that are closed on shutdown +func FromListeners(listeners ...net.Listener) Process { lns := make(map[net.Listener]struct{}, len(listeners)) for _, l := range listeners { lns[l] = struct{}{} diff --git a/rpc/service.go b/rpc/service.go index 1c127df6..517e898b 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -243,6 +243,7 @@ func (s *service) GetAccount(address acm.Address) (*ResultGetAccount, error) { if err != nil { return nil, err } + s.logger.Trace("method", "GetAccount", "sequence", acc.Sequence()) return &ResultGetAccount{Account: acm.AsConcreteAccount(acc)}, nil } diff --git a/rpc/tm/integration/shared.go b/rpc/tm/integration/shared.go index a12358f1..55b31ebc 100644 --- a/rpc/tm/integration/shared.go +++ b/rpc/tm/integration/shared.go @@ -35,6 +35,7 @@ import ( "github.com/hyperledger/burrow/core" "github.com/hyperledger/burrow/execution" "github.com/hyperledger/burrow/genesis" + "github.com/hyperledger/burrow/logging/config" "github.com/hyperledger/burrow/logging/lifecycle" "github.com/hyperledger/burrow/logging/loggers" "github.com/hyperledger/burrow/permission" @@ -72,7 +73,7 @@ var ( // We use this to wrap tests func TestWrapper(runner func() int) int { - fmt.Println("Running with integration TestWrapper (rpc/tm/client/shared.go)...") + fmt.Println("Running with integration TestWrapper (rpc/tm/integration/shared.go)...") os.RemoveAll(testDir) os.MkdirAll(testDir, 0777) @@ -81,7 +82,24 @@ func TestWrapper(runner func() int) int { tmConf := tm_config.DefaultConfig() logger := loggers.NewNoopInfoTraceLogger() if debugLogging { - logger, _, _ = lifecycle.NewStdErrLogger() + var err error + // Change config as needed + logger, err = lifecycle.NewLoggerFromLoggingConfig(&config.LoggingConfig{ + RootSink: config.Sink(). + SetTransform(config.FilterTransform(config.IncludeWhenAnyMatches, + //"","", + "method", "GetAccount", + "message", "execution error", + "message", "Incrementing sequence number", + )). + AddSinks(config.Sink().SetTransform(config.FilterTransform(config.ExcludeWhenAnyMatches, "run_call", "false")). + AddSinks(config.Sink().SetTransform(config.PruneTransform("log_channel", "trace", "scope", "returns", "run_id", "args")). + AddSinks(config.Sink().SetTransform(config.SortTransform("tx_hash", "time", "message", "method")). + SetOutput(config.StdoutOutput())))), + }) + if err != nil { + panic(err) + } } privValidator := validator.NewPrivValidatorMemory(privateAccounts[0], privateAccounts[0]) @@ -90,7 +108,10 @@ func TestWrapper(runner func() int) int { if err != nil { panic(err) } - defer kernel.Shutdown(context.Background()) + // Sometimes better to not shutdown as logging errors on shutdown may obscure real issue + defer func() { + //kernel.Shutdown(context.Background()) + }() err = kernel.Boot() if err != nil { @@ -165,7 +186,6 @@ func makeDefaultNameTx(t *testing.T, client tm_client.RPCClient, name, value str // get an account's sequence number func getSequence(t *testing.T, client tm_client.RPCClient, addr acm.Address) uint64 { - acc, err := tm_client.GetAccount(client, addr) if err != nil { t.Fatal(err) diff --git a/rpc/tm/integration/shared_test.go b/rpc/tm/integration/shared_test.go index d126df30..9e8f1d17 100644 --- a/rpc/tm/integration/shared_test.go +++ b/rpc/tm/integration/shared_test.go @@ -20,6 +20,7 @@ package integration import ( "os" "testing" + "time" ) // Needs to be in a _test.go file to be picked up @@ -28,5 +29,6 @@ func TestMain(m *testing.M) { return m.Run() }) + time.Sleep(3 * time.Second) os.Exit(returnValue) } diff --git a/rpc/tm/integration/websocket_client_test.go b/rpc/tm/integration/websocket_client_test.go index e13e817e..8f7c974f 100644 --- a/rpc/tm/integration/websocket_client_test.go +++ b/rpc/tm/integration/websocket_client_test.go @@ -150,40 +150,39 @@ func TestWSDoubleFire(t *testing.T) { // create a contract, wait for the event, and send it a msg, validate the return func TestWSCallWait(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } wsc := newWSClient() - eid1 := exe_events.EventStringAccountInput(privateAccounts[0].Address()) - subId1 := subscribeAndGetSubscriptionId(t, wsc, eid1) - defer func() { + defer stopWSClient(wsc) + // Mini soak test + for i := 0; i < 20; i++ { + if testing.Short() { + t.Skip("skipping test in short mode.") + } + amt, gasLim, fee := uint64(10000), uint64(1000), uint64(1000) + code, returnCode, returnVal := simpleContract() + var contractAddr acm.Address + eid1 := exe_events.EventStringAccountInput(privateAccounts[0].Address()) + subId1 := subscribeAndGetSubscriptionId(t, wsc, eid1) + // wait for the contract to be created + waitForEvent(t, wsc, eid1, func() { + tx := makeDefaultCallTx(t, jsonRpcClient, nil, code, amt, gasLim, fee) + receipt := broadcastTx(t, jsonRpcClient, tx) + contractAddr = receipt.ContractAddress + }, unmarshalValidateTx(amt, returnCode)) unsubscribe(t, wsc, subId1) - stopWSClient(wsc) - }() - amt, gasLim, fee := uint64(10000), uint64(1000), uint64(1000) - code, returnCode, returnVal := simpleContract() - var contractAddr acm.Address - // wait for the contract to be created - waitForEvent(t, wsc, eid1, func() { - tx := makeDefaultCallTx(t, jsonRpcClient, nil, code, amt, gasLim, fee) - receipt := broadcastTx(t, jsonRpcClient, tx) - contractAddr = receipt.ContractAddress - }, unmarshalValidateTx(amt, returnCode)) - // susbscribe to the new contract - amt = uint64(10001) - eid2 := exe_events.EventStringAccountOutput(contractAddr) - subId2 := subscribeAndGetSubscriptionId(t, wsc, eid2) - defer func() { + // susbscribe to the new contract + amt = uint64(10001) + eid2 := exe_events.EventStringAccountOutput(contractAddr) + subId2 := subscribeAndGetSubscriptionId(t, wsc, eid2) + // get the return value from a call + data := []byte{0x1} + waitForEvent(t, wsc, eid2, func() { + tx := makeDefaultCallTx(t, jsonRpcClient, &contractAddr, data, amt, gasLim, fee) + receipt := broadcastTx(t, jsonRpcClient, tx) + contractAddr = receipt.ContractAddress + }, unmarshalValidateTx(amt, returnVal)) unsubscribe(t, wsc, subId2) - }() - // get the return value from a call - data := []byte{0x1} - waitForEvent(t, wsc, eid2, func() { - tx := makeDefaultCallTx(t, jsonRpcClient, &contractAddr, data, amt, gasLim, fee) - receipt := broadcastTx(t, jsonRpcClient, tx) - contractAddr = receipt.ContractAddress - }, unmarshalValidateTx(amt, returnVal)) + } } // create a contract and send it a msg without waiting. wait for contract event diff --git a/txs/go_wire_codec.go b/txs/go_wire_codec.go index ebedfcf7..320c1f5e 100644 --- a/txs/go_wire_codec.go +++ b/txs/go_wire_codec.go @@ -31,7 +31,8 @@ func (gwc *goWireCodec) EncodeTx(tx Tx) ([]byte, error) { if err != nil { return nil, err } - return buf.Bytes(), nil + // Tendermint mempool exhibits odd concurrency issues when using a mutable buffer + return copyBuffer(buf) } // panic on err @@ -51,3 +52,12 @@ func (gwc *goWireCodec) recycle(buf *bytes.Buffer) { buf.Reset() gwc.bufferPool.Put(buf) } + +func copyBuffer(buf *bytes.Buffer) ([]byte, error) { + bs := make([]byte, buf.Len()) + _, err := buf.Read(bs) + if err != nil { + return nil, err + } + return bs, nil +} -- GitLab