diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index fa007180851c..4d406140c2df 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -135,8 +135,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig validateConfig(integration.spec().getConnectionSpecification(), config, "READ"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); - final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null)); - try (messageIterator) { + try (final AutoCloseableIterator messageIterator = source.read(config, catalog, stateOptional.orElse(null))) { AirbyteSentry.executeWithTracing("ReadSource", () -> messageIterator.forEachRemaining(outputRecordCollector::accept)); } } @@ -145,8 +144,9 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig final JsonNode config = parseConfig(parsed.getConfigPath()); validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector); - AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer)); + try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { + AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer)); + } } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); } @@ -159,16 +159,14 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc // use a Scanner that only processes new line characters to strictly abide with the // https://jsonlines.org/ standard final Scanner input = new Scanner(System.in).useDelimiter("[\r\n]+"); - try (consumer) { - consumer.start(); - while (input.hasNext()) { - final String inputString = input.next(); - final Optional messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class); - if (messageOptional.isPresent()) { - consumer.accept(messageOptional.get()); - } else { - LOGGER.error("Received invalid message: " + inputString); - } + consumer.start(); + while (input.hasNext()) { + final String inputString = input.next(); + final Optional messageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class); + if (messageOptional.isPresent()) { + consumer.accept(messageOptional.get()); + } else { + LOGGER.error("Received invalid message: " + inputString); } } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java index 4e25b66d16e0..d66c4b87bfd3 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/ssh/SshWrappedSource.java @@ -14,9 +14,12 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SshWrappedSource implements Source { + private static final Logger LOGGER = LoggerFactory.getLogger(SshWrappedSource.class); private final Source delegate; private final List hostKey; private final List portKey; @@ -46,7 +49,15 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception { public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) throws Exception { final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey); - return AutoCloseableIterators.appendOnClose(delegate.read(tunnel.getConfigInTunnel(), catalog, state), tunnel::close); + final AutoCloseableIterator delegateRead; + try { + delegateRead = delegate.read(tunnel.getConfigInTunnel(), catalog, state); + } catch (final Exception e) { + LOGGER.error("Exception occurred while getting the delegate read iterator, closing SSH tunnel", e); + tunnel.close(); + throw e; + } + return AutoCloseableIterators.appendOnClose(delegateRead, tunnel::close); } } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 83156e849f9e..c2670ed5b6e9 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -241,14 +241,13 @@ void testDestinationConsumerLifecycleSuccess() throws Exception { + Jsons.serialize(message2) + "\n" + Jsons.serialize(stateMessage)).getBytes())); - final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class); - IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock); - - final InOrder inOrder = inOrder(airbyteMessageConsumerMock); - inOrder.verify(airbyteMessageConsumerMock).accept(message1); - inOrder.verify(airbyteMessageConsumerMock).accept(message2); - inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage); - inOrder.verify(airbyteMessageConsumerMock).close(); + try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) { + IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock); + final InOrder inOrder = inOrder(airbyteMessageConsumerMock); + inOrder.verify(airbyteMessageConsumerMock).accept(message1); + inOrder.verify(airbyteMessageConsumerMock).accept(message2); + inOrder.verify(airbyteMessageConsumerMock).accept(stateMessage); + } } @Test @@ -267,15 +266,13 @@ void testDestinationConsumerLifecycleFailure() throws Exception { .withEmittedAt(EMITTED_AT)); System.setIn(new ByteArrayInputStream((Jsons.serialize(message1) + "\n" + Jsons.serialize(message2)).getBytes())); - final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class); - doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1); - - assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock)); - - final InOrder inOrder = inOrder(airbyteMessageConsumerMock); - inOrder.verify(airbyteMessageConsumerMock).accept(message1); - inOrder.verify(airbyteMessageConsumerMock).close(); - inOrder.verifyNoMoreInteractions(); + try (final AirbyteMessageConsumer airbyteMessageConsumerMock = mock(AirbyteMessageConsumer.class)) { + doThrow(new IOException("error")).when(airbyteMessageConsumerMock).accept(message1); + assertThrows(IOException.class, () -> IntegrationRunner.consumeWriteStream(airbyteMessageConsumerMock)); + final InOrder inOrder = inOrder(airbyteMessageConsumerMock); + inOrder.verify(airbyteMessageConsumerMock).accept(message1); + inOrder.verifyNoMoreInteractions(); + } } } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index 7a200b0e3d46..2f9db7b7b349 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -115,18 +115,18 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(config); final JdbcDatabase database = Databases.createJdbcDatabase( - jdbcConfig.get("username").asText(), - jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, - jdbcConfig.get("jdbc_url").asText(), - driverClass, - jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null, - sourceOperations); + jdbcConfig.get("username").asText(), + jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, + jdbcConfig.get("jdbc_url").asText(), + driverClass, + jdbcConfig.has("connection_properties") ? jdbcConfig.get("connection_properties").asText() : null, + sourceOperations); quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); return new CockroachJdbcDatabase(database, sourceOperations); } - + private CheckedFunction getPrivileges(JdbcDatabase database) { return connection -> { final PreparedStatement ps = connection.prepareStatement( diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java index 9df77e4e0af5..0aa9572cb5a2 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java @@ -9,10 +9,6 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.db.JdbcCompatibleSourceOperations; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration; - -import javax.sql.DataSource; - import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; @@ -22,17 +18,15 @@ import java.util.stream.Stream; /** - * This implementation uses non-streamed queries to CockroachDB. CockroachDB - * does not currently support multiple active pgwire portals on the same session, - * which makes it impossible to replicate tables that have over ~1000 rows - * using StreamingJdbcDatabase. See: https://go.crdb.dev/issue-v/40195/v21.2 - * and in particular, the comment: - * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351 - * The same situation as kafka-connect applies to StreamingJdbcDatabase + * This implementation uses non-streamed queries to CockroachDB. CockroachDB does not currently + * support multiple active pgwire portals on the same session, which makes it impossible to + * replicate tables that have over ~1000 rows using StreamingJdbcDatabase. See: + * https://go.crdb.dev/issue-v/40195/v21.2 and in particular, the comment: + * https://github.com/cockroachdb/cockroach/issues/40195?version=v21.2#issuecomment-870570351 The + * same situation as kafka-connect applies to StreamingJdbcDatabase */ public class CockroachJdbcDatabase - extends JdbcDatabase -{ + extends JdbcDatabase { private final JdbcDatabase database;