From 49d5274c0539199b2a815487ee823e65615794b5 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Oct 2023 10:52:10 -0700 Subject: [PATCH 1/3] using lsn as the sourceoffset --- .../connect/source/CosmosDBSourceTask.java | 31 +++---------------- .../source/CosmosDBSourceTaskTest.java | 19 ++++++++++++ 2 files changed, 24 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java index 5e3c2757..bb87e758 100644 --- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java +++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java @@ -17,7 +17,6 @@ import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosContainerResponse; -import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ThroughputProperties; import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Schema; @@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask { private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class); private static final String OFFSET_KEY = "recordContinuationToken"; - private static final String CONTINUATION_TOKEN = "ContinuationToken"; + private static final String LSN_ATTRIBUTE_NAME = "_lsn"; private final AtomicBoolean running = new AtomicBoolean(false); private CosmosAsyncClient client = null; @@ -105,26 +104,8 @@ public void start(Map map) { logger.info("Started CosmosDB source task."); } - private JsonNode getLeaseContainerRecord() { - String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)"; - Iterable filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable(); - if (filteredDocs.iterator().hasNext()) { - JsonNode result = filteredDocs.iterator().next(); - // Return node only if it has the continuation token field present - if (result.has(CONTINUATION_TOKEN)) { - return result; - } - } - - return null; - } - - private String getContinuationToken() { - JsonNode leaseRecord = getLeaseContainerRecord(); - if (client == null || leaseRecord == null) { - return null; - } - return leaseRecord.get(CONTINUATION_TOKEN).textValue(); + private String getItemLsn(JsonNode item) { + return item.get(LSN_ATTRIBUTE_NAME).asText(); } @Override @@ -171,10 +152,8 @@ private void fillRecords(List records, String topic) throws Interr messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : ""; } - // Get the latest token and record as offset - // TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random - // change to show the continuationToken for the leases processed by the current worker - Map sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken()); + // Get the latest lsn and record as offset + Map sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node)); if (logger.isDebugEnabled()) { logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY)); diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java index 6d9f3320..a46ed27d 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java @@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest { private final String topicName = "testtopic"; private final String containerName = "container666"; private final String databaseName = "fakeDatabase312"; + private final String OFFSET_KEY = "recordContinuationToken"; private CosmosAsyncClient mockCosmosClient; private CosmosAsyncContainer mockFeedContainer; private CosmosAsyncContainer mockLeaseContainer; @@ -183,6 +184,24 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing Assert.assertEquals("123", result.get(0).key()); } + @Test + public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException { + String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; + ObjectMapper mapper = new ObjectMapper(); + JsonNode actualObj = mapper.readTree(jsonString); + List changes = new ArrayList<>(); + changes.add(actualObj); + + new Thread(() -> { + testTask.handleCosmosDbChanges(changes); + }).start(); + + List results = testTask.poll(); + Assert.assertEquals(1, results.size()); + Assert.assertEquals("123", results.get(0).key()); + Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY)); + } + @Test public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException { String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; From 41e5ab233dc0fcf33c1aaa21d07de7246fc89ed0 Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Oct 2023 12:26:51 -0700 Subject: [PATCH 2/3] fix tests --- .../connect/source/CosmosDBSourceTaskTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java index a46ed27d..fe6c05d6 100644 --- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java +++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java @@ -98,7 +98,7 @@ public void setup() throws IllegalAccessException { @Test public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException { - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); @@ -125,7 +125,7 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc @Test public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException { - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); @@ -145,7 +145,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill @Test public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException { // test when should fillMoreRecords false, then poll method will return immediately - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); @@ -169,7 +169,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J @Test public void testPollWithMessageKey() throws InterruptedException, JsonProcessingException { - String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); @@ -204,7 +204,7 @@ public void testSourceRecordOffset() throws InterruptedException, JsonProcessing @Test public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException { - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); @@ -223,7 +223,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep @Test public void testSmallBufferSize() throws InterruptedException, JsonProcessingException, IllegalAccessException { - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); @@ -243,7 +243,7 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc @Test(expected=IllegalStateException.class) public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException { - String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}"; + String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}"; ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(jsonString); List changes = new ArrayList<>(); From 3d4f33fd79b25b4f51f48d773f9ca2d2d091104a Mon Sep 17 00:00:00 2001 From: annie-mac Date: Tue, 17 Oct 2023 12:33:01 -0700 Subject: [PATCH 3/3] update changelog --- CHANGELOG.md | 4 ++++ pom.xml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f2207025..6eb11e55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ ## Release History +### 1.11.0-Beta.1 (Unreleased) +#### Other Changes +* Construct `SourceRecord` offset based on `_lsn` from the item in `CosmosDBSourceConnector`. [PR 534](https://github.com/microsoft/kafka-connect-cosmosdb/pull/534) + ### 1.10.0 (2023-10-10) #### New Features * Added compression feature to resolve duplicate records in a single batch when consuming from kafka topic in the bulk mode for sink connector through new config `connect.cosmos.sink.bulk.compression.enabled`. [PR 515](https://github.com/microsoft/kafka-connect-cosmosdb/pull/515) diff --git a/pom.xml b/pom.xml index ab6a50bf..5567b949 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.azure.cosmos.kafka kafka-connect-cosmos - 1.10.0 + 1.11.0-Beta.1 kafka-connect-cosmos https://github.com/microsoft/kafka-connect-cosmosdb