Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination Oracle - Added support for connection via ssh tunnel #6370

Merged
merged 9 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ jobs:
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
MYSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MYSQL_SSH_KEY_TEST_CREDS }}
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
ORACLE_SSH_KEY_TEST_CREDS: ${{ secrets.ORACLE_SSH_KEY_TEST_CREDS }}
ORACLE_SSH_PWD_TEST_CREDS: ${{ secrets.ORACLE_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ jobs:
POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.POSTGRES_SSH_PWD_TEST_CREDS }}
MYSQL_SSH_KEY_TEST_CREDS: ${{ secrets.MYSQL_SSH_KEY_TEST_CREDS }}
MYSQL_SSH_PWD_TEST_CREDS: ${{ secrets.MYSQL_SSH_PWD_TEST_CREDS }}
ORACLE_SSH_KEY_TEST_CREDS: ${{ secrets.ORACLE_SSH_KEY_TEST_CREDS }}
ORACLE_SSH_PWD_TEST_CREDS: ${{ secrets.ORACLE_SSH_PWD_TEST_CREDS }}
POSTHOG_TEST_CREDS: ${{ secrets.POSTHOG_TEST_CREDS }}
PIPEDRIVE_INTEGRATION_TESTS_CREDS: ${{ secrets.PIPEDRIVE_INTEGRATION_TESTS_CREDS }}
RECHARGE_INTEGRATION_TEST_CREDS: ${{ secrets.RECHARGE_INTEGRATION_TEST_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "3986776d-2319-4de9-8af8-db14c0996e72",
"name": "Oracle (Alpha)",
"dockerRepository": "airbyte/destination-oracle",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/oracle"
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
- destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
name: Oracle (Alpha)
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/oracle
- destinationDefinitionId: 9f760101-60ae-462f-9ee6-b7a9dafd454d
name: Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-oracle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation "com.oracle.database.jdbc:ojdbc8-production:19.7.0.0"

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:oracle-xe:1.15.2'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-oracle')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(OracleDestination.class);
public static final List<String> HOST_KEY = List.of("host");
public static final List<String> PORT_KEY = List.of("port");

public static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";

Expand Down Expand Up @@ -66,7 +70,7 @@ public JsonNode toJdbcConfig(JsonNode config) {
}

public static void main(String[] args) throws Exception {
final Destination destination = new OracleDestination();
final Destination destination = new SshWrappedDestination(new OracleDestination(), HOST_KEY, PORT_KEY);
LOGGER.info("starting destination: {}", OracleDestination.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", OracleDestination.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public String convertStreamName(String input) {
if (!result.isEmpty() && result.charAt(0) == '_') {
result = result.substring(1);
}
return maxStringLength(result, 128);
// prior to Oracle version 12.2, identifiers are not allowed to exceed 30 characters in length.
// However, from version 12.2 they can be up to 128 bytes long. (Note: bytes, not characters).
return maxStringLength(result, 30);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ public void createSchemaIfNotExists(JdbcDatabase database, String schemaName) th
LOGGER.warn("Schema " + schemaName + " is not found! Trying to create a new one.");
final String query = String.format("create user %s identified by %s quota unlimited on %s",
schemaName, schemaName, tablespace);
// need to grant privileges to new user / this option is not mandatory for Oracle DB 18c or higher
final String privileges = String.format("GRANT ALL PRIVILEGES TO %s", schemaName);
database.execute(query);
database.execute(privileges);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshKeyOracleDestinationAcceptanceTest extends SshOracleDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_KEY_AUTH;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.ssh.SshBastionContainer;
import io.airbyte.integrations.base.ssh.SshTunnel;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.OracleContainer;

public abstract class SshOracleDestinationAcceptanceTest extends DestinationAcceptanceTest {

private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(JSONFormat.RecordFormat.OBJECT);

private final ExtendedNameTransformer namingResolver = new OracleNameTransformer();

private final String schemaName = "TEST_ORCL";

private final SshBastionContainer sshBastionContainer = new SshBastionContainer();

private OracleContainer db;

public abstract SshTunnel.TunnelMethod getTunnelMethod();

@Override
protected String getImageName() {
return "airbyte/destination-oracle:dev";
}

@Override
protected JsonNode getConfig() throws IOException, InterruptedException {
return sshBastionContainer.getTunnelConfig(getTunnelMethod(), getBasicOracleDbConfigBuider(db).put("schema", schemaName));
}

public ImmutableMap.Builder<Object, Object> getBasicOracleDbConfigBuider(OracleContainer db) {
return ImmutableMap.builder()
.put("host", Objects.requireNonNull(db.getContainerInfo().getNetworkSettings()
.getNetworks()
.get(((Network.NetworkImpl) sshBastionContainer.getNetWork()).getName())
.getIpAddress()))
.put("username", db.getUsername())
.put("password", db.getPassword())
.put("port", db.getExposedPorts().get(0))
.put("sid", db.getSid())
.put("schemas", List.of("JDBC_SPACE"))
.put("ssl", false);
}

@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}

@Override
protected JsonNode getFailCheckConfig() throws Exception {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put("password", "wrong password");
return clone;
}

@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv, String streamName, String namespace, JsonNode streamSchema) throws Exception {
List<JsonNode> jsonNodes = retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace);
return jsonNodes
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA.toUpperCase()).asText()))
.collect(Collectors.toList());
}

@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, String streamName, String namespace)
throws Exception {
String tableName = namingResolver.getIdentifier(streamName);
return retrieveRecordsFromTable(tableName, namespace);
}

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
final JsonNode config = getConfig();
return SshTunnel.sshWrap(
config,
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig)
.query(
ctx -> ctx
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT)))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.collect(Collectors.toList()));
}

@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
startTestContainers();
SshTunnel.sshWrap(
getConfig(),
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
mangledConfig -> {
Database databaseFromConfig = getDatabaseFromConfig(mangledConfig);
databaseFromConfig.query(ctx -> ctx.fetch(String.format("CREATE USER %s IDENTIFIED BY %s", schemaName, schemaName)));
databaseFromConfig.query(ctx -> ctx.fetch(String.format("GRANT ALL PRIVILEGES TO %s", schemaName)));
});
}

private void startTestContainers() {
sshBastionContainer.initAndStartBastion();
initAndStartJdbcContainer();
}

private void initAndStartJdbcContainer() {
db = new OracleContainer("epiclabs/docker-oracle-xe-11g")
.withNetwork(sshBastionContainer.getNetWork());
db.start();
}

private Database getDatabaseFromConfig(final JsonNode config) {
return Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:oracle:thin:@//%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("sid").asText()),
"oracle.jdbc.driver.OracleDriver",
null);
}

@Override
protected void tearDown(TestDestinationEnv testEnv) throws Exception {
SshTunnel.sshWrap(
getConfig(),
OracleDestination.HOST_KEY,
OracleDestination.PORT_KEY,
mangledConfig -> {
Database databaseFromConfig = getDatabaseFromConfig(mangledConfig);
databaseFromConfig.query(ctx -> ctx.fetch(String.format("DROP USER %s CASCADE", schemaName)));
});

sshBastionContainer.stopAndCloseContainers(db);
}

@Override
protected boolean supportsDBT() {
return true;
}

@Override
protected boolean implementsNamespaces() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.oracle;

import io.airbyte.integrations.base.ssh.SshTunnel;

public class SshPasswordOracleDestinationAcceptanceTest extends SshOracleDestinationAcceptanceTest {

@Override
public SshTunnel.TunnelMethod getTunnelMethod() {
return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
}

}
Loading