Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ssh tunnel for postgres normalization #5818

Closed
wants to merge 77 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
6d531f9
oracle normalization
marcosmarxm Aug 22, 2021
078be20
correct dbt_project function for oracle
marcosmarxm Aug 24, 2021
b1f4d81
unit tests
marcosmarxm Aug 24, 2021
36959b1
run format
marcosmarxm Aug 24, 2021
c22a678
Merge branch 'master' into marcosmarxm/normalization-oracle
marcosmarxm Aug 24, 2021
1c23695
correct ephemeral tests
marcosmarxm Aug 25, 2021
f797107
add gradle dependency for oracle destination
marcosmarxm Aug 25, 2021
545be65
run int tests
marcosmarxm Aug 25, 2021
22c903e
add oracle in settings.gradle for normalization run[
marcosmarxm Aug 25, 2021
91db2cd
autogenerated finals
cgardens Aug 30, 2021
8aaadaa
init
cgardens Aug 30, 2021
04c09fa
remove dynamic generation of data
cgardens Aug 30, 2021
f1bfd1b
styling
cgardens Aug 30, 2021
b5aa5f4
clean up
cgardens Aug 30, 2021
800f16c
clean
cgardens Aug 30, 2021
c78df0b
opinionated capitalization
cgardens Aug 30, 2021
b1943d5
more opinionated caps
cgardens Aug 30, 2021
8cb4988
sometimes you have to roll the hard six
cgardens Aug 30, 2021
df19286
victory
cgardens Aug 30, 2021
e2f30b8
add secrets to ci
cgardens Aug 30, 2021
0755c53
fix those tests
cgardens Aug 30, 2021
6cd9311
clean
cgardens Aug 30, 2021
817f1fa
wip
cgardens Aug 30, 2021
53e92c6
use default airbyte columns
marcosmarxm Aug 31, 2021
b529041
format
marcosmarxm Sep 1, 2021
202af8f
test all destinatoin ephemeral
marcosmarxm Sep 1, 2021
5924283
correct unit test
marcosmarxm Sep 1, 2021
a697133
correct unit test
marcosmarxm Sep 1, 2021
48a71b0
destination docs update
marcosmarxm Sep 1, 2021
3b4723d
correct mypy
marcosmarxm Sep 1, 2021
90115d4
integration test all dest
marcosmarxm Sep 1, 2021
4ac2f85
refactor oracle function
marcosmarxm Sep 1, 2021
508efa0
Merge branch 'master' into marcosmarxm/normalization-oracle
marcosmarxm Sep 1, 2021
881afbe
merge master
marcosmarxm Sep 1, 2021
7334102
run all destinations
marcosmarxm Sep 1, 2021
fdc3dc3
flake8 escape regex
marcosmarxm Sep 1, 2021
252b996
surrogate key function
marcosmarxm Sep 2, 2021
b64724a
correct few minor comments
marcosmarxm Sep 2, 2021
97787c2
refactor scd sql function
marcosmarxm Sep 2, 2021
f7f4b25
postgres ssh normalization working except for custom normalization
Phlair Sep 2, 2021
fa7b131
refactor scd function
marcosmarxm Sep 2, 2021
48381ad
revert test
marcosmarxm Sep 2, 2021
4ab8d79
refactor minor details
marcosmarxm Sep 2, 2021
475e00c
revert tests
marcosmarxm Sep 2, 2021
8b2b27f
revert ephemeral test
marcosmarxm Sep 2, 2021
0d92570
revert unit test table_registry
marcosmarxm Sep 2, 2021
1ee9f90
revert airbyte_protocol format
marcosmarxm Sep 2, 2021
69f6143
Merge branch 'master' into marcosmarxm/normalization-oracle
marcosmarxm Sep 2, 2021
b3e0e2e
format
marcosmarxm Sep 2, 2021
defc0fc
bump normalization version in worker
marcosmarxm Sep 2, 2021
82de69d
minor chnages
marcosmarxm Sep 2, 2021
e588198
minor chages
marcosmarxm Sep 2, 2021
61452a9
correct json_column for other destinations
marcosmarxm Sep 2, 2021
02bf9af
gradlew format
marcosmarxm Sep 2, 2021
69d065c
wip
cgardens Aug 30, 2021
47d3f33
update change log
cgardens Sep 3, 2021
d43b500
update to use new config schema
cgardens Sep 3, 2021
e63a5a3
fix tests to clean themselves up
cgardens Sep 3, 2021
5f19695
fully working but need to pull and adapt to Charles' latest changes
Phlair Sep 3, 2021
260a204
Merge + necessary changes
Phlair Sep 6, 2021
323129e
Delete SshTunnel.java
Phlair Sep 6, 2021
935f1af
Delete SshWrappedJdbcDestination.java
Phlair Sep 6, 2021
ea1f7f8
update normalization version
Phlair Sep 6, 2021
f1cc6cb
Merge branch 'george/new-pg-ssh-norm' of https://github.com/airbytehq…
Phlair Sep 6, 2021
4ca4696
revert tests
marcosmarxm Sep 7, 2021
728c3e5
remove comments
marcosmarxm Sep 7, 2021
e342018
address review comments
Phlair Sep 7, 2021
5e341bc
add Oracle destination explicit in safe_cast_str
marcosmarxm Sep 7, 2021
5fd3fc0
add quote_in_parenthesis inside if clause
marcosmarxm Sep 7, 2021
3337c48
gradlew format
marcosmarxm Sep 7, 2021
67815d3
copying sshtunnel script with gradle
Phlair Sep 7, 2021
442c707
added instructions for adding support for other dests
Phlair Sep 7, 2021
82a4de4
Merge branch 'master' into marcosmarxm/normalization-oracle
marcosmarxm Sep 7, 2021
d769abc
fix ssh script copy
Phlair Sep 7, 2021
1a98048
Merge branch 'marcosmarxm/normalization-oracle' of https://github.com…
Phlair Sep 7, 2021
8be8147
added my changes back into transform.py
Phlair Sep 7, 2021
3cd895e
bump version
Phlair Sep 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/publish-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ jobs:
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
CART_TEST_CREDS: ${{ secrets.CART_TEST_CREDS }}
CHARGEBEE_INTEGRATION_TEST_CREDS: ${{ secrets.CHARGEBEE_INTEGRATION_TEST_CREDS }}
DESTINATION_POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.DESTINATION_POSTGRES_SSH_KEY_TEST_CREDS }}
DESTINATION_POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.DESTINATION_POSTGRES_SSH_PWD_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DESTINATION_KEEN_TEST_CREDS: ${{ secrets.DESTINATION_KEEN_TEST_CREDS }}
DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }}
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ jobs:
BRAINTREE_TEST_CREDS: ${{ secrets.BRAINTREE_TEST_CREDS }}
CART_TEST_CREDS: ${{ secrets.CART_TEST_CREDS }}
CHARGEBEE_INTEGRATION_TEST_CREDS: ${{ secrets.CHARGEBEE_INTEGRATION_TEST_CREDS }}
DESTINATION_POSTGRES_SSH_KEY_TEST_CREDS: ${{ secrets.DESTINATION_POSTGRES_SSH_KEY_TEST_CREDS }}
DESTINATION_POSTGRES_SSH_PWD_TEST_CREDS: ${{ secrets.DESTINATION_POSTGRES_SSH_PWD_TEST_CREDS }}
DESTINATION_PUBSUB_TEST_CREDS: ${{ secrets.DESTINATION_PUBSUB_TEST_CREDS }}
DESTINATION_KEEN_TEST_CREDS: ${{ secrets.DESTINATION_KEEN_TEST_CREDS }}
DESTINATION_KVDB_TEST_CREDS: ${{ secrets.DESTINATION_KVDB_TEST_CREDS }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package io.airbyte.integrations.base;

import io.airbyte.commons.concurrency.VoidCallable;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.protocol.models.AirbyteMessage;

Expand Down Expand Up @@ -56,4 +57,29 @@ public interface AirbyteMessageConsumer extends CheckedConsumer<AirbyteMessage,
@Override
void close() throws Exception;

/**
* Append a function to be called on {@link AirbyteMessageConsumer#close}.
*/
static AirbyteMessageConsumer appendOnClose(final AirbyteMessageConsumer consumer, final VoidCallable voidCallable) {
return new AirbyteMessageConsumer() {

@Override
public void start() throws Exception {
consumer.start();
}

@Override
public void accept(final AirbyteMessage message) throws Exception {
consumer.accept(message);
}

@Override
public void close() throws Exception {
consumer.close();
voidCallable.call();
}

};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.base.ssh;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.List;
import java.util.function.Consumer;

/**
* Decorates a Destination with an SSH Tunnel using the standard configuration that Airbyte uses for
* configuring SSH.
*/
public class SshWrappedDestination implements Destination {

private final Destination delegate;
private final List<String> hostKey;
private final List<String> portKey;

public SshWrappedDestination(final Destination delegate,
final List<String> hostKey,
final List<String> portKey) {
this.delegate = delegate;
this.hostKey = hostKey;
this.portKey = portKey;
}

@Override
public ConnectorSpecification spec() throws Exception {
// inject the standard ssh configuration into the spec.
final ConnectorSpecification originalSpec = delegate.spec();
final ObjectNode propNode = (ObjectNode) originalSpec.getConnectionSpecification().get("properties");
propNode.set("tunnel_method", Jsons.deserialize(MoreResources.readResource("ssh-tunnel-spec.json")));
return originalSpec;
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
return SshTunnel.sshWrap(config, hostKey, portKey, delegate::check);
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final Consumer<AirbyteMessage> outputRecordCollector)
throws Exception {
final SshTunnel tunnel = SshTunnel.getInstance(config, hostKey, portKey);
return AirbyteMessageConsumer.appendOnClose(delegate.getConsumer(tunnel.getConfigInTunnel(), catalog, outputRecordCollector), tunnel::close);
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# Developing an SSH Source
# Developing an SSH Connector

## Goal
Easy development of any source that needs the ability to connect to a resource via SSH Tunnel.
Easy development of any connector that needs the ability to connect to a resource via SSH Tunnel.

## Overview
Our SSH connector support is designed to be easy to plug into any existing connector. There are a few major pieces to consider:
1. Add SSH Configuration to the Spec - for SSH, we need to take in additional configuration, so we need to inject extra fields into the connector configuration.
2. Add SSH Logic to the Connector - before the connector code begins to execute we need to start an SSH tunnel. This library provides logic to create that tunnel (and clean it up).
3. Acceptance Testing - it is a good practice to include acceptance testing for the SSH version of a connector for at least one of the SSH types (password or ssh key). While unit testing for the SSH functionality exists in this package (coming soon), high-level acceptance testing to make sure this feature works with the individual connector belongs in the connector.
4. Normalization Support for Destinations - if the connector is a destination and supports normalization, there's a small change required in the normalization code to update the config so that dbt uses the right credentials for the SSH tunnel.

## How To

Expand All @@ -21,6 +22,15 @@ Our SSH connector support is designed to be easy to plug into any existing conne
### Acceptance Testing
1. The only difference between existing acceptance testing and acceptance testing with SSH is that the configuration that is used for testing needs to contain additional fields. You can see the `Postgres Source ssh key creds` in lastpass to see an example of what that might look like. Those credentials leverage an existing bastion host in our test infrastructure. (As future work, we want to get rid of the need to use a static bastion server and instead do it in docker so we can run it all locally.)

### Normalization Support for Destinations
1. The core functionality for ssh tunnelling with normalization is already in place but you'll need to add a small tweak to `transform_config/transform.py` in the normalization module. Find the function `transform_{connector}()` and add at the start:
```
if TransformConfig.is_ssh_tunnelling(config):
config = TransformConfig.get_ssh_altered_config(config, port_key="port", host_key="host")
```
Replace port_key and host_key as necessary. Look at `transform_postgres()` to see an example.
2. If your `host_key="host"` and `port_key="port"` then step 1 should be sufficient. However if the key names differ for your connector, you will also need to add some logic into `sshtunneling.sh` (within airbyte-workers) to handle this, as currently it assumes that the keys are exactly `host` and `port`.

## Misc

### How to wrap the protocol in an SSH Tunnel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*
!Dockerfile
!entrypoint.sh
!build/sshtunneling.sh
!setup.py
!normalization
!dbt-project-template
23 changes: 22 additions & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
FROM fishtownanalytics/dbt:0.19.0

USER root
WORKDIR /tmp
RUN apt-get update && apt-get install -y \
wget \
unzip \
libaio-dev \
libaio1
RUN mkdir -p /opt/oracle
RUN wget https://download.oracle.com/otn_software/linux/instantclient/19600/instantclient-basic-linux.x64-19.6.0.0.0dbru.zip
RUN unzip instantclient-basic-linux.x64-19.6.0.0.0dbru.zip -d /opt/oracle
ENV ORACLE_HOME /opt/oracle/instantclient_19_6
ENV LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$ORACLE_HOME
ENV TNS_ADMIN /opt/oracle/instantclient_19_6/network/admin
RUN pip install cx_Oracle

COPY --from=airbyte/base-airbyte-protocol-python:0.1.1 /airbyte /airbyte

RUN apt-get update && apt-get install -y jq sshpass

WORKDIR /airbyte
COPY entrypoint.sh .
COPY build/sshtunneling.sh .

WORKDIR /airbyte/normalization_code
COPY normalization ./normalization
Expand All @@ -14,8 +33,10 @@ RUN pip install .

WORKDIR /airbyte/normalization_code
RUN pip install .
RUN pip install dbt-oracle==0.4.3
RUN pip install git+https://github.com/dbeatty10/dbt-mysql@96655ea9f7fca7be90c9112ce8ffbb5aac1d3716#egg=dbt-mysql


WORKDIR /airbyte/normalization_code/dbt-template/
# Download external dbt dependencies
RUN dbt deps
Expand All @@ -24,5 +45,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.42
LABEL io.airbyte.version=0.1.44
LABEL io.airbyte.name=airbyte/normalization
1 change: 1 addition & 0 deletions airbyte-integrations/bases/base-normalization/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ allowed characters, if quotes are needed or not, and the length limitations:
- [redshift](../../../docs/integrations/destinations/redshift.md)
- [snowflake](../../../docs/integrations/destinations/snowflake.md)
- [mysql](../../../docs/integrations/destinations/mysql.md)
- [oracle](../../../docs/integrations/destinations/oracle.md)

Rules about truncations, for example for both of these strings which are too long for the postgres 64 limit:
- `Aaaa_Bbbb_Cccc_Dddd_Eeee_Ffff_Gggg_Hhhh_Iiii`
Expand Down
21 changes: 21 additions & 0 deletions airbyte-integrations/bases/base-normalization/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,28 @@ airbytePython {
}

dependencies {
implementation project(':airbyte-workers')
implementation files(project(':airbyte-integrations:bases:airbyte-protocol').airbyteDocker.outputs)
}

// we need to access the sshtunneling script from airbyte-workers for ssh support
task copySshScript(type: Copy, dependsOn: [project(':airbyte-workers').processResources]) {
from "${project(':airbyte-workers').buildDir}/resources/main"
into "${buildDir}"
include "sshtunneling.sh"
}

// make sure the copy task above worked (if it fails, it fails silently annoyingly)
task checkSshScriptCopy(type: Task, dependsOn: copySshScript) {
doFirst {
assert file("${buildDir}/sshtunneling.sh").exists() :
"Copy of sshtunneling.sh failed, check that it is present in airbyte-workers."
}
}

test.dependsOn checkSshScriptCopy
assemble.dependsOn checkSshScriptCopy

installReqs.dependsOn(":airbyte-integrations:bases:airbyte-protocol:installReqs")
integrationTest.dependsOn(build)

Expand All @@ -24,6 +43,8 @@ task("customIntegrationTestPython", type: PythonTask, dependsOn: installTestReqs
dependsOn ':airbyte-integrations:connectors:destination-postgres:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-redshift:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-snowflake:airbyteDocker'
dependsOn ':airbyte-integrations:connectors:destination-oracle:airbyteDocker'

}

integrationTest.dependsOn("customIntegrationTestPython")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# This file is necessary to install dbt-utils with dbt deps
# the content will be overwritten by the transform function

# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
cross join unnest({{ array_col }}) as {{ array_col }}
{%- endmacro %}

{% macro oracle__cross_join_unnest(stream_name, array_col) -%}
{% do exceptions.warn("Normalization does not support unnesting for Oracle yet.") %}
{%- endmacro %}

{% macro postgres__cross_join_unnest(stream_name, array_col) -%}
cross join jsonb_array_elements(
case jsonb_typeof({{ array_col }})
Expand Down Expand Up @@ -66,6 +70,10 @@
_airbyte_nested_data
{%- endmacro %}

{% macro oracle__unnested_column_value(column_col) -%}
{{ column_col }}
{%- endmacro %}

{# unnest_cte ------------------------------------------------- #}

{% macro unnest_cte(table_name, stream_name, column_col) -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
jsonb
{% endmacro %}

{%- macro oracle__type_json() -%}
varchar2(4000)
{%- endmacro -%}

{% macro snowflake__type_json() %}
variant
{% endmacro %}
Expand All @@ -31,24 +35,39 @@
char
{%- endmacro -%}

{%- macro oracle__type_string() -%}
varchar2(4000)
{%- endmacro -%}


{# float ------------------------------------------------- #}
{% macro mysql__type_float() %}
float
{% endmacro %}

{% macro oracle__type_float() %}
float
{% endmacro %}


{# int ------------------------------------------------- #}
{% macro default__type_int() %}
signed
{% endmacro %}

{% macro oracle__type_int() %}
int
{% endmacro %}

{# bigint ------------------------------------------------- #}
{% macro mysql__type_bigint() %}
signed
{% endmacro %}

{% macro oracle__type_bigint() %}
numeric
{% endmacro %}


{# numeric ------------------------------------------------- #}
{% macro mysql__type_numeric() %}
Expand Down Expand Up @@ -81,6 +100,10 @@
char
{%- endmacro -%}

{% macro oracle__type_timestamp_with_timezone() %}
varchar2(4000)
{% endmacro %}


{# date ------------------------------------------------- #}

Expand All @@ -91,3 +114,7 @@
{% macro default__type_date() %}
date
{% endmacro %}

{% macro oracle__type_date() %}
varchar2(4000)
{% endmacro %}
Loading