Skip to content

Commit

Permalink
Further clean up.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 27, 2024
1 parent 13f2a61 commit 6d42988
Showing 1 changed file with 30 additions and 37 deletions.
67 changes: 30 additions & 37 deletions processes/consumer/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ func TestProcessMessageFailures(t *testing.T) {
Format: &mgo,
})

vals := []string{
`{
val := `{
"schema": {
"type": "struct",
"fields": [{
Expand Down Expand Up @@ -157,55 +156,49 @@ func TestProcessMessageFailures(t *testing.T) {
"ts_ms": 1668753329387,
"transaction": null
}
}`,
}
}`

memoryDB := memDB
for _, val := range vals {
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004))
if val != "" {
msg.KafkaMsg.Value = []byte(val)
}

args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), 1)
msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004))
msg.KafkaMsg.Value = []byte(val)
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.NoError(t, err)
assert.Equal(t, table, tableName)

td := memoryDB.GetOrCreateTableData(table)
// Check that there are corresponding row(s) in the memory DB
assert.Len(t, td.Rows(), 1)

var rowData map[string]any
for _, row := range td.Rows() {
if row["_id"] == int64(1004) {
rowData = row
}
}

val, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, val.(bool))

msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
{
rowValue, isOk := rowData[constants.DeleteColumnMarker]
assert.True(t, isOk)
assert.False(t, rowValue.(bool))
}
{
msg.KafkaMsg.Value = []byte("not a json object")
args = processArgs{
Msg: msg,
GroupID: "foo",
TopicToConfigFormatMap: tcFmtMap,
}

tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{})
assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal")
assert.Empty(t, tableName)
assert.True(t, td.NumberOfRows() > 0)
}
}

func TestProcessMessageSkip(t *testing.T) {
Expand Down

0 comments on commit 6d42988

Please sign in to comment.