diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 0058d3a3f99609..ec0fbecbed046f 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -4,6 +4,7 @@
 import os
 import subprocess
 import textwrap
+import warnings
 from datetime import datetime
 
 from builtins import input
@@ -137,21 +138,14 @@ def run(args):
     utils.pessimistic_connection_handling()
 
     # Setting up logging
-    log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
-    directory = log + "/{args.dag_id}/{args.task_id}".format(args=args)
+    log_base = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
+    directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
     if not os.path.exists(directory):
         os.makedirs(directory)
     args.execution_date = dateutil.parser.parse(args.execution_date)
     iso = args.execution_date.isoformat()
     filename = "{directory}/{iso}".format(**locals())
-    # store old log (to help with S3 appends)
-    if os.path.exists(filename):
-        with open(filename, 'r') as logfile:
-            old_log = logfile.read()
-    else:
-        old_log = None
-
     subdir = process_subdir(args.subdir)
     logging.root.handlers = []
     logging.basicConfig(
@@ -233,34 +227,39 @@ def run(args):
         executor.heartbeat()
         executor.end()
 
-    if configuration.get('core', 'S3_LOG_FOLDER').startswith('s3:'):
-        import boto
-        s3_log = filename.replace(log, configuration.get('core', 'S3_LOG_FOLDER'))
-        bucket, key = s3_log.lstrip('s3:/').split('/', 1)
-        if os.path.exists(filename):
+    # store logs remotely
+    remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')
 
-            # get logs
-            with open(filename, 'r') as logfile:
-                new_log = logfile.read()
+    # deprecated as of March 2016
+    if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'):
+        warnings.warn(
+            'The S3_LOG_FOLDER configuration key has been replaced by '
+            'REMOTE_BASE_LOG_FOLDER. Your configuration still works but please '
+            'update airflow.cfg to ensure future compatibility.',
+            DeprecationWarning)
+        remote_base = configuration.get('core', 'S3_LOG_FOLDER')
 
-            # remove old logs (since they are already in S3)
-            if old_log:
-                new_log.replace(old_log, '')
+    if os.path.exists(filename):
+        # read log and remove old logs to get just the latest additions
 
-            try:
-                s3 = boto.connect_s3()
-                s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
-
-                # append new logs to old S3 logs, if available
-                if s3_key.exists():
-                    old_s3_log = s3_key.get_contents_as_string().decode()
-                    new_log = old_s3_log + '\n' + new_log
-
-                # send log to S3
-                encrypt = configuration.get('core', 'ENCRYPT_S3_LOGS')
-                s3_key.set_contents_from_string(new_log, encrypt_key=encrypt)
-            except:
-                print('Could not send logs to S3.')
+        with open(filename, 'r') as logfile:
+            log = logfile.read()
+
+        remote_log_location = filename.replace(log_base, remote_base)
+
+        # S3
+        if remote_base.startswith('s3:/'):
+            utils.S3Log().write(log, remote_log_location)
+        # GCS
+        elif remote_base.startswith('gs:/'):
+            utils.GCSLog().write(
+                log,
+                remote_log_location,
+                append=True)
+        # Other
+        elif remote_base:
+            logging.error(
+                'Unsupported remote log location: {}'.format(remote_base))
 
 
 def task_state(args):
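Note on the remote-logging block above: the local log file path is mapped to its remote counterpart by a simple prefix swap. A minimal sketch of that mapping, using invented base folders, dag id, and task id:

    # hypothetical values, for illustration only
    log_base = '/home/airflow/logs'                    # BASE_LOG_FOLDER
    remote_base = 's3://my-airflow-logs/prod'          # REMOTE_BASE_LOG_FOLDER
    filename = log_base + '/example_dag/example_task/2016-03-01T00:00:00'
    remote_log_location = filename.replace(log_base, remote_base)
    # -> 's3://my-airflow-logs/prod/example_dag/example_task/2016-03-01T00:00:00'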
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 3f565dfc3704a5..d2d4beb06d6164 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -4,8 +4,8 @@
 from __future__ import unicode_literals
 
 from future import standard_library
-
 standard_library.install_aliases()
+
 from builtins import str
 from configparser import ConfigParser
 import errno
@@ -70,8 +70,10 @@ class AirflowConfigException(Exception):
     'plugins_folder': None,
     'security': None,
     'donot_pickle': False,
-    's3_log_folder': '',
+    'remote_base_log_folder': '',
+    'remote_log_conn_id': '',
     'encrypt_s3_logs': False,
+    's3_log_folder': '',  # deprecated!
     'dag_concurrency': 16,
     'max_active_runs_per_dag': 16,
     'executor': 'SequentialExecutor',
@@ -137,9 +139,17 @@ class AirflowConfigException(Exception):
 # The folder where airflow should store its log files. This location
 base_log_folder = {AIRFLOW_HOME}/logs
-# An S3 location can be provided for log backups
-# For S3, use the full URL to the base folder (starting with "s3://...")
-s3_log_folder = None
+
+# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
+# must supply a remote location URL (starting with either 's3://...' or
+# 'gs://...') and an Airflow connection id that provides access to the storage
+# location.
+remote_base_log_folder = None
+remote_log_conn_id = None
+# Use server-side encryption for logs stored in S3
+encrypt_s3_logs = False
+# deprecated option for remote log storage, use remote_base_log_folder instead!
+# s3_log_folder = None
 
 # The executor class that airflow should use. Choices include
 # SequentialExecutor, LocalExecutor, CeleryExecutor
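With the keys above, a remote-logging setup in airflow.cfg might look roughly like this sketch; the bucket, prefix, and connection id are invented for illustration, and a gs:// URL works the same way for Google Cloud Storage:

    [core]
    # hypothetical S3 bucket and prefix
    remote_base_log_folder = s3://my-airflow-logs/prod
    # id of an existing Airflow connection that can access the bucket
    remote_log_conn_id = my_s3_conn
    # optional: server-side encryption for logs stored in S3
    encrypt_s3_logs = True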
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index dd9f67592648c5..00b6a0cdcacdd1 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -343,7 +343,8 @@ def load_file(
     def load_string(self, string_data,
                     key, bucket_name=None,
-                    replace=False):
+                    replace=False,
+                    encrypt=False):
         """
         Loads a local file to S3
@@ -371,6 +372,7 @@ def load_string(self, string_data,
         if not key_obj:
             key_obj = bucket.new_key(key_name=key)
         key_size = key_obj.set_contents_from_string(string_data,
-                                                    replace=replace)
+                                                    replace=replace,
+                                                    encrypt_key=encrypt)
         logging.info("The key {key} now contains"
                      " {key_size} bytes".format(**locals()))
diff --git a/airflow/utils.py b/airflow/utils.py
index 00d7a0863ba290..2c0ad23d0a1577 100644
--- a/airflow/utils.py
+++ b/airflow/utils.py
@@ -795,3 +795,177 @@ def logger(self):
             self._logger = logging.root.getChild(self.__class__.__module__ + '.' +
                                                  self.__class__.__name__)
             return self._logger
+
+
+class S3Log(object):
+    """
+    Utility class for reading and writing logs in S3.
+    Requires airflow[s3] and setting the REMOTE_BASE_LOG_FOLDER and
+    REMOTE_LOG_CONN_ID configuration options in airflow.cfg.
+    """
+    def __init__(self):
+        remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
+        try:
+            from airflow.hooks import S3Hook
+            self.hook = S3Hook(remote_conn_id)
+        except:
+            self.hook = None
+            logging.error(
+                'Could not create an S3Hook with connection id "{}". '
+                'Please make sure that airflow[s3] is installed and '
+                'the S3 connection exists.'.format(remote_conn_id))
+
+    def read(self, remote_log_location, return_error=False):
+        """
+        Returns the log found at the remote_log_location. Returns '' if no
+        logs are found or there is an error.
+
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param return_error: if True, returns a string error message if an
+            error occurs. Otherwise returns '' when an error occurs.
+        :type return_error: bool
+        """
+        if self.hook:
+            try:
+                s3_key = self.hook.get_key(remote_log_location)
+                if s3_key:
+                    return s3_key.get_contents_as_string().decode()
+            except:
+                pass
+
+        # raise/return error if we get here
+        err = 'Could not read logs from {}'.format(remote_log_location)
+        logging.error(err)
+        return err if return_error else ''
+
+    def write(self, log, remote_log_location, append=False):
+        """
+        Writes the log to the remote_log_location. Fails silently if no hook
+        was created.
+
+        :param log: the log to write to the remote_log_location
+        :type log: string
+        :param remote_log_location: the log's location in remote storage
+        :type remote_log_location: string (path)
+        :param append: if False, any existing log file is overwritten. If True,
+            the new log is appended to any existing logs.
+        :type append: bool
+        """
+        if self.hook:
+
+            if append:
+                old_log = self.read(remote_log_location)
+                log = old_log + '\n' + log
+            try:
+                self.hook.load_string(
+                    log,
+                    key=remote_log_location,
+                    replace=True,
+                    encrypt=configuration.get('core', 'ENCRYPT_S3_LOGS'))
+                return
+            except:
+                pass
+
+        # raise/return error if we get here
+        logging.error('Could not write logs to {}'.format(remote_log_location))
+ """ + def __init__(self): + """ + Attempt to create hook with airflow[gcloud] (and set + use_gcloud = True), otherwise uses airflow[gcp_api] + """ + remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID') + self.use_gcloud = False + + try: + from airflow.contrib.hooks import GCSHook + self.hook = GCSHook(remote_conn_id) + self.use_gcloud = True + except: + try: + from airflow.contrib.hooks import GoogleCloudStorageHook + self.hook = GoogleCloudStorageHook(remote_conn_id) + except: + self.hook = None + logging.error( + 'Could not create a GCSHook with connection id "{}". ' + 'Please make sure that either airflow[gcloud] or ' + 'airflow[gcp_api] is installed and the GCS connection ' + 'exists.'.format(remote_conn_id)) + + def read(self, remote_log_location, return_error=True): + """ + Returns the log found at the remote_log_location. + + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param return_error: if True, returns a string error message if an + error occurs. Otherwise returns '' when an error occurs. + :type return_error: bool + """ + if self.hook: + try: + if self.use_gcloud: + gcs_blob = self.hook.get_blob(remote_log_location) + if gcs_blob: + return gcs_blob.download_as_string().decode() + else: + bkt, blob = remote_log_location.lstrip('gs:/').split('/', 1) + return self.hook.download(bkt, blob).decode() + except: + pass + + # raise/return error if we get here + err = 'Could not read logs from {}'.format(remote_log_location) + logging.error(err) + return err if return_error else '' + + def write(self, log, remote_log_location, append=False): + """ + Writes the log to the remote_log_location. Fails silently if no hook + was created. + + :param log: the log to write to the remote_log_location + :type log: string + :param remote_log_location: the log's location in remote storage + :type remote_log_location: string (path) + :param append: if False, any existing log file is overwritten. If True, + the new log is appended to any existing logs. 
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 07f616c94e77bb..392f9a4856ff5f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -779,7 +779,7 @@ def log(self):
                 f.close()
                 log_loaded = True
             except:
-                log = "*** Log file isn't where expected.\n".format(loc)
+                log = "*** Local log file not found.\n".format(loc)
         else:
             WORKER_LOG_SERVER_PORT = \
                 conf.get('celery', 'WORKER_LOG_SERVER_PORT')
@@ -796,22 +796,23 @@ def log(self):
                 log += "*** Failed to fetch log file from worker.\n".format(
                     **locals())
 
-        # try to load log backup from S3
-        s3_log_folder = conf.get('core', 'S3_LOG_FOLDER')
-        if not log_loaded and s3_log_folder.startswith('s3:'):
-            import boto
-            s3 = boto.connect_s3()
-            s3_log_loc = os.path.join(
-                conf.get('core', 'S3_LOG_FOLDER'), log_relative)
-            log += '*** Fetching log from S3: {}\n'.format(s3_log_loc)
-            log += ('*** Note: S3 logs are only available once '
-                    'tasks have completed.\n')
-            bucket, key = s3_log_loc.lstrip('s3:/').split('/', 1)
-            s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
-            if s3_key.exists():
-                log += '\n' + s3_key.get_contents_as_string().decode()
-            else:
-                log += '*** No log found on S3.\n'
+        if not log_loaded:
+            # load remote logs
+            remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+            remote_log = os.path.join(remote_log_base, log_relative)
+            log += '\n*** Reading remote logs...\n'.format(remote_log)
+
+            # S3
+            if remote_log.startswith('s3:/'):
+                log += utils.S3Log().read(remote_log, return_error=True)
+
+            # GCS
+            elif remote_log.startswith('gs:/'):
+                log += utils.GCSLog().read(remote_log, return_error=True)
+
+            # unsupported
+            elif remote_log:
+                log += '*** Unsupported remote log location.'
 
         session.commit()
         session.close()
diff --git a/setup.py b/setup.py
index d08bd555929b1d..2b1b85f0026046 100644
--- a/setup.py
+++ b/setup.py
@@ -58,7 +58,7 @@ def run(self):
 ]
 gcp_api = [
     'httplib2',
-    'google-api-python-client',
+    'google-api-python-client<=1.4.2',
     'oauth2client>=1.5.2, <2.0.0',
 ]
 hdfs = ['snakebite>=2.4.13']