chore: add rsources stats for dropped events at processor #3852
Conversation
processor/processor.go
```diff
@@ -2216,7 +2216,8 @@ func (proc *Handle) transformSrcDest(
 	var successCountMetadataMap map[string]MetricMetadata
 	eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled)
 	failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled)
-	proc.saveFailedJobs(failedJobs)
+	droppedJobs := proc.getDroppedJobs(response, eventList)
+	proc.saveFailedJobs(append(failedJobs, droppedJobs...))
```
I believe that for rudder sources' use cases, both failed and dropped jobs in the processor's pipeline could be considered dropped (i.e. non-retryable):
- We could rename `proc.saveFailedJobs` to `proc.saveDroppedJobs`, `rsources.NewFailedJobsCollector` to `rsources.NewDroppedJobsCollector`, and `rsources.JobsFailed` to `rsources.JobsDropped`.
- Dropped jobs' failed keys shouldn't be captured.
- It would be better to report rsources dropped jobs during `Store`, once for all scenarios, as the last operation of the method, after storing router, batchrouter & proc_errors.
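The suggested ordering can be sketched as follows. This is a hedged illustration only: every function name below is a hypothetical stand-in, not the repository's actual API; the point is simply that dropped-job stats are reported once, after all other store steps.

```go
package main

import "fmt"

// Hypothetical stand-ins for the individual persistence steps of Store.
func storeRouterJobs(ops []string) []string      { return append(ops, "router") }
func storeBatchRouterJobs(ops []string) []string { return append(ops, "batchrouter") }
func storeProcErrors(ops []string) []string      { return append(ops, "proc_errors") }
func reportDroppedJobs(ops []string) []string    { return append(ops, "rsources_dropped") }

// Store runs each persistence step in order and reports rsources
// dropped-job stats last, covering every scenario in one place.
func Store() []string {
	ops := []string{}
	ops = storeRouterJobs(ops)
	ops = storeBatchRouterJobs(ops)
	ops = storeProcErrors(ops)
	return reportDroppedJobs(ops) // last operation of the method
}

func main() {
	fmt.Println(Store())
}
```

Reporting last means a single code path handles the rsources stats regardless of which store scenario produced the dropped jobs.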
```go
func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransform []transformer.TransformerEvent) []*jobsdb.JobT {
	// each messageID is one event when sending to the transformer
	inputMessageIDs := lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) string {
		return e.Metadata.MessageID
```
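The idea behind this snippet can be shown with a minimal, self-contained sketch (simplified types, stdlib only; the real method operates on `transformer.TransformerEvent` and builds `jobsdb.JobT` entries): an input event counts as dropped when its messageID comes back in neither the success nor the failure portion of the transformer response.

```go
package main

import "fmt"

// event is a simplified stand-in for the transformer event metadata.
type event struct{ MessageID string }

// droppedMessageIDs returns the IDs of input events that are absent from
// the set of messageIDs seen in the transformer response (success + failed),
// i.e. a set difference between input and response.
func droppedMessageIDs(input []event, responded map[string]struct{}) []string {
	var dropped []string
	for _, e := range input {
		if _, ok := responded[e.MessageID]; !ok {
			dropped = append(dropped, e.MessageID)
		}
	}
	return dropped
}

func main() {
	input := []event{{"m1"}, {"m2"}, {"m3"}}
	// messageIDs present in the transformer response (success or failure)
	responded := map[string]struct{}{"m1": {}, "m3": {}}
	fmt.Println(droppedMessageIDs(input, responded)) // m2 was dropped
}
```

Note this only works if each messageID identifies exactly one input event, which is the uniqueness assumption discussed below.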
I am confused about why the processor's pipeline operates on the assumption that messageIDs are unique. Dedup is a feature that can be turned off, right? Am I missing some other important fact about message IDs and the processor's pipeline?
Yeah, that is correct, but dedup ideally should never be turned off.
Also, many downstream destinations like Warehouse operate under the assumption that the primary key is the messageID, which is unique.
Codecov Report
Patch coverage:
Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##           master    #3852      +/-   ##
==========================================
- Coverage   69.07%   69.00%   -0.08%
==========================================
  Files         351      352       +1
  Lines       52814    52858      +44
==========================================
- Hits        36480    36473       -7
- Misses      14043    14087      +44
- Partials     2291     2298       +7
==========================================
```

☔ View full report in Codecov by Sentry.
Description
Linear Ticket
Ensure dropped events are marked as failed in the sources [job-status endpoint]
Security