From 6d429884fc4e00c36eb396db772686d60ada29e8 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Fri, 27 Sep 2024 12:12:55 -0700 Subject: [PATCH] Further clean up. --- processes/consumer/process_test.go | 67 +++++++++++++----------------- 1 file changed, 30 insertions(+), 37 deletions(-) diff --git a/processes/consumer/process_test.go b/processes/consumer/process_test.go index fd89eaecc..1695b9f84 100644 --- a/processes/consumer/process_test.go +++ b/processes/consumer/process_test.go @@ -104,8 +104,7 @@ func TestProcessMessageFailures(t *testing.T) { Format: &mgo, }) - vals := []string{ - `{ + val := `{ "schema": { "type": "struct", "fields": [{ @@ -157,32 +156,24 @@ func TestProcessMessageFailures(t *testing.T) { "ts_ms": 1668753329387, "transaction": null } -}`, - } +}` memoryDB := memDB - for _, val := range vals { - msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004)) - if val != "" { - msg.KafkaMsg.Value = []byte(val) - } - - args = processArgs{ - Msg: msg, - GroupID: "foo", - TopicToConfigFormatMap: tcFmtMap, - } - - tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) - assert.NoError(t, err) - assert.Equal(t, table, tableName) - - td := memoryDB.GetOrCreateTableData(table) - // Check that there are corresponding row(s) in the memory DB - assert.Len(t, td.Rows(), 1) + msg.KafkaMsg.Key = []byte(fmt.Sprintf("Struct{id=%v}", 1004)) + msg.KafkaMsg.Value = []byte(val) + args = processArgs{ + Msg: msg, + GroupID: "foo", + TopicToConfigFormatMap: tcFmtMap, } + tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) + assert.NoError(t, err) + assert.Equal(t, table, tableName) + td := memoryDB.GetOrCreateTableData(table) + // Check that there are corresponding row(s) in the memory DB + assert.Len(t, td.Rows(), 1) var rowData map[string]any for _, row := range td.Rows() { @@ -190,22 +181,24 @@ func TestProcessMessageFailures(t *testing.T) { rowData = row } } - - val, isOk := rowData[constants.DeleteColumnMarker] - assert.True(t, isOk) - assert.False(t, val.(bool)) - - msg.KafkaMsg.Value = []byte("not a json object") - args = processArgs{ - Msg: msg, - GroupID: "foo", - TopicToConfigFormatMap: tcFmtMap, + { + rowValue, isOk := rowData[constants.DeleteColumnMarker] + assert.True(t, isOk) + assert.False(t, rowValue.(bool)) } + { + msg.KafkaMsg.Value = []byte("not a json object") + args = processArgs{ + Msg: msg, + GroupID: "foo", + TopicToConfigFormatMap: tcFmtMap, + } - tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) - assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal") - assert.Empty(t, tableName) - assert.True(t, td.NumberOfRows() > 0) + tableName, err = args.process(ctx, cfg, memDB, &mocks.FakeBaseline{}, metrics.NullMetricsProvider{}) + assert.ErrorContains(t, err, "cannot unmarshall event: failed to unmarshal json: invalid character 'o' in literal") + assert.Empty(t, tableName) + assert.True(t, td.NumberOfRows() > 0) + } } func TestProcessMessageSkip(t *testing.T) {