Skip to content

Commit

Permalink
Migrate ssh exception -> config error exception (#19094)
Browse files Browse the repository at this point in the history
* Update SshTunnel.java

Migrate ssh exception -> config error exception

* Unnest exception

* Address PR comments

* Add test case
  • Loading branch information
akashkulk authored Nov 8, 2022
1 parent c1a8169 commit 4fe8468
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,18 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
}
} catch (final Exception e) {
final String displayMessage = getDisplayMessage(e);
// Many of the exceptions thrown are nested inside layers of RuntimeExceptions. An attempt is made to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
final Throwable rootThrowable = getRootConfigError(e);
final String displayMessage = getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(e)) {
if (isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
// Currently, special handling is required for the SPEC case since the user display information in
// Currently, special handling is required for the CHECK case since the user display information in
// the trace message is
// not properly surfaced to the FE. In the future, we can remove this and just throw an exception.
outputRecordCollector
Expand All @@ -179,18 +183,34 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

private boolean isConfigError(final Exception e) {
/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
private Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private boolean isConfigError(final Throwable e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Exception e) {
private String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), e);
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage();
return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : "";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
Expand Down Expand Up @@ -311,7 +312,7 @@ KeyPair getPrivateKeyPair() throws IOException, GeneralSecurityException {
if (keyPairs != null && keyPairs.iterator().hasNext()) {
return keyPairs.iterator().next();
}
throw new RuntimeException("Unable to load private key pairs, verify key pairs are properly inputted");
throw new ConfigErrorException("Unable to load private key pairs, verify key pairs are properly inputted");
}

private String validateKey() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.base;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -19,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterators;
Expand Down Expand Up @@ -222,6 +225,67 @@ void testRead() throws Exception {
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testReadException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.read(configPath, configuredCatalogPath,
statePath);
final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenThrow(configErrorException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);

final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
final Throwable throwable = catchThrowable(() -> new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS));

assertThat(throwable).isInstanceOf(ConfigErrorException.class);
verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
}

@Test
void testCheckNestedException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Invalid configuration");
final ConfigErrorException configErrorException = new ConfigErrorException("Invalid configuration");
final RuntimeException runtimeException = new RuntimeException(new RuntimeException(configErrorException));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenThrow(runtimeException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testCheckRuntimeException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error");
final RuntimeException runtimeException = new RuntimeException("Runtime Error");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.check(CONFIG)).thenThrow(runtimeException);

final ConnectorSpecification expectedConnSpec = mock(ConnectorSpecification.class);
when(source.spec()).thenReturn(expectedConnSpec);
when(expectedConnSpec.getConnectionSpecification()).thenReturn(CONFIG);
final JsonSchemaValidator jsonSchemaValidator = mock(JsonSchemaValidator.class);
new IntegrationRunner(cliParser, stdoutConsumer, null, source, jsonSchemaValidator).run(ARGS);

verify(source).check(CONFIG);
verify(stdoutConsumer).accept(new AirbyteMessage().withType(Type.CONNECTION_STATUS).withConnectionStatus(output));
verify(jsonSchemaValidator).validate(any(), any());
}

@Test
void testWrite() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.write(configPath, configuredCatalogPath);
Expand Down

0 comments on commit 4fe8468

Please sign in to comment.