Skip to content

Commit

Permalink
Exposing SSL-only version of Postgres Source (#6362)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Sep 27, 2021
1 parent d9adbd3 commit f308690
Show file tree
Hide file tree
Showing 22 changed files with 605 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.spec_modification;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;

/**
* In some cases we want to prune or mutate the spec for an existing source. The common case is that
* we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not
* want to allow users to send data unencrypted.
*/
public abstract class SpecModifyingSource implements Source {

private final Source source;

public SpecModifyingSource(final Source source) {
this.source = source;
}

public abstract ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception;

@Override
public ConnectorSpecification spec() throws Exception {
return modifySpec(source.spec());
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
return source.check(config);
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
return source.discover(config);
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
throws Exception {
return source.read(config, catalog, state);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getDriverClass() {
}

@Override
public String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
public String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) {
// ClickHouse requires Engine to be mentioned as part of create table query.
// Refer : https://clickhouse.tech/docs/en/engines/table-engines/ for more information
return String.format("CREATE TABLE %s(%s) %s",
Expand All @@ -56,12 +56,12 @@ public void tearDown() throws SQLException {
}

@Override
public String primaryKeyClause(List<String> columns) {
public String primaryKeyClause(final List<String> columns) {
if (columns.isEmpty()) {
return "";
}

StringBuilder clause = new StringBuilder();
final StringBuilder clause = new StringBuilder();
clause.append("(");
for (int i = 0; i < columns.size(); i++) {
clause.append(columns.get(i));
Expand Down Expand Up @@ -91,7 +91,7 @@ public void setup() throws Exception {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new ClickHouseSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean supportsSchemas() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new CockroachDbSource();
}

Expand Down Expand Up @@ -360,7 +360,7 @@ void testReadMultipleTables() throws Exception {
@Test
void testReadMultipleTablesIncrementally() throws Exception {
final String tableName2 = TABLE_NAME + 2;
String streamName2 = streamName + 2;
final String streamName2 = streamName + 2;
database.execute(ctx -> {
ctx.createStatement().execute(
createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)",
Expand Down Expand Up @@ -493,7 +493,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))));

// sort streams by name so that we are comparing lists with the same order.
Comparator<AirbyteStream> schemaTableCompare = Comparator
final Comparator<AirbyteStream> schemaTableCompare = Comparator
.comparing(stream -> stream.getNamespace() + "." + stream.getName());
expected.getStreams().sort(schemaTableCompare);
actual.getStreams().sort(schemaTableCompare);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void setup() throws Exception {
public void clean() throws Exception {
// In Db2 before dropping a schema, all objects that were in that schema must be dropped or moved to
// another schema.
for (String tableName : TEST_TABLES) {
for (final String tableName : TEST_TABLES) {
final String dropTableQuery = String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, tableName);
super.database.execute(connection -> connection.createStatement().execute(dropTableQuery));
Expand Down Expand Up @@ -116,7 +116,7 @@ public String getDriverClass() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new Db2Source();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public boolean supportsSchemas() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new PostgresTestSource();
}

Expand Down Expand Up @@ -95,8 +95,8 @@ public PostgresTestSource() {
}

@Override
public JsonNode toDatabaseConfig(JsonNode config) {
ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
public JsonNode toDatabaseConfig(final JsonNode config) {
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
Expand All @@ -115,7 +115,7 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
final Source source = new PostgresTestSource();
LOGGER.info("starting source: {}", PostgresTestSource.class);
new IntegrationRunner(source).run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.SourceJdbcUtils;
import io.airbyte.integrations.source.relationaldb.models.DbState;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -95,7 +97,7 @@ public abstract class JdbcSourceAcceptanceTest {

public JsonNode config;
public JdbcDatabase database;
public AbstractJdbcSource source;
public Source source;
public static String streamName;

/**
Expand Down Expand Up @@ -126,21 +128,43 @@ public abstract class JdbcSourceAcceptanceTest {
/**
* An instance of the source that should be tests.
*
* @return abstract jdbc source
*/
public abstract AbstractJdbcSource getJdbcSource();

/**
* In some cases the Source that is being tested may be an AbstractJdbcSource, but because it is
* decorated, Java cannot recognize it as such. In these cases, as a workaround a user can choose to
* override getJdbcSource and have it return null. Then they can override this method with the
* decorated source AND override getToDatabaseConfigFunction with the appropriate
* toDatabaseConfigFunction that is hidden behind the decorator.
*
* @return source
*/
public abstract AbstractJdbcSource getSource();
public Source getSource() {
return getJdbcSource();
}

protected String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
/**
* See getSource() for when to override this method.
*
* @return a function that maps a source's config to a jdbc config.
*/
public Function<JsonNode, JsonNode> getToDatabaseConfigFunction() {
return getJdbcSource()::toDatabaseConfig;
}

protected String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) {
return String.format("CREATE TABLE %s(%s %s %s)",
tableName, columnClause, primaryKeyClause.equals("") ? "" : ",", primaryKeyClause);
}

protected String primaryKeyClause(List<String> columns) {
protected String primaryKeyClause(final List<String> columns) {
if (columns.isEmpty()) {
return "";
}

StringBuilder clause = new StringBuilder();
final StringBuilder clause = new StringBuilder();
clause.append("PRIMARY KEY (");
for (int i = 0; i < columns.size(); i++) {
clause.append(columns.get(i));
Expand All @@ -155,7 +179,7 @@ protected String primaryKeyClause(List<String> columns) {
public void setup() throws Exception {
source = getSource();
config = getConfig();
final JsonNode jdbcConfig = source.toDatabaseConfig(config);
final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config);

streamName = TABLE_NAME;

Expand Down Expand Up @@ -253,7 +277,7 @@ void testCheckFailure() throws Exception {
@Test
void testDiscover() throws Exception {
final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config));
AirbyteCatalog expected = getCatalog(getDefaultNamespace());
final AirbyteCatalog expected = getCatalog(getDefaultNamespace());
assertEquals(expected.getStreams().size(), actual.getStreams().size());
actual.getStreams().forEach(actualStream -> {
final Optional<AirbyteStream> expectedStream =
Expand All @@ -265,7 +289,7 @@ void testDiscover() throws Exception {
});
}

protected AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) {
protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) {
if (supportsSchemas()) {
final AirbyteCatalog filteredCatalog = Jsons.clone(catalog);
filteredCatalog.setStreams(filteredCatalog.getStreams()
Expand Down Expand Up @@ -312,7 +336,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {
Field.of(COL_NAME, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
// sort streams by name so that we are comparing lists with the same order.
Comparator<AirbyteStream> schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName());
final Comparator<AirbyteStream> schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName());
expected.getStreams().sort(schemaTableCompare);
actual.getStreams().sort(schemaTableCompare);
assertEquals(expected, filterOutOtherSchemas(actual));
Expand All @@ -325,7 +349,7 @@ void testReadSuccess() throws Exception {
source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null));

setEmittedAtToNull(actualMessages);
List<AirbyteMessage> expectedMessages = getTestMessages();
final List<AirbyteMessage> expectedMessages = getTestMessages();
assertThat(expectedMessages, Matchers.containsInAnyOrder(actualMessages.toArray()));
assertThat(actualMessages, Matchers.containsInAnyOrder(expectedMessages.toArray()));
}
Expand Down Expand Up @@ -596,7 +620,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
@Test
void testReadMultipleTablesIncrementally() throws Exception {
final String tableName2 = TABLE_NAME + 2;
String streamName2 = streamName + 2;
final String streamName2 = streamName + 2;
database.execute(ctx -> {
ctx.createStatement().execute(
createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", ""));
Expand Down Expand Up @@ -692,34 +716,34 @@ void testReadMultipleTablesIncrementally() throws Exception {

// when initial and final cursor fields are the same.
private void incrementalCursorCheck(
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages)
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(cursorField, cursorField, initialCursorValue, endCursorValue,
expectedRecordMessages);
}

private void incrementalCursorCheck(
String initialCursorField,
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages)
final String initialCursorField,
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue,
expectedRecordMessages,
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0));
}

private void incrementalCursorCheck(
String initialCursorField,
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages,
ConfiguredAirbyteStream airbyteStream)
final String initialCursorField,
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages,
final ConfiguredAirbyteStream airbyteStream)
throws Exception {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList(cursorField));
Expand Down Expand Up @@ -856,13 +880,13 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException {
Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaPrimitive.STRING));
}

public String getFullyQualifiedTableName(String tableName) {
public String getFullyQualifiedTableName(final String tableName) {
return SourceJdbcUtils.getFullyQualifiedTableName(getDefaultSchemaName(), tableName);
}

public void createSchemas() throws SQLException {
if (supportsSchemas()) {
for (String schemaName : TEST_SCHEMAS) {
for (final String schemaName : TEST_SCHEMAS) {
final String createSchemaQuery = String.format("CREATE SCHEMA %s;", schemaName);
database.execute(connection -> connection.createStatement().execute(createSchemaQuery));
}
Expand All @@ -871,15 +895,15 @@ public void createSchemas() throws SQLException {

public void dropSchemas() throws SQLException {
if (supportsSchemas()) {
for (String schemaName : TEST_SCHEMAS) {
for (final String schemaName : TEST_SCHEMAS) {
final String dropSchemaQuery = String
.format(DROP_SCHEMA_QUERY, schemaName);
database.execute(connection -> connection.createStatement().execute(dropSchemaQuery));
}
}
}

private JsonNode convertIdBasedOnDatabase(int idValue) {
private JsonNode convertIdBasedOnDatabase(final int idValue) {
if (getDriverClass().toLowerCase().contains("oracle")) {
return Jsons.jsonNode(BigDecimal.valueOf(idValue));
} else if (getDriverClass().toLowerCase().contains("snowflake")) {
Expand All @@ -902,8 +926,8 @@ protected String getDefaultNamespace() {
}
}

protected static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {
for (AirbyteMessage actualMessage : messages) {
protected static void setEmittedAtToNull(final Iterable<AirbyteMessage> messages) {
for (final AirbyteMessage actualMessage : messages) {
if (actualMessage.getRecord() != null) {
actualMessage.getRecord().setEmittedAt(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public JsonNode getConfig() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new MssqlSource();
}

Expand Down
Loading

0 comments on commit f308690

Please sign in to comment.