Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block ChangeFeedMode interoperability for ChangeFeedProcessor. #43798

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.pkversion.ServiceItemLease;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand All @@ -21,11 +23,12 @@

import java.time.Duration;

import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;

public class BootstrapperImplTests {

private static final String baseContinuationStringForFullRange = "{\"V\":1," +
private static final String BASE_CONTINUATION_STRING_FOR_EPK_FULL_RANGE = "{\"V\":1," +
"\"Rid\":\"%s\"," +
"\"Continuation\":[" +
"{\"token\":\"%s\",\"range\":{\"min\":\"\",\"max\":\"FF\"}}" +
Expand All @@ -34,9 +37,12 @@ public class BootstrapperImplTests {

@DataProvider(name = "leaseProvider")
public Object[][] leaseProvider() {

String BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE = "\"100\"";

return new Object[][] {
{
createLeaseWithContinuation(
createEpkRangeBasedLeaseWithContinuation(
true,
ChangeFeedMode.FULL_FIDELITY,
ChangeFeedStartFromInternal.createFromNow(),
Expand All @@ -45,10 +51,11 @@ public Object[][] leaseProvider() {
"0",
"-FF",
"0"),
null,
false
},
{
createLeaseWithContinuation(
createEpkRangeBasedLeaseWithContinuation(
true,
ChangeFeedMode.INCREMENTAL,
ChangeFeedStartFromInternal.createFromNow(),
Expand All @@ -57,10 +64,11 @@ public Object[][] leaseProvider() {
"0",
"-FF",
"0"),
null,
true
},
{
createLeaseWithContinuation(
createEpkRangeBasedLeaseWithContinuation(
false,
ChangeFeedMode.INCREMENTAL,
ChangeFeedStartFromInternal.createFromNow(),
Expand All @@ -69,10 +77,11 @@ public Object[][] leaseProvider() {
"0",
"-FF",
"0"),
null,
false
},
{
createLeaseWithContinuation(
createEpkRangeBasedLeaseWithContinuation(
false,
ChangeFeedMode.FULL_FIDELITY,
ChangeFeedStartFromInternal.createFromNow(),
Expand All @@ -81,13 +90,51 @@ public Object[][] leaseProvider() {
"0",
"-FF",
"0"),
null,
false
},
{
createEpkRangeBasedLeaseWithContinuation(
true,
ChangeFeedMode.FULL_FIDELITY,
ChangeFeedStartFromInternal.createFromNow(),
"XyJKUI7=",
"NO67Hq=",
"0",
"-FF",
"0"),
createPkRangeBasedLeaseWithContinuation(
true,
"XyJKUI7=",
"NO67Hq=",
"-FF",
BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE),
true
},
{
null,
createPkRangeBasedLeaseWithContinuation(
true,
"XyJKUI7=",
"NO67Hq=",
"-FF",
BASE_CONTINUATION_STRING_FOR_PK_FULL_RANGE),
true
},
{
null,
null,
false
}
};
}

@Test(groups = {"unit"}, dataProvider = "leaseProvider")
public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(ServiceItemLeaseV1 lease, boolean expectIllegalStateException) {
public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(
ServiceItemLeaseV1 epkRangeBasedLease,
ServiceItemLease pkRangeBasedLease,
boolean expectIllegalStateException) {

Duration lockTime = Duration.ofSeconds(5);
Duration expireTIme = Duration.ofSeconds(5);

Expand All @@ -104,13 +151,30 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
Mockito.when(leaseStoreMock.releaseInitializationLock()).thenReturn(Mono.empty());

LeaseStoreManager epkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(lease));
LeaseStoreManager pkRangeVersionLeaseStoreManagerMock = Mockito.mock(LeaseStoreManager.class);

ChangeFeedProcessorOptions changeFeedProcessorOptionsMock = Mockito.mock(ChangeFeedProcessorOptions.class);

if (epkRangeBasedLease == null) {
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.empty());
} else {
Mockito.when(epkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(epkRangeBasedLease));
}

if (pkRangeBasedLease == null) {
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.empty());
} else {
Mockito.when(pkRangeVersionLeaseStoreManagerMock.getTopLeases(Mockito.eq(1))).thenReturn(Flux.just(pkRangeBasedLease));
}

Bootstrapper bootstrapper = new BootstrapperImpl(
partitionSynchronizerMock,
leaseStoreMock,
lockTime,
expireTIme,
epkRangeVersionLeaseStoreManagerMock,
pkRangeVersionLeaseStoreManagerMock,
changeFeedProcessorOptionsMock,
ChangeFeedMode.FULL_FIDELITY);

if (expectIllegalStateException) {
Expand All @@ -119,12 +183,17 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
bootstrapper.initialize().block();
}

Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
Mockito.verify(pkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));

if (pkRangeBasedLease == null) {
Mockito.verify(epkRangeVersionLeaseStoreManagerMock, times(1)).getTopLeases(Mockito.eq(1));
}

Mockito.verify(partitionSynchronizerMock, times(1)).createMissingLeases();
Mockito.verify(leaseStoreMock, times(2)).isInitialized();
}

private static ServiceItemLeaseV1 createLeaseWithContinuation(
private static ServiceItemLeaseV1 createEpkRangeBasedLeaseWithContinuation(
boolean withContinuation,
ChangeFeedMode changeFeedMode,
ChangeFeedStartFromInternal startFromSettings,
Expand All @@ -141,8 +210,9 @@ private static ServiceItemLeaseV1 createLeaseWithContinuation(

if (withContinuation) {
FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRangeImpl = new FeedRangePartitionKeyRangeImpl(pkRangeId);

String continuationAsJsonString = String.format(
baseContinuationStringForFullRange,
BASE_CONTINUATION_STRING_FOR_EPK_FULL_RANGE,
collectionRid,
continuationToken,
pkRangeId);
Expand All @@ -161,4 +231,24 @@ private static ServiceItemLeaseV1 createLeaseWithContinuation(

return lease;
}

private static ServiceItemLease createPkRangeBasedLeaseWithContinuation(
boolean withContinuation,
String databaseRid,
String collectionRid,
String leaseToken,
String continuationToken) {

ServiceItemLease lease = new ServiceItemLease();

lease.setId(String.format("%s_%s..%s", databaseRid, collectionRid, leaseToken));

lease = lease.withLeaseToken(leaseToken);

if (withContinuation) {
lease = lease.withContinuationToken(continuationToken);
}

return lease;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -72,6 +73,11 @@ public void initializeStoreFromPkRangeIdVersionLeaseStore() {
Mockito.when(partitionSynchronizerMock.createMissingLeases(Mockito.any())).thenReturn(Mono.empty());

LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);

ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();

changeFeedProcessorOptions.setLeasePrefix("testLease");

Mockito
.when(leaseStoreMock.isInitialized())
.thenReturn(Mono.just(false))
Expand All @@ -94,6 +100,7 @@ public void initializeStoreFromPkRangeIdVersionLeaseStore() {
expireTIme,
pkRangeIdVersionLeaseStoreManagerMock,
epkRangeVersionLeaseStoreManagerMock,
changeFeedProcessorOptions,
ChangeFeedMode.INCREMENTAL);

bootstrapper.initialize().block();
Expand All @@ -114,6 +121,8 @@ public void initializeStoreFromScratch() {
Duration expireTIme = Duration.ofSeconds(5);

PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
ChangeFeedProcessorOptions changeFeedProcessorOptionsMock = Mockito.mock(ChangeFeedProcessorOptions.class);

Mockito.when(partitionSynchronizerMock.createMissingLeases()).thenReturn(Mono.empty());

LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
Expand All @@ -139,6 +148,7 @@ public void initializeStoreFromScratch() {
expireTIme,
pkRangeIdVersionLeaseStoreManagerMock,
epkRangeVersionLeaseStoreManagerMock,
changeFeedProcessorOptionsMock,
ChangeFeedMode.INCREMENTAL);

bootstrapper.initialize().block();
Expand All @@ -156,6 +166,8 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
Duration expireTIme = Duration.ofSeconds(5);

PartitionSynchronizer partitionSynchronizerMock = Mockito.mock(PartitionSynchronizer.class);
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();

Mockito.when(partitionSynchronizerMock.createMissingLeases(Mockito.any())).thenReturn(Mono.empty());

LeaseStore leaseStoreMock = Mockito.mock(LeaseStore.class);
Expand All @@ -181,6 +193,7 @@ public void tryInitializeStoreFromEpkVersionLeaseStoreWithExistingLeases(Service
expireTIme,
pkRangeIdVersionLeaseStoreManagerMock,
epkRangeVersionLeaseStoreManagerMock,
changeFeedProcessorOptions,
ChangeFeedMode.INCREMENTAL);

if (expectIllegalStateException) {
Expand Down
Loading
Loading