Skip to content

Commit

Permalink
add streams to reset to job info (#13919)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Jun 22, 2022
1 parent 5689483 commit 2378b87
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 71 deletions.
8 changes: 7 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3729,7 +3729,13 @@ components:
format: int64
status:
$ref: "#/components/schemas/JobStatus"
streams:
resetConfig:
$ref: "#/components/schemas/ResetConfig"
ResetConfig:
type: object
description: contains information about how a reset was configured. only populated if the job was a reset.
properties:
streamsToReset:
type: array
items:
$ref: "#/components/schemas/StreamDescriptor"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteC
configuredCatalog.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.collect(Collectors.toList()));
.toList());
}

/**
Expand Down Expand Up @@ -122,7 +122,7 @@ public static List<StreamDescriptor> extractStreamDescriptors(final ConfiguredAi
public static List<StreamDescriptor> extractStreamDescriptors(final AirbyteCatalog catalog) {
return catalog.getStreams()
.stream()
.map(abStream -> new StreamDescriptor().withName(abStream.getName()).withNamespace(abStream.getNamespace()))
.map(CatalogHelpers::extractDescriptor)
.toList();
}

Expand All @@ -138,7 +138,7 @@ public static ConfiguredAirbyteCatalog toDefaultConfiguredCatalog(final AirbyteC
.withStreams(catalog.getStreams()
.stream()
.map(CatalogHelpers::toDefaultConfiguredStream)
.collect(Collectors.toList()));
.toList());
}

public static ConfiguredAirbyteStream toDefaultConfiguredStream(final AirbyteStream stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import io.airbyte.api.model.generated.JobStatus;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.ResetConfig;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
Expand All @@ -41,12 +43,11 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class JobConverter {

private static final int LOG_TAIL_SIZE = 1000000;

private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;

Expand All @@ -58,13 +59,13 @@ public JobConverter(final WorkerEnvironment workerEnvironment, final LogConfigs
public JobInfoRead getJobInfoRead(final Job job) {
return new JobInfoRead()
.job(getJobWithAttemptsRead(job).getJob())
.attempts(job.getAttempts().stream().map(attempt -> getAttemptInfoRead(attempt)).collect(Collectors.toList()));
.attempts(job.getAttempts().stream().map(this::getAttemptInfoRead).collect(Collectors.toList()));
}

public JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
final AirbyteVersion airbyteVersion) {
public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
final AirbyteVersion airbyteVersion) {
return new JobDebugRead()
.id(jobInfoRead.getJob().getId())
.configId(jobInfoRead.getJob().getConfigId())
Expand All @@ -84,10 +85,30 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
.id(job.getId())
.configId(configId)
.configType(configType)
.resetConfig(extractResetConfigIfReset(job).orElse(null))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond())
.status(Enums.convertTo(job.getStatus(), JobStatus.class)))
.attempts(job.getAttempts().stream().map(attempt -> getAttemptRead(attempt)).collect(Collectors.toList()));
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList());
}

/**
* If the job is of type RESET, extracts the part of the reset config that we expose in the API.
* Otherwise, returns empty optional.
*
* @param job - job
* @return api representation of reset config
*/
private static Optional<ResetConfig> extractResetConfigIfReset(final Job job) {
if (job.getConfigType() == ConfigType.RESET_CONNECTION) {
return Optional.ofNullable(
new ResetConfig().streamsToReset(job.getConfig().getResetConnection().getResetSourceConfiguration().getStreamsToReset()
.stream()
.map(ProtocolConverters::streamDescriptorToApi)
.toList()));
} else {
return Optional.empty();
}
}

public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
*/
public class ProtocolConverters {

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.config.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}

public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
return new StreamDescriptor().name(protocolStreamDescriptor.getName()).namespace(protocolStreamDescriptor.getNamespace());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private JobDebugInfoRead buildJobDebugInfoRead(final JobInfoRead jobInfoRead)
final DestinationRead destination = getDestinationRead(connection);
final SourceDefinitionRead sourceDefinitionRead = getSourceDefinitionRead(source);
final DestinationDefinitionRead destinationDefinitionRead = getDestinationDefinitionRead(destination);
final JobDebugRead jobDebugRead = jobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);
final JobDebugRead jobDebugRead = JobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);

return new JobDebugInfoRead()
.attempts(jobInfoRead.getAttempts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,38 @@
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.ResetConfig;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobOutput.OutputType;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -60,10 +70,7 @@ class JobConverterTest {
private static final String JOB_CONFIG_ID = "123";
private static final JobStatus JOB_STATUS = JobStatus.RUNNING;
private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING;
private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE;
private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(CONFIG_TYPE)
.withCheckConnection(new JobCheckConnectionConfig());
private static final JobConfig.ConfigType CONFIG_TYPE = ConfigType.SYNC;
private static final Path LOG_PATH = Path.of("log_path");
private static final long CREATED_AT = System.currentTimeMillis() / 1000;
private static final long RECORDS_EMITTED = 15L;
Expand All @@ -76,6 +83,12 @@ class JobConverterTest {
private static final String FAILURE_STACKTRACE = "stacktrace";
private static final boolean PARTIAL_SUCCESS = false;

private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(CONFIG_TYPE)
.withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("users")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("accounts"))))));

private static final JobOutput JOB_OUTPUT = new JobOutput()
.withOutputType(OutputType.SYNC)
.withSync(new StandardSyncOutput()
Expand Down Expand Up @@ -104,7 +117,7 @@ class JobConverterTest {
.id(JOB_ID)
.configId(JOB_CONFIG_ID)
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
.configType(JobConfigType.SYNC)
.createdAt(CREATED_AT)
.updatedAt(CREATED_AT))
.attempts(Lists.newArrayList(new AttemptInfoRead()
Expand Down Expand Up @@ -149,7 +162,7 @@ class JobConverterTest {
.id(JOB_ID)
.configId(JOB_CONFIG_ID)
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
.configType(JobConfigType.SYNC)
.airbyteVersion(airbyteVersion.serialize())
.sourceDefinition(sourceDefinitionRead)
.destinationDefinition(destinationDefinitionRead);
Expand Down Expand Up @@ -192,31 +205,56 @@ public void setUp() {
}

@Test
public void testGetJobInfoRead() {
void testGetJobInfoRead() {
assertEquals(JOB_INFO, jobConverter.getJobInfoRead(job));
}

@Test
public void testGetDebugJobInfoRead() {
assertEquals(JOB_DEBUG_INFO, jobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
void testGetDebugJobInfoRead() {
assertEquals(JOB_DEBUG_INFO, JobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
}

@Test
public void testGetJobWithAttemptsRead() {
assertEquals(JOB_WITH_ATTEMPTS_READ, jobConverter.getJobWithAttemptsRead(job));
void testGetJobWithAttemptsRead() {
assertEquals(JOB_WITH_ATTEMPTS_READ, JobConverter.getJobWithAttemptsRead(job));
}

@Test
public void testGetJobRead() {
final JobWithAttemptsRead jobReadActual = jobConverter.getJobWithAttemptsRead(job);
void testGetJobRead() {
final JobWithAttemptsRead jobReadActual = JobConverter.getJobWithAttemptsRead(job);
assertEquals(JOB_WITH_ATTEMPTS_READ, jobReadActual);
}

@Test
public void testEnumConversion() {
void testEnumConversion() {
assertTrue(Enums.isCompatible(JobConfig.ConfigType.class, JobConfigType.class));
assertTrue(Enums.isCompatible(JobStatus.class, io.airbyte.api.model.generated.JobStatus.class));
assertTrue(Enums.isCompatible(AttemptStatus.class, io.airbyte.api.model.generated.AttemptStatus.class));
}

// this test intentionally only looks at the reset config as the rest is the same here.
@Test
void testResetJobIncludesResetConfig() {
final JobConfig resetConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(new JobResetConnectionConfig().withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(
new io.airbyte.config.StreamDescriptor().withName("users"),
new io.airbyte.config.StreamDescriptor().withName("accounts")))));
final Job resetJob = new Job(
JOB_ID,
ConfigType.RESET_CONNECTION,
JOB_CONFIG_ID,
resetConfig,
Collections.emptyList(),
JobStatus.SUCCEEDED,
CREATED_AT,
CREATED_AT,
CREATED_AT);

final ResetConfig expectedResetConfig = new ResetConfig().streamsToReset(List.of(
new StreamDescriptor().name("users"),
new StreamDescriptor().name("accounts")));
assertEquals(expectedResetConfig, jobConverter.getJobInfoRead(resetJob).getJob().getResetConfig());
}

}
Loading

0 comments on commit 2378b87

Please sign in to comment.