Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: massive logs streaming #424

Merged
merged 16 commits into from
Jun 12, 2024
2 changes: 2 additions & 0 deletions docker-compose.development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ services:
- MF_UI_METADATA_HOST=${MF_UI_METADATA_HOST:-0.0.0.0}
- MF_METADATA_DB_POOL_MIN=1
- MF_METADATA_DB_POOL_MAX=10
- METAFLOW_S3_RETRY_COUNT=0
- LOGLEVEL=INFO
- AIOPG_ECHO=0
- UI_ENABLED=0
Expand All @@ -51,6 +52,7 @@ services:
- NOTIFICATIONS=$NOTIFICATIONS
- GA_TRACKING_ID=none
- PLUGINS=$PLUGINS
- AWS_PROFILE=$AWS_PROFILE
depends_on:
- migration
metadata:
Expand Down
30 changes: 24 additions & 6 deletions services/ui_backend_service/api/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from services.data.db_utils import DBResponse, translate_run_key, translate_task_key, DBPagination, DBResponse
from services.utils import handle_exceptions, web_response
from .utils import format_response_list, get_pathspec_from_request
from .utils import format_response_list, get_pathspec_from_request, logger

from aiohttp import web
from multidict import MultiDict
Expand Down Expand Up @@ -240,8 +240,10 @@ async def get_task_log_file(self, request, logtype=STDOUT):
attempt=task['attempt_id']
)

lines, _ = await read_and_output(self.cache, task, logtype, output_raw=True)
return file_download_response(log_filename, lines)
def _gen():
return stream_pages(self.cache, task, logtype, output_raw=True)

return await file_download_response(request, log_filename, _gen)


async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_order=False, output_raw=False):
Expand All @@ -264,6 +266,16 @@ async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_
return log_response["content"], log_response["pages"]


async def stream_pages(cache_client, task, logtype, output_raw):
    """Asynchronously yield a task's log content one page at a time.

    Fetches fixed-size pages (1000 lines each) via ``read_and_output`` and
    keeps advancing until the cache hands back an empty page, which marks
    the end of the log.
    """
    current_page = 1
    fetching = True
    while fetching:
        content, _ = await read_and_output(
            cache_client, task, logtype,
            limit=1000, page=current_page, output_raw=output_raw)
        # An empty page signals that the log has been exhausted.
        fetching = bool(content)
        if fetching:
            yield content
            current_page += 1


def get_pagination_params(request):
"""extract pagination params from request
"""
Expand All @@ -281,11 +293,17 @@ def get_pagination_params(request):
return limit, page, reverse_order


def file_download_response(filename, body):
return web.Response(
async def file_download_response(request, filename, async_line_iterator):
    """Stream log content to the client as a downloadable attachment.

    ``async_line_iterator`` is a zero-argument callable returning an async
    iterator of string chunks; each chunk is UTF-8 encoded and written to
    the response as it arrives, so the whole log never has to fit in memory.
    """
    headers = MultiDict(
        {'Content-Disposition': 'Attachment;filename={}'.format(filename)})
    response = web.StreamResponse(headers=headers)
    await response.prepare(request)
    # NOTE: once .prepare() has been called the status line is already sent,
    # so errors raised by the cache mid-stream cannot be surfaced as an
    # HTTP error status anymore.
    async for chunk in async_line_iterator():
        await response.write(chunk.encode("utf-8"))

    await response.write_eof()
    return response


class LogException(Exception):
Expand Down
7 changes: 4 additions & 3 deletions services/ui_backend_service/data/cache/client/cache_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def format_request(cls, *args, **kwargs):
is proxied by `cache_client` as a client-facing API
of the action.

Function returns a four-tuple:
Function returns a six-tuple:
1. `message`: an arbitrary JSON-encodable payload that
is passed to `execute`.
2. `obj_keys`: a list of keys that the action promises
Expand All @@ -43,8 +43,9 @@ def format_request(cls, *args, **kwargs):
be purged from the cache before other objects.
5. `invalidate_cache`: boolean to indicate if existing
cache keys should be invalidated.
6. `ephemeral_storage_path` : optional path for persisting files across cache action invocations
"""
# return message, obj_keys, stream_key, disposable_keys, invalidate_cache
# return message, obj_keys, stream_key, disposable_keys, invalidate_cache, ephemeral_storage_path
raise NotImplementedError

@classmethod
Expand Down Expand Up @@ -106,7 +107,7 @@ class Check(CacheAction):
@classmethod
def format_request(cls, *args, **kwargs):
    """Build the request tuple for a cache health check.

    Returns the standard six-tuple expected by the cache client:
    (message, obj_keys, stream_key, disposable_keys, invalidate_cache,
    ephemeral_storage_path). A fresh UUID-based key guarantees a cache
    miss, and listing the same key as disposable lets it be garbage
    collected immediately after the check completes.
    """
    key = 'check-%s' % uuid.uuid4()
    # Six-tuple per the updated CacheAction.format_request contract
    # (ephemeral_storage_path added as the sixth element).
    return None, [key], None, [key], False, None

@classmethod
def response(cls, keys_objs):
Expand Down
11 changes: 7 additions & 4 deletions services/ui_backend_service/data/cache/client/cache_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def _send(self, op, **kwargs):
def _action(self, cls):

def _call(*args, **kwargs):
msg, keys, stream_key, disposable_keys, invalidate_cache =\
msg, keys, stream_key, disposable_keys, invalidate_cache, ephemeral_path =\
cls.format_request(*args, **kwargs)
future = CacheFuture(keys, stream_key, self, cls, self._root)
if future.key_paths_ready() and not invalidate_cache:
Expand All @@ -217,7 +217,8 @@ def _call(*args, **kwargs):
stream_key=stream_key,
message=msg,
disposable_keys=disposable_keys,
invalidate_cache=invalidate_cache)
invalidate_cache=invalidate_cache,
ephemeral_path=ephemeral_path)

return self.request_and_return([req] if req else [], future)

Expand Down Expand Up @@ -292,7 +293,8 @@ def server_request(op,
message=None,
disposable_keys=None,
idempotency_token=None,
invalidate_cache=False):
invalidate_cache=False,
ephemeral_path=None):

if idempotency_token is None:
fields = [op]
Expand All @@ -315,5 +317,6 @@ def server_request(op,
'message': message,
'idempotency_token': token,
'disposable_keys': disposable_keys,
'invalidate_cache': invalidate_cache
'invalidate_cache': invalidate_cache,
'ephemeral_path': ephemeral_path
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ def terminate(self):
missing = self.filestore.commit(self.tempdir,
self.request['keys'],
self.request['stream_key'],
self.request['disposable_keys'])
self.request['disposable_keys'],
self.request['ephemeral_path'])
if missing:
self.echo("failed to produce the following keys: %s"
% ','.join(missing))
Expand Down
18 changes: 17 additions & 1 deletion services/ui_backend_service/data/cache/client/cache_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def close_tempdir(self, tempdir):
self.safe_fileop(os.unlink, stream)
self.safe_fileop(shutil.rmtree, tempdir)

def commit(self, tempdir, keys, stream_key, disposable_keys):
def commit(self, tempdir, keys, stream_key, disposable_keys, ephemeral_path=None):
def _insert(queue, key, value):
# we want to update the object's location in the
# OrderedDict, so we will need to delete any possible
Expand Down Expand Up @@ -293,5 +293,21 @@ def _insert(queue, key, value):
else:
missing.append(key)

# Additionally, if the ephemeral path contains anything,
# we want to make sure that these objects are tracked for GC as well.
if ephemeral_path is not None and os.path.exists(ephemeral_path):
for dirpath, dirnames, files in os.walk(ephemeral_path):
for file in files:
p = os.path.join(dirpath, file)
sz = filesize(p)
if sz is None:
continue
if p in self.objects_queue:
# cover possible file size changes
old_size = self.objects_queue[p]
sz = (sz - old_size)
self.total_size += sz
_insert(self.objects_queue, p, sz)

self._gc_objects()
return missing
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ def format_request(cls, flow_id, run_number, invalidate_cache=False):
[result_key], \
stream_key, \
[stream_key], \
invalidate_cache
invalidate_cache, \
None

@classmethod
def response(cls, keys_objs):
Expand Down
3 changes: 2 additions & 1 deletion services/ui_backend_service/data/cache/get_data_action.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def format_request(cls, *args, **kwargs):
target_keys, \
stream_key, \
[stream_key], \
invalidate_cache
invalidate_cache, \
None

@classmethod
def response(cls, keys_objs):
Expand Down
Loading
Loading