Skip to content

Commit

Permalink
wip; need stream list from reset for this to work
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Jun 19, 2022
1 parent 5da035c commit 354c6e2
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,64 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}

/**
* Converts a {@link ConfiguredAirbyteCatalog} into an {@link AirbyteCatalog}. This is possible
* because the latter is a subset of the former.
*
* @param configuredCatalog - catalog to convert
* @return - airbyte catalog
*/
public static AirbyteCatalog configuredCatalogToCatalog(final ConfiguredAirbyteCatalog configuredCatalog) {
  // Each configured stream wraps the plain stream it configures; unwrap every one of them
  // and hand the resulting list to a fresh catalog.
  final AirbyteCatalog catalog = new AirbyteCatalog();
  return catalog.withStreams(
      configuredCatalog.getStreams().stream()
          .map(configuredStream -> configuredStream.getStream())
          .collect(Collectors.toList()));
}

/**
* Extracts {@link StreamDescriptor} for a given {@link ConfiguredAirbyteStream}
*
* @param airbyteStream stream
* @return stream descriptor
*/
public static StreamDescriptor extractDescriptor(final ConfiguredAirbyteStream airbyteStream) {
// Unwrap the configured stream and delegate to the AirbyteStream overload.
return extractDescriptor(airbyteStream.getStream());
}

/**
* Extracts {@link StreamDescriptor} for a given {@link AirbyteStream}
*
* @param airbyteStream stream
* @return stream descriptor
*/
public static StreamDescriptor extractDescriptor(final AirbyteStream airbyteStream) {
  // The descriptor carries only the stream's name and namespace.
  final String name = airbyteStream.getName();
  final String namespace = airbyteStream.getNamespace();
  return new StreamDescriptor().withName(name).withNamespace(namespace);
}

/**
* Extracts {@link StreamDescriptor}s for each stream in a given {@link ConfiguredAirbyteCatalog}
*
* @param configuredCatalog catalog
* @return list of stream descriptors
*/
public static List<StreamDescriptor> extractStreamDescriptors(final ConfiguredAirbyteCatalog configuredCatalog) {
// Reduce the configured catalog to a plain catalog first, then reuse the AirbyteCatalog overload.
return extractStreamDescriptors(configuredCatalogToCatalog(configuredCatalog));
}

/**
* Extracts {@link StreamDescriptor}s for each stream in a given {@link AirbyteCatalog}
*
* @param catalog catalog
* @return list of stream descriptors
*/
public static List<StreamDescriptor> extractStreamDescriptors(final AirbyteCatalog catalog) {
  // Delegate to extractDescriptor(AirbyteStream) so the name/namespace mapping lives in exactly
  // one place instead of being re-implemented here.
  return catalog.getStreams()
      .stream()
      .map(abStream -> extractDescriptor(abStream))
      .toList();
}

/**
* Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make
* it a valid ConfiguredCatalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.api.model.generated.SynchronousJobRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.models.Attempt;
Expand All @@ -41,12 +45,11 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class JobConverter {

private static final int LOG_TAIL_SIZE = 1000000;

private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;

Expand All @@ -58,13 +61,13 @@ public JobConverter(final WorkerEnvironment workerEnvironment, final LogConfigs
public JobInfoRead getJobInfoRead(final Job job) {
// The job summary is taken from getJobWithAttemptsRead(job); every attempt of the job is
// converted individually with getAttemptInfoRead.
return new JobInfoRead()
.job(getJobWithAttemptsRead(job).getJob())
.attempts(job.getAttempts().stream().map(attempt -> getAttemptInfoRead(attempt)).collect(Collectors.toList()));
.attempts(job.getAttempts().stream().map(this::getAttemptInfoRead).collect(Collectors.toList()));
}

public JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
final AirbyteVersion airbyteVersion) {
public static JobDebugRead getDebugJobInfoRead(final JobInfoRead jobInfoRead,
final SourceDefinitionRead sourceDefinitionRead,
final DestinationDefinitionRead destinationDefinitionRead,
final AirbyteVersion airbyteVersion) {
return new JobDebugRead()
.id(jobInfoRead.getJob().getId())
.configId(jobInfoRead.getJob().getConfigId())
Expand All @@ -84,10 +87,45 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
.id(job.getId())
.configId(configId)
.configType(configType)
.streams(extractStreamNamesFromCatalogIfSync(job).orElse(null))
.createdAt(job.getCreatedAtInSecond())
.updatedAt(job.getUpdatedAtInSecond())
.status(Enums.convertTo(job.getStatus(), JobStatus.class)))
.attempts(job.getAttempts().stream().map(attempt -> getAttemptRead(attempt)).collect(Collectors.toList()));
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).collect(Collectors.toList()));
}

/**
* If the job is of type SYNC or RESET, extracts the descriptors of the streams in its catalog.
* Otherwise, returns an empty optional.
*
* @param job - job whose streams to extract
* @return stream descriptors of streams in the catalog used by job
*/
private static Optional<List<StreamDescriptor>> extractStreamNamesFromCatalogIfSync(final Job job) {
  // Stays empty when extractCatalogIfSyncOrReset yields no catalog. Otherwise the protocol-level
  // descriptors of the catalog's streams are each converted to their API counterpart.
  return extractCatalogIfSyncOrReset(job)
      .map(catalog -> CatalogHelpers.extractStreamDescriptors(catalog)
          .stream()
          .map(protocolDescriptor -> ProtocolConverters.streamDescriptorToApi(protocolDescriptor))
          .toList());
}

/**
* If the job is of type SYNC or RESET, extracts the configured catalog. Otherwise, returns an empty
* optional.
*
* @param job - job whose catalog to extract
* @return catalog used by job
*/
private static Optional<ConfiguredAirbyteCatalog> extractCatalogIfSyncOrReset(final Job job) {
  final ConfigType configType = job.getConfigType();

  // Optional.ofNullable rather than Optional.of: a job config that carries no configured catalog
  // must produce an empty Optional, not a NullPointerException.
  if (configType == ConfigType.SYNC) {
    return Optional.ofNullable(job.getConfig()
        .getSync()
        .getConfiguredAirbyteCatalog());
  }
  if (configType == ConfigType.RESET_CONNECTION) {
    return Optional.ofNullable(job.getConfig()
        .getResetConnection()
        .getConfiguredAirbyteCatalog());
  }

  // Every other job type has no catalog to report.
  return Optional.empty();
}

public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.converters;

import io.airbyte.api.model.generated.StreamDescriptor;

/**
* Utilities that convert protocol types into API representations of the protocol type.
*/
public class ProtocolConverters {

  private ProtocolConverters() {
    // Static utility holder; not meant to be instantiated.
  }

  /**
   * Converts a protocol stream descriptor into the API model's stream descriptor by copying its
   * name and namespace.
   *
   * @param protocolStreamDescriptor protocol descriptor to convert
   * @return API descriptor with the same name and namespace
   */
  public static StreamDescriptor streamDescriptorToApi(final io.airbyte.protocol.models.StreamDescriptor protocolStreamDescriptor) {
    return new StreamDescriptor()
        .name(protocolStreamDescriptor.getName())
        .namespace(protocolStreamDescriptor.getNamespace());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private JobDebugInfoRead buildJobDebugInfoRead(final JobInfoRead jobInfoRead)
final DestinationRead destination = getDestinationRead(connection);
final SourceDefinitionRead sourceDefinitionRead = getSourceDefinitionRead(source);
final DestinationDefinitionRead destinationDefinitionRead = getDestinationDefinitionRead(destination);
final JobDebugRead jobDebugRead = jobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);
final JobDebugRead jobDebugRead = JobConverter.getDebugJobInfoRead(jobInfoRead, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion);

return new JobDebugInfoRead()
.attempts(jobInfoRead.getAttempts())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,33 @@
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.LogRead;
import io.airbyte.api.model.generated.SourceDefinitionRead;
import io.airbyte.api.model.generated.StreamDescriptor;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.JobCheckConnectionConfig;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobOutput.OutputType;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
Expand All @@ -60,10 +66,7 @@ class JobConverterTest {
private static final String JOB_CONFIG_ID = "123";
private static final JobStatus JOB_STATUS = JobStatus.RUNNING;
private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING;
private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE;
private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(CONFIG_TYPE)
.withCheckConnection(new JobCheckConnectionConfig());
private static final JobConfig.ConfigType CONFIG_TYPE = ConfigType.SYNC;
private static final Path LOG_PATH = Path.of("log_path");
private static final long CREATED_AT = System.currentTimeMillis() / 1000;
private static final long RECORDS_EMITTED = 15L;
Expand All @@ -76,6 +79,12 @@ class JobConverterTest {
private static final String FAILURE_STACKTRACE = "stacktrace";
private static final boolean PARTIAL_SUCCESS = false;

// Job config fixture carrying a sync config whose configured catalog holds two streams,
// named "users" and "accounts".
private static final JobConfig JOB_CONFIG = new JobConfig()
.withConfigType(CONFIG_TYPE)
.withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("users")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("accounts"))))));

private static final JobOutput JOB_OUTPUT = new JobOutput()
.withOutputType(OutputType.SYNC)
.withSync(new StandardSyncOutput()
Expand Down Expand Up @@ -103,8 +112,11 @@ class JobConverterTest {
.job(new JobRead()
.id(JOB_ID)
.configId(JOB_CONFIG_ID)
.streams(List.of(
new StreamDescriptor().name("users"),
new StreamDescriptor().name("accounts")))
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
.configType(JobConfigType.SYNC)
.createdAt(CREATED_AT)
.updatedAt(CREATED_AT))
.attempts(Lists.newArrayList(new AttemptInfoRead()
Expand Down Expand Up @@ -149,7 +161,7 @@ class JobConverterTest {
.id(JOB_ID)
.configId(JOB_CONFIG_ID)
.status(io.airbyte.api.model.generated.JobStatus.RUNNING)
.configType(JobConfigType.CHECK_CONNECTION_SOURCE)
.configType(JobConfigType.SYNC)
.airbyteVersion(airbyteVersion.serialize())
.sourceDefinition(sourceDefinitionRead)
.destinationDefinition(destinationDefinitionRead);
Expand Down Expand Up @@ -198,17 +210,17 @@ public void testGetJobInfoRead() {

@Test
public void testGetDebugJobInfoRead() {
// Converting the JOB_INFO fixture with the source/destination definitions and the version
// must produce exactly the JOB_DEBUG_INFO fixture.
assertEquals(JOB_DEBUG_INFO, jobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
assertEquals(JOB_DEBUG_INFO, JobConverter.getDebugJobInfoRead(JOB_INFO, sourceDefinitionRead, destinationDefinitionRead, airbyteVersion));
}

@Test
public void testGetJobWithAttemptsRead() {
// The converted job must equal the JOB_WITH_ATTEMPTS_READ fixture.
assertEquals(JOB_WITH_ATTEMPTS_READ, jobConverter.getJobWithAttemptsRead(job));
assertEquals(JOB_WITH_ATTEMPTS_READ, JobConverter.getJobWithAttemptsRead(job));
}

@Test
public void testGetJobRead() {
// Same check as testGetJobWithAttemptsRead, with the conversion result held in a local first.
final JobWithAttemptsRead jobReadActual = jobConverter.getJobWithAttemptsRead(job);
final JobWithAttemptsRead jobReadActual = JobConverter.getJobWithAttemptsRead(job);
assertEquals(JOB_WITH_ATTEMPTS_READ, jobReadActual);
}

Expand Down

0 comments on commit 354c6e2

Please sign in to comment.