Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement multi-use snapshots #3615

Merged
merged 19 commits into from
Jul 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 13 additions & 57 deletions spanner/google/cloud/spanner/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,7 @@ def batch(self):
"""
return BatchCheckout(self)

def snapshot(self, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def snapshot(self, **kw):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""Return an object which wraps a snapshot.

The wrapper *must* be used as a context manager, with the snapshot
Expand All @@ -390,38 +389,15 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None,
See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly

If no options are passed, reads will use the ``strong`` model, reading
at a timestamp where all previously committed transactions are visible.

:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.

:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.

:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.

:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.

:rtype: :class:`~google.cloud.spanner.snapshot.Snapshot`
:returns: a snapshot bound to this session
:raises: :exc:`ValueError` if the session has not yet been created.
:type kw: dict
:param kw:
Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` constructor.

:rtype: :class:`~google.cloud.spanner.database.SnapshotCheckout`
:returns: new wrapper
"""
return SnapshotCheckout(
self,
read_timestamp=read_timestamp,
min_read_timestamp=min_read_timestamp,
max_staleness=max_staleness,
exact_staleness=exact_staleness,
)
return SnapshotCheckout(self, **kw)


class BatchCheckout(object):
Expand Down Expand Up @@ -467,40 +443,20 @@ class SnapshotCheckout(object):
:type database: :class:`~google.cloud.spanner.database.Database`
:param database: database to use

:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.

:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.

:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.

:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:type kw: dict
:param kw:
Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` constructor.
"""
def __init__(self, database, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def __init__(self, database, **kw):
self._database = database
self._session = None
self._read_timestamp = read_timestamp
self._min_read_timestamp = min_read_timestamp
self._max_staleness = max_staleness
self._exact_staleness = exact_staleness
self._kw = kw

def __enter__(self):
"""Begin ``with`` block."""
session = self._session = self._database._pool.get()
return Snapshot(
session,
read_timestamp=self._read_timestamp,
min_read_timestamp=self._min_read_timestamp,
max_staleness=self._max_staleness,
exact_staleness=self._exact_staleness,
)
return Snapshot(session, **self._kw)

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
Expand Down
31 changes: 6 additions & 25 deletions spanner/google/cloud/spanner/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,30 +139,15 @@ def delete(self):
raise NotFound(self.name)
raise

def snapshot(self, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
def snapshot(self, **kw):
"""Create a snapshot to perform a set of reads with shared staleness.

See
https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.TransactionOptions.ReadOnly

If no options are passed, reads will use the ``strong`` model, reading
at a timestamp where all previously committed transactions are visible.

:type read_timestamp: :class:`datetime.datetime`
:param read_timestamp: Execute all reads at the given timestamp.

:type min_read_timestamp: :class:`datetime.datetime`
:param min_read_timestamp: Execute all reads at a
timestamp >= ``min_read_timestamp``.

:type max_staleness: :class:`datetime.timedelta`
:param max_staleness: Read data at a
timestamp >= NOW - ``max_staleness`` seconds.

:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.
:type kw: dict
:param kw: Passed through to
:class:`~google.cloud.spanner.snapshot.Snapshot` ctor.

:rtype: :class:`~google.cloud.spanner.snapshot.Snapshot`
:returns: a snapshot bound to this session
Expand All @@ -171,11 +156,7 @@ def snapshot(self, read_timestamp=None, min_read_timestamp=None,
if self._session_id is None:
raise ValueError("Session has not been created.")

return Snapshot(self,
read_timestamp=read_timestamp,
min_read_timestamp=min_read_timestamp,
max_staleness=max_staleness,
exact_staleness=exact_staleness)
return Snapshot(self, **kw)

def read(self, table, columns, keyset, index='', limit=0,
resume_token=b''):
Expand Down Expand Up @@ -292,7 +273,7 @@ def run_in_transaction(self, func, *args, **kw):
txn = self.transaction()
else:
txn = self._transaction
if txn._id is None:
if txn._transaction_id is None:
txn.begin()
try:
func(txn, *args, **kw)
Expand Down
85 changes: 81 additions & 4 deletions spanner/google/cloud/spanner/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class _SnapshotBase(_SessionWrapper):
:type session: :class:`~google.cloud.spanner.session.Session`
:param session: the session used to perform the commit
"""
_multi_use = False
_transaction_id = None
_read_request_count = 0

def _make_txn_selector(self): # pylint: disable=redundant-returns-doc
"""Helper for :meth:`read` / :meth:`execute_sql`.

Expand Down Expand Up @@ -70,7 +74,15 @@ def read(self, table, columns, keyset, index='', limit=0,

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises: ValueError for reuse of single-use snapshots, or if a
transaction ID is pending for multiple-use snapshots.
"""
if self._read_request_count > 0:
if not self._multi_use:
raise ValueError("Cannot re-use single-use snapshot.")
if self._transaction_id is None:
raise ValueError("Transaction ID pending.")

database = self._session._database
api = database.spanner_api
options = _options_with_prefix(database.name)
Expand All @@ -81,7 +93,12 @@ def read(self, table, columns, keyset, index='', limit=0,
transaction=transaction, index=index, limit=limit,
resume_token=resume_token, options=options)

return StreamedResultSet(iterator)
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
else:
return StreamedResultSet(iterator)

def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
resume_token=b''):

This comment was marked as spam.

This comment was marked as spam.

Expand Down Expand Up @@ -109,7 +126,15 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,

:rtype: :class:`~google.cloud.spanner.streamed.StreamedResultSet`
:returns: a result set instance which can be used to consume rows.
:raises: ValueError for reuse of single-use snapshots, or if a
transaction ID is pending for multiple-use snapshots.
"""
if self._read_request_count > 0:
if not self._multi_use:
raise ValueError("Cannot re-use single-use snapshot.")
if self._transaction_id is None:
raise ValueError("Transaction ID pending.")

if params is not None:
if param_types is None:
raise ValueError(
Expand All @@ -128,7 +153,12 @@ def execute_sql(self, sql, params=None, param_types=None, query_mode=None,
transaction=transaction, params=params_pb, param_types=param_types,
query_mode=query_mode, resume_token=resume_token, options=options)

return StreamedResultSet(iterator)
self._read_request_count += 1

if self._multi_use:
return StreamedResultSet(iterator, source=self)
else:
return StreamedResultSet(iterator)


class Snapshot(_SnapshotBase):
Expand Down Expand Up @@ -157,9 +187,16 @@ class Snapshot(_SnapshotBase):
:type exact_staleness: :class:`datetime.timedelta`
:param exact_staleness: Execute all reads at a timestamp that is
``exact_staleness`` old.

:type multi_use: :class:`bool`
:param multi_use: If true, multiple :meth:`read` / :meth:`execute_sql`
calls can be performed with the snapshot in the
context of a read-only transaction, used to ensure
isolation / consistency. Incompatible with
``max_staleness`` and ``min_read_timestamp``.
"""
def __init__(self, session, read_timestamp=None, min_read_timestamp=None,
max_staleness=None, exact_staleness=None):
max_staleness=None, exact_staleness=None, multi_use=False):
super(Snapshot, self).__init__(session)
opts = [
read_timestamp, min_read_timestamp, max_staleness, exact_staleness]
Expand All @@ -168,14 +205,24 @@ def __init__(self, session, read_timestamp=None, min_read_timestamp=None,
if len(flagged) > 1:
raise ValueError("Supply zero or one options.")

if multi_use:
if min_read_timestamp is not None or max_staleness is not None:
raise ValueError(
"'multi_use' is incompatible with "
"'min_read_timestamp' / 'max_staleness'")

self._strong = len(flagged) == 0
self._read_timestamp = read_timestamp
self._min_read_timestamp = min_read_timestamp
self._max_staleness = max_staleness
self._exact_staleness = exact_staleness
self._multi_use = multi_use

def _make_txn_selector(self):
"""Helper for :meth:`read`."""
if self._transaction_id is not None:
return TransactionSelector(id=self._transaction_id)

if self._read_timestamp:
key = 'read_timestamp'
value = _datetime_to_pb_timestamp(self._read_timestamp)
Expand All @@ -194,4 +241,34 @@ def _make_txn_selector(self):

options = TransactionOptions(
read_only=TransactionOptions.ReadOnly(**{key: value}))
return TransactionSelector(single_use=options)

if self._multi_use:
return TransactionSelector(begin=options)
else:
return TransactionSelector(single_use=options)

def begin(self):
    """Begin a read-only transaction for this multi-use snapshot.

    Calls ``begin_transaction`` on the database's API object for the
    snapshot's session, and records the ID from the response on the
    snapshot.

    :rtype: bytes
    :returns: the ID for the newly-begun transaction.
    :raises: ValueError if the snapshot is single-use, if a transaction
             ID has already been recorded, or if a read request has
             already been issued from this snapshot.
    """
    # Validate state first; the checks are ordered so that the most
    # fundamental misuse (single-use snapshot) is reported first.
    problem = None
    if not self._multi_use:
        problem = "Cannot call 'begin' single-use snapshots"
    elif self._transaction_id is not None:
        problem = "Read-only transaction already begun"
    elif self._read_request_count > 0:
        problem = "Read-only transaction already pending"

    if problem is not None:
        raise ValueError(problem)

    session = self._session
    db = session._database
    client = db.spanner_api
    call_options = _options_with_prefix(db.name)

    # No transaction ID is set and the snapshot is multi-use, so the
    # selector built here carries the read-only options in ``begin``.
    begin_options = self._make_txn_selector().begin
    txn_pb = client.begin_transaction(
        session.name, begin_options, options=call_options)

    self._transaction_id = txn_pb.id
    return self._transaction_id
12 changes: 10 additions & 2 deletions spanner/google/cloud/spanner/streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ class StreamedResultSet(object):
Iterator yielding
:class:`google.cloud.proto.spanner.v1.result_set_pb2.PartialResultSet`
instances.

:type source: :class:`~google.cloud.spanner.snapshot.Snapshot`
:param source: Snapshot from which the result set was fetched.
"""
def __init__(self, response_iterator):
def __init__(self, response_iterator, source=None):
self._response_iterator = response_iterator
self._rows = [] # Fully-processed rows
self._counter = 0 # Counter for processed responses
Expand All @@ -42,6 +45,7 @@ def __init__(self, response_iterator):
self._resume_token = None # To resume from last received PRS
self._current_row = [] # Accumulated values for incomplete row
self._pending_chunk = None # Incomplete value
self._source = source # Source snapshot

@property
def rows(self):
Expand Down Expand Up @@ -130,7 +134,11 @@ def consume_next(self):
self._resume_token = response.resume_token

if self._metadata is None: # first response
self._metadata = response.metadata
metadata = self._metadata = response.metadata

source = self._source
if source is not None and source._transaction_id is None:
source._transaction_id = metadata.transaction.id

if response.HasField('stats'): # last response
self._stats = response.stats
Expand Down
Loading