Skip to content

Commit

Permalink
feat: Added read_time as a parameter to various calls (synchronous/base classes)
Browse files Browse the repository at this point in the history
  • Loading branch information
gkevinzheng committed Jan 30, 2025
1 parent 857f775 commit ffae9bc
Show file tree
Hide file tree
Showing 20 changed files with 631 additions and 53 deletions.
24 changes: 23 additions & 1 deletion google/cloud/firestore_v1/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union

from google.api_core import exceptions, gapic_v1
Expand Down Expand Up @@ -56,6 +57,7 @@ def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> QueryResultsList[AggregationResult]:
"""Runs the aggregation query.
Expand All @@ -78,6 +80,10 @@ def get(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour,
or if Point-in-Time Recovery is enabled, can additionally be a whole minute
timestamp within the past 7 days.
Returns:
QueryResultsList[AggregationResult]: The aggregation query results.
Expand All @@ -90,6 +96,7 @@ def get(
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
result_list = list(result)

Expand All @@ -100,13 +107,14 @@ def get(

return QueryResultsList(result_list, explain_options, explain_metrics)

def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None, read_time=None):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
explain_options,
read_time,
)

return self._client._firestore_api.run_aggregation_query(
Expand All @@ -132,6 +140,7 @@ def _make_stream(
retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
"""Internal method for stream(). Runs the aggregation query.
Expand All @@ -155,6 +164,10 @@ def _make_stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour,
or if Point-in-Time Recovery is enabled, can additionally be a whole minute
timestamp within the past 7 days.
Yields:
List[AggregationResult]:
Expand All @@ -172,6 +185,7 @@ def _make_stream(
retry,
timeout,
explain_options,
read_time,
)
while True:
try:
Expand All @@ -182,6 +196,8 @@ def _make_stream(
transaction,
retry,
timeout,
explain_options,
read_time,
)
continue
else:
Expand All @@ -206,6 +222,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> StreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.
Expand All @@ -229,6 +246,10 @@ def stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour,
or if Point-in-Time Recovery is enabled, can additionally be a whole minute
timestamp within the past 7 days.
Returns:
`StreamGenerator[List[AggregationResult]]`:
Expand All @@ -239,5 +260,6 @@ def stream(
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
return StreamGenerator(inner_generator, explain_options)
13 changes: 13 additions & 0 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import abc
from abc import ABC
from datetime import datetime
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union

from google.api_core import gapic_v1
Expand Down Expand Up @@ -205,12 +206,14 @@ def _prep_stream(
retry: Union[retries.Retry, retries.AsyncRetry, None, object] = None,
timeout: float | None = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> Tuple[dict, dict]:
parent_path, expected_prefix = self._collection_ref._parent_info()
request = {
"parent": parent_path,
"structured_aggregation_query": self._to_protobuf(),
"transaction": _helpers.get_transaction_id(transaction),
"read_time": read_time,
}
if explain_options:
request["explain_options"] = explain_options._to_dict()
Expand All @@ -228,6 +231,7 @@ def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> (
QueryResultsList[AggregationResult]
| Coroutine[Any, Any, List[List[AggregationResult]]]
Expand All @@ -253,6 +257,10 @@ def get(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour,
or if Point-in-Time Recovery is enabled, can additionally be a whole minute
timestamp within the past 7 days.
Returns:
(QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]):
Expand All @@ -270,6 +278,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> (
StreamGenerator[List[AggregationResult]]
| AsyncStreamGenerator[List[AggregationResult]]
Expand All @@ -291,6 +300,10 @@ def stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime]): If set, reads documents as they were at the given
time. This must be a microsecond precision timestamp within the past one hour,
or if Point-in-Time Recovery is enabled, can additionally be a whole minute
timestamp within the past 7 days.
Returns:
StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator[List[AggregationResult]]:
Expand Down
13 changes: 12 additions & 1 deletion google/cloud/firestore_v1/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from __future__ import annotations

import os
from datetime import datetime
from typing import (
Any,
AsyncGenerator,
Expand Down Expand Up @@ -437,6 +438,7 @@ def _prep_get_all(
transaction: BaseTransaction | None = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
read_time: datetime | None = None,
) -> Tuple[dict, dict, dict]:
"""Shared setup for async/sync :meth:`get_all`."""
document_paths, reference_map = _reference_info(references)
Expand All @@ -446,6 +448,7 @@ def _prep_get_all(
"documents": document_paths,
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
"read_time": read_time,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

Expand All @@ -458,6 +461,8 @@ def get_all(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
*,
read_time: datetime | None = None
) -> Union[
AsyncGenerator[DocumentSnapshot, Any], Generator[DocumentSnapshot, Any, Any]
]:
Expand All @@ -467,9 +472,13 @@ def _prep_collections(
self,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
read_time: datetime | None = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`collections`."""
request = {"parent": "{}/documents".format(self._database_string)}
request = {
"parent": "{}/documents".format(self._database_string),
"read_time": read_time,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -478,6 +487,8 @@ def collections(
self,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
*,
read_time: datetime | None = None,
):
raise NotImplementedError

Expand Down
8 changes: 8 additions & 0 deletions google/cloud/firestore_v1/base_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from __future__ import annotations

import random

from datetime import datetime
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -202,6 +204,7 @@ def _prep_list_documents(
page_size: Optional[int] = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: Optional[float] = None,
read_time: Optional[datetime] = None,
) -> Tuple[dict, dict]:
"""Shared setup for async / sync :method:`list_documents`"""
parent, _ = self._parent_info()
Expand All @@ -214,6 +217,7 @@ def _prep_list_documents(
# include any fields. To save on data transfer, we can set a field_path mask
# to include no fields
"mask": {"field_paths": None},
"read_time": read_time,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

Expand All @@ -224,6 +228,8 @@ def list_documents(
page_size: Optional[int] = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: Optional[float] = None,
*,
read_time: Optional[datetime] = None,
) -> Union[
Generator[DocumentReference, Any, Any], AsyncGenerator[DocumentReference, Any]
]:
Expand Down Expand Up @@ -497,6 +503,7 @@ def get(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> (
QueryResultsList[DocumentSnapshot]
| Coroutine[Any, Any, QueryResultsList[DocumentSnapshot]]
Expand All @@ -510,6 +517,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime] = None,
) -> StreamGenerator[DocumentSnapshot] | AsyncIterator[DocumentSnapshot]:
raise NotImplementedError

Expand Down
14 changes: 13 additions & 1 deletion google/cloud/firestore_v1/base_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
Union,
Awaitable,
)
from datetime import datetime

from google.api_core import retry as retries

Expand Down Expand Up @@ -290,6 +291,7 @@ def _prep_batch_get(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
read_time: datetime | None = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`get`."""
if isinstance(field_paths, str):
Expand All @@ -305,6 +307,7 @@ def _prep_batch_get(
"documents": [self._document_path],
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
"read_time": read_time,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

Expand All @@ -316,6 +319,8 @@ def get(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
*,
read_time: Optional[datetime] = None,
) -> "DocumentSnapshot" | Awaitable["DocumentSnapshot"]:
raise NotImplementedError

Expand All @@ -324,9 +329,14 @@ def _prep_collections(
page_size: int | None = None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
read_time: datetime | None = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`collections`."""
request = {"parent": self._document_path, "page_size": page_size}
request = {
"parent": self._document_path,
"page_size": page_size,
"read_time": read_time,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -336,6 +346,8 @@ def collections(
page_size: int | None = None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
*,
read_time: Optional[datetime] = None,
):
raise NotImplementedError

Expand Down
Loading

0 comments on commit ffae9bc

Please sign in to comment.