diff --git a/processor/events_response.go b/processor/events_response.go
index 6bc43476153..95686dfdacb 100644
--- a/processor/events_response.go
+++ b/processor/events_response.go
@@ -3,10 +3,11 @@ package processor
 import (
 	"time"
 
+	"github.com/samber/lo"
+
 	"github.com/rudderlabs/rudder-server/jobsdb"
 	"github.com/rudderlabs/rudder-server/processor/transformer"
 	"github.com/rudderlabs/rudder-server/utils/misc"
-	"github.com/samber/lo"
 )
 
 func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransform []transformer.TransformerEvent) []*jobsdb.JobT {
@@ -18,20 +19,12 @@ func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransf
 	// in transformer response, multiple messageIDs could be batched together
 	successFullMessageIDs := make([]string, 0)
 	lo.ForEach(response.Events, func(e transformer.TransformerResponse, _ int) {
-		if len(e.Metadata.MessageIDs) > 0 {
-			successFullMessageIDs = append(successFullMessageIDs, e.Metadata.MessageIDs...)
-		} else {
-			successFullMessageIDs = append(successFullMessageIDs, e.Metadata.MessageID)
-		}
+		successFullMessageIDs = append(successFullMessageIDs, e.Metadata.GetMessagesIDs()...)
 	})
 	// for failed as well
 	failedMessageIDs := make([]string, 0)
 	lo.ForEach(response.FailedEvents, func(e transformer.TransformerResponse, _ int) {
-		if len(e.Metadata.MessageIDs) > 0 {
-			failedMessageIDs = append(failedMessageIDs, e.Metadata.MessageIDs...)
-		} else {
-			failedMessageIDs = append(failedMessageIDs, e.Metadata.MessageID)
-		}
+		failedMessageIDs = append(failedMessageIDs, e.Metadata.GetMessagesIDs()...)
 	})
 	// the remainder of the messageIDs are those that are dropped
 	// we get jobs for those dropped messageIDs - for rsources_stats_collector
@@ -40,7 +33,7 @@ func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransf
 	droppedJobs := make([]*jobsdb.JobT, 0)
 	for _, e := range eventsToTransform {
 		if _, ok := droppedMessageIDKeys[e.Metadata.MessageID]; ok {
-			var params = struct {
+			params := struct {
 				SourceJobRunID  string `json:"source_job_run_id"`
 				SourceTaskRunID string `json:"source_task_run_id"`
 				SourceID        string `json:"source_id"`
diff --git a/processor/processor.go b/processor/processor.go
index 53cc9eb46f3..3b6ab26f8db 100644
--- a/processor/processor.go
+++ b/processor/processor.go
@@ -961,12 +961,12 @@ func (proc *Handle) getDestTransformerEvents(response transformer.Response, comm
 	for i := range response.Events {
 		// Update metrics maps
 		userTransformedEvent := &response.Events[i]
-		var messages []types.SingularEventT
-		if len(userTransformedEvent.Metadata.MessageIDs) > 0 {
-			messages = lo.Map(userTransformedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent })
-		} else {
-			messages = []types.SingularEventT{eventsByMessageID[userTransformedEvent.Metadata.MessageID].SingularEvent}
-		}
+		messages := lo.Map(
+			userTransformedEvent.Metadata.GetMessagesIDs(),
+			func(msgID string, _ int) types.SingularEventT {
+				return eventsByMessageID[msgID].SingularEvent
+			},
+		)
 		for _, message := range messages {
 			proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, stage, func() json.RawMessage {
@@ -1181,12 +1181,12 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
 	var failedEventsToStore []*jobsdb.JobT
 	for i := range response.FailedEvents {
 		failedEvent := &response.FailedEvents[i]
-		var messages []types.SingularEventT
-		if len(failedEvent.Metadata.MessageIDs) > 0 {
-			messages = lo.Map(failedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent })
-		} else {
-			messages = []types.SingularEventT{eventsByMessageID[failedEvent.Metadata.MessageID].SingularEvent}
-		}
+		messages := lo.Map(
+			failedEvent.Metadata.GetMessagesIDs(),
+			func(msgID string, _ int) types.SingularEventT {
+				return eventsByMessageID[msgID].SingularEvent
+			},
+		)
 		payload, err := jsonfast.Marshal(messages)
 		if err != nil {
 			proc.logger.Errorf(`[Processor: getFailedEventJobs] Failed to unmarshal list of failed events: %v`, err)
@@ -1825,6 +1825,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
 	procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
 	var batchDestJobs []*jobsdb.JobT
 	var destJobs []*jobsdb.JobT
+	var droppedJobs []*jobsdb.JobT
 	routerDestIDs := make(map[string]struct{})
 
 	destProcStart := time.Now()
@@ -1856,6 +1857,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
 	for o := range chOut {
 		destJobs = append(destJobs, o.destJobs...)
 		batchDestJobs = append(batchDestJobs, o.batchDestJobs...)
+		droppedJobs = append(droppedJobs, o.droppedJobs...)
 		routerDestIDs = lo.Assign(routerDestIDs, o.routerDestIDs)
 		in.reportMetrics = append(in.reportMetrics, o.reportMetrics...)
 		for k, v := range o.errorsPerDestID {
@@ -1874,6 +1876,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
 		in.statusList,
 		destJobs,
 		batchDestJobs,
+		droppedJobs,
 
 		procErrorJobsByDestID,
 		in.procErrorJobs,
@@ -1893,6 +1896,7 @@ type storeMessage struct {
 	statusList    []*jobsdb.JobStatusT
 	destJobs      []*jobsdb.JobT
 	batchDestJobs []*jobsdb.JobT
+	droppedJobs   []*jobsdb.JobT
 
 	procErrorJobsByDestID map[string][]*jobsdb.JobT
 	procErrorJobs         []*jobsdb.JobT
@@ -1913,6 +1917,7 @@ func (sm *storeMessage) merge(subJob *storeMessage) {
 	sm.statusList = append(sm.statusList, subJob.statusList...)
 	sm.destJobs = append(sm.destJobs, subJob.destJobs...)
 	sm.batchDestJobs = append(sm.batchDestJobs, subJob.batchDestJobs...)
+	sm.droppedJobs = append(sm.droppedJobs, subJob.droppedJobs...)
 
 	sm.procErrorJobs = append(sm.procErrorJobs, subJob.procErrorJobs...)
 	for id, job := range subJob.procErrorJobsByDestID {
@@ -2066,6 +2071,10 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
 		if err != nil {
 			return fmt.Errorf("publishing rsources stats: %w", err)
 		}
+		err = proc.saveDroppedJobs(in.droppedJobs, tx.Tx())
+		if err != nil {
+			return fmt.Errorf("saving dropped jobs: %w", err)
+		}
 
 		if proc.isReportingEnabled() {
 			proc.reporting.Report(in.reportMetrics, tx.SqlTx())
@@ -2123,6 +2132,7 @@ type transformSrcDestOutput struct {
 	batchDestJobs   []*jobsdb.JobT
 	errorsPerDestID map[string][]*jobsdb.JobT
 	routerDestIDs   map[string]struct{}
+	droppedJobs     []*jobsdb.JobT
 }
 
 func (proc *Handle) transformSrcDest(
@@ -2157,6 +2167,7 @@ func (proc *Handle) transformSrcDest(
 	destJobs := make([]*jobsdb.JobT, 0)
 	routerDestIDs := make(map[string]struct{})
 	procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
+	droppedJobs := make([]*jobsdb.JobT, 0)
 
 	proc.config.configSubscriberLock.RLock()
 	destType := proc.config.destinationIDtoTypeMap[destID]
@@ -2225,8 +2236,7 @@ func (proc *Handle) transformSrcDest(
 			var successCountMetadataMap map[string]MetricMetadata
 			eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled)
 			failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled)
-			droppedJobs := proc.getDroppedJobs(response, eventList)
-			proc.saveFailedJobs(append(failedJobs, droppedJobs...))
+			droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)
 			if _, ok := procErrorJobsByDestID[destID]; !ok {
 				procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
 			}
@@ -2294,8 +2304,7 @@ func (proc *Handle) transformSrcDest(
 			var successCountMetadataMap map[string]MetricMetadata
 			eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled)
 			failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled)
-			droppedJobs := proc.getDroppedJobs(response, eventsToTransform)
-			proc.saveFailedJobs(append(failedJobs, droppedJobs...))
+			droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)
 			proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))
 
 			// REPORTING - START
@@ -2365,8 +2374,7 @@ func (proc *Handle) transformSrcDest(
 			destTransformationStat.numEvents.Count(len(eventsToTransform))
 			destTransformationStat.numOutputSuccessEvents.Count(len(response.Events))
 			destTransformationStat.numOutputFailedEvents.Count(len(failedJobs))
-			droppedJobs := proc.getDroppedJobs(response, eventsToTransform)
-			proc.saveFailedJobs(append(failedJobs, droppedJobs...))
+			droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)
 
 			if _, ok := procErrorJobsByDestID[destID]; !ok {
 				procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
@@ -2492,17 +2500,17 @@ func (proc *Handle) transformSrcDest(
 		errorsPerDestID: procErrorJobsByDestID,
 		reportMetrics:   reportMetrics,
 		routerDestIDs:   routerDestIDs,
+		droppedJobs:     droppedJobs,
 	}
 }
 
-func (proc *Handle) saveFailedJobs(failedJobs []*jobsdb.JobT) {
+func (proc *Handle) saveDroppedJobs(failedJobs []*jobsdb.JobT, tx *jobsdb.Tx) error {
 	if len(failedJobs) > 0 {
-		rsourcesStats := rsources.NewFailedJobsCollector(proc.rsourcesService)
-		rsourcesStats.JobsFailed(failedJobs)
-		_ = proc.writeErrorDB.WithTx(func(tx *jobsdb.Tx) error {
-			return rsourcesStats.Publish(context.TODO(), tx.Tx)
-		})
+		rsourcesStats := rsources.NewDroppedJobsCollector(proc.rsourcesService)
+		rsourcesStats.JobsDropped(failedJobs)
+		return rsourcesStats.Publish(context.TODO(), tx.Tx)
 	}
+	return nil
 }
 
 func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, filter bool) transformer.Response {
diff --git a/processor/processor_test.go b/processor/processor_test.go
index 1ff7e42c6cc..4a49588e226 100644
--- a/processor/processor_test.go
+++ b/processor/processor_test.go
@@ -1321,7 +1321,7 @@ var _ = Describe("Processor", Ordered, func() {
 				}
 			})
 
-			c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil)
+			c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(nil) // one for newly stored jobs and one for dropped jobs
 			c.mockArchivalDB.EXPECT().
 				WithStoreSafeTx(
 					gomock.Any(),
@@ -1795,11 +1795,6 @@ var _ = Describe("Processor", Ordered, func() {
 				StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()).
 				AnyTimes()
 
-			// will be used to save failed events to failed keys table
-			c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
-				_ = f(&jobsdb.Tx{})
-			}).Times(1)
-
 			// One Store call is expected for all events
 			c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
 				Do(func(ctx context.Context, jobs []*jobsdb.JobT) {
@@ -1938,10 +1933,6 @@ var _ = Describe("Processor", Ordered, func() {
 				assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State)
 			})
 
-			c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
-				_ = f(&jobsdb.Tx{})
-			}).Return(nil).Times(1)
-
 			// One Store call is expected for all events
 			c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
 				Do(func(ctx context.Context, jobs []*jobsdb.JobT) {
diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go
index 11aea47846b..56b87d546e3 100644
--- a/processor/transformer/transformer.go
+++ b/processor/transformer/transformer.go
@@ -78,6 +78,13 @@ type Metadata struct {
 	SourceDefinitionType string `json:"-"`
 }
 
+func (m Metadata) GetMessagesIDs() []string {
+	if len(m.MessageIDs) > 0 {
+		return m.MessageIDs
+	}
+	return []string{m.MessageID}
+}
+
 type TransformerEvent struct {
 	Message  types.SingularEventT `json:"message"`
 	Metadata Metadata             `json:"metadata"`
diff --git a/services/debugger/transformation/transformationStatusUploader.go b/services/debugger/transformation/transformationStatusUploader.go
index e2784552b01..4665b07c8a2 100644
--- a/services/debugger/transformation/transformationStatusUploader.go
+++ b/services/debugger/transformation/transformationStatusUploader.go
@@ -370,8 +370,9 @@ func (h *Handle) processRecordTransformationStatus(tStatus *TransformationStatus
 	}
 
 	for _, failedEvent := range tStatus.FailedEvents {
-		if len(failedEvent.Metadata.MessageIDs) > 0 {
-			for _, msgID := range failedEvent.Metadata.MessageIDs {
+		messageIDs := failedEvent.Metadata.GetMessagesIDs()
+		for _, msgID := range messageIDs {
+			if msgID != "" {
 				reportedMessageIDs[msgID] = struct{}{}
 				singularEventWithReceivedAt := tStatus.EventsByMessageID[msgID]
 				eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
@@ -390,24 +391,6 @@ func (h *Handle) processRecordTransformationStatus(tStatus *TransformationStatus
 					IsError: true,
 				})
 			}
-		} else if failedEvent.Metadata.MessageID != "" {
-			reportedMessageIDs[failedEvent.Metadata.MessageID] = struct{}{}
-			singularEventWithReceivedAt := tStatus.EventsByMessageID[failedEvent.Metadata.MessageID]
-			eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
-			eventAfter := &EventsAfterTransform{
-				Error:      failedEvent.Error,
-				ReceivedAt: time.Now().Format(misc.RFC3339Milli),
-				StatusCode: failedEvent.StatusCode,
-			}
-
-			h.RecordTransformationStatus(&TransformStatusT{
-				TransformationID: tID,
-				SourceID:         tStatus.SourceID,
-				DestinationID:    tStatus.DestID,
-				EventBefore:      eventBefore,
-				EventsAfter:      eventAfter,
-				IsError:          true,
-			})
-		}
 		}
 	}
diff --git a/services/rsources/stats_collector.go b/services/rsources/stats_collector.go
index db019dc1aca..4f8f1c76c4a 100644
--- a/services/rsources/stats_collector.go
+++ b/services/rsources/stats_collector.go
@@ -44,7 +44,7 @@ type StatsCollector interface {
 // FailedJobsStatsCollector collects stats for failed jobs
 type FailedJobsStatsCollector interface {
 	StatsPublisher
-	JobsFailed(jobs []*jobsdb.JobT)
+	JobsDropped(jobs []*jobsdb.JobT)
 }
 
 // NewStatsCollector creates a new stats collector
@@ -58,9 +58,10 @@ func NewStatsCollector(jobservice JobService) StatsCollector {
 	}
 }
 
-// NewFailedJobsCollector creates a new stats collector for publishing failed job stats and records
-func NewFailedJobsCollector(jobservice JobService) FailedJobsStatsCollector {
+// NewDroppedJobsCollector creates a new stats collector for publishing dropped job stats, without collecting failed job records
+func NewDroppedJobsCollector(jobservice JobService) FailedJobsStatsCollector {
 	return &statsCollector{
+		skipFailedRecords:     true,
 		jobService:            jobservice,
 		jobIdsToStatKeyIndex:  map[int64]statKey{},
 		jobIdsToRecordIdIndex: map[int64]json.RawMessage{},
@@ -81,6 +82,7 @@ func (sk statKey) String() string {
 var _ StatsCollector = (*statsCollector)(nil)
 
 type statsCollector struct {
+	skipFailedRecords     bool
 	processing            bool
 	jobService            JobService
 	jobIdsToStatKeyIndex  map[int64]statKey
@@ -115,7 +117,7 @@ func (r *statsCollector) JobsStored(jobs []*jobsdb.JobT) {
 	r.buildStats(jobs, nil, true)
 }
 
-func (r *statsCollector) JobsFailed(jobs []*jobsdb.JobT) {
+func (r *statsCollector) JobsDropped(jobs []*jobsdb.JobT) {
 	r.processing = true
 	r.buildStats(jobs, nil, true)
 	jobStatuses := make([]*jobsdb.JobStatusT, 0, len(jobs))
@@ -247,13 +249,16 @@ func (r *statsCollector) buildStats(jobs []*jobsdb.JobT, failedJobs map[uuid.UUI
 			if incrementIn {
 				stats.In++
 			}
+			r.jobIdsToStatKeyIndex[job.JobID] = sk
+			if r.skipFailedRecords {
+				continue
+			}
 			if recordId != "" && recordId != "null" && recordId != `""` {
 				recordIdJson := json.RawMessage(recordId)
 				if json.Valid(recordIdJson) {
 					r.jobIdsToRecordIdIndex[job.JobID] = recordIdJson
 				}
 			}
-			r.jobIdsToStatKeyIndex[job.JobID] = sk
 		}
 	}
 }
diff --git a/services/rsources/stats_collector_test.go b/services/rsources/stats_collector_test.go
index 214dd8fd9a0..f32dbbf2ef6 100644
--- a/services/rsources/stats_collector_test.go
+++ b/services/rsources/stats_collector_test.go
@@ -31,7 +31,7 @@ var _ = Describe("Using StatsCollector", Serial, func() {
 		mockCtrl = gomock.NewController(GinkgoT())
 		js = NewMockJobService(mockCtrl)
 		statsCollector = NewStatsCollector(js)
-		failedRecordsCollector = NewFailedJobsCollector(js)
+		failedRecordsCollector = NewDroppedJobsCollector(js)
 		jobs = []*jobsdb.JobT{}
 		jobErrors = map[uuid.UUID]string{}
 		jobStatuses = []*jobsdb.JobStatusT{}
@@ -340,9 +340,9 @@ var _ = Describe("Using StatsCollector", Serial, func() {
 		})
 	})
 
-	Context("it calls failedRecordsCollector.JobsFailed", func() {
+	Context("it calls failedRecordsCollector.JobsDropped", func() {
 		BeforeEach(func() {
-			failedRecordsCollector.JobsFailed(jobs)
+			failedRecordsCollector.JobsDropped(jobs)
 		})
 
 		It("publishes both in and out stats and adds failed records", func() {
@@ -362,23 +362,6 @@ var _ = Describe("Using StatsCollector", Serial, func() {
 				}).
 				Times(1)
 
-			failedRecords := []json.RawMessage{}
-			for i := 0; i < len(jobs); i++ {
-				failedRecords = append(failedRecords, []byte(`"recordId"`))
-			}
-			js.EXPECT().
-				AddFailedRecords(
-					gomock.Any(),
-					gomock.Any(),
-					params.JobRunID,
-					JobTargetKey{
-						TaskRunID:     params.TaskRunID,
-						SourceID:      params.SourceID,
-						DestinationID: params.DestinationID,
-					},
-					failedRecords).
-				Times(1)
-
 			err := failedRecordsCollector.Publish(context.TODO(), nil)
 			Expect(err).To(BeNil())
 		})
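
Reviewer note: the new `Metadata.GetMessagesIDs()` helper collapses the `MessageIDs`-vs-`MessageID` branching that was previously copy-pasted across four call sites. A minimal, self-contained sketch of its semantics — the `Metadata` struct here is reduced to the two fields the helper reads; the real one lives in `processor/transformer`:

```go
package main

import "fmt"

// Metadata is a stand-in for transformer.Metadata, trimmed to the two
// fields GetMessagesIDs reads; the real struct carries many more fields.
type Metadata struct {
	MessageID  string
	MessageIDs []string
}

// GetMessagesIDs mirrors the helper added in transformer.go: when the
// transformer batched several input messages into one response event,
// MessageIDs is populated; otherwise fall back to the single MessageID.
func (m Metadata) GetMessagesIDs() []string {
	if len(m.MessageIDs) > 0 {
		return m.MessageIDs
	}
	return []string{m.MessageID}
}

func main() {
	batched := Metadata{MessageIDs: []string{"m1", "m2"}}
	single := Metadata{MessageID: "m3"}
	fmt.Println(batched.GetMessagesIDs()) // [m1 m2]
	fmt.Println(single.GetMessagesIDs())  // [m3]
}
```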
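For context on `getDroppedJobs` (events_response.go): it identifies dropped events by set difference — anything that went into a transformation stage but came back in neither the success nor the failure list. A hedged sketch of that bookkeeping on plain string slices; names here are illustrative, the real code operates on transformer response types:

```go
package main

import "fmt"

// droppedMessageIDs returns every input messageID that appears in
// neither the succeeded nor the failed list, i.e. the ones the
// transformer silently dropped. Hypothetical helper, for illustration.
func droppedMessageIDs(input, succeeded, failed []string) []string {
	seen := make(map[string]struct{}, len(succeeded)+len(failed))
	for _, id := range succeeded {
		seen[id] = struct{}{}
	}
	for _, id := range failed {
		seen[id] = struct{}{}
	}
	var dropped []string
	for _, id := range input {
		if _, ok := seen[id]; !ok {
			dropped = append(dropped, id)
		}
	}
	return dropped
}

func main() {
	in := []string{"m1", "m2", "m3", "m4"}
	ok := []string{"m1"}
	bad := []string{"m3"}
	fmt.Println(droppedMessageIDs(in, ok, bad)) // [m2 m4]
}
```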
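Finally, the `skipFailedRecords` flag reorders `buildStats` so that the stat-key index is always populated (keeping in/out counters correct) while record-ID collection is skipped for the dropped-jobs collector. A simplified stand-in showing just that gate — field names and map types are assumptions, not the real `statsCollector`:

```go
package main

import "fmt"

// collector is a toy model of the skipFailedRecords gate in buildStats.
type collector struct {
	skipFailedRecords bool
	statKeys          map[int64]string // always populated
	recordIDs         map[int64]string // skipped for dropped-jobs collectors
}

func (c *collector) observe(jobID int64, statKey, recordID string) {
	c.statKeys[jobID] = statKey // index the stat key before the gate
	if c.skipFailedRecords {
		return // dropped-jobs collector: count stats, collect no records
	}
	if recordID != "" && recordID != "null" && recordID != `""` {
		c.recordIDs[jobID] = recordID
	}
}

func main() {
	dropped := &collector{
		skipFailedRecords: true,
		statKeys:          map[int64]string{},
		recordIDs:         map[int64]string{},
	}
	dropped.observe(1, "source:task:dest", `"rec-1"`)
	fmt.Println(len(dropped.statKeys), len(dropped.recordIDs)) // 1 0
}
```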