Skip to content

Commit

Permalink
Merge pull request #18 from epoch8/fix-16
Browse files Browse the repository at this point in the history
Fix #16
  • Loading branch information
elephantum authored Oct 15, 2018
2 parents f0efd3a + 21dde74 commit 0b4a65f
Showing 1 changed file with 45 additions and 25 deletions.
70 changes: 45 additions & 25 deletions prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,51 @@
from prometheus_client import generate_latest, REGISTRY
from prometheus_client.core import GaugeMetricFamily

from contextlib import contextmanager


@contextmanager
def session_scope(session):
"""
Provide a transactional scope around a series of operations.
"""
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()


def get_dag_state_info():
'''get dag info
:return dag_info
'''
dag_status_query = Session.query(
DagStat.dag_id, DagStat.state, DagStat.count
).group_by(DagStat.dag_id, DagStat.state).subquery()
return Session.query(
DagStat.dag_id, DagStat.state, DagStat.count,
DagModel.owners
).join(DagModel, DagModel.dag_id == DagStat.dag_id).all()
with session_scope(Session) as session:
dag_status_query = session.query(
DagStat.dag_id, DagStat.state, DagStat.count
).group_by(DagStat.dag_id, DagStat.state).subquery()
return session.query(
DagStat.dag_id, DagStat.state, DagStat.count,
DagModel.owners
).join(DagModel, DagModel.dag_id == DagStat.dag_id).all()


def get_task_state_info():
'''get task info
:return task_info
'''
task_status_query = Session.query(
TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
return Session.query(
task_status_query.c.dag_id, task_status_query.c.task_id,
task_status_query.c.state, task_status_query.c.value, DagModel.owners
).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()
with session_scope(Session) as session:
task_status_query = session.query(
TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
return session.query(
task_status_query.c.dag_id, task_status_query.c.task_id,
task_status_query.c.state, task_status_query.c.value, DagModel.owners
).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).all()


def get_dag_duration_info():
Expand All @@ -46,16 +65,17 @@ def get_dag_duration_info():
'''
duration = func.sum(func.now() - DagRun.start_date)

return Session.query(
DagRun.dag_id,
DagRun.run_id,
duration.label('duration')
).group_by(
DagRun.dag_id,
DagRun.run_id
).filter(
DagRun.state == State.RUNNING
).all()
with session_scope(Session) as session:
return session.query(
DagRun.dag_id,
DagRun.run_id,
duration.label('duration')
).group_by(
DagRun.dag_id,
DagRun.run_id
).filter(
DagRun.state == State.RUNNING
).all()


class MetricsCollector(object):
Expand Down

0 comments on commit 0b4a65f

Please sign in to comment.