BigTable: PartialRowsData iterator
chemelnucfin committed Jan 19, 2018
1 parent 95aaa50 commit 4ac3bf5
Showing 3 changed files with 127 additions and 59 deletions.
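In effect, this commit makes PartialRowsData directly iterable, so callers no longer have to drive consumption through consume_next()/consume_all() before touching the rows. A usage sketch only (not part of the commit): `table` is assumed to be an existing google.cloud.bigtable Table with rows already committed, mirroring the system test added below.

# Sketch only: `table` is assumed to be a google.cloud.bigtable Table
# with rows already written (see test_read_rows_iter below).
rows_data = table.read_rows()

# Previous style: consume everything eagerly, then read the rows dict.
rows_data.consume_all()
for row_key, row in rows_data.rows.items():
    print(row_key, row.cells)

# Style enabled by this commit: iterate, receiving each PartialRowData
# as soon as its commit_row chunk has been processed.
for row in table.read_rows():
    print(row.row_key, row.cells)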
124 changes: 71 additions & 53 deletions bigtable/google/cloud/bigtable/row_data.py
@@ -217,6 +217,69 @@ def __eq__(self, other):
def __ne__(self, other):
return not self == other

def __iter__(self):
return self._consume_next(True)

def _consume_next(self, yield_=False):
""" Helper for consume_next.
:type yield_: bool
:param yield_: if True, yields rows as they complete,
else finish iteration of the response_iterator
"""
while True:
response = six.next(self._response_iterator)
self._counter += 1

if self._last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell

for chunk in response.chunks:

self._validate_chunk(chunk)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
continue

if row is None:
row = self._row = PartialRowData(chunk.row_key)

if cell is None:
qualifier = None
if chunk.HasField('qualifier'):
qualifier = chunk.qualifier.value

cell = self._cell = PartialCellData(
chunk.row_key,
chunk.family_name.value,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value)
self._copy_from_previous(cell)
else:
cell.append_value(chunk.value)

if chunk.commit_row:
self._save_current_row()
row = cell = None
if yield_:
yield self._previous_row
continue

if chunk.value_size == 0:
self._save_current_cell()
cell = None
break

@property
def state(self):
"""State machine state.
@@ -258,54 +321,10 @@ def consume_next(self):
Parse the response and its chunks into a new/existing row in
:attr:`_rows`. Rows are returned in order by row key.
"""
response = six.next(self._response_iterator)
self._counter += 1

if self._last_scanned_row_key is None: # first response
if response.last_scanned_row_key:
raise InvalidReadRowsResponse()

self._last_scanned_row_key = response.last_scanned_row_key

row = self._row
cell = self._cell

for chunk in response.chunks:

self._validate_chunk(chunk)

if chunk.reset_row:
row = self._row = None
cell = self._cell = self._previous_cell = None
continue

if row is None:
row = self._row = PartialRowData(chunk.row_key)

if cell is None:
qualifier = None
if chunk.HasField('qualifier'):
qualifier = chunk.qualifier.value

cell = self._cell = PartialCellData(
chunk.row_key,
chunk.family_name.value,
qualifier,
chunk.timestamp_micros,
chunk.labels,
chunk.value)
self._copy_from_previous(cell)
else:
cell.append_value(chunk.value)

if chunk.commit_row:
self._save_current_row()
row = cell = None
continue

if chunk.value_size == 0:
self._save_current_cell()
cell = None
try:
next(self._consume_next(False))
except StopIteration:
return False

def consume_all(self, max_loops=None):
"""Consume the streamed responses until there are no more.
@@ -320,13 +339,12 @@
"""
curr_loop = 0
if max_loops is None:
max_loops = float('inf')
while True:
if self.consume_next() is False: # guard against None
return
while curr_loop < max_loops:
curr_loop += 1
try:
self.consume_next()
except StopIteration:
break
self.consume_next()

@staticmethod
def _validate_chunk_status(chunk):
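The substance of the row_data.py change is that one private generator now backs both entry points: __iter__ returns _consume_next(True), which yields each completed row, while the public consume_next() advances the same machinery one response at a time and returns False when StopIteration signals the end of the stream. Below is a standalone sketch of that delegation idea with hypothetical names (StreamReader is not part of the library), and with the diff's yield_ flag simplified away in favour of a single shared generator.

class StreamReader(object):
    """Toy reader: one private generator backs both iteration and consume_next()."""

    def __init__(self, responses):
        self._responses = iter(responses)
        self.finished = []
        self._consumer = self._consume()

    def __iter__(self):
        # Iterating yields each processed item as soon as it is ready.
        return self._consumer

    def consume_next(self):
        # Advance the shared generator by one response; False once exhausted.
        try:
            return next(self._consumer)
        except StopIteration:
            return False

    def _consume(self):
        for response in self._responses:
            self.finished.append(response)  # stand-in for the chunk parsing above
            yield response


reader = StreamReader(['row-1', 'row-2'])
assert reader.consume_next() == 'row-1'   # one response at a time...
assert list(reader) == ['row-2']          # ...or finish via plain iteration
assert reader.consume_next() is False     # stream exhausted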
17 changes: 17 additions & 0 deletions bigtable/tests/system.py
@@ -401,6 +401,23 @@ def test_read_row(self):
}
self.assertEqual(partial_row_data.cells, expected_row_contents)

def test_read_rows_iter(self):
row = self._table.row(ROW_KEY)
row_alt = self._table.row(ROW_KEY_ALT)
self.rows_to_delete.extend([row, row_alt])

cell1, cell2, cell3, cell4 = self._write_to_row(row, row_alt,
row, row_alt)
row.commit()
row_alt.commit()
keys = [ROW_KEY, ROW_KEY_ALT]
rows_data = self._table.read_rows()
self.assertEqual(rows_data.rows, {})
for data, key in zip(rows_data, keys):
self.assertEqual(data.row_key, key)
self.assertEqual(data, self._table.read_row(key))
self.assertEqual(data.cells, self._table.read_row(key).cells)

def test_read_rows(self):
row = self._table.row(ROW_KEY)
row_alt = self._table.row(ROW_KEY_ALT)
45 changes: 39 additions & 6 deletions bigtable/tests/unit/test_row_data.py
@@ -206,9 +206,13 @@ def __init__(self, *args, **kwargs):
self._consumed = []

def consume_next(self):
value = self._response_iterator.next()
self._consumed.append(value)
return value
try:
value = self._response_iterator.next()
self._consumed.append(value)
return value
except StopIteration:
return False


return FakePartialRowsData

@@ -525,6 +529,21 @@ def test_invalid_last_row_missing_commit(self):

# Non-error cases

def test_iter(self):
values = [mock.Mock()] * 3
chunks, results = self._load_json_test('two rows')

for value in values:
value.chunks = chunks
response_iterator = _MockCancellableIterator(*values)

partial_rows = self._make_one(response_iterator)
partial_rows._last_scanned_row_key = 'BEFORE'

for data, value in zip(partial_rows, results):
flattened = self._sort_flattend_cells(_flatten_cells(data))
self.assertEqual(flattened[0], value)

_marker = object()

def _match_results(self, testcase_name, expected_result=_marker):
@@ -644,12 +663,26 @@ def _flatten_cells(prd):
from google.cloud._helpers import _bytes_to_unicode
from google.cloud._helpers import _microseconds_from_datetime

for row_key, row in prd.rows.items():
for family_name, family in row.cells.items():
try:
for row_key, row in prd.rows.items():
for family_name, family in row.cells.items():
for qualifier, column in family.items():
for cell in column:
yield {
u'rk': _bytes_to_unicode(row_key),
u'fm': family_name,
u'qual': _bytes_to_unicode(qualifier),
u'ts': _microseconds_from_datetime(cell.timestamp),
u'value': _bytes_to_unicode(cell.value),
u'label': u' '.join(cell.labels),
u'error': False,
}
except AttributeError:
for family_name, family in prd.cells.items():
for qualifier, column in family.items():
for cell in column:
yield {
u'rk': _bytes_to_unicode(row_key),
u'rk': _bytes_to_unicode(prd.row_key),
u'fm': family_name,
u'qual': _bytes_to_unicode(qualifier),
u'ts': _microseconds_from_datetime(cell.timestamp),
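The try/except AttributeError added to _flatten_cells lets the test helper accept either shape: a PartialRowsData, which exposes a .rows mapping, or a single PartialRowData yielded by the new __iter__, which exposes only .row_key and .cells. A simplified analogue of that duck-typing follows (a stand-in function for illustration, not the test helper itself).

def flatten(prd):
    """Flatten cells from either a rows container or a single row (sketch)."""
    try:
        items = prd.rows.items()        # PartialRowsData-like: has .rows
    except AttributeError:
        items = [(prd.row_key, prd)]    # PartialRowData-like: a single row
    for row_key, row in items:
        for family_name, family in row.cells.items():
            for qualifier, column in family.items():
                for cell in column:
                    yield (row_key, family_name, qualifier, cell.value)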
