Skip to content

Commit

Permalink
Limit the size of bundles of elements emitted by SDK into the data output stream. (#31581)
Browse files Browse the repository at this point in the history

* Limit the size of bundles of elements emitted by SDK into the data output stream.

* Trigger tests.

* Use a type-compliant sentinel.
  • Loading branch information
tvalentyn authored Jun 13, 2024
1 parent 30cce44 commit 635372f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run."
}

12 changes: 8 additions & 4 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def close(self):
class _GrpcDataChannel(DataChannel):
"""Base class for implementing a BeamFnData-based DataChannel."""

_WRITES_FINISHED = object()
_WRITES_FINISHED = beam_fn_api_pb2.Elements.Data()

def __init__(self, data_buffer_time_limit_ms=0):
# type: (int) -> None
Expand All @@ -475,7 +475,7 @@ def __init__(self, data_buffer_time_limit_ms=0):

def close(self):
# type: () -> None
self._to_send.put(self._WRITES_FINISHED) # type: ignore[arg-type]
self._to_send.put(self._WRITES_FINISHED)
self._closed = True

def wait(self, timeout=None):
Expand Down Expand Up @@ -639,8 +639,12 @@ def _write_outputs(self):
streams = [self._to_send.get()]
try:
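# Coalesce up to 100 other items, stopping early once the accumulated
# ByteSize() of the batch reaches _DEFAULT_SIZE_FLUSH_THRESHOLD.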
for _ in range(100):
streams.append(self._to_send.get_nowait())
total_size_bytes = streams[0].ByteSize()
while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and
len(streams) <= 100):
data_or_timer = self._to_send.get_nowait()
total_size_bytes += data_or_timer.ByteSize()
streams.append(data_or_timer)
except queue.Empty:
pass
if streams[-1] is self._WRITES_FINISHED:
Expand Down

0 comments on commit 635372f

Please sign in to comment.