Skip to content

Commit

Permalink
Format java code (#12401)
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren authored Apr 27, 2022
1 parent 9f577bb commit b70a6fb
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public class DatabricksConstants {
"delta.autoOptimize.optimizeWrite = true",
"delta.autoOptimize.autoCompact = true");

private DatabricksConstants() {
}
private DatabricksConstants() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public interface SqlOperations {
/**
* Create a schema with provided name if it does not already exist.
*
* @param database Database that the connector is syncing
* @param database Database that the connector is syncing
* @param schemaName Name of schema.
* @throws Exception exception
*/
Expand All @@ -30,7 +30,7 @@ public interface SqlOperations {
/**
* Denotes whether the schema exists in destination database
*
* @param database Database that the connector is syncing
* @param database Database that the connector is syncing
* @param schemaName Name of schema.
* @return true if the schema exists in destination database, false if it doesn't
*/
Expand All @@ -41,19 +41,19 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
/**
* Create a table with provided name in provided schema if it does not already exist.
*
* @param database Database that the connector is syncing
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param tableName Name of table
* @param tableName Name of table
* @throws Exception exception
*/
void createTableIfNotExists(JdbcDatabase database, String schemaName, String tableName) throws Exception;

/**
* Query to create a table with provided name in provided schema if it does not already exist.
*
* @param database Database that the connector is syncing
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param tableName Name of table
* @param tableName Name of table
* @return query
*/
String createTableQuery(JdbcDatabase database, String schemaName, String tableName);
Expand All @@ -62,38 +62,39 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
* Drop the table if it exists.
*
* @param schemaName Name of schema
* @param tableName Name of table
* @param tableName Name of table
* @throws Exception exception
*/
void dropTableIfExists(JdbcDatabase database, String schemaName, String tableName) throws Exception;

/**
* Query to remove all records from a table. Assumes the table exists.
*
* @param database Database that the connector is syncing
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param tableName Name of table
* @param tableName Name of table
* @return Query
*/
String truncateTableQuery(JdbcDatabase database, String schemaName, String tableName);

/**
* Insert records into table. Assumes the table exists.
*
* @param database Database that the connector is syncing
* @param records Records to insert.
* @param database Database that the connector is syncing
* @param records Records to insert.
* @param schemaName Name of schema
* @param tableName Name of table
* @param tableName Name of table
* @throws Exception exception
*/
void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String tableName) throws Exception;

/**
* Query to copy all records from source table to destination table. Both tables must be in the specified schema. Assumes both table exist.
* Query to copy all records from source table to destination table. Both tables must be in the
* specified schema. Assumes both table exist.
*
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param sourceTableName Name of source table
* @param database Database that the connector is syncing
* @param schemaName Name of schema
* @param sourceTableName Name of source table
* @param destinationTableName Name of destination table
* @return Query
*/
Expand All @@ -103,7 +104,7 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
* Given an arbitrary number of queries, execute a transaction.
*
* @param database Database that the connector is syncing
* @param queries Queries to execute
* @param queries Queries to execute
* @throws Exception exception
*/
void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception;
Expand All @@ -120,19 +121,21 @@ default boolean isSchemaExists(final JdbcDatabase database, final String schemaN
*/
boolean isSchemaRequired();


/**
* The method is responsible for executing some specific DB Engine logic in onClose method. We can override this method to execute specific logic
* e.g. to handle any necessary migrations in the destination, etc.
* The method is responsible for executing some specific DB Engine logic in onClose method. We can
* override this method to execute specific logic e.g. to handle any necessary migrations in the
* destination, etc.
* <p>
* In next example you can see how migration from VARCHAR to SUPER column is handled for the Redshift destination:
* In next example you can see how migration from VARCHAR to SUPER column is handled for the
* Redshift destination:
*
* @param database - Database that the connector is interacting with
* @param schemaNames - schemas will be discovered
* @param schemaNames - schemas will be discovered
* @see io.airbyte.integrations.destination.redshift.RedshiftSqlOperations#onDestinationCloseOperations
*/
default void onDestinationCloseOperations(JdbcDatabase database, Set<String> schemaNames) {
// do nothing
LOGGER.info("No onDestinationCloseOperations required for this destination.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ public class CopyConsumerFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(CopyConsumerFactory.class);

public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> outputRecordCollector,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final ExtendedNameTransformer namingResolver,
final T config,
final ConfiguredAirbyteCatalog catalog,
final StreamCopierFactory<T> streamCopierFactory,
final String defaultSchema) {
final JdbcDatabase database,
final SqlOperations sqlOperations,
final ExtendedNameTransformer namingResolver,
final T config,
final ConfiguredAirbyteCatalog catalog,
final StreamCopierFactory<T> streamCopierFactory,
final String defaultSchema) {
final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier = createWriteConfigs(
namingResolver,
config,
Expand All @@ -65,12 +65,12 @@ public static <T> AirbyteMessageConsumer create(final Consumer<AirbyteMessage> o
}

private static <T> Map<AirbyteStreamNameNamespacePair, StreamCopier> createWriteConfigs(final ExtendedNameTransformer namingResolver,
final T config,
final ConfiguredAirbyteCatalog catalog,
final StreamCopierFactory<T> streamCopierFactory,
final String defaultSchema,
final JdbcDatabase database,
final SqlOperations sqlOperations) {
final T config,
final ConfiguredAirbyteCatalog catalog,
final StreamCopierFactory<T> streamCopierFactory,
final String defaultSchema,
final JdbcDatabase database,
final SqlOperations sqlOperations) {
final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier = new HashMap<>();
final String stagingFolder = UUID.randomUUID().toString();
for (final var configuredStream : catalog.getStreams()) {
Expand All @@ -89,8 +89,8 @@ private static OnStartFunction onStartFunction(final Map<AirbyteStreamNameNamesp
}

private static RecordWriter<AirbyteRecordMessage> recordWriterFunction(final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
return (AirbyteStreamNameNamespacePair pair, List<AirbyteRecordMessage> records) -> {
final var fileName = pairToCopier.get(pair).prepareStagingFile();
for (final AirbyteRecordMessage recordMessage : records) {
Expand All @@ -117,9 +117,9 @@ private static CheckAndRemoveRecordWriter removeStagingFilePrinter(final Map<Air
}

private static OnCloseFunction onCloseFunction(final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
final JdbcDatabase database,
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
final JdbcDatabase database,
final SqlOperations sqlOperations,
final Map<AirbyteStreamNameNamespacePair, Long> pairToIgnoredRecordCount) {
return (hasFailed) -> {
pairToIgnoredRecordCount
.forEach((pair, count) -> LOGGER.warn("A total of {} record(s) of data from stream {} were invalid and were ignored.", count, pair));
Expand All @@ -128,9 +128,9 @@ private static OnCloseFunction onCloseFunction(final Map<AirbyteStreamNameNamesp
}

private static void closeAsOneTransaction(final Map<AirbyteStreamNameNamespacePair, StreamCopier> pairToCopier,
boolean hasFailed,
final JdbcDatabase db,
final SqlOperations sqlOperations)
boolean hasFailed,
final JdbcDatabase db,
final SqlOperations sqlOperations)
throws Exception {
Exception firstException = null;
List<StreamCopier> streamCopiers = new ArrayList<>(pairToCopier.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,5 @@ public interface StreamCopier {
* @return current staging file name
*/
String getCurrentFile();

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.UUID;
import org.joda.time.DateTime;

public interface StagingOperations extends SqlOperations {
public interface StagingOperations extends SqlOperations {

String getStageName(String namespace, String streamName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.mariadb_columnstore;

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;

import java.util.ArrayList;
import java.util.List;

public class MariaDbTestDataComparator extends AdvancedTestDataComparator {

private final ExtendedNameTransformer namingResolver = new MariadbColumnstoreNameTransformer();
private final ExtendedNameTransformer namingResolver = new MariadbColumnstoreNameTransformer();

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
return result;
}

return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MariaDBContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.text.Names;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.mongodb.MongoDatabase;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.util.ArrayList;
import java.util.List;

import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.util.ArrayList;
import java.util.List;
import org.bson.Document;
import org.testcontainers.containers.MongoDBContainer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
Expand All @@ -23,9 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;

import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ public String truncateTableQuery(final JdbcDatabase database, final String schem

@Override
public void insertRecords(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final String schemaName,
final String tempTableName)
final List<AirbyteRecordMessage> records,
final String schemaName,
final String tempTableName)
throws Exception {
final String tableName = String.format("%s.%s", schemaName, tempTableName);
final String columns = String.format("(%s, %s, %s)",
Expand All @@ -107,11 +107,11 @@ public void insertRecords(final JdbcDatabase database,

// Adapted from SqlUtils.insertRawRecordsInSingleQuery to meet some needs specific to Oracle syntax
private static void insertRawRecordsInSingleQuery(final String tableName,
final String columns,
final String recordQueryComponent,
final JdbcDatabase jdbcDatabase,
final List<AirbyteRecordMessage> records,
final Supplier<UUID> uuidSupplier)
final String columns,
final String recordQueryComponent,
final JdbcDatabase jdbcDatabase,
final List<AirbyteRecordMessage> records,
final Supplier<UUID> uuidSupplier)
throws SQLException {
if (records.isEmpty()) {
return;
Expand Down Expand Up @@ -152,9 +152,9 @@ private static void insertRawRecordsInSingleQuery(final String tableName,

@Override
public String copyTableQuery(final JdbcDatabase database,
final String schemaName,
final String sourceTableName,
final String destinationTableName) {
final String schemaName,
final String sourceTableName,
final String destinationTableName) {
return String.format("INSERT INTO %s.%s SELECT * FROM %s.%s\n", schemaName, destinationTableName, schemaName, sourceTableName);
}

Expand Down
Loading

0 comments on commit b70a6fb

Please sign in to comment.