diff --git a/account/state/state_cache.go b/account/state/state_cache.go index a3675177e53bfe12a4ae3d1f5d84bf3abad58a6d..81b0dcd2c24469968be0db44f3d82b4ec7d9e559 100644 --- a/account/state/state_cache.go +++ b/account/state/state_cache.go @@ -91,7 +91,6 @@ func (cache *stateCache) UpdateAccount(account acm.Account) error { } accInfo.account = account accInfo.updated = true - return nil } diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index c81f4524e0e516be3bdea4494033030b90b92d46..dc13271f7d6901467da4510830860c04cb504af1 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -52,6 +52,8 @@ type Tip interface { // Burrow's portion of the Blockchain state type Blockchain interface { + // Read locker + sync.Locker Root Tip // Returns an immutable copy of the tip diff --git a/consensus/tendermint/abci/app.go b/consensus/tendermint/abci/app.go index 99320fb3b9b558d1d1a845bb507e37882b7eb93e..22cb095ad5a6305fef1a2ac4d45efde545c6ac27 100644 --- a/consensus/tendermint/abci/app.go +++ b/consensus/tendermint/abci/app.go @@ -2,6 +2,7 @@ package abci import ( "fmt" + "sync" "time" bcm "github.com/hyperledger/burrow/blockchain" @@ -17,11 +18,12 @@ import ( const responseInfoName = "Burrow" -type abciApp struct { +type App struct { // State - blockchain bcm.MutableBlockchain - checker execution.BatchExecutor - committer execution.BatchCommitter + blockchain bcm.MutableBlockchain + checker execution.BatchExecutor + committer execution.BatchCommitter + mempoolLocker sync.Locker // We need to cache these from BeginBlock for when we need actually need it in Commit block *abci_types.RequestBeginBlock // Utility @@ -30,11 +32,13 @@ type abciApp struct { logger *logging.Logger } +var _ abci_types.Application = &App{} + func NewApp(blockchain bcm.MutableBlockchain, checker execution.BatchExecutor, committer execution.BatchCommitter, - logger *logging.Logger) abci_types.Application { - return &abciApp{ + logger *logging.Logger) *App { + return &App{ blockchain: blockchain, checker: checker, committer: committer, @@ -43,7 +47,14 @@ func NewApp(blockchain bcm.MutableBlockchain, } } -func (app *abciApp) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { +// Provide the Mempool lock. When provided we will attempt to acquire this lock in a goroutine during the Commit. We +// will keep the checker cache locked until we are able to acquire the mempool lock which signals the end of the commit +// and possible recheck on Tendermint's side. +func (app *App) SetMempoolLocker(mempoolLocker sync.Locker) { + app.mempoolLocker = mempoolLocker +} + +func (app *App) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { tip := app.blockchain.Tip() return abci_types.ResponseInfo{ Data: responseInfoName, @@ -53,19 +64,19 @@ func (app *abciApp) Info(info abci_types.RequestInfo) abci_types.ResponseInfo { } } -func (app *abciApp) SetOption(option abci_types.RequestSetOption) (respSetOption abci_types.ResponseSetOption) { +func (app *App) SetOption(option abci_types.RequestSetOption) (respSetOption abci_types.ResponseSetOption) { respSetOption.Log = "SetOption not supported" respSetOption.Code = codes.UnsupportedRequestCode return } -func (app *abciApp) Query(reqQuery abci_types.RequestQuery) (respQuery abci_types.ResponseQuery) { +func (app *App) Query(reqQuery abci_types.RequestQuery) (respQuery abci_types.ResponseQuery) { respQuery.Log = "Query not supported" respQuery.Code = codes.UnsupportedRequestCode return } -func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { +func (app *App) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("CheckTx decoding error", @@ -76,7 +87,6 @@ func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { Log: fmt.Sprintf("Encoding error: %s", err), } } - // TODO: map ExecTx errors to sensible ABCI error codes receipt := txs.GenerateReceipt(app.blockchain.ChainID(), tx) err = app.checker.Execute(tx) @@ -104,17 +114,17 @@ func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { } } -func (app *abciApp) InitChain(chain abci_types.RequestInitChain) (respInitChain abci_types.ResponseInitChain) { +func (app *App) InitChain(chain abci_types.RequestInitChain) (respInitChain abci_types.ResponseInitChain) { // Could verify agreement on initial validator set here return } -func (app *abciApp) BeginBlock(block abci_types.RequestBeginBlock) (respBeginBlock abci_types.ResponseBeginBlock) { +func (app *App) BeginBlock(block abci_types.RequestBeginBlock) (respBeginBlock abci_types.ResponseBeginBlock) { app.block = &block return } -func (app *abciApp) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { +func (app *App) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("DeliverTx decoding error", @@ -152,12 +162,12 @@ func (app *abciApp) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { } } -func (app *abciApp) EndBlock(reqEndBlock abci_types.RequestEndBlock) (respEndBlock abci_types.ResponseEndBlock) { +func (app *App) EndBlock(reqEndBlock abci_types.RequestEndBlock) (respEndBlock abci_types.ResponseEndBlock) { // Validator mutation goes here return } -func (app *abciApp) Commit() abci_types.ResponseCommit { +func (app *App) Commit() abci_types.ResponseCommit { tip := app.blockchain.Tip() app.logger.InfoMsg("Committing block", "tag", "Commit", @@ -169,12 +179,31 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { "last_block_time", tip.LastBlockTime(), "last_block_hash", tip.LastBlockHash()) - // Commit state before resetting check cache so that the emptied cache servicing some RPC requests will fall through - // to committed state when check state is reset - // TODO: determine why the ordering of updates does not experience invalid sequence number during recheck. It - // seems there is nothing to stop a Transact transaction from querying the checker cache before it has been replayed - // all transactions and so would formulate a transaction with the same sequence number as one in mempool. - // However this is not observed in v0_tests.go - we need to understand why or create a test that exposes this + // Lock the checker while we reset it and possibly while recheckTxs replays transactions + app.checker.Lock() + defer func() { + // Tendermint may replay transactions to the check cache during a recheck, which happens after we have returned + // from Commit(). The mempool is locked by Tendermint for the duration of the commit phase; during Commit() and + // the subsequent mempool.Update() so we schedule an acquisition of the mempool lock in a goroutine in order to + // 'observe' the mempool unlock event that happens later on. By keeping the checker read locked during that + // period we can ensure that anything querying the checker (such as service.MempoolAccounts()) will block until + // the full Tendermint commit phase has completed. + if app.mempoolLocker != nil { + go func() { + // we won't get this until after the commit and we will acquire strictly after this commit phase has + // ended (i.e. when Tendermint's BlockExecutor.Commit() returns + app.mempoolLocker.Lock() + // Prevent any mempool getting relocked while we unlock - we could just unlock immediately but if a new + // commit starts gives goroutines blocked on checker a chance to progress before the next commit phase + defer app.mempoolLocker.Unlock() + app.checker.Unlock() + }() + } else { + // If we have not be provided with access to the mempool lock + app.checker.Unlock() + } + }() + appHash, err := app.committer.Commit() if err != nil { return abci_types.ResponseCommit{ @@ -213,7 +242,6 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { app.blockchain.LastBlockHeight(), app.block.Header.Height), } } - return abci_types.ResponseCommit{ Code: codes.TxExecutionSuccessCode, Data: appHash, diff --git a/consensus/tendermint/tendermint.go b/consensus/tendermint/tendermint.go index 27fa18ee4cdcf59e5589edfd7b86c99cae68d627..c66c9699085096c1403769a12c4769a1accec578 100644 --- a/consensus/tendermint/tendermint.go +++ b/consensus/tendermint/tendermint.go @@ -55,8 +55,8 @@ func NewNode( // disable Tendermint's RPC conf.RPC.ListenAddress = "" - app := abci.NewApp(blockchain, checker, committer, logger) nde := &Node{} + app := abci.NewApp(blockchain, checker, committer, logger) nde.Node, err = node.NewNode(conf, privValidator, proxy.NewLocalClientCreator(app), func() (*tm_types.GenesisDoc, error) { @@ -68,6 +68,7 @@ func NewNode( if err != nil { return nil, err } + app.SetMempoolLocker(nde.MempoolReactor().Mempool) return nde, nil } diff --git a/core/integration/test_wrapper.go b/core/integration/test_wrapper.go index 20875d1723db28cfedbfde7d6d31c14b7420704b..960dd5d5edbdde71d747097757019a860d3eb08c 100644 --- a/core/integration/test_wrapper.go +++ b/core/integration/test_wrapper.go @@ -32,6 +32,7 @@ import ( "github.com/hyperledger/burrow/logging" "github.com/hyperledger/burrow/logging/config" "github.com/hyperledger/burrow/logging/lifecycle" + "github.com/hyperledger/burrow/logging/structure" "github.com/hyperledger/burrow/permission" "github.com/hyperledger/burrow/rpc" tm_config "github.com/tendermint/tendermint/config" @@ -62,13 +63,8 @@ func TestWrapper(privateAccounts []acm.PrivateAccount, genesisDoc *genesis.Genes ExcludeTrace: false, RootSink: config.Sink(). SetTransform(config.FilterTransform(config.IncludeWhenAnyMatches, - //"","", - "method", "GetAccount", - "method", "BroadcastTx", - "tag", "sequence", - "tag", "Commit", - "tag", "CheckTx", - "tag", "DeliverTx", + structure.ComponentKey, "Tendermint", + structure.ScopeKey, "executor.Execute\\(tx txs.Tx\\)", )). //AddSinks(config.Sink().SetTransform(config.FilterTransform(config.ExcludeWhenAnyMatches, "run_call", "false")). AddSinks(config.Sink().SetTransform(config.PruneTransform("log_channel", "trace", "scope", "returns", "run_id", "args")). diff --git a/core/kernel.go b/core/kernel.go index 1a1fed0c465be49b492a0984ac4fc84b6620ced1..70fc3a7117deb42bdc202ebe5def061aacb8ef19 100644 --- a/core/kernel.go +++ b/core/kernel.go @@ -105,7 +105,7 @@ func NewKernel(ctx context.Context, keyClient keys.KeyClient, privValidator tm_t logger) nameReg := state - service := rpc.NewService(ctx, state, nameReg, checker, committer, emitter, blockchain, keyClient, transactor, + service := rpc.NewService(ctx, state, nameReg, checker, emitter, blockchain, keyClient, transactor, query.NewNodeView(tmNode, txCodec), logger) launchers := []process.Launcher{ diff --git a/execution/accounts.go b/execution/accounts.go index eaa01a188060befb672b6d8f542b27e59abd7f42..7602367eb35c66dfe23535d38bf6718ab25863b3 100644 --- a/execution/accounts.go +++ b/execution/accounts.go @@ -10,6 +10,8 @@ import ( burrow_sync "github.com/hyperledger/burrow/sync" ) +// Accounts pairs an underlying state.Reader with a KeyClient to provide a signing variant of an account +// it also maintains a lock over addresses to provide a linearisation of signing events using SequentialSigningAccount type Accounts struct { burrow_sync.RingMutex state.Reader @@ -35,7 +37,7 @@ func NewAccounts(reader state.Reader, keyClient keys.KeyClient, mutexCount int) } } func (accs *Accounts) SigningAccount(address acm.Address, signer acm.Signer) (*SigningAccount, error) { - account, err := state.GetMutableAccount(accs.Reader, address) + account, err := state.GetMutableAccount(accs, address) if err != nil { return nil, err } diff --git a/execution/events/events.go b/execution/events/events.go index 6f8c394fc20f095d3c8292552c00fb5011c02ac0..2d4dd548b17ff083dcfd4d9fb338e3c1af9935b9 100644 --- a/execution/events/events.go +++ b/execution/events/events.go @@ -71,8 +71,8 @@ func SubscribeAccountOutputSendTx(ctx context.Context, subscribable event.Subscr AndEquals(event.TxHashKey, hex.EncodeUpperToString(txHash)) return event.SubscribeCallback(ctx, subscribable, subscriber, query, func(message interface{}) bool { - if eventDataCall, ok := message.(*EventDataTx); ok { - if sendTx, ok := eventDataCall.Tx.(*txs.SendTx); ok { + if edt, ok := message.(*EventDataTx); ok { + if sendTx, ok := edt.Tx.(*txs.SendTx); ok { ch <- sendTx } } diff --git a/execution/execution.go b/execution/execution.go index 82db5e94cb4f3ebaeed8b0a306f1b3ac990eed8c..ebe5ae159e95742f823088b133f5979a9a37826d 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -33,10 +33,12 @@ import ( "github.com/hyperledger/burrow/txs" ) -// TODO +// TODO: make configurable const GasLimit = uint64(1000000) type BatchExecutor interface { + // Provides access to write lock for a BatchExecutor so reads can be prevented for the duration of a commit + sync.Locker state.Reader // Execute transaction against block cache (i.e. block buffer) Execute(tx txs.Tx) error @@ -103,6 +105,11 @@ func newExecutor(name string, runCall bool, backend *State, chainID string, tip return exe } +// executor exposes access to the underlying state cache protected by a RWMutex that prevents access while locked +// (during an ABCI commit). while access can occur (and needs to continue for CheckTx/DeliverTx to make progress) +// through calls to Execute() external readers will be blocked until the executor is unlocked that allows the Transactor +// to always access the freshest mempool state as needed by accounts.SequentialSigningAccount +// // Accounts func (exe *executor) GetAccount(address acm.Address) (acm.Account, error) { exe.RLock() @@ -118,8 +125,8 @@ func (exe *executor) GetStorage(address acm.Address, key binary.Word256) (binary } func (exe *executor) Commit() (hash []byte, err error) { - exe.Lock() - defer exe.Unlock() + // The write lock to the executor is controlled by the caller (e.g. abci.App) so we do not acquire it here to avoid + // deadlock defer func() { if r := recover(); r != nil { err = fmt.Errorf("recovered from panic in executor.Commit(): %v\n%v", r, debug.Stack()) @@ -145,8 +152,7 @@ func (exe *executor) Commit() (hash []byte, err error) { } func (exe *executor) Reset() error { - exe.Lock() - defer exe.Unlock() + // As with Commit() we do not take the write lock here exe.stateCache.Reset(exe.state) exe.nameRegCache.Reset(exe.state) return nil diff --git a/execution/transactor.go b/execution/transactor.go index 9ffc547760af366f9ae4222592ac5af44c226ed8..f42278b580a15c02cb8d6f695cfc562ef984e6ee 100644 --- a/execution/transactor.go +++ b/execution/transactor.go @@ -20,6 +20,8 @@ import ( "runtime/debug" "time" + "bytes" + acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/binary" @@ -162,71 +164,74 @@ func (trans *Transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { // Orders calls to BroadcastTx using lock (waits for response from core before releasing) func (trans *Transactor) Transact(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, gasLimit, fee uint64) (*txs.Receipt, error) { - // Note we rely on back pressure from BroadcastTx when serialising access to from a particular input account while - // the mempool rechecks (and so the checker, having just be reset does not yet have the latest sequence numbers - // for some accounts). BroadcastTx will block when the mempool is rechecking or during a commit so provided - // subsequent Transacts from the same input account block on those ahead of it we are able to stream transactions - // continuously with sequential sequence numbers. By taking this lock we ensure this. + + // Use the get the freshest sequence numbers from mempool state and hold the lock until we get a response from + // CheckTx inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } defer unlock() - txInput := &txs.TxInput{ - Address: inputAccount.Address(), - Amount: fee, - Sequence: inputAccount.Sequence() + 1, - PublicKey: inputAccount.PublicKey(), - } - tx := &txs.CallTx{ - Input: txInput, - Address: address, - GasLimit: gasLimit, - Fee: fee, - Data: data, + callTx, _, err := trans.formulateCallTx(inputAccount, address, data, gasLimit, fee) + if err != nil { + return nil, err } - // Got ourselves a tx. - err = tx.Sign(trans.tip.ChainID(), inputAccount) + err = callTx.Sign(trans.tip.ChainID(), inputAccount) if err != nil { return nil, err } - return trans.BroadcastTx(tx) + return trans.BroadcastTx(callTx) } func (trans *Transactor) TransactAndHold(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, gasLimit, fee uint64) (*evm_events.EventDataCall, error) { - receipt, err := trans.Transact(sequentialSigningAccount, address, data, gasLimit, fee) + inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } + defer unlock() - // We want non-blocking on the first event received (but buffer the value), - // after which we want to block (and then discard the value - see below) - wc := make(chan *evm_events.EventDataCall, 1) + callTx, expectedReceipt, err := trans.formulateCallTx(inputAccount, address, data, gasLimit, fee) + if err != nil { + return nil, err + } subID, err := event.GenerateSubscriptionID() if err != nil { return nil, err } - err = evm_events.SubscribeAccountCall(context.Background(), trans.eventEmitter, subID, receipt.ContractAddress, - receipt.TxHash, 0, wc) + // We want non-blocking on the first event received (but buffer the value), + // after which we want to block (and then discard the value - see below) + ch := make(chan *evm_events.EventDataCall, 1) + + err = evm_events.SubscribeAccountCall(context.Background(), trans.eventEmitter, subID, expectedReceipt.ContractAddress, + expectedReceipt.TxHash, 0, ch) if err != nil { return nil, err } // Will clean up callback goroutine and subscription in pubsub defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + receipt, err := trans.BroadcastTx(callTx) + if err != nil { + return nil, err + } + if !bytes.Equal(receipt.TxHash, expectedReceipt.TxHash) { + return nil, fmt.Errorf("BroadcastTx received TxHash %X but %X was expected", + receipt.TxHash, expectedReceipt.TxHash) + } + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) defer timer.Stop() select { case <-timer.C: - return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) - case eventDataCall := <-wc: + return nil, fmt.Errorf("transaction timed out TxHash: %X", expectedReceipt.TxHash) + case eventDataCall := <-ch: if eventDataCall.Exception != "" { return nil, fmt.Errorf("error when transacting: " + eventDataCall.Exception) } else { @@ -234,66 +239,122 @@ func (trans *Transactor) TransactAndHold(sequentialSigningAccount *SequentialSig } } } +func (trans *Transactor) formulateCallTx(inputAccount *SigningAccount, address *acm.Address, data []byte, + gasLimit, fee uint64) (*txs.CallTx, *txs.Receipt, error) { + + txInput := &txs.TxInput{ + Address: inputAccount.Address(), + Amount: fee, + Sequence: inputAccount.Sequence() + 1, + PublicKey: inputAccount.PublicKey(), + } + tx := &txs.CallTx{ + Input: txInput, + Address: address, + GasLimit: gasLimit, + Fee: fee, + Data: data, + } + + // Got ourselves a tx. + err := tx.Sign(trans.tip.ChainID(), inputAccount) + if err != nil { + return nil, nil, err + } + receipt := txs.GenerateReceipt(trans.tip.ChainID(), tx) + return tx, &receipt, nil +} -func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - tx := txs.NewSendTx() +func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, + amount uint64) (*txs.Receipt, error) { inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } defer unlock() - txInput := &txs.TxInput{ - Address: inputAccount.Address(), - Amount: amount, - Sequence: inputAccount.Sequence() + 1, - PublicKey: inputAccount.PublicKey(), - } - tx.Inputs = append(tx.Inputs, txInput) - txOutput := &txs.TxOutput{Address: toAddress, Amount: amount} - tx.Outputs = append(tx.Outputs, txOutput) - err = tx.Sign(trans.tip.ChainID(), inputAccount) + sendTx, _, err := trans.formulateSendTx(inputAccount, toAddress, amount) if err != nil { return nil, err } - return trans.BroadcastTx(tx) + + return trans.BroadcastTx(sendTx) } -func (trans *Transactor) SendAndHold(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - receipt, err := trans.Send(sequentialSigningAccount, toAddress, amount) +func (trans *Transactor) SendAndHold(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, + amount uint64) (*txs.Receipt, error) { + + inputAccount, unlock, err := sequentialSigningAccount.Lock() if err != nil { return nil, err } + defer unlock() - wc := make(chan *txs.SendTx) + sendTx, expectedReceipt, err := trans.formulateSendTx(inputAccount, toAddress, amount) + if err != nil { + return nil, err + } subID, err := event.GenerateSubscriptionID() if err != nil { return nil, err } + wc := make(chan *txs.SendTx) err = exe_events.SubscribeAccountOutputSendTx(context.Background(), trans.eventEmitter, subID, toAddress, - receipt.TxHash, wc) + expectedReceipt.TxHash, wc) if err != nil { return nil, err } defer trans.eventEmitter.UnsubscribeAll(context.Background(), subID) + receipt, err := trans.BroadcastTx(sendTx) + if err != nil { + return nil, err + } + if !bytes.Equal(receipt.TxHash, expectedReceipt.TxHash) { + return nil, fmt.Errorf("BroadcastTx received TxHash %X but %X was expected", + receipt.TxHash, expectedReceipt.TxHash) + } + timer := time.NewTimer(BlockingTimeoutSeconds * time.Second) defer timer.Stop() select { case <-timer.C: - return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) + return nil, fmt.Errorf("transaction timed out TxHash: %X", expectedReceipt.TxHash) case sendTx := <-wc: // This is a double check - we subscribed to this tx's hash so something has gone wrong if the amounts don't match if sendTx.Inputs[0].Amount == amount { - return receipt, nil + return expectedReceipt, nil } return nil, fmt.Errorf("received SendTx but hash doesn't seem to match what we subscribed to, "+ - "received SendTx: %v which does not match receipt on sending: %v", sendTx, receipt) + "received SendTx: %v which does not match receipt on sending: %v", sendTx, expectedReceipt) + } +} + +func (trans *Transactor) formulateSendTx(inputAccount *SigningAccount, toAddress acm.Address, + amount uint64) (*txs.SendTx, *txs.Receipt, error) { + + sendTx := txs.NewSendTx() + txInput := &txs.TxInput{ + Address: inputAccount.Address(), + Amount: amount, + Sequence: inputAccount.Sequence() + 1, + PublicKey: inputAccount.PublicKey(), } + sendTx.Inputs = append(sendTx.Inputs, txInput) + txOutput := &txs.TxOutput{Address: toAddress, Amount: amount} + sendTx.Outputs = append(sendTx.Outputs, txOutput) + + err := sendTx.Sign(trans.tip.ChainID(), inputAccount) + if err != nil { + return nil, nil, err + } + + receipt := txs.GenerateReceipt(trans.tip.ChainID(), sendTx) + return sendTx, &receipt, nil } func (trans *Transactor) TransactNameReg(sequentialSigningAccount *SequentialSigningAccount, name, data string, amount, diff --git a/rpc/service.go b/rpc/service.go index 28965eded7324281a5e136bab5933392d36f073e..ed1b1062de472e4ce38fcaa8f4b8aaee0d1a41df 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -18,8 +18,6 @@ import ( "context" "fmt" - "sync" - acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/binary" @@ -45,10 +43,8 @@ const AccountsRingMutexCount = 100 // Base service that provides implementation for all underlying RPC methods type Service struct { ctx context.Context - committedState state.Iterable - commitLocker sync.Locker + state state.Iterable nameReg execution.NameRegIterable - accounts *execution.Accounts mempoolAccounts *execution.Accounts subscribable event.Subscribable blockchain bcm.Blockchain @@ -57,14 +53,13 @@ type Service struct { logger *logging.Logger } -func NewService(ctx context.Context, committedState state.Iterable, nameReg execution.NameRegIterable, - checker state.Reader, committer execution.BatchCommitter, subscribable event.Subscribable, blockchain bcm.Blockchain, - keyClient keys.KeyClient, transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { +func NewService(ctx context.Context, state state.Iterable, nameReg execution.NameRegIterable, + checker state.Reader, subscribable event.Subscribable, blockchain bcm.Blockchain, keyClient keys.KeyClient, + transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { return &Service{ ctx: ctx, - committedState: committedState, - accounts: execution.NewAccounts(committedState, keyClient, AccountsRingMutexCount), + state: state, mempoolAccounts: execution.NewAccounts(checker, keyClient, AccountsRingMutexCount), nameReg: nameReg, subscribable: subscribable, @@ -97,18 +92,13 @@ func (s *Service) Transactor() *execution.Transactor { // for a key it holds or is provided - it is down to the key-holder to manage the mutual information between transactions // concurrent within a new block window. -// Get the latest committed account state and signing accounts -func (s *Service) Accounts() *execution.Accounts { - return s.accounts -} - // Get pending account state residing in the mempool func (s *Service) MempoolAccounts() *execution.Accounts { return s.mempoolAccounts } -func (s *Service) CommitLocker() sync.Locker { - return s.commitLocker +func (s *Service) State() state.Reader { + return s.state } func (s *Service) ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, error) { @@ -233,7 +223,7 @@ func (s *Service) Genesis() (*ResultGenesis, error) { // Accounts func (s *Service) GetAccount(address acm.Address) (*ResultGetAccount, error) { - acc, err := s.accounts.GetAccount(address) + acc, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -245,7 +235,7 @@ func (s *Service) GetAccount(address acm.Address) (*ResultGetAccount, error) { func (s *Service) ListAccounts(predicate func(acm.Account) bool) (*ResultListAccounts, error) { accounts := make([]*acm.ConcreteAccount, 0) - s.committedState.IterateAccounts(func(account acm.Account) (stop bool) { + s.state.IterateAccounts(func(account acm.Account) (stop bool) { if predicate(account) { accounts = append(accounts, acm.AsConcreteAccount(account)) } @@ -259,7 +249,7 @@ func (s *Service) ListAccounts(predicate func(acm.Account) bool) (*ResultListAcc } func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage, error) { - account, err := s.accounts.GetAccount(address) + account, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -267,7 +257,7 @@ func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage return nil, fmt.Errorf("UnknownAddress: %s", address) } - value, err := s.accounts.GetStorage(address, binary.LeftPadWord256(key)) + value, err := s.state.GetStorage(address, binary.LeftPadWord256(key)) if err != nil { return nil, err } @@ -278,7 +268,7 @@ func (s *Service) GetStorage(address acm.Address, key []byte) (*ResultGetStorage } func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { - account, err := s.accounts.GetAccount(address) + account, err := s.state.GetAccount(address) if err != nil { return nil, err } @@ -286,7 +276,7 @@ func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { return nil, fmt.Errorf("UnknownAddress: %X", address) } var storageItems []StorageItem - s.committedState.IterateStorage(address, func(key, value binary.Word256) (stop bool) { + s.state.IterateStorage(address, func(key, value binary.Word256) (stop bool) { storageItems = append(storageItems, StorageItem{Key: key.UnpadLeft(), Value: value.UnpadLeft()}) return }) @@ -297,7 +287,7 @@ func (s *Service) DumpStorage(address acm.Address) (*ResultDumpStorage, error) { } func (s *Service) GetAccountHumanReadable(address acm.Address) (*ResultGetAccountHumanReadable, error) { - acc, err := s.accounts.GetAccount(address) + acc, err := s.state.GetAccount(address) if err != nil { return nil, err } diff --git a/rpc/tm/methods.go b/rpc/tm/methods.go index 27753d43d6f98b86803fe911cfb74a0d87165d53..13f3445e2959310ddda1c05fe4d1400ee395a52a 100644 --- a/rpc/tm/methods.go +++ b/rpc/tm/methods.go @@ -80,7 +80,7 @@ func GetRoutes(service *rpc.Service, logger *logging.Logger) map[string]*gorpc.R // Simulated call Call: gorpc.NewRPCFunc(func(fromAddress, toAddress acm.Address, data []byte) (*rpc.ResultCall, error) { - call, err := service.Transactor().Call(service.Accounts(), fromAddress, toAddress, data) + call, err := service.Transactor().Call(service.State(), fromAddress, toAddress, data) if err != nil { return nil, err } @@ -88,7 +88,7 @@ func GetRoutes(service *rpc.Service, logger *logging.Logger) map[string]*gorpc.R }, "fromAddress,toAddress,data"), CallCode: gorpc.NewRPCFunc(func(fromAddress acm.Address, code, data []byte) (*rpc.ResultCall, error) { - call, err := service.Transactor().CallCode(service.Accounts(), fromAddress, code, data) + call, err := service.Transactor().CallCode(service.State(), fromAddress, code, data) if err != nil { return nil, err } diff --git a/rpc/v0/integration/v0_test.go b/rpc/v0/integration/v0_test.go index b5229d74f927db974663f47ad71fea5619a7dd3e..8c333c7409eaa379788d5fb52baa14fb239e2c74 100644 --- a/rpc/v0/integration/v0_test.go +++ b/rpc/v0/integration/v0_test.go @@ -63,7 +63,7 @@ func TestTransactCallNoCode(t *testing.T) { func TestTransactCreate(t *testing.T) { numGoroutines := 100 - numCreates := 10 + numCreates := 50 wg := new(sync.WaitGroup) wg.Add(numGoroutines) cli := v0.NewV0Client("http://localhost:1337/rpc")