refactor remote log read/write and add GCS support
squashed:
update configuration

more descriptive comment

split remote log uploads into helper functions for S3 and GCS

read logs from s3

read logs from GCS

keep old_log as string

change name to log_base

better logging

overwrite in GCS

use current configuration var

objects could be none; don't check if they exist with method

allow s3 encryption from hook

fix capitalization typo

replace string search with indexing

add param docs

refactor remote log read/write into utility classes
jlowin committed Mar 17, 2016
1 parent 3629c90 commit 274d4e4
Showing 5 changed files with 244 additions and 58 deletions.
67 changes: 33 additions & 34 deletions airflow/bin/cli.py
@@ -4,6 +4,7 @@
import os
import subprocess
import textwrap
import warnings
from datetime import datetime

from builtins import input
@@ -137,21 +138,14 @@ def run(args):
utils.pessimistic_connection_handling()

# Setting up logging
log = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
directory = log + "/{args.dag_id}/{args.task_id}".format(args=args)
log_base = os.path.expanduser(configuration.get('core', 'BASE_LOG_FOLDER'))
directory = log_base + "/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())

# store old log (to help with S3 appends)
if os.path.exists(filename):
with open(filename, 'r') as logfile:
old_log = logfile.read()
else:
old_log = None

subdir = process_subdir(args.subdir)
logging.root.handlers = []
logging.basicConfig(
@@ -233,34 +227,39 @@ def run(args):
executor.heartbeat()
executor.end()

if configuration.get('core', 'S3_LOG_FOLDER').startswith('s3:'):
import boto
s3_log = filename.replace(log, configuration.get('core', 'S3_LOG_FOLDER'))
bucket, key = s3_log.lstrip('s3:/').split('/', 1)
if os.path.exists(filename):
# store logs remotely
remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')

# get logs
with open(filename, 'r') as logfile:
new_log = logfile.read()
# deprecated as of March 2016
if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'):
warnings.warn(
'The S3_LOG_FOLDER configuration key has been replaced by '
'REMOTE_BASE_LOG_FOLDER. Your configuration still works but please '
'update airflow.cfg to ensure future compatibility.',
DeprecationWarning)
remote_base = configuration.get('core', 'S3_LOG_FOLDER')

# remove old logs (since they are already in S3)
if old_log:
new_log.replace(old_log, '')
if os.path.exists(filename):
# read log and remove old logs to get just the latest additions

try:
s3 = boto.connect_s3()
s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)

# append new logs to old S3 logs, if available
if s3_key.exists():
old_s3_log = s3_key.get_contents_as_string().decode()
new_log = old_s3_log + '\n' + new_log

# send log to S3
encrypt = configuration.get('core', 'ENCRYPT_S3_LOGS')
s3_key.set_contents_from_string(new_log, encrypt_key=encrypt)
except:
print('Could not send logs to S3.')
with open(filename, 'r') as logfile:
log = logfile.read()

remote_log_location = filename.replace(log_base, remote_base)
# S3

if remote_base.startswith('s3:/'):
utils.S3Log().write(log, remote_log_location)
# GCS
elif remote_base.startswith('gs:/'):
utils.GCSLog().write(
log,
remote_log_location,
append=True)
# Other
elif remote_base:
logging.error(
'Unsupported remote log location: {}'.format(remote_base))


def task_state(args):
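
For orientation, a minimal sketch of how the new upload path in cli.run() maps a local log file to a remote location and picks a backend; the folder, bucket, and task names below are hypothetical, not taken from the commit.

import os

# hypothetical values, for illustration only
log_base = os.path.expanduser('~/airflow/logs')
remote_base = 's3://my-airflow-bucket/logs'   # or 'gs://my-airflow-bucket/logs'
filename = os.path.join(log_base, 'my_dag/my_task/2016-03-17T00:00:00')

# the commit swaps the local prefix for the remote one...
remote_log_location = filename.replace(log_base, remote_base)

# ...and then dispatches on the URL scheme
if remote_base.startswith('s3:/'):
    backend = 'S3Log'    # utils.S3Log().write(log, remote_log_location)
elif remote_base.startswith('gs:/'):
    backend = 'GCSLog'   # utils.GCSLog().write(log, remote_log_location, append=True)
else:
    backend = None       # unsupported location; cli.py logs an error
print(remote_log_location, backend)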
20 changes: 15 additions & 5 deletions airflow/configuration.py
@@ -4,8 +4,8 @@
from __future__ import unicode_literals

from future import standard_library

standard_library.install_aliases()

from builtins import str
from configparser import ConfigParser
import errno
@@ -70,8 +70,10 @@ class AirflowConfigException(Exception):
'plugins_folder': None,
'security': None,
'donot_pickle': False,
's3_log_folder': '',
'remote_base_log_folder': '',
'remote_log_conn_id': '',
'encrypt_s3_logs': False,
's3_log_folder': '', # deprecated!
'dag_concurrency': 16,
'max_active_runs_per_dag': 16,
'executor': 'SequentialExecutor',
@@ -133,9 +135,17 @@ class AirflowConfigException(Exception):
# The folder where airflow should store its log files. This location
base_log_folder = {AIRFLOW_HOME}/logs
# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = None
remote_log_conn_id = None
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
# deprecated option for remote log storage, use remote_base_log_folder instead!
# s3_log_folder = None
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
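
A short sketch of how callers are expected to read the new configuration keys, including the fallback to the deprecated s3_log_folder. This only restates the logic cli.py applies above, and the getboolean call for the encryption flag is a suggestion rather than something this commit uses.

from airflow import configuration

remote_base = configuration.get('core', 'REMOTE_BASE_LOG_FOLDER')
conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')

# the deprecated key is still honored for existing installs
if not remote_base and configuration.get('core', 'S3_LOG_FOLDER'):
    remote_base = configuration.get('core', 'S3_LOG_FOLDER')

# boolean options are safer to read with getboolean than get
encrypt = configuration.getboolean('core', 'ENCRYPT_S3_LOGS')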
6 changes: 4 additions & 2 deletions airflow/hooks/S3_hook.py
@@ -338,7 +338,8 @@ def load_file(

def load_string(self, string_data,
key, bucket_name=None,
replace=False):
replace=False,
encrypt=False):
"""
Loads a string to S3
@@ -366,6 +367,7 @@ def load_string(self, string_data,
if not key_obj:
key_obj = bucket.new_key(key_name=key)
key_size = key_obj.set_contents_from_string(string_data,
replace=replace)
replace=replace,
encrypt_key=encrypt)
logging.info("The key {key} now contains"
" {key_size} bytes".format(**locals()))
174 changes: 174 additions & 0 deletions airflow/utils.py
@@ -784,3 +784,177 @@ def logger(self):
self._logger = logging.root.getChild(self.__class__.__module__ + '.' +self.__class__.__name__)
return self._logger


class S3Log(object):
"""
Utility class for reading and writing logs in S3.
Requires airflow[s3] and setting the REMOTE_BASE_LOG_FOLDER and
REMOTE_LOG_CONN_ID configuration options in airflow.cfg.
"""
def __init__(self):
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
try:
from airflow.hooks import S3Hook
self.hook = S3Hook(remote_conn_id)
except:
self.hook = None
logging.error(
'Could not create an S3Hook with connection id "{}". '
'Please make sure that airflow[s3] is installed and '
'the S3 connection exists.'.format(remote_conn_id))

def read(self, remote_log_location, return_error=False):
"""
Returns the log found at the remote_log_location. Returns '' if no
logs are found or there is an error.
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param return_error: if True, returns a string error message if an
error occurs. Otherwise returns '' when an error occurs.
:type return_error: bool
"""
if self.hook:
try:
s3_key = self.hook.get_key(remote_log_location)
if s3_key:
return s3_key.get_contents_as_string().decode()
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''


def write(self, log, remote_log_location, append=False):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
:param log: the log to write to the remote_log_location
:type log: string
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool
"""
if self.hook:

if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log
try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=configuration.getboolean('core', 'ENCRYPT_S3_LOGS'))
return
except:
pass

# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))


class GCSLog(object):
"""
Utility class for reading and writing logs in GCS.
Requires either airflow[gcloud] or airflow[gcp_api] and
setting the REMOTE_BASE_LOG_FOLDER and REMOTE_LOG_CONN_ID configuration
options in airflow.cfg.
"""
def __init__(self):
"""
Attempt to create hook with airflow[gcloud] (and set
use_gcloud = True), otherwise uses airflow[gcp_api]
"""
remote_conn_id = configuration.get('core', 'REMOTE_LOG_CONN_ID')
self.use_gcloud = False

try:
from airflow.contrib.hooks import GCSHook
self.hook = GCSHook(remote_conn_id)
self.use_gcloud = True
except:
try:
from airflow.contrib.hooks import GoogleCloudStorageHook
self.hook = GoogleCloudStorageHook(remote_conn_id)
except:
self.hook = None
logging.error(
'Could not create a GCSHook with connection id "{}". '
'Please make sure that either airflow[gcloud] or '
'airflow[gcp_api] is installed and the GCS connection '
'exists.'.format(remote_conn_id))

def read(self, remote_log_location, return_error=True):
"""
Returns the log found at the remote_log_location.
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param return_error: if True, returns a string error message if an
error occurs. Otherwise returns '' when an error occurs.
:type return_error: bool
"""
if self.hook:
try:
if self.use_gcloud:
gcs_blob = self.hook.get_blob(remote_log_location)
if gcs_blob:
return gcs_blob.download_as_string().decode()
else:
bkt, blob = remote_log_location.lstrip('gs:/').split('/', 1)
return self.hook.download(bkt, blob).decode()
except:
pass

# raise/return error if we get here
err = 'Could not read logs from {}'.format(remote_log_location)
logging.error(err)
return err if return_error else ''

def write(self, log, remote_log_location, append=False):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
:param log: the log to write to the remote_log_location
:type log: string
:param remote_log_location: the log's location in remote storage
:type remote_log_location: string (path)
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:type append: bool
"""
if self.hook:

if append:
old_log = self.read(remote_log_location)
log = old_log + '\n' + log

try:
if self.use_gcloud:
self.hook.upload_from_string(
log,
blob=remote_log_location,
replace=True)
return
else:
bkt, blob = remote_log_location.lstrip('gs:/').split('/', 1)
from tempfile import NamedTemporaryFile
with NamedTemporaryFile(mode='w+') as tmpfile:
tmpfile.write(log)
# make sure the contents hit disk before the hook reads the file
tmpfile.flush()
self.hook.upload(bkt, blob, tmpfile.name)
return
except:
pass

# raise/return error if we get here
logging.error('Could not write logs to {}'.format(remote_log_location))
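
A sketch of how the two utility classes are meant to be driven by cli.py and the web UI; the remote locations are hypothetical, and both classes fail silently (or return an error string) rather than raising.

from airflow import utils

# hypothetical remote locations, for illustration only
s3_location = 's3://my-airflow-bucket/logs/my_dag/my_task/2016-03-17T00:00:00'
gcs_location = 'gs://my-airflow-bucket/logs/my_dag/my_task/2016-03-17T00:00:00'

# append=True reads whatever is already stored, then rewrites the whole object
utils.S3Log().write('latest task log text', s3_location, append=True)
utils.GCSLog().write('latest task log text', gcs_location, append=True)

# return_error=False (the S3Log default) yields '' on failure;
# return_error=True yields a readable error message instead
print(utils.S3Log().read(s3_location, return_error=True))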
35 changes: 18 additions & 17 deletions airflow/www/views.py
@@ -756,7 +756,7 @@ def log(self):
f.close()
log_loaded = True
except:
log = "*** Log file isn't where expected.\n".format(loc)
log = "*** Local log file not found.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
@@ -773,22 +773,23 @@ def log(self):
log += "*** Failed to fetch log file from worker.\n".format(
**locals())

# try to load log backup from S3
s3_log_folder = conf.get('core', 'S3_LOG_FOLDER')
if not log_loaded and s3_log_folder.startswith('s3:'):
import boto
s3 = boto.connect_s3()
s3_log_loc = os.path.join(
conf.get('core', 'S3_LOG_FOLDER'), log_relative)
log += '*** Fetching log from S3: {}\n'.format(s3_log_loc)
log += ('*** Note: S3 logs are only available once '
'tasks have completed.\n')
bucket, key = s3_log_loc.lstrip('s3:/').split('/', 1)
s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
if s3_key.exists():
log += '\n' + s3_key.get_contents_as_string().decode()
else:
log += '*** No log found on S3.\n'
if not log_loaded:
# load remote logs
remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
remote_log = os.path.join(remote_log_base, log_relative)
log += '\n*** Reading remote logs...\n'.format(remote_log)

# S3
if remote_log.startswith('s3:/'):
log += utils.S3Log().read(remote_log, return_error=True)

# GCS
elif remote_log.startswith('gs:/'):
log += utils.GCSLog().read(remote_log, return_error=True)

# unsupported
elif remote_log:
log += '*** Unsupported remote log location.'

session.commit()
session.close()
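
In the web UI the remote path is rebuilt from REMOTE_BASE_LOG_FOLDER plus the task's relative log path, and return_error=True is used so any failure shows up in the rendered log instead of being swallowed. A small sketch with made-up values:

import os

# hypothetical values, for illustration only
remote_log_base = 's3://my-airflow-bucket/logs'
log_relative = 'my_dag/my_task/2016-03-17T00:00:00'

remote_log = os.path.join(remote_log_base, log_relative)
# in the view this becomes: log += utils.S3Log().read(remote_log, return_error=True)
print(remote_log)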