Skip to content

Commit

Permalink
Merge pull request #534 from xinlian12/removeUnnecessaryLeaseQuery
Browse files Browse the repository at this point in the history
usingLsnAsTheRecordOffset
  • Loading branch information
xinlian12 authored Oct 23, 2023
2 parents c2d37a8 + 3d4f33f commit 0f01263
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## Release History
### 1.11.0-Beta.1 (Unreleased)
#### Other Changes
* Construct `SourceRecord` offset based on `_lsn` from the item in `CosmosDBSourceConnector`. [PR 534](https://github.com/microsoft/kafka-connect-cosmosdb/pull/534)

### 1.10.0 (2023-10-10)
#### New Features
* Added compression feature to resolve duplicate records in a single batch when consuming from kafka topic in the bulk mode for sink connector through new config `connect.cosmos.sink.bulk.compression.enabled`. [PR 515](https://github.com/microsoft/kafka-connect-cosmosdb/pull/515)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.10.0</version>
<version>1.11.0-Beta.1</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
private static final String CONTINUATION_TOKEN = "ContinuationToken";
private static final String LSN_ATTRIBUTE_NAME = "_lsn";

private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
Expand Down Expand Up @@ -105,26 +104,8 @@ public void start(Map<String, String> map) {
logger.info("Started CosmosDB source task.");
}

private JsonNode getLeaseContainerRecord() {
String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
Iterable<JsonNode> filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable();
if (filteredDocs.iterator().hasNext()) {
JsonNode result = filteredDocs.iterator().next();
// Return node only if it has the continuation token field present
if (result.has(CONTINUATION_TOKEN)) {
return result;
}
}

return null;
}

private String getContinuationToken() {
JsonNode leaseRecord = getLeaseContainerRecord();
if (client == null || leaseRecord == null) {
return null;
}
return leaseRecord.get(CONTINUATION_TOKEN).textValue();
private String getItemLsn(JsonNode item) {
return item.get(LSN_ATTRIBUTE_NAME).asText();
}

@Override
Expand Down Expand Up @@ -171,10 +152,8 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
}

// Get the latest token and record as offset
// TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random
// change to show the continuationToken for the leases processed by the current worker
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
// Get the latest lsn and record as offset
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));

if (logger.isDebugEnabled()) {
logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest {
private final String topicName = "testtopic";
private final String containerName = "container666";
private final String databaseName = "fakeDatabase312";
private final String OFFSET_KEY = "recordContinuationToken";
private CosmosAsyncClient mockCosmosClient;
private CosmosAsyncContainer mockFeedContainer;
private CosmosAsyncContainer mockLeaseContainer;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void setup() throws IllegalAccessException {

@Test
public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -124,7 +125,7 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc

@Test
public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -144,7 +145,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
@Test
public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
// test when should fillMoreRecords false, then poll method will return immediately
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);

Expand All @@ -168,7 +169,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J

@Test
public void testPollWithMessageKey() throws InterruptedException, JsonProcessingException {
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -183,9 +184,27 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
Assert.assertEquals("123", result.get(0).key());
}

@Test
public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException {
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
changes.add(actualObj);

new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> results = testTask.poll();
Assert.assertEquals(1, results.size());
Assert.assertEquals("123", results.get(0).key());
Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY));
}

@Test
public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -204,7 +223,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep

@Test
public void testSmallBufferSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -224,7 +243,7 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc

@Test(expected=IllegalStateException.class)
public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand Down

0 comments on commit 0f01263

Please sign in to comment.