From 88b4a283e15bca7c4b09e8db1d0003557ac326bd Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 2 Feb 2017 22:57:42 -0500 Subject: [PATCH 1/3] scope all tracking state to fix imported dbt runs --- dbt/main.py | 3 + dbt/runner.py | 4 +- dbt/tracking.py | 166 +++++++++++++++++++++++++++++------------------- 3 files changed, 106 insertions(+), 67 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index 87271a81242..cf9fa4b5789 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -43,6 +43,8 @@ def handle(args): # correct profiles.yml file if not config.send_anonymous_usage_stats(parsed.profiles_dir): dbt.tracking.do_not_track() + else: + dbt.tracking.initialize_tracking() res = run_from_args(parsed) dbt.tracking.flush() @@ -92,6 +94,7 @@ def run_from_args(parsed): log_path = proj.get('log-path', 'logs') initialize_logger(parsed.debug, log_path) + logger.debug("Tracking: {}".format(dbt.tracking.active_user.state())) dbt.tracking.track_invocation_start(project=proj, args=parsed) try: diff --git a/dbt/runner.py b/dbt/runner.py index 5c1fa904593..940634cdfca 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -412,7 +412,7 @@ def call_table_exists(schema, table): self.context = { "run_started_at": datetime.now(), - "invocation_id": dbt.tracking.invocation_id, + "invocation_id": dbt.tracking.active_user.invocation_id, "get_columns_in_table": call_get_columns_in_table, "get_missing_columns": call_get_missing_columns, "already_exists": call_table_exists, @@ -576,7 +576,7 @@ def on_complete(run_model_results): ) dbt.tracking.track_model_run({ - "invocation_id": dbt.tracking.invocation_id, + "invocation_id": dbt.tracking.active_user.invocation_id, "index": index, "total": num_models, "execution_time": run_model_result.execution_time, diff --git a/dbt/tracking.py b/dbt/tracking.py index 8392168ea55..05de4321fbb 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -31,35 +31,55 @@ emitter = Emitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1) tracker = Tracker(emitter, namespace="cf", app_id="dbt") +active_user = None -def __write_user(): - user = { - "id": str(uuid.uuid4()) - } +class User(object): - cookie_dir = os.path.dirname(COOKIE_PATH) - if not os.path.exists(cookie_dir): - os.makedirs(cookie_dir) + def __init__(self): + self.do_not_track = True - with open(COOKIE_PATH, "w") as fh: - yaml.dump(user, fh) + self.id = None + self.invocation_id = None - return user + def state(self): + return "do not track" if self.do_not_track else "tracking" + def initialize(self): + self.do_not_track = False -def get_user(): - if os.path.isfile(COOKIE_PATH): - with open(COOKIE_PATH, "r") as fh: - try: - user = yaml.safe_load(fh) - if user is None: - user = __write_user() - except yaml.reader.ReaderError as e: - user = __write_user() - else: - user = __write_user() + self.invocation_id = str(uuid.uuid4()) + + cookie = self.get_cookie() + self.id = cookie.get('id') + + subject = Subject() + subject.set_user_id(self.id) + tracker.set_subject(subject) + + def set_cookie(self): + cookie_dir = os.path.dirname(COOKIE_PATH) + user = {"id": str(uuid.uuid4())} + + if not os.path.exists(cookie_dir): + os.makedirs(cookie_dir) + + with open(COOKIE_PATH, "w") as fh: + yaml.dump(user, fh) - return user + return user + + def get_cookie(self): + if not os.path.isfile(COOKIE_PATH): + user = self.set_cookie() + else: + with open(COOKIE_PATH, "r") as fh: + try: + user = yaml.safe_load(fh) + if user is None: + user = self.set_cookie() + except yaml.reader.ReaderError as e: + user = self.set_cookie() + return user def get_options(args): @@ -75,11 +95,11 @@ def get_run_type(args): return 'regular' -def get_invocation_context(invocation_id, user, project, args): +def get_invocation_context(user, project, args): return { "project_id": None if project is None else project.hashed_name(), - "user_id": user.get("id", None), - "invocation_id": invocation_id, + "user_id": user.id, + "invocation_id": user.invocation_id, "command": args.which, "options": get_options(args), @@ -89,8 +109,8 @@ def get_invocation_context(invocation_id, user, project, args): } -def get_invocation_start_context(invocation_id, user, project, args): - data = get_invocation_context(invocation_id, user, project, args) +def get_invocation_start_context(user, project, args): + data = get_invocation_context(user, project, args) start_data = { "progress": "start", @@ -102,10 +122,8 @@ def get_invocation_start_context(invocation_id, user, project, args): return SelfDescribingJson(INVOCATION_SPEC, data) -def get_invocation_end_context( - invocation_id, user, project, args, result_type, result -): - data = get_invocation_context(invocation_id, user, project, args) +def get_invocation_end_context(user, project, args, result_type, result): + data = get_invocation_context(user, project, args) start_data = { "progress": "end", @@ -117,10 +135,8 @@ def get_invocation_end_context( return SelfDescribingJson(INVOCATION_SPEC, data) -def get_invocation_invalid_context( - invocation_id, user, project, args, result_type, result -): - data = get_invocation_context(invocation_id, user, project, args) +def get_invocation_invalid_context(user, project, args, result_type, result): + data = get_invocation_context(user, project, args) start_data = { "progress": "invalid", @@ -155,20 +171,10 @@ def get_dbt_env_context(): return SelfDescribingJson(INVOCATION_ENV_SPEC, data) -invocation_id = str(uuid.uuid4()) -platform_context = get_platform_context() -env_context = get_dbt_env_context() - -user = get_user() -subject = Subject() -subject.set_user_id(user.get("id", None)) -tracker.set_subject(subject) -__is_do_not_track = False - -def track(*args, **kwargs): - if __is_do_not_track: +def track(user, *args, **kwargs): + if user.do_not_track: return else: logger.debug("Sending event: {}".format(kwargs)) @@ -181,20 +187,29 @@ def track(*args, **kwargs): def track_invocation_start(project=None, args=None): - invocation_context = get_invocation_start_context( - invocation_id, user, project, args + context = [ + get_invocation_start_context(active_user, project, args), + get_platform_context(), + get_dbt_env_context() + ] + + track( + active_user, + category="dbt", + action='invocation', + label='start', + context=context ) - context = [invocation_context, platform_context, env_context] - track(category="dbt", action='invocation', label='start', context=context) def track_model_run(options): context = [SelfDescribingJson(RUN_MODEL_SPEC, options)] model_id = options['model_id'] track( + active_user, category="dbt", action='run_model', - label=invocation_id, + label=active_user.invocation_id, context=context ) @@ -202,22 +217,39 @@ def track_model_run(options): def track_invocation_end( project=None, args=None, result_type=None, result=None ): - invocation_context = get_invocation_end_context( - invocation_id, user, project, args, result_type, result + + user = active_user + context = [ + get_invocation_end_context(user, project, args, result_type, result), + get_platform_context(), + get_dbt_env_context() + ] + track( + active_user, + category="dbt", + action='invocation', + label='end', + context=context ) - context = [invocation_context, platform_context, env_context] - track(category="dbt", action='invocation', label='end', context=context) def track_invalid_invocation( project=None, args=None, result_type=None, result=None ): - invocation_context = get_invocation_invalid_context( - invocation_id, user, project, args, result_type, result - ) - context = [invocation_context, platform_context, env_context] + + user = active_user + context = [ + get_invocation_invalid_context(user, project, args, result_type, result), + get_platform_context(), + get_dbt_env_context() + ] + track( - category="dbt", action='invocation', label='invalid', context=context + active_user, + category="dbt", + action='invocation', + label='invalid', + context=context ) @@ -225,8 +257,12 @@ def flush(): logger.debug("Flushing usage events") tracker.flush() - def do_not_track(): - global __is_do_not_track - logger.debug("Not sending anonymous usage events") - __is_do_not_track = True + global active_user + active_user = User() + +def initialize_tracking(): + global active_user + active_user = User() + active_user.initialize() + From 810124b3b68d11a1404d047b3f38bc230ce34596 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Thu, 2 Feb 2017 23:14:19 -0500 Subject: [PATCH 2/3] actually send invocation end events --- dbt/main.py | 5 ++++- dbt/tracking.py | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index cf9fa4b5789..002aad720b1 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -97,8 +97,10 @@ def run_from_args(parsed): logger.debug("Tracking: {}".format(dbt.tracking.active_user.state())) dbt.tracking.track_invocation_start(project=proj, args=parsed) + + result = None try: - return task.run() + result = task.run() dbt.tracking.track_invocation_end( project=proj, args=parsed, result_type="ok", result=None ) @@ -113,6 +115,7 @@ def run_from_args(parsed): ) raise + return result def invoke_dbt(parsed): task = None diff --git a/dbt/tracking.py b/dbt/tracking.py index 05de4321fbb..28d41724562 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -217,7 +217,6 @@ def track_model_run(options): def track_invocation_end( project=None, args=None, result_type=None, result=None ): - user = active_user context = [ get_invocation_end_context(user, project, args, result_type, result), From 05d44726afb099c489d494f6b38f2da5e7b73d2b Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Fri, 3 Feb 2017 11:10:59 -0500 Subject: [PATCH 3/3] pep8 --- dbt/main.py | 1 + dbt/runner.py | 3 ++- dbt/tracking.py | 15 ++++++++++++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/dbt/main.py b/dbt/main.py index 002aad720b1..c6165c86b44 100644 --- a/dbt/main.py +++ b/dbt/main.py @@ -117,6 +117,7 @@ def run_from_args(parsed): return result + def invoke_dbt(parsed): task = None proj = None diff --git a/dbt/runner.py b/dbt/runner.py index 940634cdfca..77c31d12960 100644 --- a/dbt/runner.py +++ b/dbt/runner.py @@ -575,8 +575,9 @@ def on_complete(run_model_results): run_model_result.execution_time ) + invocation_id = dbt.tracking.active_user.invocation_id dbt.tracking.track_model_run({ - "invocation_id": dbt.tracking.active_user.invocation_id, + "invocation_id": invocation_id, "index": index, "total": num_models, "execution_time": run_model_result.execution_time, diff --git a/dbt/tracking.py b/dbt/tracking.py index 28d41724562..7e2b02be2f6 100644 --- a/dbt/tracking.py +++ b/dbt/tracking.py @@ -33,6 +33,7 @@ active_user = None + class User(object): def __init__(self): @@ -172,7 +173,6 @@ def get_dbt_env_context(): return SelfDescribingJson(INVOCATION_ENV_SPEC, data) - def track(user, *args, **kwargs): if user.do_not_track: return @@ -237,8 +237,16 @@ def track_invalid_invocation( ): user = active_user + invocation_context = get_invocation_invalid_context( + user, + project, + args, + result_type, + result + ) + context = [ - get_invocation_invalid_context(user, project, args, result_type, result), + invocation_context, get_platform_context(), get_dbt_env_context() ] @@ -256,12 +264,13 @@ def flush(): logger.debug("Flushing usage events") tracker.flush() + def do_not_track(): global active_user active_user = User() + def initialize_tracking(): global active_user active_user = User() active_user.initialize() -