Skip to content

Commit

Permalink
feat: Add read_time support for get and query (#334)
Browse files Browse the repository at this point in the history
* feat: add read_time support for get and query

* fix timezone issue in read_time unit tests

* fix sphinx check error

Co-authored-by: Mariatta Wijaya <Mariatta@users.noreply.github.com>
  • Loading branch information
yixiaoshen and Mariatta authored Jul 15, 2022
1 parent aab3d2a commit 58b4b74
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 40 deletions.
34 changes: 28 additions & 6 deletions google/cloud/datastore/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ def _extended_lookup(
transaction_id=None,
retry=None,
timeout=None,
read_time=None,
):
"""Repeat lookup until all keys found (unless stop requested).
Expand Down Expand Up @@ -157,7 +158,7 @@ def _extended_lookup(
:type transaction_id: str
:param transaction_id: If passed, make the request in the scope of
the given transaction. Incompatible with
``eventual==True``.
``eventual==True`` or ``read_time``.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -170,6 +171,12 @@ def _extended_lookup(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:type read_time: datetime
:param read_time:
(Optional) Read time to use for read consistency. Incompatible with
``eventual==True`` or ``transaction_id``.
This feature is in private preview.
:rtype: list of :class:`.entity_pb2.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if missing / deferred are not null or
Expand All @@ -186,7 +193,7 @@ def _extended_lookup(
results = []

loop_num = 0
read_options = helpers.get_read_options(eventual, transaction_id)
read_options = helpers.get_read_options(eventual, transaction_id, read_time)
while loop_num < _MAX_LOOPS: # loop against possible deferred.
loop_num += 1
lookup_response = datastore_api.lookup(
Expand Down Expand Up @@ -401,6 +408,7 @@ def get(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Retrieve an entity from a single key (if it exists).
Expand Down Expand Up @@ -430,7 +438,8 @@ def get(
:type eventual: bool
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency, but cannot
be used inside a transaction or will raise ValueError.
be used inside a transaction or with read_time, or will
raise ValueError.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -443,10 +452,16 @@ def get(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:type read_time: datetime
:param read_time: Read the entity from the specified time (may be null).
Cannot be used with eventual consistency or inside a
transaction, or will raise ValueError. This feature is in private preview.
:rtype: :class:`google.cloud.datastore.entity.Entity` or ``NoneType``
:returns: The requested entity if it exists.
:raises: :class:`ValueError` if eventual is True and in a transaction.
:raises: :class:`ValueError` if more than one of ``eventual==True``,
``transaction``, and ``read_time`` is specified.
"""
entities = self.get_multi(
keys=[key],
Expand All @@ -456,6 +471,7 @@ def get(
eventual=eventual,
retry=retry,
timeout=timeout,
read_time=read_time,
)
if entities:
return entities[0]
Expand All @@ -469,6 +485,7 @@ def get_multi(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Retrieve entities, along with their attributes.
Expand Down Expand Up @@ -506,11 +523,15 @@ def get_multi(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:type read_time: datetime
:param read_time: (Optional) Read time to use for read consistency. This feature is in private preview.
:rtype: list of :class:`google.cloud.datastore.entity.Entity`
:returns: The requested entities.
:raises: :class:`ValueError` if one or more of ``keys`` has a project
which does not match our project.
:raises: :class:`ValueError` if eventual is True and in a transaction.
which does not match our project; or if more than one of
``eventual==True``, ``transaction``, and ``read_time`` is
specified.
"""
if not keys:
return []
Expand All @@ -533,6 +554,7 @@ def get_multi(
transaction_id=transaction and transaction.id,
retry=retry,
timeout=timeout,
read_time=read_time,
)

if missing is not None:
Expand Down
28 changes: 21 additions & 7 deletions google/cloud/datastore/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from google.cloud.datastore_v1.types import entity as entity_pb2
from google.cloud.datastore.entity import Entity
from google.cloud.datastore.key import Key
from google.protobuf import timestamp_pb2


def _get_meaning(value_pb, is_list=False):
Expand Down Expand Up @@ -230,7 +231,7 @@ def entity_to_protobuf(entity):
return entity_pb


def get_read_options(eventual, transaction_id):
def get_read_options(eventual, transaction_id, read_time=None):
"""Validate rules for read options, and assign to the request.
Helper method for ``lookup()`` and ``run_query``.
Expand All @@ -242,21 +243,34 @@ def get_read_options(eventual, transaction_id):
:type transaction_id: bytes
:param transaction_id: A transaction identifier (may be null).
:type read_time: datetime
:param read_time: Read data from the specified time (may be null). This feature is in private preview.
:rtype: :class:`.datastore_pb2.ReadOptions`
:returns: The read options corresponding to the inputs.
:raises: :class:`ValueError` if ``eventual`` is ``True`` and the
``transaction_id`` is not ``None``.
:raises: :class:`ValueError` if more than one of ``eventual==True``,
``transaction``, and ``read_time`` is specified.
"""
if transaction_id is None:
if eventual:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
)
if read_time is not None:
raise ValueError("eventual must be False when read_time is specified")
else:
return datastore_pb2.ReadOptions(
read_consistency=datastore_pb2.ReadOptions.ReadConsistency.EVENTUAL
)
else:
return datastore_pb2.ReadOptions()
if read_time is None:
return datastore_pb2.ReadOptions()
else:
read_time_pb = timestamp_pb2.Timestamp()
read_time_pb.FromDatetime(read_time)
return datastore_pb2.ReadOptions(read_time=read_time_pb)
else:
if eventual:
raise ValueError("eventual must be False when in a transaction")
elif read_time is not None:
raise ValueError("transaction and read_time are mutual exclusive")
else:
return datastore_pb2.ReadOptions(transaction=transaction_id)

Expand Down
23 changes: 20 additions & 3 deletions google/cloud/datastore/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ def fetch(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
"""Execute the Query; return an iterator for the matching entities.
Expand Down Expand Up @@ -412,7 +413,8 @@ def fetch(
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
with read_time, otherwise will raise
ValueError.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -425,6 +427,11 @@ def fetch(
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:type read_time: datetime
:param read_time:
(Optional) use read_time read consistency, cannot be used inside a
transaction or with eventual consistency, or will raise ValueError.
:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
Expand All @@ -441,6 +448,7 @@ def fetch(
eventual=eventual,
retry=retry,
timeout=timeout,
read_time=read_time,
)


Expand Down Expand Up @@ -473,7 +481,7 @@ class Iterator(page_iterator.Iterator):
:param eventual: (Optional) Defaults to strongly consistent (False).
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
with read_time, otherwise will raise ValueError.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
Expand All @@ -485,6 +493,11 @@ class Iterator(page_iterator.Iterator):
Time, in seconds, to wait for the request to complete.
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:type read_time: datetime
:param read_time: (Optional) Runs the query with read time consistency.
Cannot be used with eventual consistency or inside a
transaction, otherwise will raise ValueError. This feature is in private preview.
"""

next_page_token = None
Expand All @@ -500,6 +513,7 @@ def __init__(
eventual=False,
retry=None,
timeout=None,
read_time=None,
):
super(Iterator, self).__init__(
client=client,
Expand All @@ -513,6 +527,7 @@ def __init__(
self._eventual = eventual
self._retry = retry
self._timeout = timeout
self._read_time = read_time
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -593,7 +608,9 @@ def _next_page(self):
transaction_id = None
else:
transaction_id = transaction.id
read_options = helpers.get_read_options(self._eventual, transaction_id)
read_options = helpers.get_read_options(
self._eventual, transaction_id, self._read_time
)

partition_id = entity_pb2.PartitionId(
project_id=self._query.project, namespace_id=self._query.namespace
Expand Down
24 changes: 21 additions & 3 deletions google/cloud/datastore/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from google.cloud.datastore.batch import Batch
from google.cloud.datastore_v1.types import TransactionOptions
from google.protobuf import timestamp_pb2


def _make_retry_timeout_kwargs(retry, timeout):
Expand Down Expand Up @@ -141,18 +142,35 @@ class Transaction(Batch):
:type read_only: bool
:param read_only: indicates the transaction is read only.
:type read_time: datetime
:param read_time: (Optional) Time at which the transaction reads entities.
Only allowed when ``read_only=True``. This feature is in private preview.
:raises: :class:`ValueError` if read_time is specified when
``read_only=False``.
"""

_status = None

def __init__(self, client, read_only=False):
def __init__(self, client, read_only=False, read_time=None):
super(Transaction, self).__init__(client)
self._id = None

if read_only:
options = TransactionOptions(read_only=TransactionOptions.ReadOnly())
if read_time is not None:
read_time_pb = timestamp_pb2.Timestamp()
read_time_pb.FromDatetime(read_time)
options = TransactionOptions(
read_only=TransactionOptions.ReadOnly(read_time=read_time_pb)
)
else:
options = TransactionOptions(read_only=TransactionOptions.ReadOnly())
else:
options = TransactionOptions()
if read_time is not None:
raise ValueError("read_time is only allowed in read only transaction.")
else:
options = TransactionOptions()

self._options = options

Expand Down
Loading

0 comments on commit 58b4b74

Please sign in to comment.