From 94a65860ffce6f4c7069ce65e89caf6f4ea0993e Mon Sep 17 00:00:00 2001 From: Silas Davis <silas@monax.io> Date: Tue, 10 Apr 2018 00:48:09 +0100 Subject: [PATCH] Ensure that the full tendermint commit phase has finished (including recheckTxs) before allowing the Transactor to query transactions from the mempool again. This ensures SequentialSigningAccount will always retrieve the freshest sequence numbers since it will block until all transactions in the mempool have been replayed to CheckTx after a commit. Signed-off-by: Silas Davis <silas@monax.io> --- account/state/state_cache.go | 1 - blockchain/blockchain.go | 2 + consensus/tendermint/abci/app.go | 74 +++++++++----- consensus/tendermint/tendermint.go | 3 +- core/integration/test_wrapper.go | 10 +- core/kernel.go | 2 +- execution/accounts.go | 4 +- execution/events/events.go | 4 +- execution/execution.go | 16 ++- execution/transactor.go | 157 ++++++++++++++++++++--------- rpc/service.go | 38 +++---- rpc/tm/methods.go | 4 +- rpc/v0/integration/v0_test.go | 2 +- 13 files changed, 201 insertions(+), 116 deletions(-) diff --git a/account/state/state_cache.go b/account/state/state_cache.go index a3675177..81b0dcd2 100644 --- a/account/state/state_cache.go +++ b/account/state/state_cache.go @@ -91,7 +91,6 @@ func (cache *stateCache) UpdateAccount(account acm.Account) error { } accInfo.account = account accInfo.updated = true - return nil } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index c81f4524..dc13271f 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -52,6 +52,8 @@ type Tip interface { // Burrow's portion of the Blockchain state type Blockchain interface { + // Read locker + sync.Locker Root Tip // Returns an immutable copy of the tip diff --git a/consensus/tendermint/abci/app.go b/consensus/tendermint/abci/app.go index 99320fb3..22cb095a 100644 --- a/consensus/tendermint/abci/app.go +++ b/consensus/tendermint/abci/app.go @@ -2,6 +2,7 @@ package abci import ( "fmt" + "sync" "time" bcm "github.com/hyperledger/burrow/blockchain" @@ -17,11 +18,12 @@ import ( const responseInfoName = "Burrow" -type abciApp struct { +type App struct { // State - blockchain bcm.MutableBlockchain - checker execution.BatchExecutor - committer execution.BatchCommitter + blockchain bcm.MutableBlockchain + checker execution.BatchExecutor + committer execution.BatchCommitter + mempoolLocker sync.Locker // We need to cache these from BeginBlock for when we need actually need it in Commit block *abci_types.RequestBeginBlock // Utility @@ -30,11 +32,13 @@ type abciApp struct { logger *logging.Logger } +var _ abci_types.Application = &App{} + func NewApp(blockchain bcm.MutableBlockchain, checker execution.BatchExecutor, committer execution.BatchCommitter, - logger *logging.Logger) abci_types.Application { - return &abciApp{ + logger *logging.Logger) *App { + return &App{ blockchain: blockchain, checker: checker, committer: committer, @@ -43,7 +47,14 @@ func NewApp(blockchain bcm.MutableBlockchain, } } -func (app *abciApp) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { +// Provide the Mempool lock. When provided we will attempt to acquire this lock in a goroutine during the Commit. We +// will keep the checker cache locked until we are able to acquire the mempool lock which signals the end of the commit +// and possible recheck on Tendermint's side. +func (app *App) SetMempoolLocker(mempoolLocker sync.Locker) { + app.mempoolLocker = mempoolLocker +} + +func (app *App) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { tip := app.blockchain.Tip() return abci_types.ResponseInfo{ Data: responseInfoName, @@ -53,19 +64,19 @@ func (app *abciApp) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { } } -func (app *abciApp) SetOption(option abci_types.RequestSetOption) (respSetOption abci_types.ResponseSetOption) { +func (app *App) SetOption(option abci_types.RequestSetOption) (respSetOption abci_types.ResponseSetOption) { respSetOption.Log = "SetOption not supported" respSetOption.Code = codes.UnsupportedRequestCode return } -func (app *abciApp) Query(reqQuery abci_types.RequestQuery) (respQuery abci_types.ResponseQuery) { +func (app *App) Query(reqQuery abci_types.RequestQuery) (respQuery abci_types.ResponseQuery) { respQuery.Log = "Query not supported" respQuery.Code = codes.UnsupportedRequestCode return } -func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { +func (app *App) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("CheckTx decoding error", @@ -76,7 +87,6 @@ func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { Log: fmt.Sprintf("Encoding error: %s", err), } } - // TODO: map ExecTx errors to sensible ABCI error codes receipt := txs.GenerateReceipt(app.blockchain.ChainID(), tx) err = app.checker.Execute(tx) @@ -104,17 +114,17 @@ func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { } } -func (app *abciApp) InitChain(chain abci_types.RequestInitChain) (respInitChain abci_types.ResponseInitChain) { +func (app *App) InitChain(chain abci_types.RequestInitChain) (respInitChain abci_types.ResponseInitChain) { // Could verify agreement on initial validator set here return } -func (app *abciApp) BeginBlock(block abci_types.RequestBeginBlock) (respBeginBlock abci_types.ResponseBeginBlock) { +func (app *App) BeginBlock(block abci_types.RequestBeginBlock) (respBeginBlock abci_types.ResponseBeginBlock) { app.block = &block return } -func (app *abciApp) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { +func (app *App) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("DeliverTx decoding error", @@ -152,12 +162,12 @@ func (app *abciApp) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { } } -func (app *abciApp) EndBlock(reqEndBlock abci_types.RequestEndBlock) (respEndBlock abci_types.ResponseEndBlock) { +func (app *App) EndBlock(reqEndBlock abci_types.RequestEndBlock) (respEndBlock abci_types.ResponseEndBlock) { // Validator mutation goes here return } -func (app *abciApp) Commit() abci_types.ResponseCommit { +func (app *App) Commit() abci_types.ResponseCommit { tip := app.blockchain.Tip() app.logger.InfoMsg("Committing block", "tag", "Commit", @@ -169,12 +179,31 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { "last_block_time", tip.LastBlockTime(), "last_block_hash", tip.LastBlockHash()) - // Commit state before resetting check cache so that the emptied cache servicing some RPC requests will fall through - // to committed state when check state is reset - // TODO: determine why the ordering of updates does not experience invalid sequence number during recheck. It - // seems there is nothing to stop a Transact transaction from querying the checker cache before it has been replayed - // all transactions and so would formulate a transaction with the same sequence number as one in mempool. - // However this is not observed in v0_tests.go - we need to understand why or create a test that exposes this + // Lock the checker while we reset it and possibly while recheckTxs replays transactions + app.checker.Lock() + defer func() { + // Tendermint may replay transactions to the check cache during a recheck, which happens after we have returned + // from Commit(). The mempool is locked by Tendermint for the duration of the commit phase; during Commit() and + // the subsequent mempool.Update() so we schedule an acquisition of the mempool lock in a goroutine in order to + // 'observe' the mempool unlock event that happens later on. By keeping the checker read locked during that + // period we can ensure that anything querying the checker (such as service.MempoolAccounts()) will block until + // the full Tendermint commit phase has completed. + if app.mempoolLocker != nil { + go func() { + // we won't get this until after the commit and we will acquire strictly after this commit phase has + // ended (i.e. when Tendermint's BlockExecutor.Commit() returns + app.mempoolLocker.Lock() + // Prevent any mempool getting relocked while we unlock - we could just unlock immediately but if a new + // commit starts gives goroutines blocked on checker a chance to progress before the next commit phase + defer app.mempoolLocker.Unlock() + app.checker.Unlock() + }() + } else { + // If we have not be provided with access to the mempool lock + app.checker.Unlock() + } + }() + appHash, err := app.committer.Commit() if err != nil { return abci_types.ResponseCommit{ @@ -213,7 +242,6 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { app.blockchain.LastBlockHeight(), app.block.Header.Height), } } - return abci_types.ResponseCommit{ Code: codes.TxExecutionSuccessCode, Data: appHash, diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index 27fa18ee..c66c9699 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -55,8 +55,8 @@ func NewNode( // disable Tendermint's RPC conf.RPC.ListenAddress = "" - app := abci.NewApp(blockchain, checker, committer, logger) nde := &Node{} + app := abci.NewApp(blockchain, checker, committer, logger) nde.Node, err = node.NewNode(conf, privValidator, proxy.NewLocalClientCreator(app), func() (*tm_types.GenesisDoc, error) { @@ -68,6 +68,7 @@ func NewNode( if err != nil { return nil, err } + app.SetMempoolLocker(nde.MempoolReactor().Mempool) return nde, nil } diff --git a/core/integration/test_wrapper.go b/core/integration/test_wrapper.go index 20875d17..960dd5d5 100644 --- a/core/integration/test_wrapper.go +++ b/core/integration/test_wrapper.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/burrow/logging" "github.com/hyperledger/burrow/logging/config" "github.com/hyperledger/burrow/logging/lifecycle" + "github.com/hyperledger/burrow/logging/structure" "github.com/hyperledger/burrow/permission" "github.com/hyperledger/burrow/rpc" tm_config "github.com/tendermint/tendermint/config" @@ -62,13 +63,8 @@ func TestWrapper(privateAccounts []acm.PrivateAccount, genesisDoc *genesis.Genes ExcludeTrace: false, RootSink: config.Sink(). SetTransform(config.FilterTransform(config.IncludeWhenAnyMatches, - //"","", - "method", "GetAccount", - "method", "BroadcastTx", - "tag", "sequence", - "tag", "Commit", - "tag", "CheckTx", - "tag", "DeliverTx", + structure.ComponentKey, "Tendermint", + structure.ScopeKey, "executor.Execute\\(tx txs.Tx\\)", )). //AddSinks(config.Sink().SetTransform(config.FilterTransform(config.ExcludeWhenAnyMatches, "run_call", "false")). AddSinks(config.Sink().SetTransform(config.PruneTransform("log_channel", "trace", "scope", "returns", "run_id", "args")). diff --git a/core/kernel.go b/core/kernel.go index 1a1fed0c..70fc3a71 100644 --- a/core/kernel.go +++ b/core/kernel.go @@ -105,7 +105,7 @@ func NewKernel(ctx context.Context, keyClient keys.KeyClient, privValidator tm_t logger) nameReg := state - service := rpc.NewService(ctx, state, nameReg, checker, committer, emitter, blockchain, keyClient, transactor, + service := rpc.NewService(ctx, state, nameReg, checker, emitter, blockchain, keyClient, transactor, query.NewNodeView(tmNode, txCodec), logger) launchers := []process.Launcher{ diff --git a/execution/accounts.go b/execution/accounts.go index eaa01a18..7602367e 100644 --- a/execution/accounts.go +++ b/execution/accounts.go @@ -10,6 +10,8 @@ import ( burrow_sync "github.com/hyperledger/burrow/sync" ) +// Accounts pairs an underlying state.Reader with a KeyClient to provide a signing variant of an account +// it also maintains a lock over addresses to provide a linearisation of signing events using SequentialSigningAccount type Accounts struct { burrow_sync.RingMutex state.Reader @@ -35,7 +37,7 @@ func NewAccounts(reader state.Reader, keyClient keys.KeyClient, mutexCount int) } } func (accs *Accounts) SigningAccount(address acm.Address, signer acm.Signer) (*SigningAccount, error) { - account, err := state.GetMutableAccount(accs.Reader, address) + account, err := state.GetMutableAccount(accs, address) if err != nil { return nil, err } diff --git a/execution/events/events.go b/execution/events/events.go index 6f8c394f..2d4dd548 100644 --- a/execution/events/events.go +++ b/execution/events/events.go @@ -71,8 +71,8 @@ func SubscribeAccountOutputSendTx(ctx context.Context, subscribable event.Subscr AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash)) return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool { - if eventDataCall, ok := message.(*EventDataTx); ok { - if sendTx, ok := eventDataCall.Tx.(*txs.SendTx); ok { + if edt, ok := message.(*EventDataTx); ok { + if sendTx, ok := edt.Tx.(*txs.SendTx); ok { ch <- sendTx } } diff --git a/execution/execution.go b/execution/execution.go index 82db5e94..ebe5ae15 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -33,10 +33,12 @@ import ( "github.com/hyperledger/burrow/txs" ) -// TODO +// TODO: make configurable const GasLimit = uint64(1000000) type BatchExecutor interface { + // Provides access to write lock for a BatchExecutor so reads can be prevented for the duration of a commit + sync.Locker state.Reader // Execute transaction against block cache (i.e. block buffer) Execute(tx txs.Tx) error @@ -103,6 +105,11 @@ func newExecutor(name string, runCall bool, backend *State, chainID string, tip return exe } +// executor exposes access to the underlying state cache protected by a RWMutex that prevents access while locked +// (during an ABCI commit). while access can occur (and needs to continue for CheckTx/DeliverTx to make progress) +// through calls to Execute() external readers will be blocked until the executor is unlocked that allows the Transactor +// to always access the freshest mempool state as needed by accounts.SequentialSigningAccount +// // Accounts func (exe *executor) GetAccount(address acm.Address) (acm.Account, error) { exe.RLock() @@ -118,8 +125,8 @@ func (exe *executor) GetStorage(address acm.Address, key binary.Word256) (binary } func (exe *executor) Commit() (hash []byte, err error) { - exe.Lock() - defer exe.Unlock() + // The write lock to the executor is controlled by the caller (e.g. abci.App) so we do not acquire it here to avoid + // deadlock defer func() { if r := recover(); r != nil { err = fmt.Errorf("recovered from panic in executor.Commit(): %v\n%v", r, debug.Stack()) @@ -145,8 +152,7 @@ func (exe *executor) Commit() (hash []byte, err error) { } func (exe *executor) Reset() error { - exe.Lock() - defer exe.Unlock() + // As with Commit() we do not take the write lock here exe.stateCache.Reset(exe.state) exe.nameRegCache.Reset(exe.state) return nil diff --git a/execution/transactor.go b/execution/transactor.go index 9ffc5477..f42278b5 100644 --- a/execution/transactor.go +++ b/execution/transactor.go @@ -20,6 +20,8 @@ import ( "runtime/debug" "time" + "bytes" + acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/binary" @@ -162,71 +164,74 @@ func (trans *Transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { // Orders calls to BroadcastTx using lock (waits for response from core before releasing) func (trans *Transactor) Transact(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, gasLimit, fee uint64) (*txs.Receipt, error) { - // Note we rely on back pressure from BroadcastTx when serialising access to from a particular input account while - // the mempool rechecks (and so the checker, having just be reset does not yet have the latest sequence numbers - // for some accounts). BroadcastTx will block when the mempool is rechecking or during a commit so provided - // subsequent Transacts from the same input account block on those ahead of it we are able to stream transactions - // continuously with sequential sequence numbers. By taking this lock we ensure this. + + // Use the get the freshest sequence numbers from mempool state and hold the lock until we get a response from + // CheckTx inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } defer unlock() - txInput := &txs.TxInput{ - Address: inputAccount.Address(), - Amount: fee, - Sequence: inputAccount.Sequence() + 1, - PublicKey: inputAccount.PublicKey(), - } - tx := &txs.CallTx{ - Input: txInput, - Address: address, - GasLimit: gasLimit, - Fee: fee, - Data: data, + callTx, _, err := trans.formulateCallTx(inputAccount, address, data, gasLimit, fee) + if err != nil { + return nil, err } - // Got ourselves a tx. - err = tx.Sign(trans.tip.ChainID(), inputAccount) + err = callTx.Sign(trans.tip.ChainID(), inputAccount) if err != nil { return nil, err } - return trans.BroadcastTx(tx) + return trans.BroadcastTx(callTx) } func (trans *Transactor) TransactAndHold(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, gasLimit, fee uint64) (*evm_events.EventDataCall, error) { - receipt, err := trans.Transact(sequentialSigningAccount, address, data, gasLimit, fee) + inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } + defer unlock() - // We want non-blocking on the first event received (but buffer the value), - // after which we want to block (and then discard the value - see below) - wc := make(chan *evm_events.EventDataCall, 1) + callTx, expectedReceipt, err := trans.formulateCallTx(inputAccount, address, data, gasLimit, fee) + if err != nil { + return nil, err + } subID, err := event.GenerateSubscriptionID() if err != nil { return nil, err } - err = evm_events.SubscribeAccountCall(context.Background(), trans.eventEmitter, subID, receipt.ContractAddress, - receipt.TxHash, 0, wc) + // We want non-blocking on the first event received (but buffer the value), + // after which we want to block (and then discard the value - see below) + ch := make(chan *evm_events.EventDataCall, 1) + + err = evm_events.SubscribeAccountCall(context.Background(), trans.eventEmitter, subID, expectedReceipt.ContractAddress, + expectedReceipt.TxHash, 0, ch) if err != nil { return nil, err } // Will clean up callback goroutine and subscription in pubsub defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + receipt, err := trans.BroadcastTx(callTx) + if err != nil { + return nil, err + } + if !bytes.Equal(receipt.TxHash, expectedReceipt.TxHash) { + return nil, fmt.Errorf("BroadcastTx received TxHash %X but %X was expected", + receipt.TxHash, expectedReceipt.TxHash) + } + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) defer timer.Stop() select { case <-timer.C: - return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) - case eventDataCall := <-wc: + return nil, fmt.Errorf("transaction timed out TxHash: %X", expectedReceipt.TxHash) + case eventDataCall := <-ch: if eventDataCall.Exception != "" { return nil, fmt.Errorf("error when transacting: " + eventDataCall.Exception) } else { @@ -234,66 +239,122 @@ func (trans *Transactor) TransactAndHold(sequentialSigningAccount *SequentialSig } } } +func (trans *Transactor) formulateCallTx(inputAccount *SigningAccount, address *acm.Address, data []byte, + gasLimit, fee uint64) (*txs.CallTx, *txs.Receipt, error) { + + txInput := &txs.TxInput{ + Address: inputAccount.Address(), + Amount: fee, + Sequence: inputAccount.Sequence() + 1, + PublicKey: inputAccount.PublicKey(), + } + tx := &txs.CallTx{ + Input: txInput, + Address: address, + GasLimit: gasLimit, + Fee: fee, + Data: data, + } + + // Got ourselves a tx. + err := tx.Sign(trans.tip.ChainID(), inputAccount) + if err != nil { + return nil, nil, err + } + receipt := txs.GenerateReceipt(trans.tip.ChainID(), tx) + return tx, &receipt, nil +} -func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - tx := txs.NewSendTx() +func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, + amount uint64) (*txs.Receipt, error) { inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } defer unlock() - txInput := &txs.TxInput{ - Address: inputAccount.Address(), - Amount: amount, - Sequence: inputAccount.Sequence() + 1, - PublicKey: inputAccount.PublicKey(), - } - tx.Inputs = append(tx.Inputs, txInput) - txOutput := &txs.TxOutput{Address: toAddress, Amount: amount} - tx.Outputs = append(tx.Outputs, txOutput) - err = tx.Sign(trans.tip.ChainID(), inputAccount) + sendTx, _, err := trans.formulateSendTx(inputAccount, toAddress, amount) if err != nil { return nil, err } - return trans.BroadcastTx(tx) + + return trans.BroadcastTx(sendTx) } -func (trans *Transactor) SendAndHold(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - receipt, err := trans.Send(sequentialSigningAccount, toAddress, amount) +func (trans *Transactor) SendAndHold(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, + amount uint64) (*txs.Receipt, error) { + + inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } + defer unlock() - wc := make(chan *txs.SendTx) + sendTx, expectedReceipt, err := trans.formulateSendTx(inputAccount, toAddress, amount) + if err != nil { + return nil, err + } subID, err := event.GenerateSubscriptionID() if err != nil { return nil, err } + wc := make(chan *txs.SendTx) err = exe_events.SubscribeAccountOutputSendTx(context.Background(), trans.eventEmitter, subID, toAddress, - receipt.TxHash, wc) + expectedReceipt.TxHash, wc) if err != nil { return nil, err } defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + receipt, err := trans.BroadcastTx(sendTx) + if err != nil { + return nil, err + } + if !bytes.Equal(receipt.TxHash, expectedReceipt.TxHash) { + return nil, fmt.Errorf("BroadcastTx received TxHash %X but %X was expected", + receipt.TxHash, expectedReceipt.TxHash) + } + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) defer timer.Stop() select { case <-timer.C: - return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) + return nil, fmt.Errorf("transaction timed out TxHash: %X", expectedReceipt.TxHash) case sendTx := <-wc: // This is a double check - we subscribed to this tx's hash so something has gone wrong if the amounts don't match if sendTx.Inputs[0].Amount == amount { - return receipt, nil + return expectedReceipt, nil } return nil, fmt.Errorf("received SendTx but hash doesn't seem to match what we subscribed to, "+ - "received SendTx: %v which does not match receipt on sending: %v", sendTx, receipt) + "received SendTx: %v which does not match receipt on sending: %v", sendTx, expectedReceipt) + } +} + +func (trans *Transactor) formulateSendTx(inputAccount *SigningAccount, toAddress acm.Address, + amount uint64) (*txs.SendTx, *txs.Receipt, error) { + + sendTx := txs.NewSendTx() + txInput := &txs.TxInput{ + Address: inputAccount.Address(), + Amount: amount, + Sequence: inputAccount.Sequence() + 1, + PublicKey: inputAccount.PublicKey(), } + sendTx.Inputs = append(sendTx.Inputs, txInput) + txOutput := &txs.TxOutput{Address: toAddress, Amount: amount} + sendTx.Outputs = append(sendTx.Outputs, txOutput) + + err := sendTx.Sign(trans.tip.ChainID(), inputAccount) + if err != nil { + return nil, nil, err + } + + receipt := txs.GenerateReceipt(trans.tip.ChainID(), sendTx) + return sendTx, &receipt, nil } func (trans *Transactor) TransactNameReg(sequentialSigningAccount *SequentialSigningAccount, name, data string, amount, diff --git a/rpc/service.go b/rpc/service.go index 28965ede..ed1b1062 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -18,8 +18,6 @@ import ( "context" "fmt" - "sync" - acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/binary" @@ -45,10 +43,8 @@ const AccountsRingMutexCount = 100 // Base service that provides implementation for all underlying RPC methods type Service struct { ctx context.Context - committedState state.Iterable - commitLocker sync.Locker + state state.Iterable nameReg execution.NameRegIterable - accounts *execution.Accounts mempoolAccounts *execution.Accounts subscribable event.Subscribable blockchain bcm.Blockchain @@ -57,14 +53,13 @@ type Service struct { logger *logging.Logger } -func NewService(ctx context.Context, committedState state.Iterable, nameReg execution.NameRegIterable, - checker state.Reader, committer execution.BatchCommitter, subscribable event.Subscribable, blockchain bcm.Blockchain, - keyClient keys.KeyClient, transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { +func NewService(ctx context.Context, state state.Iterable, nameReg execution.NameRegIterable, + checker state.Reader, subscribable event.Subscribable, blockchain bcm.Blockchain, keyClient keys.KeyClient, + transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { return &Service{ ctx: ctx, - committedState: committedState, - accounts: execution.NewAccounts(committedState, keyClient, AccountsRingMutexCount), + state: state, mempoolAccounts: execution.NewAccounts(checker, keyClient, AccountsRingMutexCount), nameReg: nameReg, subscribable: subscribable, @@ -97,18 +92,13 @@ func (s *Service) Transactor() *execution.Transactor { // for a key it holds or is provided - it is down to the key-holder to manage the mutual information between transactions // concurrent within a new block window. -// Get the latest committed account state and signing accounts -func (s *Service) Accounts() *execution.Accounts { - return s.accounts -} - // Get pending account state residing in the mempool func (s *Service) MempoolAccounts() *execution.Accounts { return s.mempoolAccounts } -func (s *Service) CommitLocker() sync.Locker { - return s.commitLocker +func (s *Service) State() state.Reader { + return s.state } func (s *Service) ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, error) { @@ -233,7 +223,7 @@ func (s *Service) Genesis() (*ResultGenesis, error) { // Accounts func (s *Service) GetAccount(address acm.Address) (*ResultGetAccount, error) { - acc, err := s.accounts.GetAccount(address) + acc, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -245,7 +235,7 @@ func (s *Service) GetAccount(address acm.Address) (*ResultGetAccount, error) { func (s *Service) ListAccounts(predicate func(acm.Account) bool) (*ResultListAccounts, error) { accounts := make([]*acm.ConcreteAccount, 0) - s.committedState.IterateAccounts(func(account acm.Account) (stop bool) { + s.state.IterateAccounts(func(account acm.Account) (stop bool) { if predicate(account) { accounts = append(accounts, acm.AsConcreteAccount(account)) } @@ -259,7 +249,7 @@ func (s *Service) ListAccounts(predicate func(acm.Account) bool) (*ResultListAcc } func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage, error) { - account, err := s.accounts.GetAccount(address) + account, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -267,7 +257,7 @@ func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage return nil, fmt.Errorf("UnknownAddress: %s", address) } - value, err := s.accounts.GetStorage(address, binary.LeftPadWord256(key)) + value, err := s.state.GetStorage(address, binary.LeftPadWord256(key)) if err != nil { return nil, err } @@ -278,7 +268,7 @@ func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage } func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { - account, err := s.accounts.GetAccount(address) + account, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -286,7 +276,7 @@ func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { return nil, fmt.Errorf("UnknownAddress: %X", address) } var storageItems []StorageItem - s.committedState.IterateStorage(address, func(key, value binary.Word256) (stop bool) { + s.state.IterateStorage(address, func(key, value binary.Word256) (stop bool) { storageItems = append(storageItems, StorageItem{Key: key.UnpadLeft(), Value: value.UnpadLeft()}) return }) @@ -297,7 +287,7 @@ func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { } func (s *Service) GetAccountHumanReadable(address acm.Address) (*ResultGetAccountHumanReadable, error) { - acc, err := s.accounts.GetAccount(address) + acc, err := s.state.GetAccount(address) if err != nil { return nil, err } diff --git a/rpc/tm/methods.go b/rpc/tm/methods.go index 27753d43..13f3445e 100644 --- a/rpc/tm/methods.go +++ b/rpc/tm/methods.go @@ -80,7 +80,7 @@ func GetRoutes(service *rpc.Service, logger *logging.Logger) map[string]*gorpc.R // Simulated call Call: gorpc.NewRPCFunc(func(fromAddress, toAddress acm.Address, data []byte) (*rpc.ResultCall, error) { - call, err := service.Transactor().Call(service.Accounts(), fromAddress, toAddress, data) + call, err := service.Transactor().Call(service.State(), fromAddress, toAddress, data) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func GetRoutes(service *rpc.Service, logger *logging.Logger) map[string]*gorpc.R }, "fromAddress,toAddress,data"), CallCode: gorpc.NewRPCFunc(func(fromAddress acm.Address, code, data []byte) (*rpc.ResultCall, error) { - call, err := service.Transactor().CallCode(service.Accounts(), fromAddress, code, data) + call, err := service.Transactor().CallCode(service.State(), fromAddress, code, data) if err != nil { return nil, err } diff --git a/rpc/v0/integration/v0_test.go b/rpc/v0/integration/v0_test.go index b5229d74..8c333c74 100644 --- a/rpc/v0/integration/v0_test.go +++ b/rpc/v0/integration/v0_test.go @@ -63,7 +63,7 @@ func TestTransactCallNoCode(t *testing.T) { func TestTransactCreate(t *testing.T) { numGoroutines := 100 - numCreates := 10 + numCreates := 50 wg := new(sync.WaitGroup) wg.Add(numGoroutines) cli := v0.NewV0Client("http://localhost:1337/rpc") -- GitLab