Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usingLsnAsTheRecordOffset #534

Merged
merged 7 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## Release History
### 1.11.0-Beta.1 (Unreleased)
#### Other Changes
* Construct `SourceRecord` offset based on `_lsn` from the item in `CosmosDBSourceConnector`. [PR 534](https://github.com/microsoft/kafka-connect-cosmosdb/pull/534)

### 1.10.0 (2023-10-10)
#### New Features
* Added compression feature to resolve duplicate records in a single batch when consuming from kafka topic in the bulk mode for sink connector through new config `connect.cosmos.sink.bulk.compression.enabled`. [PR 515](https://github.com/microsoft/kafka-connect-cosmosdb/pull/515)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.azure.cosmos.kafka</groupId>
<artifactId>kafka-connect-cosmos</artifactId>
<version>1.10.0</version>
<version>1.11.0-Beta.1</version>

<name> kafka-connect-cosmos</name>
<url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosContainerRequestOptions;
import com.azure.cosmos.models.CosmosContainerResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -44,7 +43,7 @@ public class CosmosDBSourceTask extends SourceTask {

private static final Logger logger = LoggerFactory.getLogger(CosmosDBSourceTask.class);
private static final String OFFSET_KEY = "recordContinuationToken";
private static final String CONTINUATION_TOKEN = "ContinuationToken";
private static final String LSN_ATTRIBUTE_NAME = "_lsn";

private final AtomicBoolean running = new AtomicBoolean(false);
private CosmosAsyncClient client = null;
Expand Down Expand Up @@ -105,26 +104,8 @@ public void start(Map<String, String> map) {
logger.info("Started CosmosDB source task.");
}

private JsonNode getLeaseContainerRecord() {
String sql = "SELECT * FROM c WHERE IS_DEFINED(c.Owner)";
Iterable<JsonNode> filteredDocs = leaseContainer.queryItems(sql, new CosmosQueryRequestOptions(), JsonNode.class).toIterable();
if (filteredDocs.iterator().hasNext()) {
JsonNode result = filteredDocs.iterator().next();
// Return node only if it has the continuation token field present
if (result.has(CONTINUATION_TOKEN)) {
return result;
}
}

return null;
}

private String getContinuationToken() {
JsonNode leaseRecord = getLeaseContainerRecord();
if (client == null || leaseRecord == null) {
return null;
}
return leaseRecord.get(CONTINUATION_TOKEN).textValue();
private String getItemLsn(JsonNode item) {
return item.get(LSN_ATTRIBUTE_NAME).asText();
}

@Override
Expand Down Expand Up @@ -171,10 +152,8 @@ private void fillRecords(List<SourceRecord> records, String topic) throws Interr
messageKey = (messageKeyFieldNode != null) ? messageKeyFieldNode.toString() : "";
}

// Get the latest token and record as offset
// TODO: The continuationToken here is picked from any lease with owner, so maybe a little bit random
// change to show the continuationToken for the leases processed by the current worker
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getContinuationToken());
// Get the latest lsn and record as offset
Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));

if (logger.isDebugEnabled()) {
logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class CosmosDBSourceTaskTest {
private final String topicName = "testtopic";
private final String containerName = "container666";
private final String databaseName = "fakeDatabase312";
private final String OFFSET_KEY = "recordContinuationToken";
private CosmosAsyncClient mockCosmosClient;
private CosmosAsyncContainer mockFeedContainer;
private CosmosAsyncContainer mockLeaseContainer;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void setup() throws IllegalAccessException {

@Test
public void testHandleChanges() throws JsonProcessingException, IllegalAccessException, InterruptedException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -124,7 +125,7 @@ public void testHandleChanges() throws JsonProcessingException, IllegalAccessExc

@Test
public void testPoll() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -144,7 +145,7 @@ public void testPoll() throws InterruptedException, JsonProcessingException, Ill
@Test
public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, JsonProcessingException, IllegalAccessException {
// test when should fillMoreRecords false, then poll method will return immediately
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);

Expand All @@ -168,7 +169,7 @@ public void testPoll_shouldFillMoreRecordsFalse() throws InterruptedException, J

@Test
public void testPollWithMessageKey() throws InterruptedException, JsonProcessingException {
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -183,9 +184,27 @@ public void testPollWithMessageKey() throws InterruptedException, JsonProcessing
Assert.assertEquals("123", result.get(0).key());
}

@Test
public void testSourceRecordOffset() throws InterruptedException, JsonProcessingException {
String jsonString = "{\"id\":123,\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
changes.add(actualObj);

new Thread(() -> {
testTask.handleCosmosDbChanges(changes);
}).start();

List<SourceRecord> results = testTask.poll();
Assert.assertEquals(1, results.size());
Assert.assertEquals("123", results.get(0).key());
Assert.assertEquals("2", results.get(0).sourceOffset().get(OFFSET_KEY));
}

@Test
public void testZeroBatchSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -204,7 +223,7 @@ public void testZeroBatchSize() throws InterruptedException, JsonProcessingExcep

@Test
public void testSmallBufferSize() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand All @@ -224,7 +243,7 @@ public void testSmallBufferSize() throws InterruptedException, JsonProcessingExc

@Test(expected=IllegalStateException.class)
public void testEmptyAssignedContainerThrowsIllegalStateException() throws InterruptedException, JsonProcessingException, IllegalAccessException {
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\"}";
String jsonString = "{\"k1\":\"v1\",\"k2\":\"v2\", \"_lsn\":\"2\"}";
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(jsonString);
List<JsonNode> changes = new ArrayList<>();
Expand Down
Loading