From 216ab1c86d43c405fe7b2f8d2efe7e0a4e1fb872 Mon Sep 17 00:00:00 2001 From: Sakari Ikonen Date: Mon, 27 May 2024 16:23:11 +0300 Subject: [PATCH] fix line buffering with low chunk sizes --- metaflow/datastore/task_datastore.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/metaflow/datastore/task_datastore.py b/metaflow/datastore/task_datastore.py index 298da265077..7b2c9c25c7f 100644 --- a/metaflow/datastore/task_datastore.py +++ b/metaflow/datastore/task_datastore.py @@ -865,7 +865,17 @@ def stream_logs( elif len(lines) == 1 and val: # the chunk did not contain a newline, # instead the bytes for the logline continue to the next chunk - buffer[k] = val + if v is None: + # if we did not get a new value this round, + # we know to output everything from the buffer. + output[k] = val + else: + # otherwise keep buffering + buffer[k] = val + + if not output and buffer: + # we did not have enough data to return yet + continue yield [(paths[k], v if v is not None else b"") for k, v in output.items()]