-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathmethod.py
244 lines (197 loc) · 9.25 KB
/
method.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for wrapping low-level gRPC methods with common functionality.
This is used by gapic clients to provide common error mapping, retry, timeout,
pagination, and long-running operations to gRPC methods.
"""
from google.api_core import general_helpers
from google.api_core import grpc_helpers
from google.api_core import timeout
from google.api_core.gapic_v1 import client_info
USE_DEFAULT_METADATA = object()
DEFAULT = object()
"""Sentinel value indicating that a retry or timeout argument was unspecified,
so the default should be used."""
def _is_not_none_or_false(value):
return value is not None and value is not False
def _apply_decorators(func, decorators):
"""Apply a list of decorators to a given function.
``decorators`` may contain items that are ``None`` or ``False`` which will
be ignored.
"""
decorators = filter(_is_not_none_or_false, reversed(decorators))
for decorator in decorators:
func = decorator(func)
return func
def _determine_timeout(default_timeout, specified_timeout, retry):
"""Determines how timeout should be applied to a wrapped method.
Args:
default_timeout (Optional[Timeout]): The default timeout specified
at method creation time.
specified_timeout (Optional[Timeout]): The timeout specified at
invocation time. If :attr:`DEFAULT`, this will be set to
the ``default_timeout``.
retry (Optional[Retry]): The retry specified at invocation time.
Returns:
Optional[Timeout]: The timeout to apply to the method or ``None``.
"""
# If timeout is specified as a number instead of a Timeout instance,
# convert it to a ConstantTimeout.
if isinstance(specified_timeout, (int, float)):
specified_timeout = timeout.ConstantTimeout(specified_timeout)
if isinstance(default_timeout, (int, float)):
default_timeout = timeout.ConstantTimeout(default_timeout)
if specified_timeout is DEFAULT:
specified_timeout = default_timeout
if specified_timeout is default_timeout:
# If timeout is the default and the default timeout is exponential and
# a non-default retry is specified, make sure the timeout's deadline
# matches the retry's. This handles the case where the user leaves
# the timeout default but specifies a lower deadline via the retry.
if (
retry
and retry is not DEFAULT
and isinstance(default_timeout, timeout.ExponentialTimeout)
):
return default_timeout.with_deadline(retry._deadline)
else:
return default_timeout
return specified_timeout
class _GapicCallable(object):
"""Callable that applies retry, timeout, and metadata logic.
Args:
target (Callable): The low-level RPC method.
retry (google.api_core.retry.Retry): The default retry for the
callable. If ``None``, this callable will not retry by default
timeout (google.api_core.timeout.Timeout): The default timeout
for the callable. If ``None``, this callable will not specify
a timeout argument to the low-level RPC method by default.
metadata (Sequence[Tuple[str, str]]): Additional metadata that is
provided to the RPC method on every invocation. This is merged with
any metadata specified during invocation. If ``None``, no
additional metadata will be passed to the RPC method.
"""
def __init__(self, target, retry, timeout, metadata=None):
self._target = target
self._retry = retry
self._timeout = timeout
self._metadata = metadata
def __call__(self, *args, **kwargs):
"""Invoke the low-level RPC with retry, timeout, and metadata."""
# Note: Due to Python 2 lacking keyword-only arguments we use kwargs to
# extract the retry and timeout params.
timeout_ = _determine_timeout(
self._timeout,
kwargs.pop("timeout", self._timeout),
# Use only the invocation-specified retry only for this, as we only
# want to adjust the timeout deadline if the *user* specified
# a different retry.
kwargs.get("retry", None),
)
retry = kwargs.pop("retry", self._retry)
if retry is DEFAULT:
retry = self._retry
# Apply all applicable decorators.
wrapped_func = _apply_decorators(self._target, [retry, timeout_])
# Add the user agent metadata to the call.
if self._metadata is not None:
metadata = kwargs.get("metadata", [])
# Due to the nature of invocation, None should be treated the same
# as not specified.
if metadata is None:
metadata = []
metadata = list(metadata)
metadata.extend(self._metadata)
kwargs["metadata"] = metadata
return wrapped_func(*args, **kwargs)
def wrap_method(
func,
default_retry=None,
default_timeout=None,
client_info=client_info.DEFAULT_CLIENT_INFO,
):
"""Wrap an RPC method with common behavior.
This applies common error wrapping, retry, and timeout behavior a function.
The wrapped function will take optional ``retry`` and ``timeout``
arguments.
For example::
import google.api_core.gapic_v1.method
from google.api_core import retry
from google.api_core import timeout
# The original RPC method.
def get_topic(name, timeout=None):
request = publisher_v2.GetTopicRequest(name=name)
return publisher_stub.GetTopic(request, timeout=timeout)
default_retry = retry.Retry(deadline=60)
default_timeout = timeout.Timeout(deadline=60)
wrapped_get_topic = google.api_core.gapic_v1.method.wrap_method(
get_topic, default_retry)
# Execute get_topic with default retry and timeout:
response = wrapped_get_topic()
# Execute get_topic without doing any retying but with the default
# timeout:
response = wrapped_get_topic(retry=None)
# Execute get_topic but only retry on 5xx errors:
my_retry = retry.Retry(retry.if_exception_type(
exceptions.InternalServerError))
response = wrapped_get_topic(retry=my_retry)
The way this works is by late-wrapping the given function with the retry
and timeout decorators. Essentially, when ``wrapped_get_topic()`` is
called:
* ``get_topic()`` is first wrapped with the ``timeout`` into
``get_topic_with_timeout``.
* ``get_topic_with_timeout`` is wrapped with the ``retry`` into
``get_topic_with_timeout_and_retry()``.
* The final ``get_topic_with_timeout_and_retry`` is called passing through
the ``args`` and ``kwargs``.
The callstack is therefore::
method.__call__() ->
Retry.__call__() ->
Timeout.__call__() ->
wrap_errors() ->
get_topic()
Note that if ``timeout`` or ``retry`` is ``None``, then they are not
applied to the function. For example,
``wrapped_get_topic(timeout=None, retry=None)`` is more or less
equivalent to just calling ``get_topic`` but with error re-mapping.
Args:
func (Callable[Any]): The function to wrap. It should accept an
optional ``timeout`` argument. If ``metadata`` is not ``None``, it
should accept a ``metadata`` argument.
default_retry (Optional[google.api_core.Retry]): The default retry
strategy. If ``None``, the method will not retry by default.
default_timeout (Optional[google.api_core.Timeout]): The default
timeout strategy. Can also be specified as an int or float. If
``None``, the method will not have timeout specified by default.
client_info
(Optional[google.api_core.gapic_v1.client_info.ClientInfo]):
Client information used to create a user-agent string that's
passed as gRPC metadata to the method. If unspecified, then
a sane default will be used. If ``None``, then no user agent
metadata will be provided to the RPC method.
Returns:
Callable: A new callable that takes optional ``retry`` and ``timeout``
arguments and applies the common error mapping, retry, timeout,
and metadata behavior to the low-level RPC method.
"""
func = grpc_helpers.wrap_errors(func)
if client_info is not None:
user_agent_metadata = [client_info.to_grpc_metadata()]
else:
user_agent_metadata = None
return general_helpers.wraps(func)(
_GapicCallable(
func, default_retry, default_timeout, metadata=user_agent_metadata
)
)