diff --git a/.circleci/config.yml b/.circleci/config.yml index f09a13f05a04ca6fb8512662d9bb9d05ed95d3b6..3de942fc59b4470c9e5d70eba903c1919a09ad69 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ defaults: &defaults working_directory: /go/src/github.com/hyperledger/burrow docker: - - image: circleci/golang:1.9.3 + - image: circleci/golang:1.10.1 tag_filters: &tags_filters tags: diff --git a/consensus/tendermint/abci/app.go b/consensus/tendermint/abci/app.go index 4585470ae87fbf819bd7ddda28faa19cf1c31766..99320fb3b9b558d1d1a845bb507e37882b7eb93e 100644 --- a/consensus/tendermint/abci/app.go +++ b/consensus/tendermint/abci/app.go @@ -171,6 +171,10 @@ func (app *abciApp) Commit() abci_types.ResponseCommit { // Commit state before resetting check cache so that the emptied cache servicing some RPC requests will fall through // to committed state when check state is reset + // TODO: determine why the ordering of updates does not experience invalid sequence number during recheck. It + // seems there is nothing to stop a Transact transaction from querying the checker cache before it has been replayed + // all transactions and so would formulate a transaction with the same sequence number as one in mempool. + // However this is not observed in v0_tests.go - we need to understand why or create a test that exposes this appHash, err := app.committer.Commit() if err != nil { return abci_types.ResponseCommit{ diff --git a/execution/accounts.go b/execution/accounts.go index dcc4cb9e0db88915e912f4b6d26e2e9880b5686e..eaa01a188060befb672b6d8f542b27e59abd7f42 100644 --- a/execution/accounts.go +++ b/execution/accounts.go @@ -28,7 +28,8 @@ type SequentialSigningAccount struct { func NewAccounts(reader state.Reader, keyClient keys.KeyClient, mutexCount int) *Accounts { return &Accounts{ - RingMutex: *burrow_sync.NewRingMutex(mutexCount), + // TODO: use the no hash variant of RingMutex after it has a test + RingMutex: *burrow_sync.NewRingMutexXXHash(mutexCount), Reader: reader, keyClient: keyClient, } @@ -86,5 +87,9 @@ type UnlockFunc func() func (ssa *SequentialSigningAccount) Lock() (*SigningAccount, UnlockFunc, error) { ssa.accountLocker.Lock() account, err := ssa.getter() + if err != nil { + ssa.accountLocker.Unlock() + return nil, nil, err + } return account, ssa.accountLocker.Unlock, err } diff --git a/execution/transactor.go b/execution/transactor.go index 62249eaec4be56f5afb79e92dd82e474a8361d4a..9ffc547760af366f9ae4222592ac5af44c226ed8 100644 --- a/execution/transactor.go +++ b/execution/transactor.go @@ -168,6 +168,9 @@ func (trans *Transactor) Transact(sequentialSigningAccount *SequentialSigningAcc // subsequent Transacts from the same input account block on those ahead of it we are able to stream transactions // continuously with sequential sequence numbers. By taking this lock we ensure this. inputAccount, unlock, err := sequentialSigningAccount.Lock() + if err != nil { + return nil, err + } defer unlock() txInput := &txs.TxInput{ @@ -236,6 +239,9 @@ func (trans *Transactor) Send(sequentialSigningAccount *SequentialSigningAccount tx := txs.NewSendTx() inputAccount, unlock, err := sequentialSigningAccount.Lock() + if err != nil { + return nil, err + } defer unlock() txInput := &txs.TxInput{ Address: inputAccount.Address(), @@ -294,6 +300,9 @@ func (trans *Transactor) TransactNameReg(sequentialSigningAccount *SequentialSig fee uint64) (*txs.Receipt, error) { inputAccount, unlock, err := sequentialSigningAccount.Lock() + if err != nil { + return nil, err + } defer unlock() // Formulate and sign tx := txs.NewNameTxWithSequence(inputAccount.PublicKey(), name, data, amount, fee, inputAccount.Sequence()+1) diff --git a/sync/ring_mutex.go b/sync/ring_mutex.go index 01abecd97c2baa20fda354eca80934c48d870c4d..50240956eda7e38e5f9eca423d0c5a5a5982f5a0 100644 --- a/sync/ring_mutex.go +++ b/sync/ring_mutex.go @@ -3,12 +3,16 @@ package sync import ( "sync" + "hash" + + "encoding/binary" + "github.com/OneOfOne/xxhash" ) type RingMutex struct { mtxs []sync.RWMutex - hasherPool sync.Pool + hash func(address []byte) uint64 mutexCount uint64 } @@ -18,17 +22,44 @@ type RingMutex struct { // hash function // modulo size. If some addresses collide modulo size they will be unnecessary // contention between those addresses, but you can trade space against contention // as desired. -func NewRingMutex(mutexCount int) *RingMutex { - return &RingMutex{ +func NewRingMutex(mutexCount int, hashMaker func() hash.Hash64) *RingMutex { + ringMutex := &RingMutex{ + mutexCount: uint64(mutexCount), // max slice length is bounded by max(int) thus the argument type mtxs: make([]sync.RWMutex, mutexCount, mutexCount), - hasherPool: sync.Pool{ + hash: func(address []byte) uint64 { + buf := make([]byte, 8) + copy(buf, address) + return binary.LittleEndian.Uint64(buf) + }, + } + if hashMaker != nil { + hasherPool := &sync.Pool{ New: func() interface{} { - return xxhash.New64() + return hashMaker() }, - }, - mutexCount: uint64(mutexCount), + } + ringMutex.hash = func(address []byte) uint64 { + h := hasherPool.Get().(hash.Hash64) + defer func() { + h.Reset() + hasherPool.Put(h) + }() + h.Write(address) + return h.Sum64() + } } + return ringMutex +} + +func NewRingMutexNoHash(mutexCount int) *RingMutex { + return NewRingMutex(mutexCount, nil) +} + +func NewRingMutexXXHash(mutexCount int) *RingMutex { + return NewRingMutex(mutexCount, func() hash.Hash64 { + return xxhash.New64() + }) } func (mtx *RingMutex) Lock(address []byte) { @@ -59,13 +90,3 @@ func (mtx *RingMutex) Mutex(address []byte) *sync.RWMutex { func (mtx *RingMutex) index(address []byte) uint64 { return mtx.hash(address) % mtx.mutexCount } - -func (mtx *RingMutex) hash(address []byte) uint64 { - h := mtx.hasherPool.Get().(*xxhash.XXHash64) - defer func() { - h.Reset() - mtx.hasherPool.Put(h) - }() - h.Write(address) - return h.Sum64() -} diff --git a/sync/ring_mutex_test.go b/sync/ring_mutex_test.go index 7eea39b02fa8bd76ea6d9c6a73efc5c63fa36b6a..b6d8f40dc7c180d84a31b63556325653897c67cb 100644 --- a/sync/ring_mutex_test.go +++ b/sync/ring_mutex_test.go @@ -9,69 +9,72 @@ import ( "github.com/stretchr/testify/assert" ) -func TestRingMutex_Lock(t *testing.T) { - // Using fewer mutexes than addresses to lock against should cause contention +func TestRingMutexXXHash_Lock(t *testing.T) { mutexCount := 10 numAddresses := byte(20) - mtx := NewRingMutex(mutexCount) - writeCh := make(chan []byte) - checksum := 0 + mtxs := []*RingMutex{NewRingMutexXXHash(mutexCount)} - // We'll try to acquire a locks on all of our unique addresses, knowing that - // some of them will share an underlying RWMutex - for i := byte(0); i < numAddresses; i++ { - address := []byte{i} - go func() { - mtx.Lock(address) - writeCh <- address - }() - } + for _, mtx := range mtxs { + // Using fewer mutexes than addresses to lock against should cause contention + writeCh := make(chan []byte) + checksum := 0 + + // We'll try to acquire a locks on all of our unique addresses, knowing that + // some of them will share an underlying RWMutex + for i := byte(0); i < numAddresses; i++ { + address := []byte{i} + go func() { + mtx.Lock(address) + writeCh <- address + }() + } - // We should receive a message from all of those addresses for which we could - // acquire a lock, this should be almost surely deterministic since we are - // launching our goroutines sequentially from a single goroutine (if this bit - // breaks we can add a short pause between the 'go' statements above, for the - // purposes of the predictability of this test) - addresses := receiveAddresses(writeCh) - checksum += len(addresses) - // we hit lock contention on the tenth address so get 9 back - assert.Equal(t, 9, len(addresses)) - // Unlock the 9 locked mutexes - unlockAddresses(mtx, addresses) + // We should receive a message from all of those addresses for which we could + // acquire a lock, this should be almost surely deterministic since we are + // launching our goroutines sequentially from a single goroutine (if this bit + // breaks we can add a short pause between the 'go' statements above, for the + // purposes of the predictability of this test) + addresses := receiveAddresses(writeCh) + checksum += len(addresses) + // we hit lock contention on the tenth address so get 9 back + assert.Equal(t, 9, len(addresses)) + // Unlock the 9 locked mutexes + unlockAddresses(mtx, addresses) - // Which should trigger another batch to make progress - addresses = receiveAddresses(writeCh) - checksum += len(addresses) - // Again the number we get back (but not the order) should be deterministic - // because we are unlocking sequentially from a single goroutine - assert.Equal(t, 7, len(addresses)) - unlockAddresses(mtx, addresses) + // Which should trigger another batch to make progress + addresses = receiveAddresses(writeCh) + checksum += len(addresses) + // Again the number we get back (but not the order) should be deterministic + // because we are unlocking sequentially from a single goroutine + assert.Equal(t, 7, len(addresses)) + unlockAddresses(mtx, addresses) - // And again - addresses = receiveAddresses(writeCh) - checksum += len(addresses) - assert.Equal(t, 3, len(addresses)) - unlockAddresses(mtx, addresses) + // And again + addresses = receiveAddresses(writeCh) + checksum += len(addresses) + assert.Equal(t, 3, len(addresses)) + unlockAddresses(mtx, addresses) - // And so on - addresses = receiveAddresses(writeCh) - checksum += len(addresses) - assert.Equal(t, 1, len(addresses)) - unlockAddresses(mtx, addresses) + // And so on + addresses = receiveAddresses(writeCh) + checksum += len(addresses) + assert.Equal(t, 1, len(addresses)) + unlockAddresses(mtx, addresses) - // Until we have unblocked all of the goroutines we released - addresses = receiveAddresses(writeCh) - checksum += len(addresses) - assert.Equal(t, 0, len(addresses)) - unlockAddresses(mtx, addresses) - checksum += len(addresses) + // Until we have unblocked all of the goroutines we released + addresses = receiveAddresses(writeCh) + checksum += len(addresses) + assert.Equal(t, 0, len(addresses)) + unlockAddresses(mtx, addresses) + checksum += len(addresses) - // Check we've heard back from all of them - assert.EqualValues(t, numAddresses, checksum) + // Check we've heard back from all of them + assert.EqualValues(t, numAddresses, checksum) + } } -func TestRingMutex_hash(t *testing.T) { - mtx := NewRingMutex(10) +func TestRingMutex_XXHash(t *testing.T) { + mtx := NewRingMutexXXHash(10) address, err := base64.StdEncoding.DecodeString("/+ulTkCzpYg2ePaZtqS8dycJBLY9387yZPst8LX5YL0=") assert.NoError(t, err) assert.EqualValues(t, 8509033946529530334, mtx.hash(address)) diff --git a/txs/unbond_tx.go b/txs/unbond_tx.go index 8bd84f2920dbe11e940b7c22109534c6d407016f..786530295ff2596aea475ea8b534953fa53d0986 100644 --- a/txs/unbond_tx.go +++ b/txs/unbond_tx.go @@ -26,7 +26,7 @@ func NewUnbondTx(addr acm.Address, height int) *UnbondTx { func (tx *UnbondTx) Sign(chainID string, signingAccounts ...acm.AddressableSigner) error { if len(signingAccounts) != 1 { - return fmt.Errorf("UnbondTx expects a single AddressableSigner for its signature but %v were provieded", + return fmt.Errorf("UnbondTx expects a single AddressableSigner for its signature but %v were provided", len(signingAccounts)) } var err error