-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Mercury feeds starts at the first block number not at zero #9664
Changes from 8 commits
09b9211
3a48f1a
5177c29
b488773
ac54714
e90b177
53a7341
e92a181
0f3502f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,12 @@ type Runner interface { | |
ExecuteRun(ctx context.Context, spec pipeline.Spec, vars pipeline.Vars, l logger.Logger) (run pipeline.Run, trrs pipeline.TaskRunResults, err error) | ||
} | ||
|
||
type Fetcher interface { | ||
// FetchInitialMaxFinalizedBlockNumber should fetch the initial max | ||
// finalized block number from the mercury server. | ||
FetchInitialMaxFinalizedBlockNumber(context.Context) (*int64, error) | ||
} | ||
|
||
type datasource struct { | ||
pipelineRunner Runner | ||
jb job.Job | ||
|
@@ -42,15 +48,16 @@ type datasource struct { | |
|
||
mu sync.RWMutex | ||
|
||
chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData | ||
chainHeadTracker ChainHeadTracker | ||
fetcher relaymercury.Fetcher | ||
chEnhancedTelem chan<- ocrcommon.EnhancedTelemetryMercuryData | ||
chainHeadTracker ChainHeadTracker | ||
fetcher Fetcher | ||
initialBlockNumber *int64 | ||
} | ||
|
||
var _ relaymercury.DataSource = &datasource{} | ||
|
||
func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher relaymercury.Fetcher) *datasource { | ||
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher} | ||
func NewDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, lggr logger.Logger, rr chan pipeline.Run, enhancedTelemChan chan ocrcommon.EnhancedTelemetryMercuryData, chainHeadTracker ChainHeadTracker, fetcher Fetcher, initialBlockNumber *int64) *datasource { | ||
return &datasource{pr, jb, spec, lggr, rr, sync.RWMutex{}, enhancedTelemChan, chainHeadTracker, fetcher, initialBlockNumber} | ||
} | ||
|
||
func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestamp, fetchMaxFinalizedBlockNum bool) (obs relaymercury.Observation, err error) { | ||
|
@@ -63,7 +70,36 @@ func (ds *datasource) Observe(ctx context.Context, repts ocrtypes.ReportTimestam | |
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
obs.MaxFinalizedBlockNumber.Val, obs.MaxFinalizedBlockNumber.Err = ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx) | ||
var val *int64 | ||
val, err = ds.fetcher.FetchInitialMaxFinalizedBlockNumber(ctx) | ||
if err != nil { | ||
obs.MaxFinalizedBlockNumber.Err = err | ||
return | ||
} | ||
if val != nil { | ||
obs.MaxFinalizedBlockNumber.Val = *val | ||
return | ||
} | ||
if ds.initialBlockNumber == nil { | ||
if obs.CurrentBlockNum.Err != nil { | ||
obs.MaxFinalizedBlockNumber.Err = fmt.Errorf("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed. No initialBlockNumber was set, tried to use current block number to determine maxFinalizedBlockNumber but got error: %w", obs.CurrentBlockNum.Err) | ||
} else { | ||
// Subract 1 here because we will later add 1 to the | ||
// maxFinalizedBlockNumber to get the first validFromBlockNum, which | ||
// ought to be the same as current block num. | ||
obs.MaxFinalizedBlockNumber.Val = obs.CurrentBlockNum.Val - 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggestion: How about using negative value (aka invalid) to denote There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber unset, using currentBlockNum=%d-1)", obs.MaxFinalizedBlockNumber.Val, obs.CurrentBlockNum.Val) | ||
} | ||
} else { | ||
// NOTE: It's important to subtract 1 if the server is missing any past | ||
// report (brand new feed) since we will add 1 to the | ||
// maxFinalizedBlockNumber to get the first validFromBlockNum, which | ||
// ought to be zero. | ||
// | ||
// If "initialBlockNumber" is set to zero, this will give a starting block of zero. | ||
obs.MaxFinalizedBlockNumber.Val = *ds.initialBlockNumber - 1 | ||
ds.lggr.Infof("FetchInitialMaxFinalizedBlockNumber returned empty LatestReport; this is a new feed so maxFinalizedBlockNumber=%d (initialBlockNumber=%d)", obs.MaxFinalizedBlockNumber.Val, *ds.initialBlockNumber) | ||
} | ||
}() | ||
} else { | ||
obs.MaxFinalizedBlockNumber.Err = errors.New("fetchMaxFinalizedBlockNum=false") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this because by far the most common case will to have this not be set