diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 38aa49a613..244c307ded 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -600,6 +600,7 @@ var ( Succeeded = jobStateT{isValid: true, isTerminal: true, State: "succeeded"} Aborted = jobStateT{isValid: true, isTerminal: true, State: "aborted"} Migrated = jobStateT{isValid: true, isTerminal: true, State: "migrated"} + Filtered = jobStateT{isValid: true, isTerminal: true, State: "filtered"} validTerminalStates []string validNonTerminalStates []string @@ -615,6 +616,7 @@ var jobStates = []jobStateT{ Aborted, Migrated, Importing, + Filtered, } // OwnerType for this jobsdb instance diff --git a/processor/eventfilter/eventfilter.go b/processor/eventfilter/eventfilter.go index 96439a4081..f34a3c516f 100644 --- a/processor/eventfilter/eventfilter.go +++ b/processor/eventfilter/eventfilter.go @@ -128,13 +128,11 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve messageType := strings.TrimSpace(strings.ToLower(getMessageType(&transformerEvent.Message))) if messageType == "" { // We will abort the event - errMessage := "Invalid message type. Type assertion failed" - resp := &transformer.TransformerResponse{ + return false, &transformer.TransformerResponse{ Output: transformerEvent.Message, StatusCode: 400, Metadata: transformerEvent.Metadata, - Error: errMessage, + Error: "Invalid message type. Type assertion failed", } - return false, resp } isSupportedMsgType := slices.Contains(supportedMsgTypes, messageType) @@ -143,12 +141,16 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve "supportedMsgTypes", supportedMsgTypes, "messageType", messageType, ) // We will not allow the event - return false, nil + return false, &transformer.TransformerResponse{ + Output: transformerEvent.Message, StatusCode: types.FilterEventCode, + Metadata: transformerEvent.Metadata, + Error: "Message type not supported", + } } // MessageType filtering -- ENDS // hybridModeCloudEventsFilter.srcType.[eventProperty] filtering -- STARTS - allow, failedResponse := FilterEventsForHybridMode(ConnectionModeFilterParams{ + allow := FilterEventsForHybridMode(ConnectionModeFilterParams{ Destination: &transformerEvent.Destination, SrcType: transformerEvent.Metadata.SourceDefinitionType, Event: &EventParams{MessageType: messageType}, @@ -163,7 +165,11 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve }) if !allow { - return allow, failedResponse + return allow, &transformer.TransformerResponse{ + Output: transformerEvent.Message, StatusCode: types.FilterEventCode, + Metadata: transformerEvent.Metadata, + Error: "Filtering event based on hybridModeFilter", + } } // hybridModeCloudEventsFilter.srcType.[eventProperty] filtering -- ENDS @@ -195,7 +201,7 @@ Example: ... } */ -func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponse) { +func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) bool { destination := connectionModeFilterParams.Destination srcType := strings.TrimSpace(connectionModeFilterParams.SrcType) messageType := connectionModeFilterParams.Event.MessageType @@ -203,30 +209,30 @@ func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterPa if srcType == "" { pkgLogger.Debug("sourceType is empty string, filtering event based on default behaviour") - return evaluatedDefaultBehaviour, nil + return evaluatedDefaultBehaviour } destConnModeI := misc.MapLookup(destination.Config, "connectionMode") if destConnModeI == nil { pkgLogger.Debug("connectionMode not present, filtering event based on default behaviour") - return evaluatedDefaultBehaviour, nil + return evaluatedDefaultBehaviour } destConnectionMode, isDestConnModeString := destConnModeI.(string) if !isDestConnModeString || destConnectionMode != hybridMode { pkgLogger.Debugf("Provided connectionMode(%v) is in wrong format or the mode is not %q, filtering event based on default behaviour", destConnModeI, hybridMode) - return evaluatedDefaultBehaviour, nil + return evaluatedDefaultBehaviour } sourceEventPropertiesI := misc.MapLookup(destination.DestinationDefinition.Config, hybridModeEventsFilterKey, srcType) if sourceEventPropertiesI == nil { pkgLogger.Debugf("Destination definition config doesn't contain proper values for %[1]v or %[1]v.%[2]v", hybridModeEventsFilterKey, srcType) - return evaluatedDefaultBehaviour, nil + return evaluatedDefaultBehaviour } eventProperties, isOk := sourceEventPropertiesI.(map[string]interface{}) if !isOk || len(eventProperties) == 0 { pkgLogger.Debugf("'%v.%v' is not correctly defined", hybridModeEventsFilterKey, srcType) - return evaluatedDefaultBehaviour, nil + return evaluatedDefaultBehaviour } // Flag indicating to let the event pass through @@ -251,7 +257,7 @@ func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterPa allowEvent = slices.Contains(messageTypes, messageType) && evaluatedDefaultBehaviour } } - return allowEvent, nil + return allowEvent } type EventPropsTypes interface { diff --git a/processor/eventfilter/eventfilter_test.go b/processor/eventfilter/eventfilter_test.go index bdccf814a1..be6d7f1145 100644 --- a/processor/eventfilter/eventfilter_test.go +++ b/processor/eventfilter/eventfilter_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/processor/transformer" ) func TestFilterEventsForHybridMode(t *testing.T) { @@ -402,7 +403,7 @@ func TestFilterEventsForHybridMode(t *testing.T) { }, } - actualOutput, _ := FilterEventsForHybridMode( + actualOutput := FilterEventsForHybridMode( ConnectionModeFilterParams{ SrcType: source.SourceDefinition.Type, Destination: destination, @@ -453,3 +454,64 @@ func TestConvertToArrayOfType(t *testing.T) { }) } } + +func TestAllowEventToDestTransformation(t *testing.T) { + type testCaseT struct { + caseName string + transformerEvent *transformer.TransformerEvent + expected bool + supportedMsgTypes []string + expectedResp *transformer.TransformerResponse + } + + testCases := []testCaseT{ + { + caseName: "if message type is invalid, return false with statusCode 400", + transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": ""}}, + expected: false, + supportedMsgTypes: []string{"track"}, + expectedResp: &transformer.TransformerResponse{ + Output: map[string]interface{}{"type": ""}, + StatusCode: 400, + Error: "Invalid message type. Type assertion failed", + }, + }, + { + caseName: "if message type is unsupported, return false with statusCode 298", + transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "identify"}}, + expected: false, + supportedMsgTypes: []string{"track"}, + expectedResp: &transformer.TransformerResponse{ + Output: map[string]interface{}{"type": "identify"}, + StatusCode: 298, + Error: "Message type not supported", + }, + }, + { + caseName: "if event is filtered due to FilterEventsForHybridMode, return statusCode 298", + transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "track"}, Metadata: transformer.Metadata{}}, + expected: false, + supportedMsgTypes: []string{"track"}, + expectedResp: &transformer.TransformerResponse{ + Output: map[string]interface{}{"type": "track"}, + StatusCode: 298, + Error: "Filtering event based on hybridModeFilter", + }, + }, + { + caseName: "if event is legit, return true with nil response", + transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "track"}, Destination: backendconfig.DestinationT{IsProcessorEnabled: true}}, + expected: true, + supportedMsgTypes: []string{"track"}, + expectedResp: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.caseName, func(t *testing.T) { + allow, resp := AllowEventToDestTransformation(tc.transformerEvent, tc.supportedMsgTypes) + require.Equal(t, tc.expected, allow) + require.Equal(t, tc.expectedResp, resp) + }) + } +} diff --git a/processor/processor.go b/processor/processor.go index d8d390674d..c08a5f4f6b 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -193,10 +193,11 @@ var defaultTransformerFeatures = `{ }` type DestStatT struct { - numEvents stats.Measurement - numOutputSuccessEvents stats.Measurement - numOutputFailedEvents stats.Measurement - transformTime stats.Measurement + numEvents stats.Measurement + numOutputSuccessEvents stats.Measurement + numOutputFailedEvents stats.Measurement + numOutputFilteredEvents stats.Measurement + transformTime stats.Measurement } type ParametersT struct { @@ -233,6 +234,15 @@ type MetricMetadata struct { trackingPlanVersion int } +type NonSuccessfulTransformationMetrics struct { + failedJobs []*jobsdb.JobT + failedMetrics []*types.PUReportedMetric + failedCountMap map[string]int64 + filteredJobs []*jobsdb.JobT + filteredMetrics []*types.PUReportedMetric + filteredCountMap map[string]int64 +} + type ( SourceIDT string ) @@ -269,13 +279,18 @@ func (proc *Handle) newUserTransformationStat(sourceID, workspaceID string, dest errTags := misc.CopyStringMap(tags) errTags["error"] = "true" numOutputFailedEvents := proc.statsFactory.NewTaggedStat("proc_transform_stage_out_count", stats.CountType, errTags) + + filterTags := misc.CopyStringMap(tags) + filterTags["error"] = "filtered" + numOutputFilteredEvents := proc.statsFactory.NewTaggedStat("proc_transform_stage_out_count", stats.CountType, filterTags) transformTime := proc.statsFactory.NewTaggedStat("proc_transform_stage_duration", stats.TimerType, tags) return &DestStatT{ - numEvents: numEvents, - numOutputSuccessEvents: numOutputSuccessEvents, - numOutputFailedEvents: numOutputFailedEvents, - transformTime: transformTime, + numEvents: numEvents, + numOutputSuccessEvents: numOutputSuccessEvents, + numOutputFailedEvents: numOutputFailedEvents, + numOutputFilteredEvents: numOutputFilteredEvents, + transformTime: transformTime, } } @@ -291,13 +306,18 @@ func (proc *Handle) newDestinationTransformationStat(sourceID, workspaceID, tran errTags := misc.CopyStringMap(tags) errTags["error"] = "true" numOutputFailedEvents := proc.statsFactory.NewTaggedStat("proc_transform_stage_out_count", stats.CountType, errTags) + + filterTags := misc.CopyStringMap(tags) + filterTags["error"] = "filtered" + numOutputFilteredEvents := proc.statsFactory.NewTaggedStat("proc_transform_stage_out_count", stats.CountType, filterTags) destTransform := proc.statsFactory.NewTaggedStat("proc_transform_stage_duration", stats.TimerType, tags) return &DestStatT{ - numEvents: numEvents, - numOutputSuccessEvents: numOutputSuccessEvents, - numOutputFailedEvents: numOutputFailedEvents, - transformTime: destTransform, + numEvents: numEvents, + numOutputSuccessEvents: numOutputSuccessEvents, + numOutputFailedEvents: numOutputFailedEvents, + numOutputFilteredEvents: numOutputFilteredEvents, + transformTime: destTransform, } } @@ -311,13 +331,18 @@ func (proc *Handle) newEventFilterStat(sourceID, workspaceID string, destination errTags := misc.CopyStringMap(tags) errTags["error"] = "true" numOutputFailedEvents := proc.statsFactory.NewTaggedStat("proc_event_filter_out_count", stats.CountType, errTags) + + filterTags := misc.CopyStringMap(tags) + filterTags["error"] = "filtered" + numOutputFilteredEvents := proc.statsFactory.NewTaggedStat("proc_event_filter_out_count", stats.CountType, filterTags) eventFilterTime := proc.statsFactory.NewTaggedStat("proc_event_filter_time", stats.TimerType, tags) return &DestStatT{ - numEvents: numEvents, - numOutputSuccessEvents: numOutputSuccessEvents, - numOutputFailedEvents: numOutputFailedEvents, - transformTime: eventFilterTime, + numEvents: numEvents, + numOutputSuccessEvents: numOutputSuccessEvents, + numOutputFailedEvents: numOutputFailedEvents, + numOutputFilteredEvents: numOutputFilteredEvents, + transformTime: eventFilterTime, } } @@ -1167,14 +1192,26 @@ func (proc *Handle) updateMetricMaps( } } -func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, stage string, transformationEnabled, trackingPlanEnabled bool) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) { - failedMetrics := make([]*types.PUReportedMetric, 0) +func (proc *Handle) getNonSuccessfulMetrics(response transformer.Response, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, stage string, transformationEnabled, trackingPlanEnabled bool) *NonSuccessfulTransformationMetrics { + m := &NonSuccessfulTransformationMetrics{} + + grouped := lo.GroupBy(response.FailedEvents, func(event transformer.TransformerResponse) bool { return event.StatusCode == types.FilterEventCode }) + filtered, failed := grouped[true], grouped[false] + + m.filteredJobs, m.filteredMetrics, m.filteredCountMap = proc.getTransformationMetrics(filtered, jobsdb.Filtered.State, commonMetaData, eventsByMessageID, stage, transformationEnabled, trackingPlanEnabled) + m.failedJobs, m.failedMetrics, m.failedCountMap = proc.getTransformationMetrics(failed, jobsdb.Aborted.State, commonMetaData, eventsByMessageID, stage, transformationEnabled, trackingPlanEnabled) + + return m +} + +func (proc *Handle) getTransformationMetrics(transformerResponses []transformer.TransformerResponse, state string, commonMetaData *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt, stage string, transformationEnabled, trackingPlanEnabled bool) ([]*jobsdb.JobT, []*types.PUReportedMetric, map[string]int64) { + metrics := make([]*types.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*types.ConnectionDetails) statusDetailsMap := make(map[string]map[string]*types.StatusDetail) - failedCountMap := make(map[string]int64) - var failedEventsToStore []*jobsdb.JobT - for i := range response.FailedEvents { - failedEvent := &response.FailedEvents[i] + countMap := make(map[string]int64) + var jobs []*jobsdb.JobT + for i := range transformerResponses { + failedEvent := &transformerResponses[i] messages := lo.Map( failedEvent.Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT { @@ -1183,18 +1220,18 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta ) payload, err := jsonfast.Marshal(messages) if err != nil { - proc.logger.Errorf(`[Processor: getFailedEventJobs] Failed to unmarshal list of failed events: %v`, err) + proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal list of failed events: %v`, err) continue } for _, message := range messages { - proc.updateMetricMaps(nil, failedCountMap, connectionDetailsMap, statusDetailsMap, failedEvent, jobsdb.Aborted.State, stage, func() json.RawMessage { + proc.updateMetricMaps(nil, countMap, connectionDetailsMap, statusDetailsMap, failedEvent, state, stage, func() json.RawMessage { if proc.transientSources.Apply(commonMetaData.SourceID) { return []byte(`{}`) } sampleEvent, err := jsonfast.Marshal(message) if err != nil { - proc.logger.Errorf(`[Processor: getFailedEventJobs] Failed to unmarshal first element in failed events: %v`, err) + proc.logger.Errorf(`[Processor: getTransformationMetrics] Failed to unmarshal first element in failed events: %v`, err) sampleEvent = []byte(`{}`) } return sampleEvent @@ -1202,7 +1239,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta } proc.logger.Debugf( - "[Processor: getFailedEventJobs] Error [%d] for source %q and destination %q: %s", + "[Processor: getTransformationMetrics] Error [%d] for source %q and destination %q: %s", failedEvent.StatusCode, commonMetaData.SourceID, commonMetaData.DestinationID, failedEvent.Error, ) @@ -1237,7 +1274,7 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta UserID: failedEvent.Metadata.RudderID, WorkspaceId: failedEvent.Metadata.WorkspaceID, } - failedEventsToStore = append(failedEventsToStore, &newFailedJob) + jobs = append(jobs, &newFailedJob) procErrorStat := stats.Default.NewTaggedStat("proc_error_counts", stats.CountType, stats.Tags{ "destName": commonMetaData.DestinationType, @@ -1285,13 +1322,13 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta PUDetails: *types.CreatePUDetails(inPU, pu, false, false), StatusDetail: sd, } - failedMetrics = append(failedMetrics, m) + metrics = append(metrics, m) } } } // REPORTING - END - return failedEventsToStore, failedMetrics, failedCountMap + return jobs, metrics, countMap } func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, sourceId string) { @@ -1337,7 +1374,7 @@ func (proc *Handle) updateSourceEventStatsDetailed(event types.SingularEventT, s } } -func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadata, inCountMap, successCountMap, failedCountMap map[string]int64) []*types.PUReportedMetric { +func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadata, inCountMap, successCountMap, failedCountMap, filteredCountMap map[string]int64) []*types.PUReportedMetric { // Calculate diff and append to reportMetrics // diff = successCount + abortCount - inCount diffMetrics := make([]*types.PUReportedMetric, 0) @@ -1353,7 +1390,8 @@ func getDiffMetrics(inPU, pu string, inCountMetadataMap map[string]MetricMetadat } successCount := successCountMap[key] failedCount := failedCountMap[key] - diff := successCount + failedCount - inCount + filteredCount := filteredCountMap[key] + diff := successCount + failedCount + filteredCount - inCount if diff != 0 { metricMetadata := inCountMetadataMap[key] metric := &types.PUReportedMetric{ @@ -1683,6 +1721,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *transf inCountMap, outCountMap, map[string]int64{}, + map[string]int64{}, ) reportMetrics = append(reportMetrics, diffMetrics...) } @@ -2245,14 +2284,15 @@ func (proc *Handle) transformSrcDest( var successCountMap map[string]int64 var successCountMetadataMap map[string]MetricMetadata eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled) - failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled) - droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), failedJobs...)...) + nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled) + droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventList), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) } - procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], failedJobs...) + procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) userTransformationStat.numOutputSuccessEvents.Count(len(eventsToTransform)) - userTransformationStat.numOutputFailedEvents.Count(len(failedJobs)) + userTransformationStat.numOutputFailedEvents.Count(len(nonSuccessMetrics.failedJobs)) + userTransformationStat.numOutputFilteredEvents.Count(len(nonSuccessMetrics.filteredJobs)) proc.logger.Debug("Custom Transform output size", len(eventsToTransform)) trace.Logf(ctx, "UserTransform", "User Transform output size: %d", len(eventsToTransform)) @@ -2266,10 +2306,12 @@ func (proc *Handle) transformSrcDest( inCountMetadataMap, inCountMap, successCountMap, - failedCountMap, + nonSuccessMetrics.failedCountMap, + nonSuccessMetrics.filteredCountMap, ) reportMetrics = append(reportMetrics, successMetrics...) - reportMetrics = append(reportMetrics, failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) reportMetrics = append(reportMetrics, diffMetrics...) // successCountMap will be inCountMap for filtering events based on supported event types @@ -2313,8 +2355,8 @@ func (proc *Handle) transformSrcDest( var successMetrics []*types.PUReportedMetric var successCountMap map[string]int64 var successCountMetadataMap map[string]MetricMetadata - failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled) - droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), failedJobs...)...) + nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled) + droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled) proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform)) @@ -2331,9 +2373,10 @@ func (proc *Handle) transformSrcDest( } } - diffMetrics := getDiffMetrics(inPU, types.EVENT_FILTER, inCountMetadataMap, inCountMap, successCountMap, failedCountMap) + diffMetrics := getDiffMetrics(inPU, types.EVENT_FILTER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap) reportMetrics = append(reportMetrics, successMetrics...) - reportMetrics = append(reportMetrics, failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) reportMetrics = append(reportMetrics, diffMetrics...) // successCountMap will be inCountMap for destination transform @@ -2344,7 +2387,8 @@ func (proc *Handle) transformSrcDest( eventFilterStat := proc.newEventFilterStat(sourceID, workspaceID, destination) eventFilterStat.numEvents.Count(eventFilterInCount) eventFilterStat.numOutputSuccessEvents.Count(len(response.Events)) - eventFilterStat.numOutputFailedEvents.Count(len(failedJobs)) + eventFilterStat.numOutputFailedEvents.Count(len(nonSuccessMetrics.failedJobs)) + eventFilterStat.numOutputFilteredEvents.Count(len(nonSuccessMetrics.filteredJobs)) eventFilterStat.transformTime.Since(s) // Filtering events based on the supported message types - END @@ -2379,19 +2423,20 @@ func (proc *Handle) transformSrcDest( proc.logger.Debugf("Dest Transform output size %d", len(response.Events)) trace.Logf(ctx, "DestTransform", "output size %d", len(response.Events)) - failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs( + nonSuccessMetrics := proc.getNonSuccessfulMetrics( response, commonMetaData, eventsByMessageID, transformer.DestTransformerStage, transformationEnabled, trackingPlanEnabled, ) destTransformationStat.numEvents.Count(len(eventsToTransform)) destTransformationStat.numOutputSuccessEvents.Count(len(response.Events)) - destTransformationStat.numOutputFailedEvents.Count(len(failedJobs)) - droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), failedJobs...)...) + destTransformationStat.numOutputFailedEvents.Count(len(nonSuccessMetrics.failedJobs)) + destTransformationStat.numOutputFilteredEvents.Count(len(nonSuccessMetrics.filteredJobs)) + droppedJobs = append(droppedJobs, append(proc.getDroppedJobs(response, eventsToTransform), append(nonSuccessMetrics.failedJobs, nonSuccessMetrics.filteredJobs...)...)...) if _, ok := procErrorJobsByDestID[destID]; !ok { procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0) } - procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], failedJobs...) + procErrorJobsByDestID[destID] = append(procErrorJobsByDestID[destID], nonSuccessMetrics.failedJobs...) // REPORTING - PROCESSOR metrics - START if proc.isReportingEnabled() { @@ -2416,9 +2461,10 @@ func (proc *Handle) transformSrcDest( } } - diffMetrics := getDiffMetrics(types.EVENT_FILTER, types.DEST_TRANSFORMER, inCountMetadataMap, inCountMap, successCountMap, failedCountMap) + diffMetrics := getDiffMetrics(types.EVENT_FILTER, types.DEST_TRANSFORMER, inCountMetadataMap, inCountMap, successCountMap, nonSuccessMetrics.failedCountMap, nonSuccessMetrics.filteredCountMap) - reportMetrics = append(reportMetrics, failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.failedMetrics...) + reportMetrics = append(reportMetrics, nonSuccessMetrics.filteredMetrics...) reportMetrics = append(reportMetrics, successMetrics...) reportMetrics = append(reportMetrics, diffMetrics...) } @@ -2538,7 +2584,6 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, // filter unsupported message types var resp transformer.TransformerResponse - var errMessage string for i := range events { event := &events[i] @@ -2571,12 +2616,13 @@ func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, messageEvent, typOk := event.Message["event"].(string) if !typOk { // add to FailedEvents - errMessage = "Invalid message event. Type assertion failed" - resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: errMessage} + resp = transformer.TransformerResponse{Output: event.Message, StatusCode: 400, Metadata: event.Metadata, Error: "Invalid message event. Type assertion failed"} failedEvents = append(failedEvents, resp) continue } if !slices.Contains(supportedEvents.values, messageEvent) { + resp = transformer.TransformerResponse{Output: event.Message, StatusCode: types.FilterEventCode, Metadata: event.Metadata, Error: "Event not supported"} + failedEvents = append(failedEvents, resp) continue } } diff --git a/processor/processor_test.go b/processor/processor_test.go index f9390bcee5..d952226a1b 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -2135,6 +2135,7 @@ var _ = Describe("Processor", Ordered, func() { Expect(processor.Start(ctx)).To(BeNil()) }) }) + Context("isDestinationEnabled", func() { It("should filter based on consent management preferences", func() { event := types.SingularEventT{ @@ -2187,6 +2188,89 @@ var _ = Describe("Processor", Ordered, func() { Expect(processor.isDestinationAvailable(event, SourceID3)).To(BeTrue()) }) }) + + Context("getNonSuccessfulMetrics", func() { + It("getNonSuccessfulMetrics", func() { + event1 := types.SingularEventT{ + "event": "Demo Track1", + "messageId": "msg1", + } + event2 := types.SingularEventT{ + "event": "Demo Track2", + "messageId": "msg2", + } + event3 := types.SingularEventT{ + "event": "Demo Track3", + "messageId": "msg3", + } + + c.mockGatewayJobsDB.EXPECT().DeleteExecuting().Times(1) + + mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) + + processor := prepareHandle(NewHandle(mockTransformer)) + + Setup(processor, c, false, true) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) + + commonMetadata := transformer.Metadata{SourceID: SourceIDEnabled, DestinationID: DestinationIDEnabledA} + singularEventWithReceivedAt1 := types.SingularEventWithReceivedAt{ + SingularEvent: event1, + ReceivedAt: time.Now(), + } + singularEventWithReceivedAt2 := types.SingularEventWithReceivedAt{ + SingularEvent: event2, + ReceivedAt: time.Now(), + } + singularEventWithReceivedAt3 := types.SingularEventWithReceivedAt{ + SingularEvent: event3, + ReceivedAt: time.Now(), + } + eventsByMessageID := map[string]types.SingularEventWithReceivedAt{ + "msg1": singularEventWithReceivedAt1, + "msg2": singularEventWithReceivedAt2, + "msg3": singularEventWithReceivedAt3, + } + metadata1 := commonMetadata + metadata1.MessageID = "msg1" + metadata2 := commonMetadata + metadata2.MessageID = "msg2" + metadata3 := commonMetadata + metadata3.MessageID = "msg3" + + FailedEvents := []transformer.TransformerResponse{ + {StatusCode: 400, Metadata: metadata1, Output: event1}, + {StatusCode: 298, Metadata: metadata2, Output: event2}, + {StatusCode: 299, Metadata: metadata3, Output: event2}, + } + + transformerResponse := transformer.Response{ + Events: []transformer.TransformerResponse{}, + FailedEvents: FailedEvents, + } + + m := processor.getNonSuccessfulMetrics(transformerResponse, + &commonMetadata, + eventsByMessageID, + transformer.DestTransformerStage, + false, + false) + + key := fmt.Sprintf("%s!<<#>>!%s!<<#>>!%s!<<#>>!%s!<<#>>!%s", commonMetadata.SourceID, commonMetadata.DestinationID, commonMetadata.SourceJobRunID, commonMetadata.EventName, commonMetadata.EventType) + + Expect(len(m.failedJobs)).To(Equal(2)) + Expect(len(m.failedMetrics)).To(Equal(2)) + Expect(m.failedMetrics[0].StatusDetail.StatusCode).To(Equal(400)) + Expect(m.failedMetrics[1].StatusDetail.StatusCode).To(Equal(299)) + Expect(int(m.failedCountMap[key])).To(Equal(2)) + + Expect(len(m.filteredJobs)).To(Equal(1)) + Expect(len(m.filteredMetrics)).To(Equal(1)) + Expect(int(m.filteredCountMap[key])).To(Equal(1)) + }) + }) }) var _ = Describe("Static Function Tests", func() { @@ -2246,7 +2330,7 @@ var _ = Describe("Static Function Tests", func() { Context("getDiffMetrics Tests", func() { It("Should match diffMetrics response for Empty Inputs", func() { - response := getDiffMetrics("some-string-1", "some-string-2", map[string]MetricMetadata{}, map[string]int64{}, map[string]int64{}, map[string]int64{}) + response := getDiffMetrics("some-string-1", "some-string-2", map[string]MetricMetadata{}, map[string]int64{}, map[string]int64{}, map[string]int64{}, map[string]int64{}) Expect(len(response)).To(Equal(0)) }) @@ -2280,6 +2364,10 @@ var _ = Describe("Static Function Tests", func() { "some-key-1": 1, "some-key-2": 2, } + filteredCountMap := map[string]int64{ + "some-key-1": 2, + "some-key-2": 3, + } expectedResponse := []*types.PUReportedMetric{ { @@ -2298,7 +2386,7 @@ var _ = Describe("Static Function Tests", func() { }, StatusDetail: &types.StatusDetail{ Status: "diff", - Count: 3, + Count: 5, StatusCode: 0, SampleResponse: "", SampleEvent: []byte(`{}`), @@ -2320,7 +2408,7 @@ var _ = Describe("Static Function Tests", func() { }, StatusDetail: &types.StatusDetail{ Status: "diff", - Count: 4, + Count: 7, StatusCode: 0, SampleResponse: "", SampleEvent: []byte(`{}`), @@ -2328,7 +2416,7 @@ var _ = Describe("Static Function Tests", func() { }, } - response := getDiffMetrics("some-string-1", "some-string-2", inCountMetadataMap, inCountMap, successCountMap, failedCountMap) + response := getDiffMetrics("some-string-1", "some-string-2", inCountMetadataMap, inCountMap, successCountMap, failedCountMap, filteredCountMap) assertReportMetric(expectedResponse, response) }) }) @@ -2468,6 +2556,12 @@ var _ = Describe("Static Function Tests", func() { }, }, FailedEvents: []transformer.TransformerResponse{ + { + Output: events[1].Message, + StatusCode: 298, + Metadata: events[1].Metadata, + Error: "Message type not supported", + }, { Output: events[2].Message, StatusCode: 400, @@ -2535,6 +2629,12 @@ var _ = Describe("Static Function Tests", func() { }, }, FailedEvents: []transformer.TransformerResponse{ + { + Output: events[0].Message, + StatusCode: 298, + Metadata: events[0].Metadata, + Error: "Message type not supported", + }, { Output: events[2].Message, StatusCode: 400, @@ -2727,6 +2827,12 @@ var _ = Describe("Static Function Tests", func() { }, }, FailedEvents: []transformer.TransformerResponse{ + { + Output: events[0].Message, + StatusCode: 298, + Metadata: events[0].Metadata, + Error: "Event not supported", + }, { Output: events[2].Message, StatusCode: 400, @@ -2885,13 +2991,21 @@ var _ = Describe("Static Function Tests", func() { StatusCode: 200, Metadata: events[0].Metadata, }, - // { - // Output: events[1].Message, - // StatusCode: 200, - // Metadata: events[1].Metadata, - // }, }, - FailedEvents: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[1].Message, + StatusCode: 298, + Metadata: events[1].Metadata, + Error: "Message type not supported", + }, + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) @@ -2967,8 +3081,27 @@ var _ = Describe("Static Function Tests", func() { }, } expectedResponse := transformer.Response{ - Events: nil, - FailedEvents: nil, + Events: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[0].Message, + StatusCode: 298, + Metadata: events[0].Metadata, + Error: "Filtering event based on hybridModeFilter", + }, + { + Output: events[1].Message, + StatusCode: 298, + Metadata: events[1].Metadata, + Error: "Filtering event based on hybridModeFilter", + }, + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) @@ -3056,7 +3189,14 @@ var _ = Describe("Static Function Tests", func() { Metadata: events[1].Metadata, }, }, - FailedEvents: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) @@ -3139,7 +3279,14 @@ var _ = Describe("Static Function Tests", func() { Metadata: events[1].Metadata, }, }, - FailedEvents: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) @@ -3225,7 +3372,14 @@ var _ = Describe("Static Function Tests", func() { Metadata: events[1].Metadata, }, }, - FailedEvents: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) @@ -3314,7 +3468,14 @@ var _ = Describe("Static Function Tests", func() { Metadata: events[1].Metadata, }, }, - FailedEvents: nil, + FailedEvents: []transformer.TransformerResponse{ + { + Output: events[2].Message, + StatusCode: 298, + Metadata: events[2].Metadata, + Error: "Message type not supported", + }, + }, } response := ConvertToFilteredTransformerResponse(events, true) Expect(response).To(Equal(expectedResponse)) diff --git a/processor/trackingplan.go b/processor/trackingplan.go index 9b4c8b42e4..cc47f45eac 100644 --- a/processor/trackingplan.go +++ b/processor/trackingplan.go @@ -14,10 +14,11 @@ import ( ) type TrackingPlanStatT struct { - numEvents stats.Measurement - numValidationSuccessEvents stats.Measurement - numValidationFailedEvents stats.Measurement - tpValidationTime stats.Measurement + numEvents stats.Measurement + numValidationSuccessEvents stats.Measurement + numValidationFailedEvents stats.Measurement + numValidationFilteredEvents stats.Measurement + tpValidationTime stats.Measurement } // reportViolations It is going add violationErrors in context depending upon certain criteria: @@ -113,19 +114,21 @@ func (proc *Handle) validateEvents(groupedEventsBySourceId map[SourceIDT][]trans var successMetrics []*types.PUReportedMetric eventsToTransform, successMetrics, _, _ := proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.TrackingPlanValidationStage, true, false) // Note: Sending false for usertransformation enabled is safe because this stage is before user transformation. - failedJobs, failedMetrics, _ := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.TrackingPlanValidationStage, false, true) + nonSuccessMetrics := proc.getNonSuccessfulMetrics(response, commonMetaData, eventsByMessageID, transformer.TrackingPlanValidationStage, false, true) validationStat.numValidationSuccessEvents.Count(len(eventsToTransform)) - validationStat.numValidationFailedEvents.Count(len(failedJobs)) + validationStat.numValidationFailedEvents.Count(len(nonSuccessMetrics.failedJobs)) + validationStat.numValidationFilteredEvents.Count(len(nonSuccessMetrics.filteredJobs)) proc.logger.Debug("Validation output size", len(eventsToTransform)) - validatedErrorJobs = append(validatedErrorJobs, failedJobs...) + validatedErrorJobs = append(validatedErrorJobs, nonSuccessMetrics.failedJobs...) // REPORTING - START if proc.isReportingEnabled() { // There will be no diff metrics for tracking plan validation validatedReportMetrics = append(validatedReportMetrics, successMetrics...) - validatedReportMetrics = append(validatedReportMetrics, failedMetrics...) + validatedReportMetrics = append(validatedReportMetrics, nonSuccessMetrics.failedMetrics...) + validatedReportMetrics = append(validatedReportMetrics, nonSuccessMetrics.filteredMetrics...) } // REPORTING - END @@ -168,12 +171,14 @@ func (proc *Handle) newValidationStat(metadata *transformer.Metadata) *TrackingP numEvents := proc.statsFactory.NewTaggedStat("proc_num_tp_input_events", stats.CountType, tags) numValidationSuccessEvents := proc.statsFactory.NewTaggedStat("proc_num_tp_output_success_events", stats.CountType, tags) numValidationFailedEvents := proc.statsFactory.NewTaggedStat("proc_num_tp_output_failed_events", stats.CountType, tags) + numValidationFilteredEvents := proc.statsFactory.NewTaggedStat("proc_num_tp_output_filtered_events", stats.CountType, tags) tpValidationTime := proc.statsFactory.NewTaggedStat("proc_tp_validation", stats.TimerType, tags) return &TrackingPlanStatT{ - numEvents: numEvents, - numValidationSuccessEvents: numValidationSuccessEvents, - numValidationFailedEvents: numValidationFailedEvents, - tpValidationTime: tpValidationTime, + numEvents: numEvents, + numValidationSuccessEvents: numValidationSuccessEvents, + numValidationFailedEvents: numValidationFailedEvents, + numValidationFilteredEvents: numValidationFilteredEvents, + tpValidationTime: tpValidationTime, } } diff --git a/processor/transformer/transformer.go b/processor/transformer/transformer.go index 575002044b..5108cda987 100644 --- a/processor/transformer/transformer.go +++ b/processor/transformer/transformer.go @@ -448,7 +448,17 @@ func (trans *handle) doPost(ctx context.Context, rawJSON []byte, url, stage stri requestStartTime := time.Now() trace.WithRegion(ctx, "request/post", func() { - resp, reqErr = trans.client.Post(url, "application/json; charset=utf-8", bytes.NewBuffer(rawJSON)) + var req *http.Request + req, reqErr = http.NewRequest("POST", url, bytes.NewBuffer(rawJSON)) + if reqErr != nil { + return + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + // Header to let transformer know that the client understands event filter code + req.Header.Set("X-Feature-Filter-Code", "?1") + + resp, reqErr = trans.client.Do(req) }) trans.requestTime(tags, time.Since(requestStartTime)) if reqErr != nil { diff --git a/router/handle.go b/router/handle.go index dc550e2006..b910e05314 100644 --- a/router/handle.go +++ b/router/handle.go @@ -320,7 +320,7 @@ func (rt *Handle) commitStatusList(workerJobStatuses *[]workerJobStatus) { sd.Count++ } } - case jobsdb.Succeeded.State: + case jobsdb.Succeeded.State, jobsdb.Filtered.State: routerWorkspaceJobStatusCount[workspaceID]++ sd.Count++ completedJobsList = append(completedJobsList, workerJobStatus.job) diff --git a/router/internal/eventorder/eventorder.go b/router/internal/eventorder/eventorder.go index 8dee47b558..3483bca44d 100644 --- a/router/internal/eventorder/eventorder.go +++ b/router/internal/eventorder/eventorder.go @@ -186,6 +186,8 @@ func (b *Barrier) StateChanged(key string, jobID int64, state string) error { switch state { case jobsdb.Succeeded.State: command = &jobSucceededCmd{jsCmd} + case jobsdb.Filtered.State: + command = &jobFilteredCmd{jsCmd} case jobsdb.Failed.State: command = &jobFailedCommand{jsCmd} case jobsdb.Aborted.State: @@ -365,6 +367,26 @@ func (c *jobSucceededCmd) execute(b *Barrier) { } } +// jobFilteredCmd is a command that is executed when a job is filtered. +type jobFilteredCmd struct { + *cmd +} + +// removes the barrier for this key, if it exists +func (c *jobFilteredCmd) execute(b *Barrier) { + if barrier, ok := b.barriers[c.key]; ok { + barrier.Leave(c.jobID) + if barrier.failedJobID != nil && *barrier.failedJobID != c.jobID { // out-of-sync command (failed commands get executed immediately) + return + } + barrier.failedJobID = nil + barrier.drainLimiter = nil + if barrier.Inactive() { + delete(b.barriers, c.key) + } + } +} + // jobAbortedCommand is a command that is executed when a job has aborted. type jobAbortedCommand struct { *cmd diff --git a/router/router_test.go b/router/router_test.go index 548c3ffecb..8e82d6aa96 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -1452,6 +1452,165 @@ var _ = Describe("router", func() { }, 20*time.Second, 100*time.Millisecond).Should(Equal(true)) }) + It("skip sendpost && (if statusCode returned is 298 then mark as filtered & if statusCode returned is 299 then mark as succeeded)", func() { + mockNetHandle := mocksRouter.NewMockNetHandle(c.mockCtrl) + mockTransformer := mocksTransformer.NewMockTransformer(c.mockCtrl) + router := &Handle{ + Reporting: &reporting.NOOP{}, + netHandle: mockNetHandle, + } + c.mockBackendConfig.EXPECT().AccessToken().AnyTimes() + router.Setup(gaDestinationDefinition, logger.NOP, conf, c.mockBackendConfig, c.mockRouterJobsDB, c.mockProcErrorsDB, transientsource.NewEmptyService(), rsources.NewNoOpService(), destinationdebugger.NewNoOpService()) + router.transformer = mockTransformer + router.noOfWorkers = 1 + router.reloadableConfig.noOfJobsToBatchInAWorker = misc.SingleValueLoader(3) + router.reloadableConfig.routerTimeout = misc.SingleValueLoader(time.Duration(math.MaxInt64)) + + gaPayload := `{"body": {"XML": {}, "FORM": {}, "JSON": {}}, "type": "REST", "files": {}, "method": "POST", "params": {"t": "event", "v": "1", "an": "RudderAndroidClient", "av": "1.0", "ds": "android-sdk", "ea": "Demo Track", "ec": "Demo Category", "el": "Demo Label", "ni": 0, "qt": 59268380964, "ul": "en-US", "cid": "anon_id", "tid": "UA-185645846-1", "uip": "[::1]", "aiid": "com.rudderlabs.android.sdk"}, "userId": "anon_id", "headers": {}, "version": "1", "endpoint": "https://www.google-analytics.com/collect"}` + parameters := fmt.Sprintf(`{"source_id": "1fMCVYZboDlYlauh4GFsEo2JU77", "destination_id": "%s", "message_id": "2f548e6d-60f6-44af-a1f4-62b3272445c3", "received_at": "2021-06-28T10:04:48.527+05:30", "transform_at": "router"}`, gaDestinationID) // skipcq: GO-R4002 + + toRetryJobsList := []*jobsdb.JobT{ + { + UUID: uuid.New(), + UserID: "u1", + JobID: 2009, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: customVal["GA"], + EventPayload: []byte(gaPayload), + LastJobStatus: jobsdb.JobStatusT{ + AttemptNum: 1, + ErrorResponse: []byte(`{"firstAttemptedAt": "2021-06-28T15:57:30.742+05:30"}`), + }, + Parameters: []byte(parameters), + }, + } + + unprocessedJobsList := []*jobsdb.JobT{ + { + UUID: uuid.New(), + UserID: "u1", + JobID: 2010, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: customVal["GA"], + EventPayload: []byte(gaPayload), + LastJobStatus: jobsdb.JobStatusT{ + AttemptNum: 0, + }, + Parameters: []byte(parameters), + }, + { + UUID: uuid.New(), + UserID: "u2", + JobID: 2011, + CreatedAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + ExpireAt: time.Date(2020, 0o4, 28, 13, 26, 0o0, 0o0, time.UTC), + CustomVal: customVal["GA"], + EventPayload: []byte(gaPayload), + LastJobStatus: jobsdb.JobStatusT{ + AttemptNum: 0, + }, + Parameters: []byte(parameters), + }, + } + + allJobs := append(toRetryJobsList, unprocessedJobsList...) + + payloadLimit := router.reloadableConfig.payloadLimit + callAllJobs := c.mockRouterJobsDB.EXPECT().GetToProcess(gomock.Any(), + jobsdb.GetQueryParams{ + CustomValFilters: []string{customVal["GA"]}, + ParameterFilters: []jobsdb.ParameterFilterT{{Name: "destination_id", Value: gaDestinationID}}, + PayloadSizeLimit: payloadLimit.Load(), + JobsLimit: 10000, + }, nil).Times(1).Return(&jobsdb.MoreJobsResult{JobsResult: jobsdb.JobsResult{Jobs: allJobs}}, nil) + + c.mockRouterJobsDB.EXPECT().UpdateJobStatus(gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1). + Do(func(ctx context.Context, statuses []*jobsdb.JobStatusT, _, _ interface{}) { + assertJobStatus(toRetryJobsList[0], statuses[0], jobsdb.Executing.State, "", `{}`, 1) + assertJobStatus(unprocessedJobsList[0], statuses[1], jobsdb.Executing.State, "", `{}`, 0) + assertJobStatus(unprocessedJobsList[1], statuses[2], jobsdb.Executing.State, "", `{}`, 0) + }).Return(nil).After(callAllJobs) + + mockTransformer.EXPECT().Transform("ROUTER_TRANSFORM", gomock.Any()).After(callAllJobs).Times(1).DoAndReturn( + func(_ string, transformMessage *types.TransformMessageT) []types.DestinationJobT { + assertRouterJobs(&transformMessage.Data[0], toRetryJobsList[0]) + assertRouterJobs(&transformMessage.Data[1], unprocessedJobsList[0]) + assertRouterJobs(&transformMessage.Data[2], unprocessedJobsList[1]) + + return []types.DestinationJobT{ + { + Message: []byte(`{"message": "some transformed message"}`), + JobMetadataArray: []types.JobMetadataT{ + { + UserID: "u1", + JobID: 2009, + AttemptNum: 1, + JobT: toRetryJobsList[0], + }, + }, + Batched: false, + Error: `{}`, + StatusCode: 200, + }, + { + Message: []byte(`{"message": "some transformed message"}`), + JobMetadataArray: []types.JobMetadataT{ + { + UserID: "u1", + JobID: 2010, + JobT: unprocessedJobsList[0], + }, + }, + Batched: false, + Error: `{}`, + StatusCode: 298, + }, + { + Message: []byte(`{"message": "some transformed message"}`), + JobMetadataArray: []types.JobMetadataT{ + { + UserID: "u2", + JobID: 2011, + JobT: unprocessedJobsList[1], + }, + }, + Batched: false, + Error: `{}`, + StatusCode: 299, + }, + } + }) + + mockNetHandle.EXPECT().SendPost(gomock.Any(), gomock.Any()).Times(1).Return(&routerUtils.SendPostResponse{StatusCode: 200, ResponseBody: []byte("")}) + done := make(chan struct{}) + c.mockRouterJobsDB.EXPECT().WithUpdateSafeTx(gomock.Any(), gomock.Any()).Times(1).Do(func(ctx context.Context, f func(tx jobsdb.UpdateSafeTx) error) { + _ = f(jobsdb.EmptyUpdateSafeTx()) + close(done) + }).Return(nil) + c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Len(len(toRetryJobsList)+len(unprocessedJobsList)), []string{customVal["GA"]}, nil).Times(1). + Do(func(ctx context.Context, txn jobsdb.UpdateSafeTx, statuses []*jobsdb.JobStatusT, _, _ interface{}) { + assertTransformJobStatuses(toRetryJobsList[0], statuses[0], jobsdb.Succeeded.State, "200", 2) + assertTransformJobStatuses(unprocessedJobsList[0], statuses[1], jobsdb.Filtered.State, "298", 1) + assertTransformJobStatuses(unprocessedJobsList[1], statuses[2], jobsdb.Succeeded.State, "299", 1) + }) + + <-router.backendConfigInitialized + worker := newPartitionWorker(context.Background(), router, gaDestinationID) + defer worker.Stop() + Expect(worker.Work()).To(BeTrue()) + Expect(worker.pickupCount).To(Equal(3)) + Eventually(func() bool { + select { + case <-done: + return true + default: + return false + } + }, 20*time.Second, 100*time.Millisecond).Should(Equal(true)) + }) + /* Job1 u1 Job2 u1 diff --git a/router/transformer/transformer.go b/router/transformer/transformer.go index 7bce15b2f0..fa025f4cfc 100644 --- a/router/transformer/transformer.go +++ b/router/transformer/transformer.go @@ -119,8 +119,17 @@ func (trans *handle) Transform(transformType string, transformMessage *types.Tra for { s := time.Now() - resp, err = trans.client.Post(url, "application/json; charset=utf-8", - bytes.NewBuffer(rawJSON)) + req, err := http.NewRequest("POST", url, bytes.NewBuffer(rawJSON)) + if err != nil { + // No point in retrying if we can't even create a request. Panicking as per convention. + panic(fmt.Errorf("JS HTTP request creation error: URL: %v Error: %+v", url, err)) + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + // Header to let transformer know that the client understands event filter code + req.Header.Set("X-Feature-Filter-Code", "?1") + + resp, err = trans.client.Do(req) if err == nil { // If no err returned by client.Post, reading body. diff --git a/router/worker.go b/router/worker.go index 704ca64f06..d7e072599c 100644 --- a/router/worker.go +++ b/router/worker.go @@ -28,6 +28,7 @@ import ( destinationdebugger "github.com/rudderlabs/rudder-server/services/debugger/destination" "github.com/rudderlabs/rudder-server/services/oauth" "github.com/rudderlabs/rudder-server/utils/misc" + utilTypes "github.com/rudderlabs/rudder-server/utils/types" ) // worker a structure to define a worker for sending events to sinks @@ -556,7 +557,18 @@ func (w *worker) processDestinationJobs() { } else { respStatusCode = destinationJob.StatusCode respBody = destinationJob.Error - errorAt = routerutils.ERROR_AT_TF + switch destinationJob.StatusCode { + case utilTypes.FilterEventCode: + if respBody == "" { + respBody = "Event filtered" + } + case utilTypes.SuppressEventCode: + if respBody == "" { + respBody = "Event handled by transformer" + } + default: + errorAt = routerutils.ERROR_AT_TF + } } prevRespStatusCode = respStatusCode @@ -761,6 +773,9 @@ func (w *worker) postStatusOnResponseQ(respStatusCode int, payload json.RawMessa if isSuccessStatus(respStatusCode) { status.JobState = jobsdb.Succeeded.State + if respStatusCode == utilTypes.FilterEventCode { + status.JobState = jobsdb.Filtered.State + } w.logger.Debugf("sending success status to response") w.rt.responseQ <- workerJobStatus{userID: destinationJobMetadata.UserID, worker: w, job: destinationJobMetadata.JobT, status: status} } else { @@ -806,7 +821,7 @@ func (w *worker) sendRouterResponseCountStat(status *jobsdb.JobStatusT, destinat destinationTag := misc.GetTagName(destination.ID, destination.Name) var alert bool alert = w.allowRouterAbortedAlert(errorAt) - if status.JobState == jobsdb.Succeeded.State { + if status.JobState == jobsdb.Succeeded.State || status.JobState == jobsdb.Filtered.State { alert = !w.rt.reloadableConfig.skipRtAbortAlertForTransformation.Load() || !w.rt.reloadableConfig.skipRtAbortAlertForDelivery.Load() errorAt = "" } diff --git a/services/rsources/stats_collector.go b/services/rsources/stats_collector.go index 4f8f1c76c4..34b1c2fd76 100644 --- a/services/rsources/stats_collector.go +++ b/services/rsources/stats_collector.go @@ -152,7 +152,8 @@ func (r *statsCollector) JobStatusesUpdated(jobStatuses []*jobsdb.JobStatusT) { stats, ok := r.statsIndex[statKey] if ok { switch jobStatus.JobState { - case jobsdb.Succeeded.State: + // Filtered state is being considered as a success. If we want to report them separately, we can add a new field in stats + case jobsdb.Succeeded.State, jobsdb.Filtered.State: stats.Out++ case jobsdb.Aborted.State: stats.Failed++ diff --git a/utils/types/types.go b/utils/types/types.go index 7c64307a93..ab415a0c80 100644 --- a/utils/types/types.go +++ b/utils/types/types.go @@ -10,6 +10,11 @@ import ( "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" ) +const ( + FilterEventCode = 298 + SuppressEventCode = 299 +) + // SingularEventT single event structrue type SingularEventT map[string]interface{}