diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java index 747f9eb4cac6..9bbeb99be12a 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java @@ -88,7 +88,7 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo if (stateWrapper.isPresent() && stateWrapper.get().getStateType() == StateType.LEGACY && - !isResetAllStreamsInCatalog(workerSourceConfig)) { + !resettingAllCatalogStreams(workerSourceConfig)) { log.error("The state a legacy one but we are trying to do a partial update, this is not supported."); throw new IllegalStateException("Try to perform a partial reset on a legacy state"); } @@ -174,15 +174,14 @@ private Optional emitLegacyState() { } } - private boolean isResetAllStreamsInCatalog(final WorkerSourceConfig sourceConfig) { + private boolean resettingAllCatalogStreams(final WorkerSourceConfig sourceConfig) { final Set catalogStreamDescriptors = sourceConfig.getCatalog().getStreams().stream().map( configuredAirbyteStream -> new StreamDescriptor() .withName(configuredAirbyteStream.getStream().getName()) .withNamespace(configuredAirbyteStream.getStream().getNamespace())) .collect(Collectors.toSet()); - final Set configStreamDescriptors = new HashSet<>(streamsToReset); - - return catalogStreamDescriptors.equals(configStreamDescriptors); + final Set streamsToResetDescriptors = new HashSet<>(streamsToReset); + return streamsToResetDescriptors.containsAll(catalogStreamDescriptors); } private AirbyteMessage getNullStreamStateMessage(final StreamDescriptor streamsToReset) { diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java index 5c5d90442e61..be2b61c9afd7 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java @@ -302,8 +302,10 @@ public void testPerStreamWithMissingState() throws Exception { Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } + // In the LEGACY state, if the list of streams passed in to be reset does not include every stream + // in the Catalog, then something has gone wrong and we should throw an error @Test - public void testLegacyWithNewConfigMissingStream() { + public void testLegacyWithMissingCatalogStream() { final List streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); @@ -327,13 +329,53 @@ public void testLegacyWithNewConfigMissingStream() { } + // If there are extra streams to reset that do not exist in the Catalog, the reset should work + // properly with all streams being reset + @Test + public void testLegacyWithResettingExtraStreamNotInCatalog() throws Exception { + final List streamToResetWithExtra = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c", "d")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToResetWithExtra); + final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withState(new State() + .withState(Jsons.emptyObject())) + .withCatalog(airbyteCatalog); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.LEGACY); + Assertions.assertThat(stateMessage.getData()).isEqualTo(Jsons.emptyObject()); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + + Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); + + } + @Test public void testLegacyWithNewConfig() throws Exception { final List streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() .withStreamsToReset(streamToReset); - final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() + final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), @@ -343,7 +385,7 @@ public void testLegacyWithNewConfig() throws Exception { .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) .withState(new State() .withState(Jsons.emptyObject())) - .withCatalog(airbyteCatalogWithExtraStream); + .withCatalog(airbyteCatalog); emptyAirbyteSource.start(workerSourceConfig, null);