add log tailing to logfetch tool #383

Merged: 7 commits, Jan 9, 2015

Changes from 2 commits
36 changes: 29 additions & 7 deletions scripts/README.md
@@ -39,16 +39,38 @@ When the -g option is set, the log fetcher will grep the downloaded files for th

##Example Usage
- Specify a configuration file AND folder to use
- `logfetch -r 'My_Jobs_Id' -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)

`logfetch -r 'My_Jobs_Id' -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)

- Specify a configuration file in the default directory
- `logfetch -r 'My_Jobs_Id' -c somefile` (uses ~/.logfetch/somefile as config file)

`logfetch -r 'My_Jobs_Id' -c somefile` (uses ~/.logfetch/somefile as config file)

- Search logs for a request
- `logfetch -r 'My_Jobs_Id' -g 'Regex_here'`

`logfetch -r 'My_Jobs_Id' -g 'Regex_here'`

- Search logs for a specific deploy
- `logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`

`logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`

- Search logs for a specific task
- `logfetch -t 'My_Task_id' -g 'Regex_here'`

`logfetch -t 'My_Task_id' -g 'Regex_here'`

- Specify your own configuration file
- `logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`

`logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`

- Don't search, just download logs
- `logfetch -r 'My_Jobs_Id'`

`logfetch -r 'My_Jobs_Id'`

##Tailing Logs
You can tail live log files by providing the --tail option with the path to the log file. For example, to tail the service.log file for all tasks for a request named MyRequest, you would use the command:

`logfetch -r 'MyRequest' --tail 'service.log'`

- The path for the log file is relative to the base path for that task's sandbox, i.e. to tail a file at (sandbox path)/logs/access.log, the argument to --tail would be 'logs/access.log'

You can also provide the -g option, which passes the given grep string to the Singularity API and searches the results. You cannot provide a full grep command here as in some of the examples above, only a string to match on.
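
For example, a hypothetical invocation that tails service.log for all tasks of MyRequest while asking the API to filter for a pattern (the request name and pattern are placeholders):

`logfetch -r 'MyRequest' --tail 'service.log' -g 'ERROR'`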
43 changes: 3 additions & 40 deletions scripts/logfetch/live_logs.py
@@ -3,16 +3,12 @@
import grequests
from glob import glob
from termcolor import colored
from datetime import datetime

from callbacks import generate_callback
from singularity_request import get_json_response
import logfetch_base

DOWNLOAD_FILE_FORMAT = '{0}/sandbox/{1}/download'
BROWSE_FOLDER_FORMAT = '{0}/sandbox/{1}/browse'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

def download_live_logs(args):
  tasks = tasks_to_check(args)
@@ -61,46 +57,13 @@ def tasks_to_check(args):
  if args.taskId:
    return [args.taskId]
  else:
    return tasks_for_request(args)

def tasks_for_request(args):
  if args.requestId and args.deployId:
    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
  else:
    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)][0:args.task_count]
  return tasks

def all_tasks_for_request(args):
  uri = '{0}{1}'.format(logfetch_base.base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
  historical_tasks = get_json_response(uri)
  uri = '{0}{1}'.format(logfetch_base.base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
  active_tasks = get_json_response(uri)
  if len(historical_tasks) == 0:
    return active_tasks
  elif len(active_tasks) == 0:
    return historical_tasks
  else:
    return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]

def is_in_date_range(args, timestamp):
  timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
  if args.end_days:
    if timedelta.days > args.start_days or timedelta.days <= args.end_days:
      return False
    else:
      return True
  else:
    if timedelta.days > args.start_days:
      return False
    else:
      return True

return logfetch_base.tasks_for_request(args)

def logs_folder_files(args, task):
  uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
  files_json = get_json_response(uri, {'path' : '{0}/logs'.format(task)})
  if 'files' in files_json:
    files = files_json['files']
    return [f['name'] for f in files if is_in_date_range(args, f['mtime'])]
    return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
  else:
    return [f['path'].rsplit('/')[-1] for f in files_json if is_in_date_range(args, f['mtime'])]
    return [f['path'].rsplit('/')[-1] for f in files_json if logfetch_base.is_in_date_range(args, f['mtime'])]
15 changes: 10 additions & 5 deletions scripts/logfetch/log_fetcher.py
@@ -7,6 +7,7 @@
from fake_section_head import FakeSectionHead
from live_logs import download_live_logs
from s3_logs import download_s3_logs
from tail import tail_logs
from grep import grep_files

CONF_READ_ERR_FORMAT = 'Could not load config from {0} due to {1}'
@@ -24,10 +25,13 @@ def exit(reason):

def main(args):
  check_dest(args)
  all_logs = []
  all_logs += download_s3_logs(args)
  all_logs += download_live_logs(args)
  grep_files(args, all_logs)
  if args.tail:
    tail_logs(args)
  else:
    all_logs = []
    all_logs += download_s3_logs(args)
    all_logs += download_live_logs(args)
    grep_files(args, all_logs)

def check_dest(args):
  if not os.path.exists(args.dest):
@@ -70,7 +74,8 @@ def entrypoint():
  parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
  parser.add_argument("-s", "--start-days", help="Search for logs no older than this many days", type=int, metavar="start_days")
  parser.add_argument("-e", "--end-days", help="Search for logs no newer than this many days (defaults to None/today)", type=int, metavar="end_days")
  parser.add_argument("-g", "--grep", help="Regex to grep for (normal grep syntax) or a full grep command", metavar='grep')
  parser.add_argument("-g", "--grep", help="Regex to grep for (normal grep syntax) or a full grep command (cannot use a full command with --tail)", metavar='grep')
  parser.add_argument("--tail", help="Logfile name to tail, if this is set, no downloads will happen", metavar="tail")
Review comment (Contributor): can we mimic the tail command and use -t as well (make people think less)?

  args = parser.parse_args(remaining_argv)
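
On the -t suggestion above: in this tool -t already selects a task id (see the README examples), so a short alias for --tail would need a different letter. A hypothetical sketch of what that could look like; the -T flag and this standalone parser are assumptions for illustration, not part of this PR:

```python
import argparse

# Hypothetical sketch only: give --tail a short alias that does not collide
# with the existing -t (task id) option. "-T" is an assumption, not merged code.
parser = argparse.ArgumentParser(description='logfetch (sketch)')
parser.add_argument("-t", "--taskId", help="Task id to fetch logs for", metavar="taskId")
parser.add_argument("-T", "--tail", help="Logfile name to tail; if set, no downloads happen", metavar="tail")

args = parser.parse_args(["-T", "service.log"])
print(args.tail)  # -> service.log
```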
36 changes: 36 additions & 0 deletions scripts/logfetch/logfetch_base.py
@@ -1,9 +1,13 @@
import os
import sys
import gzip
from datetime import datetime
from termcolor import colored
from singularity_request import get_json_response

BASE_URI_FORMAT = '{0}{1}'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

def unpack_logs(logs):
  for zipped_file in logs:
@@ -24,3 +28,35 @@ def base_uri(args):
  uri = BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base)
  return uri

def tasks_for_request(args):
  if args.requestId and args.deployId:
    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
  else:
    tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)][0:args.task_count]
  return tasks

def all_tasks_for_request(args):
  uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
  historical_tasks = get_json_response(uri)
  uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
  active_tasks = get_json_response(uri)
  if len(historical_tasks) == 0:
    return active_tasks
  elif len(active_tasks) == 0:
    return historical_tasks
  else:
    return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]

def is_in_date_range(args, timestamp):
  timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
  if args.end_days:
    if timedelta.days > args.start_days or timedelta.days <= args.end_days:
      return False
    else:
      return True
  else:
    if timedelta.days > args.start_days:
      return False
    else:
      return True

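The updatedAt field used above appears to be a millisecond epoch timestamp, which is why all_tasks_for_request trims the last three digits before handing it to is_in_date_range. A minimal sketch of that conversion and the simple no-end-days case, using a made-up timestamp and a stand-in for the start-days argument (both are illustrative assumptions, not values from this PR):

```python
from datetime import datetime

start_days = 7                                # stands in for args.start_days (illustrative)
updated_at_ms = 1420761600000                 # made-up 'updatedAt' value in milliseconds
updated_at_s = int(str(updated_at_ms)[0:-3])  # same trick as above: drop the millisecond digits

# Without --end-days, a task is in range when it is no older than start_days.
age = datetime.utcnow() - datetime.utcfromtimestamp(updated_at_s)
print(age.days <= start_days)
```
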
75 changes: 75 additions & 0 deletions scripts/logfetch/tail.py
@@ -0,0 +1,75 @@
import os
import sys
import logfetch_base
import requests
import time
import threading
from singularity_request import get_json_response


TAIL_LOG_FORMAT = '{0}/sandbox/{1}/read'
READ_INTERVAL = 5

def tail_logs(args):
  if args.requestId:
    sys.stderr.write('Fetching tasks\n')
    tasks = [str(t) for t in logfetch_base.tasks_for_request(args)]
  else:
    tasks = [args.taskId]
  sys.stderr.write('Tailing logs for tasks:\n')
  for t in tasks:
    sys.stderr.write('{0}\n'.format(t))
  sys.stderr.write('ctrl+c to exit\n')
  try:
    threads = []
    for task in tasks:
      thread = LogStreamer(args, task)
      threads += [thread]
Review comment (Contributor): this is a huge nitpick i know, but just use .append() instead of concatenating the lists

      thread.start()
    while True: # main thread needs something to do so it doesn't kill the others
      time.sleep(1)
Review comment (Contributor): better practice is to call join() on all threads

  except KeyboardInterrupt:
    sys.stdout.write('Stopping tail')
Review comment (Contributor): use stderr for these informational messages

    sys.exit(0)

class LogStreamer(threading.Thread):
  def __init__(self, args, task):
    threading.Thread.__init__(self)
    self.daemon = True
    self.Args = args
    self.Task = task

  def run(self):
    self.stream_log_for_task(self.Args, self.Task)

  def stream_log_for_task(self, args, task):
    uri = TAIL_LOG_FORMAT.format(logfetch_base.base_uri(args), task)
    path = '{0}/{1}'.format(task, args.tail)
    keep_trying = True
    try:
      offset = self.get_initial_offset(uri, path)
    except ValueError:
      sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
      keep_trying = False
    while keep_trying:
      try:
        offset = self.fetch_new_log_data(uri, path, offset, args.grep)
        time.sleep(5)
      except ValueError:
        sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
        keep_trying = False

  def get_initial_offset(self, uri, path):
    params = {"path" : path}
    return requests.get(uri, params=params).json()['offset']

  def fetch_new_log_data(self, uri, path, offset, grep):
    params = {
      "path" : path,
      "offset" : offset
    }
    if grep:
      params['grep'] = grep
    response = requests.get(uri, params=params).json()
    sys.stdout.write(response['data'])
    return offset + len(response['data'].encode('utf-8'))
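
Taking the three review comments above together (.append() instead of list concatenation, join() instead of a sleep loop, and stderr for informational output), the tail loop might be restructured roughly as follows. This is a sketch of the reviewers' suggestions, not the merged code; it reuses LogStreamer and logfetch_base from the modules above:

```python
import sys

# Sketch only: the tail loop with the review suggestions applied.
def tail_logs(args):
  if args.requestId:
    sys.stderr.write('Fetching tasks\n')
    tasks = [str(t) for t in logfetch_base.tasks_for_request(args)]  # logfetch_base as above
  else:
    tasks = [args.taskId]
  threads = []
  try:
    for task in tasks:
      thread = LogStreamer(args, task)  # LogStreamer as defined above
      threads.append(thread)            # review: use .append() rather than concatenating lists
      thread.start()
    for thread in threads:
      while thread.is_alive():
        thread.join(1)                  # review: join() the workers; short timeout keeps ctrl+c responsive
  except KeyboardInterrupt:
    sys.stderr.write('Stopping tail\n') # review: informational messages go to stderr
    sys.exit(0)
```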
2 changes: 1 addition & 1 deletion scripts/setup.py
@@ -10,7 +10,7 @@

setup(
  name='singularity-logfetch',
  version='0.0.5',
  version='0.0.6',
  description='Singularity log fetching and searching',
  author="HubSpot",
  author_email='singularity-users@googlegroups.com',