Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable CustomDbtNormalization on k8s #238

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.config.WorkerConfigsProvider.ResourceType;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.WorkerConstants;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -41,6 +43,7 @@ public class DbtTransformationRunner implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
private static final String DBT_CONFIG_JSON = "dbt_config.json";
private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("dbt")
.setPrefixColor(Color.PURPLE_BACKGROUND);
Expand Down Expand Up @@ -104,7 +107,11 @@ public boolean transform(final String jobId,
try {
final Map<String, String> files = ImmutableMap.of(
DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh"),
"sshtunneling.sh", MoreResources.readResource("sshtunneling.sh"));
"sshtunneling.sh", MoreResources.readResource("sshtunneling.sh"),
"generate_profile.py", MoreResources.readResource("generate_profile.py"),
"dbt_config.json", Jsons.serialize(dbtConfig),
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config)
);
final List<String> dbtArguments = new ArrayList<>();
dbtArguments.add(DBT_ENTRYPOINT_SH);
if (Strings.isNullOrEmpty(dbtConfig.getDbtArguments())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@
set -e

CWD=$(pwd)
if [[ ! -d git_repo ]]
then
echo "Preparing environment to build dbt profile...\n"
echo "Instaling jq lib..."
apt update && apt install jq tree -y
echo "Cloning data in pod..."
cat dbt_config.json | jq '"git clone --depth 5 -b " + (.gitRepoBranch) + " --single-branch " + (.gitRepoUrl) + " git_repo"' | xargs /bin/bash -c
echo "Generating profiles.yaml ..."
python generate_profile.py
echo "Listing folders ... tree -L 2"
tree -L 2
fi
# change directory to be inside git_repo that was just cloned
cd git_repo
echo "Running from $(pwd)"
Expand Down
64 changes: 64 additions & 0 deletions airbyte-commons-worker/src/main/resources/generate_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import yaml
import json
import re

default_bq_profile = {
"config": {
"partial_parse": True,
"printer_width": 120,
"send_anonymous_usage_stats": False,
"use_colors": True
},
"normalize": {
"outputs": {
"prod": {
"dataset": "",
"keyfile_json": {},
"location": "",
"method": "service-account-json",
"priority": "interactive",
"project": "",
"retries": 3,
"threads": 8,
"type": "bigquery"
}
},
"target": "prod"
}
}

if __name__ == "__main__":
print("Reading destination config data and parsing...")
with open("destination_config.json") as f:
dest_conf = json.loads(f.read())

profile_vars = {
"dataset": dest_conf["dataset_id"],
"location": dest_conf["dataset_location"],
"project": dest_conf["project_id"],
"keyfile_json": json.loads(dest_conf['credentials_json'])
}
default_bq_profile["normalize"]["outputs"]["prod"].update(profile_vars)

print("Writing dbt profile to execute...")
with open("profiles.yml", "w") as f:
f.write(yaml.dump(default_bq_profile))

print("Checking if need to fix --project-dir if not pointing to /config/git_repo...")
with open("dbt_config.json") as f:
dbt_config = json.loads(f.read())

if "--project-dir" in dbt_config["dbtArguments"]:
print("Project dir setted in arguments... checking correct path...")
dbt_args = dbt_config["dbtArguments"]
pattern = r"(?<=--project-dir )(.*?)(?=[\b|\s]|$)"
found = re.findall(pattern, dbt_args)[0]
if "/config/git_repo" not in found:
dbt_config['dbtArguments'] = re.sub(pattern, r"/config/git_repo/{}".format(found), dbt_args)

with open("dbt_config.json", "w") as f:
f.write(json.dumps(dbt_config))

else:
print("Nothing to fix...")

Loading