Skip to content

Commit

Permalink
Implement multi-use snapshots (googleapis#3615)
Browse files Browse the repository at this point in the history
  • Loading branch information
tseaver authored and landrito committed Aug 22, 2017
1 parent 1c68bda commit e2ec1e1
Show file tree
Hide file tree
Showing 11 changed files with 803 additions and 530 deletions.
70 changes: 13 additions & 57 deletions spanner/google/cloud/spanner/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ def batch(self):
"""
return BatchCheckout(self)

def snapshot(self, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def snapshot(self, **kw):
"""Return an object which wraps a snapshot.
The wrapper *must* be used as a context manager, with the snapshot
Expand All @@ -390,38 +389,15 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None,
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly
If no options are passed, reads will use the ``strong`` model, reading
at a timestamp where all previously committed transactions are visible.
:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.
:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.
:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.
:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:rtype: :class:`~google.cloud.spanner.snapshot.Snapshot`
:returns: a snapshot bound to this session
:raises: :exc:`ValueError` if the session has not yet been created.
:type kw: dict
:param kw:
Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` constructor.
:rtype: :class:`~google.cloud.spanner.database.SnapshotCheckout`
:returns: new wrapper
"""
return SnapshotCheckout(
self,
read_timestamp=read_timestamp,
min_read_timestamp=min_read_timestamp,
max_staleness=max_staleness,
exact_staleness=exact_staleness,
)
return SnapshotCheckout(self, **kw)


class BatchCheckout(object):
Expand Down Expand Up @@ -467,40 +443,20 @@ class SnapshotCheckout(object):
:type database: :class:`~google.cloud.spannder.database.Database`
:param database: database to use
:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.
:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.
:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.
:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:type kw: dict
:param kw:
Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` constructor.
"""
def __init__(self, database, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def __init__(self, database, **kw):
self._database = database
self._session = None
self._read_timestamp = read_timestamp
self._min_read_timestamp = min_read_timestamp
self._max_staleness = max_staleness
self._exact_staleness = exact_staleness
self._kw = kw

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
return Snapshot(
session,
read_timestamp=self._read_timestamp,
min_read_timestamp=self._min_read_timestamp,
max_staleness=self._max_staleness,
exact_staleness=self._exact_staleness,
)
return Snapshot(session, **self._kw)

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
Expand Down
31 changes: 6 additions & 25 deletions spanner/google/cloud/spanner/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,30 +139,15 @@ def delete(self):
raise NotFound(self.name)
raise

def snapshot(self, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def snapshot(self, **kw):
"""Create a snapshot to perform a set of reads with shared staleness.
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly
If no options are passed, reads will use the ``strong`` model, reading
at a timestamp where all previously committed transactions are visible.
:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.
:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.
:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.
:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:type kw: dict
:param kw: Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` ctor.
:rtype: :class:`~google.cloud.spanner.snapshot.Snapshot`
:returns: a snapshot bound to this session
Expand All @@ -171,11 +156,7 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None,
if self._session_id is None:
raise ValueError("Session has not been created.")

return Snapshot(self,
read_timestamp=read_timestamp,
min_read_timestamp=min_read_timestamp,
max_staleness=max_staleness,
exact_staleness=exact_staleness)
return Snapshot(self, **kw)

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
Expand Down Expand Up @@ -292,7 +273,7 @@ def run_in_transaction(self, func, *args, **kw):
txn = self.transaction()
else:
txn = self._transaction
if txn._id is None:
if txn._transaction_id is None:
txn.begin()
try:
func(txn, *args, **kw)
Expand Down
85 changes: 81 additions & 4 deletions spanner/google/cloud/spanner/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class _SnapshotBase(_SessionWrapper):
:type session: :class:`~google.cloud.spanner.session.Session`
:param session: the session used to perform the commit
"""
_multi_use = False
_transaction_id = None
_read_request_count = 0

def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""Helper for :meth:`read` / :meth:`execute_sql`.
Expand Down Expand Up @@ -70,7 +74,15 @@ def read(self, table, columns, keyset, index='', limit=0,
:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises: ValueError for reuse of single-use snapshots, or if a
transaction ID is pending for multiple-use snapshots.
"""
if self._read_request_count > 0:
if not self._multi_use:
raise ValueError("Cannot re-use single-use snapshot.")
if self._transaction_id is None:
raise ValueError("Transaction ID pending.")

database = self._session._database
api = database.spanner_api
options = _options_with_prefix(database.name)
Expand All @@ -81,7 +93,12 @@ def read(self, table, columns, keyset, index='', limit=0,
transaction=transaction, index=index, limit=limit,
resume_token=resume_token, options=options)

return StreamedResultSet(iterator)
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
else:
return StreamedResultSet(iterator)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):
Expand Down Expand Up @@ -109,7 +126,15 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises: ValueError for reuse of single-use snapshots, or if a
transaction ID is pending for multiple-use snapshots.
"""
if self._read_request_count > 0:
if not self._multi_use:
raise ValueError("Cannot re-use single-use snapshot.")
if self._transaction_id is None:
raise ValueError("Transaction ID pending.")

if params is not None:
if param_types is None:
raise ValueError(
Expand All @@ -128,7 +153,12 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
transaction=transaction, params=params_pb, param_types=param_types,
query_mode=query_mode, resume_token=resume_token, options=options)

return StreamedResultSet(iterator)
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
else:
return StreamedResultSet(iterator)


class Snapshot(_SnapshotBase):
Expand Down Expand Up @@ -157,9 +187,16 @@ class Snapshot(_SnapshotBase):
:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:type multi_use: :class:`bool`
:param multi_use: If true, multipl :meth:`read` / :meth:`execute_sql`
calls can be performed with the snapshot in the
context of a read-only transaction, used to ensure
isolation / consistency. Incompatible with
``max_staleness`` and ``min_read_timestamp``.
"""
def __init__(self, session, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
max_staleness=None, exact_staleness=None, multi_use=False):
super(Snapshot, self).__init__(session)
opts = [
read_timestamp, min_read_timestamp, max_staleness, exact_staleness]
Expand All @@ -168,14 +205,24 @@ def __init__(self, session, read_timestamp=None, min_read_timestamp=None,
if len(flagged) > 1:
raise ValueError("Supply zero or one options.")

if multi_use:
if min_read_timestamp is not None or max_staleness is not None:
raise ValueError(
"'multi_use' is incompatible with "
"'min_read_timestamp' / 'max_staleness'")

self._strong = len(flagged) == 0
self._read_timestamp = read_timestamp
self._min_read_timestamp = min_read_timestamp
self._max_staleness = max_staleness
self._exact_staleness = exact_staleness
self._multi_use = multi_use

def _make_txn_selector(self):
"""Helper for :meth:`read`."""
if self._transaction_id is not None:
return TransactionSelector(id=self._transaction_id)

if self._read_timestamp:
key = 'read_timestamp'
value = _datetime_to_pb_timestamp(self._read_timestamp)
Expand All @@ -194,4 +241,34 @@ def _make_txn_selector(self):

options = TransactionOptions(
read_only=TransactionOptions.ReadOnly(**{key: value}))
return TransactionSelector(single_use=options)

if self._multi_use:
return TransactionSelector(begin=options)
else:
return TransactionSelector(single_use=options)

def begin(self):
"""Begin a transaction on the database.
:rtype: bytes
:returns: the ID for the newly-begun transaction.
:raises: ValueError if the transaction is already begun, committed,
or rolled back.
"""
if not self._multi_use:
raise ValueError("Cannot call 'begin' single-use snapshots")

if self._transaction_id is not None:
raise ValueError("Read-only transaction already begun")

if self._read_request_count > 0:
raise ValueError("Read-only transaction already pending")

database = self._session._database
api = database.spanner_api
options = _options_with_prefix(database.name)
txn_selector = self._make_txn_selector()
response = api.begin_transaction(
self._session.name, txn_selector.begin, options=options)
self._transaction_id = response.id
return self._transaction_id
12 changes: 10 additions & 2 deletions spanner/google/cloud/spanner/streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ class StreamedResultSet(object):
Iterator yielding
:class:`google.cloud.proto.spanner.v1.result_set_pb2.PartialResultSet`
instances.
:type source: :class:`~google.cloud.spanner.snapshot.Snapshot`
:param source: Snapshot from which the result set was fetched.
"""
def __init__(self, response_iterator):
def __init__(self, response_iterator, source=None):
self._response_iterator = response_iterator
self._rows = [] # Fully-processed rows
self._counter = 0 # Counter for processed responses
Expand All @@ -42,6 +45,7 @@ def __init__(self, response_iterator):
self._resume_token = None # To resume from last received PRS
self._current_row = [] # Accumulated values for incomplete row
self._pending_chunk = None # Incomplete value
self._source = source # Source snapshot

@property
def rows(self):
Expand Down Expand Up @@ -130,7 +134,11 @@ def consume_next(self):
self._resume_token = response.resume_token

if self._metadata is None: # first response
self._metadata = response.metadata
metadata = self._metadata = response.metadata

source = self._source
if source is not None and source._transaction_id is None:
source._transaction_id = metadata.transaction.id

if response.HasField('stats'): # last response
self._stats = response.stats
Expand Down
Loading

0 comments on commit e2ec1e1

Please sign in to comment.