Skip to content

Commit

Permalink
Add thread_id context var
Browse files Browse the repository at this point in the history
  • Loading branch information
NiallRees committed Jun 24, 2023
1 parent 9836f7b commit 943d49c
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 24 deletions.
6 changes: 6 additions & 0 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
from typing import Any, Dict, NoReturn, Optional, Mapping, Iterable, Set, List
import threading

from dbt.flags import get_flags
import dbt.flags as flags_module
Expand Down Expand Up @@ -596,6 +597,11 @@ def invocation_id(self) -> Optional[str]:
"""
return get_invocation_id()

@contextproperty
def thread_id(self) -> str:
"""thread_id outputs an ID for the current thread (useful for auditing)"""
return threading.current_thread().name

@contextproperty
def modules(self) -> Dict[str, Any]:
"""The `modules` variable in the Jinja context contains useful Python
Expand Down
3 changes: 2 additions & 1 deletion tests/adapter/dbt/tests/adapter/hooks/data/seed_model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ create table {schema}.on_model_hook (
target_pass TEXT,
target_threads INTEGER,
run_started_at TEXT,
invocation_id TEXT
invocation_id TEXT,
thread_id TEXT
);
3 changes: 2 additions & 1 deletion tests/adapter/dbt/tests/adapter/hooks/data/seed_run.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ create table {schema}.on_run_hook (
target_pass TEXT,
target_threads INTEGER,
run_started_at TEXT,
invocation_id TEXT
invocation_id TEXT,
thread_id TEXT,
);
52 changes: 35 additions & 17 deletions tests/adapter/dbt/tests/adapter/hooks/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'{{ state }}',
'{{ target.dbname }}',
Expand All @@ -52,7 +53,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'\
)
{% endmacro %}
Expand Down Expand Up @@ -83,7 +85,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -95,7 +98,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -109,7 +113,9 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -121,7 +127,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
})
}}
Expand All @@ -144,7 +151,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -156,7 +164,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
"pre-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -170,7 +179,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -182,7 +192,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -196,7 +207,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -208,7 +220,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
})
}}
Expand All @@ -231,7 +244,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -243,7 +257,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
post_hook="\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -257,7 +272,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id\
invocation_id,\
thread_id
) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -269,7 +285,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
)
}}
Expand All @@ -292,7 +309,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}' as target_pass,\
{{ target.threads }} as target_threads,\
'{{ run_started_at }}' as run_started_at,\
'{{ invocation_id }}' as invocation_id
'{{ invocation_id }}' as invocation_id,\
'{{ thread_id }}' as thread_id
from {{ ref('pre') }}\
"
})
Expand Down
17 changes: 12 additions & 5 deletions tests/adapter/dbt/tests/adapter/hooks/test_model_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'start',
'{{ target.dbname }}',
Expand All @@ -47,7 +48,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'
)
"""

Expand All @@ -63,7 +65,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'end',
'{{ target.dbname }}',
Expand All @@ -75,7 +78,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'
)
"""

Expand All @@ -98,6 +102,7 @@ def get_ctx_vars(self, state, count, project):
"target_pass",
"run_started_at",
"invocation_id",
"thread_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = f"select {field_list} from {project.test_schema}.on_model_hook where test_state = '{state}'"
Expand Down Expand Up @@ -127,6 +132,7 @@ def check_hooks(self, state, project, host, count=1):
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
assert ctx["thread_id"].startswith("Thread-")


class TestPrePostModelHooks(BaseTestPrePost):
Expand Down Expand Up @@ -204,7 +210,8 @@ def project_config_update(self):
'{{ target.get(pass, "") }}' as target_pass,
{{ target.threads }} as target_threads,
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id
from {{ ref('post') }}""".strip()
],
}
Expand Down
2 changes: 2 additions & 0 deletions tests/adapter/dbt/tests/adapter/hooks/test_run_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def get_ctx_vars(self, state, project):
"target_pass",
"run_started_at",
"invocation_id",
"thread_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = f"select {field_list} from {project.test_schema}.on_run_hook where test_state = '{state}'"
Expand Down Expand Up @@ -119,6 +120,7 @@ def check_hooks(self, state, project, host):
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
assert ctx["thread_id"].startswith("Thread-")

def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target):
run_dbt(["run"])
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/context_methods/test_env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
-- runtime variables
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id,
'{{ env_var("DBT_TEST_ENV_VAR") }}' as env_var,
'{{ env_var("DBT_TEST_IGNORE_DEFAULT", "ignored_default_val") }}' as env_var_ignore_default,
Expand Down Expand Up @@ -114,6 +115,7 @@ def get_ctx_vars(self, project):
"target.pass",
"run_started_at",
"invocation_id",
"thread_id",
"env_var",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
Expand Down
1 change: 1 addition & 0 deletions tests/functional/context_methods/test_secret_env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_disallow_secret(self, project):
-- runtime variables
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id,
'{{ env_var("DBT_TEST_ENV_VAR") }}' as env_var,
'secret_variable' as env_var_secret, -- make sure the value itself is scrubbed from the logs
Expand Down

0 comments on commit 943d49c

Please sign in to comment.