Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pm: Only mark failed tx as redeemed for check tx err #2018

Merged
merged 3 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@

#### Orchestrator

- \#2018 Only mark tickets for failed transactions as redeemed when there is an error checking the transaction (@yondonfu)

#### Transcoder
14 changes: 9 additions & 5 deletions pm/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pm

import (
"math/big"
"strings"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/golang/glog"
Expand Down Expand Up @@ -128,16 +129,14 @@ ticketLoop:
case res := <-resCh:
// after receiving the response we can close the channel so it can be GC'd
close(resCh)
// If the ticket is used, we can mark it as redeemed
if res.err != nil {
glog.Errorf("Error redeeming err=%v", res.err)
_, checkTxErr := res.err.(errCheckTx)
if res.err != errIsUsedTicket && !checkTxErr {
// If the error is non-retryable then we mark the ticket as redeemed
if !isNonRetryableTicketErr(res.err) {
continue
}
}
err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash)
if err != nil {
if err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash); err != nil {
glog.Error(err)
continue
}
Expand All @@ -151,3 +150,8 @@ ticketLoop:
}
}
}

func isNonRetryableTicketErr(err error) bool {
// The latter check depends on logic in eth.client.CheckTx()
return err == errIsUsedTicket || strings.Contains(err.Error(), "transaction failed")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel great about this string check, but I ran into issues implementing an error type check since the tx failure check occurs in the eth package and importing an error type from the eth package in the pm package would result in an import cycle. There may be a better way to structure this check, but I went with the simple solution of a string check right now accepting the downside of exposing the pm package to the implementation details of eth.client.CheckTx().

}
53 changes: 41 additions & 12 deletions pm/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

ethcommon "github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -127,7 +128,7 @@ func TestTicketQueueLoop(t *testing.T) {
}
}

func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) {
func TestTicketQueueLoop_IsNonRetryableTicketErr_MarkAsRedeemed(t *testing.T) {
assert := assert.New(t)

sender := RandAddress()
Expand All @@ -142,23 +143,51 @@ func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) {
q.Start()
defer q.Stop()

ticket := defaultSignedTicket(sender, 0)
q.Add(ticket)
addTicket := func(ticket *SignedTicket) {
q.Add(ticket)

qlen, err := q.Length()
assert.Nil(err)
assert.Equal(1, qlen)
qlen, err := q.Length()
assert.Nil(err)
assert.Equal(1, qlen)
}

consumeQueue := func(qc *queueConsumer) {
done := make(chan struct{})
go qc.Wait(1, q, done)
time.Sleep(20 * time.Millisecond)

tm.blockNumSink <- big.NewInt(1)
<-done
time.Sleep(100 * time.Millisecond)
}

// Test that ticket is marked as redeemed if ticket is used
ticket := defaultSignedTicket(sender, 0)
addTicket(ticket)

qc := &queueConsumer{redemptionErr: errIsUsedTicket}
done := make(chan struct{})
go qc.Wait(1, q, done)
time.Sleep(20 * time.Millisecond)
consumeQueue(qc)
assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)])

tm.blockNumSink <- big.NewInt(1)
<-done
time.Sleep(100 * time.Millisecond)
// Test that ticket is marked as redeemed if tx fails
ticket = defaultSignedTicket(sender, 1)
addTicket(ticket)

qc = &queueConsumer{
redemptionErr: errors.New("transaction failed txHash=abc"),
}
consumeQueue(qc)
assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)])

// Test that ticket is not marked as redeemed if there is an error checking the tx, but the tx did not fail
ticket = defaultSignedTicket(sender, 2)
addTicket(ticket)

qc = &queueConsumer{
redemptionErr: errors.New("some other error"),
}
consumeQueue(qc)
assert.False(ts.submitted[fmt.Sprintf("%x", ticket.Sig)])
}

func TestTicketQueueLoopConcurrent(t *testing.T) {
Expand Down
29 changes: 16 additions & 13 deletions pm/sendermonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ var unixNow = func() int64 {
return time.Now().Unix()
}

type errCheckTx error
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this error type because we really care about whether the error when checking the tx is the result of a tx failure and this is accomplished by the string check in this PR making this error type unnecessary.


// SenderMonitor is an interface that describes methods used to
// monitor remote senders
type SenderMonitor interface {
Expand Down Expand Up @@ -291,17 +289,20 @@ func (sm *LocalSenderMonitor) startTicketQueueConsumerLoop(queue *ticketQueue, d
select {
case red := <-queue.Redeemable():
tx, err := sm.redeemWinningTicket(red.SignedTicket)
if err != nil {
red.resCh <- struct {
txHash ethcommon.Hash
err error
}{ethcommon.Hash{}, err}
} else {
red.resCh <- struct {
txHash ethcommon.Hash
err error
}{tx.Hash(), nil}
res := struct {
txHash ethcommon.Hash
err error
}{
ethcommon.Hash{},
err,
}
// FIXME: If there are replacement txs then tx.Hash() could be different
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: Create an issue to track this.

This was a problem before this PR and more substantial changes will be required to address it (i.e. return the tx that was mined from CheckTx()). The negative impact is relatively minimal though - the DB will just end up tracking the wrong tx hash for a redeemed ticket if a replacement tx was mined.

// from the hash of the replacement tx that was mined
if tx != nil {
res.txHash = tx.Hash()
}

red.resCh <- res
case <-done:
// When the ticket consumer exits, tell the ticketQueue
// to exit as well
Expand Down Expand Up @@ -350,6 +351,7 @@ func (sm *LocalSenderMonitor) cleanup() {
}
}

// Returns a non-nil tx if one is sent. Otherwise, returns a nil tx
func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types.Transaction, error) {
availableFunds, err := sm.availableFunds(ticket.Sender)
if err != nil {
Expand Down Expand Up @@ -424,7 +426,8 @@ func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types.
if monitor.Enabled {
monitor.TicketRedemptionError()
}
return nil, errCheckTx(err)
// Return tx so caller can utilize the tx if it fails
return tx, err
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gives the caller access to the hash of a failed tx.

}

if monitor.Enabled {
Expand Down
6 changes: 3 additions & 3 deletions pm/sendermonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,15 +787,15 @@ func TestRedeemWinningTicket_SingleTicket_CheckTxError(t *testing.T) {
sm := NewSenderMonitor(cfg, b, smgr, tm, ts)
sm.Start()
defer sm.Stop()
expErr := errCheckTx(errors.New("checktx error"))
expErr := errors.New("checktx error")
b.checkTxErr = expErr
assert := assert.New(t)

signedT := defaultSignedTicket(addr, uint32(0))

tx, err := sm.redeemWinningTicket(signedT)
assert.Nil(tx)
assert.IsType(expErr, err)
assert.NotNil(tx)
assert.Equal(expErr, err)
}

func TestRedeemWinningTicket_SingleTicket(t *testing.T) {
Expand Down