Skip to content

Commit

Permalink
pm: isUsedTicket check when redeeming ticket
Browse files Browse the repository at this point in the history
  • Loading branch information
kyriediculous committed Oct 7, 2020
1 parent e82b98a commit 3a458d2
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 4 deletions.
5 changes: 4 additions & 1 deletion pm/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,12 @@ ticketLoop:
case res := <-resCh:
// after receiving the response we can close the channel so it can be GC'd
close(resCh)
// If the ticket is used, we can mark it as redeemed
if res.err != nil {
glog.Errorf("Error redeeming err=%v", res.err)
continue
if res.err != errIsUsedTicket {
continue
}
}
err := q.store.MarkWinningTicketRedeemed(nextTicket, res.txHash)
if err != nil {
Expand Down
43 changes: 40 additions & 3 deletions pm/queue_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pm

import (
"fmt"
"math/big"
"sync"
"testing"
Expand Down Expand Up @@ -30,8 +31,9 @@ func defaultSignedTicket(sender ethcommon.Address, senderNonce uint32) *SignedTi
}

type queueConsumer struct {
redeemable []*redemption
mu sync.Mutex
redeemable []*redemption
mu sync.Mutex
redemptionErr error
}

// Redeemable returns the consumed redeemable tickets from a ticket queue
Expand All @@ -56,7 +58,7 @@ func (qc *queueConsumer) Wait(num int, e RedeemableEmitter, done chan struct{})
ticket.resCh <- struct {
txHash ethcommon.Hash
err error
}{RandHash(), nil}
}{ticket.SignedTicket.Hash(), qc.redemptionErr}
}
}
done <- struct{}{}
Expand Down Expand Up @@ -121,9 +123,44 @@ func TestTicketQueueLoop(t *testing.T) {
redeemable := qc.Redeemable()
for i := 0; i < numTickets; i++ {
assert.Equal(uint32(i), redeemable[i].SignedTicket.SenderNonce)
assert.True(ts.submitted[fmt.Sprintf("%x", redeemable[i].SignedTicket.Sig)])
}
}

func TestTicketQueueLoop_IsUsedTicket_MarkAsRedeemed(t *testing.T) {
assert := assert.New(t)

sender := RandAddress()
ts := newStubTicketStore()
tm := &stubTimeManager{round: big.NewInt(100)}
sm := &LocalSenderMonitor{
ticketStore: ts,
tm: tm,
}

q := newTicketQueue(sender, sm)
q.Start()
defer q.Stop()

ticket := defaultSignedTicket(sender, 0)
q.Add(ticket)

qlen, err := q.Length()
assert.Nil(err)
assert.Equal(1, qlen)

qc := &queueConsumer{redemptionErr: errIsUsedTicket}
done := make(chan struct{})
go qc.Wait(1, q, done)
time.Sleep(20 * time.Millisecond)

tm.blockNumSink <- big.NewInt(1)
<-done
time.Sleep(20 * time.Millisecond)

assert.True(ts.submitted[fmt.Sprintf("%x", ticket.Sig)])
}

func TestTicketQueueLoopConcurrent(t *testing.T) {
assert := assert.New(t)

Expand Down
15 changes: 15 additions & 0 deletions pm/sendermonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,21 @@ func (sm *LocalSenderMonitor) redeemWinningTicket(ticket *SignedTicket) (*types.
return nil, err
}

// Fail early if ticket is used
used, err := sm.broker.IsUsedTicket(ticket.Ticket)
if err != nil {
if monitor.Enabled {
monitor.TicketRedemptionError(ticket.Ticket.Sender.String())
}
return nil, err
}
if used {
if monitor.Enabled {
monitor.TicketRedemptionError(ticket.Ticket.Sender.String())
}
return nil, errIsUsedTicket
}

ctx, cancel := context.WithTimeout(context.Background(), sm.cfg.RPCTimeout)
gasPrice, err := sm.cfg.SuggestGasPrice(ctx)
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions pm/sendermonitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,48 @@ func TestAvailableFunds(t *testing.T) {
assert.Equal(expFunds, funds)
}

func TestRedeemWinningTicket_IsUsedTicket(t *testing.T) {
claimant, b, smgr, tm := localSenderMonitorFixture()
addr := RandAddress()
smgr.info[addr] = &SenderInfo{
Deposit: big.NewInt(500),
WithdrawRound: big.NewInt(0),
Reserve: &ReserveInfo{
FundsRemaining: big.NewInt(1000),
ClaimedInCurrentRound: big.NewInt(0),
},
}

ts := newStubTicketStore()
smgr.claimedReserve[addr] = big.NewInt(100)
sm := NewSenderMonitor(claimant, b, smgr, tm, ts)
sm.Start()
defer sm.Stop()
assert := assert.New(t)

signedT := defaultSignedTicket(addr, uint32(0))

// test error
b.isUsedErr = errors.New("isUsed error")
tx, err := sm.redeemWinningTicket(signedT)
assert.Nil(tx)
assert.EqualError(err, b.isUsedErr.Error())

// test used
b.isUsedErr = nil
b.usedTickets[signedT.Hash()] = true
tx, err = sm.redeemWinningTicket(signedT)
assert.Nil(tx)
assert.EqualError(err, errIsUsedTicket.Error())

// test not used
b.usedTickets[signedT.Hash()] = false
tx, err = sm.redeemWinningTicket(signedT)
assert.Nil(err)
assert.NotNil(tx)
assert.True(b.IsUsedTicket(signedT.Ticket))
}

func TestRedeemWinningTicket_CheckAvailableFunds(t *testing.T) {
assert := assert.New(t)

Expand Down
5 changes: 5 additions & 0 deletions pm/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type stubBroker struct {
claimableReserveShouldFail bool

checkTxErr error
isUsedErr error
}

func newStubBroker() *stubBroker {
Expand Down Expand Up @@ -188,6 +189,10 @@ func (b *stubBroker) IsUsedTicket(ticket *Ticket) (bool, error) {
b.mu.Lock()
defer b.mu.Unlock()

if b.isUsedErr != nil {
return false, b.isUsedErr
}

return b.usedTickets[ticket.Hash()], nil
}

Expand Down
1 change: 1 addition & 0 deletions pm/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var (
errInvalidTicketSignature = errors.New("invalid ticket signature")
errInvalidCreationRound = errors.New("invalid ticket creation round")
errInvalidCreationRoundBlockHash = errors.New("invalid ticket creation round block hash")
errIsUsedTicket = errors.New("ticket already used")
)

// Validator is an interface which describes an object capable
Expand Down

0 comments on commit 3a458d2

Please sign in to comment.