Skip to content

Commit

Permalink
Merge pull request #31598: [release-2.57.0] Cherrypick #31581 into th…
Browse files Browse the repository at this point in the history
…e release branch.
  • Loading branch information
kennknowles committed Jun 13, 2024
2 parents 5f0bc1a + d7da9d3 commit d64f8ca
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def close(self):
class _GrpcDataChannel(DataChannel):
"""Base class for implementing a BeamFnData-based DataChannel."""

_WRITES_FINISHED = object()
_WRITES_FINISHED = beam_fn_api_pb2.Elements.Data()

def __init__(self, data_buffer_time_limit_ms=0):
# type: (int) -> None
Expand All @@ -475,7 +475,7 @@ def __init__(self, data_buffer_time_limit_ms=0):

def close(self):
# type: () -> None
self._to_send.put(self._WRITES_FINISHED) # type: ignore[arg-type]
self._to_send.put(self._WRITES_FINISHED)
self._closed = True

def wait(self, timeout=None):
Expand Down Expand Up @@ -639,8 +639,12 @@ def _write_outputs(self):
streams = [self._to_send.get()]
try:
# Coalesce up to 100 other items.
for _ in range(100):
streams.append(self._to_send.get_nowait())
total_size_bytes = streams[0].ByteSize()
while (total_size_bytes < _DEFAULT_SIZE_FLUSH_THRESHOLD and
len(streams) <= 100):
data_or_timer = self._to_send.get_nowait()
total_size_bytes += data_or_timer.ByteSize()
streams.append(data_or_timer)
except queue.Empty:
pass
if streams[-1] is self._WRITES_FINISHED:
Expand Down

0 comments on commit d64f8ca

Please sign in to comment.