Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: Add 'PartialRowsData.cancel'. #8176

Merged
merged 9 commits into from
Jun 20, 2019
8 changes: 7 additions & 1 deletion bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
self.rows = {}
self._state = self.STATE_NEW_ROW

# Flag to stop iteration, for any reason not related to self.retry()
self._cancelled = False

@property
def state(self):
"""State machine state.
Expand All @@ -388,6 +391,7 @@ def state(self):

def cancel(self):
"""Cancels the iterator, closing the stream."""
self._cancelled = True
self.response_iterator.cancel()

def consume_all(self, max_loops=None):
Expand Down Expand Up @@ -436,7 +440,7 @@ def __iter__(self):
Parse the response and its chunks into a new/existing row in
:attr:`_rows`. Rows are returned in order by row key.
"""
while True:
while not self._cancelled:
try:
response = self._read_next_response()
except StopIteration:
Expand All @@ -445,6 +449,8 @@ def __iter__(self):
break

for chunk in response.chunks:
if self._cancelled:
break
self._process_chunk(chunk)
if chunk.commit_row:
self.last_scanned_row_key = self._previous_row.row_key
Expand Down
1 change: 1 addition & 0 deletions bigtable/tests/unit/test_row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ def test_cancel(self):
self.assertEqual(response_iterator.cancel_calls, 0)
yield_rows_data.cancel()
self.assertEqual(response_iterator.cancel_calls, 1)
self.assertEqual(list(yield_rows_data), [])

# 'consume_next' tested via 'TestPartialRowsData_JSON_acceptance_tests'

Expand Down