Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exposing SSL-only version of Postgres Source #6362

Merged
merged 12 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.spec_modification;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.Source;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;

/**
* In some cases we want to prune or mutate the spec for an existing source. The common case is that
* we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not
* want to allow users to send data unencrypted.
*/
public abstract class SpecModifyingSource implements Source {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if all we're doing is modify the spec is a whole new superclass worth it? why not just extend the source directly and modify the spec method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair. I think we can reuse this class for all the db sources that this applies to so it gives us a nice convention to work from if we want to expand it later. i can just document it once. otherwise we're just going to extend a bunch of sources and have to explain it in a bunch of places.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree with @cgardens the decoration pattern makes the feature of modifying the spec more composable


private final Source source;

public SpecModifyingSource(final Source source) {
this.source = source;
}

public abstract ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception;

@Override
public ConnectorSpecification spec() throws Exception {
return modifySpec(source.spec());
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
return source.check(config);
}

@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
return source.discover(config);
}

@Override
public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state)
throws Exception {
return source.read(config, catalog, state);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getDriverClass() {
}

@Override
public String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
public String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) {
// ClickHouse requires Engine to be mentioned as part of create table query.
// Refer : https://clickhouse.tech/docs/en/engines/table-engines/ for more information
return String.format("CREATE TABLE %s(%s) %s",
Expand All @@ -56,12 +56,12 @@ public void tearDown() throws SQLException {
}

@Override
public String primaryKeyClause(List<String> columns) {
public String primaryKeyClause(final List<String> columns) {
if (columns.isEmpty()) {
return "";
}

StringBuilder clause = new StringBuilder();
final StringBuilder clause = new StringBuilder();
clause.append("(");
for (int i = 0; i < columns.size(); i++) {
clause.append(columns.get(i));
Expand Down Expand Up @@ -91,7 +91,7 @@ public void setup() throws Exception {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new ClickHouseSource();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean supportsSchemas() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new CockroachDbSource();
}

Expand Down Expand Up @@ -360,7 +360,7 @@ void testReadMultipleTables() throws Exception {
@Test
void testReadMultipleTablesIncrementally() throws Exception {
final String tableName2 = TABLE_NAME + 2;
String streamName2 = streamName + 2;
final String streamName2 = streamName + 2;
database.execute(ctx -> {
ctx.createStatement().execute(
createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)",
Expand Down Expand Up @@ -493,7 +493,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID))));

// sort streams by name so that we are comparing lists with the same order.
Comparator<AirbyteStream> schemaTableCompare = Comparator
final Comparator<AirbyteStream> schemaTableCompare = Comparator
.comparing(stream -> stream.getNamespace() + "." + stream.getName());
expected.getStreams().sort(schemaTableCompare);
actual.getStreams().sort(schemaTableCompare);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void setup() throws Exception {
public void clean() throws Exception {
// In Db2 before dropping a schema, all objects that were in that schema must be dropped or moved to
// another schema.
for (String tableName : TEST_TABLES) {
for (final String tableName : TEST_TABLES) {
final String dropTableQuery = String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, tableName);
super.database.execute(connection -> connection.createStatement().execute(dropTableQuery));
Expand Down Expand Up @@ -116,7 +116,7 @@ public String getDriverClass() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new Db2Source();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public boolean supportsSchemas() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new PostgresTestSource();
}

Expand Down Expand Up @@ -95,8 +95,8 @@ public PostgresTestSource() {
}

@Override
public JsonNode toDatabaseConfig(JsonNode config) {
ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
public JsonNode toDatabaseConfig(final JsonNode config) {
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s",
config.get("host").asText(),
Expand All @@ -115,7 +115,7 @@ public Set<String> getExcludedInternalNameSpaces() {
return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history");
}

public static void main(String[] args) throws Exception {
public static void main(final String[] args) throws Exception {
final Source source = new PostgresTestSource();
LOGGER.info("starting source: {}", PostgresTestSource.class);
new IntegrationRunner(source).run(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.commons.util.MoreIterators;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore this class for now. i had to commit some hacks to get this to work. will clean it up if we stick with this approach.

import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.SourceJdbcUtils;
import io.airbyte.integrations.source.relationaldb.models.DbState;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -95,7 +97,7 @@ public abstract class JdbcSourceAcceptanceTest {

public JsonNode config;
public JdbcDatabase database;
public AbstractJdbcSource source;
public Source source;
public static String streamName;

/**
Expand Down Expand Up @@ -126,21 +128,43 @@ public abstract class JdbcSourceAcceptanceTest {
/**
* An instance of the source that should be tests.
*
* @return abstract jdbc source
*/
public abstract AbstractJdbcSource getJdbcSource();

/**
* In some cases the Source that is being tested may be an AbstractJdbcSource, but because it is
* decorated, Java cannot recognize it as such. In these cases, as a workaround a user can choose to
* override getJdbcSource and have it return null. Then they can override this method with the
* decorated source AND override getToDatabaseConfigFunction with the appropriate
* toDatabaseConfigFunction that is hidden behind the decorator.
*
* @return source
*/
public abstract AbstractJdbcSource getSource();
public Source getSource() {
return getJdbcSource();
}

protected String createTableQuery(String tableName, String columnClause, String primaryKeyClause) {
/**
* See getSource() for when to override this method.
*
* @return a function that maps a source's config to a jdbc config.
*/
public Function<JsonNode, JsonNode> getToDatabaseConfigFunction() {
return getJdbcSource()::toDatabaseConfig;
}

protected String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) {
return String.format("CREATE TABLE %s(%s %s %s)",
tableName, columnClause, primaryKeyClause.equals("") ? "" : ",", primaryKeyClause);
}

protected String primaryKeyClause(List<String> columns) {
protected String primaryKeyClause(final List<String> columns) {
if (columns.isEmpty()) {
return "";
}

StringBuilder clause = new StringBuilder();
final StringBuilder clause = new StringBuilder();
clause.append("PRIMARY KEY (");
for (int i = 0; i < columns.size(); i++) {
clause.append(columns.get(i));
Expand All @@ -155,7 +179,7 @@ protected String primaryKeyClause(List<String> columns) {
public void setup() throws Exception {
source = getSource();
config = getConfig();
final JsonNode jdbcConfig = source.toDatabaseConfig(config);
final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config);

streamName = TABLE_NAME;

Expand Down Expand Up @@ -253,7 +277,7 @@ void testCheckFailure() throws Exception {
@Test
void testDiscover() throws Exception {
final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config));
AirbyteCatalog expected = getCatalog(getDefaultNamespace());
final AirbyteCatalog expected = getCatalog(getDefaultNamespace());
assertEquals(expected.getStreams().size(), actual.getStreams().size());
actual.getStreams().forEach(actualStream -> {
final Optional<AirbyteStream> expectedStream =
Expand All @@ -265,7 +289,7 @@ void testDiscover() throws Exception {
});
}

protected AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) {
protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) {
if (supportsSchemas()) {
final AirbyteCatalog filteredCatalog = Jsons.clone(catalog);
filteredCatalog.setStreams(filteredCatalog.getStreams()
Expand Down Expand Up @@ -312,7 +336,7 @@ void testDiscoverWithMultipleSchemas() throws Exception {
Field.of(COL_NAME, JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
// sort streams by name so that we are comparing lists with the same order.
Comparator<AirbyteStream> schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName());
final Comparator<AirbyteStream> schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName());
expected.getStreams().sort(schemaTableCompare);
actual.getStreams().sort(schemaTableCompare);
assertEquals(expected, filterOutOtherSchemas(actual));
Expand All @@ -325,7 +349,7 @@ void testReadSuccess() throws Exception {
source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null));

setEmittedAtToNull(actualMessages);
List<AirbyteMessage> expectedMessages = getTestMessages();
final List<AirbyteMessage> expectedMessages = getTestMessages();
assertThat(expectedMessages, Matchers.containsInAnyOrder(actualMessages.toArray()));
assertThat(actualMessages, Matchers.containsInAnyOrder(expectedMessages.toArray()));
}
Expand Down Expand Up @@ -596,7 +620,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
@Test
void testReadMultipleTablesIncrementally() throws Exception {
final String tableName2 = TABLE_NAME + 2;
String streamName2 = streamName + 2;
final String streamName2 = streamName + 2;
database.execute(ctx -> {
ctx.createStatement().execute(
createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", ""));
Expand Down Expand Up @@ -692,34 +716,34 @@ void testReadMultipleTablesIncrementally() throws Exception {

// when initial and final cursor fields are the same.
private void incrementalCursorCheck(
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages)
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(cursorField, cursorField, initialCursorValue, endCursorValue,
expectedRecordMessages);
}

private void incrementalCursorCheck(
String initialCursorField,
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages)
final String initialCursorField,
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue,
expectedRecordMessages,
getConfiguredCatalogWithOneStream(getDefaultNamespace()).getStreams().get(0));
}

private void incrementalCursorCheck(
String initialCursorField,
String cursorField,
String initialCursorValue,
String endCursorValue,
List<AirbyteMessage> expectedRecordMessages,
ConfiguredAirbyteStream airbyteStream)
final String initialCursorField,
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages,
final ConfiguredAirbyteStream airbyteStream)
throws Exception {
airbyteStream.setSyncMode(SyncMode.INCREMENTAL);
airbyteStream.setCursorField(Lists.newArrayList(cursorField));
Expand Down Expand Up @@ -856,13 +880,13 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException {
Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaPrimitive.STRING));
}

public String getFullyQualifiedTableName(String tableName) {
public String getFullyQualifiedTableName(final String tableName) {
return SourceJdbcUtils.getFullyQualifiedTableName(getDefaultSchemaName(), tableName);
}

public void createSchemas() throws SQLException {
if (supportsSchemas()) {
for (String schemaName : TEST_SCHEMAS) {
for (final String schemaName : TEST_SCHEMAS) {
final String createSchemaQuery = String.format("CREATE SCHEMA %s;", schemaName);
database.execute(connection -> connection.createStatement().execute(createSchemaQuery));
}
Expand All @@ -871,15 +895,15 @@ public void createSchemas() throws SQLException {

public void dropSchemas() throws SQLException {
if (supportsSchemas()) {
for (String schemaName : TEST_SCHEMAS) {
for (final String schemaName : TEST_SCHEMAS) {
final String dropSchemaQuery = String
.format(DROP_SCHEMA_QUERY, schemaName);
database.execute(connection -> connection.createStatement().execute(dropSchemaQuery));
}
}
}

private JsonNode convertIdBasedOnDatabase(int idValue) {
private JsonNode convertIdBasedOnDatabase(final int idValue) {
if (getDriverClass().toLowerCase().contains("oracle")) {
return Jsons.jsonNode(BigDecimal.valueOf(idValue));
} else if (getDriverClass().toLowerCase().contains("snowflake")) {
Expand All @@ -902,8 +926,8 @@ protected String getDefaultNamespace() {
}
}

protected static void setEmittedAtToNull(Iterable<AirbyteMessage> messages) {
for (AirbyteMessage actualMessage : messages) {
protected static void setEmittedAtToNull(final Iterable<AirbyteMessage> messages) {
for (final AirbyteMessage actualMessage : messages) {
if (actualMessage.getRecord() != null) {
actualMessage.getRecord().setEmittedAt(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public JsonNode getConfig() {
}

@Override
public AbstractJdbcSource getSource() {
public AbstractJdbcSource getJdbcSource() {
return new MssqlSource();
}

Expand Down
Loading