diff --git a/CHANGELOG.md b/CHANGELOG.md
index f2207025..6eb11e55 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,8 @@
## Release History
+### 1.11.0-Beta.1 (Unreleased)
+#### Other Changes
+* Construct `SourceRecord` offset based on `_lsn` from the item in `CosmosDBSourceConnector`. [PR 534](https://github.com/microsoft/kafka-connect-cosmosdb/pull/534)
+
### 1.10.0 (2023-10-10)
#### New Features
* Added compression feature to resolve duplicate records in a single batch when consuming from kafka topic in the bulk mode for sink connector through new config `connect.cosmos.sink.bulk.compression.enabled`. [PR 515](https://github.com/microsoft/kafka-connect-cosmosdb/pull/515)
diff --git a/pom.xml b/pom.xml
index ab6a50bf..5567b949 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.azure.cosmos.kafka
kafka-connect-cosmos
- 1.10.0
+ 1.11.0-Beta.1
kafka-connect-cosmos
https://github.com/microsoft/kafka-connect-cosmosdb
diff --git a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
index 5e3c2757..bb87e758 100644
--- a/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
+++ b/src/main/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTask.java
@@ -17,7 +17,6 @@
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
-import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
@@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask {
private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
- private static final String CONTINUATION_TOKEN = "ContinuationToken";
+ private static final String LSN_ATTRIBUTE_NAME = "_lsn";
private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
@@ -105,26 +104,8 @@ public void start(Map map) {
logger.info("Started CosmosDB source task.");
}
- private JsonNode getLeaseContainerRecord() {
- String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
- Iterable filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable();
- if (filteredDocs.iterator().hasNext()) {
- JsonNode result = filteredDocs.iterator().next();
- // Return node only if it has the continuation token field present
- if (result.has(CONTINUATION_TOKEN)) {
- return result;
- }
- }
-
- return null;
- }
-
- private String getContinuationToken() {
- JsonNode leaseRecord = getLeaseContainerRecord();
- if (client == null || leaseRecord == null) {
- return null;
- }
- return leaseRecord.get(CONTINUATION_TOKEN).textValue();
+ private String getItemLsn(JsonNode item) {
+ return item.get(LSN_ATTRIBUTE_NAME).asText();
}
@Override
@@ -171,10 +152,8 @@ private void fillRecords(List records, String topic) throws Interr
messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
}
- // Get the latest token and record as offset
- // TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random
- // change to show the continuationToken for the leases processed by the current worker
- Map sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
+ // Get the latest lsn and record as offset
+ Map sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));
if (logger.isDebugEnabled()) {
logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
diff --git a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
index 6d9f3320..fe6c05d6 100644
--- a/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
+++ b/src/test/java/com/azure/cosmos/kafka/connect/source/CosmosDBSourceTaskTest.java
@@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest {
private final String topicName = "testtopic";
private final String containerName = "container666";
private final String databaseName = "fakeDatabase312";
+ private final String OFFSET_KEY = "recordContinuationToken";
private CosmosAsyncClient mockCosmosClient;
private CosmosAsyncContainer mockFeedContainer;
private CosmosAsyncContainer mockLeaseContainer;
@@ -97,7 +98,7 @@ public void setup() throws IllegalAccessException {
@Test
public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();
@@ -124,7 +125,7 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc
@Test
public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();
@@ -144,7 +145,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
@Test
public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
// test when should fillMoreRecords false, then poll method will return immediately
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
@@ -168,7 +169,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J
@Test
public void testPollWithMessageKey() throws InterruptedException, JsonProcessingException {
- String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();
@@ -183,9 +184,27 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
Assert.assertEquals("123", result.get(0).key());
}
+ @Test
+ public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException {
+ String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode actualObj = mapper.readTree(jsonString);
+ List changes = new ArrayList<>();
+ changes.add(actualObj);
+
+ new Thread(() -> {
+ testTask.handleCosmosDbChanges(changes);
+ }).start();
+
+ List results = testTask.poll();
+ Assert.assertEquals(1, results.size());
+ Assert.assertEquals("123", results.get(0).key());
+ Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY));
+ }
+
@Test
public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();
@@ -204,7 +223,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep
@Test
public void testSmallBufferSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();
@@ -224,7 +243,7 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc
@Test(expected=IllegalStateException.class)
public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
- String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
+ String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List changes = new ArrayList<>();