Skip to content

Commit

Permalink
Fix build (#14799)
Browse files Browse the repository at this point in the history
  • Loading branch information
benmoriceau authored Jul 18, 2022
1 parent dd97278 commit 6d81a75
Showing 1 changed file with 31 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void end() {
}

@Test
public void testIncrementalCdcSync(TestInfo testInfo) throws Exception {
public void testIncrementalCdcSync(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());

final UUID connectionId = createCdcConnection();
Expand All @@ -174,7 +174,7 @@ public void testIncrementalCdcSync(TestInfo testInfo) throws Exception {
List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);

List<StreamDescriptor> expectedStreams = List.of(
final List<StreamDescriptor> expectedStreams = List.of(
new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE),
new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE));
assertGlobalStateContainsStreams(connectionId, expectedStreams);
Expand Down Expand Up @@ -251,7 +251,7 @@ public void testIncrementalCdcSync(TestInfo testInfo) throws Exception {
// tests that incremental syncs still work properly even when using a destination connector that was
// built on the old protocol that did not have any per-stream state fields
@Test
public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testInfo) throws Exception {
public void testIncrementalCdcSyncWithLegacyDestinationConnector(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());
final UUID postgresDestDefId = testHarness.getPostgresDestinationDefinitionId();
// Fetch the current/most recent source definition version
Expand All @@ -272,7 +272,7 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testIn
}

@Test
public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception {
public void testDeleteRecordCdcSync(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());

final UUID connectionId = createCdcConnection();
Expand All @@ -284,7 +284,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception {
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));

final Database source = testHarness.getSourceDatabase();
List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
final List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);

final Instant beforeDelete = Instant.now();
Expand All @@ -293,7 +293,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception {
// delete a record
source.query(ctx -> ctx.execute("DELETE FROM id_and_name WHERE id=1"));

Map<String, Object> deletedRecordMap = new HashMap<>();
final Map<String, Object> deletedRecordMap = new HashMap<>();
deletedRecordMap.put(COLUMN_ID, 1);
deletedRecordMap.put(COLUMN_NAME, null);
expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Expand All @@ -311,7 +311,7 @@ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception {
}

@Test
public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception {
public void testPartialResetFromSchemaUpdate(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());

final UUID connectionId = createCdcConnection();
Expand All @@ -323,29 +323,29 @@ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception

final Database source = testHarness.getSourceDatabase();

List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
final List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);

List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
final List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);

StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
final StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
final StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));

LOGGER.info("Removing color palette table");
source.query(ctx -> ctx.dropTable(COLOR_PALETTE_TABLE).execute());

LOGGER.info("Refreshing schema and updating connection");
final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
UUID sourceId = createCdcSource().getSourceId();
AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
final UUID sourceId = createCdcSource().getSourceId();
final AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
LOGGER.info("Refreshed catalog: {}", refreshedCatalog);
WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead);
final WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead);
webBackendApi.webBackendUpdateConnectionNew(update);

LOGGER.info("Waiting for sync job after update to complete");
JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId);
final JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId);
waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);

// We do not check that the source and the dest are in sync here because removing a stream doesn't
Expand All @@ -354,7 +354,7 @@ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception
}

@Test
public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Exception {
public void testPartialResetFromStreamSelection(final TestInfo testInfo) throws Exception {
LOGGER.info("Starting {}", testInfo.getDisplayName());

final UUID connectionId = createCdcConnection();
Expand All @@ -366,14 +366,14 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except

final Database source = testHarness.getSourceDatabase();

List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
final List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);

List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
final List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);

StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
final StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
final StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));

LOGGER.info("Removing color palette stream from configured catalog");
Expand Down Expand Up @@ -447,8 +447,8 @@ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Except
assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
}

private List<DestinationCdcRecordMatcher> getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException {
List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, tableName);
private List<DestinationCdcRecordMatcher> getCdcRecordMatchersFromSource(final Database source, final String tableName) throws SQLException {
final List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, tableName);
return new ArrayList<>(sourceRecords
.stream()
.map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
Expand Down Expand Up @@ -500,7 +500,8 @@ private SourceRead createCdcSource() throws ApiException {
Jsons.jsonNode(sourceDbConfigMap));
}

private void assertDestinationMatches(String streamName, List<DestinationCdcRecordMatcher> expectedDestRecordMatchers) throws Exception {
private void assertDestinationMatches(final String streamName, final List<DestinationCdcRecordMatcher> expectedDestRecordMatchers)
throws Exception {
final List<JsonNode> destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName));
if (destRecords.size() != expectedDestRecordMatchers.size()) {
final String errorMessage = String.format(
Expand All @@ -512,22 +513,22 @@ private void assertDestinationMatches(String streamName, List<DestinationCdcReco
throw new IllegalStateException(errorMessage);
}

for (DestinationCdcRecordMatcher recordMatcher : expectedDestRecordMatchers) {
for (final DestinationCdcRecordMatcher recordMatcher : expectedDestRecordMatchers) {
final List<JsonNode> matchingDestRecords = destRecords.stream().filter(destRecord -> {
Map<String, Object> sourceRecordMap = Jsons.object(recordMatcher.sourceRecord, Map.class);
Map<String, Object> destRecordMap = Jsons.object(destRecord, Map.class);
final Map<String, Object> sourceRecordMap = Jsons.object(recordMatcher.sourceRecord, Map.class);
final Map<String, Object> destRecordMap = Jsons.object(destRecord, Map.class);

boolean sourceRecordValuesMatch = sourceRecordMap.keySet()
final boolean sourceRecordValuesMatch = sourceRecordMap.keySet()
.stream()
.allMatch(column -> Objects.equals(sourceRecordMap.get(column), destRecordMap.get(column)));

final Object cdcUpdatedAtValue = destRecordMap.get(CDC_UPDATED_AT_COLUMN);
// use epoch millis to guarantee the two values are at the same precision
boolean cdcUpdatedAtMatches = cdcUpdatedAtValue != null
final boolean cdcUpdatedAtMatches = cdcUpdatedAtValue != null
&& Instant.parse(String.valueOf(cdcUpdatedAtValue)).toEpochMilli() >= recordMatcher.minUpdatedAt.toEpochMilli();

final Object cdcDeletedAtValue = destRecordMap.get(CDC_DELETED_AT_COLUMN);
boolean cdcDeletedAtMatches;
final boolean cdcDeletedAtMatches;
if (recordMatcher.minDeletedAt.isPresent()) {
cdcDeletedAtMatches = cdcDeletedAtValue != null
&& Instant.parse(String.valueOf(cdcDeletedAtValue)).toEpochMilli() >= recordMatcher.minDeletedAt.get().toEpochMilli();
Expand Down Expand Up @@ -594,6 +595,7 @@ private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connectio
}

// can be helpful for debugging
@SuppressWarnings("PMD")
private void printDbs() throws SQLException {
final Database sourceDb = testHarness.getSourceDatabase();
Set<SchemaTableNamePair> pairs = testHarness.listAllTables(sourceDb);
Expand Down

0 comments on commit 6d81a75

Please sign in to comment.