Airflow Upgrade with Cherry-Picks [only] from 1.10.4+twtr #64

Merged Jan 8, 2021 · 32 commits
Commits
ed564bd
EWT-569 : Initial Commit for migrations
Dec 14, 2020
587b99c
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 76fe7ac595ce4b91f6…
vshshjn7 Sep 11, 2019
8fafd51
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00b04d694f8f34…
msumit Nov 19, 2020
816f7c9
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00b04d694f8f34…
vshshjn7 Oct 17, 2019
2bb4059
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00b04d694f8f34…
msumit Nov 19, 2020
458d299
CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (#16)
vshshjn7 Oct 17, 2019
2a5f70d
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6f…
msumit Nov 19, 2020
78dafd6
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 4ce8d4cfb24e3c9e50…
vshshjn7 Mar 9, 2020
c404b13
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 299b4d883daf44e175…
vshshjn7 Mar 14, 2020
cadbf66
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb8…
vshshjn7 Apr 9, 2020
234156f
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 8a689af000405f7fe6…
abhishekbafna Apr 9, 2020
6ed530d
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 5875a150d1c4a5d51f…
vshshjn7 Apr 13, 2020
119c428
[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick a68e2b3653d7f75007…
aoen Apr 14, 2020
f5350ea
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c24…
msumit May 4, 2020
c54061a
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a073…
abhishekbafna May 6, 2020
1698a7a
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c881…
vshshjn7 May 13, 2020
bd62ed0
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a718…
msumit Jun 18, 2020
7c053df
[EWT-361] Fix broken regex pattern for extracting dataflow job id (#51)
rajatsri28 Jun 18, 2020
93e2678
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977f…
abhishekbafna Jul 6, 2020
af5a6be
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f2…
vshshjn7 Sep 3, 2020
3849402
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb41…
msumit Sep 7, 2020
27f136b
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd6…
msumit Sep 21, 2020
565b6e2
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 61624027…
msumit Sep 24, 2020
4ea1c3c
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 19914193…
msumit Oct 15, 2020
840dc0c
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f91…
msumit Oct 20, 2020
2fb2c2a
[EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e9…
msumit Nov 19, 2020
3f589ac
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Removing cont…
Jan 5, 2021
ddd9d09
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Adding back c…
Jan 5, 2021
cf95e34
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Resolving com…
Jan 7, 2021
502007c
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Resolving PR …
Jan 7, 2021
8f2290e
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Airflow Versi…
Jan 7, 2021
34c6696
[EWT-569] Airflow Upgrade to 1.10.14 from 1.10.4+twtr : Resolving PR…
Jan 7, 2021
6 changes: 6 additions & 0 deletions .arcconfig
@@ -0,0 +1,6 @@
{
"arc.feature.start.default": "origin/twtr_rb_1.10.0",
"arc.land.onto.default": "twtr_rb_1.10.0",
"base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
"history.immutable": false
}
20 changes: 20 additions & 0 deletions README_TWITTER.md
@@ -0,0 +1,20 @@
# Developing locally

Here are some steps to develop this dependency locally and interact with source, interpreted from
https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source

1. Create a git branch for this change.
2. Edit `airflow/version.py` to change the version.
3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
4. Run the command `python3.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
It will be written to `airflow/dist`.
5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
7. Run `kill $pid` where `$pid` is the pid just observed.
8. From the `source` directory, run `./pants clean-all`.
9. Now here are the hacky parts. The `run-local.sh` and `run-aurora.sh` scripts both run pants commands
without the option `--python-repos-repos`. You can either edit these to include this option,
or run a pants command that includes it, which will cache the local artifact you need, e.g.
`./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
10. Now you can start up airflow instances as usual with the newly built wheel!
11. See the above link for `Adding Dependencies to science-libraries`.
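
The numbered steps above can be strung together roughly as follows. This is only a sketch under assumed paths (an `airflow` checkout and the `source` repo side by side under the home directory), and `pkill -f pantsd` stands in for the `ps`/`kill` of steps 6 and 7; adjust to your environment.

```python
import shutil
import subprocess
from pathlib import Path

AIRFLOW_DIR = Path.home() / "airflow"   # assumed location of the airflow checkout
SOURCE_DIR = Path.home() / "source"     # assumed location of the source repo

# Step 4: build the wheel; it is written to airflow/dist.
subprocess.run(["python3.7", "setup.py", "bdist_wheel"], cwd=AIRFLOW_DIR, check=True)

# Step 5: clean out the pex and pants caches.
for cache in (Path.home() / ".pex", Path.home() / ".cache" / "pants"):
    shutil.rmtree(cache, ignore_errors=True)

# Steps 6-7: stop any running pantsd (equivalent to ps aux | grep pantsd; kill $pid).
subprocess.run(["pkill", "-f", "pantsd"], check=False)

# Step 8: clean pants state from the source directory.
subprocess.run(["./pants", "clean-all"], cwd=SOURCE_DIR, check=True)
```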
14 changes: 14 additions & 0 deletions UPDATING.md
@@ -21,6 +21,7 @@
This file documents any backwards-incompatible changes in Airflow and
assists users migrating to a new version.


<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of contents**
@@ -196,6 +197,19 @@ Users can now offer a path to a yaml for the KubernetesPodOperator using the `po

Now use NULL as default value for dag.description in dag table

## CP

### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator
It was not possible to patch the pool in BaseOperator because the signature sets the default value of pool
to Pool.DEFAULT_POOL_NAME.
When using SubDagOperator in a unit test (without initializing the SQLite DB), it raised the
following error:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool.
```
Fix for this: https://github.com/apache/airflow/pull/8587
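
For reference, the sketch below illustrates the underlying Python pitfall and the general shape of the fix (resolving the default at call time instead of at definition time). It is illustrative only and does not reproduce the actual `BaseOperator` signature.

```python
from unittest import mock


class Pool:
    # Stand-in for airflow.models.Pool; only this attribute matters here.
    DEFAULT_POOL_NAME = "default_pool"


# Before the fix: the default is evaluated once, at function definition time,
# so patching Pool.DEFAULT_POOL_NAME later has no effect on the default.
def make_task_before(pool=Pool.DEFAULT_POOL_NAME):
    return pool


# After the fix: the default is resolved at call time, so it can be patched.
def make_task_after(pool=None):
    return pool if pool is not None else Pool.DEFAULT_POOL_NAME


with mock.patch.object(Pool, "DEFAULT_POOL_NAME", "test_pool"):
    print(make_task_before())  # -> "default_pool" (patch not picked up)
    print(make_task_after())   # -> "test_pool"    (patch picked up)
```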


### Restrict editing DagRun State in the old UI (Flask-admin based UI)

Before 1.10.11 it was possible to edit DagRun State in the `/admin/dagrun/` page
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/trigger_dag.py
@@ -123,9 +123,10 @@ def read_store_serialized_dags():
from airflow.configuration import conf
return conf.getboolean('core', 'store_serialized_dags')
dagbag = DagBag(
dag_folder=dag_model.fileloc,
dag_folder=dag_model.get_local_fileloc(),
store_serialized_dags=read_store_serialized_dags()
)

dag_run = DagRun()
triggers = _trigger_dag(
dag_id=dag_id,
2 changes: 1 addition & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -1099,4 +1099,4 @@ fs_group =
# The Key-value pairs to be given to worker pods.
# The worker pods will be given these static labels, as well as some additional dynamic labels
# to identify the task.
# Should be supplied in the format: ``key = value``
# Should be supplied in the format: key = value
24 changes: 15 additions & 9 deletions airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -33,6 +33,11 @@
DEFAULT_DATAFLOW_LOCATION = 'us-central1'


JOB_ID_PATTERN = re.compile(
r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)


class _DataflowJob(LoggingMixin):
def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
job_id=None, num_retries=None):
@@ -128,24 +133,25 @@ def __init__(self, cmd):

def _line(self, fd):
if fd == self._proc.stderr.fileno():
line = b''.join(self._proc.stderr.readlines())
line = self._proc.stderr.readline().decode()
if line:
self.log.warning(line[:-1])
self.log.warning(line.rstrip("\n"))
return line
if fd == self._proc.stdout.fileno():
line = b''.join(self._proc.stdout.readlines())
line = self._proc.stdout.readline().decode()
if line:
self.log.info(line[:-1])
self.log.info(line.rstrip("\n"))
return line

raise Exception("No data in stderr or in stdout.")

@staticmethod
def _extract_job(line):
# Job id info: https://goo.gl/SE29y9.
job_id_pattern = re.compile(
br'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
matched_job = job_id_pattern.search(line or '')
# [EWT-361] : Fixes out of date regex to extract job id
matched_job = JOB_ID_PATTERN.search(line or '')
if matched_job:
return matched_job.group(1).decode()
return matched_job.group('job_id_java') or matched_job.group('job_id_python')

def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
@@ -236,7 +242,7 @@ def start_python_dataflow(self, job_name, variables, dataflow, py_options,
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
self._start_dataflow(variables, name, ["python3"] + py_options + [dataflow],
label_formatter)

@staticmethod
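
For clarity, here is a small standalone check of the new `JOB_ID_PATTERN` and the `_extract_job` logic from the hunk above. The sample log lines are made up for illustration and may not match real Dataflow output exactly.

```python
import re

# Pattern reproduced from the diff above.
JOB_ID_PATTERN = re.compile(
    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)


def extract_job(line):
    """Mirror of _extract_job: return whichever named group matched, if any."""
    matched_job = JOB_ID_PATTERN.search(line or '')
    if matched_job:
        return matched_job.group('job_id_java') or matched_job.group('job_id_python')
    return None


print(extract_job("Submitted job: 2020-06-18_02_11_23-1234567890123456789"))      # Java SDK style
print(extract_job("Created job with id: [2020-06-18_02_11_23-9876543210987654]"))  # Python SDK style
print(extract_job("INFO: something unrelated"))                                    # -> None
```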
7 changes: 6 additions & 1 deletion airflow/contrib/kubernetes/pod.py
@@ -98,7 +98,9 @@ def __init__(
security_context=None,
configmaps=None,
pod_runtime_info_envs=None,
dnspolicy=None
dnspolicy=None,
priority_class=None,
lifecycle=None
):
warnings.warn(
"Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod`.",
@@ -130,6 +132,9 @@ def __init__(
self.configmaps = configmaps or []
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.priority_class = priority_class
self.lifecycle = lifecycle or {}


def to_v1_kubernetes_pod(self):
"""
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -58,7 +58,7 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None):
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
else:
self.log.info("could not queue task %s", key)
self.log.error("could not queue task %s", key)

def queue_task_instance(
self,
8 changes: 8 additions & 0 deletions airflow/executors/celery_executor.py
@@ -166,6 +166,14 @@ def start(self):
self._sync_parallelism
)

def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
else:
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)

def _num_tasks_per_send_process(self, to_send_count):
"""
How many Celery tasks should each worker process send.
59 changes: 57 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -134,7 +134,10 @@ def _run_file_processor(result_channel,
stdout = StreamLogWriter(log, logging.INFO)
stderr = StreamLogWriter(log, logging.WARN)

log.info("Setting log context for file {}".format(file_path))
# log file created here
set_context(log, file_path)
log.info("Successfully set log context for file {}".format(file_path))
setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))

try:
@@ -154,6 +157,7 @@
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_ids, log=log)
log.info("Processing file {}".format(file_path))
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
@@ -1030,7 +1034,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):

if self.executor.has_task(task_instance):
self.log.debug(
"Not handling task %s as the executor reports it is running",
"Still handling task %s even though as the executor reports it is running",
task_instance.key
)
num_tasks_in_executor += 1
@@ -1157,6 +1161,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
# actually enqueue them
for simple_task_instance in simple_task_instances:
simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)

path = simple_dag.full_filepath
if path.startswith(settings.DAGS_FOLDER):
path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)

command = TI.generate_command(
simple_task_instance.dag_id,
simple_task_instance.task_id,
@@ -1168,7 +1177,7 @@
ignore_task_deps=False,
ignore_ti_state=False,
pool=simple_task_instance.pool,
file_path=simple_dag.full_filepath,
file_path=path,
pickle_id=simple_dag.pickle_id)

priority = simple_task_instance.priority_weight
@@ -1449,6 +1458,50 @@ def _execute_helper(self):

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if len(simple_dags) > 0:
try:
simple_dag_bag = SimpleDagBag(simple_dags)

# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
# DAG runs in those states

# If a task instance is up for retry but the corresponding DAG run
# isn't running, mark the task instance as FAILED so we don't try
# to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.UP_FOR_RETRY],
State.FAILED)
# If a task instance is scheduled or queued or up for reschedule,
# but the corresponding DAG run isn't running, set the state to
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.QUEUED,
State.SCHEDULED,
State.UP_FOR_RESCHEDULE],
State.NONE)

scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))

# TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery
# Executor does not reliably enqueue tasks with the MySQL broker, and we have
# seen tasks hang after they get queued. The effect of this hack is queued tasks
# will constantly be requeued and resent to the executor (Celery).
# This should be removed when we switch away from the MySQL Celery backend.
self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED, State.QUEUED))

except Exception as e:
self.log.error("Error queuing tasks")
self.log.exception(e)
continue

# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()

self._change_state_for_tasks_failed_to_execute()

if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
continue
@@ -1485,7 +1538,9 @@ def _execute_helper(self):
sleep(sleep_length)

# Stop any processors
self.log.info("Terminating DAG processors")
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
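
As an aside, here is a minimal sketch of the `DAGS_FOLDER` path rewrite added in `_enqueue_task_instances_with_queued_state` above. The folder value is an assumption for illustration (Airflow reads it from `settings.DAGS_FOLDER`), and the intent appears to be keeping the generated task command portable across hosts whose dags folder paths differ.

```python
DAGS_FOLDER = "/home/airflow/dags"   # assumed value; settings.DAGS_FOLDER in Airflow


def relativize(full_filepath, dags_folder=DAGS_FOLDER):
    """Replace a leading dags-folder prefix with the literal token DAGS_FOLDER."""
    if full_filepath.startswith(dags_folder):
        return full_filepath.replace(dags_folder, "DAGS_FOLDER", 1)
    return full_filepath


print(relativize("/home/airflow/dags/team/etl_dag.py"))  # -> DAGS_FOLDER/team/etl_dag.py
print(relativize("/tmp/other/etl_dag.py"))               # -> unchanged
```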
5 changes: 5 additions & 0 deletions airflow/kubernetes/worker_configuration.py
@@ -200,6 +200,11 @@ def _get_environment(self):
self.kube_config.git_subpath # dags
)
env['AIRFLOW__CORE__DAGS_FOLDER'] = dag_volume_mount_path
# TODO: This change can be submitted upstream to Apache Airflow as well.
# Set the scheduler env into the worker pod.
os_env = os.environ
os_env.update(env)
env = os_env
return env

def _get_configmaps(self):
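
For context, a tiny sketch of the environment merge above: the worker pod environment starts from the scheduler's `os.environ`, and the explicitly computed keys win on conflict because `dict.update` applies them last. The copy below is for illustration only; the hunk above updates `os.environ` in place.

```python
import os

# Computed worker-specific variables (example value; in the code above this is `env`).
env = {"AIRFLOW__CORE__DAGS_FOLDER": "/tmp/dags"}

os_env = dict(os.environ)   # copy rather than mutate, for illustration
os_env.update(env)          # worker-specific values override inherited ones
env = os_env

print(env["AIRFLOW__CORE__DAGS_FOLDER"])   # -> /tmp/dags
```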