Multiple root pipelines or pipelines collections #57
I've added an extra runner which looks at certain labels in the pipeline when executing. This is the runner:

import sys
import os
import click
import datetime

# data_integration_run -> the "head" pipeline which is started, i.e. this runner always starts the root pipeline
# data_integration_node_run -> individual pipelines/nodes which are run
# node_path -> array with path elements from the root pipeline, e.g. "[]" is the root pipeline,
#              "['operational_system']" the pipeline for the operational system

__last_runtime_query = '''
SELECT max(end_time)
FROM data_integration_node_run
WHERE succeeded
  AND node_path = ARRAY [%s];
'''

__already_running_processes = '''
SELECT run_id, start_time, node_path
FROM data_integration_run
WHERE end_time IS NULL
  -- "self healing" in case a run is not closed properly, but long enough to not start
  -- while a legitimate run is still running
  AND start_time > now() - INTERVAL '12 hours'
ORDER BY start_time DESC
LIMIT 1
'''
def __debug(msg: str):
    if os.environ.get('DEBUG'):
        print(msg)
def _should_run_once_a_day(pipeline_id: str, run_after: str, start_ts: datetime.datetime):
    from mara_db.postgresql import postgres_cursor_context
    # We only look at today: if a pipeline did not run successfully yesterday,
    # that is ignored after midnight UTC.
    run_after_time = datetime.time.fromisoformat(run_after).replace(tzinfo=datetime.timezone.utc)
    run_after_ts = datetime.datetime.combine(start_ts.date(), run_after_time)
    # No need to check anything if we are not yet past the run_after time
    __debug(f'  start_ts: {start_ts}')
    __debug(f'  run_after_ts: {run_after_ts}')
    if start_ts < run_after_ts:
        __debug('  not yet time -> should NOT run')
        return False
    # We might run, but should check whether we already ran
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__last_runtime_query, (pipeline_id,))
        last_run_ts = cursor.fetchone()[0]
        __debug(f'  last_run_ts: {last_run_ts}')
        if last_run_ts is None:
            # we've never run the pipeline
            __debug('  never ran -> should run')
            return True
        if last_run_ts > run_after_ts:
            # we already did a successful run today
            __debug('  already ran today -> should NOT run')
            return False
    # no successful run after the run_after time -> do a run
    __debug('  did not run yet -> should run')
    return True
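To make the cutoff comparison concrete, here is a minimal standalone illustration of the date/time handling above (the timestamps are made up):

import datetime

# a run starting at 00:30 UTC with run_after='01:00:00' is before the cutoff
start_ts = datetime.datetime(2024, 5, 1, 0, 30, tzinfo=datetime.timezone.utc)
run_after_time = datetime.time.fromisoformat('01:00:00').replace(tzinfo=datetime.timezone.utc)
run_after_ts = datetime.datetime.combine(start_ts.date(), run_after_time)
assert start_ts < run_after_ts  # too early -> should NOT run yet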
def _ensure_no_other_running_etl():
    from mara_db.postgresql import postgres_cursor_context
    # Check whether another ETL run is still open before starting a new one
    with postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
        cursor.execute(__already_running_processes)
        row = cursor.fetchone()
        if row is None:
            # no other ETL running
            return True
        else:
            import app.slack
            run_id, start_ts, node_path = row
            info = f'run_id: {run_id}, start_ts: {start_ts}, node_path: {node_path}'
            msg = f"Found already running ETL, aborting: ({info})"
            print(msg)
            app.slack.notify_slack_info(msg)
            # exit with a success code, one slack message is enough
            sys.exit(0)
@click.command()
@click.option('--overwrite-once-a-day', default=False, is_flag=True,
              help='Include all "once_a_day" pipelines in the run')
@click.option('--overwrite-manual', default=False, is_flag=True,
              help='Include all "manual" pipelines in the run')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort if an ETL is already running")
def run_root_pipeline(overwrite_once_a_day: bool, overwrite_manual: bool, overwrite_already_running_etl: bool):
    """Runs configured nodes in the root pipeline"""
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import config, pipelines

    __debug(f'overwrite_once_a_day: {overwrite_once_a_day}')
    __debug(f'overwrite_manual: {overwrite_manual}')
    __debug(f'overwrite_already_running_etl: {overwrite_already_running_etl}')

    # We always take the root pipeline here, but remove nodes which only need to run
    # once a day after a configured time in UTC.
    # A pipeline can have certain labels:
    #   'run_mode': 'once_a_day', 'manual', or 'always' (= default)
    #   'run_after': '01:00:00' -> if run_mode == 'once_a_day', run once after this time (UTC)
    # We use the start of the overall run, not the start of the individual pipeline.
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    # make this run identifiable, e.g. as application_name in pg_stat_activity
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'
    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()

    root = config.root_pipeline()
    # the nodes (= pipelines) to run selectively in the root pipeline
    _nodes = set()
    # Check which nodes should be run
    for node in root.nodes.values():
        __debug(f'Node: {node.id}')
        if not isinstance(node, pipelines.Pipeline):
            # for now we just run anything that is not a pipeline
            __debug('  not a pipeline -> running')
            _nodes.add(node)
            continue
        run_mode = node.labels.get('run_mode', 'always')
        if run_mode == 'always':
            __debug('  mode: always -> running')
            _nodes.add(node)
        elif run_mode == 'once_a_day':
            if overwrite_once_a_day:
                __debug('  mode: once_a_day + overwrite -> running')
                _nodes.add(node)
                continue
            run_after = node.labels.get('run_after', '00:00:00')
            if _should_run_once_a_day(str(node.id), run_after, start_ts):
                __debug('  mode: once_a_day + time -> running')
                _nodes.add(node)
            else:
                __debug('  mode: once_a_day + already ran today -> NOT running')
                print(f"NOT running pipeline {node.id} (already ran today).")
        else:
            # anything else is treated as manual mode
            if overwrite_manual:
                __debug('  mode: manual + overwrite -> running')
                _nodes.add(node)
            else:
                __debug('  mode: manual -> NOT running')
                print(f"NOT running pipeline {node.id} (set to manual).")

    __debug(str([n.id for n in _nodes]))
    if not run_pipeline(root, _nodes, False):
        sys.exit(-1)
@click.command()
@click.option('--path', default='',
              help='The parent ids of the pipeline to run, separated by comma. Example: "pipeline-id,sub-pipeline-id".')
@click.option('--nodes',
              help='IDs of sub-nodes of the pipeline to run, separated by comma. When provided, only these nodes are run. Example: "do-this,do-that".')
@click.option('--with-upstreams', default=False, is_flag=True,
              help='Also run all upstreams of --nodes within the pipeline.')
@click.option('--only-with-label', default='',
              help='Only execute pipelines which have a label of that name with a truthy value.')
@click.option('--overwrite-already-running-etl', default=False, is_flag=True,
              help="Don't check and abort if an ETL is already running")
def run(path: str, nodes: str, with_upstreams: bool, only_with_label: str, overwrite_already_running_etl: bool):
    """Runs a pipeline or a subset of its nodes"""
    # copied from mara_pipelines/ui/cli.py, with the addition of checking for already
    # running ETLs and the possibility to overwrite that check
    from mara_pipelines.ui.cli import run_pipeline
    from mara_pipelines import pipelines

    # --- ADDED: START --- (compared to the upstream run command)
    # make sure we can identify this run
    start_ts = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0)
    os.environ['PGAPPNAME'] = f'mara_etl_framework__{start_ts.isoformat()}'
    if not overwrite_already_running_etl:
        _ensure_no_other_running_etl()
    # --- ADDED: END ---

    # the pipeline to run
    path = path.split(',')
    pipeline, found = pipelines.find_node(path)
    if not found:
        print(f'Pipeline {path} not found', file=sys.stderr)
        sys.exit(-1)
    if not isinstance(pipeline, pipelines.Pipeline):
        print(f'Node {path} is not a pipeline, but a {pipeline.__class__.__name__}', file=sys.stderr)
        sys.exit(-1)

    # a list of nodes to run selectively in the pipeline
    _nodes = set()
    for id in (nodes.split(',') if nodes else []):
        node = pipeline.nodes.get(id)
        if not node:
            print(f'Node "{id}" not found in pipeline {path}', file=sys.stderr)
            sys.exit(-1)
        else:
            _nodes.add(node)

    # --- ADDED: START ---
    if only_with_label:
        if with_upstreams:
            print('Cannot handle --only-with-label together with --with-upstreams.', file=sys.stderr)
            sys.exit(-1)
        potential_nodes = _nodes if _nodes else pipeline.nodes.values()
        _nodes = set()
        for n in potential_nodes:
            if isinstance(n, pipelines.Pipeline):
                # only keep pipelines which carry the label with a truthy value
                if n.labels.get(only_with_label, False):
                    _nodes.add(n)
            else:
                # non-pipeline nodes are always kept
                _nodes.add(n)
        if not _nodes:
            print(f'No nodes found with label "{only_with_label}".')
            sys.exit(-1)
    __debug(f'pipeline: {pipeline.id}')
    __debug(f'nodes: {str([n.id for n in _nodes])}')
    # --- ADDED: END ---

    if not run_pipeline(pipeline, _nodes, with_upstreams, interactively_started=False):
        sys.exit(-1)
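For reference, labelling a sub-pipeline for the runner above could look like the following sketch (the pipeline id and times are made up; it assumes the labels parameter of mara_pipelines' Pipeline constructor):

from mara_pipelines.pipelines import Pipeline

# run at most once per day, and only after 01:00 UTC
full_load = Pipeline(id='full_load',
                     description='Complete full load of the warehouse',
                     labels={'run_mode': 'once_a_day', 'run_after': '01:00:00'})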
[Some CLI commands omitted, like a command to ensure that the ETL db exists, and one to print an env file from the current local config so one can test docker setups.] One could probably merge run and run_root_pipeline to get all the command-line flags in one place... If there is interest in getting this functionality merged here, I can submit a PR.
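For completeness: to make these commands available in the flask CLI, a mara app would typically list them in a MARA_CLICK_COMMANDS contribution (this assumes the usual mara-app composition mechanism; where exactly to place it is up to the app):

# e.g. in the app's cli module, picked up by mara-app's command discovery
MARA_CLICK_COMMANDS = [run_root_pipeline, run]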
I think we should work together to make a PR for this. I already built something similar: instead of starting the main run_pipeline method with a custom …
I certainly agree that my solution - adjusting the mara_pipelines module - has its downsides, and I would love to work together to bring a label-filter execution option into the master branch. I would like to get feedback from @martin-loetzsch on what he thinks about this; maybe we should create a separate issue for that. But back to the topic: my main reason for this issue was that I would like to have a safer option in mara to manage multiple pipelines, without e.g. the possibility that someone accidentally executes a full load and an incremental load together by running the root pipeline. What I am missing is an alternative to using one big root pipeline.
By design, there is one root pipeline to which all other pipelines are added. This might make sense when you have just one pipeline which does one task, but I have several pipelines which I don't want to execute together: e.g. a pipeline for a daily refresh with several incremental refreshes, a pipeline to execute a complete full load, a pipeline running at a specific time to refresh specific data areas on demand, etc.
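Roughly, the setup in question looks like this (a sketch using mara_pipelines' Pipeline.add; the ids are made up):

from mara_pipelines.pipelines import Pipeline

root = Pipeline(id='root', description='All ETL pipelines')
root.add(Pipeline(id='daily_incremental', description='Daily incremental refresh'))
root.add(Pipeline(id='full_load', description='Complete full load'))
# running `root` executes both sub-pipelines, which is exactly what should not happen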
I came up with the following ideas for how this could be solved:

1. Allow multiple root pipelines, each run via flask mara_pipeline.ui.run --pipeline ...
2. A PipelineCollection class. This pipeline class would hold a collection of pipelines. You can't run a PipelineCollection, but you can run its sub-pipelines. This class could then be set as the root pipeline; a pipeline would then be called via flask mara_pipeline.ui.run --path <pipeline_name>.<path within the pipeline>. A rough sketch follows below.
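A very rough sketch of what idea 2 could look like (hypothetical API, not existing code; the only behavior shown is refusing direct execution):

from mara_pipelines import pipelines

class PipelineCollection(pipelines.Pipeline):
    """Groups sub-pipelines; can be set as root pipeline but never run as a whole"""

    # hypothetical guard: where exactly a runner would check this depends on
    # how mara_pipelines' execution entry points are structured
    def assert_runnable(self):
        raise RuntimeError(f'"{self.id}" is a PipelineCollection; run one of its sub-pipelines instead')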
Does anyone have other ideas? Is there maybe a common way to solve this that I am not aware of?