Merge pull request #383 from HubSpot/log_tail
add log tailing to logfetch tool
tpetr committed Jan 9, 2015
2 parents f647456 + 8cff7e9 commit f8a2382
Showing 7 changed files with 204 additions and 51 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -31,3 +31,4 @@ scripts/*.gz
scripts/singularity_logfetch.egg-info
scripts/build
scripts/dist
scripts/.venv
38 changes: 31 additions & 7 deletions scripts/README.md
@@ -39,16 +39,40 @@ When the -g option is set, the log fetcher will grep the downloaded files for th

##Example Usage
- Specify a configuration file AND folder to use
- `logfetch -r 'My_Jobs_Id' -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)

`logfetch -r 'My_Jobs_Id' -c somefile -f ~/.somefolder` (uses ~/.somefolder/somefile as config file)

- Specify a configuration file in the default directory
- `logfetch -r 'My_Jobs_Id' -c somefile` (uses ~/.logfetch/somefile as config file)

`logfetch -r 'My_Jobs_Id' -c somefile` (uses ~/.logfetch/somefile as config file)

- Search logs for a request
- `logfetch -r 'My_Jobs_Id' -g 'Regex_here'`

`logfetch -r 'My_Jobs_Id' -g 'Regex_here'`

- Search logs for a specific deploy
- `logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`

`logfetch -r 'My_Jobs_Id' -d '1_2_3' -g 'Regex_here'`

- Search logs for a specific task
- `logfetch -t 'My_Task_id' -g 'Regex_here'`

`logfetch -t 'My_Task_id' -g 'Regex_here'`

- Specify your own configuration file
- `logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`

`logfetch -c /etc/my_conf_file -t 'My_Task_id' -g 'Regex_here'`

- Don't search, just download logs
- `logfetch -r 'My_Jobs_Id'`

`logfetch -r 'My_Jobs_Id'`

##Tailing Logs
You can tail live log files by using logtail. Just provide the request, task, or request and deploy, along with a log file path.

For example, to tail the service.log file for all tasks for a request named MyRequest, you would use the command:

`logtail -r 'MyRequest' -l 'service.log'`

- The path for the log file is relative to the base path of that task's sandbox, i.e. to tail a file at (sandbox path)/logs/access.log, the argument to -l would be 'logs/access.log'

You can also provide the -g option, which passes the grep string to the Singularity API and searches the results. Unlike some of the examples above, you cannot provide a full grep command here, only a string to match on.
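For example, to tail logs/access.log for all tasks of MyRequest and only print lines that the server-side grep matches (the request name, log path, and match string here are placeholders):

`logtail -r 'MyRequest' -l 'logs/access.log' -g 'ERROR'`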
51 changes: 48 additions & 3 deletions scripts/logfetch/log_fetcher.py → scripts/logfetch/entrypoint.py
@@ -7,6 +7,7 @@
from fake_section_head import FakeSectionHead
from live_logs import download_live_logs
from s3_logs import download_s3_logs
from tail import start_tail
from grep import grep_files

CONF_READ_ERR_FORMAT = 'Could not load config from {0} due to {1}'
@@ -22,7 +23,10 @@ def exit(reason):
sys.stderr.write(colored(reason, 'red') + '\n')
sys.exit(1)

def main(args):
def tail_logs(args):
start_tail(args)

def fetch_logs(args):
check_dest(args)
all_logs = []
all_logs += download_s3_logs(args)
@@ -33,7 +37,7 @@ def check_dest(args):
if not os.path.exists(args.dest):
os.makedirs(args.dest)

def entrypoint():
def fetch():
conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
@@ -81,4 +85,45 @@ def entrypoint():

args.dest = os.path.expanduser(args.dest)

main(args)
fetch_logs(args)

def tail():
conf_parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, add_help=False)
conf_parser.add_argument("-f", "--conf_folder", help="specify a folder for config files to live")
conf_parser.add_argument("-c", "--conf_file", help="Specify config file within the conf folder", metavar="FILE")
args, remaining_argv = conf_parser.parse_known_args()
conf_dir = args.conf_folder if args.conf_folder else DEFAULT_CONF_DIR
conf_file = os.path.expanduser(conf_dir + '/' + args.conf_file) if args.conf_file else os.path.expanduser(conf_dir + '/' + DEFAULT_CONF_FILE)
config = ConfigParser.SafeConfigParser()

defaults = {}

try:
config.readfp(FakeSectionHead(open(os.path.expanduser(conf_file))))
defaults.update(dict(config.items("Defaults")))
except Exception, err:
sys.stderr.write(CONF_READ_ERR_FORMAT.format(conf_file, err) + '\n')

parser = argparse.ArgumentParser(parents=[conf_parser], description="Tail log files from Singularity. One can specify either a TaskId, RequestId and DeployId, or RequestId",
prog="log_fetcher")

parser.set_defaults(**defaults)
parser.add_argument("-t", "--taskId", help="TaskId of task to fetch logs for", metavar="taskId")
parser.add_argument("-r", "--requestId", help="RequestId of request to fetch logs for", metavar="requestId")
parser.add_argument("-d", "--deployId", help="DeployId of task to fetch logs for", metavar="deployId")
parser.add_argument("-u", "--singularity-uri-base", help="The base for singularity (eg. http://localhost:8080/singularity/v1)", metavar="URI")
parser.add_argument("-g", "--grep", help="String to grep for", metavar='grep')
parser.add_argument("-l", "--logfile", help="Logfile path/name to tail (ie 'logs/access.log')", metavar="logfile")

args = parser.parse_args(remaining_argv)

if args.deployId and not args.requestId:
exit("Must specify requestId (-r) when specifying deployId")
elif not args.requestId and not args.deployId and not args.taskId:
exit('Must specify one of\n -t taskId\n -r requestId and -d deployId\n -r requestId')
elif not args.logfile:
exit("Must specify logfile to tail (-l)")

args.dest = os.path.expanduser(args.dest)

tail_logs(args)
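Both fetch() and tail() above share a two-stage parsing pattern: a small conf_parser extracts only the config-file location, the file is wrapped in FakeSectionHead so ConfigParser can read a plain key=value file that has no section headers, and the parsed values become defaults for the full argparse parser. Below is a minimal standalone sketch of that pattern; the FakeSectionHead implementation shown is an assumption (the real class lives in fake_section_head.py and is not part of this diff), and only a couple of flags are included for illustration.

```python
import argparse
import ConfigParser  # Python 2, matching the style of these scripts


class FakeSectionHead(object):
    """Assumed implementation: injects a [Defaults] header so ConfigParser
    can parse a flat key=value file with no section headers."""
    def __init__(self, fp):
        self.fp = fp
        self.first = True

    def readline(self):
        if self.first:
            self.first = False
            return "[Defaults]\n"
        return self.fp.readline()


# Stage 1: only look for the location of the config file.
conf_parser = argparse.ArgumentParser(add_help=False)
conf_parser.add_argument("-c", "--conf_file")
args, remaining_argv = conf_parser.parse_known_args()

defaults = {}
if args.conf_file:
    config = ConfigParser.SafeConfigParser()
    config.readfp(FakeSectionHead(open(args.conf_file)))
    defaults.update(dict(config.items("Defaults")))

# Stage 2: the real parser, seeded with the values from the config file.
parser = argparse.ArgumentParser(parents=[conf_parser])
parser.set_defaults(**defaults)
parser.add_argument("-r", "--requestId")
print parser.parse_args(remaining_argv)
```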
43 changes: 3 additions & 40 deletions scripts/logfetch/live_logs.py
@@ -3,16 +3,12 @@
import grequests
from glob import glob
from termcolor import colored
from datetime import datetime

from callbacks import generate_callback
from singularity_request import get_json_response
import logfetch_base

DOWNLOAD_FILE_FORMAT = '{0}/sandbox/{1}/download'
BROWSE_FOLDER_FORMAT = '{0}/sandbox/{1}/browse'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

def download_live_logs(args):
tasks = tasks_to_check(args)
@@ -61,46 +57,13 @@ def tasks_to_check(args):
if args.taskId:
return [args.taskId]
else:
return tasks_for_request(args)

def tasks_for_request(args):
if args.requestId and args.deployId:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
else:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)][0:args.task_count]
return tasks

def all_tasks_for_request(args):
uri = '{0}{1}'.format(logfetch_base.base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
historical_tasks = get_json_response(uri)
uri = '{0}{1}'.format(logfetch_base.base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
active_tasks = get_json_response(uri)
if len(historical_tasks) == 0:
return active_tasks
elif len(active_tasks) == 0:
return historical_tasks
else:
return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]

def is_in_date_range(args, timestamp):
timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
if args.end_days:
if timedelta.days > args.start_days or timedelta.days <= args.end_days:
return False
else:
return True
else:
if timedelta.days > args.start_days:
return False
else:
return True

return logfetch_base.tasks_for_request(args)

def logs_folder_files(args, task):
uri = BROWSE_FOLDER_FORMAT.format(logfetch_base.base_uri(args), task)
files_json = get_json_response(uri, {'path' : '{0}/logs'.format(task)})
if 'files' in files_json:
files = files_json['files']
return [f['name'] for f in files if is_in_date_range(args, f['mtime'])]
return [f['name'] for f in files if logfetch_base.is_in_date_range(args, f['mtime'])]
else:
return [f['path'].rsplit('/')[-1] for f in files_json if is_in_date_range(args, f['mtime'])]
return [f['path'].rsplit('/')[-1] for f in files_json if logfetch_base.is_in_date_range(args, f['mtime'])]
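logs_folder_files above lists a task's logs directory through the sandbox browse endpoint and keeps only files whose mtime passes the shared date-range check. A standalone sketch of that browse call, handling the same two response shapes the code does (the base URI and task id are placeholders, and the date filtering is left out):

```python
import requests

SINGULARITY = 'http://localhost:8080/singularity/v1'  # placeholder base URI
TASK = 'My_Task_id'                                    # hypothetical task id

uri = '{0}/sandbox/{1}/browse'.format(SINGULARITY, TASK)
files_json = requests.get(uri, params={'path': '{0}/logs'.format(TASK)}).json()

# The code above handles two response shapes: a dict with a 'files' list of
# entries carrying 'name' and 'mtime', or a bare list of entries carrying
# 'path' and 'mtime'.
if 'files' in files_json:
    names = [f['name'] for f in files_json['files']]
else:
    names = [f['path'].rsplit('/')[-1] for f in files_json]
print names
```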
41 changes: 41 additions & 0 deletions scripts/logfetch/logfetch_base.py
@@ -1,9 +1,13 @@
import os
import sys
import gzip
from datetime import datetime
from termcolor import colored
from singularity_request import get_json_response

BASE_URI_FORMAT = '{0}{1}'
REQUEST_TASKS_FORMAT = '/history/request/{0}/tasks'
ACTIVE_TASKS_FORMAT = '/history/request/{0}/tasks/active'

def unpack_logs(logs):
for zipped_file in logs:
@@ -24,3 +28,40 @@ def base_uri(args):
uri = BASE_URI_FORMAT.format(uri_prefix, args.singularity_uri_base)
return uri

def tasks_for_request(args):
if args.requestId and args.deployId:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args) if (task["taskId"]["deployId"] == args.deployId)]
else:
tasks = [task["taskId"]["id"] for task in all_tasks_for_request(args)]
if hasattr(args, 'task_count'):
tasks = tasks[0:args.task_count]
return tasks

def all_tasks_for_request(args):
uri = '{0}{1}'.format(base_uri(args), ACTIVE_TASKS_FORMAT.format(args.requestId))
active_tasks = get_json_response(uri)
if hasattr(args, 'start_days'):
uri = '{0}{1}'.format(base_uri(args), REQUEST_TASKS_FORMAT.format(args.requestId))
historical_tasks = get_json_response(uri)
if len(historical_tasks) == 0:
return active_tasks
elif len(active_tasks) == 0:
return historical_tasks
else:
return active_tasks + [h for h in historical_tasks if is_in_date_range(args, int(str(h['updatedAt'])[0:-3]))]
else:
return active_tasks

def is_in_date_range(args, timestamp):
timedelta = datetime.utcnow() - datetime.utcfromtimestamp(timestamp)
if args.end_days:
if timedelta.days > args.start_days or timedelta.days <= args.end_days:
return False
else:
return True
else:
if timedelta.days > args.start_days:
return False
else:
return True
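One note on the date filtering above: updatedAt is a millisecond epoch timestamp (hence the truncation of its last three digits to get seconds), and the hasattr guards let the same helpers serve logtail, whose parser defines neither task_count nor start_days. A small worked example of the conversion and the day-based cutoff (the timestamp and the 7-day window are made-up values):

```python
from datetime import datetime

updated_at = 1420810000000            # hypothetical updatedAt, in milliseconds
seconds = int(str(updated_at)[0:-3])  # drop the last three digits: 1420810000

age = datetime.utcnow() - datetime.utcfromtimestamp(seconds)
start_days = 7                        # example value for the start_days cutoff
print age.days <= start_days          # True when the task falls inside the window and is kept
```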

76 changes: 76 additions & 0 deletions scripts/logfetch/tail.py
@@ -0,0 +1,76 @@
import sys
import logfetch_base
import requests
import time
import threading


TAIL_LOG_FORMAT = '{0}/sandbox/{1}/read'
READ_INTERVAL = 5
THREAD_TIMEOUT = 100000

def start_tail(args):
if args.requestId:
sys.stderr.write('Fetching tasks\n')
tasks = [str(t) for t in logfetch_base.tasks_for_request(args)]
else:
tasks = [args.taskId]
sys.stderr.write('Tailing logs for tasks:\n')
for t in tasks:
sys.stderr.write('{0}\n'.format(t))
sys.stderr.write('ctrl+c to exit\n')
try:
threads = []
for task in tasks:
thread = LogStreamer(args, task)
threads.append(thread)
thread.start()
for t in threads:
t.join(THREAD_TIMEOUT) #Need a timeout otherwise can't be killed by ctrl+c
if not t.isAlive:
break
except KeyboardInterrupt:
sys.stderr.write('Stopping tail')
sys.exit(0)

class LogStreamer(threading.Thread):
def __init__(self, args, task):
threading.Thread.__init__(self)
self.daemon = True
self.Args = args
self.Task = task

def run(self):
self.stream_log_for_task(self.Args, self.Task)

def stream_log_for_task(self, args, task):
uri = TAIL_LOG_FORMAT.format(logfetch_base.base_uri(args), task)
path = '{0}/{1}'.format(task, args.logfile)
keep_trying = True
try:
offset = self.get_initial_offset(uri, path)
except ValueError:
sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
keep_trying = False
while keep_trying:
try:
offset = self.fetch_new_log_data(uri, path, offset, args.grep)
time.sleep(5)
except ValueError:
sys.stderr.write('Could not tail logs for task {0}, check that the task is still active and that the slave it runs on has not been decommissioned\n'.format(task))
keep_trying = False

def get_initial_offset(self, uri, path):
params = {"path" : path}
return requests.get(uri, params=params).json()['offset']

def fetch_new_log_data(self, uri, path, offset, grep):
params = {
"path" : path,
"offset" : offset
}
if grep:
params['grep'] = grep
response = requests.get(uri, params=params).json()
sys.stdout.write(response['data'])
return offset + len(response['data'].encode('utf-8'))
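LogStreamer's polling protocol, with the threading stripped away: ask the sandbox read endpoint for the file's current offset once, then repeatedly request data from that offset, write it out, and advance the offset by the byte length of the chunk that came back. A minimal single-task sketch of that loop (the base URI, task id, and log file below are placeholders):

```python
import sys
import time

import requests

SINGULARITY = 'http://localhost:8080/singularity/v1'  # placeholder base URI
TASK = 'My_Task_id'                                    # hypothetical task id
LOGFILE = 'service.log'                                # path relative to the task sandbox

uri = '{0}/sandbox/{1}/read'.format(SINGULARITY, TASK)
path = '{0}/{1}'.format(TASK, LOGFILE)

# First call with no offset: Singularity reports the current end of the file.
offset = requests.get(uri, params={'path': path}).json()['offset']

while True:
    # An optional 'grep' param could be added here, as fetch_new_log_data does.
    response = requests.get(uri, params={'path': path, 'offset': offset}).json()
    sys.stdout.write(response['data'])
    # Advance by the byte length of what was returned, mirroring tail.py.
    offset += len(response['data'].encode('utf-8'))
    time.sleep(5)
```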
5 changes: 4 additions & 1 deletion scripts/setup.py
@@ -20,6 +20,9 @@
install_requires=requirements,
zip_safe=False,
entry_points={
'console_scripts':['logfetch=logfetch.log_fetcher:entrypoint'],
'console_scripts':[
'logfetch=logfetch.entrypoint:fetch',
'logtail=logfetch.entrypoint:tail'
],
}
)
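With the entry point change above, installing the package produces two console scripts: `logfetch`, wired to logfetch.entrypoint:fetch, and `logtail`, wired to logfetch.entrypoint:tail. Re-running the install (for example `pip install -e .` from the scripts directory, assuming a local checkout) regenerates both commands.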
