Skip to content

Commit

Permalink
Fix empty airbyte source check (#14509)
Browse files Browse the repository at this point in the history
* fix empty airbyte source check
  • Loading branch information
alovew authored Jul 7, 2022
1 parent fc3efe1 commit 19c33b0
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo

if (stateWrapper.isPresent() &&
stateWrapper.get().getStateType() == StateType.LEGACY &&
!isResetAllStreamsInCatalog(workerSourceConfig)) {
!resettingAllCatalogStreams(workerSourceConfig)) {
log.error("The state a legacy one but we are trying to do a partial update, this is not supported.");
throw new IllegalStateException("Try to perform a partial reset on a legacy state");
}
Expand Down Expand Up @@ -174,15 +174,14 @@ private Optional<AirbyteMessage> emitLegacyState() {
}
}

private boolean isResetAllStreamsInCatalog(final WorkerSourceConfig sourceConfig) {
private boolean resettingAllCatalogStreams(final WorkerSourceConfig sourceConfig) {
final Set<StreamDescriptor> catalogStreamDescriptors = sourceConfig.getCatalog().getStreams().stream().map(
configuredAirbyteStream -> new StreamDescriptor()
.withName(configuredAirbyteStream.getStream().getName())
.withNamespace(configuredAirbyteStream.getStream().getNamespace()))
.collect(Collectors.toSet());
final Set<StreamDescriptor> configStreamDescriptors = new HashSet<>(streamsToReset);

return catalogStreamDescriptors.equals(configStreamDescriptors);
final Set<StreamDescriptor> streamsToResetDescriptors = new HashSet<>(streamsToReset);
return streamsToResetDescriptors.containsAll(catalogStreamDescriptors);
}

private AirbyteMessage getNullStreamStateMessage(final StreamDescriptor streamsToReset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,10 @@ public void testPerStreamWithMissingState() throws Exception {
Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue();
}

// In the LEGACY state, if the list of streams passed in to be reset does not include every stream
// in the Catalog, then something has gone wrong and we should throw an error
@Test
public void testLegacyWithNewConfigMissingStream() {
public void testLegacyWithMissingCatalogStream() {

final List<StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c"));

Expand All @@ -327,13 +329,53 @@ public void testLegacyWithNewConfigMissingStream() {

}

// If there are extra streams to reset that do not exist in the Catalog, the reset should work
// properly with all streams being reset
@Test
public void testLegacyWithResettingExtraStreamNotInCatalog() throws Exception {
final List<StreamDescriptor> streamToResetWithExtra = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c", "d"));

final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration()
.withStreamsToReset(streamToResetWithExtra);
final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c"))));

final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig()
.withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration))
.withState(new State()
.withState(Jsons.emptyObject()))
.withCatalog(airbyteCatalog);

emptyAirbyteSource.start(workerSourceConfig, null);

final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead();
Assertions.assertThat(maybeMessage)
.isNotEmpty();

final AirbyteMessage message = maybeMessage.get();
Assertions.assertThat(message.getType()).isEqualTo(Type.STATE);

final AirbyteStateMessage stateMessage = message.getState();
Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.LEGACY);
Assertions.assertThat(stateMessage.getData()).isEqualTo(Jsons.emptyObject());

Assertions.assertThat(emptyAirbyteSource.attemptRead())
.isEmpty();

Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue();

}

@Test
public void testLegacyWithNewConfig() throws Exception {
final List<StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c"));

final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration()
.withStreamsToReset(streamToReset);
final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog()
final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")),
Expand All @@ -343,7 +385,7 @@ public void testLegacyWithNewConfig() throws Exception {
.withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration))
.withState(new State()
.withState(Jsons.emptyObject()))
.withCatalog(airbyteCatalogWithExtraStream);
.withCatalog(airbyteCatalog);

emptyAirbyteSource.start(workerSourceConfig, null);

Expand Down

0 comments on commit 19c33b0

Please sign in to comment.