diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 19f78835b9..1045b4a14b 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -28,4 +28,6 @@ #### Orchestrator +- \#2018 Only mark tickets for failed transactions as redeemed when there is an error checking the transaction (@yondonfu) + #### Transcoder diff --git a/pm/queue.go b/pm/queue.go index f6d3526ba2..88444348f0 100644 --- a/pm/queue.go +++ b/pm/queue.go @@ -2,6 +2,7 @@ package pm import ( "math/big" + "strings" ethcommon "github.com/ethereum/go-ethereum/common" "github.com/golang/glog" @@ -128,16 +129,14 @@ ticketLoop: case res := <-resCh: // after receiving the response we can close the channel so it can be GC'd close(resCh) - // If the ticket is used, we can mark it as redeemed if res.err != nil { glog.Errorf("Error redeeming err=%v", res.err) - _, checkTxErr := res.err.(errCheckTx) - if res.err != errIsUsedTicket && !checkTxErr { + // If the error is non-retryable then we mark the ticket as redeemed + if !isNonRetryableTicketErr(res.err) { continue } } - err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash) - if err != nil { + if err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash); err != nil { glog.Error(err) continue } @@ -151,3 +150,8 @@ ticketLoop: } } } + +func isNonRetryableTicketErr(err error) bool { + // The latter check depends on logic in eth.client.CheckTx() + return err == errIsUsedTicket || strings.Contains(err.Error(), "transaction failed") +} diff --git a/pm/queue_test.go b/pm/queue_test.go index 4031f0d440..1f74c2fc78 100644 --- a/pm/queue_test.go +++ b/pm/queue_test.go @@ -8,6 +8,7 @@ import ( "time" ethcommon "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -127,7 +128,7 @@ func TestTicketQueueLoop(t *testing.T) { } } -func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) { +func TestTicketQueueLoop_IsNonRetryableTicketErr_MarkAsRedeemed(t *testing.T) { assert := assert.New(t) sender := RandAddress() @@ -142,23 +143,51 @@ func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) { q.Start() defer q.Stop() - ticket := defaultSignedTicket(sender, 0) - q.Add(ticket) + addTicket := func(ticket *SignedTicket) { + q.Add(ticket) - qlen, err := q.Length() - assert.Nil(err) - assert.Equal(1, qlen) + qlen, err := q.Length() + assert.Nil(err) + assert.Equal(1, qlen) + } + + consumeQueue := func(qc *queueConsumer) { + done := make(chan struct{}) + go qc.Wait(1, q, done) + time.Sleep(20 * time.Millisecond) + + tm.blockNumSink <- big.NewInt(1) + <-done + time.Sleep(100 * time.Millisecond) + } + + // Test that ticket is marked as redeemed if ticket is used + ticket := defaultSignedTicket(sender, 0) + addTicket(ticket) qc := &queueConsumer{redemptionErr: errIsUsedTicket} - done := make(chan struct{}) - go qc.Wait(1, q, done) - time.Sleep(20 * time.Millisecond) + consumeQueue(qc) + assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)]) - tm.blockNumSink <- big.NewInt(1) - <-done - time.Sleep(100 * time.Millisecond) + // Test that ticket is marked as redeemed if tx fails + ticket = defaultSignedTicket(sender, 1) + addTicket(ticket) + qc = &queueConsumer{ + redemptionErr: errors.New("transaction failed txHash=abc"), + } + consumeQueue(qc) assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)]) + + // Test that ticket is not marked as redeemed if there is an error checking the tx, but the tx did not fail + ticket = defaultSignedTicket(sender, 2) + addTicket(ticket) + + qc = &queueConsumer{ + redemptionErr: errors.New("some other error"), + } + consumeQueue(qc) + assert.False(ts.submitted[fmt.Sprintf("%x", ticket.Sig)]) } func TestTicketQueueLoopConcurrent(t *testing.T) { diff --git a/pm/sendermonitor.go b/pm/sendermonitor.go index c51a7a9836..386cbacb24 100644 --- a/pm/sendermonitor.go +++ b/pm/sendermonitor.go @@ -25,8 +25,6 @@ var unixNow = func() int64 { return time.Now().Unix() } -type errCheckTx error - // SenderMonitor is an interface that describes methods used to // monitor remote senders type SenderMonitor interface { @@ -291,17 +289,20 @@ func (sm *LocalSenderMonitor) startTicketQueueConsumerLoop(queue *ticketQueue, d select { case red := <-queue.Redeemable(): tx, err := sm.redeemWinningTicket(red.SignedTicket) - if err != nil { - red.resCh <- struct { - txHash ethcommon.Hash - err error - }{ethcommon.Hash{}, err} - } else { - red.resCh <- struct { - txHash ethcommon.Hash - err error - }{tx.Hash(), nil} + res := struct { + txHash ethcommon.Hash + err error + }{ + ethcommon.Hash{}, + err, + } + // FIXME: If there are replacement txs then tx.Hash() could be different + // from the hash of the replacement tx that was mined + if tx != nil { + res.txHash = tx.Hash() } + + red.resCh <- res case <-done: // When the ticket consumer exits, tell the ticketQueue // to exit as well @@ -350,6 +351,7 @@ func (sm *LocalSenderMonitor) cleanup() { } } +// Returns a non-nil tx if one is sent. Otherwise, returns a nil tx func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types.Transaction, error) { availableFunds, err := sm.availableFunds(ticket.Sender) if err != nil { @@ -424,7 +426,8 @@ func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types. if monitor.Enabled { monitor.TicketRedemptionError() } - return nil, errCheckTx(err) + // Return tx so caller can utilize the tx if it fails + return tx, err } if monitor.Enabled { diff --git a/pm/sendermonitor_test.go b/pm/sendermonitor_test.go index 6fba523866..7f24f9ca93 100644 --- a/pm/sendermonitor_test.go +++ b/pm/sendermonitor_test.go @@ -787,15 +787,15 @@ func TestRedeemWinningTicket_SingleTicket_CheckTxError(t *testing.T) { sm := NewSenderMonitor(cfg, b, smgr, tm, ts) sm.Start() defer sm.Stop() - expErr := errCheckTx(errors.New("checktx error")) + expErr := errors.New("checktx error") b.checkTxErr = expErr assert := assert.New(t) signedT := defaultSignedTicket(addr, uint32(0)) tx, err := sm.redeemWinningTicket(signedT) - assert.Nil(tx) - assert.IsType(expErr, err) + assert.NotNil(tx) + assert.Equal(expErr, err) } func TestRedeemWinningTicket_SingleTicket(t *testing.T) {