diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java index 7e380c59c7fc..8c342cecb3cc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/EmptyAirbyteSource.java @@ -83,16 +83,22 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo */ isResetBasedForConfig = false; } else { - stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState(), useStreamCapableState); + if (workerSourceConfig.getState() != null) { + stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState(), useStreamCapableState); - if (stateWrapper.isPresent() && - stateWrapper.get().getStateType() == StateType.LEGACY && - !isResetAllStreamsInCatalog(workerSourceConfig)) { - log.error("The state a legacy one but we are trying to do a partial update, this is not supported."); - throw new IllegalStateException("Try to perform a partial reset on a legacy state"); + if (stateWrapper.isPresent() && + stateWrapper.get().getStateType() == StateType.LEGACY && + !isResetAllStreamsInCatalog(workerSourceConfig)) { + log.error("The state a legacy one but we are trying to do a partial update, this is not supported."); + throw new IllegalStateException("Try to perform a partial reset on a legacy state"); + } + + isResetBasedForConfig = true; + } else { + /// No state + isResetBasedForConfig = false; } - isResetBasedForConfig = true; } } isStarted = true; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java index df9743cf0785..928a9ee9995e 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java @@ -352,6 +352,39 @@ public void testLegacyWithNewConfig() throws Exception { .isEmpty(); } + @Test + public void testLegacyWithNullState() throws Exception { + final List streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c")); + + final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration() + .withStreamsToReset(streamToReset); + final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() + .withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + + final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() + .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) + .withCatalog(airbyteCatalogWithExtraStream); + + emptyAirbyteSource.start(workerSourceConfig, null); + + final Optional maybeMessage = emptyAirbyteSource.attemptRead(); + Assertions.assertThat(maybeMessage) + .isNotEmpty(); + + final AirbyteMessage message = maybeMessage.get(); + Assertions.assertThat(message.getType()).isEqualTo(Type.STATE); + + final AirbyteStateMessage stateMessage = message.getState(); + Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.LEGACY); + Assertions.assertThat(stateMessage.getData()).isEqualTo(Jsons.emptyObject()); + + Assertions.assertThat(emptyAirbyteSource.attemptRead()) + .isEmpty(); + } + private void testReceiveNullStreamState(final StreamDescriptor streamDescriptor) { final Optional maybeMessage = emptyAirbyteSource.attemptRead(); Assertions.assertThat(maybeMessage)