From e8e3a474d06481bbf05e836946a62e342504bbbb Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Fri, 24 Mar 2023 12:04:04 -0700 Subject: [PATCH 1/3] Fixed sink bug when using struct value for record --- .../kafka/connect/sink/CosmosDBSinkTask.java | 16 +++- .../connect/sink/CosmosDBSinkTaskTest.java | 87 ++++++++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java b/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java index 0e391c67..c5e24ecb 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTask.java @@ -111,12 +111,26 @@ public void put(Collection records) { Object recordValue; if (record.value() instanceof Struct) { recordValue = StructToJsonMap.toJsonMap((Struct) record.value()); + // TODO: Do we need to update the value schema to map or keep it struct? } else { recordValue = record.value(); } maybeInsertId(recordValue, record); - toBeWrittenRecordList.add(record); + + // Create an updated record with from the current record and the updated record value + final SinkRecord updatedRecord = new SinkRecord(record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + record.valueSchema(), + recordValue, + record.kafkaOffset(), + record.timestamp(), + record.timestampType(), + record.headers()); + + toBeWrittenRecordList.add(updatedRecord); } try { diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java index ccfd54df..f5314691 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkTaskTest.java @@ -7,6 +7,9 @@ import com.azure.cosmos.CosmosContainer; import com.azure.cosmos.CosmosDatabase; import com.azure.cosmos.implementation.BadRequestException; +import com.azure.cosmos.kafka.connect.source.JsonToStruct; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; @@ -23,9 +26,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertNotNull; +import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyString; @@ -108,7 +113,7 @@ public void sinkWriteFailed() { } try { - testTask.put(Arrays.asList(record)); + testTask.put(List.of(record)); fail("Expected ConnectException on bad message"); } catch (ConnectException ce) { @@ -158,7 +163,7 @@ public void sinkWriteSucceeded() { } try { - testTask.put(Arrays.asList(record)); + testTask.put(List.of(record)); } catch (ConnectException ce) { fail("Expected sink write succeeded. but got: " + ce.getMessage()); } catch (Throwable t) { @@ -172,5 +177,83 @@ public void sinkWriteSucceeded() { } } } + + @Test + public void sinkWriteSucceededWithStructRecordValue() { + Schema stringSchema = new ConnectSchema(Schema.Type.STRING); + Schema structSchema = new ConnectSchema(Schema.Type.STRUCT); + ObjectNode objectNode = new ObjectNode(new JsonNodeFactory(false)) + .put("foo", "fooz") + .put("bar", "baaz"); + JsonToStruct jsonToStruct = new JsonToStruct(); + + Object recordValue = jsonToStruct.recordToSchemaAndValue(objectNode).value(); + + + SinkRecord record = new SinkRecord(topicName, 1, stringSchema, "nokey", structSchema, recordValue, 0L); + assertNotNull(record.value()); + + SinkWriteResponse sinkWriteResponse = new SinkWriteResponse(); + sinkWriteResponse.getSucceededRecords().add(record); + + MockedConstruction mockedWriterConstruction = null; + AtomicReference> sinkRecords = new AtomicReference<>(); + try { + if (this.isBulkModeEnabled) { + mockedWriterConstruction = mockConstructionWithAnswer(BulkWriter.class, invocation -> { + if (invocation.getMethod().equals(BulkWriter.class.getMethod("write", List.class))) { + sinkRecords.set(invocation.getArgument(0)); + return sinkWriteResponse; + } + + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } else { + mockedWriterConstruction = mockConstructionWithAnswer(PointWriter.class, invocation -> { + if (invocation.getMethod().equals(PointWriter.class.getMethod("write", List.class))) { + sinkRecords.set(invocation.getArgument(0)); + return sinkWriteResponse; + } + + throw new IllegalStateException("Not implemented for method " + invocation.getMethod().getName()); + }); + } + + try { + testTask.put(List.of(record)); + } catch (ConnectException ce) { + fail("Expected sink write succeeded. but got: " + ce.getMessage()); + } catch (Throwable t) { + fail("Expected sink write succeeded, but got: " + t.getClass().getName()); + } + + assertEquals(1, mockedWriterConstruction.constructed().size()); + + SinkRecord sinkRecord = sinkRecords.get().get(0); + assertRecordEquals(record, sinkRecord); + + Object value = sinkRecord.value(); + assertTrue(value instanceof Map); + + assertEquals("fooz", ((Map) value).get("foo")); + assertEquals("baaz", ((Map) value).get("bar")); + } finally { + if (mockedWriterConstruction != null) { + mockedWriterConstruction.close(); + } + } + } + + private void assertRecordEquals(SinkRecord record, SinkRecord updatedRecord) { + assertEquals(record.kafkaOffset(), updatedRecord.kafkaOffset()); + assertEquals(record.timestamp(), updatedRecord.timestamp()); + assertEquals(record.timestampType(), updatedRecord.timestampType()); + assertEquals(record.headers(), updatedRecord.headers()); + assertEquals(record.keySchema(), updatedRecord.keySchema()); + assertEquals(record.valueSchema(), updatedRecord.valueSchema()); + assertEquals(record.key(), updatedRecord.key()); + assertEquals(record.topic(), updatedRecord.topic()); + assertEquals(record.kafkaPartition(), updatedRecord.kafkaPartition()); + } } From 0bfe13e75b52c3abd5b28d9fd7f24ef7f92a56e4 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Fri, 24 Mar 2023 12:19:13 -0700 Subject: [PATCH 2/3] Removed guava dependency from pom.xml to avoid security vulnerability --- pom.xml | 6 --- .../kafka/connect/sink/BulkWriterTests.java | 23 +++++++--- .../sink/CosmosDBSinkConnectorTest.java | 10 ++--- .../connect/sink/StructToJsonMapTest.java | 14 +++--- .../id/strategy/ProvidedInStrategyTest.java | 45 ++++++++++--------- .../id/strategy/TemplateStrategyTest.java | 17 +++---- 6 files changed, 61 insertions(+), 54 deletions(-) diff --git a/pom.xml b/pom.xml index 73be7fa1..dea28296 100644 --- a/pom.xml +++ b/pom.xml @@ -133,12 +133,6 @@ 4.0.3 test - - com.google.guava - guava - 29.0-jre - test - io.confluent kafka-avro-serializer diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java index ed4a7de9..3930c21e 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/BulkWriterTests.java @@ -16,7 +16,6 @@ import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.PartitionKeyDefinition; -import com.google.common.collect.Iterators; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; @@ -32,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertTrue; @@ -109,8 +109,12 @@ public void testBulkWriteWithNonTransientException() { ArgumentCaptor> parameters = ArgumentCaptor.forClass(Iterable.class); verify(container, times(1)).executeBulkOperations(parameters.capture()); + AtomicInteger count = new AtomicInteger(); + parameters.getValue().forEach(cosmosItemOperation -> { + count.incrementAndGet(); + }); Iterator bulkExecutionParameters = parameters.getValue().iterator(); - assertEquals(2, Iterators.size(bulkExecutionParameters)); + assertEquals(2, getIteratorSize(bulkExecutionParameters)); } @Test @@ -141,9 +145,9 @@ public void testBulkWriteSucceededWithTransientException() { List> allParameters = parameters.getAllValues(); assertEquals(3, allParameters.size()); - assertEquals(2, Iterators.size(allParameters.get(0).iterator())); - assertEquals(1, Iterators.size(allParameters.get(1).iterator())); - assertEquals(1, Iterators.size(allParameters.get(2).iterator())); + assertEquals(2, getIteratorSize(allParameters.get(0).iterator())); + assertEquals(1, getIteratorSize(allParameters.get(1).iterator())); + assertEquals(1, getIteratorSize(allParameters.get(2).iterator())); } @@ -210,4 +214,13 @@ private CosmosBulkOperationResponse mockFailedBulkOperationResponse(SinkRecord s return mockedBulkOptionResponse; } + + private int getIteratorSize(Iterator iterator) { + int count = 0; + while (iterator.hasNext()) { + iterator.next(); + count++; + } + return count; + } } diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkConnectorTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkConnectorTest.java index 9bb1cefc..2de5dd31 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkConnectorTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/CosmosDBSinkConnectorTest.java @@ -4,12 +4,12 @@ package com.azure.cosmos.kafka.connect.sink; import com.azure.cosmos.kafka.connect.CosmosDBConfig.CosmosClientBuilder; -import com.google.common.collect.ImmutableMap; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; import org.junit.Test; import org.mockito.MockedStatic; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -25,7 +25,7 @@ public class CosmosDBSinkConnectorTest { @Test public void testValidateEmptyConfigFailsRequiredFields() { - Config config = new CosmosDBSinkConnector().validate(ImmutableMap.of()); + Config config = new CosmosDBSinkConnector().validate(Collections.emptyMap()); Map> errorMessages = config.configValues().stream() .collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages)); @@ -46,7 +46,7 @@ public void testValidateCannotConnectToCosmos() { .when(() -> CosmosClientBuilder.createClient(anyString(), anyString())) .thenThrow(IllegalArgumentException.class); - Config config = connector.validate(ImmutableMap.of( + Config config = connector.validate(Map.of( CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF, "https://endpoint:port/", CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword", CosmosDBSinkConfig.COSMOS_DATABASE_NAME_CONF, "superAwesomeDatabase", @@ -71,7 +71,7 @@ public void testValidateHappyPath() { .then(answerVoid((s1, s2) -> { })); - Config config = connector.validate(ImmutableMap.of( + Config config = connector.validate(Map.of( CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF, "https://cosmos-instance.documents.azure.com:443/", CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword", @@ -106,7 +106,7 @@ public void testValidateTopicMapValidFormat() { } private void invalidTopicMapString(CosmosDBSinkConnector connector, String topicMapConfig) { - Config config = connector.validate(ImmutableMap.of( + Config config = connector.validate(Map.of( CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF, "https://endpoint:port/", CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword", CosmosDBSinkConfig.COSMOS_DATABASE_NAME_CONF, "superAwesomeDatabase", diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java index 994800bc..e905a66b 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java @@ -3,8 +3,6 @@ package com.azure.cosmos.kafka.connect.sink; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; @@ -28,7 +26,7 @@ public void emptyStructToEmptyMap() { Schema schema = SchemaBuilder.struct() .build(); Struct struct = new Struct(schema); - assertEquals(ImmutableMap.of(), StructToJsonMap.toJsonMap(struct)); + assertEquals(Map.of(), StructToJsonMap.toJsonMap(struct)); } @@ -38,10 +36,10 @@ public void structWithEmptyArrayToMap() { .field("array_of_boolean", SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build()); Struct struct = new Struct(schema) - .put("array_of_boolean", ImmutableList.of()); + .put("array_of_boolean", Map.of()); Map converted = StructToJsonMap.toJsonMap(struct); - assertEquals(ImmutableList.of(), ((List) converted.get("array_of_boolean"))); + assertEquals(List.of(), converted.get("array_of_boolean")); } @Test @@ -86,8 +84,8 @@ public void complexStructToMap() { .put("string", quickBrownFox) .put("struct", new Struct(embeddedSchema) .put("embedded_string", quickBrownFox)) - .put("array_of_boolean", ImmutableList.of(false)) - .put("array_of_struct", ImmutableList.of( + .put("array_of_boolean", List.of(false)) + .put("array_of_struct", List.of( new Struct(embeddedSchema).put("embedded_string", quickBrownFox))); Map converted = StructToJsonMap.toJsonMap(struct); @@ -105,7 +103,7 @@ public void complexStructToMap() { assertEquals(quickBrownFox, converted.get("string")); assertEquals(quickBrownFox, ((Map) converted.get("struct")).get("embedded_string")); assertEquals(false, ((List) converted.get("array_of_boolean")).get(0)); - assertEquals(ImmutableMap.of("embedded_string", quickBrownFox), ((List) converted.get("array_of_struct")).get(0)); + assertEquals(Map.of("embedded_string", quickBrownFox), ((List) converted.get("array_of_struct")).get(0)); assertNull(converted.get("optional_string")); } diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java index 5ccfacdd..44dd59c1 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java @@ -3,8 +3,6 @@ package com.azure.cosmos.kafka.connect.sink.id.strategy; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; @@ -16,6 +14,9 @@ import org.junit.runners.Parameterized; import org.mockito.Mock; +import java.util.List; +import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -24,7 +25,7 @@ public class ProvidedInStrategyTest { @Parameterized.Parameters(name = "{0}") public static Iterable parameters() { - return ImmutableList.of( + return List.of( new Object[]{ProvidedInValueStrategy.class, new ProvidedInValueStrategy()}, new Object[]{ProvidedInKeyStrategy.class, new ProvidedInKeyStrategy()} ); @@ -43,7 +44,7 @@ public static Iterable parameters() { public void setUp() { initMocks(this); - strategy.configure(ImmutableMap.of()); + strategy.configure(Map.of()); } private void returnOnKeyOrValue(Schema schema, Object ret) { @@ -64,13 +65,13 @@ public void valueNotStructOrMapShouldFail() { @Test(expected = ConnectException.class) public void noIdInValueShouldFail() { - returnOnKeyOrValue(null, ImmutableMap.of()); + returnOnKeyOrValue(null, Map.of()); strategy.generateId(record); } @Test public void stringIdOnMapShouldReturn() { - returnOnKeyOrValue(null, ImmutableMap.of( + returnOnKeyOrValue(null, Map.of( "id", "1234567" )); assertEquals("1234567", strategy.generateId(record)); @@ -78,7 +79,7 @@ public void stringIdOnMapShouldReturn() { @Test public void nonStringIdOnMapShouldReturn() { - returnOnKeyOrValue(null, ImmutableMap.of( + returnOnKeyOrValue(null, Map.of( "id", 1234567 )); assertEquals("1234567", strategy.generateId(record)); @@ -113,7 +114,7 @@ public void structIdOnStructShouldReturn() { @Test public void jsonPathOnStruct() { - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name")); Schema idSchema = SchemaBuilder.struct() .field("name", Schema.STRING_SCHEMA) @@ -130,27 +131,27 @@ public void jsonPathOnStruct() { @Test public void jsonPathOnMap() { - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name")); returnOnKeyOrValue(null, - ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka"))); + Map.of("id", Map.of("name", "franz kafka"))); assertEquals("franz kafka", strategy.generateId(record)); } @Test(expected = ConnectException.class) public void invalidJsonPathThrows() { - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path")); returnOnKeyOrValue(null, - ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka"))); + Map.of("id", Map.of("name", "franz kafka"))); strategy.generateId(record); } @Test(expected = ConnectException.class) public void jsonPathNotExistThrows() { - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist")); returnOnKeyOrValue(null, - ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka"))); + Map.of("id", Map.of("name", "franz kafka"))); strategy.generateId(record); } @@ -158,25 +159,25 @@ public void jsonPathNotExistThrows() { @Test public void complexJsonPath() { returnOnKeyOrValue(null, - ImmutableMap.of("id", ImmutableList.of( - ImmutableMap.of("id", 0, + Map.of("id", List.of( + Map.of("id", 0, "name", "cosmos kramer", "occupation", "unknown"), - ImmutableMap.of("id", 1, + Map.of("id", 1, "name", "franz kafka", "occupation", "writer") ))); - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name")); assertEquals("cosmos kramer", strategy.generateId(record)); - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name")); assertEquals("franz kafka", strategy.generateId(record)); - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id")); assertEquals("[0,1]", strategy.generateId(record)); - strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id")); + strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id")); assertEquals( "[{\"id\":0,\"name\":\"cosmos kramer\",\"occupation\":\"unknown\"},{\"id\":1,\"name\":\"franz kafka\",\"occupation\":\"writer\"}]", strategy.generateId(record)); @@ -184,7 +185,7 @@ public void complexJsonPath() { @Test public void generatedIdSanitized() { - returnOnKeyOrValue(null, ImmutableMap.of("id", "#my/special\\id?")); + returnOnKeyOrValue(null, Map.of("id", "#my/special\\id?")); String id = strategy.generateId(record); assertEquals("_my_special_id_", id); diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/TemplateStrategyTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/TemplateStrategyTest.java index a2b524e1..69281e2c 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/TemplateStrategyTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/TemplateStrategyTest.java @@ -3,13 +3,14 @@ package com.azure.cosmos.kafka.connect.sink.id.strategy; -import com.google.common.collect.ImmutableMap; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.Test; +import java.util.Map; + import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -19,7 +20,7 @@ public class TemplateStrategyTest { @Test public void simpleKey() { - strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}")); + strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}")); SinkRecord record = mock(SinkRecord.class); when(record.keySchema()).thenReturn(Schema.STRING_SCHEMA); when(record.key()).thenReturn("test"); @@ -30,7 +31,7 @@ public void simpleKey() { @Test public void kafkaMetadata() { - strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${topic}-${partition}-${offset}")); + strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${topic}-${partition}-${offset}")); SinkRecord record = mock(SinkRecord.class); when(record.topic()).thenReturn("mytopic"); when(record.kafkaPartition()).thenReturn(0); @@ -42,14 +43,14 @@ public void kafkaMetadata() { @Test public void unknownVariablePreserved() { - strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${unknown}")); + strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${unknown}")); String id = strategy.generateId(mock(SinkRecord.class)); assertEquals("${unknown}", id); } @Test public void nestedStruct() { - strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}")); + strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}")); SinkRecord record = mock(SinkRecord.class); Schema nestedSchema = SchemaBuilder.struct() .field("nested_field", Schema.STRING_SCHEMA) @@ -74,7 +75,7 @@ public void nestedStruct() { @Test public void fullKeyStrategyUsesFullKey() { strategy = new FullKeyStrategy(); - strategy.configure(ImmutableMap.of()); + strategy.configure(Map.of()); SinkRecord record = mock(SinkRecord.class); Schema schema = SchemaBuilder.struct() .field("string_field", Schema.STRING_SCHEMA) @@ -96,7 +97,7 @@ public void fullKeyStrategyUsesFullKey() { @Test public void metadataStrategyUsesMetadataWithDeliminator() { strategy = new KafkaMetadataStrategy(); - strategy.configure(ImmutableMap.of(KafkaMetadataStrategyConfig.DELIMITER_CONFIG, "_")); + strategy.configure(Map.of(KafkaMetadataStrategyConfig.DELIMITER_CONFIG, "_")); SinkRecord record = mock(SinkRecord.class); when(record.topic()).thenReturn("topic"); when(record.kafkaPartition()).thenReturn(0); @@ -110,7 +111,7 @@ public void metadataStrategyUsesMetadataWithDeliminator() { public void generatedIdSanitized() { strategy = new TemplateStrategy(); strategy.configure( - ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "#my/special\\id?")); + Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "#my/special\\id?")); SinkRecord record = mock(SinkRecord.class); String id = strategy.generateId(record); From 4c7a3be4a6418097bd7ef4d8363eae133c763e21 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Fri, 24 Mar 2023 12:50:01 -0700 Subject: [PATCH 3/3] Fixed tests --- .../connect/sink/StructToJsonMapTest.java | 2 +- .../id/strategy/ProvidedInStrategyTest.java | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java index e905a66b..b0e6350a 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/StructToJsonMapTest.java @@ -36,7 +36,7 @@ public void structWithEmptyArrayToMap() { .field("array_of_boolean", SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build()); Struct struct = new Struct(schema) - .put("array_of_boolean", Map.of()); + .put("array_of_boolean", List.of()); Map converted = StructToJsonMap.toJsonMap(struct); assertEquals(List.of(), converted.get("array_of_boolean")); diff --git a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java index 44dd59c1..876acb44 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/sink/id/strategy/ProvidedInStrategyTest.java @@ -14,6 +14,7 @@ import org.junit.runners.Parameterized; import org.mockito.Mock; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -158,15 +159,15 @@ public void jsonPathNotExistThrows() { @Test public void complexJsonPath() { - returnOnKeyOrValue(null, - Map.of("id", List.of( - Map.of("id", 0, - "name", "cosmos kramer", - "occupation", "unknown"), - Map.of("id", 1, - "name", "franz kafka", - "occupation", "writer") - ))); + Map map1 = new LinkedHashMap<>(); + map1.put("id", 0); + map1.put("name", "cosmos kramer"); + map1.put("occupation", "unknown"); + Map map2 = new LinkedHashMap<>(); + map2.put("id", 1); + map2.put("name", "franz kafka"); + map2.put("occupation", "writer"); + returnOnKeyOrValue(null, Map.of("id", List.of(map1, map2))); strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name")); assertEquals("cosmos kramer", strategy.generateId(record));