diff --git a/Godeps/_workspace/src/github.com/tendermint/tendermint/consensus/state.go b/Godeps/_workspace/src/github.com/tendermint/tendermint/consensus/state.go index 2f8c5e746880374abe125e4901e65c24bf0b5b98..7e253bb880453ec6789bf11707fdc9c6994bca7d 100644 --- a/Godeps/_workspace/src/github.com/tendermint/tendermint/consensus/state.go +++ b/Godeps/_workspace/src/github.com/tendermint/tendermint/consensus/state.go @@ -1308,7 +1308,7 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe cs.stagedState.Save() // Update mempool. - cs.mempoolReactor.ResetForBlockAndState(block, cs.stagedState) + cs.mempoolReactor.Mempool.ResetForBlockAndState(block, cs.stagedState) // Fire off event if cs.evsw != nil && cs.evc != nil { diff --git a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool.go b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool.go index 625b56010117b9dcc1783a339070884b1b2aeae3..2788ced7cee38b00d5484f10a54cf5df15be2540 100644 --- a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool.go +++ b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool.go @@ -1,143 +1,249 @@ -/* -Mempool receives new transactions and applies them to the latest committed state. -If the transaction is acceptable, then it broadcasts the tx to peers. - -When this node happens to be the next proposer, it simply uses the recently -modified state (and the associated transactions) to construct a proposal. -*/ - package mempool import ( + "container/list" "sync" + "sync/atomic" + "time" + + "github.com/tendermint/go-clist" sm "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/state" "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/types" ) +/* + +The mempool pushes new txs onto the proxyAppConn. +It gets a stream of (req, res) tuples from the proxy. +The memool stores good txs in a concurrent linked-list. + +Multiple concurrent go-routines can traverse this linked-list +safely by calling .NextWait() on each element. + +So we have several go-routines: +1. Consensus calling Update() and Reap() synchronously +2. Many mempool reactor's peer routines calling CheckTx() +3. Many mempool reactor's peer routines traversing the txs linked list +4. Another goroutine calling GarbageCollectTxs() periodically + +To manage these goroutines, there are three methods of locking. +1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) +2. Mutations to the linked-list elements are atomic +3. CheckTx() calls can be paused upon Update() and Reap(), protected by .mtx + +Garbage collection of old elements from mempool.txs is handlde via +the DetachPrev() call, which makes old elements not reachable by +peer broadcastTxRoutine() automatically garbage collected. + +TODO: Better handle tmsp client errors. (make it automatically handle connection errors) + +*/ + +const cacheSize = 100000 + type Mempool struct { mtx sync.Mutex state *sm.State cache *sm.BlockCache - txs []types.Tx // TODO: we need to add a map to facilitate replace-by-fee + + txs *clist.CList // concurrent linked-list of good txs + counter int64 // simple incrementing counter + height int // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + + // Keep a cache of already-seen txs. + // This reduces the pressure on the proxyApp. + cacheMap map[string]struct{} + cacheList *list.List } func NewMempool(state *sm.State) *Mempool { - return &Mempool{ - state: state, - cache: sm.NewBlockCache(state), + mempool := &Mempool{ + state: state, + cache: sm.NewBlockCache(state), + txs: clist.New(), + counter: 0, + height: 0, + rechecking: 0, + recheckCursor: nil, + recheckEnd: nil, + + cacheMap: make(map[string]struct{}, cacheSize), + cacheList: list.New(), } + return mempool } -func (mem *Mempool) GetState() *sm.State { - return mem.state -} - -func (mem *Mempool) GetCache() *sm.BlockCache { - return mem.cache +// Return the first element of mem.txs for peer goroutines to call .NextWait() on. +// Blocks until txs has elements. +func (mem *Mempool) TxsFrontWait() *clist.CElement { + return mem.txs.FrontWait() } -func (mem *Mempool) GetHeight() int { - mem.mtx.Lock() - defer mem.mtx.Unlock() - return mem.state.LastBlockHeight +func (mem *Mempool) TxID(tx types.Tx) string { + return string(types.TxID(mem.state.ChainID, tx)) } -// Apply tx to the state and remember it. +// Try a new transaction in the mempool. +// Potentially blocking if we're blocking on Update() or Reap(). func (mem *Mempool) AddTx(tx types.Tx) (err error) { mem.mtx.Lock() defer mem.mtx.Unlock() + + // CACHE + if _, exists := mem.cacheMap[mem.TxID(tx)]; exists { + return nil + } + if mem.cacheList.Len() >= cacheSize { + popped := mem.cacheList.Front() + poppedTx := popped.Value.(types.Tx) + delete(mem.cacheMap, mem.TxID(poppedTx)) + mem.cacheList.Remove(popped) + } + mem.cacheMap[mem.TxID(tx)] = struct{}{} + mem.cacheList.PushBack(tx) + // END CACHE + err = sm.ExecTx(mem.cache, tx, false, nil) if err != nil { log.Info("AddTx() error", "tx", tx, "error", err) return err } else { log.Info("AddTx() success", "tx", tx) - mem.txs = append(mem.txs, tx) + mem.counter++ + memTx := &mempoolTx{ + counter: mem.counter, + height: int64(mem.height), + tx: tx, + } + mem.txs.PushBack(memTx) return nil } + return nil } -func (mem *Mempool) GetProposalTxs() []types.Tx { +func (mem *Mempool) GetState() *sm.State { + return mem.state +} + +func (mem *Mempool) GetCache() *sm.BlockCache { + return mem.cache +} + +func (mem *Mempool) GetHeight() int { mem.mtx.Lock() defer mem.mtx.Unlock() - log.Info("GetProposalTxs:", "txs", mem.txs) - return mem.txs + return mem.state.LastBlockHeight } -// We use this to inform peer routines of how the mempool has been updated -type ResetInfo struct { - Height int - Included []Range - Invalid []Range +// Get the valid transactions remaining +func (mem *Mempool) GetProposalTxs() []types.Tx { + mem.mtx.Lock() + defer mem.mtx.Unlock() + + for atomic.LoadInt32(&mem.rechecking) > 0 { + // TODO: Something better? + time.Sleep(time.Millisecond * 10) + } + + txs := mem.collectTxs() + return txs } -type Range struct { - Start int - Length int +func (mem *Mempool) collectTxs() []types.Tx { + txs := make([]types.Tx, 0, mem.txs.Len()) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + txs = append(txs, memTx.tx) + } + return txs } -// "block" is the new block being committed. -// "state" is the result of state.AppendBlock("block"). -// Txs that are present in "block" are discarded from mempool. -// Txs that have become invalid in the new "state" are also discarded. -func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) ResetInfo { +// Tell mempool that these txs were committed. +// Mempool will discard these txs. +// NOTE: this should be called *after* block is committed by consensus. +func (mem *Mempool) ResetForBlockAndState(block *types.Block, state *sm.State) { mem.mtx.Lock() defer mem.mtx.Unlock() + mem.state = state.Copy() mem.cache = sm.NewBlockCache(mem.state) - // First, create a lookup map of txns in new block. - blockTxsMap := make(map[string]struct{}) + // First, create a lookup map of txns in new txs. + txsMap := make(map[string]struct{}) for _, tx := range block.Data.Txs { - blockTxsMap[string(types.TxID(state.ChainID, tx))] = struct{}{} + txsMap[mem.TxID(tx)] = struct{}{} } - // Now we filter all txs from mem.txs that are in blockTxsMap, - // and ExecTx on what remains. Only valid txs are kept. - // We track the ranges of txs included in the block and invalidated by it - // so we can tell peer routines - var ri = ResetInfo{Height: block.Height} - var validTxs []types.Tx - includedStart, invalidStart := -1, -1 - for i, tx := range mem.txs { - txID := types.TxID(state.ChainID, tx) - if _, ok := blockTxsMap[string(txID)]; ok { - startRange(&includedStart, i) // start counting included txs - endRange(&invalidStart, i, &ri.Invalid) // stop counting invalid txs - log.Info("Filter out, already committed", "tx", tx, "txID", txID) - } else { - endRange(&includedStart, i, &ri.Included) // stop counting included txs - err := sm.ExecTx(mem.cache, tx, false, nil) - if err != nil { - startRange(&invalidStart, i) // start counting invalid txs - log.Info("Filter out, no longer valid", "tx", tx, "error", err) - } else { - endRange(&invalidStart, i, &ri.Invalid) // stop counting invalid txs - log.Info("Filter in, new, valid", "tx", tx, "txID", txID) - validTxs = append(validTxs, tx) - } + // Set height + mem.height = block.Height + // Remove transactions that are already in txs. + goodTxs := mem.filterTxs(txsMap) + // Recheck mempool txs + // TODO: make optional + mem.recheckTxs(goodTxs) + + // At this point, mem.txs are being rechecked. + // mem.recheckCursor re-scans mem.txs and possibly removes some txs. + // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. +} + +func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx { + goodTxs := make([]types.Tx, 0, mem.txs.Len()) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + if _, ok := blockTxsMap[mem.TxID(memTx.tx)]; ok { + // Remove the tx since already in block. + mem.txs.Remove(e) + e.DetachPrev() + continue } + // Good tx! + goodTxs = append(goodTxs, memTx.tx) } - endRange(&includedStart, len(mem.txs)-1, &ri.Included) // stop counting included txs - endRange(&invalidStart, len(mem.txs)-1, &ri.Invalid) // stop counting invalid txs - - // We're done! - log.Info("New txs", "txs", validTxs, "oldTxs", mem.txs) - mem.txs = validTxs - return ri + return goodTxs } -func startRange(start *int, i int) { - if *start < 0 { - *start = i +// NOTE: pass in goodTxs because mem.txs can mutate concurrently. +func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { + if len(goodTxs) == 0 { + return + } + atomic.StoreInt32(&mem.rechecking, 1) + mem.recheckCursor = mem.txs.Front() + mem.recheckEnd = mem.txs.Back() + + for _, tx := range goodTxs { + err := sm.ExecTx(mem.cache, tx, false, nil) + if err != nil { + // Tx became invalidated due to newly committed block. + mem.txs.Remove(mem.recheckCursor) + mem.recheckCursor.DetachPrev() + } + if mem.recheckCursor == mem.recheckEnd { + mem.recheckCursor = nil + } else { + mem.recheckCursor = mem.recheckCursor.Next() + } + if mem.recheckCursor == nil { + // Done! + atomic.StoreInt32(&mem.rechecking, 0) + } } } -func endRange(start *int, i int, ranger *[]Range) { - if *start >= 0 { - length := i - *start - *ranger = append(*ranger, Range{*start, length}) - *start = -1 - } +//-------------------------------------------------------------------------------- + +// A transaction that successfully ran +type mempoolTx struct { + counter int64 // a simple incrementing counter + height int64 // height that this tx had been validated in + tx types.Tx // +} + +func (memTx *mempoolTx) Height() int { + return int(atomic.LoadInt64(&memTx.height)) } diff --git a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool_test.go b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool_test.go index ab6e4e993dc27275b67a201fa6295c1a908986a2..1d93efc90ed3b3a37422ee647f239169f5c49366 100644 --- a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool_test.go +++ b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/mempool_test.go @@ -1,273 +1,113 @@ package mempool import ( - "fmt" + "encoding/binary" "sync" "testing" - "time" - acm "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/account" - _ "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/config/tendermint_test" - sm "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/state" - "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/proxy" + "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmsp/example/counter" + tmsp "github.com/tendermint/tmsp/types" ) -var someAddr = []byte("ABCDEFGHIJABCDEFGHIJ") +func TestSerialReap(t *testing.T) { + + app := counter.NewCounterApplication(true) + app.SetOption("serial", "on") + mtx := new(sync.Mutex) + appConnMem := proxy.NewLocalAppConn(mtx, app) + appConnCon := proxy.NewLocalAppConn(mtx, app) + mempool := NewMempool(appConnMem) + + appendTxsRange := func(start, end int) { + // Append some txs. + for i := start; i < end; i++ { + + // This will succeed + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + + // This will fail because not serial (incrementing) + // However, error should still be nil. + // It just won't show up on Reap(). + err = mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } -// number of txs -var nTxs = 100 - -// what the ResetInfo should look like after ResetForBlockAndState -var TestResetInfoData = ResetInfo{ - Included: []Range{ - Range{0, 5}, - Range{10, 10}, - Range{30, 5}, - }, - Invalid: []Range{ - Range{5, 5}, - Range{20, 8}, // let 28 and 29 be valid - Range{35, 64}, // let 99 be valid - }, -} - -// inverse of the ResetInfo -var notInvalidNotIncluded = map[int]struct{}{ - 28: struct{}{}, - 29: struct{}{}, - 99: struct{}{}, -} - -func newSendTx(t *testing.T, mempool *Mempool, from *acm.PrivAccount, to []byte, amt int64) types.Tx { - tx := types.NewSendTx() - tx.AddInput(mempool.GetCache(), from.PubKey, amt) - tx.AddOutput(to, amt) - tx.SignInput(config.GetString("chain_id"), 0, from) - if err := mempool.AddTx(tx); err != nil { - t.Fatal(err) - } - return tx -} - -func addTxs(t *testing.T, mempool *Mempool, lastAcc *acm.PrivAccount, privAccs []*acm.PrivAccount) []types.Tx { - txs := make([]types.Tx, nTxs) - for i := 0; i < nTxs; i++ { - if _, ok := notInvalidNotIncluded[i]; ok { - txs[i] = newSendTx(t, mempool, lastAcc, someAddr, 10) - } else { - txs[i] = newSendTx(t, mempool, privAccs[i%len(privAccs)], privAccs[(i+1)%len(privAccs)].Address, 5) } } - return txs -} - -func makeBlock(mempool *Mempool) *types.Block { - txs := mempool.GetProposalTxs() - var includedTxs []types.Tx - for _, rid := range TestResetInfoData.Included { - includedTxs = append(includedTxs, txs[rid.Start:rid.Start+rid.Length]...) - } - - mempool.mtx.Lock() - state := mempool.state - state.LastBlockHeight += 1 - mempool.mtx.Unlock() - return &types.Block{ - Header: &types.Header{ - ChainID: state.ChainID, - Height: state.LastBlockHeight, - NumTxs: len(includedTxs), - }, - Data: &types.Data{ - Txs: includedTxs, - }, - } -} -// Add txs. Grab chunks to put in block. All the others become invalid because of nonce errors except those in notInvalidNotIncluded -func TestResetInfo(t *testing.T) { - amtPerAccount := int64(100000) - state, privAccs, _ := sm.RandGenesisState(6, false, amtPerAccount, 1, true, 100) - - mempool := NewMempool(state) - - lastAcc := privAccs[5] // we save him (his tx wont become invalid) - privAccs = privAccs[:5] - - txs := addTxs(t, mempool, lastAcc, privAccs) - - // its actually an invalid block since we're skipping nonces - // but all we care about is how the mempool responds after - block := makeBlock(mempool) - - ri := mempool.ResetForBlockAndState(block, state) - - if len(ri.Included) != len(TestResetInfoData.Included) { - t.Fatalf("invalid number of included ranges. Got %d, expected %d\n", len(ri.Included), len(TestResetInfoData.Included)) + reapCheck := func(exp int) { + txs := mempool.Reap() + if len(txs) != exp { + t.Fatalf("Expected to reap %v txs but got %v", exp, len(txs)) + } } - if len(ri.Invalid) != len(TestResetInfoData.Invalid) { - t.Fatalf("invalid number of invalid ranges. Got %d, expected %d\n", len(ri.Invalid), len(TestResetInfoData.Invalid)) + updateRange := func(start, end int) { + txs := make([]types.Tx, 0) + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + txs = append(txs, txBytes) + } + mempool.Update(0, txs) } - for i, rid := range ri.Included { - inc := TestResetInfoData.Included[i] - if rid.Start != inc.Start { - t.Fatalf("Invalid start of range. Got %d, expected %d\n", inc.Start, rid.Start) + commitRange := func(start, end int) { + // Append some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + code, result, logStr := appConnCon.AppendTx(txBytes) + if code != tmsp.CodeType_OK { + t.Errorf("Error committing tx. Code:%v result:%X log:%v", + code, result, logStr) + } } - if rid.Length != inc.Length { - t.Fatalf("Invalid length of range. Got %d, expected %d\n", inc.Length, rid.Length) + hash, log := appConnCon.Commit() + if len(hash) != 8 { + t.Errorf("Error committing. Hash:%X log:%v", hash, log) } } - txs = mempool.GetProposalTxs() - if len(txs) != len(notInvalidNotIncluded) { - t.Fatalf("Expected %d txs left in mempool. Got %d", len(notInvalidNotIncluded), len(txs)) - } -} + //---------------------------------------- -//------------------------------------------------------------------------------------------ + // Append some txs. + appendTxsRange(0, 100) -type TestPeer struct { - sync.Mutex - running bool - height int + // Reap the txs. + reapCheck(100) - t *testing.T + // Reap again. We should get the same amount + reapCheck(100) - received int - txs map[string]int + // Append 0 to 999, we should reap 900 new txs + // because 100 were already counted. + appendTxsRange(0, 1000) - timeoutFail int + // Reap the txs. + reapCheck(1000) - done chan int -} + // Reap again. We should get the same amount + reapCheck(1000) -func newPeer(t *testing.T, state *sm.State) *TestPeer { - return &TestPeer{ - running: true, - height: state.LastBlockHeight, - t: t, - txs: make(map[string]int), - done: make(chan int), - } -} + // Commit from the conensus AppConn + commitRange(0, 500) + updateRange(0, 500) -func (tp *TestPeer) IsRunning() bool { - tp.Lock() - defer tp.Unlock() - return tp.running -} - -func (tp *TestPeer) SetRunning(running bool) { - tp.Lock() - defer tp.Unlock() - tp.running = running -} - -func (tp *TestPeer) Send(chID byte, msg interface{}) bool { - if tp.timeoutFail > 0 { - time.Sleep(time.Second * time.Duration(tp.timeoutFail)) - return false - } - tx := msg.(*TxMessage).Tx - id := types.TxID(config.GetString("chain_id"), tx) - if _, ok := tp.txs[string(id)]; ok { - tp.t.Fatal("received the same tx twice!") - } - tp.txs[string(id)] = tp.received - tp.received += 1 - tp.done <- tp.received - return true -} - -func (tp *TestPeer) Get(key string) interface{} { - return tp -} - -func (tp *TestPeer) GetHeight() int { - return tp.height -} - -func TestBroadcast(t *testing.T) { - state, privAccs, _ := sm.RandGenesisState(6, false, 10000, 1, true, 100) - mempool := NewMempool(state) - reactor := NewMempoolReactor(mempool) - reactor.Start() - - lastAcc := privAccs[5] // we save him (his tx wont become invalid) - privAccs = privAccs[:5] + // We should have 500 left. + reapCheck(500) - peer := newPeer(t, state) - newBlockChan := make(chan ResetInfo) - tickerChan := make(chan time.Time) - go reactor.broadcastTxRoutine(tickerChan, newBlockChan, peer) - - // we don't broadcast any before updating - fmt.Println("dont broadcast any") - addTxs(t, mempool, lastAcc, privAccs) - block := makeBlock(mempool) - ri := mempool.ResetForBlockAndState(block, state) - newBlockChan <- ri - peer.height = ri.Height - tickerChan <- time.Now() - pullTxs(t, peer, len(mempool.txs)) // should have sent whatever txs are left (3) - - toBroadcast := []int{1, 3, 7, 9, 11, 12, 18, 20, 21, 28, 29, 30, 31, 34, 35, 36, 50, 90, 99, 100} - for _, N := range toBroadcast { - peer = resetPeer(t, reactor, mempool, state, tickerChan, newBlockChan, peer) - - // we broadcast N txs before updating - fmt.Println("broadcast", N) - addTxs(t, mempool, lastAcc, privAccs) - txsToSendPerCheck = N - tickerChan <- time.Now() - pullTxs(t, peer, txsToSendPerCheck) // should have sent N txs - block = makeBlock(mempool) - ri := mempool.ResetForBlockAndState(block, state) - newBlockChan <- ri - peer.height = ri.Height - txsToSendPerCheck = 100 - tickerChan <- time.Now() - left := len(mempool.txs) - if N > 99 { - left -= 3 - } else if N > 29 { - left -= 2 - } else if N > 28 { - left -= 1 - } - pullTxs(t, peer, left) // should have sent whatever txs are left that havent been sent - } -} - -func pullTxs(t *testing.T, peer *TestPeer, N int) { - timer := time.NewTicker(time.Second * 2) - for i := 0; i < N; i++ { - select { - case <-peer.done: - case <-timer.C: - panic(fmt.Sprintf("invalid number of received messages. Got %d, expected %d\n", i, N)) - } - } - - if N == 0 { - select { - case <-peer.done: - t.Fatalf("should not have sent any more txs") - case <-timer.C: - } - } -} + // Append 100 invalid txs and 100 valid txs + appendTxsRange(900, 1100) -func resetPeer(t *testing.T, reactor *MempoolReactor, mempool *Mempool, state *sm.State, tickerChan chan time.Time, newBlockChan chan ResetInfo, peer *TestPeer) *TestPeer { - // reset peer - mempool.txs = []types.Tx{} - mempool.state = state - mempool.cache = sm.NewBlockCache(state) - peer.SetRunning(false) - tickerChan <- time.Now() - peer = newPeer(t, state) - go reactor.broadcastTxRoutine(tickerChan, newBlockChan, peer) - return peer + // We should have 600 now. + reapCheck(600) } diff --git a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/reactor.go b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/reactor.go index 5e546d3418a1a9e75f7678b62f33fcd25ba1d81a..a494955684dc0911ca8c481d11ba186a8cc3dedf 100644 --- a/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/reactor.go +++ b/Godeps/_workspace/src/github.com/tendermint/tendermint/mempool/reactor.go @@ -2,7 +2,6 @@ package mempool import ( "bytes" - "errors" "fmt" "reflect" "time" @@ -10,26 +9,24 @@ import ( . "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/common" "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/events" "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/p2p" - sm "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/state" "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/types" "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/tendermint/tendermint/wire" + + "github.com/tendermint/go-clist" ) -var ( +const ( MempoolChannel = byte(0x30) - checkExecutedTxsMilliseconds = 1 // check for new mempool txs to send to peer - txsToSendPerCheck = 64 // send up to this many txs from the mempool per check - newBlockChCapacity = 100 // queue to process this many ResetInfos per peer + maxMempoolMessageSize = 1048576 // 1MB TODO make it configurable + peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount ) // MempoolReactor handles mempool tx broadcasting amongst peers. type MempoolReactor struct { p2p.BaseReactor - - Mempool *Mempool - - evsw events.Fireable + Mempool *Mempool // TODO: un-expose + evsw events.Fireable } func NewMempoolReactor(mempool *Mempool) *MempoolReactor { @@ -52,11 +49,7 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // Implements Reactor func (memR *MempoolReactor) AddPeer(peer *p2p.Peer) { - // Each peer gets a go routine on which we broadcast transactions in the same order we applied them to our state. - newBlockChan := make(chan ResetInfo, newBlockChCapacity) - peer.Data.Set(types.PeerMempoolChKey, newBlockChan) - timer := time.NewTicker(time.Millisecond * time.Duration(checkExecutedTxsMilliseconds)) - go memR.broadcastTxRoutine(timer.C, newBlockChan, peer) + go memR.broadcastTxRoutine(peer) } // Implements Reactor @@ -71,7 +64,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { log.Warn("Error decoding message", "error", err) return } - log.Notice("MempoolReactor received message", "msg", msg) + log.Info("Receive", "src", src, "chId", chID, "msg", msg) switch msg := msg.(type) { case *TxMessage: @@ -89,27 +82,6 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { } } -// "block" is the new block being committed. -// "state" is the result of state.AppendBlock("block"). -// Txs that are present in "block" are discarded from mempool. -// Txs that have become invalid in the new "state" are also discarded. -func (memR *MempoolReactor) ResetForBlockAndState(block *types.Block, state *sm.State) { - ri := memR.Mempool.ResetForBlockAndState(block, state) - for _, peer := range memR.Switch.Peers().List() { - peerMempoolChI := peer.Data.Get(types.PeerMempoolChKey) - peerMempoolCh, ok := peerMempoolChI.(chan ResetInfo) - if !ok { - // peer created before reset info established? - continue - } - select { - case peerMempoolCh <- ri: - default: - memR.Switch.StopPeerForError(peer, errors.New("Peer's mempool push channel full")) - } - } -} - // Just an alias for AddTx since broadcasting happens in peer routines func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error { return memR.Mempool.AddTx(tx) @@ -125,91 +97,42 @@ type Peer interface { Get(string) interface{} } -// send new mempool txs to peer, strictly in order we applied them to our state. -// new blocks take chunks out of the mempool, but we've already sent some txs to the peer. -// so we wait to hear that the peer has progressed to the new height, and then continue sending txs from where we left off -func (memR *MempoolReactor) broadcastTxRoutine(tickerChan <-chan time.Time, newBlockChan chan ResetInfo, peer Peer) { - var height = memR.Mempool.GetHeight() - var txsSent int // new txs sent for height. (reset every new height) - +// Send new mempool txs to peer. +// TODO: Handle mempool or reactor shutdown? +// As is this routine may block forever if no new txs come in. +func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { + var next *clist.CElement for { - select { - case <-tickerChan: - if !peer.IsRunning() { - return - } - - // make sure the peer is up to date - if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { - peerState := peerState_i.(PeerState) - if peerState.GetHeight() < height { - continue - } - } else { + if !memR.IsRunning() { + return // Quit! + } + if next == nil { + // This happens because the CElement we were looking at got + // garbage collected (removed). That is, .NextWait() returned nil. + // Go ahead and start from the beginning. + next = memR.Mempool.TxsFrontWait() // Wait until a tx is available + } + memTx := next.Value.(*mempoolTx) + // make sure the peer is up to date + height := memTx.Height() + if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { + peerState := peerState_i.(PeerState) + if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) continue } - - // check the mempool for new transactions - newTxs := memR.getNewTxs(height) - txsSentLoop := 0 - start := time.Now() - - TX_LOOP: - for i := txsSent; i < len(newTxs) && txsSentLoop < txsToSendPerCheck; i++ { - tx := newTxs[i] - msg := &TxMessage{Tx: tx} - success := peer.Send(MempoolChannel, msg) - if !success { - break TX_LOOP - } else { - txsSentLoop += 1 - } - } - - if txsSentLoop > 0 { - txsSent += txsSentLoop - log.Info("Sent txs to peer", "txsSentLoop", txsSentLoop, - "took", time.Since(start), "txsSent", txsSent, "newTxs", len(newTxs)) - } - - case ri := <-newBlockChan: - height = ri.Height - - // find out how many txs below what we've sent were included in a block and how many became invalid - included := tallyRangesUpTo(ri.Included, txsSent) - invalidated := tallyRangesUpTo(ri.Invalid, txsSent) - - txsSent -= included + invalidated - } - } -} - -// fetch new txs from the mempool -func (memR *MempoolReactor) getNewTxs(height int) (txs []types.Tx) { - memR.Mempool.mtx.Lock() - defer memR.Mempool.mtx.Unlock() - - // if the mempool got ahead of us just return empty txs - if memR.Mempool.state.LastBlockHeight != height { - return - } - return memR.Mempool.txs -} - -// return the size of ranges less than upTo -func tallyRangesUpTo(ranger []Range, upTo int) int { - totalUpTo := 0 - for _, r := range ranger { - if r.Start >= upTo { - break } - if r.Start+r.Length >= upTo { - totalUpTo += upTo - r.Start - break + // send memTx + msg := &TxMessage{Tx: memTx.tx} + success := peer.Send(MempoolChannel, struct{ MempoolMessage }{msg}) + if !success { + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue } - totalUpTo += r.Length + + next = next.NextWait() + continue } - return totalUpTo } // implements events.Eventable diff --git a/cmd/erisdb/main.go b/cmd/erisdb/main.go index 350cca81a33ad7a68eb2f93f9d5f89983f2523c7..11eb26e080bf931197658da87e2a76d0d2275efa 100644 --- a/cmd/erisdb/main.go +++ b/cmd/erisdb/main.go @@ -2,9 +2,11 @@ package main import ( "fmt" - _ "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/Sirupsen/logrus" // hack cuz godeps :( edb "github.com/eris-ltd/eris-db/erisdb" "os" + + _ "github.com/eris-ltd/eris-db/Godeps/_workspace/src/github.com/Sirupsen/logrus" // hack cuz godeps :( + _ "github.com/tendermint/go-clist" // godeps ... ) // TODO the input stuff.