Skip to content

Commit

Permalink
Removed guava dependency from pom.xml to avoid security vulnerability
Browse files Browse the repository at this point in the history
  • Loading branch information
kushagraThapar committed Mar 24, 2023
1 parent e8e3a47 commit 0bfe13e
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 54 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,6 @@
<version>4.0.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.google.common.collect.Iterators;
import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
Expand All @@ -32,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;
Expand Down Expand Up @@ -109,8 +109,12 @@ public void testBulkWriteWithNonTransientException() {
ArgumentCaptor<Iterable<CosmosItemOperation>> parameters = ArgumentCaptor.forClass(Iterable.class);
verify(container, times(1)).executeBulkOperations(parameters.capture());

AtomicInteger count = new AtomicInteger();
parameters.getValue().forEach(cosmosItemOperation -> {
count.incrementAndGet();
});
Iterator<CosmosItemOperation> bulkExecutionParameters = parameters.getValue().iterator();
assertEquals(2, Iterators.size(bulkExecutionParameters));
assertEquals(2, getIteratorSize(bulkExecutionParameters));
}

@Test
Expand Down Expand Up @@ -141,9 +145,9 @@ public void testBulkWriteSucceededWithTransientException() {

List<Iterable<CosmosItemOperation>> allParameters = parameters.getAllValues();
assertEquals(3, allParameters.size());
assertEquals(2, Iterators.size(allParameters.get(0).iterator()));
assertEquals(1, Iterators.size(allParameters.get(1).iterator()));
assertEquals(1, Iterators.size(allParameters.get(2).iterator()));
assertEquals(2, getIteratorSize(allParameters.get(0).iterator()));
assertEquals(1, getIteratorSize(allParameters.get(1).iterator()));
assertEquals(1, getIteratorSize(allParameters.get(2).iterator()));
}


Expand Down Expand Up @@ -210,4 +214,13 @@ private CosmosBulkOperationResponse mockFailedBulkOperationResponse(SinkRecord s

return mockedBulkOptionResponse;
}

private int getIteratorSize(Iterator<?> iterator) {
int count = 0;
while (iterator.hasNext()) {
iterator.next();
count++;
}
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package com.azure.cosmos.kafka.connect.sink;

import com.azure.cosmos.kafka.connect.CosmosDBConfig.CosmosClientBuilder;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.junit.Test;
import org.mockito.MockedStatic;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -25,7 +25,7 @@ public class CosmosDBSinkConnectorTest {

@Test
public void testValidateEmptyConfigFailsRequiredFields() {
Config config = new CosmosDBSinkConnector().validate(ImmutableMap.of());
Config config = new CosmosDBSinkConnector().validate(Collections.emptyMap());

Map<String, List<String>> errorMessages = config.configValues().stream()
.collect(Collectors.toMap(ConfigValue::name, ConfigValue::errorMessages));
Expand All @@ -46,7 +46,7 @@ public void testValidateCannotConnectToCosmos() {
.when(() -> CosmosClientBuilder.createClient(anyString(), anyString()))
.thenThrow(IllegalArgumentException.class);

Config config = connector.validate(ImmutableMap.of(
Config config = connector.validate(Map.of(
CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF, "https://endpoint:port/",
CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword",
CosmosDBSinkConfig.COSMOS_DATABASE_NAME_CONF, "superAwesomeDatabase",
Expand All @@ -71,7 +71,7 @@ public void testValidateHappyPath() {
.then(answerVoid((s1, s2) -> {
}));

Config config = connector.validate(ImmutableMap.of(
Config config = connector.validate(Map.of(
CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF,
"https://cosmos-instance.documents.azure.com:443/",
CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword",
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testValidateTopicMapValidFormat() {
}

private void invalidTopicMapString(CosmosDBSinkConnector connector, String topicMapConfig) {
Config config = connector.validate(ImmutableMap.of(
Config config = connector.validate(Map.of(
CosmosDBSinkConfig.COSMOS_CONN_ENDPOINT_CONF, "https://endpoint:port/",
CosmosDBSinkConfig.COSMOS_CONN_KEY_CONF, "superSecretPassword",
CosmosDBSinkConfig.COSMOS_DATABASE_NAME_CONF, "superAwesomeDatabase",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.azure.cosmos.kafka.connect.sink;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -28,7 +26,7 @@ public void emptyStructToEmptyMap() {
Schema schema = SchemaBuilder.struct()
.build();
Struct struct = new Struct(schema);
assertEquals(ImmutableMap.of(), StructToJsonMap.toJsonMap(struct));
assertEquals(Map.of(), StructToJsonMap.toJsonMap(struct));
}


Expand All @@ -38,10 +36,10 @@ public void structWithEmptyArrayToMap() {
.field("array_of_boolean", SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build());

Struct struct = new Struct(schema)
.put("array_of_boolean", ImmutableList.of());
.put("array_of_boolean", Map.of());

Map<String, Object> converted = StructToJsonMap.toJsonMap(struct);
assertEquals(ImmutableList.of(), ((List<Boolean>) converted.get("array_of_boolean")));
assertEquals(List.of(), converted.get("array_of_boolean"));
}

@Test
Expand Down Expand Up @@ -86,8 +84,8 @@ public void complexStructToMap() {
.put("string", quickBrownFox)
.put("struct", new Struct(embeddedSchema)
.put("embedded_string", quickBrownFox))
.put("array_of_boolean", ImmutableList.of(false))
.put("array_of_struct", ImmutableList.of(
.put("array_of_boolean", List.of(false))
.put("array_of_struct", List.of(
new Struct(embeddedSchema).put("embedded_string", quickBrownFox)));

Map<String, Object> converted = StructToJsonMap.toJsonMap(struct);
Expand All @@ -105,7 +103,7 @@ public void complexStructToMap() {
assertEquals(quickBrownFox, converted.get("string"));
assertEquals(quickBrownFox, ((Map<String, Object>) converted.get("struct")).get("embedded_string"));
assertEquals(false, ((List<Boolean>) converted.get("array_of_boolean")).get(0));
assertEquals(ImmutableMap.of("embedded_string", quickBrownFox), ((List<Struct>) converted.get("array_of_struct")).get(0));
assertEquals(Map.of("embedded_string", quickBrownFox), ((List<Struct>) converted.get("array_of_struct")).get(0));
assertNull(converted.get("optional_string"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.azure.cosmos.kafka.connect.sink.id.strategy;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -16,6 +14,9 @@
import org.junit.runners.Parameterized;
import org.mockito.Mock;

import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;
Expand All @@ -24,7 +25,7 @@
public class ProvidedInStrategyTest {
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> parameters() {
return ImmutableList.of(
return List.of(
new Object[]{ProvidedInValueStrategy.class, new ProvidedInValueStrategy()},
new Object[]{ProvidedInKeyStrategy.class, new ProvidedInKeyStrategy()}
);
Expand All @@ -43,7 +44,7 @@ public static Iterable<Object[]> parameters() {
public void setUp() {
initMocks(this);

strategy.configure(ImmutableMap.of());
strategy.configure(Map.of());
}

private void returnOnKeyOrValue(Schema schema, Object ret) {
Expand All @@ -64,21 +65,21 @@ public void valueNotStructOrMapShouldFail() {

@Test(expected = ConnectException.class)
public void noIdInValueShouldFail() {
returnOnKeyOrValue(null, ImmutableMap.of());
returnOnKeyOrValue(null, Map.of());
strategy.generateId(record);
}

@Test
public void stringIdOnMapShouldReturn() {
returnOnKeyOrValue(null, ImmutableMap.of(
returnOnKeyOrValue(null, Map.of(
"id", "1234567"
));
assertEquals("1234567", strategy.generateId(record));
}

@Test
public void nonStringIdOnMapShouldReturn() {
returnOnKeyOrValue(null, ImmutableMap.of(
returnOnKeyOrValue(null, Map.of(
"id", 1234567
));
assertEquals("1234567", strategy.generateId(record));
Expand Down Expand Up @@ -113,7 +114,7 @@ public void structIdOnStructShouldReturn() {

@Test
public void jsonPathOnStruct() {
strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));

Schema idSchema = SchemaBuilder.struct()
.field("name", Schema.STRING_SCHEMA)
Expand All @@ -130,61 +131,61 @@ public void jsonPathOnStruct() {

@Test
public void jsonPathOnMap() {
strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.name"));
returnOnKeyOrValue(null,
ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka")));
Map.of("id", Map.of("name", "franz kafka")));

assertEquals("franz kafka", strategy.generateId(record));
}

@Test(expected = ConnectException.class)
public void invalidJsonPathThrows() {
strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "invalid.path"));
returnOnKeyOrValue(null,
ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka")));
Map.of("id", Map.of("name", "franz kafka")));

strategy.generateId(record);
}

@Test(expected = ConnectException.class)
public void jsonPathNotExistThrows() {
strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id.not.exist"));
returnOnKeyOrValue(null,
ImmutableMap.of("id", ImmutableMap.of("name", "franz kafka")));
Map.of("id", Map.of("name", "franz kafka")));

strategy.generateId(record);
}

@Test
public void complexJsonPath() {
returnOnKeyOrValue(null,
ImmutableMap.of("id", ImmutableList.of(
ImmutableMap.of("id", 0,
Map.of("id", List.of(
Map.of("id", 0,
"name", "cosmos kramer",
"occupation", "unknown"),
ImmutableMap.of("id", 1,
Map.of("id", 1,
"name", "franz kafka",
"occupation", "writer")
)));

strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[0].name"));
assertEquals("cosmos kramer", strategy.generateId(record));

strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[1].name"));
assertEquals("franz kafka", strategy.generateId(record));

strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id[*].id"));
assertEquals("[0,1]", strategy.generateId(record));

strategy.configure(ImmutableMap.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id"));
strategy.configure(Map.of(ProvidedInConfig.JSON_PATH_CONFIG, "$.id"));
assertEquals(
"[{\"id\":0,\"name\":\"cosmos kramer\",\"occupation\":\"unknown\"},{\"id\":1,\"name\":\"franz kafka\",\"occupation\":\"writer\"}]",
strategy.generateId(record));
}

@Test
public void generatedIdSanitized() {
returnOnKeyOrValue(null, ImmutableMap.of("id", "#my/special\\id?"));
returnOnKeyOrValue(null, Map.of("id", "#my/special\\id?"));

String id = strategy.generateId(record);
assertEquals("_my_special_id_", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@

package com.azure.cosmos.kafka.connect.sink.id.strategy;

import com.google.common.collect.ImmutableMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;

import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -19,7 +20,7 @@ public class TemplateStrategyTest {

@Test
public void simpleKey() {
strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}"));
strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}"));
SinkRecord record = mock(SinkRecord.class);
when(record.keySchema()).thenReturn(Schema.STRING_SCHEMA);
when(record.key()).thenReturn("test");
Expand All @@ -30,7 +31,7 @@ public void simpleKey() {

@Test
public void kafkaMetadata() {
strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${topic}-${partition}-${offset}"));
strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${topic}-${partition}-${offset}"));
SinkRecord record = mock(SinkRecord.class);
when(record.topic()).thenReturn("mytopic");
when(record.kafkaPartition()).thenReturn(0);
Expand All @@ -42,14 +43,14 @@ public void kafkaMetadata() {

@Test
public void unknownVariablePreserved() {
strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${unknown}"));
strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${unknown}"));
String id = strategy.generateId(mock(SinkRecord.class));
assertEquals("${unknown}", id);
}

@Test
public void nestedStruct() {
strategy.configure(ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}"));
strategy.configure(Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "${key}"));
SinkRecord record = mock(SinkRecord.class);
Schema nestedSchema = SchemaBuilder.struct()
.field("nested_field", Schema.STRING_SCHEMA)
Expand All @@ -74,7 +75,7 @@ public void nestedStruct() {
@Test
public void fullKeyStrategyUsesFullKey() {
strategy = new FullKeyStrategy();
strategy.configure(ImmutableMap.of());
strategy.configure(Map.of());
SinkRecord record = mock(SinkRecord.class);
Schema schema = SchemaBuilder.struct()
.field("string_field", Schema.STRING_SCHEMA)
Expand All @@ -96,7 +97,7 @@ public void fullKeyStrategyUsesFullKey() {
@Test
public void metadataStrategyUsesMetadataWithDeliminator() {
strategy = new KafkaMetadataStrategy();
strategy.configure(ImmutableMap.of(KafkaMetadataStrategyConfig.DELIMITER_CONFIG, "_"));
strategy.configure(Map.of(KafkaMetadataStrategyConfig.DELIMITER_CONFIG, "_"));
SinkRecord record = mock(SinkRecord.class);
when(record.topic()).thenReturn("topic");
when(record.kafkaPartition()).thenReturn(0);
Expand All @@ -110,7 +111,7 @@ public void metadataStrategyUsesMetadataWithDeliminator() {
public void generatedIdSanitized() {
strategy = new TemplateStrategy();
strategy.configure(
ImmutableMap.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "#my/special\\id?"));
Map.of(TemplateStrategyConfig.TEMPLATE_CONFIG, "#my/special\\id?"));
SinkRecord record = mock(SinkRecord.class);

String id = strategy.generateId(record);
Expand Down

0 comments on commit 0bfe13e

Please sign in to comment.