Skip to content

Commit

Permalink
Fix recoverer cleanup (#10417)
Browse files Browse the repository at this point in the history
* fix: remove log from pending, not just from visited cache

* recoverer: add/rm pending

* check removePending
  • Loading branch information
amirylm authored Aug 31, 2023
1 parent 876e81d commit 4e45352
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (r *logRecoverer) populatePending(f upkeepFilter, filteredLogs []logpoller.
visitedAt: time.Now(),
payload: payload,
}
r.pending = append(r.pending, payload)
r.addPending(payload)
}
return len(r.pending) - pendingSizeBefore, alreadyPending
}
Expand Down Expand Up @@ -511,26 +511,23 @@ func (r *logRecoverer) clean(ctx context.Context) {
lggr.Debug("no expired upkeeps")
return
}
cleaned, err := r.tryExpire(ctx, expired...)
err := r.tryExpire(ctx, expired...)
if err != nil {
lggr.Warnw("failed to clean visited upkeeps", "err", err)
}
if len(expired) > 0 {
lggr.Debugw("expired upkeeps", "expired", len(expired), "cleaned", cleaned)
}
}

func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error) {
func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
latestBlock, err := r.poller.LatestBlock(pg.WithParentCtx(ctx))
if err != nil {
return 0, fmt.Errorf("failed to get latest block: %w", err)
return fmt.Errorf("failed to get latest block: %w", err)
}
sort.Slice(ids, func(i, j int) bool {
return ids[i] < ids[j]
})
states, err := r.states.SelectByWorkIDs(ctx, ids...)
if err != nil {
return 0, fmt.Errorf("failed to get states: %w", err)
return fmt.Errorf("failed to get states: %w", err)
}
lggr := r.lggr.With("where", "clean")
start, _ := r.getRecoveryWindow(latestBlock)
Expand All @@ -550,12 +547,13 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error
if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start {
// we can't recover this log anymore, so we remove it from the visited list
lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID,
"logBlock", logBlock, "start", start)
"latestBlock", latestBlock, "logBlock", logBlock, "start", start)
r.removePending(rec.payload.WorkID)
delete(r.visited, ids[i])
removed++
continue
}
r.pending = append(r.pending, rec.payload)
r.addPending(rec.payload)
rec.visitedAt = time.Now()
r.visited[ids[i]] = rec
default:
Expand All @@ -564,5 +562,36 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) (int, error
}
}

return removed, nil
if removed > 0 {
lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed)
}

return nil
}

// addPending adds a payload to the pending list if it's not already there.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) addPending(payload ocr2keepers.UpkeepPayload) {
var exist bool
pending := r.pending
for _, p := range pending {
if p.WorkID == payload.WorkID {
exist = true
}
}
if !exist {
r.pending = append(pending, payload)
}
}

// removePending removes a payload from the pending list.
// NOTE: the lock must be held before calling this function.
func (r *logRecoverer) removePending(workID string) {
updated := make([]ocr2keepers.UpkeepPayload, 0, len(r.pending))
for _, p := range r.pending {
if p.WorkID != workID {
updated = append(updated, p)
}
}
r.pending = updated
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logprovider
import (
"context"
"fmt"
"math"
"math/big"
"sort"
"testing"
Expand Down Expand Up @@ -97,11 +98,12 @@ func TestLogRecoverer_GetRecoverables(t *testing.T) {
}

func TestLogRecoverer_Clean(t *testing.T) {
oldLogsOffset := int64(20)

tests := []struct {
name string
pending []ocr2keepers.UpkeepPayload
visited map[string]visitedRecord
latestBlock int64
states []ocr2keepers.UpkeepState
wantPending []ocr2keepers.UpkeepPayload
wantVisited []string
Expand All @@ -110,7 +112,6 @@ func TestLogRecoverer_Clean(t *testing.T) {
"empty",
[]ocr2keepers.UpkeepPayload{},
map[string]visitedRecord{},
0,
[]ocr2keepers.UpkeepState{},
[]ocr2keepers.UpkeepPayload{},
[]string{},
Expand All @@ -120,44 +121,44 @@ func TestLogRecoverer_Clean(t *testing.T) {
[]ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
{WorkID: "3", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "3")},
},
map[string]visitedRecord{
"1": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{
WorkID: "1",
Trigger: ocr2keepers.Trigger{
LogTriggerExtension: &ocr2keepers.LogTriggerExtension{
BlockNumber: 50,
BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2),
},
},
}},
"2": visitedRecord{time.Now(), ocr2keepers.UpkeepPayload{
WorkID: "2",
Trigger: ocr2keepers.Trigger{
LogTriggerExtension: &ocr2keepers.LogTriggerExtension{
BlockNumber: 50,
BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset * 2),
},
},
}},
"3": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{
WorkID: "3",
Trigger: ocr2keepers.Trigger{
LogTriggerExtension: &ocr2keepers.LogTriggerExtension{
BlockNumber: 50,
BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset - 10),
},
},
}},
"4": visitedRecord{time.Now().Add(-time.Hour), ocr2keepers.UpkeepPayload{
WorkID: "4",
Trigger: ocr2keepers.Trigger{
LogTriggerExtension: &ocr2keepers.LogTriggerExtension{
BlockNumber: 50,
BlockNumber: ocr2keepers.BlockNumber(oldLogsOffset + 10),
},
},
}},
},
200,
[]ocr2keepers.UpkeepState{
ocr2keepers.Ineligible,
ocr2keepers.UnknownState,
ocr2keepers.UnknownState,
},
[]ocr2keepers.UpkeepPayload{
Expand All @@ -176,8 +177,10 @@ func TestLogRecoverer_Clean(t *testing.T) {

lookbackBlocks := int64(100)
r, _, lp, statesReader := setupTestRecoverer(t, time.Millisecond*50, lookbackBlocks)
start, _ := r.getRecoveryWindow(0)
block24h := int64(math.Abs(float64(start)))

lp.On("LatestBlock", mock.Anything).Return(tc.latestBlock, nil)
lp.On("LatestBlock", mock.Anything).Return(block24h+oldLogsOffset, nil)
statesReader.On("SelectByWorkIDs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tc.states, nil)

r.lock.Lock()
Expand Down Expand Up @@ -914,6 +917,63 @@ func TestLogRecoverer_GetProposalData(t *testing.T) {
}
}

func TestLogRecoverer_pending(t *testing.T) {
tests := []struct {
name string
exist []ocr2keepers.UpkeepPayload
new []ocr2keepers.UpkeepPayload
want []ocr2keepers.UpkeepPayload
}{
{
"empty",
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
[]ocr2keepers.UpkeepPayload{},
},
{
"add new and existing",
[]ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
},
[]ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
[]ocr2keepers.UpkeepPayload{
{WorkID: "1", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "1")},
{WorkID: "2", UpkeepID: core.GenUpkeepID(ocr2keepers.LogTrigger, "2")},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
r := NewLogRecoverer(logger.TestLogger(t), nil, nil, nil, nil, nil, NewOptions(200))
r.lock.Lock()
r.pending = tc.exist
for _, p := range tc.new {
r.addPending(p)
}
pending := r.pending
require.GreaterOrEqual(t, len(pending), len(tc.new))
require.Equal(t, len(tc.want), len(pending))
sort.Slice(pending, func(i, j int) bool {
return pending[i].WorkID < pending[j].WorkID
})
for i := range pending {
require.Equal(t, tc.want[i].WorkID, pending[i].WorkID)
}
r.lock.Unlock()
for _, p := range tc.want {
r.removePending(p.WorkID)
}
r.lock.Lock()
defer r.lock.Unlock()
require.Equal(t, 0, len(r.pending))
})
}
}

type mockFilterStore struct {
UpkeepFilterStore
HasFn func(id *big.Int) bool
Expand Down

0 comments on commit 4e45352

Please sign in to comment.