Remote Log Storage (take 2) #1137

Merged: 3 commits, Mar 17, 2016
67 changes: 33 additions & 34 deletions airflow/bin/cli.py
@@ -4,6 +4,7 @@
import os
import subprocess
import textwrap
import warnings
from datetime import datetime

from builtins import input
@@ -137,21 +138,14 @@ def run(args):
utils.pessimistic_connection_handling()

# Setting up logging
log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
directory = log + "/{args.dag_id}/{args.task_id}".format(args=args)
log_base = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())

# store old log (to help with S3 appends)
if os.path.exists(filename):
with open(filename, 'r') as logfile:
old_log = logfile.read()
else:
old_log = None

subdir = process_subdir(args.subdir)
logging.root.handlers = []
logging.basicConfig(
@@ -233,34 +227,39 @@ def run(args):
executor.heartbeat()
executor.end()

if configuration.get('core', 'S3_LOG_FOLDER').startswith('s3:'):
import boto
s3_log = filename.replace(log, configuration.get('core', 'S3_LOG_FOLDER'))
bucket, key = s3_log.lstrip('s3:/').split('/', 1)
if os.path.exists(filename):
# store logs remotely
remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')

# get logs
with open(filename, 'r') as logfile:
new_log = logfile.read()
# deprecated as of March 2016
if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'):
warnings.warn(
'The S3_LOG_FOLDER configuration key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your configuration still works but please '
'update airflow.cfg to ensure future compatibility.',
DeprecationWarning)
remote_base = configuration.get('core', 'S3_LOG_FOLDER')

# remove old logs (since they are already in S3)
if old_log:
new_log.replace(old_log, '')
if os.path.exists(filename):
# read log and remove old logs to get just the latest additions

try:
s3 = boto.connect_s3()
s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)

# append new logs to old S3 logs, if available
if s3_key.exists():
old_s3_log = s3_key.get_contents_as_string().decode()
new_log = old_s3_log + '\n' + new_log

# send log to S3
encrypt = configuration.get('core', 'ENCRYPT_S3_LOGS')
s3_key.set_contents_from_string(new_log, encrypt_key=encrypt)
except:
print('Could not send logs to S3.')
with open(filename, 'r') as logfile:
log = logfile.read()

remote_log_location = filename.replace(log_base, remote_base)
# S3

if remote_base.startswith('s3:/'):
utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
utils.GCSLog().write(
log,
remote_log_location,
append=True)
# Other
elif remote_base:
logging.error(
'Unsupported remote log location: {}'.format(remote_base))


def task_state(args):
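For orientation, here is a small sketch (not part of the diff) of the path mapping and scheme dispatch that the new block in run() performs; the base folders, dag id, and task id below are hypothetical.

```python
# Hypothetical local and remote bases; only the string handling mirrors the PR.
log_base = '/home/airflow/logs'
remote_base = 's3://my-bucket/airflow/logs'
filename = log_base + '/example_dag/example_task/2016-03-17T00:00:00'

# Swap the local base for the remote base to get the remote key.
remote_log_location = filename.replace(log_base, remote_base)

# Dispatch on the URL scheme, as run() does after the task finishes.
handler = None
if remote_base.startswith('s3:/'):
    handler = 'utils.S3Log'
elif remote_base.startswith('gs:/'):
    handler = 'utils.GCSLog'
elif remote_base:
    print('Unsupported remote log location: {}'.format(remote_base))

print(remote_log_location, handler)
```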
20 changes: 15 additions & 5 deletions airflow/configuration.py
@@ -4,8 +4,8 @@
from __future__ import unicode_literals

from future import standard_library

standard_library.install_aliases()

from builtins import str
from configparser import ConfigParser
import errno
@@ -70,8 +70,10 @@ class AirflowConfigException(Exception):
'plugins_folder': None,
'security': None,
'donot_pickle': False,
's3_log_folder': '',
'remote_base_log_folder': '',
'remote_log_conn_id': '',
'encrypt_s3_logs': False,
's3_log_folder': '', # deprecated!
'dag_concurrency': 16,
'max_active_runs_per_dag': 16,
'executor': 'SequentialExecutor',
@@ -133,9 +135,17 @@ class AirflowConfigException(Exception):

# The folder where airflow should store its log files. This location
base_log_folder = {AIRFLOW_HOME}/logs
# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = None
remote_log_conn_id = None
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# deprecated option for remote log storage, use remote_base_log_folder instead!
# s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
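As a quick illustration (a sketch, not part of the PR; the values in the comments are made-up examples), downstream code reads the new options like this:

```python
from airflow import configuration

# Keys introduced or renamed by this PR; set them in the [core] section of airflow.cfg.
remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')  # e.g. 's3://my-bucket/airflow/logs'
conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')          # e.g. 's3_default'
encrypt = configuration.get('core', 'ENCRYPT_S3_LOGS')             # 'False' unless enabled
```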
6 changes: 4 additions & 2 deletions airflow/hooks/S3_hook.py
@@ -338,7 +338,8 @@ def load_file(

def load_string(self, string_data,
key, bucket_name=None,
replace=False):
replace=False,
encrypt=False):
"""
Loads a string to S3

@@ -366,6 +367,7 @@ def load_string(self, string_data,
if not key_obj:
key_obj = bucket.new_key(key_name=key)
key_size = key_obj.set_contents_from_string(string_data,
replace=replace)
replace=replace,
encrypt_key=encrypt)
logging.info("The key {key} now contains"
" {key_size} bytes".format(**locals()))
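A hedged usage sketch of the extended signature; the connection id, bucket, and key are hypothetical, and only the new encrypt flag comes from this diff:

```python
from airflow.hooks import S3Hook

hook = S3Hook('my_s3_conn')  # hypothetical connection id
hook.load_string(
    'contents of a task log',
    key='s3://my-bucket/airflow/logs/example_dag/example_task/2016-03-17T00:00:00',
    replace=True,
    encrypt=True)  # passed through to boto as encrypt_key for server-side encryption
```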
174 changes: 174 additions & 0 deletions airflow/utils.py
@@ -784,3 +784,177 @@ def logger(self):
self._logger = logging.root.getChild(self.__class__.__module__ + '.' + self.__class__.__name__)
return self._logger


class S3Log(object):
"""
Utility class for reading and writing logs in S3.
Requires airflow[s3] and setting the REMOTE_BASE_LOG_FOLDER and
REMOTE_LOG_CONN_ID configuration options in airflow.cfg.
"""
def __init__(self):
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
try:
from airflow.hooks import S3Hook
self.hook = S3Hook(remote_conn_id)
except:
self.hook = None
logging.error(
'Could not create an S3Hook with connection id "{}". '
'Please make sure that airflow[s3] is installed and '
'the S3 connection exists.'.format(remote_conn_id))

def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location. Returns '' if no
logs are found or there is an error.

:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param return_error: if True, returns a string error message if an
error occurs. Otherwise returns '' when an error occurs.
:type return_error: bool
"""
if self.hook:
try:
s3_key = self.hook.get_key(remote_log_location)
if s3_key:
return s3_key.get_contents_as_string().decode()
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''


def write(self, log, remote_log_location, append=False):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.

:param log: the log to write to the remote_log_location
:type log: string
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool

"""
if self.hook:

if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log
try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=configuration.get('core', 'ENCRYPT_S3_LOGS'))
return
except:
pass

# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))


class GCSLog(object):
"""
Utility class for reading and writing logs in GCS.
Requires either airflow[gcloud] or airflow[gcp_api] and
setting the REMOTE_BASE_LOG_FOLDER and REMOTE_LOG_CONN_ID configuration
options in airflow.cfg.
"""
def __init__(self):
"""
Attempts to create a hook with airflow[gcloud] (and sets
use_gcloud = True); otherwise falls back to airflow[gcp_api].
"""
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
self.use_gcloud = False

try:
from airflow.contrib.hooks import GCSHook
self.hook = GCSHook(remote_conn_id)
self.use_gcloud = True
except:
try:
from airflow.contrib.hooks import GoogleCloudStorageHook
self.hook = GoogleCloudStorageHook(remote_conn_id)
except:
self.hook = None
logging.error(
'Could not create a GCSHook with connection id "{}". '
'Please make sure that either airflow[gcloud] or '
'airflow[gcp_api] is installed and the GCS connection '
'exists.'.format(remote_conn_id))

def read(self, remote_log_location, return_error=True):
"""
Returns the log found at the remote_log_location.

:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param return_error: if True, returns a string error message if an
error occurs. Otherwise returns '' when an error occurs.
:type return_error: bool
"""
if self.hook:
try:
if self.use_gcloud:
gcs_blob = self.hook.get_blob(remote_log_location)
if gcs_blob:
return gcs_blob.download_as_string().decode()
else:
bkt, blob = remote_log_location.lstrip('gs:/').split('/', 1)
return self.hook.download(bkt, blob).decode()
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''

def write(self, log, remote_log_location, append=False):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.

:param log: the log to write to the remote_log_location
:type log: string
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool

"""
if self.hook:

if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log

try:
if self.use_gcloud:
self.hook.upload_from_string(
log,
blob=remote_log_location,
replace=True)
return
else:
bkt, blob = remote_log_location.lstrip('gs:/').split('/', 1)
from tempfile import NamedTemporaryFile
with NamedTemporaryFile(mode='w+') as tmpfile:
tmpfile.write(log)
self.hook.upload(bkt, blob, tmpfile.name)
return
except:
pass

# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))
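A brief usage sketch of the two new helpers; the locations and contents are hypothetical, and the calls follow the signatures defined above:

```python
from airflow import utils

location = 's3://my-bucket/airflow/logs/example_dag/example_task/2016-03-17T00:00:00'

# Write a finished task's log, appending to whatever is already stored remotely.
utils.S3Log().write('contents of a task log', location, append=True)

# Read it back; return_error=True yields a readable error message instead of ''
# when the hook is missing or the key cannot be fetched.
text = utils.S3Log().read(location, return_error=True)

# The GCS helper exposes the same read/write interface for gs:// locations.
gcs_text = utils.GCSLog().read(
    'gs://my-bucket/airflow/logs/example_dag/example_task/2016-03-17T00:00:00',
    return_error=True)
```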
35 changes: 18 additions & 17 deletions airflow/www/views.py
@@ -756,7 +756,7 @@ def log(self):
f.close()
log_loaded = True
except:
log = "*** Log file isn't where expected.\n".format(loc)
log = "*** Local log file not found.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
@@ -773,22 +773,23 @@ def log(self):
log += "*** Failed to fetch log file from worker.\n".format(
**locals())

# try to load log backup from S3
s3_log_folder = conf.get('core', 'S3_LOG_FOLDER')
if not log_loaded and s3_log_folder.startswith('s3:'):
import boto
s3 = boto.connect_s3()
s3_log_loc = os.path.join(
conf.get('core', 'S3_LOG_FOLDER'), log_relative)
log += '*** Fetching log from S3: {}\n'.format(s3_log_loc)
log += ('*** Note: S3 logs are only available once '
'tasks have completed.\n')
bucket, key = s3_log_loc.lstrip('s3:/').split('/', 1)
s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
if s3_key.exists():
log += '\n' + s3_key.get_contents_as_string().decode()
else:
log += '*** No log found on S3.\n'
if not log_loaded:
# load remote logs
remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_log = os.path.join(remote_log_base, log_relative)
log += '\n*** Reading remote logs from {}...\n'.format(remote_log)

# S3
if remote_log.startswith('s3:/'):
log += utils.S3Log().read(remote_log, return_error=True)

# GCS
elif remote_log.startswith('gs:/'):
log += utils.GCSLog().read(remote_log, return_error=True)

# unsupported
elif remote_log:
log += '*** Unsupported remote log location.'

session.commit()
session.close()
2 changes: 1 addition & 1 deletion setup.py
@@ -58,7 +58,7 @@ def run(self):
]
gcp_api = [
'httplib2',
'google-api-python-client',
'google-api-python-client<=1.4.2',
'oauth2client>=1.5.2, <2.0.0',
]
hdfs = ['snakebite>=2.4.13']