Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring utils to be more sane #1219

Merged
merged 1 commit into from
Mar 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.models import DAG
from flask_admin import BaseView
from importlib import import_module
from airflow.utils import AirflowException
from airflow.exceptions import AirflowException

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
if DAGS_FOLDER not in sys.path:
Expand Down
22 changes: 13 additions & 9 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import json

import airflow
from airflow import jobs, settings, utils
from airflow import jobs, settings
from airflow import configuration as conf
from airflow.executors import DEFAULT_EXECUTOR
from airflow.models import DagModel, DagBag, TaskInstance, DagPickle, DagRun
from airflow.utils import AirflowException, State
from airflow.utils import db as db_utils
from airflow.utils import logging as logging_utils
from airflow.utils.state import State
from airflow.exceptions import AirflowException

DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))

Expand Down Expand Up @@ -78,7 +81,8 @@ def backfill(args, dag=None):
mark_success=args.mark_success,
include_adhoc=args.include_adhoc,
local=args.local,
donot_pickle=(args.donot_pickle or conf.getboolean('core', 'donot_pickle')),
donot_pickle=(args.donot_pickle or
conf.getboolean('core', 'donot_pickle')),
ignore_dependencies=args.ignore_dependencies,
pool=args.pool)

Expand Down Expand Up @@ -133,7 +137,7 @@ def set_is_paused(is_paused, args, dag=None):

def run(args, dag=None):

utils.pessimistic_connection_handling()
db_utils.pessimistic_connection_handling()
if dag:
args.dag_id = dag.dag_id

Expand Down Expand Up @@ -236,10 +240,10 @@ def run(args, dag=None):
remote_log_location = filename.replace(log_base, remote_base)
# S3
if remote_base.startswith('s3:/'):
utils.S3Log().write(log, remote_log_location)
logging_utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
utils.GCSLog().write(
logging_utils.GCSLog().write(
log,
remote_log_location,
append=True)
Expand Down Expand Up @@ -401,7 +405,7 @@ def worker(args):

def initdb(args): # noqa
print("DB: " + repr(settings.engine.url))
utils.initdb()
db_utils.initdb()
print("Done.")


Expand All @@ -412,14 +416,14 @@ def resetdb(args):
"Proceed? (y/n)").upper() == "Y":
logging.basicConfig(level=settings.LOGGING_LEVEL,
format=settings.SIMPLE_LOG_FORMAT)
utils.resetdb()
db_utils.resetdb()
else:
print("Bail.")


def upgradedb(args): # noqa
print("DB: " + repr(settings.engine.url))
utils.upgradedb()
db_utils.upgradedb()


def version(args): # noqa
Expand Down
3 changes: 3 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from collections import OrderedDict
from configparser import ConfigParser


class AirflowConfigException(Exception):
pass

Expand Down Expand Up @@ -612,6 +613,7 @@ def test_mode():
def get(section, key, **kwargs):
return conf.get(section, key, **kwargs)


def getboolean(section, key):
return conf.getboolean(section, key)

Expand Down Expand Up @@ -644,5 +646,6 @@ def set(section, option, value): # noqa
########################
# convenience method to access config entries


def get_dags_folder():
return os.path.expanduser(get('core', 'DAGS_FOLDER'))
4 changes: 2 additions & 2 deletions airflow/contrib/executors/mesos_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.settings import Session
from airflow.utils import State
from airflow.utils import AirflowException
from airflow.utils.state import State
from airflow.exceptions import AirflowException


DEFAULT_FRAMEWORK_NAME = 'Airflow'
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_hooks = {
'ftp_hook': ['FTPHook'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/gc_base_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from airflow.hooks.base_hook import BaseHook
from airflow.utils import AirflowException
from airflow.exceptions import AirflowException
from oauth2client.client import SignedJwtAssertionCredentials, GoogleCredentials

class GoogleCloudBaseHook(BaseHook):
Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/hooks/qubole_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import logging

from airflow.utils import AirflowException
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow import configuration

Expand Down Expand Up @@ -151,4 +151,4 @@ def create_cmd_args(self):
else:
args += inplace_args.split(' ')

return args
return args
2 changes: 1 addition & 1 deletion airflow/contrib/hooks/ssh_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from contextlib import contextmanager

from airflow.hooks.base_hook import BaseHook
from airflow import AirflowException
from airflow.exceptions import AirflowException

import logging

Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Imports the operators dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils import import_module_attrs as _import_module_attrs
from airflow.utils.helpers import import_module_attrs as _import_module_attrs

_operators = {
'ssh_execute_operator': ['SSHExecuteOperator'],
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_check_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.operators import CheckOperator, ValueCheckOperator, IntervalCheckOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults


class BigQueryCheckOperator(CheckOperator):
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class BigQueryOperator(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class BigQueryToBigQueryOperator(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/bigquery_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class BigQueryToCloudStorageOperator(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/gcs_download_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class GoogleCloudStorageDownloadOperator(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/gcs_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class GoogleCloudStorageToBigQueryOperator(BaseOperator):
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/mysql_to_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.hooks import MySqlHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults
from collections import OrderedDict
from datetime import date, datetime
from decimal import Decimal
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/qubole_operator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks import QuboleHook


Expand Down
4 changes: 2 additions & 2 deletions airflow/contrib/operators/ssh_execute_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from subprocess import STDOUT

from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils import AirflowException
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException


class SSHTempFileContent():
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from airflow.contrib.hooks import VerticaHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults


class VerticaOperator(BaseOperator):
Expand Down
2 changes: 1 addition & 1 deletion airflow/contrib/operators/vertica_to_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.hooks import HiveCliHook
from airflow.contrib.hooks import VerticaHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
from airflow.utils.decorators import apply_defaults

class VerticaToHiveTransfer(BaseOperator):
"""
Expand Down
6 changes: 3 additions & 3 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from airflow.operators import ShortCircuitOperator, DummyOperator
from airflow.models import DAG
import airflow.utils
import airflow.utils.helpers
from datetime import datetime, timedelta

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
Expand All @@ -21,5 +21,5 @@
ds_true = [DummyOperator(task_id='true_' + str(i), dag=dag) for i in [1, 2]]
ds_false = [DummyOperator(task_id='false_' + str(i), dag=dag) for i in [1, 2]]

airflow.utils.chain(cond_true, *ds_true)
airflow.utils.chain(cond_false, *ds_false)
airflow.utils.helpers.chain(cond_true, *ds_true)
airflow.utils.helpers.chain(cond_false, *ds_false)
6 changes: 4 additions & 2 deletions airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
Expand All @@ -14,6 +15,7 @@
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators import TriggerDagRunOperator
from datetime import datetime
Expand All @@ -35,8 +37,8 @@ def conditionally_trigger(context, dag_run_obj):

# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
default_args={"owner" : "me",
"start_date":datetime.now()},
default_args={"owner": "me",
"start_date": datetime.now()},
schedule_interval='@once')


Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@


def run_this_func(ds, **kwargs):
print( "Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))

run_this = PythonOperator(
task_id='run_this',
Expand Down
10 changes: 10 additions & 0 deletions airflow/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class AirflowException(Exception):
pass


class AirflowSensorTimeout(Exception):
pass


class AirflowTaskTimeout(Exception):
pass
2 changes: 1 addition & 1 deletion airflow/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
except:
pass

from airflow.utils import AirflowException
from airflow.exceptions import AirflowException

_EXECUTOR = configuration.get('core', 'EXECUTOR')

Expand Down
3 changes: 2 additions & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from builtins import range

from airflow import configuration
from airflow.utils import State, LoggingMixin
from airflow.utils.state import State
from airflow.utils.logging import LoggingMixin

PARALLELISM = configuration.getint('core', 'PARALLELISM')

Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from celery import Celery
from celery import states as celery_states

from airflow.utils import AirflowException
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow import configuration

Expand Down
3 changes: 2 additions & 1 deletion airflow/executors/local_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from airflow import configuration
from airflow.executors.base_executor import BaseExecutor
from airflow.utils import State, LoggingMixin
from airflow.utils.state import State
from airflow.utils.logging import LoggingMixin

PARALLELISM = configuration.get('core', 'PARALLELISM')

Expand Down
3 changes: 1 addition & 2 deletions airflow/executors/sequential_executor.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from builtins import str
import logging
import subprocess

from airflow.executors.base_executor import BaseExecutor
from airflow.utils import State
from airflow.utils.state import State


class SequentialExecutor(BaseExecutor):
Expand Down
2 changes: 1 addition & 1 deletion airflow/hooks/S3_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
boto.set_stream_logger('boto')
logging.getLogger("boto").setLevel(logging.INFO)

from airflow.utils import AirflowException
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


Expand Down
3 changes: 2 additions & 1 deletion airflow/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Imports the hooks dynamically while keeping the package API clean,
# abstracting the underlying modules
from airflow.utils import import_module_attrs as _import_module_attrs

from airflow.utils.helpers import import_module_attrs as _import_module_attrs
from airflow.hooks.base_hook import BaseHook # noqa to expose in package

_hooks = {
Expand Down
Loading