diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 7213ca4a161a..33fcde8c1ae4 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -3729,7 +3729,13 @@ components: format: int64 status: $ref: "#/components/schemas/JobStatus" - streams: + resetConfig: + $ref: "#/components/schemas/ResetConfig" + ResetConfig: + type: object + description: contains information about how a reset was configured. only populated if the job was a reset. + properties: + streamsToReset: type: array items: $ref: "#/components/schemas/StreamDescriptor" diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 5ef450def3e0..2afa687f2ae8 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -80,7 +80,7 @@ public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteC configuredCatalog.getStreams() .stream() .map(ConfiguredAirbyteStream::getStream) - .collect(Collectors.toList())); + .toList()); } /** @@ -122,7 +122,7 @@ public static List extractStreamDescriptors(final ConfiguredAi public static List extractStreamDescriptors(final AirbyteCatalog catalog) { return catalog.getStreams() .stream() - .map(abStream -> new StreamDescriptor().withName(abStream.getName()).withNamespace(abStream.getNamespace())) + .map(CatalogHelpers::extractDescriptor) .toList(); } @@ -138,7 +138,7 @@ public static ConfiguredAirbyteCatalog toDefaultConfiguredCatalog(final AirbyteC .withStreams(catalog.getStreams() .stream() .map(CatalogHelpers::toDefaultConfiguredStream) - .collect(Collectors.toList())); + .toList()); } public static ConfiguredAirbyteStream toDefaultConfiguredStream(final AirbyteStream stream) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 51c329881ea7..15a78246a450 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -21,11 +21,13 @@ import io.airbyte.api.model.generated.JobStatus; import io.airbyte.api.model.generated.JobWithAttemptsRead; import io.airbyte.api.model.generated.LogRead; +import io.airbyte.api.model.generated.ResetConfig; import io.airbyte.api.model.generated.SourceDefinitionRead; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; @@ -41,12 +43,11 @@ import java.nio.file.Path; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; public class JobConverter { - private static final int LOG_TAIL_SIZE = 1000000; - private final WorkerEnvironment workerEnvironment; private final LogConfigs logConfigs; @@ -58,13 +59,13 @@ public JobConverter(final WorkerEnvironment workerEnvironment, final LogConfigs public JobInfoRead getJobInfoRead(final Job job) { return new JobInfoRead() .job(getJobWithAttemptsRead(job).getJob()) - .attempts(job.getAttempts().stream().map(attempt -> getAttemptInfoRead(attempt)).collect(Collectors.toList())); + .attempts(job.getAttempts().stream().map(this::getAttemptInfoRead).collect(Collectors.toList())); } - public JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead, - final SourceDefinitionRead sourceDefinitionRead, - final DestinationDefinitionRead destinationDefinitionRead, - final AirbyteVersion airbyteVersion) { + public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead, + final SourceDefinitionRead sourceDefinitionRead, + final DestinationDefinitionRead destinationDefinitionRead, + final AirbyteVersion airbyteVersion) { return new JobDebugRead() .id(jobInfoRead.getJob().getId()) .configId(jobInfoRead.getJob().getConfigId()) @@ -84,10 +85,30 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) { .id(job.getId()) .configId(configId) .configType(configType) + .resetConfig(extractResetConfigIfReset(job).orElse(null)) .createdAt(job.getCreatedAtInSecond()) .updatedAt(job.getUpdatedAtInSecond()) .status(Enums.convertTo(job.getStatus(), JobStatus.class))) - .attempts(job.getAttempts().stream().map(attempt -> getAttemptRead(attempt)).collect(Collectors.toList())); + .attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList()); + } + + /** + * If the job is of type RESET, extracts the part of the reset config that we expose in the API. + * Otherwise, returns empty optional. + * + * @param job - job + * @return api representation of reset config + */ + private static Optional extractResetConfigIfReset(final Job job) { + if (job.getConfigType() == ConfigType.RESET_CONNECTION) { + return Optional.ofNullable( + new ResetConfig().streamsToReset(job.getConfig().getResetConnection().getResetSourceConfiguration().getStreamsToReset() + .stream() + .map(ProtocolConverters::streamDescriptorToApi) + .toList())); + } else { + return Optional.empty(); + } } public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) { diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java b/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java index b71771e76da9..671ff6939a0b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/ProtocolConverters.java @@ -11,6 +11,10 @@ */ public class ProtocolConverters { + public static StreamDescriptor streamDescriptorToApi(final io.airbyte.config.StreamDescriptor protocolStreamDescriptor) { + return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace()); + } + public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) { return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace()); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index ec005f4a35ac..a5fed32955f4 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -137,7 +137,7 @@ private JobDebugInfoRead buildJobDebugInfoRead(final JobInfoRead jobInfoRead) final DestinationRead destination = getDestinationRead(connection); final SourceDefinitionRead sourceDefinitionRead = getSourceDefinitionRead(source); final DestinationDefinitionRead destinationDefinitionRead = getDestinationDefinitionRead(destination); - final JobDebugRead jobDebugRead = jobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion); + final JobDebugRead jobDebugRead = JobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion); return new JobDebugInfoRead() .attempts(jobInfoRead.getAttempts()) diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index 1ef32d8f919a..b7198b26458b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -25,28 +25,38 @@ import io.airbyte.api.model.generated.JobRead; import io.airbyte.api.model.generated.JobWithAttemptsRead; import io.airbyte.api.model.generated.LogRead; +import io.airbyte.api.model.generated.ResetConfig; import io.airbyte.api.model.generated.SourceDefinitionRead; +import io.airbyte.api.model.generated.StreamDescriptor; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; -import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobConfig; +import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; import io.airbyte.config.JobOutput.OutputType; +import io.airbyte.config.JobResetConnectionConfig; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.ResetSourceConfiguration; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StreamSyncStats; import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.AttemptStatus; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -60,10 +70,7 @@ class JobConverterTest { private static final String JOB_CONFIG_ID = "123"; private static final JobStatus JOB_STATUS = JobStatus.RUNNING; private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING; - private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE; - private static final JobConfig JOB_CONFIG = new JobConfig() - .withConfigType(CONFIG_TYPE) - .withCheckConnection(new JobCheckConnectionConfig()); + private static final JobConfig.ConfigType CONFIG_TYPE = ConfigType.SYNC; private static final Path LOG_PATH = Path.of("log_path"); private static final long CREATED_AT = System.currentTimeMillis() / 1000; private static final long RECORDS_EMITTED = 15L; @@ -76,6 +83,12 @@ class JobConverterTest { private static final String FAILURE_STACKTRACE = "stacktrace"; private static final boolean PARTIAL_SUCCESS = false; + private static final JobConfig JOB_CONFIG = new JobConfig() + .withConfigType(CONFIG_TYPE) + .withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("users")), + new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("accounts")))))); + private static final JobOutput JOB_OUTPUT = new JobOutput() .withOutputType(OutputType.SYNC) .withSync(new StandardSyncOutput() @@ -104,7 +117,7 @@ class JobConverterTest { .id(JOB_ID) .configId(JOB_CONFIG_ID) .status(io.airbyte.api.model.generated.JobStatus.RUNNING) - .configType(JobConfigType.CHECK_CONNECTION_SOURCE) + .configType(JobConfigType.SYNC) .createdAt(CREATED_AT) .updatedAt(CREATED_AT)) .attempts(Lists.newArrayList(new AttemptInfoRead() @@ -149,7 +162,7 @@ class JobConverterTest { .id(JOB_ID) .configId(JOB_CONFIG_ID) .status(io.airbyte.api.model.generated.JobStatus.RUNNING) - .configType(JobConfigType.CHECK_CONNECTION_SOURCE) + .configType(JobConfigType.SYNC) .airbyteVersion(airbyteVersion.serialize()) .sourceDefinition(sourceDefinitionRead) .destinationDefinition(destinationDefinitionRead); @@ -192,31 +205,56 @@ public void setUp() { } @Test - public void testGetJobInfoRead() { + void testGetJobInfoRead() { assertEquals(JOB_INFO, jobConverter.getJobInfoRead(job)); } @Test - public void testGetDebugJobInfoRead() { - assertEquals(JOB_DEBUG_INFO, jobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion)); + void testGetDebugJobInfoRead() { + assertEquals(JOB_DEBUG_INFO, JobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion)); } @Test - public void testGetJobWithAttemptsRead() { - assertEquals(JOB_WITH_ATTEMPTS_READ, jobConverter.getJobWithAttemptsRead(job)); + void testGetJobWithAttemptsRead() { + assertEquals(JOB_WITH_ATTEMPTS_READ, JobConverter.getJobWithAttemptsRead(job)); } @Test - public void testGetJobRead() { - final JobWithAttemptsRead jobReadActual = jobConverter.getJobWithAttemptsRead(job); + void testGetJobRead() { + final JobWithAttemptsRead jobReadActual = JobConverter.getJobWithAttemptsRead(job); assertEquals(JOB_WITH_ATTEMPTS_READ, jobReadActual); } @Test - public void testEnumConversion() { + void testEnumConversion() { assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class)); assertTrue(Enums.isCompatible(JobStatus.class, io.airbyte.api.model.generated.JobStatus.class)); assertTrue(Enums.isCompatible(AttemptStatus.class, io.airbyte.api.model.generated.AttemptStatus.class)); } + // this test intentionally only looks at the reset config as the rest is the same here. + @Test + void testResetJobIncludesResetConfig() { + final JobConfig resetConfig = new JobConfig() + .withConfigType(ConfigType.RESET_CONNECTION) + .withResetConnection(new JobResetConnectionConfig().withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of( + new io.airbyte.config.StreamDescriptor().withName("users"), + new io.airbyte.config.StreamDescriptor().withName("accounts"))))); + final Job resetJob = new Job( + JOB_ID, + ConfigType.RESET_CONNECTION, + JOB_CONFIG_ID, + resetConfig, + Collections.emptyList(), + JobStatus.SUCCEEDED, + CREATED_AT, + CREATED_AT, + CREATED_AT); + + final ResetConfig expectedResetConfig = new ResetConfig().streamsToReset(List.of( + new StreamDescriptor().name("users"), + new StreamDescriptor().name("accounts"))); + assertEquals(expectedResetConfig, jobConverter.getJobInfoRead(resetJob).getJob().getResetConfig()); + } + } diff --git a/docs/reference/api/generated-api-html/index.html b/docs/reference/api/generated-api-html/index.html index 89a1478de4a4..fc6140f2ddb9 100644 --- a/docs/reference/api/generated-api-html/index.html +++ b/docs/reference/api/generated-api-html/index.html @@ -1145,14 +1145,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -1477,14 +1479,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -4060,14 +4064,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -4462,14 +4468,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -4636,14 +4644,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -4743,14 +4753,16 @@

Example data

"job" : { "createdAt" : 6, "configId" : "configId", - "streams" : [ { - "name" : "name", - "namespace" : "namespace" - }, { - "name" : "name", - "namespace" : "namespace" - } ], "id" : 0, + "resetConfig" : { + "streamsToReset" : [ { + "name" : "name", + "namespace" : "namespace" + }, { + "name" : "name", + "namespace" : "namespace" + } ] + }, "updatedAt" : 1 }, "attempts" : [ { @@ -10619,6 +10631,7 @@

Table of Contents

  • PrivateSourceDefinitionRead -
  • PrivateSourceDefinitionReadList -
  • ReleaseStage -
  • +
  • ResetConfig -
  • ResourceRequirements -
  • SetInstancewideDestinationOauthParamsRequestBody -
  • SetInstancewideSourceOauthParamsRequestBody -
  • @@ -11360,7 +11373,7 @@

    JobRead - createdAt
    Long format: int64
    updatedAt
    Long format: int64
    status
    -
    streams (optional)
    +
    resetConfig (optional)
    @@ -11622,6 +11635,13 @@

    ReleaseStage -

    +
    +

    ResetConfig - Up

    +
    contains information about how a reset was configured. only populated if the job was a reset.
    +
    +
    streamsToReset (optional)
    +
    +

    ResourceRequirements - Up

    optional resource requirements to run workers (blank for unbounded allocations)