diff --git a/common/db.go b/common/db.go index 14eed1a188..13378a0f31 100644 --- a/common/db.go +++ b/common/db.go @@ -303,7 +303,7 @@ func InitDB(dbPath string) (*DB, error) { d.insertWinningTicket = stmt // Select earliest ticket - stmt, err = db.Prepare("SELECT sender, recipient, faceValue, winProb, senderNonce, recipientRand, recipientRandHash, sig, creationRound, creationRoundBlockHash, paramsExpirationBlock FROM ticketQueue WHERE sender=? AND redeemedAt IS NULL AND txHash IS NULL ORDER BY createdAt ASC LIMIT 1") + stmt, err = db.Prepare("SELECT sender, recipient, faceValue, winProb, senderNonce, recipientRand, recipientRandHash, sig, creationRound, creationRoundBlockHash, paramsExpirationBlock FROM ticketQueue WHERE sender=? AND creationRound >= ? AND redeemedAt IS NULL AND txHash IS NULL ORDER BY createdAt ASC LIMIT 1") if err != nil { glog.Error("Unable to prepare selectEarliestWinningTicket ", err) d.Close() @@ -311,7 +311,7 @@ func InitDB(dbPath string) (*DB, error) { } d.selectEarliestWinningTicket = stmt - stmt, err = db.Prepare("SELECT count(sig) FROM ticketQueue WHERE sender=? AND redeemedAt IS NULL AND txHash IS NULL") + stmt, err = db.Prepare("SELECT count(sig) FROM ticketQueue WHERE sender=? AND creationRound >= ? AND redeemedAt IS NULL AND txHash IS NULL") if err != nil { glog.Error("Unable to prepare winningTicketCount ", err) d.Close() @@ -746,10 +746,10 @@ func (db *DB) RemoveWinningTicket(ticket *pm.SignedTicket) error { return nil } -// SelectEarliestWinningTicket selects the earliest stored winning ticket for a 'sender' -// which is not yet redeemed -func (db *DB) SelectEarliestWinningTicket(sender ethcommon.Address) (*pm.SignedTicket, error) { - row := db.selectEarliestWinningTicket.QueryRow(sender.Hex()) +// SelectEarliestWinningTicket selects the earliest stored winning ticket for a 'sender' that is not expired and not yet redeemed +func (db *DB) SelectEarliestWinningTicket(sender ethcommon.Address, minCreationRound int64) (*pm.SignedTicket, error) { + + row := db.selectEarliestWinningTicket.QueryRow(sender.Hex(), minCreationRound) var ( senderString string recipient string @@ -789,8 +789,8 @@ func (db *DB) SelectEarliestWinningTicket(sender ethcommon.Address) (*pm.SignedT } // WinningTicketCount returns the amount of non-redeemed winning tickets for a 'sender' -func (db *DB) WinningTicketCount(sender ethcommon.Address) (int, error) { - row := db.winningTicketCount.QueryRow(sender.Hex()) +func (db *DB) WinningTicketCount(sender ethcommon.Address, minCreationRound int64) (int, error) { + row := db.winningTicketCount.QueryRow(sender.Hex(), minCreationRound) var count64 int64 if err := row.Scan(&count64); err != nil { if err.Error() != "sql: no rows in result set" { diff --git a/common/db_test.go b/common/db_test.go index d629cee1e9..945356ad8f 100644 --- a/common/db_test.go +++ b/common/db_test.go @@ -750,7 +750,7 @@ func TestWinningTicketCount(t *testing.T) { _, ticket, sig, recipientRand := defaultWinningTicket(t) ticket.Sender = sender - count, err := dbh.WinningTicketCount(sender) + count, err := dbh.WinningTicketCount(sender, ticket.CreationRound) assert.Nil(err) assert.Equal(count, 0) @@ -770,7 +770,7 @@ func TestWinningTicketCount(t *testing.T) { }) require.Nil(err) - count, err = dbh.WinningTicketCount(sender) + count, err = dbh.WinningTicketCount(sender, ticket.CreationRound) assert.Nil(err) assert.Equal(count, 1) @@ -789,9 +789,14 @@ func TestWinningTicketCount(t *testing.T) { }, pm.RandHash()) require.Nil(err) - count, err = dbh.WinningTicketCount(sender) + count, err = dbh.WinningTicketCount(sender, ticket.CreationRound) assert.Nil(err) assert.Equal(count, 1) + + // all tickets are expirted, should return 0 + count, err = dbh.WinningTicketCount(sender, ticket.CreationRound+100) + assert.Nil(err) + assert.Equal(count, 0) } func TestInsertWinningTicket_GivenValidInputs_InsertsOneRowCorrectly(t *testing.T) { @@ -947,6 +952,8 @@ func TestSelectEarliestWinningTicket(t *testing.T) { _, ticket, sig, recipientRand := defaultWinningTicket(t) ticket.Sender = ethcommon.HexToAddress("charizard") + defaultCreationRound := ticket.CreationRound + signedTicket0 := &pm.SignedTicket{ Ticket: ticket, Sig: sig, @@ -964,16 +971,21 @@ func TestSelectEarliestWinningTicket(t *testing.T) { require.Nil(err) // no tickets found - earliest, err := dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard")) + earliest, err := dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound) assert.Nil(err) assert.Nil(earliest) err = dbh.StoreWinningTicket(signedTicket0) require.Nil(err) - earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard")) + earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound) assert.Nil(err) assert.Equal(signedTicket0, earliest) + // test ticket expired + earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound+100) + assert.Nil(err) + assert.Nil(earliest) + _, ticket, sig, recipientRand = defaultWinningTicket(t) ticket.Sender = ethcommon.HexToAddress("charizard") signedTicket2 := &pm.SignedTicket{ @@ -981,19 +993,26 @@ func TestSelectEarliestWinningTicket(t *testing.T) { Sig: pm.RandBytes(32), RecipientRand: new(big.Int).SetBytes(pm.RandBytes(32)), } + signedTicket2.CreationRound = ticket.CreationRound + 100 err = dbh.StoreWinningTicket(signedTicket2) require.Nil(err) - earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard")) + earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound) assert.Nil(err) assert.Equal(earliest, signedTicket0) + // Test excluding expired tickets + earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound+100) + assert.Nil(err) + assert.Equal(earliest, signedTicket2) + // Test excluding submitted tickets err = dbh.MarkWinningTicketRedeemed(signedTicket0, pm.RandHash()) require.Nil(err) - earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard")) + earliest, err = dbh.SelectEarliestWinningTicket(ethcommon.HexToAddress("charizard"), defaultCreationRound) assert.Equal(earliest, signedTicket2) + } func TestMarkWinningTicketRedeemed_GivenNilTicket_ReturnsError(t *testing.T) { @@ -1089,7 +1108,7 @@ func TestRemoveWinningTicket(t *testing.T) { require.Nil(err) // confirm ticket is added correctly - count, err := dbh.WinningTicketCount(ticket.Sender) + count, err := dbh.WinningTicketCount(ticket.Sender, ticket.CreationRound) require.Nil(err) require.Equal(count, 1) @@ -1099,14 +1118,14 @@ func TestRemoveWinningTicket(t *testing.T) { err = dbh.RemoveWinningTicket(&signedTicketDup) assert.NoError(err) // confirm ticket is not removed - count, err = dbh.WinningTicketCount(signedTicket.Sender) + count, err = dbh.WinningTicketCount(signedTicket.Sender, ticket.CreationRound) require.Nil(err) assert.Equal(count, 1) err = dbh.RemoveWinningTicket(signedTicket) assert.Nil(err) // confirm ticket is removed correctly - count, _ = dbh.WinningTicketCount(ticket.Sender) + count, _ = dbh.WinningTicketCount(ticket.Sender, ticket.CreationRound) require.Equal(count, 0) } diff --git a/pm/queue.go b/pm/queue.go index 29daf3f951..62bf76db83 100644 --- a/pm/queue.go +++ b/pm/queue.go @@ -4,10 +4,11 @@ import ( "math/big" ethcommon "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" "github.com/golang/glog" ) +const ticketValidityPeriod = 2 + // RedeemableEmitter is an interface that describes methods for // emitting redeemable tickets type RedeemableEmitter interface { @@ -32,9 +33,7 @@ type redemption struct { // // Based off of: https://github.com/lightningnetwork/lnd/blob/master/htlcswitch/queue.go type ticketQueue struct { - // blockSub returns a subscription to receive the last seen block number - blockSub func(chan<- *big.Int) event.Subscription - + tm TimeManager // redeemable is a channel that a queue consumer will receive // redeemable tickets on as a sender's max float becomes // sufficient to cover the face value of tickets @@ -46,11 +45,11 @@ type ticketQueue struct { quit chan struct{} } -func newTicketQueue(store TicketStore, sender ethcommon.Address, blockSub func(chan<- *big.Int) event.Subscription) *ticketQueue { +func newTicketQueue(sender ethcommon.Address, sm *LocalSenderMonitor) *ticketQueue { return &ticketQueue{ - blockSub: blockSub, + tm: sm.tm, redeemable: make(chan *redemption), - store: store, + store: sm.ticketStore, sender: sender, quit: make(chan struct{}), } @@ -80,7 +79,7 @@ func (q *ticketQueue) Redeemable() chan *redemption { // Length returns the current length of the queue func (q *ticketQueue) Length() (int, error) { - return q.store.WinningTicketCount(q.sender) + return q.store.WinningTicketCount(q.sender, new(big.Int).Sub(q.tm.LastInitializedRound(), big.NewInt(ticketValidityPeriod)).Int64()) } // startQueueLoop blocks until the ticket queue is non-empty. When the queue is non-empty @@ -92,7 +91,7 @@ func (q *ticketQueue) Length() (int, error) { // the ticket at the head of the queue and send it into q.redeemable which an external listener can use to receive redeemable tickets func (q *ticketQueue) startQueueLoop() { blockNums := make(chan *big.Int, 10) - sub := q.blockSub(blockNums) + sub := q.tm.SubscribeBlocks(blockNums) defer sub.Unsubscribe() ticketLoop: @@ -109,7 +108,7 @@ ticketLoop: continue } for i := 0; i < int(numTickets); i++ { - nextTicket, err := q.store.SelectEarliestWinningTicket(q.sender) + nextTicket, err := q.store.SelectEarliestWinningTicket(q.sender, new(big.Int).Sub(q.tm.LastInitializedRound(), big.NewInt(ticketValidityPeriod)).Int64()) if err != nil { glog.Errorf("Unable select earliest winning ticket err=%v", err) continue ticketLoop @@ -129,9 +128,12 @@ ticketLoop: case res := <-resCh: // after receiving the response we can close the channel so it can be GC'd close(resCh) + // If the ticket is used, we can mark it as redeemed if res.err != nil { glog.Errorf("Error redeeming err=%v", res.err) - continue + if res.err != errIsUsedTicket { + continue + } } err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash) if err != nil { diff --git a/pm/queue_test.go b/pm/queue_test.go index 6cd2aa2630..cfce4470b0 100644 --- a/pm/queue_test.go +++ b/pm/queue_test.go @@ -1,6 +1,7 @@ package pm import ( + "fmt" "math/big" "sync" "testing" @@ -30,8 +31,9 @@ func defaultSignedTicket(sender ethcommon.Address, senderNonce uint32) *SignedTi } type queueConsumer struct { - redeemable []*redemption - mu sync.Mutex + redeemable []*redemption + mu sync.Mutex + redemptionErr error } // Redeemable returns the consumed redeemable tickets from a ticket queue @@ -56,7 +58,7 @@ func (qc *queueConsumer) Wait(num int, e RedeemableEmitter, done chan struct{}) ticket.resCh <- struct { txHash ethcommon.Hash err error - }{RandHash(), nil} + }{ticket.SignedTicket.Hash(), qc.redemptionErr} } } done <- struct{}{} @@ -67,9 +69,13 @@ func TestTicketQueueLoop(t *testing.T) { sender := RandAddress() ts := newStubTicketStore() - tm := &stubTimeManager{} + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } - q := newTicketQueue(ts, sender, tm.SubscribeBlocks) + q := newTicketQueue(sender, sm) q.Start() defer q.Stop() @@ -105,7 +111,7 @@ func TestTicketQueueLoop(t *testing.T) { qlen, err = q.Length() assert.Nil(err) assert.Equal(1, qlen) - earliest, err := q.store.SelectEarliestWinningTicket(sender) + earliest, err := q.store.SelectEarliestWinningTicket(sender, nonExpTicket.CreationRound) assert.Nil(err) assert.Equal(earliest, nonExpTicket) @@ -117,17 +123,56 @@ func TestTicketQueueLoop(t *testing.T) { redeemable := qc.Redeemable() for i := 0; i < numTickets; i++ { assert.Equal(uint32(i), redeemable[i].SignedTicket.SenderNonce) + assert.True(ts.submitted[fmt.Sprintf("%x", redeemable[i].SignedTicket.Sig)]) } } +func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) { + assert := assert.New(t) + + sender := RandAddress() + ts := newStubTicketStore() + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } + + q := newTicketQueue(sender, sm) + q.Start() + defer q.Stop() + + ticket := defaultSignedTicket(sender, 0) + q.Add(ticket) + + qlen, err := q.Length() + assert.Nil(err) + assert.Equal(1, qlen) + + qc := &queueConsumer{redemptionErr: errIsUsedTicket} + done := make(chan struct{}) + go qc.Wait(1, q, done) + time.Sleep(20 * time.Millisecond) + + tm.blockNumSink <- big.NewInt(1) + <-done + time.Sleep(20 * time.Millisecond) + + assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)]) +} + func TestTicketQueueLoopConcurrent(t *testing.T) { assert := assert.New(t) sender := RandAddress() ts := newStubTicketStore() - tm := &stubTimeManager{} + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } - q := newTicketQueue(ts, sender, tm.SubscribeBlocks) + q := newTicketQueue(sender, sm) q.Start() defer q.Stop() @@ -188,9 +233,13 @@ func TestTicketQueueConsumeBlockNums(t *testing.T) { sender := RandAddress() ts := newStubTicketStore() - tm := &stubTimeManager{} + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } - q := newTicketQueue(ts, sender, tm.SubscribeBlocks) + q := newTicketQueue(sender, sm) q.Start() defer q.Stop() time.Sleep(20 * time.Millisecond) @@ -206,9 +255,13 @@ func TestTicketQueue_Add(t *testing.T) { sender := RandAddress() ts := newStubTicketStore() - tm := &stubTimeManager{} + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } - q := newTicketQueue(ts, sender, tm.SubscribeBlocks) + q := newTicketQueue(sender, sm) ticket := defaultSignedTicket(sender, 0) @@ -227,9 +280,13 @@ func TestTicketQueue_Length(t *testing.T) { sender := RandAddress() ts := newStubTicketStore() - tm := &stubTimeManager{} + tm := &stubTimeManager{round: big.NewInt(100)} + sm := &LocalSenderMonitor{ + ticketStore: ts, + tm: tm, + } - q := newTicketQueue(ts, sender, tm.SubscribeBlocks) + q := newTicketQueue(sender, sm) ts.tickets[sender] = []*SignedTicket{defaultSignedTicket(sender, 0), defaultSignedTicket(sender, 1), defaultSignedTicket(sender, 2)} diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index ecfd7dcb31..24ffc2c84f 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -268,7 +268,7 @@ func (sm *LocalSenderMonitor) ensureCache(addr ethcommon.Address) { // Caller should hold the lock for LocalSenderMonitor unless the caller is // ensureCache() in which case the caller of ensureCache() should hold the lock func (sm *LocalSenderMonitor) cache(addr ethcommon.Address) { - queue := newTicketQueue(sm.ticketStore, addr, sm.tm.SubscribeBlocks) + queue := newTicketQueue(addr, sm) queue.Start() done := make(chan struct{}) go sm.startTicketQueueConsumerLoop(queue, done) @@ -354,6 +354,21 @@ func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types. return nil, err } + // Fail early if ticket is used + used, err := sm.broker.IsUsedTicket(ticket.Ticket) + if err != nil { + if monitor.Enabled { + monitor.TicketRedemptionError(ticket.Ticket.Sender.String()) + } + return nil, err + } + if used { + if monitor.Enabled { + monitor.TicketRedemptionError(ticket.Ticket.Sender.String()) + } + return nil, errIsUsedTicket + } + ctx, cancel := context.WithTimeout(context.Background(), sm.cfg.RPCTimeout) gasPrice, err := sm.cfg.SuggestGasPrice(ctx) if err != nil { diff --git a/pm/sendermonitor_test.go b/pm/sendermonitor_test.go index bf4ae22e0a..7d76374041 100644 --- a/pm/sendermonitor_test.go +++ b/pm/sendermonitor_test.go @@ -606,6 +606,48 @@ func TestAvailableFunds(t *testing.T) { assert.Equal(expFunds, funds) } +func TestRedeemWinningTicket_IsUsedTicket(t *testing.T) { + claimant, b, smgr, tm := localSenderMonitorFixture() + addr := RandAddress() + smgr.info[addr] = &SenderInfo{ + Deposit: big.NewInt(500), + WithdrawRound: big.NewInt(0), + Reserve: &ReserveInfo{ + FundsRemaining: big.NewInt(1000), + ClaimedInCurrentRound: big.NewInt(0), + }, + } + + ts := newStubTicketStore() + smgr.claimedReserve[addr] = big.NewInt(100) + sm := NewSenderMonitor(claimant, b, smgr, tm, ts) + sm.Start() + defer sm.Stop() + assert := assert.New(t) + + signedT := defaultSignedTicket(addr, uint32(0)) + + // test error + b.isUsedErr = errors.New("isUsed error") + tx, err := sm.redeemWinningTicket(signedT) + assert.Nil(tx) + assert.EqualError(err, b.isUsedErr.Error()) + + // test used + b.isUsedErr = nil + b.usedTickets[signedT.Hash()] = true + tx, err = sm.redeemWinningTicket(signedT) + assert.Nil(tx) + assert.EqualError(err, errIsUsedTicket.Error()) + + // test not used + b.usedTickets[signedT.Hash()] = false + tx, err = sm.redeemWinningTicket(signedT) + assert.Nil(err) + assert.NotNil(tx) + assert.True(b.IsUsedTicket(signedT.Ticket)) +} + func TestRedeemWinningTicket_CheckAvailableFunds(t *testing.T) { assert := assert.New(t) @@ -896,6 +938,7 @@ func localSenderMonitorFixture() (*LocalSenderMonitorConfig, *stubBroker, *stubS smgr := newStubSenderManager() tm := &stubTimeManager{ transcoderPoolSize: big.NewInt(5), + round: big.NewInt(100), } return cfg, b, smgr, tm } diff --git a/pm/stub.go b/pm/stub.go index 4a808a5f81..d3d9a92d2c 100644 --- a/pm/stub.go +++ b/pm/stub.go @@ -54,7 +54,7 @@ func (ts *stubTicketStore) StoreWinningTicket(ticket *SignedTicket) error { return nil } -func (ts *stubTicketStore) SelectEarliestWinningTicket(sender ethcommon.Address) (*SignedTicket, error) { +func (ts *stubTicketStore) SelectEarliestWinningTicket(sender ethcommon.Address, minCreationRound int64) (*SignedTicket, error) { ts.lock.Lock() defer ts.lock.Unlock() if ts.loadShouldFail { @@ -95,7 +95,7 @@ func (ts *stubTicketStore) RemoveWinningTicket(ticket *SignedTicket) error { return nil } -func (ts *stubTicketStore) WinningTicketCount(sender ethcommon.Address) (int, error) { +func (ts *stubTicketStore) WinningTicketCount(sender ethcommon.Address, minCreationRound int64) (int, error) { ts.lock.Lock() defer ts.lock.Unlock() if ts.loadShouldFail { @@ -138,6 +138,7 @@ type stubBroker struct { claimableReserveShouldFail bool checkTxErr error + isUsedErr error } func newStubBroker() *stubBroker { @@ -188,6 +189,10 @@ func (b *stubBroker) IsUsedTicket(ticket *Ticket) (bool, error) { b.mu.Lock() defer b.mu.Unlock() + if b.isUsedErr != nil { + return false, b.isUsedErr + } + return b.usedTickets[ticket.Hash()], nil } diff --git a/pm/ticketstore.go b/pm/ticketstore.go index eda486e0d8..01d5a69d5e 100644 --- a/pm/ticketstore.go +++ b/pm/ticketstore.go @@ -1,13 +1,15 @@ package pm -import ethcommon "github.com/ethereum/go-ethereum/common" +import ( + ethcommon "github.com/ethereum/go-ethereum/common" +) // TicketStore is an interface which describes an object capable // of persisting tickets type TicketStore interface { // SelectEarliestWinningTicket selects the earliest stored winning ticket for a 'sender' // which is not yet redeemed - SelectEarliestWinningTicket(sender ethcommon.Address) (*SignedTicket, error) + SelectEarliestWinningTicket(sender ethcommon.Address, minCreationRound int64) (*SignedTicket, error) // RemoveWinningTicket removes a ticket RemoveWinningTicket(ticket *SignedTicket) error @@ -20,5 +22,5 @@ type TicketStore interface { MarkWinningTicketRedeemed(ticket *SignedTicket, txHash ethcommon.Hash) error // WinningTicketCount returns the amount of non-redeemed winning tickets for a sender in the TicketStore - WinningTicketCount(sender ethcommon.Address) (int, error) + WinningTicketCount(sender ethcommon.Address, minCreationRound int64) (int, error) } diff --git a/pm/validator.go b/pm/validator.go index 3cd1fbc4b2..eb459b9991 100644 --- a/pm/validator.go +++ b/pm/validator.go @@ -15,6 +15,7 @@ var ( errInvalidTicketSignature = errors.New("invalid ticket signature") errInvalidCreationRound = errors.New("invalid ticket creation round") errInvalidCreationRoundBlockHash = errors.New("invalid ticket creation round block hash") + errIsUsedTicket = errors.New("ticket already used") ) // Validator is an interface which describes an object capable