Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor JDBC JSON_Format into JDBC utils #7504

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,22 @@

package io.airbyte.db.jdbc;

import org.jooq.JSONFormat;

public class JdbcUtils {

private static final JdbcSourceOperations defaultSourceOperations = new JdbcSourceOperations();

public static JdbcSourceOperations getDefaultSourceOperations() {
private static final JSONFormat defaultJSONFormat = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

public static JdbcSourceOperations getDefaultSourcxeOperations() {
cgardens marked this conversation as resolved.
Show resolved Hide resolved
return defaultSourceOperations;
}

public static JSONFormat getDefaultJSONFormat() {
return defaultJSONFormat;
}

public static String getFullyQualifiedTableName(final String schemaName, final String tableName) {
return schemaName != null ? schemaName + "." + tableName : tableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.copy.StreamCopierFactory;
Expand All @@ -31,8 +32,6 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,7 +40,6 @@ public class DatabricksDestinationAcceptanceTest extends DestinationAcceptanceTe

private static final Logger LOGGER = LoggerFactory.getLogger(DatabricksDestinationAcceptanceTest.class);
private static final String SECRETS_CONFIG_JSON = "secrets/config.json";
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private final ExtendedNameTransformer nameTransformer = new DatabricksNameTransformer();
private JsonNode configJson;
Expand Down Expand Up @@ -85,7 +83,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
.orderBy(field(JavaBaseConstants.COLUMN_NAME_EMITTED_AT).asc())
.fetch().stream()
.map(record -> {
final JsonNode json = Jsons.deserialize(record.formatJSON(JSON_FORMAT));
final JsonNode json = Jsons.deserialize(record.formatJSON(JdbcUtils.getDefaultJSONFormat()));
final JsonNode jsonWithOriginalFields = nameUpdater.getJsonWithOriginalFieldNames(json);
return AvroRecordHelper.pruneAirbyteJson(jsonWithOriginalFields);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.testcontainers.containers.PostgreSQLContainer;

public class JdbcDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private PostgreSQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

Expand Down Expand Up @@ -99,7 +96,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -23,16 +24,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MSSQLServerContainer;

public class MssqlStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode config;
Expand Down Expand Up @@ -130,7 +128,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,20 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.MSSQLServerContainer;

public class MSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode configWithoutDbName;
Expand Down Expand Up @@ -120,7 +117,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,21 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.utility.DockerImageName;

public class MSSQLDestinationAcceptanceTestSSL extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private static MSSQLServerContainer<?> db;
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();
private JsonNode configWithoutDbName;
Expand Down Expand Up @@ -129,7 +126,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
return ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
Expand All @@ -21,8 +22,6 @@
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MSSQLServerContainer;
import org.testcontainers.containers.Network;
Expand All @@ -33,8 +32,6 @@
*/
public abstract class SshMSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private final String schemaName = RandomStringUtils.randomAlphabetic(8).toLowerCase();
Expand Down Expand Up @@ -148,7 +145,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
database, schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -26,16 +27,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLStrictEncryptDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -115,7 +112,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
Expand All @@ -26,16 +27,12 @@
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class MySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -117,7 +114,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
Expand All @@ -22,16 +23,13 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;

/**
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
public abstract class SshMySQLDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();
private String schemaName;

Expand Down Expand Up @@ -131,7 +129,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schema, tableName.toLowerCase(),
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;

public class SslMySQLDestinationAcceptanceTest extends MySQLDestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);

private MySQLContainer<?> db;
private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer();

Expand Down Expand Up @@ -108,7 +105,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
Expand All @@ -21,13 +22,10 @@
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.testcontainers.containers.Network;

public abstract class SshOracleDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new OracleNameTransformer();

private final String schemaName = "TEST_ORCL";
Expand Down Expand Up @@ -116,7 +114,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT)))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(r -> r.formatJSON(JdbcUtils.getDefaultJSONFormat()))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}
Expand Down
Loading