Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

scope all tracking state to fix imported dbt runs #273

Merged: 3 commits merged on Feb 8, 2017
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
9 changes: 8 additions & 1 deletion dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def handle(args):
# correct profiles.yml file
if not config.send_anonymous_usage_stats(parsed.profiles_dir):
dbt.tracking.do_not_track()
else:
dbt.tracking.initialize_tracking()

res = run_from_args(parsed)
dbt.tracking.flush()
Expand Down Expand Up @@ -92,10 +94,13 @@ def run_from_args(parsed):
log_path = proj.get('log-path', 'logs')

initialize_logger(parsed.debug, log_path)
logger.debug("Tracking: {}".format(dbt.tracking.active_user.state()))

dbt.tracking.track_invocation_start(project=proj, args=parsed)

result = None
try:
return task.run()
result = task.run()
dbt.tracking.track_invocation_end(
project=proj, args=parsed, result_type="ok", result=None
)
Expand All @@ -110,6 +115,8 @@ def run_from_args(parsed):
)
raise

return result


def invoke_dbt(parsed):
task = None
Expand Down
5 changes: 3 additions & 2 deletions dbt/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ def call_table_exists(schema, table):

self.context = {
"run_started_at": datetime.now(),
"invocation_id": dbt.tracking.invocation_id,
"invocation_id": dbt.tracking.active_user.invocation_id,
"get_columns_in_table": call_get_columns_in_table,
"get_missing_columns": call_get_missing_columns,
"already_exists": call_table_exists,
Expand Down Expand Up @@ -575,8 +575,9 @@ def on_complete(run_model_results):
run_model_result.execution_time
)

invocation_id = dbt.tracking.active_user.invocation_id
dbt.tracking.track_model_run({
"invocation_id": dbt.tracking.invocation_id,
"invocation_id": invocation_id,
"index": index,
"total": num_models,
"execution_time": run_model_result.execution_time,
Expand Down
170 changes: 107 additions & 63 deletions dbt/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,56 @@
emitter = Emitter(COLLECTOR_URL, protocol=COLLECTOR_PROTOCOL, buffer_size=1)
tracker = Tracker(emitter, namespace="cf", app_id="dbt")

active_user = None

def __write_user():
user = {
"id": str(uuid.uuid4())
}

cookie_dir = os.path.dirname(COOKIE_PATH)
if not os.path.exists(cookie_dir):
os.makedirs(cookie_dir)
class User(object):

with open(COOKIE_PATH, "w") as fh:
yaml.dump(user, fh)
def __init__(self):
self.do_not_track = True

return user
self.id = None
self.invocation_id = None

def state(self):
return "do not track" if self.do_not_track else "tracking"

def get_user():
if os.path.isfile(COOKIE_PATH):
with open(COOKIE_PATH, "r") as fh:
try:
user = yaml.safe_load(fh)
if user is None:
user = __write_user()
except yaml.reader.ReaderError as e:
user = __write_user()
else:
user = __write_user()
def initialize(self):
self.do_not_track = False

self.invocation_id = str(uuid.uuid4())

cookie = self.get_cookie()
self.id = cookie.get('id')

subject = Subject()
subject.set_user_id(self.id)
tracker.set_subject(subject)

def set_cookie(self):
cookie_dir = os.path.dirname(COOKIE_PATH)
user = {"id": str(uuid.uuid4())}

if not os.path.exists(cookie_dir):
os.makedirs(cookie_dir)

return user
with open(COOKIE_PATH, "w") as fh:
yaml.dump(user, fh)

return user

def get_cookie(self):
if not os.path.isfile(COOKIE_PATH):
user = self.set_cookie()
else:
with open(COOKIE_PATH, "r") as fh:
try:
user = yaml.safe_load(fh)
if user is None:
user = self.set_cookie()
except yaml.reader.ReaderError as e:
user = self.set_cookie()
return user


def get_options(args):
Expand All @@ -75,11 +96,11 @@ def get_run_type(args):
return 'regular'


def get_invocation_context(invocation_id, user, project, args):
def get_invocation_context(user, project, args):
return {
"project_id": None if project is None else project.hashed_name(),
"user_id": user.get("id", None),
"invocation_id": invocation_id,
"user_id": user.id,
"invocation_id": user.invocation_id,

"command": args.which,
"options": get_options(args),
Expand All @@ -89,8 +110,8 @@ def get_invocation_context(invocation_id, user, project, args):
}


def get_invocation_start_context(invocation_id, user, project, args):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_start_context(user, project, args):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "start",
Expand All @@ -102,10 +123,8 @@ def get_invocation_start_context(invocation_id, user, project, args):
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_end_context(
invocation_id, user, project, args, result_type, result
):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_end_context(user, project, args, result_type, result):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "end",
Expand All @@ -117,10 +136,8 @@ def get_invocation_end_context(
return SelfDescribingJson(INVOCATION_SPEC, data)


def get_invocation_invalid_context(
invocation_id, user, project, args, result_type, result
):
data = get_invocation_context(invocation_id, user, project, args)
def get_invocation_invalid_context(user, project, args, result_type, result):
data = get_invocation_context(user, project, args)

start_data = {
"progress": "invalid",
Expand Down Expand Up @@ -155,20 +172,9 @@ def get_dbt_env_context():

return SelfDescribingJson(INVOCATION_ENV_SPEC, data)

invocation_id = str(uuid.uuid4())
platform_context = get_platform_context()
env_context = get_dbt_env_context()

user = get_user()
subject = Subject()
subject.set_user_id(user.get("id", None))
tracker.set_subject(subject)

__is_do_not_track = False


def track(*args, **kwargs):
if __is_do_not_track:
def track(user, *args, **kwargs):
if user.do_not_track:
return
else:
logger.debug("Sending event: {}".format(kwargs))
Expand All @@ -181,43 +187,76 @@ def track(*args, **kwargs):


def track_invocation_start(project=None, args=None):
invocation_context = get_invocation_start_context(
invocation_id, user, project, args
context = [
get_invocation_start_context(active_user, project, args),
get_platform_context(),
get_dbt_env_context()
]

track(
active_user,
category="dbt",
action='invocation',
label='start',
context=context
)
context = [invocation_context, platform_context, env_context]
track(category="dbt", action='invocation', label='start', context=context)


def track_model_run(options):
context = [SelfDescribingJson(RUN_MODEL_SPEC, options)]
model_id = options['model_id']
track(
active_user,
category="dbt",
action='run_model',
label=invocation_id,
label=active_user.invocation_id,
context=context
)


def track_invocation_end(
project=None, args=None, result_type=None, result=None
):
invocation_context = get_invocation_end_context(
invocation_id, user, project, args, result_type, result
user = active_user
context = [
get_invocation_end_context(user, project, args, result_type, result),
get_platform_context(),
get_dbt_env_context()
]
track(
active_user,
category="dbt",
action='invocation',
label='end',
context=context
)
context = [invocation_context, platform_context, env_context]
track(category="dbt", action='invocation', label='end', context=context)


def track_invalid_invocation(
project=None, args=None, result_type=None, result=None
):

user = active_user
invocation_context = get_invocation_invalid_context(
invocation_id, user, project, args, result_type, result
user,
project,
args,
result_type,
result
)
context = [invocation_context, platform_context, env_context]

context = [
invocation_context,
get_platform_context(),
get_dbt_env_context()
]

track(
category="dbt", action='invocation', label='invalid', context=context
active_user,
category="dbt",
action='invocation',
label='invalid',
context=context
)


Expand All @@ -227,6 +266,11 @@ def flush():


def do_not_track():
global __is_do_not_track
logger.debug("Not sending anonymous usage events")
__is_do_not_track = True
global active_user
active_user = User()


def initialize_tracking():
global active_user
active_user = User()
active_user.initialize()