Skip to content

Commit

Permalink
Merge pull request #6362 from filecoin-project/fix/evt-many-called-ep…
Browse files Browse the repository at this point in the history
…och-1.10

events: Fix handling of multiple matched events per epoch
  • Loading branch information
magik6k authored May 31, 2021
2 parents cc90b61 + 8003a8a commit 0413022
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
12 changes: 7 additions & 5 deletions chain/events/events_called.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ func (e *hcEvents) processHeadChangeEvent(rev, app []*types.TipSet) error {

// Queue up calls until there have been enough blocks to reach
// confidence on the message calls
for tid, data := range newCalls {
e.queueForConfidence(tid, data, nil, ts)
for tid, calls := range newCalls {
for _, data := range calls {
e.queueForConfidence(tid, data, nil, ts)
}
}

for at := e.lastTs.Height(); at <= ts.Height(); at++ {
Expand Down Expand Up @@ -472,7 +474,7 @@ func newMessageEvents(ctx context.Context, hcAPI headChangeAPI, cs eventAPI) mes
}

// Check if there are any new actor calls
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventData, error) {
func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID][]eventData, error) {
pts, err := me.cs.ChainGetTipSet(me.ctx, ts.Parents()) // we actually care about messages in the parent tipset here
if err != nil {
log.Errorf("getting parent tipset in checkNewCalls: %s", err)
Expand All @@ -483,7 +485,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
defer me.lk.RUnlock()

// For each message in the tipset
res := make(map[triggerID]eventData)
res := make(map[triggerID][]eventData)
me.messagesForTs(pts, func(msg *types.Message) {
// TODO: provide receipts

Expand All @@ -498,7 +500,7 @@ func (me *messageEvents) checkNewCalls(ts *types.TipSet) (map[triggerID]eventDat
// If there was a match, include the message in the results for the
// trigger
if matched {
res[tid] = msg
res[tid] = append(res[tid], msg)
}
}
})
Expand Down
59 changes: 59 additions & 0 deletions chain/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1323,3 +1323,62 @@ func TestStateChangedTimeout(t *testing.T) {
fcs.advance(0, 5, nil)
require.False(t, called)
}

func TestCalledMultiplePerEpoch(t *testing.T) {
fcs := &fakeCS{
t: t,
h: 1,

msgs: map[cid.Cid]fakeMsg{},
blkMsgs: map[cid.Cid]cid.Cid{},
tsc: newTSCache(2*build.ForkLengthThreshold, nil),
}
require.NoError(t, fcs.tsc.add(fcs.makeTs(t, nil, 1, dummyCid)))

events := NewEvents(context.Background(), fcs)

t0123, err := address.NewFromString("t0123")
require.NoError(t, err)

at := 0

err = events.Called(func(ts *types.TipSet) (d bool, m bool, e error) {
return false, true, nil
}, func(msg *types.Message, rec *types.MessageReceipt, ts *types.TipSet, curH abi.ChainEpoch) (bool, error) {
switch at {
case 0:
require.Equal(t, uint64(1), msg.Nonce)
require.Equal(t, abi.ChainEpoch(4), ts.Height())
case 1:
require.Equal(t, uint64(2), msg.Nonce)
require.Equal(t, abi.ChainEpoch(4), ts.Height())
default:
t.Fatal("apply should only get called twice, at: ", at)
}
at++
return true, nil
}, func(_ context.Context, ts *types.TipSet) error {
switch at {
case 2:
require.Equal(t, abi.ChainEpoch(4), ts.Height())
case 3:
require.Equal(t, abi.ChainEpoch(4), ts.Height())
default:
t.Fatal("revert should only get called twice, at: ", at)
}
at++
return nil
}, 3, 20, matchAddrMethod(t0123, 5))
require.NoError(t, err)

fcs.advance(0, 10, map[int]cid.Cid{
1: fcs.fakeMsgs(fakeMsg{
bmsgs: []*types.Message{
{To: t0123, From: t0123, Method: 5, Nonce: 1},
{To: t0123, From: t0123, Method: 5, Nonce: 2},
},
}),
})

fcs.advance(9, 1, nil)
}

0 comments on commit 0413022

Please sign in to comment.