Skip to content

Commit

Permalink
Refactor JDBC JSON_Format into JDBC utils (#7504)
Browse files Browse the repository at this point in the history
  • Loading branch information
haoranyu authored Nov 3, 2021
1 parent b39274d commit 92fcdea
Show file tree
Hide file tree
Showing 18 changed files with 42 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@

package io.airbyte.db.jdbc;

import org.jooq.JSONFormat;

public class JdbcUtils {

private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();

private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

public static JdbcSourceOperations getDefaultSourceOperations() {
return defaultSourceOperations;
}

public static JSONFormat getDefaultJSONFormat() {
return defaultJSONFormat;
}

public static String getFullyQualifiedTableName(final String schemaName, final String tableName) {
return schemaName != null ? schemaName + "." + tableName : tableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
Expand All @@ -31,8 +32,6 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +40,6 @@ public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTe

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class);
private static final String SECRETS_CONFIG_JSON = "secrets/config.json";
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private final ExtendedNameTransformer nameTransformer = new DatabricksNameTransformer();
private JsonNode configJson;
Expand Down Expand Up @@ -85,7 +83,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
.orderBy(field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).asc())
.fetch().stream()
.map(record -> {
final JsonNode json = Jsons.deserialize(record.formatJSON(JSON_FORMAT));
final JsonNode json = Jsons.deserialize(record.formatJSON(JdbcUtils.getDefaultJSONFormat()));
final JsonNode jsonWithOriginalFields = nameUpdater.getJsonWithOriginalFieldNames(json);
return AvroRecordHelper.pruneAirbyteJson(jsonWithOriginalFields);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.testcontainers.containers.PostgreSQLContainer;

public class JdbcDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private PostgreSQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

Expand Down Expand Up @@ -99,7 +96,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -23,16 +24,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;

public class MssqlStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode config;
Expand Down Expand Up @@ -130,7 +128,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.MSSQLServerContainer;

public class MSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode configWithoutDbName;
Expand Down Expand Up @@ -120,7 +117,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.utility.DockerImageName;

public class MSSQLDestinationAcceptanceTestSSL extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode configWithoutDbName;
Expand Down Expand Up @@ -129,7 +126,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
Expand All @@ -21,8 +22,6 @@
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.Network;
Expand All @@ -33,8 +32,6 @@
*/
public abstract class SshMSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private final String schemaName = RandomStringUtils.randomAlphabetic(8).toLowerCase();
Expand Down Expand Up @@ -148,7 +145,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
database, schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -26,16 +27,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -115,7 +112,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -26,16 +27,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -117,7 +114,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -22,16 +23,13 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;

/**
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private String schemaName;

Expand Down Expand Up @@ -131,7 +129,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class SslMySQLDestinationAcceptanceTest extends MySQLDestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -108,7 +105,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
Expand All @@ -21,13 +22,10 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.testcontainers.containers.Network;

public abstract class SshOracleDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new OracleNameTransformer();

private final String schemaName = "TEST_ORCL";
Expand Down Expand Up @@ -116,7 +114,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT)))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Loading

0 comments on commit 92fcdea

Please sign in to comment.