diff --git a/.gitignore b/.gitignore
index 1b45e9ef48..25f8bd0c9f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,3 +31,4 @@ scripts/*.gz
 scripts/singularity_logfetch.egg-info
 scripts/build
 scripts/dist
+scripts/.venv
diff --git a/scripts/README.md b/scripts/README.md
index c83ede42d2..aa78184a6c 100644
--- a/scripts/README.md
+++ b/scripts/README.md
@@ -39,16 +39,40 @@ When the -g option is set, the log fetcher will grep the downloaded files for th
 ##Example Usage
 - Specify a configuration file AND folder to use
-  - `logfetch -r ‘My_Jobs_Id’ -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)
+
+`logfetch -r ‘My_Jobs_Id’ -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)
+
 - Specify a configuration file in the default directory
-  - `logfetch -r ‘My_Jobs_Id’ -c somefile` (uses ~/.logfetch/somefile as config file)
+
+`logfetch -r ‘My_Jobs_Id’ -c somefile` (uses ~/.logfetch/somefile as config file)
+
 - Search logs for a request
-  - `logfetch -r 'My_Jobs_Id' -g 'Regex_here'`
+
+`logfetch -r 'My_Jobs_Id' -g 'Regex_here'`
+
 - Search logs for a specific deploy
-  - `logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`
+
+`logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`
+
 - Search logs for a specific task
-  - `logfetch -t 'My_Task_id' -g 'Regex_here'`
+
+`logfetch -t 'My_Task_id' -g 'Regex_here'`
+
 - Specify your own configuration file
-  - `logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`
+
+`logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`
+
 - Don't search, just download logs
-  - `logfetch -r 'My_Jobs_Id'`
+
+`logfetch -r 'My_Jobs_Id'`
+
+##Tailing Logs
+You can tail live log files by using logtail. Just provide the request, task, or request and deploy along with a log file path.
+
+For example, to tail the service.log file for all tasks for a request named MyRequest, you would use the command:
+
+`logtail -r ‘MyRequest’ -l ‘service.log’`
+
+- The path for the log file is relative to the base path for that task’s sandbox, e.g. to tail a file at (sandbox path)/logs/access.log, the argument to -l would be ‘logs/access.log’
+
+You can also provide the -g option, which will pass the grep string to the Singularity API and search the results.
+You cannot provide a full grep command as in some of the above examples, just a string to match on.
diff --git a/scripts/logfetch/log_fetcher.py b/scripts/logfetch/entrypoint.py
similarity index 61%
rename from scripts/logfetch/log_fetcher.py
rename to scripts/logfetch/entrypoint.py
index 899a8bc4b9..7d967983a2 100644
--- a/scripts/logfetch/log_fetcher.py
+++ b/scripts/logfetch/entrypoint.py
@@ -7,6 +7,7 @@
 from fake_section_head import FakeSectionHead
 from live_logs import download_live_logs
 from s3_logs import download_s3_logs
+from tail import start_tail
 from grep import grep_files
 
 CONF_READ_ERR_FORMAT = 'Could not load config from {0} due to {1}'
@@ -22,7 +23,10 @@ def exit(reason):
   sys.stderr.write(colored(reason, 'red') + '\n')
   sys.exit(1)
 
-def main(args):
+def tail_logs(args):
+  start_tail(args)
+
+def fetch_logs(args):
   check_dest(args)
   all_logs = []
   all_logs += download_s3_logs(args)
@@ -33,7 +37,7 @@ def check_dest(args):
   if not os.path.exists(args.dest):
     os.makedirs(args.dest)
 
-def entrypoint():
+def fetch():
   conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
   conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
   conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
@@ -81,4 +85,45 @@
 
   args.dest = os.path.expanduser(args.dest)
 
-  main(args)
+  fetch_logs(args)
+
+def tail():
+  conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
+  conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
+  conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
+  args, remaining_argv = conf_parser.parse_known_args()
+  conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
+  conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
+  config = ConfigParser.SafeConfigParser()
+
+  defaults = {}
+
+  try:
+    config.readfp(FakeSectionHead(open(os.path.expanduser(conf_file))))
+    defaults.update(dict(config.items("Defaults")))
+  except Exception, err:
+    sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')
+
+  parser = argparse.ArgumentParser(parents=[conf_parser], description="Tail log files from Singularity. One can specify either a TaskId, RequestId and DeployId, or RequestId",
+    prog="log_fetcher")
+
+  parser.set_defaults(**defaults)
+  parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId")
+  parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId")
+  parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId")
+  parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
+  parser.add_argument("-g", "--grep", help="String to grep for", metavar='grep')
+  parser.add_argument("-l", "--logfile", help="Logfile path/name to tail (ie 'logs/access.log')", metavar="logfile")
+
+  args = parser.parse_args(remaining_argv)
+
+  if args.deployId and not args.requestId:
+    exit("Must specify requestId (-r) when specifying deployId")
+  elif not args.requestId and not args.deployId and not args.taskId:
+    exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId')
+  elif not args.logfile:
+    exit("Must specify logfile to tail (-l)")
+
+  args.dest = os.path.expanduser(args.dest)
+
+  tail_logs(args)
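Both the `fetch()` and `tail()` entry points build their option defaults the same way: the config file has no section headers, so `FakeSectionHead` wraps it to satisfy `ConfigParser`, and the parsed key/value pairs are handed to `parser.set_defaults(**defaults)`. The `FakeSectionHead` implementation is not shown in this diff, so the sketch below uses a minimal stand-in, and the config keys and values are hypothetical; it follows the scripts' Python 2 style.

```python
# Minimal sketch of the config-defaults pattern used by fetch() and tail().
# FakeSectionHead here is an illustrative stand-in, not the repo's implementation.
import argparse
import ConfigParser
from StringIO import StringIO

class FakeSectionHead(object):
  def __init__(self, fp):
    self.fp = fp
    self.section_read = False

  def readline(self):
    # ConfigParser requires a section header; emit a fake one before the real file.
    if not self.section_read:
      self.section_read = True
      return '[Defaults]\n'
    return self.fp.readline()

# Hypothetical contents of a ~/.logfetch config: plain key=value lines,
# where each key matches an argparse dest (e.g. singularity_uri_base, dest).
example_conf = StringIO("singularity_uri_base=localhost:8080/singularity/v1\ndest=~/logfetch_logs\n")

config = ConfigParser.SafeConfigParser()
config.readfp(FakeSectionHead(example_conf))
defaults = dict(config.items("Defaults"))

parser = argparse.ArgumentParser()
parser.set_defaults(**defaults)
parser.add_argument("-u", "--singularity-uri-base", metavar="URI")
args = parser.parse_args([])
print(args.singularity_uri_base)  # -> localhost:8080/singularity/v1
```

Because the config values arrive through `set_defaults`, anything given on the command line still overrides them.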
diff --git a/scripts/logfetch/live_logs.py b/scripts/logfetch/live_logs.py
index 2fc5ab5215..51d5188a8d 100644
--- a/scripts/logfetch/live_logs.py
+++ b/scripts/logfetch/live_logs.py
@@ -3,16 +3,12 @@
 import grequests
 from glob import glob
 from termcolor import colored
-from datetime import datetime
-
 from callbacks import generate_callback
 from singularity_request import get_json_response
 import logfetch_base
 
 DOWNLOAD_FILE_FORMAT = '{0}/sandbox/{1}/download'
 BROWSE_FOLDER_FORMAT = '{0}/sandbox/{1}/browse'
-REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
-ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'
 
 def download_live_logs(args):
   tasks = tasks_to_check(args)
@@ -61,46 +57,13 @@ def tasks_to_check(args):
   if args.taskId:
     return [args.taskId]
   else:
-    return tasks_for_request(args)
-
-def tasks_for_request(args):
-  if args.requestId and args.deployId:
-    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
-  else:
-    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)][0:args.task_count]
-  return tasks
-
-def all_tasks_for_request(args):
-  uri = '{0}{1}'.format(logfetch_base.base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
-  historical_tasks = get_json_response(uri)
-  uri = '{0}{1}'.format(logfetch_base.base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
-  active_tasks = get_json_response(uri)
-  if len(historical_tasks) == 0:
-    return active_tasks
-  elif len(active_tasks) == 0:
-    return historical_tasks
-  else:
-    return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]
-
-def is_in_date_range(args, timestamp):
-  timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
-  if args.end_days:
-    if timedelta.days > args.start_days or timedelta.days <= args.end_days:
-      return False
-    else:
-      return True
-  else:
-    if timedelta.days > args.start_days:
-      return False
-    else:
-      return True
-
+    return logfetch_base.tasks_for_request(args)
 
 def logs_folder_files(args, task):
   uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
   files_json = get_json_response(uri, {'path' : '{0}/logs'.format(task)})
   if 'files' in files_json:
     files = files_json['files']
-    return [f['name'] for f in files if is_in_date_range(args, f['mtime'])]
+    return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
   else:
-    return [f['path'].rsplit('/')[-1] for f in files_json if is_in_date_range(args, f['mtime'])]
+    return [f['path'].rsplit('/')[-1] for f in files_json if logfetch_base.is_in_date_range(args, f['mtime'])]
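`logs_folder_files` still has to handle two shapes of sandbox browse response — a dict with a `files` list whose entries carry `name` and `mtime`, or a bare list of entries carrying `path` and `mtime` — and now delegates the date check to `logfetch_base.is_in_date_range`. A small sketch of that selection logic, using made-up payloads and with the HTTP call and date check stubbed out:

```python
# Illustrative only: fake browse responses exercising both branches of
# logs_folder_files. Field names mirror those used in live_logs.py.
import time

def names_from_browse_response(files_json, in_range):
  # Same selection logic as logs_folder_files, with the request and the
  # date-range check factored out for the example.
  if 'files' in files_json:
    return [f['name'] for f in files_json['files'] if in_range(f['mtime'])]
  else:
    return [f['path'].rsplit('/')[-1] for f in files_json if in_range(f['mtime'])]

recent = int(time.time()) - 3600       # hypothetical mtime: one hour ago
accept_all = lambda mtime: True        # stand-in for logfetch_base.is_in_date_range

dict_style = {'files': [{'name': 'service.log', 'mtime': recent}]}
list_style = [{'path': 'mytask/logs/access.log', 'mtime': recent}]

print(names_from_browse_response(dict_style, accept_all))  # ['service.log']
print(names_from_browse_response(list_style, accept_all))  # ['access.log']
```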
diff --git a/scripts/logfetch/logfetch_base.py b/scripts/logfetch/logfetch_base.py
index 6b7c2726f2..4aef537689 100644
--- a/scripts/logfetch/logfetch_base.py
+++ b/scripts/logfetch/logfetch_base.py
@@ -1,9 +1,13 @@
 import os
 import sys
 import gzip
+from datetime import datetime
 from termcolor import colored
+from singularity_request import get_json_response
 
 BASE_URI_FORMAT = '{0}{1}'
+REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
+ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'
 
 def unpack_logs(logs):
   for zipped_file in logs:
@@ -24,3 +28,40 @@ def base_uri(args):
   uri = BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base)
   return uri
 
+def tasks_for_request(args):
+  if args.requestId and args.deployId:
+    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
+  else:
+    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)]
+    if hasattr(args, 'task_count'):
+      tasks = tasks[0:args.task_count]
+  return tasks
+
+def all_tasks_for_request(args):
+  uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
+  active_tasks = get_json_response(uri)
+  if hasattr(args, 'start_days'):
+    uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
+    historical_tasks = get_json_response(uri)
+    if len(historical_tasks) == 0:
+      return active_tasks
+    elif len(active_tasks) == 0:
+      return historical_tasks
+    else:
+      return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]
+  else:
+    return active_tasks
+
+def is_in_date_range(args, timestamp):
+  timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
+  if args.end_days:
+    if timedelta.days > args.start_days or timedelta.days <= args.end_days:
+      return False
+    else:
+      return True
+  else:
+    if timedelta.days > args.start_days:
+      return False
+    else:
+      return True
+
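The date filtering that moved into `logfetch_base.py` keeps a task or file only if its age lands in the `(end_days, start_days]` window: anything older than `start_days` days is skipped, and, when `end_days` is set, anything newer than `end_days` days is skipped too. Singularity reports `updatedAt` in epoch milliseconds, which is why the caller strips the last three digits before passing a timestamp in. A short worked example, with a hypothetical `Args` stand-in for the parsed options:

```python
# Worked example of the is_in_date_range window, using a stand-in args object.
import time
from datetime import datetime

class Args(object):
  start_days = 7  # oldest logs to keep: updated at most 7 days ago
  end_days = 2    # newest logs to keep: updated more than 2 days ago

def is_in_date_range(args, timestamp):
  # Same logic as the function added to logfetch_base.py, condensed.
  age = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
  if args.end_days:
    return not (age.days > args.start_days or age.days <= args.end_days)
  return not (age.days > args.start_days)

now = int(time.time())
print(is_in_date_range(Args(), now - 4 * 24 * 3600))  # True: 4 days old, inside the window
print(is_in_date_range(Args(), now - 1 * 24 * 3600))  # False: newer than end_days
print(is_in_date_range(Args(), now - 9 * 24 * 3600))  # False: older than start_days

# updatedAt comes back in milliseconds; dropping the last three digits
# converts it to seconds, matching int(str(updatedAt)[0:-3]) in the diff.
updated_at_ms = 1400000000000
print(int(str(updated_at_ms)[0:-3]))  # 1400000000
```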
diff --git a/scripts/logfetch/tail.py b/scripts/logfetch/tail.py
new file mode 100644
index 0000000000..c2a60a3d38
--- /dev/null
+++ b/scripts/logfetch/tail.py
@@ -0,0 +1,76 @@
+import sys
+import logfetch_base
+import requests
+import time
+import threading
+
+
+TAIL_LOG_FORMAT = '{0}/sandbox/{1}/read'
+READ_INTERVAL = 5
+THREAD_TIMEOUT = 100000
+
+def start_tail(args):
+  if args.requestId:
+    sys.stderr.write('Fetching tasks\n')
+    tasks = [str(t) for t in logfetch_base.tasks_for_request(args)]
+  else:
+    tasks = [args.taskId]
+  sys.stderr.write('Tailing logs for tasks:\n')
+  for t in tasks:
+    sys.stderr.write('{0}\n'.format(t))
+  sys.stderr.write('ctrl+c to exit\n')
+  try:
+    threads = []
+    for task in tasks:
+      thread = LogStreamer(args, task)
+      threads.append(thread)
+      thread.start()
+    for t in threads:
+      t.join(THREAD_TIMEOUT) # Need a timeout otherwise this can't be killed by ctrl+c
+      if not t.isAlive():
+        break
+  except KeyboardInterrupt:
+    sys.stderr.write('Stopping tail\n')
+    sys.exit(0)
+
+class LogStreamer(threading.Thread):
+  def __init__(self, args, task):
+    threading.Thread.__init__(self)
+    self.daemon = True
+    self.Args = args
+    self.Task = task
+
+  def run(self):
+    self.stream_log_for_task(self.Args, self.Task)
+
+  def stream_log_for_task(self, args, task):
+    uri = TAIL_LOG_FORMAT.format(logfetch_base.base_uri(args), task)
+    path = '{0}/{1}'.format(task, args.logfile)
+    keep_trying = True
+    try:
+      offset = self.get_initial_offset(uri, path)
+    except ValueError:
+      sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
+      keep_trying = False
+    while keep_trying:
+      try:
+        offset = self.fetch_new_log_data(uri, path, offset, args.grep)
+        time.sleep(READ_INTERVAL)
+      except ValueError:
+        sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
+        keep_trying = False
+
+  def get_initial_offset(self, uri, path):
+    params = {"path" : path}
+    return requests.get(uri, params=params).json()['offset']
+
+  def fetch_new_log_data(self, uri, path, offset, grep):
+    params = {
+      "path" : path,
+      "offset" : offset
+    }
+    if grep:
+      params['grep'] = grep
+    response = requests.get(uri, params=params).json()
+    sys.stdout.write(response['data'])
+    return offset + len(response['data'].encode('utf-8'))
diff --git a/scripts/setup.py b/scripts/setup.py
index 75b7a63068..f206d9f0c1 100644
--- a/scripts/setup.py
+++ b/scripts/setup.py
@@ -20,6 +20,9 @@
   install_requires=requirements,
   zip_safe=False,
   entry_points={
-    'console_scripts':['logfetch=logfetch.log_fetcher:entrypoint'],
+    'console_scripts':[
+      'logfetch=logfetch.entrypoint:fetch',
+      'logtail=logfetch.entrypoint:tail'
+    ],
   }
 )
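With the new `entry_points`, installing the package (for example with `pip install .` from the scripts directory) generates two console scripts instead of one. Each generated wrapper simply imports and calls the named function, roughly equivalent to the hypothetical stubs below; setuptools produces the real scripts.

```python
# Rough equivalent of what the two console_scripts entries resolve to.
# (Illustrative stubs only; setuptools generates the actual wrappers.)
from logfetch.entrypoint import fetch, tail

def run_logfetch():
  fetch()  # download (and optionally grep) S3 and live logs

def run_logtail():
  tail()   # stream a live log file through the Singularity sandbox read API

if __name__ == '__main__':
  run_logtail()
```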