Skip to content

Commit

Permalink
Fix ssh tunneling for normalization (#6396)
Browse files Browse the repository at this point in the history
* switch to custom file for ssh config in normalization

* bump version

* get local port properly

* added unit test for write_ssh_config

* format
  • Loading branch information
Phlair authored Sep 23, 2021
1 parent 0b1f1e2 commit 3d8625e
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 23 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.1.45
LABEL io.airbyte.version=0.1.46
LABEL io.airbyte.name=airbyte/normalization
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ function main() {
run)
configuredbt
. /airbyte/sshtunneling.sh
openssh $CONFIG_FILE "${PROJECT_DIR}/localsshport.json"
openssh "${PROJECT_DIR}/ssh.json"
trap 'closessh' EXIT
# Run dbt to compile and execute the generated normalization models
dbt run --profiles-dir "${PROJECT_DIR}" --project-dir "${PROJECT_DIR}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def run(self, args):
transformed_config = self.transform(integration_type, original_config)
self.write_yaml_config(inputs["output_path"], transformed_config, "profiles.yml")
if self.is_ssh_tunnelling(original_config):
self.write_ssh_port(inputs["output_path"], self.pick_a_port())
self.write_ssh_config(inputs["output_path"], original_config, transformed_config)

@staticmethod
def parse(args):
Expand Down Expand Up @@ -282,17 +282,21 @@ def write_yaml_config(output_path: str, config: Dict[str, Any], filename: str):
fh.write(yaml.dump(config))

@staticmethod
def write_ssh_port(output_path: str, port: int):
def write_ssh_config(output_path: str, original_config: Dict[str, Any], transformed_config: Dict[str, Any]):
"""
This function writes a small json file with content like {"port":xyz}
This is being used only when ssh tunneling.
We do this because we need to decide on and save this port number into our dbt config
and then use that same port in sshtunneling.sh when opening the tunnel.
This function writes a json file with config specific to ssh.
We do this because we need these details to open the ssh tunnel for dbt.
"""
ssh_dict = {
"db_host": original_config["host"],
"db_port": original_config["port"],
"tunnel_map": original_config["tunnel_method"],
"local_port": transformed_config["normalize"]["outputs"]["prod"]["port"],
}
if not os.path.exists(output_path):
os.makedirs(output_path)
with open(os.path.join(output_path, "localsshport.json"), "w") as fh:
json.dump({"port": port}, fh)
with open(os.path.join(output_path, "ssh.json"), "w") as fh:
json.dump(ssh_dict, fh)


def main(args=None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
#


import json
import os
import socket
import tempfile
import time

import pytest
Expand Down Expand Up @@ -339,3 +341,42 @@ def test_parse(self):
assert {"integration_type": DestinationType.postgres, "config": "config.json", "output_path": "out.yml"} == t.parse(
["--integration-type", "postgres", "--config", "config.json", "--out", "out.yml"]
)

def test_write_ssh_config(self):
original_config_input = {
"type": "postgres",
"dbname": "my_db",
"host": "airbyte.io",
"pass": "password123",
"port": 5432,
"schema": "public",
"threads": 32,
"user": "a user",
"tunnel_method": {
"tunnel_host": "1.2.3.4",
"tunnel_method": "SSH_PASSWORD_AUTH",
"tunnel_port": 22,
"tunnel_user": "user",
"tunnel_user_password": "pass",
},
}
transformed_config_input = self.get_base_config()
transformed_config_input["normalize"]["outputs"]["prod"] = {
"port": 7890,
}
expected = {
"db_host": "airbyte.io",
"db_port": 5432,
"tunnel_map": {
"tunnel_host": "1.2.3.4",
"tunnel_method": "SSH_PASSWORD_AUTH",
"tunnel_port": 22,
"tunnel_user": "user",
"tunnel_user_password": "pass",
},
"local_port": 7890,
}
tmp_path = tempfile.TemporaryDirectory().name
TransformConfig.write_ssh_config(tmp_path, original_config_input, transformed_config_input)
with open(os.path.join(tmp_path, "ssh.json"), "r") as f:
assert json.load(f) == expected
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);

public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.45";
public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.46";

private final DestinationType destinationType;
private final ProcessFactory processFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ if [[ -f "${CWD}/bq_keyfile.json" ]]; then
fi

. $CWD/sshtunneling.sh
openssh $CWD/destination_config.json $CWD/localsshport.json
openssh $CWD/ssh.json
trap 'closessh' EXIT

# Add mandatory flags profiles-dir and project-dir when calling dbt when necessary
Expand Down
21 changes: 10 additions & 11 deletions airbyte-workers/src/main/resources/sshtunneling.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# This function opens an ssh tunnel if required using values provided in config.
# Requires two arguments,
# path to config file ($1)
# path to file containing local port to use ($2)
# Requires one argument,
# path to ssh config file ($1)
function openssh() {
# check if jq is missing, and if so try to install it..
# this is janky but for custom dbt transform we can't be sure jq is installed as using user docker image
Expand All @@ -14,20 +13,20 @@ function openssh() {
fi
# tunnel_db_host and tunnel_db_port currently rely on the destination's spec using "host" and "port" as keys for these values
# if adding ssh support for a new destination where this is not the case, extra logic will be needed to capture these dynamically
tunnel_db_host=$(cat $1 | jq -r '.host')
tunnel_db_port=$(cat $1 | jq -r '.port')
tunnel_method=$(cat $1 | jq -r '.tunnel_method.tunnel_method' | tr '[:lower:]' '[:upper:]')
tunnel_username=$(cat $1 | jq -r '.tunnel_method.tunnel_user')
tunnel_host=$(cat $1 | jq -r '.tunnel_method.tunnel_host')
tunnel_local_port=$(cat $2 | jq -r '.port')
tunnel_db_host=$(cat $1 | jq -r '.db_host')
tunnel_db_port=$(cat $1 | jq -r '.db_port')
tunnel_method=$(cat $1 | jq -r '.tunnel_map.tunnel_method' | tr '[:lower:]' '[:upper:]')
tunnel_username=$(cat $1 | jq -r '.tunnel_map.tunnel_user')
tunnel_host=$(cat $1 | jq -r '.tunnel_map.tunnel_host')
tunnel_local_port=$(cat $1 | jq -r '.local_port')
# set a path for a control socket, allowing us to close this specific ssh connection when desired
tmpcontrolsocket="/tmp/sshsocket${tunnel_db_remote_port}-${RANDOM}"
if [[ ${tunnel_method} = "SSH_KEY_AUTH" ]] ; then
echo "Detected tunnel method SSH_KEY_AUTH for normalization"
# create a temporary file to hold ssh key and trap to delete on EXIT
trap 'rm -f "$tmpkeyfile"' EXIT
tmpkeyfile=$(mktemp /tmp/xyzfile.XXXXXXXXXXX) || exit 1
echo "$(cat $1 | jq -r '.tunnel_method.ssh_key')" > $tmpkeyfile
echo "$(cat $1 | jq -r '.tunnel_map.ssh_key')" > $tmpkeyfile
# -f=background -N=no remote command -M=master mode StrictHostKeyChecking=no auto-adds host
echo "Running: ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -i {key file} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}"
ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -i $tmpkeyfile -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} &&
Expand All @@ -44,7 +43,7 @@ function openssh() {
{ dnf install epel-release -y && dnf install sshpass -y; } || exit 1
fi
# put ssh password in env var for use in sshpass. Better than directly passing with -p
export SSHPASS=$(cat $1 | jq -r '.tunnel_method.tunnel_user_password')
export SSHPASS=$(cat $1 | jq -r '.tunnel_map.tunnel_user_password')
echo "Running: sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S {control socket} -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host}"
sshpass -e ssh -f -N -M -o StrictHostKeyChecking=no -S $tmpcontrolsocket -l ${tunnel_username} -L ${tunnel_local_port}:${tunnel_db_host}:${tunnel_db_port} ${tunnel_host} &&
sshopen="yes" &&
Expand Down

0 comments on commit 3d8625e

Please sign in to comment.