Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-common-workers: Collect trace message on failed connection_status #20721

Merged
merged 29 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ddc362a
set failure reason when exit code is 0
alafanechere Dec 20, 2022
93a5e8c
format
alafanechere Dec 20, 2022
f0b6906
new approach
alafanechere Dec 21, 2022
2b4f99d
explicit is better than implicit
alafanechere Dec 27, 2022
e2b238d
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Dec 27, 2022
74d4a7b
use setters
alafanechere Dec 27, 2022
393e773
Merge branch 'augustin/check/collect-trace-message-on-0-exit' of gith…
alafanechere Dec 28, 2022
993fa9a
write discover tests
alafanechere Dec 28, 2022
2bf5ff5
write discover tests
alafanechere Dec 28, 2022
1f6c310
write check tests
alafanechere Dec 28, 2022
59648af
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Jan 3, 2023
51d8da1
add stderr to exception
alafanechere Jan 3, 2023
9fa3c99
add stderr to exception
alafanechere Jan 3, 2023
effb72f
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Jan 3, 2023
b9586a9
fix exit code
alafanechere Jan 3, 2023
e5a04a8
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Jan 4, 2023
1316d82
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
pedroslopez Jan 11, 2023
41dda91
fix spotbugs error
pedroslopez Jan 11, 2023
995157d
add missing method signature update after merge
pedroslopez Jan 11, 2023
0673e4e
fix pmd warning
pedroslopez Jan 11, 2023
1f2138e
return check job output if present, regardless of failure status
pedroslopez Jan 11, 2023
d401117
introduce utils throwWorkerException
alafanechere Jan 12, 2023
db2f9f4
log warning on non 0 status code
alafanechere Jan 12, 2023
cd4ad00
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Jan 12, 2023
f14fed3
format
alafanechere Jan 12, 2023
c8998a9
Merge branch 'augustin/check/collect-trace-message-on-0-exit' of gith…
alafanechere Jan 12, 2023
f0ebcdf
consistent error message
alafanechere Jan 12, 2023
34fcadd
Merge branch 'master' into augustin/check/collect-trace-message-on-0-…
alafanechere Jan 12, 2023
2a0856c
fix failing test
pedroslopez Jan 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.workers;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.commons.io.IOs;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardSyncInput;
Expand All @@ -20,6 +20,12 @@
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.helper.FailureHelper.ConnectorCommand;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
Expand Down Expand Up @@ -111,6 +117,14 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa
.withState(sync.getState());
}

private static ConnectorCommand getConnectorCommandFromOutputType(final OutputType outputType) {
return switch (outputType) {
case SPEC -> ConnectorCommand.SPEC;
case CHECK_CONNECTION -> ConnectorCommand.CHECK;
case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER;
};
}

public static Optional<AirbyteControlConnectorConfigMessage> getMostRecentConfigControlMessage(final Map<Type, List<AirbyteMessage>> messagesByType) {
return messagesByType.getOrDefault(Type.CONTROL, new ArrayList<>()).stream()
.map(AirbyteMessage::getControl)
Expand All @@ -119,28 +133,35 @@ public static Optional<AirbyteControlConnectorConfigMessage> getMostRecentConfig
.reduce((first, second) -> second);
}

public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType,
final String defaultErrorMessage)
throws WorkerException {
final Optional<AirbyteTraceMessage> traceMessage =
messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
.map(AirbyteMessage::getTrace)
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
.findFirst();
private static Optional<AirbyteTraceMessage> getTraceMessageFromMessagesByType(final Map<Type, List<AirbyteMessage>> messagesByType) {
return messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream()
.map(AirbyteMessage::getTrace)
.filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
.findFirst();
}

public static Map<Type, List<AirbyteMessage>> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut)
throws IOException {
final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

WorkerUtils.gentleClose(process, timeOut, TimeUnit.MINUTES);
return messagesByType;
}
}

public static Optional<FailureReason> getJobFailureReasonFromMessages(final OutputType outputType,
final Map<Type, List<AirbyteMessage>> messagesByType) {
final Optional<AirbyteTraceMessage> traceMessage = getTraceMessageFromMessagesByType(messagesByType);
if (traceMessage.isPresent()) {
final ConnectorCommand connectorCommand = switch (outputType) {
case SPEC -> ConnectorCommand.SPEC;
case CHECK_CONNECTION -> ConnectorCommand.CHECK;
case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER;
};

final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand);
return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason);
final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType);
return Optional.of(FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand));
} else {
return Optional.empty();
}

throw new WorkerException(defaultErrorMessage);
}

public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSchemas(final StandardSyncInput syncInput) {
Expand All @@ -151,4 +172,25 @@ public static Map<AirbyteStreamNameNamespacePair, JsonNode> mapStreamNamesToSche

}

public static String getStdErrFromErrorStream(final InputStream errorStream) throws IOException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream, StandardCharsets.UTF_8));
final StringBuilder errorOutput = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
errorOutput.append(line);
errorOutput.append(System.lineSeparator());
}
return errorOutput.toString();
}

public static void throwWorkerException(final String errorMessage, final Process process)
throws WorkerException, IOException {
final String stderr = getStdErrFromErrorStream(process.getErrorStream());
if (stderr.isEmpty()) {
throw new WorkerException(errorMessage);
} else {
throw new WorkerException(errorMessage + ": \n" + stderr);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

import datadog.trace.api.Trace;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
Expand All @@ -29,14 +29,11 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,18 +70,12 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getConnectionConfiguration()));

LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));
final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION);

WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES);
}
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final int exitCode = process.exitValue();
final Optional<AirbyteConnectionStatus> status = messagesByType
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);
final Optional<AirbyteConnectionStatus> connectionStatus = messagesByType
.getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream()
.map(AirbyteMessage::getConnectionStatus)
.findFirst();
erohmensing marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -104,25 +95,30 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa
});
}

if (status.isPresent() && exitCode == 0) {
final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode != 0) {
LOGGER.warn("Check connection job subprocess finished with exit code {}", exitCode);
}

if (connectionStatus.isPresent()) {
final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput()
.withStatus(Enums.convertTo(status.get().getStatus(), Status.class))
.withMessage(status.get().getMessage());

LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode);
LOGGER.debug("Check connection job received output: {}", output);
LineGobbler.endSection("CHECK");
return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output);
} else {
final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode);
LOGGER.error(message);

return WorkerUtils.getJobFailureOutputOrThrow(OutputType.CHECK_CONNECTION, messagesByType, message);
.withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class))
.withMessage(connectionStatus.get().getMessage());
LOGGER.info("Check connection job received output: {}", output);
jobOutput.setCheckConnection(output);
erohmensing marked this conversation as resolved.
Show resolved Hide resolved
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Error checking connection status: no status nor failure reason were outputted", process);
}
LineGobbler.endSection("CHECK");
return jobOutput;

} catch (final Exception e) {
ApmTraceUtils.addExceptionToTrace(e);
LOGGER.error("Unexpected error while checking connection: ", e);
LineGobbler.endSection("CHECK");
throw new WorkerException("Unexpected error while getting checking connection.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.metrics.lib.ApmTraceUtils;
Expand All @@ -29,16 +29,13 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,16 +77,10 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(discoverSchemaInput.getConnectionConfiguration()));

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;

try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES);
}
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);

final Optional<AirbyteCatalog> catalog = messagesByType
.getOrDefault(Type.CATALOG, new ArrayList<>()).stream()
Expand All @@ -102,26 +93,27 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI
UUID.fromString(discoverSchemaInput.getSourceId()),
configMessage.getConfig()));

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (catalog.isEmpty()) {
throw new WorkerException("Integration failed to output a catalog struct.");
}
if (exitCode != 0) {
LOGGER.warn("Discover job subprocess finished with exit codee {}", exitCode);
}

if (catalog.isPresent()) {
final UUID catalogId =
configRepository.writeActorCatalogFetchEvent(catalog.get(),
// NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce
// it, so we check again here.
discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()),
discoverSchemaInput.getConnectorVersion(),
discoverSchemaInput.getConfigHash());
return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId);
} else {
return WorkerUtils.getJobFailureOutputOrThrow(
OutputType.DISCOVER_CATALOG_ID,
messagesByType,
String.format("Discover job subprocess finished with exit code %s", exitCode));
jobOutput.setDiscoverCatalogId(catalogId);
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process);
}
return jobOutput;
} catch (final WorkerException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.ConnectorJobOutput.OutputType;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.protocol.models.AirbyteMessage;
Expand All @@ -23,14 +23,11 @@
import io.airbyte.workers.internal.AirbyteStreamFactory;
import io.airbyte.workers.internal.DefaultAirbyteStreamFactory;
import io.airbyte.workers.process.IntegrationLauncher;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,7 +37,6 @@ public class DefaultGetSpecWorker implements GetSpecWorker {

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;

private Process process;

public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher,
Expand All @@ -60,38 +56,31 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot)
try {
process = integrationLauncher.spec(jobRoot);

final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);

final Map<Type, List<AirbyteMessage>> messagesByType;
try (final InputStream stdout = process.getInputStream()) {
messagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

// todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for
// this.
// retrieving spec should generally be instantaneous, but since docker images might not be pulled
// it could take a while longer depending on internet conditions as well.
WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES);
}
final Map<Type, List<AirbyteMessage>> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30);

final Optional<ConnectorSpecification> spec = messagesByType
.getOrDefault(Type.SPEC, new ArrayList<>()).stream()
.map(AirbyteMessage::getSpec)
.findFirst();

final Optional<FailureReason> failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType);
failureReason.ifPresent(jobOutput::setFailureReason);

final int exitCode = process.exitValue();
if (exitCode == 0) {
if (spec.isEmpty()) {
throw new WorkerException("integration failed to output a spec struct.");
}

return new ConnectorJobOutput().withOutputType(OutputType.SPEC).withSpec(spec.get());
} else {
return WorkerUtils.getJobFailureOutputOrThrow(
OutputType.SPEC,
messagesByType,
String.format("Spec job subprocess finished with exit code %s", exitCode));
if (exitCode != 0) {
LOGGER.warn("Spec job subprocess finished with exit code {}", exitCode);
}

if (spec.isPresent()) {
jobOutput.setSpec(spec.get());
} else if (failureReason.isEmpty()) {
WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason", process);
}

return jobOutput;
} catch (final Exception e) {
throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<Sta
final CheckConnectionRead checkConnectionRead = new CheckConnectionRead()
.jobInfo(jobConverter.getSynchronousJobRead(response));

if (response.isSuccess()) {
if (response.getOutput() != null) {
checkConnectionRead
.status(Enums.convertTo(response.getOutput().getStatus(), StatusEnum.class))
.message(response.getOutput().getMessage());
Expand Down
Loading