diff --git a/account/state/state_cache.go b/account/state/state_cache.go index 76661f9098477b9468f2d4840c88ea545c2ffdef..a3675177e53bfe12a4ae3d1f5d84bf3abad58a6d 100644 --- a/account/state/state_cache.go +++ b/account/state/state_cache.go @@ -32,8 +32,7 @@ type Cache interface { type stateCache struct { sync.RWMutex - name string - + name string backend Reader accounts map[acm.Address]*accountInfo } diff --git a/cmd/burrow/main.go b/cmd/burrow/main.go index 9f5925ff17a7eac0a510ba5fc4375c5923f01811..bacc1ccd4e9261fbaa0346cd1c0e06063a6171c3 100644 --- a/cmd/burrow/main.go +++ b/cmd/burrow/main.go @@ -229,7 +229,7 @@ func main() { } } - if *chainNameOpt != ""{ + if *chainNameOpt != "" { if conf.GenesisDoc == nil { fatalf("Unable to set ChainName since no GenesisDoc/GenesisSpec provided.") } diff --git a/consensus/tendermint/abci/app.go b/consensus/tendermint/abci/app.go index d23bde6849734f24ef44e5bff08cb323f0f57256..4585470ae87fbf819bd7ddda28faa19cf1c31766 100644 --- a/consensus/tendermint/abci/app.go +++ b/consensus/tendermint/abci/app.go @@ -2,7 +2,6 @@ package abci import ( "fmt" - "sync" "time" bcm "github.com/hyperledger/burrow/blockchain" @@ -19,7 +18,6 @@ import ( const responseInfoName = "Burrow" type abciApp struct { - mtx sync.Mutex // State blockchain bcm.MutableBlockchain checker execution.BatchExecutor @@ -68,8 +66,6 @@ func (app *abciApp) Query(reqQuery abci_types.RequestQuery) (respQuery abci_type } func (app *abciApp) CheckTx(txBytes []byte) abci_types.ResponseCheckTx { - app.mtx.Lock() - defer app.mtx.Unlock() tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("CheckTx decoding error", @@ -119,8 +115,6 @@ func (app *abciApp) BeginBlock(block abci_types.RequestBeginBlock) (respBeginBlo } func (app *abciApp) DeliverTx(txBytes []byte) abci_types.ResponseDeliverTx { - app.mtx.Lock() - defer app.mtx.Unlock() tx, err := app.txDecoder.DecodeTx(txBytes) if err != nil { app.logger.TraceMsg("DeliverTx decoding error", @@ -164,8 +158,6 @@ func (app *abciApp) EndBlock(reqEndBlock abci_types.RequestEndBlock) (respEndBlo } func (app *abciApp) Commit() abci_types.ResponseCommit { - app.mtx.Lock() - defer app.mtx.Unlock() tip := app.blockchain.Tip() app.logger.InfoMsg("Committing block", "tag", "Commit", @@ -178,7 +170,7 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { "last_block_hash", tip.LastBlockHash()) // Commit state before resetting check cache so that the emptied cache servicing some RPC requests will fall through - // to committed state + // to committed state when check state is reset appHash, err := app.committer.Commit() if err != nil { return abci_types.ResponseCommit{ @@ -187,20 +179,20 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { } } - err = app.checker.Reset() + // Commit to our blockchain state + err = app.blockchain.CommitBlock(time.Unix(int64(app.block.Header.Time), 0), app.block.Hash, appHash) if err != nil { return abci_types.ResponseCommit{ Code: codes.CommitErrorCode, - Log: fmt.Sprintf("Could not reset check cache during commit: %s", err), + Log: fmt.Sprintf("Could not commit block to blockchain state: %s", err), } } - // Commit to our blockchain state - err = app.blockchain.CommitBlock(time.Unix(int64(app.block.Header.Time), 0), app.block.Hash, appHash) + err = app.checker.Reset() if err != nil { return abci_types.ResponseCommit{ Code: codes.CommitErrorCode, - Log: fmt.Sprintf("Could not commit block to blockchain state: %s", err), + Log: fmt.Sprintf("Could not reset check cache during commit: %s", err), } } @@ -217,6 +209,7 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { app.blockchain.LastBlockHeight(), app.block.Header.Height), } } + return abci_types.ResponseCommit{ Code: codes.TxExecutionSuccessCode, Data: appHash, diff --git a/core/integration/test_wrapper.go b/core/integration/test_wrapper.go index c5ee2892a651dcfac896cd7cc62a5731955848f3..20875d1723db28cfedbfde7d6d31c14b7420704b 100644 --- a/core/integration/test_wrapper.go +++ b/core/integration/test_wrapper.go @@ -59,6 +59,7 @@ func TestWrapper(privateAccounts []acm.PrivateAccount, genesisDoc *genesis.Genes var err error // Change config as needed logger, err = lifecycle.NewLoggerFromLoggingConfig(&config.LoggingConfig{ + ExcludeTrace: false, RootSink: config.Sink(). SetTransform(config.FilterTransform(config.IncludeWhenAnyMatches, //"","", diff --git a/core/kernel.go b/core/kernel.go index 70fc3a7117deb42bdc202ebe5def061aacb8ef19..1a1fed0c465be49b492a0984ac4fc84b6620ced1 100644 --- a/core/kernel.go +++ b/core/kernel.go @@ -105,7 +105,7 @@ func NewKernel(ctx context.Context, keyClient keys.KeyClient, privValidator tm_t logger) nameReg := state - service := rpc.NewService(ctx, state, nameReg, checker, emitter, blockchain, keyClient, transactor, + service := rpc.NewService(ctx, state, nameReg, checker, committer, emitter, blockchain, keyClient, transactor, query.NewNodeView(tmNode, txCodec), logger) launchers := []process.Launcher{ diff --git a/execution/accounts.go b/execution/accounts.go index 05fe03e28d159c1e15d544abfaee4f812f6c3ec5..dcc4cb9e0db88915e912f4b6d26e2e9880b5686e 100644 --- a/execution/accounts.go +++ b/execution/accounts.go @@ -2,13 +2,16 @@ package execution import ( "fmt" + "sync" acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/keys" + burrow_sync "github.com/hyperledger/burrow/sync" ) type Accounts struct { + burrow_sync.RingMutex state.Reader keyClient keys.KeyClient } @@ -18,15 +21,19 @@ type SigningAccount struct { acm.Signer } -func NewAccounts(reader state.Reader, keyClient keys.KeyClient) *Accounts { +type SequentialSigningAccount struct { + accountLocker sync.Locker + getter func() (*SigningAccount, error) +} + +func NewAccounts(reader state.Reader, keyClient keys.KeyClient, mutexCount int) *Accounts { return &Accounts{ + RingMutex: *burrow_sync.NewRingMutex(mutexCount), Reader: reader, keyClient: keyClient, } } - -func (accs *Accounts) SigningAccount(address acm.Address) (*SigningAccount, error) { - signer := keys.Signer(accs.keyClient, address) +func (accs *Accounts) SigningAccount(address acm.Address, signer acm.Signer) (*SigningAccount, error) { account, err := state.GetMutableAccount(accs.Reader, address) if err != nil { return nil, err @@ -48,28 +55,36 @@ func (accs *Accounts) SigningAccount(address acm.Address) (*SigningAccount, erro }, nil } -func (accs *Accounts) SigningAccountFromPrivateKey(privateKeyBytes []byte) (*SigningAccount, error) { +func (accs *Accounts) SequentialSigningAccount(address acm.Address) *SequentialSigningAccount { + signer := keys.Signer(accs.keyClient, address) + return &SequentialSigningAccount{ + accountLocker: accs.Mutex(address.Bytes()), + getter: func() (*SigningAccount, error) { + return accs.SigningAccount(address, signer) + }, + } +} + +func (accs *Accounts) SequentialSigningAccountFromPrivateKey(privateKeyBytes []byte) (*SequentialSigningAccount, error) { if len(privateKeyBytes) != 64 { - return nil, fmt.Errorf("Private key is not of the right length: %d\n", len(privateKeyBytes)) + return nil, fmt.Errorf("private key is not of the right length: %d\n", len(privateKeyBytes)) } privateAccount, err := acm.GeneratePrivateAccountFromPrivateKeyBytes(privateKeyBytes) if err != nil { return nil, err } - account, err := state.GetMutableAccount(accs, privateAccount.Address()) - if err != nil { - return nil, err - } - // If the account is unknown to us return zeroed account for the address derived from the private key - if account == nil { - account = acm.ConcreteAccount{ - Address: privateAccount.Address(), - }.MutableAccount() - } - // Set the public key in case it was not known previously (needed for signing with an unseen account) - account.SetPublicKey(privateAccount.PublicKey()) - return &SigningAccount{ - Account: account, - Signer: privateAccount, + return &SequentialSigningAccount{ + accountLocker: accs.Mutex(privateAccount.Address().Bytes()), + getter: func() (*SigningAccount, error) { + return accs.SigningAccount(privateAccount.Address(), privateAccount) + }, }, nil } + +type UnlockFunc func() + +func (ssa *SequentialSigningAccount) Lock() (*SigningAccount, UnlockFunc, error) { + ssa.accountLocker.Lock() + account, err := ssa.getter() + return account, ssa.accountLocker.Unlock, err +} diff --git a/execution/execution.go b/execution/execution.go index de404f232e73a3f81a12946bf7d3af43667a02ad..82db5e94cb4f3ebaeed8b0a306f1b3ac990eed8c 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -69,42 +69,32 @@ type executor struct { var _ BatchExecutor = (*executor)(nil) // Wraps a cache of what is variously known as the 'check cache' and 'mempool' -func NewBatchChecker(state *State, - chainID string, - tip bcm.Tip, - logger *logging.Logger, +func NewBatchChecker(backend *State, chainID string, tip bcm.Tip, logger *logging.Logger, options ...ExecutionOption) BatchExecutor { - return newExecutor(false, state, chainID, tip, event.NewNoOpPublisher(), + + return newExecutor("CheckCache", false, backend, chainID, tip, event.NewNoOpPublisher(), logger.WithScope("NewBatchExecutor"), options...) } -func NewBatchCommitter(state *State, - chainID string, - tip bcm.Tip, - publisher event.Publisher, - logger *logging.Logger, +func NewBatchCommitter(backend *State, chainID string, tip bcm.Tip, publisher event.Publisher, logger *logging.Logger, options ...ExecutionOption) BatchCommitter { - return newExecutor(true, state, chainID, tip, publisher, + return newExecutor("CommitCache", true, backend, chainID, tip, publisher, logger.WithScope("NewBatchCommitter"), options...) } -func newExecutor(runCall bool, - backend *State, - chainID string, - tip bcm.Tip, - eventFireable event.Publisher, - logger *logging.Logger, - options ...ExecutionOption) *executor { +func newExecutor(name string, runCall bool, backend *State, chainID string, tip bcm.Tip, publisher event.Publisher, + logger *logging.Logger, options ...ExecutionOption) *executor { + exe := &executor{ chainID: chainID, tip: tip, runCall: runCall, state: backend, - stateCache: state.NewCache(backend), + stateCache: state.NewCache(backend, state.Name(name)), nameRegCache: NewNameRegCache(backend), - publisher: eventFireable, - eventCache: event.NewEventCache(eventFireable), + publisher: publisher, + eventCache: event.NewEventCache(publisher), logger: logger.With(structure.ComponentKey, "Executor"), } for _, option := range options { @@ -132,7 +122,7 @@ func (exe *executor) Commit() (hash []byte, err error) { defer exe.Unlock() defer func() { if r := recover(); r != nil { - err = fmt.Errorf("recovered from panic in executor.Commit(): %v", r) + err = fmt.Errorf("recovered from panic in executor.Commit(): %v\n%v", r, debug.Stack()) } }() // flush the caches @@ -334,7 +324,7 @@ func (exe *executor) Execute(tx txs.Tx) (err error) { callee acm.MutableAccount = nil // initialized below code []byte = nil ret []byte = nil - txCache = state.NewCache(exe.stateCache) + txCache = state.NewCache(exe.stateCache, state.Name("TxCache")) params = evm.Params{ BlockHeight: exe.tip.LastBlockHeight(), BlockHash: binary.LeftPadWord256(exe.tip.LastBlockHash()), diff --git a/execution/execution_test.go b/execution/execution_test.go index f1cdfb56a3e62bd64196934d2b0834e4c1432c59..50ce476712b1182f84a7dc725369223efe6b0412 100644 --- a/execution/execution_test.go +++ b/execution/execution_test.go @@ -129,12 +129,12 @@ func makeUsers(n int) []acm.AddressableSigner { } func makeExecutor(state *State) *executor { - return newExecutor(true, state, testChainID, bcm.NewBlockchain(nil, testGenesisDoc), event.NewEmitter(logger), - logger) + return newExecutor("makeExecutorCache", true, state, testChainID, + bcm.NewBlockchain(nil, testGenesisDoc), event.NewEmitter(logger), logger) } func newBaseGenDoc(globalPerm, accountPerm ptypes.AccountPermissions) genesis.GenesisDoc { - genAccounts := []genesis.Account{} + var genAccounts []genesis.Account for _, user := range users[:5] { accountPermCopy := accountPerm // Create new instance for custom overridability. genAccounts = append(genAccounts, genesis.Account{ @@ -1676,7 +1676,8 @@ func TestSelfDestruct(t *testing.T) { } func execTxWithStateAndBlockchain(state *State, tip bcm.Tip, tx txs.Tx) error { - exe := newExecutor(true, state, testChainID, tip, event.NewNoOpPublisher(), logger) + exe := newExecutor("execTxWithStateAndBlockchainCache", true, state, testChainID, tip, + event.NewNoOpPublisher(), logger) if err := exe.Execute(tx); err != nil { return err } else { diff --git a/execution/transactor.go b/execution/transactor.go index d2cfa6f4b5e5ad13f8a7487eca9884b4fc947ca8..62249eaec4be56f5afb79e92dd82e474a8361d4a 100644 --- a/execution/transactor.go +++ b/execution/transactor.go @@ -43,11 +43,6 @@ type Call struct { GasUsed uint64 } -type SequencedAddressableSigner interface { - acm.AddressableSigner - Sequence() uint64 -} - // Transactor is the controller/middleware for the v0 RPC type Transactor struct { tip blockchain.Tip @@ -130,7 +125,8 @@ func (trans *Transactor) BroadcastTxAsync(tx txs.Tx, callback func(res *abci_typ return trans.broadcastTxAsync(tx, callback) } -// Broadcast a transaction. +// Broadcast a transaction and waits for a response from the mempool. Transactions to BroadcastTx will block during +// various mempool operations (managed by Tendermint) including mempool Reap, Commit, and recheckTx. func (trans *Transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { trans.logger.Trace.Log("method", "BroadcastTx", "tx_hash", tx.Hash(trans.tip.ChainID()), @@ -164,19 +160,16 @@ func (trans *Transactor) BroadcastTx(tx txs.Tx) (*txs.Receipt, error) { } // Orders calls to BroadcastTx using lock (waits for response from core before releasing) -func (trans *Transactor) Transact(inputAccount SequencedAddressableSigner, address *acm.Address, data []byte, gasLimit, - fee uint64) (*txs.Receipt, error) { - // TODO: [Silas] we should consider revising this method and removing fee, or - // possibly adding an amount parameter. It is non-sensical to just be able to - // set the fee. Our support of fees in general is questionable since at the - // moment all we do is deduct the fee effectively leaking token. It is possible - // someone may be using the sending of native token to payable functions but - // they can be served by broadcasting a token. - - // We hard-code the amount to be equal to the fee which means the CallTx we - // generate transfers 0 value, which is the most sensible default since in - // recent solidity compilers the EVM generated will throw an error if value - // is transferred to a non-payable function. +func (trans *Transactor) Transact(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, + gasLimit, fee uint64) (*txs.Receipt, error) { + // Note we rely on back pressure from BroadcastTx when serialising access to from a particular input account while + // the mempool rechecks (and so the checker, having just be reset does not yet have the latest sequence numbers + // for some accounts). BroadcastTx will block when the mempool is rechecking or during a commit so provided + // subsequent Transacts from the same input account block on those ahead of it we are able to stream transactions + // continuously with sequential sequence numbers. By taking this lock we ensure this. + inputAccount, unlock, err := sequentialSigningAccount.Lock() + defer unlock() + txInput := &txs.TxInput{ Address: inputAccount.Address(), Amount: fee, @@ -192,17 +185,17 @@ func (trans *Transactor) Transact(inputAccount SequencedAddressableSigner, addre } // Got ourselves a tx. - err := tx.Sign(trans.tip.ChainID(), inputAccount) + err = tx.Sign(trans.tip.ChainID(), inputAccount) if err != nil { return nil, err } return trans.BroadcastTx(tx) } -func (trans *Transactor) TransactAndHold(inputAccount SequencedAddressableSigner, address *acm.Address, data []byte, gasLimit, +func (trans *Transactor) TransactAndHold(sequentialSigningAccount *SequentialSigningAccount, address *acm.Address, data []byte, gasLimit, fee uint64) (*evm_events.EventDataCall, error) { - receipt, err := trans.Transact(inputAccount, address, data, gasLimit, fee) + receipt, err := trans.Transact(sequentialSigningAccount, address, data, gasLimit, fee) if err != nil { return nil, err } @@ -239,9 +232,11 @@ func (trans *Transactor) TransactAndHold(inputAccount SequencedAddressableSigner } } -func (trans *Transactor) Send(inputAccount SequencedAddressableSigner, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { +func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { tx := txs.NewSendTx() + inputAccount, unlock, err := sequentialSigningAccount.Lock() + defer unlock() txInput := &txs.TxInput{ Address: inputAccount.Address(), Amount: amount, @@ -252,15 +247,15 @@ func (trans *Transactor) Send(inputAccount SequencedAddressableSigner, toAddress txOutput := &txs.TxOutput{Address: toAddress, Amount: amount} tx.Outputs = append(tx.Outputs, txOutput) - err := tx.Sign(trans.tip.ChainID(), inputAccount) + err = tx.Sign(trans.tip.ChainID(), inputAccount) if err != nil { return nil, err } return trans.BroadcastTx(tx) } -func (trans *Transactor) SendAndHold(inputAccount SequencedAddressableSigner, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { - receipt, err := trans.Send(inputAccount, toAddress, amount) +func (trans *Transactor) SendAndHold(sequentialSigningAccount *SequentialSigningAccount, toAddress acm.Address, amount uint64) (*txs.Receipt, error) { + receipt, err := trans.Send(sequentialSigningAccount, toAddress, amount) if err != nil { return nil, err } @@ -287,7 +282,7 @@ func (trans *Transactor) SendAndHold(inputAccount SequencedAddressableSigner, to return nil, fmt.Errorf("transaction timed out TxHash: %X", receipt.TxHash) case sendTx := <-wc: // This is a double check - we subscribed to this tx's hash so something has gone wrong if the amounts don't match - if sendTx.Inputs[0].Address == inputAccount.Address() && sendTx.Inputs[0].Amount == amount { + if sendTx.Inputs[0].Amount == amount { return receipt, nil } return nil, fmt.Errorf("received SendTx but hash doesn't seem to match what we subscribed to, "+ @@ -295,12 +290,14 @@ func (trans *Transactor) SendAndHold(inputAccount SequencedAddressableSigner, to } } -func (trans *Transactor) TransactNameReg(inputAccount SequencedAddressableSigner, name, data string, amount, +func (trans *Transactor) TransactNameReg(sequentialSigningAccount *SequentialSigningAccount, name, data string, amount, fee uint64) (*txs.Receipt, error) { + inputAccount, unlock, err := sequentialSigningAccount.Lock() + defer unlock() // Formulate and sign tx := txs.NewNameTxWithSequence(inputAccount.PublicKey(), name, data, amount, fee, inputAccount.Sequence()+1) - err := tx.Sign(trans.tip.ChainID(), inputAccount) + err = tx.Sign(trans.tip.ChainID(), inputAccount) if err != nil { return nil, err } diff --git a/rpc/service.go b/rpc/service.go index c7d9cf522479b7dc6a097417326beb4d03e26b54..28965eded7324281a5e136bab5933392d36f073e 100644 --- a/rpc/service.go +++ b/rpc/service.go @@ -18,6 +18,8 @@ import ( "context" "fmt" + "sync" + acm "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/account/state" "github.com/hyperledger/burrow/binary" @@ -38,11 +40,13 @@ import ( // Magic! Should probably be configurable, but not shouldn't be so huge we // end up DoSing ourselves. const MaxBlockLookback = 1000 +const AccountsRingMutexCount = 100 // Base service that provides implementation for all underlying RPC methods type Service struct { ctx context.Context committedState state.Iterable + commitLocker sync.Locker nameReg execution.NameRegIterable accounts *execution.Accounts mempoolAccounts *execution.Accounts @@ -54,14 +58,14 @@ type Service struct { } func NewService(ctx context.Context, committedState state.Iterable, nameReg execution.NameRegIterable, - checker state.Reader, subscribable event.Subscribable, blockchain bcm.Blockchain, keyClient keys.KeyClient, - transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { + checker state.Reader, committer execution.BatchCommitter, subscribable event.Subscribable, blockchain bcm.Blockchain, + keyClient keys.KeyClient, transactor *execution.Transactor, nodeView query.NodeView, logger *logging.Logger) *Service { return &Service{ ctx: ctx, committedState: committedState, - accounts: execution.NewAccounts(committedState, keyClient), - mempoolAccounts: execution.NewAccounts(checker, keyClient), + accounts: execution.NewAccounts(committedState, keyClient, AccountsRingMutexCount), + mempoolAccounts: execution.NewAccounts(checker, keyClient, AccountsRingMutexCount), nameReg: nameReg, subscribable: subscribable, blockchain: blockchain, @@ -103,6 +107,10 @@ func (s *Service) MempoolAccounts() *execution.Accounts { return s.mempoolAccounts } +func (s *Service) CommitLocker() sync.Locker { + return s.commitLocker +} + func (s *Service) ListUnconfirmedTxs(maxTxs int) (*ResultListUnconfirmedTxs, error) { // Get all transactions for now transactions, err := s.nodeView.MempoolTransactions(maxTxs) diff --git a/rpc/v0/integration/v0_test.go b/rpc/v0/integration/v0_test.go index 2a0eaee4121052ecd3c3c7a07d776f22f1af3ed8..a0e3b8564c4388e501fe03f8de595ebdfd70e65b 100644 --- a/rpc/v0/integration/v0_test.go +++ b/rpc/v0/integration/v0_test.go @@ -21,12 +21,18 @@ import ( "encoding/hex" "testing" + "context" + + "sync" + "github.com/hyperledger/burrow/account" "github.com/hyperledger/burrow/binary" + "github.com/hyperledger/burrow/consensus/tendermint" "github.com/hyperledger/burrow/execution/evm/abi" "github.com/hyperledger/burrow/rpc/v0" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/types" ) func TestTransactCallNoCode(t *testing.T) { @@ -52,33 +58,46 @@ func TestTransactCallNoCode(t *testing.T) { } func TestTransactCreate(t *testing.T) { + numGoroutines := 100 + numCreates := 10 + wg := new(sync.WaitGroup) + wg.Add(numGoroutines) cli := v0.NewV0Client("http://localhost:1337/rpc") - // Flip flops between sending private key and input address to test private key and address based signing privKey, inputAddress := privKeyInputAddressAlternator(privateAccounts[0]) - for i := 0; i < 1000; i++ { - bc, err := hex.DecodeString(strangeLoopBytecode) - require.NoError(t, err) - create, err := cli.Transact(v0.TransactParam{ - PrivKey: privKey(i), - InputAddress: inputAddress(i), - Address: nil, - Data: bc, - Fee: 2, - GasLimit: 10000, - }) - require.NoError(t, err) - assert.True(t, create.CreatesContract) + bc, err := hex.DecodeString(strangeLoopBytecode) + require.NoError(t, err) + countCh := committedTxCount(t) + for i := 0; i < numGoroutines; i++ { + go func() { + for j := 0; j < numCreates; j++ { + create, err := cli.Transact(v0.TransactParam{ + PrivKey: privKey(j), + InputAddress: inputAddress(j), + Address: nil, + Data: bc, + Fee: 2, + GasLimit: 10000, + }) + if assert.NoError(t, err) { + assert.True(t, create.CreatesContract) + } + } + wg.Done() + }() } + wg.Wait() + + assert.Equal(t, numGoroutines*numCreates, <-countCh) } func BenchmarkTransactCreateContract(b *testing.B) { cli := v0.NewV0Client("http://localhost:1337/rpc") privKey, inputAddress := privKeyInputAddressAlternator(privateAccounts[0]) + bc, err := hex.DecodeString(strangeLoopBytecode) + require.NoError(b, err) for i := 0; i < b.N; i++ { - bc, err := hex.DecodeString(strangeLoopBytecode) - require.NoError(b, err) create, err := cli.Transact(v0.TransactParam{ PrivKey: privKey(i), InputAddress: inputAddress(i), @@ -94,37 +113,41 @@ func BenchmarkTransactCreateContract(b *testing.B) { func TestTransactAndHold(t *testing.T) { cli := v0.NewV0Client("http://localhost:1337/rpc") - bc, err := hex.DecodeString(strangeLoopBytecode) require.NoError(t, err) - privKey, inputAddress := privKeyInputAddressAlternator(privateAccounts[0]) - for i := 0; i < 2; i++ { - create, err := cli.TransactAndHold(v0.TransactParam{ - PrivKey: privKey(i), - InputAddress: inputAddress(i), - Address: nil, - Data: bc, - Fee: 2, - GasLimit: 10000, - }) - require.NoError(t, err) - assert.Equal(t, 0, create.StackDepth) - functionID := abi.FunctionID("UpsieDownsie()") - call, err := cli.TransactAndHold(v0.TransactParam{ - PrivKey: privKey(i), - InputAddress: inputAddress(i), - Address: create.CallData.Callee.Bytes(), - Data: functionID[:], - Fee: 2, - GasLimit: 10000, - }) - require.NoError(t, err) - depth := binary.Uint64FromWord256(binary.LeftPadWord256(call.Return)) - // Would give 23 if taken from wrong frame - assert.Equal(t, 18, int(depth)) + numGoroutines := 5 + numRuns := 2 + countCh := committedTxCount(t) + for i := 0; i < numGoroutines; i++ { + for j := 0; j < numRuns; j++ { + create, err := cli.TransactAndHold(v0.TransactParam{ + PrivKey: privKey(j), + InputAddress: inputAddress(j), + Address: nil, + Data: bc, + Fee: 2, + GasLimit: 10000, + }) + require.NoError(t, err) + assert.Equal(t, 0, create.StackDepth) + functionID := abi.FunctionID("UpsieDownsie()") + call, err := cli.TransactAndHold(v0.TransactParam{ + PrivKey: privKey(j), + InputAddress: inputAddress(j), + Address: create.CallData.Callee.Bytes(), + Data: functionID[:], + Fee: 2, + GasLimit: 10000, + }) + require.NoError(t, err) + depth := binary.Uint64FromWord256(binary.LeftPadWord256(call.Return)) + // Would give 23 if taken from wrong frame + assert.Equal(t, 18, int(depth)) + } } + assert.Equal(t, numGoroutines*numRuns*2, <-countCh) } func TestSend(t *testing.T) { @@ -160,6 +183,34 @@ func TestSendAndHold(t *testing.T) { } } +func committedTxCount(t *testing.T) chan int { + var numTxs int64 + emptyBlocks := 0 + maxEmptyBlocks := 1 + outCh := make(chan int) + ch := make(chan *types.EventDataNewBlock) + ctx, cancel := context.WithCancel(context.Background()) + tendermint.SubscribeNewBlock(ctx, kernel.Emitter, "TestThings", ch) + + go func() { + for ed := range ch { + if ed.Block.NumTxs == 0 { + emptyBlocks++ + } else { + emptyBlocks = 0 + } + if emptyBlocks > maxEmptyBlocks { + break + } + numTxs += ed.Block.NumTxs + t.Logf("Total TXs committed at block %v: %v (+%v)\n", ed.Block.Height, numTxs, ed.Block.NumTxs) + } + outCh <- int(numTxs) + cancel() + }() + return outCh +} + // Returns a pair of functions that mutually exclusively return the private key bytes or input address bytes of a // private account in the same iteration of a loop indexed by an int func privKeyInputAddressAlternator(privateAccount account.PrivateAccount) (func(int) []byte, func(int) []byte) { diff --git a/rpc/v0/methods.go b/rpc/v0/methods.go index c17c7e61c69256746e6fbe66bafaa8318e22fec4..346b53148a780d307473f31c106f3a2ab796edf5 100644 --- a/rpc/v0/methods.go +++ b/rpc/v0/methods.go @@ -68,7 +68,6 @@ type RequestHandlerFunc func(request *rpc.RPCRequest, requester interface{}) (in func GetMethods(codec rpc.Codec, service *rpc.Service, logger *logging.Logger) map[string]RequestHandlerFunc { accountFilterFactory := filters.NewAccountFilterFactory() nameRegFilterFactory := filters.NewNameRegFilterFactory() - return map[string]RequestHandlerFunc{ // Accounts GET_ACCOUNTS: func(request *rpc.RPCRequest, requester interface{}) (interface{}, int, error) { @@ -310,7 +309,7 @@ func GetMethods(codec rpc.Codec, service *rpc.Service, logger *logging.Logger) m if err != nil { return nil, rpc.INVALID_PARAMS, err } - inputAccount, err := service.MempoolAccounts().SigningAccountFromPrivateKey(param.PrivKey) + inputAccount, err := signingAccount(service.MempoolAccounts(), param.PrivKey, param.InputAddress) if err != nil { return nil, rpc.INVALID_PARAMS, err } @@ -445,7 +444,7 @@ func GetMethods(codec rpc.Codec, service *rpc.Service, logger *logging.Logger) m } // Gets signing account from onr of private key or address - failing if both are provided -func signingAccount(accounts *execution.Accounts, privKey, addressBytes []byte) (*execution.SigningAccount, error) { +func signingAccount(accounts *execution.Accounts, privKey, addressBytes []byte) (*execution.SequentialSigningAccount, error) { if len(addressBytes) > 0 { if len(privKey) > 0 { return nil, fmt.Errorf("privKey and address provided but only one or the other should be given") @@ -454,8 +453,8 @@ func signingAccount(accounts *execution.Accounts, privKey, addressBytes []byte) if err != nil { return nil, err } - return accounts.SigningAccount(address) + return accounts.SequentialSigningAccount(address), nil } - return accounts.SigningAccountFromPrivateKey(privKey) + return accounts.SequentialSigningAccountFromPrivateKey(privKey) }