-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[KS-196] ReportCodec implementation for Streams trigger #13218
Conversation
bolekk
commented
May 16, 2024
•
edited
Loading
edited
- Implement Codec, which validates report signatures and decodes needed fields.
- Pass report context from Merucry Transmitter, which is needed to validate signatures.
- Update fake Syncer to run successful e2e tests.
49d7904
to
db12eea
Compare
func (c Codec) Unwrap(raw values.Value) ([]datastreams.FeedReport, error) { | ||
var _ datastreams.ReportCodec = &codec{} | ||
|
||
func (c *codec) UnwrapValid(wrapped values.Value, allowedSigners [][]byte, minRequiredSignatures int) ([]datastreams.FeedReport, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May I ask, why is the list of reports passed as a wrapped values.Value
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
values.Value is a universal wrapper for all data passed around during workflow execution. Trigger Service will wrap it and then the receiver DON will unwrap it here.
s.wg.Add(1) | ||
go s.launch(ctx) | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth implementing a StateMachine
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This Syncer implementation will soon be replaced with a real one reading stuff from the chain. Right now it's here just to allow easy local and staging tests. Please ignore all shortcomings ;)
} | ||
s.subServices = append(s.subServices, triggerCap) | ||
} | ||
if slices.Contains(triggerDONPeers, myId) { | ||
s.lggr.Info("member of a capability DON - starting remote publishers") | ||
|
||
{ | ||
/*{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean to delete this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to leave it for easier local tests - will be replaced within 2 weeks.
count++ | ||
if count > maxRetrySeconds { | ||
s.lggr.Error("failed to get Streams Trigger from the Registry") | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a permanent failure that leaves the node in a broken state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
broken meaning unable to send trigger events but otherwise functional.
core/capabilities/syncer.go
Outdated
count := 0 | ||
for { | ||
count++ | ||
if count > maxRetrySeconds { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems confusingly named, count doesn't appear to be a timer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, will remove the "seconds" thanks
return | ||
} | ||
s.subServices = append(s.subServices, triggerCap) | ||
break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this be cleaner using a backoff.Backoff with permanent retry?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes but probably not worth it - again this Syncer won't be used in prod.
"0x9CcE7293a4Cc2621b61193135A95928735e4795F", | ||
"0x3c775F20bCB2108C1A818741Ce332Bb5fe0dB925", | ||
"0x50314239e2CF05555ceeD53E7F47eB2A8Eab0dbB", | ||
"0xd76A4f98898c3b9A72b244476d7337b50D54BCd8", | ||
"0x656A873f6895b8a03Fb112dE927d43FA54B2c92A", | ||
"0x5d1e87d87bF2e0cD4Ea64F381a2dbF45e5f0a553", | ||
"0x91d9b0062265514f012Eb8fABA59372fD9520f56", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Heads up, that this will change to be bytes32
onchain (https://github.com/smartcontractkit/chainlink/pull/13183/files#diff-d94c1796b3544ef8780af7e16b5361c8fb9e4d63f492faeda6a3362a8363f51bR45-R51). We can do decoding once the Syncer is up, but not sure the best place to do that, thread: https://chainlink-core.slack.com/archives/C064418MJHE/p1716308390423929
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's fine. As long as the Syncer passes correct binary addresses to all services, we're good.
core/capabilities/syncer.go
Outdated
if err2 != nil { | ||
// NOTE: it's possible that the jobs are not launched yet at this moment. | ||
// If not found yet, Syncer won't add to Registry but retry on the next tick. | ||
s.lggr.Infow("trigger not found yet ...", "capabilityId", capId, "error", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.lggr.Infow("trigger not found yet ...", "capabilityId", capId, "error", err) | |
s.lggr.Infow("trigger not found yet ...", "capabilityId", capId, "error", err2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixing, thanks
workflowDONs := map[string]capabilities.DON{ | ||
workflowDonInfo.ID: workflowDonInfo, | ||
} | ||
triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, triggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the future, this should only happen when DON isPublic
, right? Currently, this assumes that the capability is always remote without actually checking for local existence, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct!
@@ -388,15 +388,22 @@ func (mt *mercuryTransmitter) HealthReport() map[string]error { | |||
return report | |||
} | |||
|
|||
func (mt *mercuryTransmitter) sendToTrigger(report ocrtypes.Report, signatures []ocrtypes.AttributedOnchainSignature) error { | |||
func (mt *mercuryTransmitter) sendToTrigger(report ocrtypes.Report, rawReportCtx [3][32]byte, signatures []ocrtypes.AttributedOnchainSignature) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this rawReportCtx [3][32]byte,
be based on a workflow spec config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How so? The context encodes OCR round and epoch in which the report was produced.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it's not clear what this context is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be pretty clear in the context of the Transmitter because it already uses it elsewhere.
1. Implement Codec, which validates report signatures and decodes needed fields. 2. Pass report context from Merucry Transmitter, which is needed to validate signatures. 3. Update fake Syncer to run successful e2e tests.
Quality Gate passedIssues Measures |