Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bigtable: Add 'PartialRowsData.cancel'. #8176

Merged
merged 9 commits into from
Jun 20, 2019
8 changes: 7 additions & 1 deletion bigtable/google/cloud/bigtable/row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS):
self.rows = {}
self._state = self.STATE_NEW_ROW

# Flag to stop iteration, for any reason not related to self.retry()
self._stop = False
Copy link
Contributor

@crwilcox crwilcox Jun 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking over the code, it seems this is marking if the iterator has been cancelled. The comment and name of this sort of directed me to think maybe this did something different. would naming this self._cancelled be accurate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for _cancelled

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Done.


@property
def state(self):
"""State machine state.
Expand All @@ -388,6 +391,7 @@ def state(self):

def cancel(self):
"""Cancels the iterator, closing the stream."""
self._stop = True
self.response_iterator.cancel()

def consume_all(self, max_loops=None):
Expand Down Expand Up @@ -436,7 +440,7 @@ def __iter__(self):
Parse the response and its chunks into a new/existing row in
:attr:`_rows`. Rows are returned in order by row key.
"""
while True:
while not self._stop:
try:
response = self._read_next_response()
except StopIteration:
Expand All @@ -445,6 +449,8 @@ def __iter__(self):
break

for chunk in response.chunks:
if self._stop:
break
self._process_chunk(chunk)
if chunk.commit_row:
self.last_scanned_row_key = self._previous_row.row_key
Expand Down
1 change: 1 addition & 0 deletions bigtable/tests/unit/test_row_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ def test_cancel(self):
self.assertEqual(response_iterator.cancel_calls, 0)
yield_rows_data.cancel()
self.assertEqual(response_iterator.cancel_calls, 1)
self.assertEqual(list(yield_rows_data), [])

# 'consume_next' tested via 'TestPartialRowsData_JSON_acceptance_tests'

Expand Down