Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing SqlServerOperations.java #3454

Merged
merged 3 commits into from
May 17, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@
import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SqlServerOperations implements SqlOperations {

@Override
public void createSchemaIfNotExists(JdbcDatabase database, String schemaName) throws Exception {
final String query = String.format("IF NOT EXISTS ( SELECT * FROM sys.schemas WHERE name = '%s') EXEC('CREATE SCHEMA [%s]')",
schemaName,
schemaName);
schemaName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this from formatting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the formatter did a bunch of stuff to it. The substantive changes are mostly down at the bottom.

schemaName);
database.execute(query);
}

Expand All @@ -51,24 +49,24 @@ public void createTableIfNotExists(JdbcDatabase database, String schemaName, Str
@Override
public String createTableQuery(String schemaName, String tableName) {
return String.format(
"IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id "
+ "WHERE s.name = '%s' AND t.name = '%s') "
+ "CREATE TABLE %s.%s ( \n"
+ "%s VARCHAR(64) PRIMARY KEY,\n"
+ "%s VARCHAR(MAX),\n"
+ "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n"
+ ");\n",
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
"IF NOT EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id "
+ "WHERE s.name = '%s' AND t.name = '%s') "
+ "CREATE TABLE %s.%s ( \n"
+ "%s VARCHAR(64) PRIMARY KEY,\n"
+ "%s VARCHAR(MAX),\n"
+ "%s DATETIMEOFFSET(7) DEFAULT SYSDATETIMEOFFSET()\n"
+ ");\n",
schemaName, tableName, schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

@Override
public void dropTableIfExists(JdbcDatabase database, String schemaName, String tableName) throws Exception {
final String query = String.format(
"IF EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id "
+ "WHERE s.name = '%s' AND t.name = '%s') "
+ "DROP TABLE %s.%s",
schemaName, tableName, schemaName, tableName);
"IF EXISTS (SELECT * FROM sys.tables t JOIN sys.schemas s ON t.schema_id = s.schema_id "
+ "WHERE s.name = '%s' AND t.name = '%s') "
+ "DROP TABLE %s.%s",
schemaName, tableName, schemaName, tableName);
database.execute(query);
}

Expand All @@ -78,17 +76,16 @@ public String truncateTableQuery(String schemaName, String tableName) {
}

@Override
public void insertRecords(JdbcDatabase database, Stream<AirbyteRecordMessage> recordsStream, String schemaName, String tempTableName)
throws Exception {
final List<AirbyteRecordMessage> records = recordsStream.collect(Collectors.toList());
public void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> records, String schemaName, String tempTableName)
throws Exception {

final String insertQueryComponent = String.format(
"INSERT INTO %s.%s (%s, %s, %s) VALUES\n",
schemaName,
tempTableName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
"INSERT INTO %s.%s (%s, %s, %s) VALUES\n",
schemaName,
tempTableName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
final String recordQueryComponent = "(?, ?, ?),\n";
SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, database, records);
}
Expand All @@ -99,8 +96,18 @@ public String copyTableQuery(String schemaName, String sourceTableName, String d
}

@Override
public void executeTransaction(JdbcDatabase database, String queries) throws Exception {
database.execute("BEGIN TRAN;\n" + queries + "COMMIT TRAN;");
public void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception {
database.execute("BEGIN TRAN;\n" + String.join("\n", queries) + "\nCOMMIT TRAN;");
}

@Override
public boolean isValidData(String data) {
return true;
}

@Override
public boolean isSchemaRequired() {
return true;
}

}