Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Bigtable Row.commit_modifications(). #1472

Merged
merged 3 commits into from
Feb 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions gcloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import six

from gcloud._helpers import _datetime_from_microseconds
from gcloud._helpers import _microseconds_from_datetime
from gcloud._helpers import _to_bytes
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
Expand Down Expand Up @@ -449,6 +450,67 @@ def commit(self):

return result

def clear_modification_rules(self):
    """Discard every modification rule accumulated on this row.

    Empties ``self._rule_pb_list`` in place (so any external references
    to the list observe the removal) without rebinding the attribute.
    """
    self._rule_pb_list[:] = []

def commit_modifications(self):
    """Makes a ``ReadModifyWriteRow`` API request.

    This commits modifications made by :meth:`append_cell_value` and
    :meth:`increment_cell_value`. If no modifications were made, makes
    no API request and just returns ``{}``.

    Modifies a row atomically, reading the latest existing timestamp/value
    from the specified columns and writing a new value by appending /
    incrementing. The new cell created uses either the current server time
    or the highest timestamp of a cell in that column (if it exceeds the
    server time).

    :rtype: dict
    :returns: The new contents of all modified cells. Returned as a
              dictionary of column families, each of which holds a
              dictionary of columns. Each column contains a list of cells
              modified. Each cell is represented with a two-tuple with the
              value (in bytes) and the timestamp for the cell. For example:

              .. code:: python

                  {
                      u'col-fam-id': {
                          b'col-name1': [
                              (b'cell-val', datetime.datetime(...)),
                              (b'cell-val-newer', datetime.datetime(...)),
                          ],
                          b'col-name2': [
                              (b'altcol-cell-val', datetime.datetime(...)),
                          ],
                      },
                      u'col-fam-id2': {
                          b'col-name3-but-other-fam': [
                              (b'foo', datetime.datetime(...)),
                          ],
                      },
                  }
    """
    # Short-circuit: nothing accumulated means no RPC round-trip.
    # (Idiomatic truthiness check per PEP 8 instead of len() == 0.)
    if not self._rule_pb_list:
        return {}
    request_pb = messages_pb2.ReadModifyWriteRowRequest(
        table_name=self._table.name,
        row_key=self._row_key,
        rules=self._rule_pb_list,
    )
    # We expect a `.data_pb2.Row`
    client = self._table._cluster._client
    row_response = client._data_stub.ReadModifyWriteRow(
        request_pb, client.timeout_seconds)

    # Reset modifications after committing the request.
    self.clear_modification_rules()

    # NOTE: We expect row_response.key == self._row_key but don't check.
    return _parse_rmw_row_response(row_response)


class RowFilter(object):
"""Basic filter to apply to cells in a row.
Expand Down Expand Up @@ -1192,3 +1254,81 @@ def to_pb(self):
condition_kwargs['false_filter'] = self.false_filter.to_pb()
condition = data_pb2.RowFilter.Condition(**condition_kwargs)
return data_pb2.RowFilter(condition=condition)


def _parse_rmw_row_response(row_response):
    """Parses the response to a ``ReadModifyWriteRow`` request.

    :type row_response: :class:`.data_pb2.Row`
    :param row_response: The response row (with only modified cells) from a
                         ``ReadModifyWriteRow`` request.

    :rtype: dict
    :returns: The new contents of all modified cells. Returned as a
              dictionary of column families, each of which holds a
              dictionary of columns. Each column contains a list of cells
              modified. Each cell is represented with a two-tuple with the
              value (in bytes) and the timestamp for the cell. For example:

              .. code:: python

                  {
                      u'col-fam-id': {
                          b'col-name1': [
                              (b'cell-val', datetime.datetime(...)),
                              (b'cell-val-newer', datetime.datetime(...)),
                          ],
                          b'col-name2': [
                              (b'altcol-cell-val', datetime.datetime(...)),
                          ],
                      },
                      u'col-fam-id2': {
                          b'col-name3-but-other-fam': [
                              (b'foo', datetime.datetime(...)),
                          ],
                      },
                  }
    """
    # _parse_family_pb yields (family name, cells dict) pairs, which is
    # exactly the shape the dict() constructor accepts.
    return dict(_parse_family_pb(family_pb)
                for family_pb in row_response.families)


def _parse_family_pb(family_pb):
    """Parses a Family protobuf into a dictionary.

    :type family_pb: :class:`._generated.bigtable_data_pb2.Family`
    :param family_pb: A protobuf

    :rtype: tuple
    :returns: A string and dictionary. The string is the name of the
              column family and the dictionary has column names (within the
              family) as keys and cell lists as values. Each cell is
              represented with a two-tuple with the value (in bytes) and the
              timestamp for the cell. For example:

              .. code:: python

                  {
                      b'col-name1': [
                          (b'cell-val', datetime.datetime(...)),
                          (b'cell-val-newer', datetime.datetime(...)),
                      ],
                      b'col-name2': [
                          (b'altcol-cell-val', datetime.datetime(...)),
                      ],
                  }
    """
    cells_by_column = {}
    for column in family_pb.columns:
        # Each cell becomes a (value, timestamp) pair; timestamps arrive
        # as integer microseconds and are converted to datetimes.
        cells_by_column[column.qualifier] = [
            (cell.value, _datetime_from_microseconds(cell.timestamp_micros))
            for cell in column.cells
        ]

    return family_pb.name, cells_by_column
227 changes: 227 additions & 0 deletions gcloud/bigtable/test_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,88 @@ def test_commit_with_filter_no_mutations(self):
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])

def test_commit_modifications(self):
    from gcloud._testing import _Monkey
    from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
    from gcloud.bigtable._generated import (
        bigtable_service_messages_pb2 as messages_pb2)
    from gcloud.bigtable._testing import _FakeStub
    from gcloud.bigtable import row as MUT

    # Fixture values shared by the request we build and the row under test.
    row_key = b'row_key'
    table_name = 'projects/more-stuff'
    column_family_id = u'column_family_id'
    column = b'column'
    timeout_seconds = 87
    value = b'bytes-value'

    client = _Client(timeout_seconds=timeout_seconds)
    table = _Table(table_name, client=client)
    row = self._makeOne(row_key, table)

    # The request the stub should receive after calling
    # row.append_cell_value(column_family_id, column, value).
    expected_request = messages_pb2.ReadModifyWriteRowRequest(
        table_name=table_name,
        row_key=row_key,
        rules=[
            data_pb2.ReadModifyWriteRule(
                family_name=column_family_id,
                column_qualifier=column,
                append_value=value,
            ),
        ],
    )

    # Patch the stub used by the API method with a canned response.
    response_pb = object()
    client._data_stub = stub = _FakeStub(response_pb)

    # Fake parser: records what it receives and returns a sentinel so we
    # can verify commit_modifications() passes the response through.
    parsed_responses = []
    expected_result = object()

    def mock_parse_rmw_row_response(row_response):
        parsed_responses.append(row_response)
        return expected_result

    # Perform the method and check the result.
    with _Monkey(MUT, _parse_rmw_row_response=mock_parse_rmw_row_response):
        row.append_cell_value(column_family_id, column, value)
        result = row.commit_modifications()

    self.assertEqual(result, expected_result)
    self.assertEqual(stub.method_calls, [(
        'ReadModifyWriteRow',
        (expected_request, timeout_seconds),
        {},
    )])
    # Mutations are untouched by commit_modifications().
    self.assertEqual(row._pb_mutations, [])
    self.assertEqual(row._true_pb_mutations, None)
    self.assertEqual(row._false_pb_mutations, None)

    # The parser saw the stub's response, and the rules were cleared.
    self.assertEqual(parsed_responses, [response_pb])
    self.assertEqual(row._rule_pb_list, [])

def test_commit_modifications_no_rules(self):
    from gcloud.bigtable._testing import _FakeStub

    client = _Client()
    table = _Table(None, client=client)
    row = self._makeOne(b'row_key', table)
    self.assertEqual(row._rule_pb_list, [])

    # Replace the data stub so we can verify no RPC goes out.
    stub = _FakeStub()
    client._data_stub = stub

    # With no accumulated rules the method short-circuits to {} ...
    self.assertEqual(row.commit_modifications(), {})
    # ... and no request was sent.
    self.assertEqual(stub.method_calls, [])


class Test_BoolFilter(unittest2.TestCase):

Expand Down Expand Up @@ -1535,6 +1617,151 @@ def test_to_pb_false_only(self):
self.assertEqual(filter_pb, expected_pb)


class Test__parse_rmw_row_response(unittest2.TestCase):

    def _callFUT(self, row_response):
        from gcloud.bigtable.row import _parse_rmw_row_response
        return _parse_rmw_row_response(row_response)

    def test_it(self):
        from gcloud._helpers import _datetime_from_microseconds
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2

        col_fam1 = u'col-fam-id'
        col_fam2 = u'col-fam-id2'
        col_name1 = b'col-name1'
        col_name2 = b'col-name2'
        col_name3 = b'col-name3-but-other-fam'
        cell_val1 = b'cell-val'
        cell_val2 = b'cell-val-newer'
        cell_val3 = b'altcol-cell-val'
        cell_val4 = b'foo'

        microseconds = 1000871
        timestamp = _datetime_from_microseconds(microseconds)

        # All cells in this fixture share a single timestamp.
        def _cell(value):
            return data_pb2.Cell(value=value, timestamp_micros=microseconds)

        sample_input = data_pb2.Row(
            families=[
                data_pb2.Family(
                    name=col_fam1,
                    columns=[
                        data_pb2.Column(
                            qualifier=col_name1,
                            cells=[_cell(cell_val1), _cell(cell_val2)],
                        ),
                        data_pb2.Column(
                            qualifier=col_name2,
                            cells=[_cell(cell_val3)],
                        ),
                    ],
                ),
                data_pb2.Family(
                    name=col_fam2,
                    columns=[
                        data_pb2.Column(
                            qualifier=col_name3,
                            cells=[_cell(cell_val4)],
                        ),
                    ],
                ),
            ],
        )
        expected_output = {
            col_fam1: {
                col_name1: [
                    (cell_val1, timestamp),
                    (cell_val2, timestamp),
                ],
                col_name2: [
                    (cell_val3, timestamp),
                ],
            },
            col_fam2: {
                col_name3: [
                    (cell_val4, timestamp),
                ],
            },
        }
        self.assertEqual(expected_output, self._callFUT(sample_input))


class Test__parse_family_pb(unittest2.TestCase):

    def _callFUT(self, family_pb):
        from gcloud.bigtable.row import _parse_family_pb
        return _parse_family_pb(family_pb)

    def test_it(self):
        from gcloud._helpers import _datetime_from_microseconds
        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2

        col_fam1 = u'col-fam-id'
        col_name1 = b'col-name1'
        col_name2 = b'col-name2'
        cell_val1 = b'cell-val'
        cell_val2 = b'cell-val-newer'
        cell_val3 = b'altcol-cell-val'

        microseconds = 5554441037
        timestamp = _datetime_from_microseconds(microseconds)

        # All cells in this fixture share a single timestamp.
        def _cell(value):
            return data_pb2.Cell(value=value, timestamp_micros=microseconds)

        sample_input = data_pb2.Family(
            name=col_fam1,
            columns=[
                data_pb2.Column(
                    qualifier=col_name1,
                    cells=[_cell(cell_val1), _cell(cell_val2)],
                ),
                data_pb2.Column(
                    qualifier=col_name2,
                    cells=[_cell(cell_val3)],
                ),
            ],
        )
        expected_dict = {
            col_name1: [
                (cell_val1, timestamp),
                (cell_val2, timestamp),
            ],
            col_name2: [
                (cell_val3, timestamp),
            ],
        }
        expected_output = (col_fam1, expected_dict)
        self.assertEqual(expected_output, self._callFUT(sample_input))


class _Client(object):

data_stub = None
Expand Down
Loading