Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter events support #3882

Merged
merged 5 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,7 @@ var (
Succeeded = jobStateT{isValid: true, isTerminal: true, State: "succeeded"}
Aborted = jobStateT{isValid: true, isTerminal: true, State: "aborted"}
Migrated = jobStateT{isValid: true, isTerminal: true, State: "migrated"}
Filtered = jobStateT{isValid: true, isTerminal: true, State: "filtered"}
chandumlg marked this conversation as resolved.
Show resolved Hide resolved

validTerminalStates []string
validNonTerminalStates []string
Expand All @@ -615,6 +616,7 @@ var jobStates = []jobStateT{
Aborted,
Migrated,
Importing,
Filtered,
}

// OwnerType for this jobsdb instance
Expand Down
34 changes: 20 additions & 14 deletions processor/eventfilter/eventfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,11 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve
messageType := strings.TrimSpace(strings.ToLower(getMessageType(&transformerEvent.Message)))
if messageType == "" {
// We will abort the event
errMessage := "Invalid message type. Type assertion failed"
resp := &transformer.TransformerResponse{
return false, &transformer.TransformerResponse{
Output: transformerEvent.Message, StatusCode: 400,
Metadata: transformerEvent.Metadata,
Error: errMessage,
Error: "Invalid message type. Type assertion failed",
}
return false, resp
}

isSupportedMsgType := slices.Contains(supportedMsgTypes, messageType)
Expand All @@ -143,12 +141,16 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve
"supportedMsgTypes", supportedMsgTypes, "messageType", messageType,
)
// We will not allow the event
return false, nil
return false, &transformer.TransformerResponse{
Output: transformerEvent.Message, StatusCode: types.FilterEventCode,
Metadata: transformerEvent.Metadata,
Error: "Message type not supported",
}
}
// MessageType filtering -- ENDS

// hybridModeCloudEventsFilter.srcType.[eventProperty] filtering -- STARTS
allow, failedResponse := FilterEventsForHybridMode(ConnectionModeFilterParams{
allow := FilterEventsForHybridMode(ConnectionModeFilterParams{
Destination: &transformerEvent.Destination,
SrcType: transformerEvent.Metadata.SourceDefinitionType,
Event: &EventParams{MessageType: messageType},
Expand All @@ -163,7 +165,11 @@ func AllowEventToDestTransformation(transformerEvent *transformer.TransformerEve
})

if !allow {
return allow, failedResponse
return allow, &transformer.TransformerResponse{
Output: transformerEvent.Message, StatusCode: types.FilterEventCode,
Metadata: transformerEvent.Metadata,
Error: "Filtering event based on hybridModeFilter",
}
}
// hybridModeCloudEventsFilter.srcType.[eventProperty] filtering -- ENDS

Expand Down Expand Up @@ -195,38 +201,38 @@ Example:
...
}
*/
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) (bool, *transformer.TransformerResponse) {
func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterParams) bool {
destination := connectionModeFilterParams.Destination
srcType := strings.TrimSpace(connectionModeFilterParams.SrcType)
messageType := connectionModeFilterParams.Event.MessageType
evaluatedDefaultBehaviour := connectionModeFilterParams.DefaultBehaviour

if srcType == "" {
pkgLogger.Debug("sourceType is empty string, filtering event based on default behaviour")
return evaluatedDefaultBehaviour, nil
return evaluatedDefaultBehaviour
}

destConnModeI := misc.MapLookup(destination.Config, "connectionMode")
if destConnModeI == nil {
pkgLogger.Debug("connectionMode not present, filtering event based on default behaviour")
return evaluatedDefaultBehaviour, nil
return evaluatedDefaultBehaviour
}
destConnectionMode, isDestConnModeString := destConnModeI.(string)
if !isDestConnModeString || destConnectionMode != hybridMode {
pkgLogger.Debugf("Provided connectionMode(%v) is in wrong format or the mode is not %q, filtering event based on default behaviour", destConnModeI, hybridMode)
return evaluatedDefaultBehaviour, nil
return evaluatedDefaultBehaviour
}

sourceEventPropertiesI := misc.MapLookup(destination.DestinationDefinition.Config, hybridModeEventsFilterKey, srcType)
if sourceEventPropertiesI == nil {
pkgLogger.Debugf("Destination definition config doesn't contain proper values for %[1]v or %[1]v.%[2]v", hybridModeEventsFilterKey, srcType)
return evaluatedDefaultBehaviour, nil
return evaluatedDefaultBehaviour
}
eventProperties, isOk := sourceEventPropertiesI.(map[string]interface{})

if !isOk || len(eventProperties) == 0 {
pkgLogger.Debugf("'%v.%v' is not correctly defined", hybridModeEventsFilterKey, srcType)
return evaluatedDefaultBehaviour, nil
return evaluatedDefaultBehaviour
}

// Flag indicating to let the event pass through
Expand All @@ -251,7 +257,7 @@ func FilterEventsForHybridMode(connectionModeFilterParams ConnectionModeFilterPa
allowEvent = slices.Contains(messageTypes, messageType) && evaluatedDefaultBehaviour
}
}
return allowEvent, nil
return allowEvent
}

type EventPropsTypes interface {
Expand Down
64 changes: 63 additions & 1 deletion processor/eventfilter/eventfilter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/require"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/processor/transformer"
)

func TestFilterEventsForHybridMode(t *testing.T) {
Expand Down Expand Up @@ -402,7 +403,7 @@ func TestFilterEventsForHybridMode(t *testing.T) {
},
}

actualOutput, _ := FilterEventsForHybridMode(
actualOutput := FilterEventsForHybridMode(
ConnectionModeFilterParams{
SrcType: source.SourceDefinition.Type,
Destination: destination,
Expand Down Expand Up @@ -453,3 +454,64 @@ func TestConvertToArrayOfType(t *testing.T) {
})
}
}

func TestAllowEventToDestTransformation(t *testing.T) {
type testCaseT struct {
caseName string
transformerEvent *transformer.TransformerEvent
expected bool
supportedMsgTypes []string
expectedResp *transformer.TransformerResponse
}

testCases := []testCaseT{
{
caseName: "if message type is invalid, return false with statusCode 400",
transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": ""}},
expected: false,
supportedMsgTypes: []string{"track"},
expectedResp: &transformer.TransformerResponse{
Output: map[string]interface{}{"type": ""},
StatusCode: 400,
Error: "Invalid message type. Type assertion failed",
},
},
{
caseName: "if message type is unsupported, return false with statusCode 298",
transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "identify"}},
expected: false,
supportedMsgTypes: []string{"track"},
expectedResp: &transformer.TransformerResponse{
Output: map[string]interface{}{"type": "identify"},
StatusCode: 298,
Error: "Message type not supported",
},
},
{
caseName: "if event is filtered due to FilterEventsForHybridMode, return statusCode 298",
transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "track"}, Metadata: transformer.Metadata{}},
expected: false,
supportedMsgTypes: []string{"track"},
expectedResp: &transformer.TransformerResponse{
Output: map[string]interface{}{"type": "track"},
StatusCode: 298,
Error: "Filtering event based on hybridModeFilter",
},
},
{
caseName: "if event is legit, return true with nil response",
transformerEvent: &transformer.TransformerEvent{Message: map[string]interface{}{"type": "track"}, Destination: backendconfig.DestinationT{IsProcessorEnabled: true}},
expected: true,
supportedMsgTypes: []string{"track"},
expectedResp: nil,
},
}

for _, tc := range testCases {
t.Run(tc.caseName, func(t *testing.T) {
allow, resp := AllowEventToDestTransformation(tc.transformerEvent, tc.supportedMsgTypes)
require.Equal(t, tc.expected, allow)
require.Equal(t, tc.expectedResp, resp)
})
}
}
Loading