Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bugfix] fix merge conflict that broke Hive support #3196

Merged
merged 1 commit into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,12 @@ class CeleryConfig(object):
# configuration. These blueprints will get integrated in the app
BLUEPRINTS = []

try:
# Provide a callable that receives a tracking_url and returns another
# URL. This is used to translate internal Hadoop job tracker URL
# into a proxied one
TRACKING_URL_TRANSFORMER = lambda x: x

try:
if CONFIG_PATH_ENV_VAR in os.environ:
# Explicitly import config module that is not in pythonpath; useful
# for case where app is being executed via pex.
Expand Down
35 changes: 23 additions & 12 deletions superset/db_engine_specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@

from superset.utils import SupersetTemplateException
from superset.utils import QueryStatus
from superset import utils
from superset import cache_util
from superset import conf, cache_util, utils

tracking_url_trans = conf.get('TRACKING_URL_TRANSFORMER')

Grain = namedtuple('Grain', 'name label function')

Expand Down Expand Up @@ -683,7 +684,7 @@ def adjust_database_uri(cls, uri, selected_schema=None):
@classmethod
def progress(cls, log_lines):
total_jobs = 1 # assuming there's at least 1 job
current_job = None
current_job = 1
stages = {}
for line in log_lines:
match = cls.jobs_stats_r.match(line)
Expand All @@ -692,6 +693,7 @@ def progress(cls, log_lines):
match = cls.launching_job_r.match(line)
if match:
current_job = int(match.groupdict()['job_number'])
total_jobs = int(match.groupdict()['max_jobs']) or 1
stages = {}
match = cls.stage_progress_r.match(line)
if match:
Expand All @@ -701,10 +703,9 @@ def progress(cls, log_lines):
stages[stage_number] = (map_progress + reduce_progress) / 2
logging.info(
"Progress detail: {}, "
"total jobs: {}".format(stages, total_jobs))
"current job {}, "
"total jobs: {}".format(stages, current_job, total_jobs))

if not total_jobs or not current_job:
return 0
stage_progress = sum(
stages.values()) / len(stages.values()) if stages else 0

Expand All @@ -731,18 +732,16 @@ def handle_cursor(cls, cursor, query, session):
polled = cursor.poll()
last_log_line = 0
tracking_url = None
job_id = None
while polled.operationState in unfinished_states:
query = session.query(type(query)).filter_by(id=query.id).one()
if query.status == QueryStatus.STOPPED:
cursor.cancel()
break

resp = cursor.fetch_logs()
if resp and resp.log:
log = resp.log or ''
log_lines = resp.log.splitlines()
logging.info("\n".join(log_lines[last_log_line:]))
last_log_line = len(log_lines) - 1
log = cursor.fetch_logs() or ''
if log:
log_lines = log.splitlines()
progress = cls.progress(log_lines)
logging.info("Progress total: {}".format(progress))
needs_commit = False
Expand All @@ -754,8 +753,20 @@ def handle_cursor(cls, cursor, query, session):
if tracking_url:
logging.info(
"Found the tracking url: {}".format(tracking_url))
tracking_url = tracking_url_trans(tracking_url)
logging.info(
"Transformation applied: {}".format(tracking_url))
query.tracking_url = tracking_url
job_id = tracking_url.split('/')[-2]
logging.info("Job id: {}".format(job_id))
needs_commit = True
if job_id and len(log_lines) > last_log_line:
# Wait for job id before logging things out
# this allows for prefixing all log lines and becoming
# searchable in something like Kibana
for l in log_lines[last_log_line:]:
logging.info("[{}] {}".format(job_id, l))
last_log_line = len(log_lines)
if needs_commit:
session.commit()
time.sleep(5)
Expand Down