From 92fc6c7ae51fd3cd2302624f4b1564cc1518d625 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Tue, 27 Dec 2016 14:57:10 -0800 Subject: [PATCH 01/20] Add OperationFuture class --- google/gax/__init__.py | 158 +++++++++++++++++++++++++++++++++++++++++ setup.py | 1 + test/test_gax.py | 129 ++++++++++++++++++++++++++++++++- 3 files changed, 287 insertions(+), 1 deletion(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 45c7cce..43e183b 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -31,6 +31,10 @@ from __future__ import absolute_import import collections +import time +import threading +from concurrent import futures +from .errors import GaxError __version__ = '0.15.0' @@ -482,3 +486,157 @@ def __next__(self): if self._index >= len(self._current): self._current = None return resource + + +def _from_any(pb_type, any_pb): + if not any_pb.Is(pb_type.DESCRIPTOR): + raise TypeError( + 'Could not convert {} to {}'.format( + any_pb.__class__.__name__, pb_type.__name__)) + + return pb_type.FromString(any_pb.value) + + +class ResultError(GaxError): + """Thrown when an operation completes with an error result. + """ + + @classmethod + def from_operation_error(cls, operation_error): + """Factory: construct instance from an operation error result. + + Args: + operation_error (google.rpc.Status): the error result of the + operation. + """ + return cls(operation_error.message, operation_error.code) + + def __init__(self, msg, code=None): + """Constructor. + + Args: + msg (string): describes the error that occurred. + code (int, optional): the status code, which should be an enum + value of google.rpc.Code. + """ + super(ResultError, self).__init__(msg) + self.code = code + + def __str__(self): + msg = super(ResultError, self).__str__() + return 'ResultError({}, with code {})'.format(msg, self.code) + + +class OperationFuture(object): + """A Future which polls a service for completion via OperationsApi.""" + + def __init__(self, operation, client, result_type, metadata_type, + call_options=None): + """Constructor. + + Args: + operation (google.longrunning.Operation): the initial long-running + operation. + client (google.cloud.gapic.longrunning.operations_api.OperationsApi): + the client used to manage the long-running operation with an API + service. + result_type (type): the class type to be unpacked from the result. + metadata_type (type): the class type to be unpacked from the + metadata. + call_options (google.gax.CallOptions, optional): the call options + that are used when reloading the operation. + """ + self._first_operation = operation + self._last_operation = operation + self._client = client + self._result_type = result_type + self._metadata_type = metadata_type + self._call_options = call_options + self._cancelled = False + + def cancel(self): + """If last Operation's value of `done` is true, returns false; + otherwise, issues OperationsApi.cancel_operation and returns true. + """ + if not self._last_operation.done: + self._client.cancel_operation(self._last_operation.name) + self._cancelled = True + + return self._cancelled + + def result(self, timeout=None): + """Enters polling loop on OperationsApi.get_operation, and once Operation.done + is true, then returns Operation.response if successful or throws + ResultError if not successful. + + This method will wait up to timeout seconds. If the call hasn't + completed in timeout seconds, then a concurrent.futures.TimeoutError + will be raised. timeout can be an int or float. If timeout is not + specified or None, there is no limit to the wait time. + """ + if not self._poll(timeout).HasField('response'): + raise ResultError.from_operation_error(self._last_operation.error) + + return _from_any(self._result_type, self._last_operation.response) + + def exception(self, timeout=None): + """Similar to result(), except returns the exception if any.""" + if self._poll(timeout).HasField('error'): + return self._last_operation.error + + return None + + def cancelled(self): + """Return True if the call was successfully cancelled.""" + return self._cancelled + + def done(self): + """Issues OperationsApi.get_operation and returns value of Operation.done.""" + return self._get_operation().done + + def add_done_callback(self, done_clbk): + """Enters a polling loop on OperationsApi.get_operation, and once the + operation is done or cancelled, calls the function with this OperationFuture. + """ + def _execute_clbk(self, clbk): + self._poll() + clbk(self) + + threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start() + + def operation_name(self): + """Returns the value of Operation.name from the initial Operation object + returned from the first call. Blocks if the first call isn't done yet. + """ + return self._first_operation.name + + def metadata(self): + """Returns the value of Operation.metadata from the initial Operation object + returned from the first call. Blocks if the first call isn't done yet. + """ + return _from_any(self._metadata_type, self._first_operation.metadata) + + def last_operation_data(self): + """Returns the data from the last call to OperationsApi.get_operation (or if only + the initial API call has been made, the data from that first call). Blocks if + the first call isn't done yet. + """ + return self._last_operation + + def _get_operation(self): + if not self._last_operation.done: + self._last_operation = self._client.get_operation( + self._last_operation.name, self._call_options) + + return self._last_operation + + def _poll(self, timeout=None): + start_time = time.time() + + while timeout is None or time.time() < start_time + timeout: + if self._get_operation().done: + return self._last_operation + + time.sleep(1) + + raise futures.TimeoutError() diff --git a/setup.py b/setup.py index 389d5b2..bd43e04 100644 --- a/setup.py +++ b/setup.py @@ -54,6 +54,7 @@ 'ply==3.8', 'protobuf>=3.0.0, <4.0dev', 'oauth2client>=2.0.0, <4.0dev', + 'googleapis-common-protos>=1.5.0', ] setup( diff --git a/test/test_gax.py b/test/test_gax.py index 74ea365..d76b76c 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -32,11 +32,18 @@ from __future__ import absolute_import +import mock import unittest2 +from concurrent import futures + +from fixture_pb2 import Simple +# pylint: disable=no-name-in-module,import-error +from google.longrunning import operations_pb2 +from google.rpc import status_pb2 from google.gax import ( BundleOptions, CallOptions, _CallSettings, INITIAL_PAGE, OPTION_INHERIT, - RetryOptions) + RetryOptions, OperationFuture, ResultError) class TestBundleOptions(unittest2.TestCase): @@ -111,3 +118,123 @@ def test_settings_merge_none(self): self.assertEqual(final.page_descriptor, settings.page_descriptor) self.assertEqual(final.bundler, settings.bundler) self.assertEqual(final.bundle_descriptor, settings.bundle_descriptor) + + +class TestOperationFuture(unittest2.TestCase): + + OPERATION_NAME = 'operations/projects/foo/instances/bar/operations/123' + + def _make_operation(self, metadata=None, response=None, error=None, + **kwargs): + operation = operations_pb2.Operation(name=self.OPERATION_NAME, **kwargs) + + if metadata is not None: + operation.metadata.Pack(metadata) + + if response is not None: + operation.response.Pack(response) + + if error is not None: + operation.error.CopyFrom(error) + + return operation + + def _make_operation_future(self, *operations): + if not operations: + operations = [self._make_operation()] + + mock_client = mock.Mock() + mock_client.get_operation.side_effect = operations + + return OperationFuture(operations[0], mock_client, Simple, Simple) + + def test_cancelled_defaults_to_false(self): + operation_future = self._make_operation_future() + self.assertFalse(operation_future.cancelled()) + + def test_cancel_changes_cancelled_to_true(self): + operation_future = self._make_operation_future() + operation_future.cancel() + self.assertTrue(operation_future.cancelled()) + + def test_cancel_does_nothing_when_already_done(self): + operation = self._make_operation(done=True) + operation_future = self._make_operation_future(operation) + operation_future.cancel() + self.assertFalse(operation_future.cancelled()) + + def test_done_true(self): + operation = self._make_operation(done=True) + operation_future = self._make_operation_future(operation) + self.assertTrue(operation_future.done()) + + def test_done_false(self): + operation_future = self._make_operation_future() + self.assertFalse(operation_future.done()) + + def test_operation_name(self): + operation_future = self._make_operation_future() + self.assertEqual(self.OPERATION_NAME, operation_future.operation_name()) + + def test_metadata(self): + metadata = Simple() + operation = self._make_operation(metadata=metadata) + operation_future = self._make_operation_future(operation) + self.assertEqual(metadata, operation_future.metadata()) + + def test_last_operation_data(self): + operation = self._make_operation() + operation_future = self._make_operation_future(operation) + self.assertEqual(operation, operation_future.last_operation_data()) + + def test_result_response(self): + response = Simple() + operation = self._make_operation(done=True, response=response) + operation_future = self._make_operation_future(operation) + self.assertEqual(response, operation_future.result()) + + def test_result_error(self): + operation = self._make_operation(done=True, error=status_pb2.Status()) + operation_future = self._make_operation_future(operation) + self.assertRaises(ResultError, operation_future.result) + + def test_result_timeout(self): + operation_future = self._make_operation_future() + self.assertRaises(futures.TimeoutError, operation_future.result, 0) + + def test_exception_error(self): + error = status_pb2.Status() + operation = self._make_operation(done=True, error=error) + operation_future = self._make_operation_future(operation) + self.assertEqual(error, operation_future.exception()) + + def test_exception_response(self): + operation = self._make_operation(done=True, response=Simple()) + operation_future = self._make_operation_future(operation) + self.assertIsNone(operation_future.exception()) + + def test_exception_timeout(self): + operation_future = self._make_operation_future() + self.assertRaises(futures.TimeoutError, operation_future.exception, 0) + + @mock.patch('time.sleep') + @mock.patch('time.time') + def test_add_done_callback(self, mock_time, mock_sleep): + def incr_time(secs): + mock_time.return_value += secs + + mock_time.return_value = 0 + mock_sleep.side_effect = incr_time + + operation_future = self._make_operation_future( + self._make_operation(), + self._make_operation(done=True)) + + mock_clbk1 = mock.Mock() + mock_clbk2 = mock.Mock() + + operation_future.add_done_callback(mock_clbk1) + operation_future.add_done_callback(mock_clbk2) + + mock_clbk1.assert_called_with(operation_future) + mock_clbk2.assert_called_with(operation_future) From 903b12bb4d68137c7b6379d7c17dfc9bf60d7be2 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Tue, 27 Dec 2016 15:25:41 -0800 Subject: [PATCH 02/20] Fix import order of concurrent.futures --- test/test_gax.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_gax.py b/test/test_gax.py index d76b76c..9597c12 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -32,11 +32,11 @@ from __future__ import absolute_import +from concurrent import futures + import mock import unittest2 -from concurrent import futures - from fixture_pb2 import Simple # pylint: disable=no-name-in-module,import-error from google.longrunning import operations_pb2 From 8f2dfc08d03a51ea60cce41021edf84afb234816 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Wed, 28 Dec 2016 10:08:13 -0800 Subject: [PATCH 03/20] Handle exceptions thrown by done callbacks --- google/gax/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 43e183b..49dd253 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -31,6 +31,7 @@ from __future__ import absolute_import import collections +import logging import time import threading from concurrent import futures @@ -40,6 +41,8 @@ __version__ = '0.15.0' +_LOG = logging.getLogger(__name__) + INITIAL_PAGE = object() """A placeholder for the page token passed into an initial paginated request.""" @@ -600,7 +603,10 @@ def add_done_callback(self, done_clbk): """ def _execute_clbk(self, clbk): self._poll() - clbk(self) + try: + clbk(self) + except Exception as ex: # pylint: disable=broad-except + _LOG.exception(ex) threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start() From 347b89840f40c6f4cf13482cfbd49b4bb10fda5f Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Wed, 28 Dec 2016 14:18:09 -0800 Subject: [PATCH 04/20] Generate LRO Gapic library --- google/gapic/__init__.py | 1 + google/gapic/longrunning/__init__.py | 1 + google/gapic/longrunning/operations_client.py | 279 ++++++++++++++++++ .../longrunning/operations_client_config.json | 48 +++ setup.py | 2 +- tox.ini | 10 +- 6 files changed, 335 insertions(+), 6 deletions(-) create mode 100644 google/gapic/__init__.py create mode 100644 google/gapic/longrunning/__init__.py create mode 100644 google/gapic/longrunning/operations_client.py create mode 100644 google/gapic/longrunning/operations_client_config.json diff --git a/google/gapic/__init__.py b/google/gapic/__init__.py new file mode 100644 index 0000000..de40ea7 --- /dev/null +++ b/google/gapic/__init__.py @@ -0,0 +1 @@ +__import__('pkg_resources').declare_namespace(__name__) diff --git a/google/gapic/longrunning/__init__.py b/google/gapic/longrunning/__init__.py new file mode 100644 index 0000000..de40ea7 --- /dev/null +++ b/google/gapic/longrunning/__init__.py @@ -0,0 +1 @@ +__import__('pkg_resources').declare_namespace(__name__) diff --git a/google/gapic/longrunning/operations_client.py b/google/gapic/longrunning/operations_client.py new file mode 100644 index 0000000..1fce0dc --- /dev/null +++ b/google/gapic/longrunning/operations_client.py @@ -0,0 +1,279 @@ +# Copyright 2016 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# EDITING INSTRUCTIONS +# This file was generated from the file +# https://github.com/google/googleapis/blob/master/google/longrunning/operations.proto, +# and updates to that file get reflected here through a refresh process. +# For the short term, the refresh process will only be runnable by Google engineers. +# +# The only allowed edits are to method and file documentation. A 3-way +# merge preserves those additions if the generated source changes. +"""Accesses the google.longrunning Operations API.""" + +import json +import os +import pkg_resources +import platform + +from google.gax import api_callable +from google.gax import config +from google.gax import path_template +import google.gax + +from google.longrunning import operations_pb2 + +_PageDesc = google.gax.PageDescriptor + + +class OperationsClient(object): + """ + Manages long-running operations with an API service. + + When an API method normally takes long time to complete, it can be designed + to return ``Operation`` to the client, and the client can use this + interface to receive the real response asynchronously by polling the + operation resource, or pass the operation resource to another API (such as + Google Cloud Pub/Sub API) to receive the response. Any API service that + returns long-running operations should implement the ``Operations`` interface + so developers can have a consistent client experience. + """ + + SERVICE_ADDRESS = 'longrunning.googleapis.com' + """The default address of the service.""" + + DEFAULT_SERVICE_PORT = 443 + """The default port of the service.""" + + _CODE_GEN_NAME_VERSION = 'gapic/0.1.0' + + _GAX_VERSION = pkg_resources.get_distribution('google-gax').version + + _PAGE_DESCRIPTORS = { + 'list_operations': _PageDesc('page_token', 'next_page_token', + 'operations') + } + + # The scopes needed to make gRPC calls to all of the methods defined in + # this service + _ALL_SCOPES = () + + def __init__(self, + service_path=SERVICE_ADDRESS, + port=DEFAULT_SERVICE_PORT, + channel=None, + credentials=None, + ssl_credentials=None, + scopes=None, + client_config=None, + app_name='gax', + app_version=_GAX_VERSION): + """Constructor. + + Args: + service_path (string): The domain name of the API remote host. + port (int): The port on which to connect to the remote host. + channel (:class:`grpc.Channel`): A ``Channel`` instance through + which to make calls. + credentials (object): The authorization credentials to attach to + requests. These credentials identify this application to the + service. + ssl_credentials (:class:`grpc.ChannelCredentials`): A + ``ChannelCredentials`` instance for use with an SSL-enabled + channel. + scopes (list[string]): A list of OAuth2 scopes to attach to requests. + client_config (dict): + A dictionary for call options for each method. See + :func:`google.gax.construct_settings` for the structure of + this data. Falls back to the default config if not specified + or the specified config is missing data points. + app_name (string): The codename of the calling service. + app_version (string): The version of the calling service. + + Returns: + A OperationsClient object. + """ + if scopes is None: + scopes = self._ALL_SCOPES + if client_config is None: + client_config = {} + goog_api_client = '{}/{} {} gax/{} python/{}'.format( + app_name, app_version, self._CODE_GEN_NAME_VERSION, + self._GAX_VERSION, platform.python_version()) + metadata = [('x-goog-api-client', goog_api_client)] + default_client_config = json.loads( + pkg_resources.resource_string( + __name__, 'operations_client_config.json').decode()) + defaults = api_callable.construct_settings( + 'google.longrunning.Operations', + default_client_config, + client_config, + config.STATUS_CODE_NAMES, + kwargs={'metadata': metadata}, + page_descriptors=self._PAGE_DESCRIPTORS) + self.operations_stub = config.create_stub( + operations_pb2.OperationsStub, + channel=channel, + service_path=service_path, + service_port=port, + credentials=credentials, + scopes=scopes, + ssl_credentials=ssl_credentials) + + self._get_operation = api_callable.create_api_call( + self.operations_stub.GetOperation, + settings=defaults['get_operation']) + self._list_operations = api_callable.create_api_call( + self.operations_stub.ListOperations, + settings=defaults['list_operations']) + self._cancel_operation = api_callable.create_api_call( + self.operations_stub.CancelOperation, + settings=defaults['cancel_operation']) + self._delete_operation = api_callable.create_api_call( + self.operations_stub.DeleteOperation, + settings=defaults['delete_operation']) + + # Service calls + def get_operation(self, name, options=None): + """ + Gets the latest state of a long-running operation. Clients can use this + method to poll the operation result at intervals as recommended by the API + service. + + Example: + >>> from google.gapic.longrunning import operations_client + >>> api = operations_client.OperationsClient() + >>> name = '' + >>> response = api.get_operation(name) + + Args: + name (string): The name of the operation resource. + options (:class:`google.gax.CallOptions`): Overrides the default + settings for this call, e.g, timeout, retries etc. + + Returns: + A :class:`google.longrunning.operations_pb2.Operation` instance. + + Raises: + :exc:`google.gax.errors.GaxError` if the RPC is aborted. + :exc:`ValueError` if the parameters are invalid. + """ + request = operations_pb2.GetOperationRequest(name=name) + return self._get_operation(request, options) + + def list_operations(self, name, filter_, page_size=0, options=None): + """ + Lists operations that match the specified filter in the request. If the + server doesn't support this method, it returns ``UNIMPLEMENTED``. + NOTE: the ``name`` binding below allows API services to override the binding + to use different resource name schemes, such as ``users/*/operations``. + + Example: + >>> from google.gapic.longrunning import operations_client + >>> from google.gax import CallOptions, INITIAL_PAGE + >>> api = operations_client.OperationsClient() + >>> name = '' + >>> filter_ = '' + >>> + >>> # Iterate over all results + >>> for element in api.list_operations(name, filter_): + >>> # process element + >>> pass + >>> + >>> # Or iterate over results one page at a time + >>> for page in api.list_operations(name, filter_, options=CallOptions(page_token=INITIAL_PAGE)): + >>> for element in page: + >>> # process element + >>> pass + + Args: + name (string): The name of the operation collection. + filter_ (string): The standard list filter. + page_size (int): The maximum number of resources contained in the + underlying API response. If page streaming is performed per- + resource, this parameter does not affect the return value. If page + streaming is performed per-page, this determines the maximum number + of resources in a page. + options (:class:`google.gax.CallOptions`): Overrides the default + settings for this call, e.g, timeout, retries etc. + + Returns: + A :class:`google.gax.PageIterator` instance. By default, this + is an iterable of :class:`google.longrunning.operations_pb2.Operation` instances. + This object can also be configured to iterate over the pages + of the response through the `CallOptions` parameter. + + Raises: + :exc:`google.gax.errors.GaxError` if the RPC is aborted. + :exc:`ValueError` if the parameters are invalid. + """ + request = operations_pb2.ListOperationsRequest( + name=name, filter=filter_, page_size=page_size) + return self._list_operations(request, options) + + def cancel_operation(self, name, options=None): + """ + Starts asynchronous cancellation on a long-running operation. The server + makes a best effort to cancel the operation, but success is not + guaranteed. If the server doesn't support this method, it returns + ``google.rpc.Code.UNIMPLEMENTED``. Clients can use + ``Operations.GetOperation`` or + other methods to check whether the cancellation succeeded or whether the + operation completed despite cancellation. On successful cancellation, + the operation is not deleted; instead, it becomes an operation with + an ``Operation.error`` value with a ``google.rpc.Status.code`` of 1, + corresponding to ``Code.CANCELLED``. + + Example: + >>> from google.gapic.longrunning import operations_client + >>> api = operations_client.OperationsClient() + >>> name = '' + >>> api.cancel_operation(name) + + Args: + name (string): The name of the operation resource to be cancelled. + options (:class:`google.gax.CallOptions`): Overrides the default + settings for this call, e.g, timeout, retries etc. + + Raises: + :exc:`google.gax.errors.GaxError` if the RPC is aborted. + :exc:`ValueError` if the parameters are invalid. + """ + request = operations_pb2.CancelOperationRequest(name=name) + self._cancel_operation(request, options) + + def delete_operation(self, name, options=None): + """ + Deletes a long-running operation. This method indicates that the client is + no longer interested in the operation result. It does not cancel the + operation. If the server doesn't support this method, it returns + ``google.rpc.Code.UNIMPLEMENTED``. + + Example: + >>> from google.gapic.longrunning import operations_client + >>> api = operations_client.OperationsClient() + >>> name = '' + >>> api.delete_operation(name) + + Args: + name (string): The name of the operation resource to be deleted. + options (:class:`google.gax.CallOptions`): Overrides the default + settings for this call, e.g, timeout, retries etc. + + Raises: + :exc:`google.gax.errors.GaxError` if the RPC is aborted. + :exc:`ValueError` if the parameters are invalid. + """ + request = operations_pb2.DeleteOperationRequest(name=name) + self._delete_operation(request, options) diff --git a/google/gapic/longrunning/operations_client_config.json b/google/gapic/longrunning/operations_client_config.json new file mode 100644 index 0000000..86873bc --- /dev/null +++ b/google/gapic/longrunning/operations_client_config.json @@ -0,0 +1,48 @@ +{ + "interfaces": { + "google.longrunning.Operations": { + "retry_codes": { + "retry_codes_def": { + "idempotent": [ + "DEADLINE_EXCEEDED", + "UNAVAILABLE" + ], + "non_idempotent": [] + } + }, + "retry_params": { + "default": { + "initial_retry_delay_millis": 100, + "retry_delay_multiplier": 1.3, + "max_retry_delay_millis": 60000, + "initial_rpc_timeout_millis": 20000, + "rpc_timeout_multiplier": 1.0, + "max_rpc_timeout_millis": 20000, + "total_timeout_millis": 600000 + } + }, + "methods": { + "GetOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "ListOperations": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "CancelOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + }, + "DeleteOperation": { + "timeout_millis": 60000, + "retry_codes_name": "idempotent", + "retry_params_name": "default" + } + } + } + } +} diff --git a/setup.py b/setup.py index bd43e04..3f68b66 100644 --- a/setup.py +++ b/setup.py @@ -54,7 +54,7 @@ 'ply==3.8', 'protobuf>=3.0.0, <4.0dev', 'oauth2client>=2.0.0, <4.0dev', - 'googleapis-common-protos>=1.5.0', + 'googleapis-common-protos>=1.5.0, <2.0dev', ] setup( diff --git a/tox.ini b/tox.ini index 84402b1..b486ff4 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,7 @@ commands = -mkdir src-gen [testenv:pep8] deps = flake8 -commands = flake8 --max-complexity=10 google test --ignore=E501 --exclude=src-gen +commands = flake8 --max-complexity=10 google test --ignore=E501 --exclude=src-gen,gapic [testenv:pylint-errors] deps = pylint @@ -27,7 +27,7 @@ commands = pylint \ -f colorized \ -E \ google test \ - --ignore=src-gen + --ignore=src-gen,gapic [testenv:pylint-warnings] deps = pylint @@ -37,7 +37,7 @@ commands = pylint \ -e W \ -r n \ google test \ - --ignore=src-gen,_grpc_google_auth.py,test__grpc_google_auth.py + --ignore=src-gen,_grpc_google_auth.py,test__grpc_google_auth.py,gapic [testenv:pylint-no-test-deps] deps = pylint @@ -46,7 +46,7 @@ commands = pylint \ -e E,W,R \ -d fixme,locally-disabled \ google \ - --ignore=src-gen,_grpc_google_auth.py,test__grpc_google_auth.py + --ignore=src-gen,_grpc_google_auth.py,test__grpc_google_auth.py,gapic [testenv:pylint-full] deps = pylint @@ -56,7 +56,7 @@ commands = pylint \ -e E,W,R \ -d fixme,locally-disabled \ google test \ - --ignore=src-gen + --ignore=src-gen,gapic [testenv:devenv] commands = From f15551d7ade08893ff8b289d4a3fd474ea754605 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 29 Dec 2016 11:18:10 -0800 Subject: [PATCH 05/20] Refactor _OperationFuture to be more pythonic --- google/gax/__init__.py | 155 ++++++++++++++++++++--------------------- test/test_gax.py | 77 +++++++++++++------- tox.ini | 2 +- 3 files changed, 128 insertions(+), 106 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 49dd253..b1b19dc 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -32,10 +32,12 @@ from __future__ import absolute_import import collections import logging +import multiprocessing +import random import time -import threading from concurrent import futures -from .errors import GaxError +from google.rpc import code_pb2 +from google.gax.errors import GaxError __version__ = '0.15.0' @@ -492,46 +494,27 @@ def __next__(self): def _from_any(pb_type, any_pb): - if not any_pb.Is(pb_type.DESCRIPTOR): - raise TypeError( - 'Could not convert {} to {}'.format( - any_pb.__class__.__name__, pb_type.__name__)) + """Converts an Any protobuf to the specified message type - return pb_type.FromString(any_pb.value) + Args: + pb_type (type): the type of the message that any_pb stores an instance + of. + any_pb (google.protobuf.any_pb2.Any): the object to be converted. - -class ResultError(GaxError): - """Thrown when an operation completes with an error result. + Returns: + An instance of the pb_type message. """ + msg = pb_type() + if any_pb.Unpack(msg): + return msg - @classmethod - def from_operation_error(cls, operation_error): - """Factory: construct instance from an operation error result. - - Args: - operation_error (google.rpc.Status): the error result of the - operation. - """ - return cls(operation_error.message, operation_error.code) - - def __init__(self, msg, code=None): - """Constructor. - - Args: - msg (string): describes the error that occurred. - code (int, optional): the status code, which should be an enum - value of google.rpc.Code. - """ - super(ResultError, self).__init__(msg) - self.code = code + raise TypeError( + 'Could not convert {} to {}'.format( + any_pb.__class__.__name__, pb_type.__name__)) - def __str__(self): - msg = super(ResultError, self).__str__() - return 'ResultError({}, with code {})'.format(msg, self.code) - -class OperationFuture(object): - """A Future which polls a service for completion via OperationsApi.""" +class _OperationFuture(object): + """A Future which polls a service for completion via OperationsClient.""" def __init__(self, operation, client, result_type, metadata_type, call_options=None): @@ -539,38 +522,36 @@ def __init__(self, operation, client, result_type, metadata_type, Args: operation (google.longrunning.Operation): the initial long-running - operation. - client (google.cloud.gapic.longrunning.operations_api.OperationsApi): - the client used to manage the long-running operation with an API - service. - result_type (type): the class type to be unpacked from the result. - metadata_type (type): the class type to be unpacked from the - metadata. + operation object. + client (google.gapic.longrunning.operations_client.OperationsClient): + a client for the long-running operation service. + result_type (type): the class type of the result. + metadata_type (type): the class type of the metadata. call_options (google.gax.CallOptions, optional): the call options that are used when reloading the operation. """ - self._first_operation = operation self._last_operation = operation self._client = client self._result_type = result_type self._metadata_type = metadata_type self._call_options = call_options - self._cancelled = False + self._done_clbks = [] + self._process = None def cancel(self): """If last Operation's value of `done` is true, returns false; - otherwise, issues OperationsApi.cancel_operation and returns true. + otherwise, issues OperationsClient.cancel_operation and returns true. """ - if not self._last_operation.done: - self._client.cancel_operation(self._last_operation.name) - self._cancelled = True + if self.done(): + return False - return self._cancelled + self._client.cancel_operation(self._last_operation.name) + return True def result(self, timeout=None): - """Enters polling loop on OperationsApi.get_operation, and once Operation.done - is true, then returns Operation.response if successful or throws - ResultError if not successful. + """Enters polling loop on OperationsClient.get_operation, and once + Operation.done is true, then returns Operation.response if successful + or throws GaxError if not successful. This method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, then a concurrent.futures.TimeoutError @@ -578,7 +559,7 @@ def result(self, timeout=None): specified or None, there is no limit to the wait time. """ if not self._poll(timeout).HasField('response'): - raise ResultError.from_operation_error(self._last_operation.error) + raise GaxError(self._last_operation.error.message) return _from_any(self._result_type, self._last_operation.response) @@ -591,41 +572,55 @@ def exception(self, timeout=None): def cancelled(self): """Return True if the call was successfully cancelled.""" - return self._cancelled + self._get_operation() + return self._last_operation.HasField('error') and \ + self._last_operation.error.code == code_pb2.CANCELLED def done(self): - """Issues OperationsApi.get_operation and returns value of Operation.done.""" + """Issues OperationsClient.get_operation and returns value of + Operation.done. + """ return self._get_operation().done - def add_done_callback(self, done_clbk): - """Enters a polling loop on OperationsApi.get_operation, and once the - operation is done or cancelled, calls the function with this OperationFuture. + def add_done_callback(self, fn): # pylint: disable=invalid-name + """Enters a polling loop on OperationsClient.get_operation, and once the + operation is done or cancelled, calls the function with this + _OperationFuture. """ - def _execute_clbk(self, clbk): + def _execute_clbks(self): self._poll() - try: - clbk(self) - except Exception as ex: # pylint: disable=broad-except - _LOG.exception(ex) - - threading.Thread(target=_execute_clbk, args=(self, done_clbk)).start() + for clbk in self._done_clbks: + try: + clbk(self) + except Exception as ex: # pylint: disable=broad-except + _LOG.exception(ex) + + self._done_clbks.append(fn) + if self._process is None: + self._process = multiprocessing.Process( + target=_execute_clbks, args=(self,)) + self._process.start() + elif not self._process.is_alive() and self._last_operation.done: + _execute_clbks(self) def operation_name(self): - """Returns the value of Operation.name from the initial Operation object - returned from the first call. Blocks if the first call isn't done yet. + """Returns the value of Operation.name from the last call to + OperationsClient.get_operation (or if only the initial API call has + been made, the name from that first call). """ - return self._first_operation.name + return self._last_operation.name def metadata(self): - """Returns the value of Operation.metadata from the initial Operation object - returned from the first call. Blocks if the first call isn't done yet. + """Returns the value of Operation.metadata from the last call to + OperationsClient.get_operation (or if only the initial API call has + been made, the metadata from that first call). """ - return _from_any(self._metadata_type, self._first_operation.metadata) + return _from_any(self._metadata_type, self._last_operation.metadata) def last_operation_data(self): - """Returns the data from the last call to OperationsApi.get_operation (or if only - the initial API call has been made, the data from that first call). Blocks if - the first call isn't done yet. + """Returns the data from the last call to OperationsClient.get_operation + (or if only the initial API call has been made, the data from that + first call). """ return self._last_operation @@ -637,12 +632,16 @@ def _get_operation(self): return self._last_operation def _poll(self, timeout=None): + delay_mult = 2 + max_delay = 30 + delay = 1 start_time = time.time() - while timeout is None or time.time() < start_time + timeout: - if self._get_operation().done: + if self.done(): return self._last_operation - time.sleep(1) + sleep_time = random.uniform(0, delay) + time.sleep(sleep_time) + delay = min(delay * delay_mult, max_delay) raise futures.TimeoutError() diff --git a/test/test_gax.py b/test/test_gax.py index 9597c12..f3ce57d 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -32,6 +32,7 @@ from __future__ import absolute_import +import multiprocessing from concurrent import futures import mock @@ -40,10 +41,11 @@ from fixture_pb2 import Simple # pylint: disable=no-name-in-module,import-error from google.longrunning import operations_pb2 -from google.rpc import status_pb2 +from google.rpc import code_pb2, status_pb2 from google.gax import ( BundleOptions, CallOptions, _CallSettings, INITIAL_PAGE, OPTION_INHERIT, - RetryOptions, OperationFuture, ResultError) + RetryOptions, _OperationFuture) +from google.gax.errors import GaxError class TestBundleOptions(unittest2.TestCase): @@ -146,21 +148,45 @@ def _make_operation_future(self, *operations): mock_client = mock.Mock() mock_client.get_operation.side_effect = operations - return OperationFuture(operations[0], mock_client, Simple, Simple) + return _OperationFuture(operations[0], mock_client, Simple, Simple) - def test_cancelled_defaults_to_false(self): - operation_future = self._make_operation_future() - self.assertFalse(operation_future.cancelled()) + def test_cancel_issues_call_when_not_done(self): + operation = self._make_operation() - def test_cancel_changes_cancelled_to_true(self): - operation_future = self._make_operation_future() - operation_future.cancel() - self.assertTrue(operation_future.cancelled()) + mock_client = mock.Mock() + mock_client.get_operation.return_value = operation + mock_client.cancel_operation = mock.Mock() + + operation_future = _OperationFuture( + operation, mock_client, Simple, Simple) + + self.assertTrue(operation_future.cancel()) + + mock_client.cancel_operation.assert_called_with(self.OPERATION_NAME) def test_cancel_does_nothing_when_already_done(self): operation = self._make_operation(done=True) + + mock_client = mock.Mock() + mock_client.get_operation.return_value = operation + mock_client.cancel_operation = mock.Mock() + + operation_future = _OperationFuture( + operation, mock_client, Simple, Simple) + + self.assertFalse(operation_future.cancel()) + mock_client.cancel_operation.assert_not_called() + + def test_cancelled_true(self): + error = status_pb2.Status(code=code_pb2.CANCELLED) + operation = self._make_operation(error=error) + operation_future = self._make_operation_future(operation) + + self.assertTrue(operation_future.cancelled()) + + def test_cancelled_false(self): + operation = self._make_operation(error=status_pb2.Status()) operation_future = self._make_operation_future(operation) - operation_future.cancel() self.assertFalse(operation_future.cancelled()) def test_done_true(self): @@ -180,6 +206,7 @@ def test_metadata(self): metadata = Simple() operation = self._make_operation(metadata=metadata) operation_future = self._make_operation_future(operation) + self.assertEqual(metadata, operation_future.metadata()) def test_last_operation_data(self): @@ -191,12 +218,13 @@ def test_result_response(self): response = Simple() operation = self._make_operation(done=True, response=response) operation_future = self._make_operation_future(operation) + self.assertEqual(response, operation_future.result()) def test_result_error(self): operation = self._make_operation(done=True, error=status_pb2.Status()) operation_future = self._make_operation_future(operation) - self.assertRaises(ResultError, operation_future.result) + self.assertRaises(GaxError, operation_future.result) def test_result_timeout(self): operation_future = self._make_operation_future() @@ -206,6 +234,7 @@ def test_exception_error(self): error = status_pb2.Status() operation = self._make_operation(done=True, error=error) operation_future = self._make_operation_future(operation) + self.assertEqual(error, operation_future.exception()) def test_exception_response(self): @@ -217,24 +246,18 @@ def test_exception_timeout(self): operation_future = self._make_operation_future() self.assertRaises(futures.TimeoutError, operation_future.exception, 0) - @mock.patch('time.sleep') - @mock.patch('time.time') - def test_add_done_callback(self, mock_time, mock_sleep): - def incr_time(secs): - mock_time.return_value += secs - - mock_time.return_value = 0 - mock_sleep.side_effect = incr_time + def test_add_done_callback(self): + def done_clbk(ofuture): + ofuture.conn.send(True) + ofuture.conn.close() operation_future = self._make_operation_future( self._make_operation(), - self._make_operation(done=True)) + self._make_operation(done=True, response=Simple())) - mock_clbk1 = mock.Mock() - mock_clbk2 = mock.Mock() + parent_conn, child_conn = multiprocessing.Pipe(False) + operation_future.conn = child_conn - operation_future.add_done_callback(mock_clbk1) - operation_future.add_done_callback(mock_clbk2) + operation_future.add_done_callback(done_clbk) - mock_clbk1.assert_called_with(operation_future) - mock_clbk2.assert_called_with(operation_future) + self.assertTrue(parent_conn.recv()) diff --git a/tox.ini b/tox.ini index b486ff4..9616316 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ whitelist_externals = mkdir protoc commands = -mkdir src-gen -python -m grpc.tools.protoc -Itest --python_out=src-gen test/fixture.proto - -py.test --timeout=30 --cov-report html --cov-report=term --cov {toxinidir}/google + -py.test --timeout=30 --cov-report html --cov-report=term --cov {toxinidir}/google/gax [testenv:pep8] deps = flake8 From 1032b55696043179311fe2890acb9b435188c237 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 29 Dec 2016 13:00:27 -0800 Subject: [PATCH 06/20] Add JSON to package --- MANIFEST.in | 1 + setup.py | 1 + test/test_gax.py | 1 - 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/MANIFEST.in b/MANIFEST.in index 0c73842..b67d7bc 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1,2 @@ include README.rst LICENSE +global-include *.json diff --git a/setup.py b/setup.py index 3f68b66..726bbeb 100644 --- a/setup.py +++ b/setup.py @@ -83,4 +83,5 @@ ], tests_require=['pytest'], install_requires=install_requires, + include_package_data=True, ) diff --git a/test/test_gax.py b/test/test_gax.py index f3ce57d..7153fe4 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -39,7 +39,6 @@ import unittest2 from fixture_pb2 import Simple -# pylint: disable=no-name-in-module,import-error from google.longrunning import operations_pb2 from google.rpc import code_pb2, status_pb2 from google.gax import ( From 4ce09ca4a114f4ad65ae3af5571e59bc038c1084 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Tue, 3 Jan 2017 12:03:56 -0800 Subject: [PATCH 07/20] Add done callback pool --- google/gax/__init__.py | 50 +++++++++++++++++++++++++------------- test/test_gax.py | 54 +++++++++++++++++++++++------------------- 2 files changed, 62 insertions(+), 42 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index b1b19dc..47e7456 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -32,7 +32,7 @@ from __future__ import absolute_import import collections import logging -import multiprocessing +import multiprocessing as mp import random import time from concurrent import futures @@ -513,6 +513,13 @@ def _from_any(pb_type, any_pb): any_pb.__class__.__name__, pb_type.__name__)) +def _try_callback(target, clbk): + try: + clbk(target) + except Exception as ex: # pylint: disable=broad-except + _LOG.exception(ex) + + class _OperationFuture(object): """A Future which polls a service for completion via OperationsClient.""" @@ -535,7 +542,7 @@ def __init__(self, operation, client, result_type, metadata_type, self._result_type = result_type self._metadata_type = metadata_type self._call_options = call_options - self._done_clbks = [] + self._queue = mp.Manager().Queue() self._process = None def cancel(self): @@ -573,8 +580,8 @@ def exception(self, timeout=None): def cancelled(self): """Return True if the call was successfully cancelled.""" self._get_operation() - return self._last_operation.HasField('error') and \ - self._last_operation.error.code == code_pb2.CANCELLED + return (self._last_operation.HasField('error') and + self._last_operation.error.code == code_pb2.CANCELLED) def done(self): """Issues OperationsClient.get_operation and returns value of @@ -587,21 +594,11 @@ def add_done_callback(self, fn): # pylint: disable=invalid-name operation is done or cancelled, calls the function with this _OperationFuture. """ - def _execute_clbks(self): - self._poll() - for clbk in self._done_clbks: - try: - clbk(self) - except Exception as ex: # pylint: disable=broad-except - _LOG.exception(ex) - - self._done_clbks.append(fn) + self._queue.put(fn) if self._process is None: - self._process = multiprocessing.Process( - target=_execute_clbks, args=(self,)) - self._process.start() + mp.Process(target=self._execute_clbks).start() elif not self._process.is_alive() and self._last_operation.done: - _execute_clbks(self) + _try_callback(self, fn) def operation_name(self): """Returns the value of Operation.name from the last call to @@ -645,3 +642,22 @@ def _poll(self, timeout=None): delay = min(delay * delay_mult, max_delay) raise futures.TimeoutError() + + def _execute_clbks(self): + self._poll() + + # Create a new pool. + pool_size = mp.cpu_count() * 2 + pool = mp.Pool(processes=pool_size) + + # Actually send the final callback provided to the pool. + while not self._queue.empty(): + item = self._queue.get() + pool.apply_async(_try_callback, (self, item)) + + # We are done; nothing more will be sent to the pool. + # This is a prerequisite to `pool.join()`. + pool.close() + + # Block until the async callbacks have run. + pool.join() diff --git a/test/test_gax.py b/test/test_gax.py index 7153fe4..94b3acd 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -32,7 +32,7 @@ from __future__ import absolute_import -import multiprocessing +import multiprocessing as mp from concurrent import futures import mock @@ -121,6 +121,21 @@ def test_settings_merge_none(self): self.assertEqual(final.bundle_descriptor, settings.bundle_descriptor) +def _done_clbk(operation_future): + operation_future.test_queue.put(True) + + +class _FakeOperationsClient(object): + def __init__(self, operations): + self.operations = list(reversed(operations)) + + def get_operation(self, *args): # pylint: disable=unused-argument + return self.operations.pop() + + def cancel_operation(self, *args): # pylint: disable=unused-argument + pass + + class TestOperationFuture(unittest2.TestCase): OPERATION_NAME = 'operations/projects/foo/instances/bar/operations/123' @@ -144,37 +159,32 @@ def _make_operation_future(self, *operations): if not operations: operations = [self._make_operation()] - mock_client = mock.Mock() - mock_client.get_operation.side_effect = operations - - return _OperationFuture(operations[0], mock_client, Simple, Simple) + fake_client = _FakeOperationsClient(operations) + return _OperationFuture(operations[0], fake_client, Simple, Simple) def test_cancel_issues_call_when_not_done(self): operation = self._make_operation() - mock_client = mock.Mock() - mock_client.get_operation.return_value = operation - mock_client.cancel_operation = mock.Mock() + fake_client = _FakeOperationsClient([operation]) + fake_client.cancel_operation = mock.Mock() operation_future = _OperationFuture( - operation, mock_client, Simple, Simple) + operation, fake_client, Simple, Simple) self.assertTrue(operation_future.cancel()) - - mock_client.cancel_operation.assert_called_with(self.OPERATION_NAME) + fake_client.cancel_operation.assert_called_with(self.OPERATION_NAME) def test_cancel_does_nothing_when_already_done(self): operation = self._make_operation(done=True) - mock_client = mock.Mock() - mock_client.get_operation.return_value = operation - mock_client.cancel_operation = mock.Mock() + fake_client = _FakeOperationsClient([operation]) + fake_client.cancel_operation = mock.Mock() operation_future = _OperationFuture( - operation, mock_client, Simple, Simple) + operation, fake_client, Simple, Simple) self.assertFalse(operation_future.cancel()) - mock_client.cancel_operation.assert_not_called() + fake_client.cancel_operation.assert_not_called() def test_cancelled_true(self): error = status_pb2.Status(code=code_pb2.CANCELLED) @@ -246,17 +256,11 @@ def test_exception_timeout(self): self.assertRaises(futures.TimeoutError, operation_future.exception, 0) def test_add_done_callback(self): - def done_clbk(ofuture): - ofuture.conn.send(True) - ofuture.conn.close() - operation_future = self._make_operation_future( self._make_operation(), self._make_operation(done=True, response=Simple())) + operation_future.test_queue = mp.Manager().Queue() - parent_conn, child_conn = multiprocessing.Pipe(False) - operation_future.conn = child_conn - - operation_future.add_done_callback(done_clbk) + operation_future.add_done_callback(_done_clbk) - self.assertTrue(parent_conn.recv()) + self.assertTrue(operation_future.test_queue.get()) From c33e1059df076f76c5e215f18427b10128256cf9 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Tue, 3 Jan 2017 16:22:53 -0800 Subject: [PATCH 08/20] Change _last_operation to _operation --- google/gax/__init__.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 47e7456..21b287f 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -533,11 +533,11 @@ def __init__(self, operation, client, result_type, metadata_type, client (google.gapic.longrunning.operations_client.OperationsClient): a client for the long-running operation service. result_type (type): the class type of the result. - metadata_type (type): the class type of the metadata. + metadata_type (type, optional): the class type of the metadata. call_options (google.gax.CallOptions, optional): the call options that are used when reloading the operation. """ - self._last_operation = operation + self._operation = operation self._client = client self._result_type = result_type self._metadata_type = metadata_type @@ -552,7 +552,7 @@ def cancel(self): if self.done(): return False - self._client.cancel_operation(self._last_operation.name) + self._client.cancel_operation(self._operation.name) return True def result(self, timeout=None): @@ -566,22 +566,22 @@ def result(self, timeout=None): specified or None, there is no limit to the wait time. """ if not self._poll(timeout).HasField('response'): - raise GaxError(self._last_operation.error.message) + raise GaxError(self._operation.error.message) - return _from_any(self._result_type, self._last_operation.response) + return _from_any(self._result_type, self._operation.response) def exception(self, timeout=None): """Similar to result(), except returns the exception if any.""" if self._poll(timeout).HasField('error'): - return self._last_operation.error + return self._operation.error return None def cancelled(self): """Return True if the call was successfully cancelled.""" self._get_operation() - return (self._last_operation.HasField('error') and - self._last_operation.error.code == code_pb2.CANCELLED) + return (self._operation.HasField('error') and + self._operation.error.code == code_pb2.CANCELLED) def done(self): """Issues OperationsClient.get_operation and returns value of @@ -597,7 +597,7 @@ def add_done_callback(self, fn): # pylint: disable=invalid-name self._queue.put(fn) if self._process is None: mp.Process(target=self._execute_clbks).start() - elif not self._process.is_alive() and self._last_operation.done: + elif not self._process.is_alive() and self._operation.done: _try_callback(self, fn) def operation_name(self): @@ -605,28 +605,31 @@ def operation_name(self): OperationsClient.get_operation (or if only the initial API call has been made, the name from that first call). """ - return self._last_operation.name + return self._operation.name def metadata(self): """Returns the value of Operation.metadata from the last call to OperationsClient.get_operation (or if only the initial API call has been made, the metadata from that first call). """ - return _from_any(self._metadata_type, self._last_operation.metadata) + if self._operation.metadata is None: + return None + + return _from_any(self._metadata_type, self._operation.metadata) def last_operation_data(self): """Returns the data from the last call to OperationsClient.get_operation (or if only the initial API call has been made, the data from that first call). """ - return self._last_operation + return self._operation def _get_operation(self): - if not self._last_operation.done: - self._last_operation = self._client.get_operation( - self._last_operation.name, self._call_options) + if not self._operation.done: + self._operation = self._client.get_operation( + self._operation.name, self._call_options) - return self._last_operation + return self._operation def _poll(self, timeout=None): delay_mult = 2 @@ -635,7 +638,7 @@ def _poll(self, timeout=None): start_time = time.time() while timeout is None or time.time() < start_time + timeout: if self.done(): - return self._last_operation + return self._operation sleep_time = random.uniform(0, delay) time.sleep(sleep_time) From d7c1bdd008520ebff5ea2f486626166690578497 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Wed, 4 Jan 2017 12:16:22 -0800 Subject: [PATCH 09/20] Removed Pool from add_done_callback Reducing concurrency simplifies the implementation and makes it closer to the expected behavior of c.f.Future. --- google/gax/__init__.py | 27 ++++++++------------------- test/test_gax.py | 12 ++++++++++++ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index 21b287f..fbc1e9e 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -542,7 +542,7 @@ def __init__(self, operation, client, result_type, metadata_type, self._result_type = result_type self._metadata_type = metadata_type self._call_options = call_options - self._queue = mp.Manager().Queue() + self._done_clbks = [] self._process = None def cancel(self): @@ -592,11 +592,13 @@ def done(self): def add_done_callback(self, fn): # pylint: disable=invalid-name """Enters a polling loop on OperationsClient.get_operation, and once the operation is done or cancelled, calls the function with this - _OperationFuture. + _OperationFuture. Added callables are called in the order that they + were added. """ - self._queue.put(fn) + self._done_clbks.append(fn) if self._process is None: - mp.Process(target=self._execute_clbks).start() + self._process = mp.Process(target=self._execute_clbks) + self._process.start() elif not self._process.is_alive() and self._operation.done: _try_callback(self, fn) @@ -649,18 +651,5 @@ def _poll(self, timeout=None): def _execute_clbks(self): self._poll() - # Create a new pool. - pool_size = mp.cpu_count() * 2 - pool = mp.Pool(processes=pool_size) - - # Actually send the final callback provided to the pool. - while not self._queue.empty(): - item = self._queue.get() - pool.apply_async(_try_callback, (self, item)) - - # We are done; nothing more will be sent to the pool. - # This is a prerequisite to `pool.join()`. - pool.close() - - # Block until the async callbacks have run. - pool.join() + for done_clbk in self._done_clbks: + _try_callback(self, done_clbk) diff --git a/test/test_gax.py b/test/test_gax.py index 94b3acd..a06740a 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -264,3 +264,15 @@ def test_add_done_callback(self): operation_future.add_done_callback(_done_clbk) self.assertTrue(operation_future.test_queue.get()) + + def test_add_done_callback_when_already_done(self): + operation_future = self._make_operation_future( + self._make_operation(done=True, response=Simple())) + operation_future.test_queue = mp.Manager().Queue() + + operation_future.add_done_callback(_done_clbk) + self.assertTrue(operation_future.test_queue.get()) + + operation_future._process.join() # pylint: disable=protected-access + operation_future.add_done_callback(_done_clbk) + self.assertTrue(operation_future.test_queue.get()) From 03c3171dcc667edd86fa8ae4863510ca17c9f31d Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Wed, 4 Jan 2017 15:28:02 -0800 Subject: [PATCH 10/20] Factor out common retry code into its own module --- google/gax/__init__.py | 58 +++++++++----- google/gax/api_callable.py | 102 ++---------------------- google/gax/retry.py | 157 +++++++++++++++++++++++++++++++++++++ test/test_api_callable.py | 125 +---------------------------- test/test_gax.py | 13 ++- test/test_retry.py | 148 ++++++++++++++++++++++++++++++++++ 6 files changed, 356 insertions(+), 247 deletions(-) create mode 100644 google/gax/retry.py create mode 100644 test/test_retry.py diff --git a/google/gax/__init__.py b/google/gax/__init__.py index fbc1e9e..e826c24 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -33,11 +33,12 @@ import collections import logging import multiprocessing as mp -import random -import time -from concurrent import futures + +from grpc import RpcError, StatusCode from google.rpc import code_pb2 + from google.gax.errors import GaxError +from google.gax.retry import retryable __version__ = '0.15.0' @@ -45,6 +46,10 @@ _LOG = logging.getLogger(__name__) + +_MILLIS_PER_SEC = 1000 + + INITIAL_PAGE = object() """A placeholder for the page token passed into an initial paginated request.""" @@ -520,6 +525,16 @@ def _try_callback(target, clbk): _LOG.exception(ex) +class _DeadlineExceededError(RpcError, GaxError): + + def __init__(self): + super(_DeadlineExceededError, self).__init__('Deadline Exceeded') + + def code(self): # pylint: disable=no-self-use + """Always returns StatusCode.DEADLINE_EXCEEDED""" + return StatusCode.DEADLINE_EXCEEDED + + class _OperationFuture(object): """A Future which polls a service for completion via OperationsClient.""" @@ -561,9 +576,9 @@ def result(self, timeout=None): or throws GaxError if not successful. This method will wait up to timeout seconds. If the call hasn't - completed in timeout seconds, then a concurrent.futures.TimeoutError - will be raised. timeout can be an int or float. If timeout is not - specified or None, there is no limit to the wait time. + completed in timeout seconds, then a RetryError will be raised. + timeout can be an int or float. If timeout is not specified or None, + there is no limit to the wait time. """ if not self._poll(timeout).HasField('response'): raise GaxError(self._operation.error.message) @@ -633,20 +648,25 @@ def _get_operation(self): return self._operation + def _done_check(self, _timeout): + if self.done(): + return self._operation + + raise _DeadlineExceededError() + def _poll(self, timeout=None): - delay_mult = 2 - max_delay = 30 - delay = 1 - start_time = time.time() - while timeout is None or time.time() < start_time + timeout: - if self.done(): - return self._operation - - sleep_time = random.uniform(0, delay) - time.sleep(sleep_time) - delay = min(delay * delay_mult, max_delay) - - raise futures.TimeoutError() + if timeout is None: + backoff_settings = BackoffSettings( + 1000, 2, 30000, None, None, None, None) + else: + backoff_settings = BackoffSettings( + 1000, 2, 30000, 0, 0, 0, timeout * _MILLIS_PER_SEC) + + retry_options = RetryOptions( + [StatusCode.DEADLINE_EXCEEDED], backoff_settings) + retryable_done_check = retryable(self._done_check, retry_options) + + return retryable_done_check(self) def _execute_clbks(self): self._poll() diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index 00c7763..05473c8 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -30,107 +30,17 @@ """Provides function wrappers that implement page streaming and retrying.""" from __future__ import absolute_import, division -import random -import time from future import utils -from . import (BackoffSettings, BundleOptions, bundling, _CallSettings, config, - errors, PageIterator, ResourceIterator, RetryOptions) -from .errors import RetryError +from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings, + config, errors, PageIterator, ResourceIterator, + RetryOptions) +from google.gax.retry import add_timeout_arg, retryable _MILLIS_PER_SECOND = 1000 -def _add_timeout_arg(a_func, timeout, **kwargs): - """Updates a_func so that it gets called with the timeout as its final arg. - - This converts a callable, a_func, into another callable with an additional - positional arg. - - Args: - a_func (callable): a callable to be updated - timeout (int): to be added to the original callable as it final positional - arg. - - - Returns: - callable: the original callable updated to the timeout arg - """ - - def inner(*args): - """Updates args with the timeout.""" - updated_args = args + (timeout,) - return a_func(*updated_args, **kwargs) - - return inner - - -def _retryable(a_func, retry, **kwargs): - """Creates a function equivalent to a_func, but that retries on certain - exceptions. - - Args: - a_func (callable): A callable. - retry (RetryOptions): Configures the exceptions upon which the callable - should retry, and the parameters to the exponential backoff retry - algorithm. - - Returns: - A function that will retry on exception. - """ - - delay_mult = retry.backoff_settings.retry_delay_multiplier - max_delay = (retry.backoff_settings.max_retry_delay_millis / - _MILLIS_PER_SECOND) - timeout_mult = retry.backoff_settings.rpc_timeout_multiplier - max_timeout = (retry.backoff_settings.max_rpc_timeout_millis / - _MILLIS_PER_SECOND) - total_timeout = (retry.backoff_settings.total_timeout_millis / - _MILLIS_PER_SECOND) - - def inner(*args): - """Equivalent to ``a_func``, but retries upon transient failure. - - Retrying is done through an exponential backoff algorithm configured - by the options in ``retry``. - """ - delay = retry.backoff_settings.initial_retry_delay_millis - timeout = (retry.backoff_settings.initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) - exc = RetryError('Retry total timeout exceeded before any' - 'response was received') - now = time.time() - deadline = now + total_timeout - - while now < deadline: - try: - to_call = _add_timeout_arg(a_func, timeout, **kwargs) - return to_call(*args) - - # pylint: disable=broad-except - except Exception as exception: - if config.exc_to_code(exception) not in retry.retry_codes: - raise RetryError( - 'Exception occurred in retry method that was not' - ' classified as transient', exception) - - # pylint: disable=redefined-variable-type - exc = RetryError('Retry total timeout exceeded with exception', - exception) - to_sleep = random.uniform(0, delay) - time.sleep(to_sleep / _MILLIS_PER_SECOND) - now = time.time() - delay = min(delay * delay_mult, max_delay) - timeout = min( - timeout * timeout_mult, max_timeout, deadline - now) - continue - - raise exc - - return inner - - def _bundleable(desc): """Creates a function that transforms an API call into a bundling call. @@ -472,10 +382,10 @@ def inner(request, options=None): """Invoke with the actual settings.""" this_settings = settings.merge(options) if this_settings.retry and this_settings.retry.retry_codes: - api_call = _retryable( + api_call = retryable( func, this_settings.retry, **this_settings.kwargs) else: - api_call = _add_timeout_arg( + api_call = add_timeout_arg( func, this_settings.timeout, **this_settings.kwargs) api_call = _catch_errors(api_call, config.API_ERRORS) return api_caller(api_call, this_settings, request) diff --git a/google/gax/retry.py b/google/gax/retry.py new file mode 100644 index 0000000..94bea27 --- /dev/null +++ b/google/gax/retry.py @@ -0,0 +1,157 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Provides function wrappers that implement retrying.""" + +from __future__ import absolute_import, division +import random +import time + +from google.gax import config, errors + +_MILLIS_PER_SECOND = 1000 + + +def _retryable_without_timeout(a_func, retry_options, **kwargs): + delay_mult = retry_options.backoff_settings.retry_delay_multiplier + max_delay = (retry_options.backoff_settings.max_retry_delay_millis / + _MILLIS_PER_SECOND) + + def inner(*args): + """Equivalent to ``a_func``, but retries upon transient failure. + + Retrying is done through an exponential backoff algorithm configured + by the options in ``retry``. + """ + delay = retry_options.backoff_settings.initial_retry_delay_millis + while True: + try: + return a_func(*args, **kwargs) + except errors.RetryError as exception: + raise exception + except Exception as exception: # pylint: disable=broad-except + code = config.exc_to_code(exception) + if code not in retry_options.retry_codes: + raise errors.RetryError( + 'Exception occurred in retry method that was not' + ' classified as transient', exception) + + to_sleep = random.uniform(0, delay) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay = min(delay * delay_mult, max_delay) + + return inner + + +def add_timeout_arg(a_func, timeout, **kwargs): + """Updates a_func so that it gets called with the timeout as its final arg. + + This converts a callable, a_func, into another callable with an additional + positional arg. + + Args: + a_func (callable): a callable to be updated + timeout (int): to be added to the original callable as it final positional + arg. + + + Returns: + callable: the original callable updated to the timeout arg + """ + + def inner(*args): + """Updates args with the timeout.""" + updated_args = args + (timeout,) + return a_func(*updated_args, **kwargs) + + return inner + + +def _retryable_with_timeout(a_func, retry_options, **kwargs): + timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier + max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + total_timeout = (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) + + def inner(*args): + """Equivalent to ``a_func``, but retries with a timeout upon transient + failure. + """ + timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + exc = errors.RetryError('Retry total timeout exceeded before any' + 'response was received') + now = time.time() + deadline = now + total_timeout + while now < deadline: + try: + to_call = add_timeout_arg(a_func, timeout, **kwargs) + return to_call(*args) + except Exception as exception: # pylint: disable=broad-except + code = config.exc_to_code(exception) + if code not in retry_options.retry_codes: + raise exception + + # pylint: disable=redefined-variable-type + exc = errors.RetryError( + 'Retry total timeout exceeded with exception', exception) + now = time.time() + timeout = min( + timeout * timeout_mult, max_timeout, deadline - now) + + raise exc + + return _retryable_without_timeout(inner, retry_options, **kwargs) + + +def _has_timeout_settings(backoff_settings): + return backoff_settings.rpc_timeout_multiplier is not None and \ + backoff_settings.max_rpc_timeout_millis is not None and \ + backoff_settings.total_timeout_millis is not None and \ + backoff_settings.initial_rpc_timeout_millis is not None + + +def retryable(a_func, retry_options, **kwargs): + """Creates a function equivalent to a_func, but that retries on certain + exceptions. + + Args: + a_func (callable): A callable. + retry_options (RetryOptions): Configures the exceptions upon which the + callable should retry, and the parameters to the exponential backoff + retry algorithm. + + Returns: + A function that will retry on exception. + """ + if _has_timeout_settings(retry_options.backoff_settings): + return _retryable_with_timeout(a_func, retry_options, **kwargs) + + return _retryable_without_timeout(a_func, retry_options, **kwargs) diff --git a/test/test_api_callable.py b/test/test_api_callable.py index f30ab66..d1f5e18 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -37,7 +37,7 @@ from google.gax import ( api_callable, bundling, BackoffSettings, BundleDescriptor, BundleOptions, _CallSettings, CallOptions, INITIAL_PAGE, PageDescriptor, RetryOptions) -from google.gax.errors import GaxError, RetryError +from google.gax.errors import GaxError import grpc @@ -156,129 +156,6 @@ def test_retry(self, mock_exc_to_code, mock_time): self.assertEqual(my_callable(None), 1729) self.assertEqual(mock_call.call_count, to_attempt) - @mock.patch('time.time') - def test_no_retry_if_no_codes(self, mock_time): - retry = RetryOptions([], BackoffSettings(1, 2, 3, 4, 5, 6, 7)) - - mock_call = mock.Mock() - mock_call.side_effect = CustomException('', _FAKE_STATUS_CODE_1) - mock_time.return_value = 0 - - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(mock_call, settings) - self.assertRaises(CustomException, my_callable, None) - self.assertEqual(mock_call.call_count, 1) - - @mock.patch('time.time') - @mock.patch('google.gax.config.exc_to_code') - def test_retry_aborts_simple(self, mock_exc_to_code, mock_time): - def fake_call(dummy_request, dummy_timeout): - raise CustomException('', _FAKE_STATUS_CODE_1) - - retry = RetryOptions( - [_FAKE_STATUS_CODE_1], - BackoffSettings(0, 0, 0, 0, 0, 0, 1)) - mock_time.side_effect = [0, 2] - mock_exc_to_code.side_effect = lambda e: e.code - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(fake_call, settings) - - try: - my_callable(None) - except RetryError as exc: - self.assertIsInstance(exc.cause, CustomException) - - @mock.patch('time.time') - @mock.patch('google.gax.config.exc_to_code') - def test_retry_times_out_simple(self, mock_exc_to_code, mock_time): - mock_exc_to_code.side_effect = lambda e: e.code - to_attempt = 3 - retry = RetryOptions( - [_FAKE_STATUS_CODE_1], - BackoffSettings(0, 0, 0, 0, 0, 0, 1)) - mock_call = mock.Mock() - mock_call.side_effect = CustomException('', _FAKE_STATUS_CODE_1) - mock_time.side_effect = ([0] * to_attempt + [2]) - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(mock_call, settings) - - try: - my_callable(None) - except RetryError as exc: - self.assertIsInstance(exc.cause, CustomException) - - self.assertEqual(mock_call.call_count, to_attempt) - - @mock.patch('time.time') - @mock.patch('google.gax.config.exc_to_code') - def test_retry_aborts_on_unexpected_exception( - self, mock_exc_to_code, mock_time): - mock_exc_to_code.side_effect = lambda e: e.code - retry = RetryOptions( - [_FAKE_STATUS_CODE_1], - BackoffSettings(0, 0, 0, 0, 0, 0, 1)) - mock_call = mock.Mock() - mock_call.side_effect = CustomException('', _FAKE_STATUS_CODE_2) - mock_time.return_value = 0 - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(mock_call, settings) - self.assertRaises(Exception, my_callable, None) - self.assertEqual(mock_call.call_count, 1) - - @mock.patch('time.time') - def test_retry_times_out_no_response(self, mock_time): - mock_time.return_value = 1 - retry = RetryOptions( - [_FAKE_STATUS_CODE_1], - BackoffSettings(0, 0, 0, 0, 0, 0, 0)) - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(lambda: None, settings) - - self.assertRaises(RetryError, my_callable, None) - - @mock.patch('time.sleep') - @mock.patch('time.time') - @mock.patch('google.gax.config.exc_to_code') - def test_retry_exponential_backoff(self, mock_exc_to_code, mock_time, - mock_sleep): - # pylint: disable=too-many-locals - mock_exc_to_code.side_effect = lambda e: e.code - MILLIS_PER_SEC = 1000 - mock_time.return_value = 0 - - def incr_time(secs): - mock_time.return_value += secs - - def api_call(dummy_request, timeout, **dummy_kwargs): - incr_time(timeout) - raise CustomException(str(timeout), _FAKE_STATUS_CODE_1) - - mock_call = mock.Mock() - mock_sleep.side_effect = incr_time - mock_call.side_effect = api_call - - params = BackoffSettings(3, 2, 24, 5, 2, 80, 2500) - retry = RetryOptions([_FAKE_STATUS_CODE_1], params) - settings = _CallSettings(timeout=0, retry=retry) - my_callable = api_callable.create_api_call(mock_call, settings) - - try: - my_callable(None) - except RetryError as exc: - self.assertIsInstance(exc.cause, CustomException) - - self.assertGreaterEqual(mock_time(), - params.total_timeout_millis / MILLIS_PER_SEC) - - # Very rough bounds - calls_lower_bound = params.total_timeout_millis / ( - params.max_retry_delay_millis + params.max_rpc_timeout_millis) - self.assertGreater(mock_call.call_count, calls_lower_bound) - - calls_upper_bound = (params.total_timeout_millis / - params.initial_retry_delay_millis) - self.assertLess(mock_call.call_count, calls_upper_bound) - def test_page_streaming(self): # A mock grpc function that page streams a list of consecutive # integers, returning `page_size` integers with each call and using diff --git a/test/test_gax.py b/test/test_gax.py index a06740a..d3fb12e 100644 --- a/test/test_gax.py +++ b/test/test_gax.py @@ -27,14 +27,11 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -# pylint: disable=missing-docstring,no-self-use,no-init,invalid-name +# pylint: disable=missing-docstring,no-self-use,no-init,invalid-name,protected-access """Unit tests for gax package globals.""" from __future__ import absolute_import - import multiprocessing as mp -from concurrent import futures - import mock import unittest2 @@ -44,7 +41,7 @@ from google.gax import ( BundleOptions, CallOptions, _CallSettings, INITIAL_PAGE, OPTION_INHERIT, RetryOptions, _OperationFuture) -from google.gax.errors import GaxError +from google.gax.errors import GaxError, RetryError class TestBundleOptions(unittest2.TestCase): @@ -237,7 +234,7 @@ def test_result_error(self): def test_result_timeout(self): operation_future = self._make_operation_future() - self.assertRaises(futures.TimeoutError, operation_future.result, 0) + self.assertRaises(RetryError, operation_future.result, 0) def test_exception_error(self): error = status_pb2.Status() @@ -253,7 +250,7 @@ def test_exception_response(self): def test_exception_timeout(self): operation_future = self._make_operation_future() - self.assertRaises(futures.TimeoutError, operation_future.exception, 0) + self.assertRaises(RetryError, operation_future.exception, 0) def test_add_done_callback(self): operation_future = self._make_operation_future( @@ -273,6 +270,6 @@ def test_add_done_callback_when_already_done(self): operation_future.add_done_callback(_done_clbk) self.assertTrue(operation_future.test_queue.get()) - operation_future._process.join() # pylint: disable=protected-access + operation_future._process.join() operation_future.add_done_callback(_done_clbk) self.assertTrue(operation_future.test_queue.get()) diff --git a/test/test_retry.py b/test/test_retry.py new file mode 100644 index 0000000..f4c0cea --- /dev/null +++ b/test/test_retry.py @@ -0,0 +1,148 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# pylint: disable=missing-docstring,invalid-name +"""Unit tests for retry""" + +from __future__ import absolute_import, division +import mock +import unittest2 + +from google.gax import (errors, retry, BackoffSettings, RetryOptions) + +_MILLIS_PER_SEC = 1000 + + +_FAKE_STATUS_CODE_1 = object() + + +class CustomException(Exception): + def __init__(self, msg, code): + super(CustomException, self).__init__(msg) + self.code = code + + +class TestRetry(unittest2.TestCase): + + @mock.patch('google.gax.config.exc_to_code') + @mock.patch('time.time') + def test_retryable_without_timeout(self, mock_time, mock_exc_to_code): + mock_time.return_value = 0 + mock_exc_to_code.side_effect = lambda e: e.code + + mock_call = mock.Mock() + mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1), + mock.DEFAULT] + mock_call.return_value = 1729 + + retry_options = RetryOptions( + [_FAKE_STATUS_CODE_1], + BackoffSettings(0, 0, 0, None, None, None, None)) + + my_callable = retry.retryable(mock_call, retry_options) + + self.assertEqual(my_callable(None), 1729) + self.assertEqual(2, mock_call.call_count) + + @mock.patch('google.gax.config.exc_to_code') + @mock.patch('time.time') + def test_retryable_with_timeout(self, mock_time, mock_exc_to_code): + mock_time.return_value = 1 + mock_exc_to_code.side_effect = lambda e: e.code + + mock_call = mock.Mock() + mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1), + mock.DEFAULT] + mock_call.return_value = 1729 + + retry_options = RetryOptions( + [_FAKE_STATUS_CODE_1], + BackoffSettings(0, 0, 0, 0, 0, 0, 0)) + + my_callable = retry.retryable(mock_call, retry_options) + + self.assertRaises(errors.RetryError, my_callable) + self.assertEqual(0, mock_call.call_count) + + @mock.patch('google.gax.config.exc_to_code') + @mock.patch('time.time') + def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code): + mock_time.return_value = 0 + mock_exc_to_code.side_effect = lambda e: e.code + + mock_call = mock.Mock() + mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1), + mock.DEFAULT] + mock_call.return_value = 1729 + + retry_options = RetryOptions( + [], + BackoffSettings(0, 0, 0, 0, 0, 0, 1)) + + my_callable = retry.retryable(mock_call, retry_options) + + self.assertRaises(errors.RetryError, my_callable) + self.assertEqual(1, mock_call.call_count) + + @mock.patch('google.gax.config.exc_to_code') + @mock.patch('time.sleep') + @mock.patch('time.time') + def test_retryable_exponential_backoff( + self, mock_time, mock_sleep, mock_exc_to_code): + def incr_time(secs): + mock_time.return_value += secs + + def api_call(timeout): + incr_time(timeout) + raise CustomException(str(timeout), _FAKE_STATUS_CODE_1) + + mock_time.return_value = 0 + mock_sleep.side_effect = incr_time + mock_exc_to_code.side_effect = lambda e: e.code + + mock_call = mock.Mock() + mock_call.side_effect = api_call + + params = BackoffSettings(3, 2, 24, 5, 2, 80, 2500) + retry_options = RetryOptions([_FAKE_STATUS_CODE_1], params) + + my_callable = retry.retryable(mock_call, retry_options) + + self.assertRaises(errors.RetryError, my_callable) + self.assertGreaterEqual(mock_time(), + params.total_timeout_millis / _MILLIS_PER_SEC) + + # Very rough bounds + calls_lower_bound = params.total_timeout_millis / ( + params.max_retry_delay_millis + params.max_rpc_timeout_millis) + self.assertGreater(mock_call.call_count, calls_lower_bound) + + calls_upper_bound = (params.total_timeout_millis / + params.initial_retry_delay_millis) + self.assertLess(mock_call.call_count, calls_upper_bound) From cc5c20603e6c776196f21d4b2b4f8e6ff03aba09 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 10:35:26 -0800 Subject: [PATCH 11/20] Upgrade google-auth from 0.2.0 to 0.5.0. Trying to fix Travis CI error: "AttributeError: '_NamespacePath' object has no attribute 'sort'" --- test-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index 37221da..a0fb2bd 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,7 +4,7 @@ pytest-cov>=1.8.1 pytest-timeout>=1.0.0 unittest2>=1.1.0 grpcio-tools>=1.0.0 -google-auth>=0.2.0 +google-auth>=0.5.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 From cac5d99297d94e898a91c8f0be2f4b4613659d19 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 11:01:25 -0800 Subject: [PATCH 12/20] Add setuptools to test-requirements.txt. Still trying to fix the Travis CI error. --- test-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/test-requirements.txt b/test-requirements.txt index a0fb2bd..756e949 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,3 +8,4 @@ google-auth>=0.5.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 +setuptools>=32.3.1 From 85efdd414fba39843bf348b87c8d56da7a543ac3 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 11:25:34 -0800 Subject: [PATCH 13/20] Upgrade pylint to 1.6.4. Still fixing Travis CI error. --- test-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index 756e949..14eea94 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,4 +8,4 @@ google-auth>=0.5.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 -setuptools>=32.3.1 +pylint>=1.6.4 From e680dcd1905694b887ffe356cbcf2ab4deb754ff Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 11:45:35 -0800 Subject: [PATCH 14/20] Replace duplicate pylint install with astroid install --- test-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index 14eea94..6db04ed 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,4 +8,4 @@ google-auth>=0.5.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 -pylint>=1.6.4 +astroid>=1.4.9 From 6ff6e5f77455fc2b5d2e0aa63b6ce9b6016b01ea Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 13:00:58 -0800 Subject: [PATCH 15/20] Added usedevelop = True to tox config. This should actually fix the Travis CI error. --- test-requirements.txt | 1 - tox.ini | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index 6db04ed..a0fb2bd 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -8,4 +8,3 @@ google-auth>=0.5.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 -astroid>=1.4.9 diff --git a/tox.ini b/tox.ini index 9616316..675849b 100644 --- a/tox.ini +++ b/tox.ini @@ -1,10 +1,11 @@ [tox] envlist = py27,py34,py35,pep8,pylint-errors,pylint-full,pylint-no-test-deps -[tox:travis] -2.7 = py27,pep8,pylint-full,pylint-no-test-deps,docs -3.4 = py34,pep8,pylint-full,pylint-no-test-deps -3.5 = py35,pep8,pylint-full,pylint-no-test-deps +[travis] +python = + 2.7: py27,pep8,pylint-full,pylint-no-test-deps,docs + 3.4: py34,pep8,pylint-full,pylint-no-test-deps + 3.5: py35,pep8,pylint-full,pylint-no-test-deps [testenv] setenv = @@ -17,6 +18,7 @@ commands = -mkdir src-gen -py.test --timeout=30 --cov-report html --cov-report=term --cov {toxinidir}/google/gax [testenv:pep8] +usedevelop = True deps = flake8 commands = flake8 --max-complexity=10 google test --ignore=E501 --exclude=src-gen,gapic From bc98103d6e0c9cf005b3b16a8a50d86da5513781 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Thu, 5 Jan 2017 16:15:09 -0800 Subject: [PATCH 16/20] Consolidate retryable functions into one. Prevents double while loop when retrying with timeout. --- google/gax/__init__.py | 2 +- google/gax/api_callable.py | 7 +- google/gax/retry.py | 130 +++++++++++++++++-------------------- tox.ini | 1 - 4 files changed, 62 insertions(+), 78 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index e826c24..cec94bf 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -666,7 +666,7 @@ def _poll(self, timeout=None): [StatusCode.DEADLINE_EXCEEDED], backoff_settings) retryable_done_check = retryable(self._done_check, retry_options) - return retryable_done_check(self) + return retryable_done_check() def _execute_clbks(self): self._poll() diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index 05473c8..1fbafd4 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -35,8 +35,7 @@ from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings, config, errors, PageIterator, ResourceIterator, - RetryOptions) -from google.gax.retry import add_timeout_arg, retryable + RetryOptions, retry as rt) _MILLIS_PER_SECOND = 1000 @@ -382,10 +381,10 @@ def inner(request, options=None): """Invoke with the actual settings.""" this_settings = settings.merge(options) if this_settings.retry and this_settings.retry.retry_codes: - api_call = retryable( + api_call = rt.retryable( func, this_settings.retry, **this_settings.kwargs) else: - api_call = add_timeout_arg( + api_call = rt.add_timeout_arg( func, this_settings.timeout, **this_settings.kwargs) api_call = _catch_errors(api_call, config.API_ERRORS) return api_caller(api_call, this_settings, request) diff --git a/google/gax/retry.py b/google/gax/retry.py index 94bea27..9ebde73 100644 --- a/google/gax/retry.py +++ b/google/gax/retry.py @@ -38,35 +38,11 @@ _MILLIS_PER_SECOND = 1000 -def _retryable_without_timeout(a_func, retry_options, **kwargs): - delay_mult = retry_options.backoff_settings.retry_delay_multiplier - max_delay = (retry_options.backoff_settings.max_retry_delay_millis / - _MILLIS_PER_SECOND) - - def inner(*args): - """Equivalent to ``a_func``, but retries upon transient failure. - - Retrying is done through an exponential backoff algorithm configured - by the options in ``retry``. - """ - delay = retry_options.backoff_settings.initial_retry_delay_millis - while True: - try: - return a_func(*args, **kwargs) - except errors.RetryError as exception: - raise exception - except Exception as exception: # pylint: disable=broad-except - code = config.exc_to_code(exception) - if code not in retry_options.retry_codes: - raise errors.RetryError( - 'Exception occurred in retry method that was not' - ' classified as transient', exception) - - to_sleep = random.uniform(0, delay) - time.sleep(to_sleep / _MILLIS_PER_SECOND) - delay = min(delay * delay_mult, max_delay) - - return inner +def _has_timeout_settings(backoff_settings): + return (backoff_settings.rpc_timeout_multiplier is not None and + backoff_settings.max_rpc_timeout_millis is not None and + backoff_settings.total_timeout_millis is not None and + backoff_settings.initial_rpc_timeout_millis is not None) def add_timeout_arg(a_func, timeout, **kwargs): @@ -93,65 +69,75 @@ def inner(*args): return inner -def _retryable_with_timeout(a_func, retry_options, **kwargs): - timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier - max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / - _MILLIS_PER_SECOND) - total_timeout = (retry_options.backoff_settings.total_timeout_millis / - _MILLIS_PER_SECOND) +def retryable(a_func, retry_options, **kwargs): + """Creates a function equivalent to a_func, but that retries on certain + exceptions. + + Args: + a_func (callable): A callable. + retry_options (RetryOptions): Configures the exceptions upon which the + callable should retry, and the parameters to the exponential backoff + retry algorithm. + + Returns: + A function that will retry on exception. + """ + delay_mult = retry_options.backoff_settings.retry_delay_multiplier + max_delay = (retry_options.backoff_settings.max_retry_delay_millis / + _MILLIS_PER_SECOND) + has_timeout_settings = _has_timeout_settings(retry_options.backoff_settings) + + if has_timeout_settings: + timeout_mult = retry_options.backoff_settings.rpc_timeout_multiplier + max_timeout = (retry_options.backoff_settings.max_rpc_timeout_millis / + _MILLIS_PER_SECOND) + total_timeout = (retry_options.backoff_settings.total_timeout_millis / + _MILLIS_PER_SECOND) def inner(*args): - """Equivalent to ``a_func``, but retries with a timeout upon transient - failure. + """Equivalent to ``a_func``, but retries upon transient failure. + + Retrying is done through an exponential backoff algorithm configured + by the options in ``retry``. """ - timeout = (retry_options.backoff_settings.initial_rpc_timeout_millis / - _MILLIS_PER_SECOND) + delay = retry_options.backoff_settings.initial_retry_delay_millis exc = errors.RetryError('Retry total timeout exceeded before any' 'response was received') - now = time.time() - deadline = now + total_timeout - while now < deadline: + if has_timeout_settings: + timeout = ( + retry_options.backoff_settings.initial_rpc_timeout_millis / + _MILLIS_PER_SECOND) + + now = time.time() + deadline = now + total_timeout + else: + timeout = None + deadline = None + + while deadline is None or now < deadline: try: to_call = add_timeout_arg(a_func, timeout, **kwargs) return to_call(*args) except Exception as exception: # pylint: disable=broad-except code = config.exc_to_code(exception) if code not in retry_options.retry_codes: - raise exception + raise errors.RetryError( + 'Exception occurred in retry method that was not' + ' classified as transient', exception) # pylint: disable=redefined-variable-type exc = errors.RetryError( 'Retry total timeout exceeded with exception', exception) - now = time.time() - timeout = min( - timeout * timeout_mult, max_timeout, deadline - now) - - raise exc - - return _retryable_without_timeout(inner, retry_options, **kwargs) - - -def _has_timeout_settings(backoff_settings): - return backoff_settings.rpc_timeout_multiplier is not None and \ - backoff_settings.max_rpc_timeout_millis is not None and \ - backoff_settings.total_timeout_millis is not None and \ - backoff_settings.initial_rpc_timeout_millis is not None - -def retryable(a_func, retry_options, **kwargs): - """Creates a function equivalent to a_func, but that retries on certain - exceptions. + to_sleep = random.uniform(0, delay) + time.sleep(to_sleep / _MILLIS_PER_SECOND) + delay = min(delay * delay_mult, max_delay) - Args: - a_func (callable): A callable. - retry_options (RetryOptions): Configures the exceptions upon which the - callable should retry, and the parameters to the exponential backoff - retry algorithm. + if has_timeout_settings: + now = time.time() + timeout = min( + timeout * timeout_mult, max_timeout, deadline - now) - Returns: - A function that will retry on exception. - """ - if _has_timeout_settings(retry_options.backoff_settings): - return _retryable_with_timeout(a_func, retry_options, **kwargs) + raise exc - return _retryable_without_timeout(a_func, retry_options, **kwargs) + return inner diff --git a/tox.ini b/tox.ini index 675849b..f29a4d9 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,6 @@ commands = -mkdir src-gen -py.test --timeout=30 --cov-report html --cov-report=term --cov {toxinidir}/google/gax [testenv:pep8] -usedevelop = True deps = flake8 commands = flake8 --max-complexity=10 google test --ignore=E501 --exclude=src-gen,gapic From 365de3740b011736c2c316e031aade2d0e6e0fb5 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Fri, 6 Jan 2017 14:06:31 -0800 Subject: [PATCH 17/20] Move _done_check to closure --- google/gapic/longrunning/operations_client.py | 34 +++++++++---- google/gax/__init__.py | 49 +++++++++---------- google/gax/retry.py | 2 +- test/test_api_callable.py | 3 -- test/test_retry.py | 49 ++++++++++++++++--- 5 files changed, 92 insertions(+), 45 deletions(-) diff --git a/google/gapic/longrunning/operations_client.py b/google/gapic/longrunning/operations_client.py index 1fce0dc..61838fb 100644 --- a/google/gapic/longrunning/operations_client.py +++ b/google/gapic/longrunning/operations_client.py @@ -1,16 +1,30 @@ -# Copyright 2016 Google Inc. All rights reserved. +# Copyright 2016, Google Inc. All rights reserved. # -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: # -# http://www.apache.org/licenses/LICENSE-2.0 +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. # -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # # EDITING INSTRUCTIONS # This file was generated from the file diff --git a/google/gax/__init__.py b/google/gax/__init__.py index cec94bf..b408859 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -562,7 +562,7 @@ def __init__(self, operation, client, result_type, metadata_type, def cancel(self): """If last Operation's value of `done` is true, returns false; - otherwise, issues OperationsClient.cancel_operation and returns true. + otherwise, issues OperationsClient.cancel_operation and returns true. """ if self.done(): return False @@ -572,13 +572,13 @@ def cancel(self): def result(self, timeout=None): """Enters polling loop on OperationsClient.get_operation, and once - Operation.done is true, then returns Operation.response if successful - or throws GaxError if not successful. + Operation.done is true, then returns Operation.response if successful or + throws GaxError if not successful. - This method will wait up to timeout seconds. If the call hasn't - completed in timeout seconds, then a RetryError will be raised. - timeout can be an int or float. If timeout is not specified or None, - there is no limit to the wait time. + This method will wait up to timeout seconds. If the call hasn't + completed in timeout seconds, then a RetryError will be raised. timeout + can be an int or float. If timeout is not specified or None, there is no + limit to the wait time. """ if not self._poll(timeout).HasField('response'): raise GaxError(self._operation.error.message) @@ -600,15 +600,15 @@ def cancelled(self): def done(self): """Issues OperationsClient.get_operation and returns value of - Operation.done. + Operation.done. """ return self._get_operation().done def add_done_callback(self, fn): # pylint: disable=invalid-name """Enters a polling loop on OperationsClient.get_operation, and once the - operation is done or cancelled, calls the function with this - _OperationFuture. Added callables are called in the order that they - were added. + operation is done or cancelled, calls the function with this + _OperationFuture. Added callables are called in the order that they were + added. """ self._done_clbks.append(fn) if self._process is None: @@ -618,16 +618,13 @@ def add_done_callback(self, fn): # pylint: disable=invalid-name _try_callback(self, fn) def operation_name(self): - """Returns the value of Operation.name from the last call to - OperationsClient.get_operation (or if only the initial API call has - been made, the name from that first call). - """ + """Returns the value of Operation.name.""" return self._operation.name def metadata(self): """Returns the value of Operation.metadata from the last call to - OperationsClient.get_operation (or if only the initial API call has - been made, the metadata from that first call). + OperationsClient.get_operation (or if only the initial API call has been + made, the metadata from that first call). """ if self._operation.metadata is None: return None @@ -636,8 +633,8 @@ def metadata(self): def last_operation_data(self): """Returns the data from the last call to OperationsClient.get_operation - (or if only the initial API call has been made, the data from that - first call). + (or if only the initial API call has been made, the data from that first + call). """ return self._operation @@ -648,13 +645,13 @@ def _get_operation(self): return self._operation - def _done_check(self, _timeout): - if self.done(): - return self._operation + def _poll(self, timeout=None): + def _done_check(_): + if self.done(): + return self._operation - raise _DeadlineExceededError() + raise _DeadlineExceededError() - def _poll(self, timeout=None): if timeout is None: backoff_settings = BackoffSettings( 1000, 2, 30000, None, None, None, None) @@ -664,7 +661,7 @@ def _poll(self, timeout=None): retry_options = RetryOptions( [StatusCode.DEADLINE_EXCEEDED], backoff_settings) - retryable_done_check = retryable(self._done_check, retry_options) + retryable_done_check = retryable(_done_check, retry_options) return retryable_done_check() @@ -673,3 +670,5 @@ def _execute_clbks(self): for done_clbk in self._done_clbks: _try_callback(self, done_clbk) + + self._done_clbks = [] diff --git a/google/gax/retry.py b/google/gax/retry.py index 9ebde73..e15d54f 100644 --- a/google/gax/retry.py +++ b/google/gax/retry.py @@ -1,4 +1,4 @@ -# Copyright 2016, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without diff --git a/test/test_api_callable.py b/test/test_api_callable.py index d1f5e18..354be6e 100644 --- a/test/test_api_callable.py +++ b/test/test_api_callable.py @@ -101,9 +101,6 @@ _FAKE_STATUS_CODE_1 = object() -_FAKE_STATUS_CODE_2 = object() - - class CustomException(Exception): def __init__(self, msg, code): super(CustomException, self).__init__(msg) diff --git a/test/test_retry.py b/test/test_retry.py index f4c0cea..8541c73 100644 --- a/test/test_retry.py +++ b/test/test_retry.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2017, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -42,6 +42,9 @@ _FAKE_STATUS_CODE_1 = object() +_FAKE_STATUS_CODE_2 = object() + + class CustomException(Exception): def __init__(self, msg, code): super(CustomException, self).__init__(msg) @@ -56,9 +59,10 @@ def test_retryable_without_timeout(self, mock_time, mock_exc_to_code): mock_time.return_value = 0 mock_exc_to_code.side_effect = lambda e: e.code + to_attempt = 3 mock_call = mock.Mock() - mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1), - mock.DEFAULT] + mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] * + (to_attempt - 1) + [mock.DEFAULT]) mock_call.return_value = 1729 retry_options = RetryOptions( @@ -68,7 +72,7 @@ def test_retryable_without_timeout(self, mock_time, mock_exc_to_code): my_callable = retry.retryable(mock_call, retry_options) self.assertEqual(my_callable(None), 1729) - self.assertEqual(2, mock_call.call_count) + self.assertEqual(to_attempt, mock_call.call_count) @mock.patch('google.gax.config.exc_to_code') @mock.patch('time.time') @@ -107,7 +111,36 @@ def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code): my_callable = retry.retryable(mock_call, retry_options) - self.assertRaises(errors.RetryError, my_callable) + try: + my_callable(None) + except errors.RetryError as exc: + self.assertIsInstance(exc.cause, CustomException) + + self.assertEqual(1, mock_call.call_count) + + @mock.patch('google.gax.config.exc_to_code') + @mock.patch('time.time') + def test_retryable_aborts_on_unexpected_exception( + self, mock_time, mock_exc_to_code): + mock_time.return_value = 0 + mock_exc_to_code.side_effect = lambda e: e.code + + mock_call = mock.Mock() + mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_2), + mock.DEFAULT] + mock_call.return_value = 1729 + + retry_options = RetryOptions( + [_FAKE_STATUS_CODE_1], + BackoffSettings(0, 0, 0, 0, 0, 0, 1)) + + my_callable = retry.retryable(mock_call, retry_options) + + try: + my_callable(None) + except errors.RetryError as exc: + self.assertIsInstance(exc.cause, CustomException) + self.assertEqual(1, mock_call.call_count) @mock.patch('google.gax.config.exc_to_code') @@ -134,7 +167,11 @@ def api_call(timeout): my_callable = retry.retryable(mock_call, retry_options) - self.assertRaises(errors.RetryError, my_callable) + try: + my_callable() + except errors.RetryError as exc: + self.assertIsInstance(exc.cause, CustomException) + self.assertGreaterEqual(mock_time(), params.total_timeout_millis / _MILLIS_PER_SEC) From ebc4bfd5b6a12906ac093e5b781c36570d7ea2b2 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Fri, 6 Jan 2017 14:10:14 -0800 Subject: [PATCH 18/20] Change google-auth back to 0.2.0 --- test-requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test-requirements.txt b/test-requirements.txt index a0fb2bd..37221da 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,7 +4,7 @@ pytest-cov>=1.8.1 pytest-timeout>=1.0.0 unittest2>=1.1.0 grpcio-tools>=1.0.0 -google-auth>=0.5.0 +google-auth>=0.2.0 requests>=2.11.1 httplib2>=0.9.2 google-auth-httplib2>=0.0.1 From e269ad03a445244bd2e0297785af7517d38803b7 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Fri, 6 Jan 2017 15:21:10 -0800 Subject: [PATCH 19/20] Change _done_clbks to a deque --- google/gax/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/google/gax/__init__.py b/google/gax/__init__.py index b408859..c117542 100644 --- a/google/gax/__init__.py +++ b/google/gax/__init__.py @@ -557,7 +557,7 @@ def __init__(self, operation, client, result_type, metadata_type, self._result_type = result_type self._metadata_type = metadata_type self._call_options = call_options - self._done_clbks = [] + self._done_clbks = collections.deque() self._process = None def cancel(self): @@ -610,12 +610,14 @@ def add_done_callback(self, fn): # pylint: disable=invalid-name _OperationFuture. Added callables are called in the order that they were added. """ - self._done_clbks.append(fn) if self._process is None: + self._done_clbks.append(fn) self._process = mp.Process(target=self._execute_clbks) self._process.start() elif not self._process.is_alive() and self._operation.done: _try_callback(self, fn) + else: + self._done_clbks.append(fn) def operation_name(self): """Returns the value of Operation.name.""" @@ -668,7 +670,6 @@ def _done_check(_): def _execute_clbks(self): self._poll() - for done_clbk in self._done_clbks: + while self._done_clbks: + done_clbk = self._done_clbks.popleft() _try_callback(self, done_clbk) - - self._done_clbks = [] From ea4866c117a298bc9c2683d975925df4595b3be8 Mon Sep 17 00:00:00 2001 From: Eva Ogbe Date: Tue, 10 Jan 2017 12:17:05 -0800 Subject: [PATCH 20/20] Renamed retry to retry_options to avoid naming collision --- google/gax/api_callable.py | 20 ++++++++++---------- test/test_retry.py | 3 +++ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/google/gax/api_callable.py b/google/gax/api_callable.py index 1fbafd4..73a0523 100644 --- a/google/gax/api_callable.py +++ b/google/gax/api_callable.py @@ -35,7 +35,7 @@ from google.gax import (BackoffSettings, BundleOptions, bundling, _CallSettings, config, errors, PageIterator, ResourceIterator, - RetryOptions, retry as rt) + RetryOptions, retry) _MILLIS_PER_SECOND = 1000 @@ -163,13 +163,13 @@ def _construct_retry(method_config, retry_codes, retry_params, retry_names): return RetryOptions(retry_codes=codes, backoff_settings=backoff_settings) -def _merge_retry_options(retry, overrides): +def _merge_retry_options(retry_options, overrides): """Helper for ``construct_settings()``. Takes two retry options, and merges them into a single RetryOption instance. Args: - retry: The base RetryOptions. + retry_options: The base RetryOptions. overrides: The RetryOptions used for overriding ``retry``. Use the values if it is not None. If entire ``overrides`` is None, ignore the base retry and return None. @@ -181,12 +181,12 @@ def _merge_retry_options(retry, overrides): return None if overrides.retry_codes is None and overrides.backoff_settings is None: - return retry + return retry_options - codes = retry.retry_codes + codes = retry_options.retry_codes if overrides.retry_codes is not None: codes = overrides.retry_codes - backoff_settings = retry.backoff_settings + backoff_settings = retry_options.backoff_settings if overrides.backoff_settings is not None: backoff_settings = overrides.backoff_settings @@ -309,14 +309,14 @@ def construct_settings( bundling_config = overriding_method['bundling'] bundler = _construct_bundling(bundling_config, bundle_descriptor) - retry = _merge_retry_options( + retry_options = _merge_retry_options( _construct_retry(method_config, service_config['retry_codes'], service_config['retry_params'], retry_names), _construct_retry(overriding_method, overrides.get('retry_codes'), overrides.get('retry_params'), retry_names)) defaults[snake_name] = _CallSettings( - timeout=timeout, retry=retry, + timeout=timeout, retry=retry_options, page_descriptor=page_descriptors.get(snake_name), bundler=bundler, bundle_descriptor=bundle_descriptor, kwargs=kwargs) @@ -381,10 +381,10 @@ def inner(request, options=None): """Invoke with the actual settings.""" this_settings = settings.merge(options) if this_settings.retry and this_settings.retry.retry_codes: - api_call = rt.retryable( + api_call = retry.retryable( func, this_settings.retry, **this_settings.kwargs) else: - api_call = rt.add_timeout_arg( + api_call = retry.add_timeout_arg( func, this_settings.timeout, **this_settings.kwargs) api_call = _catch_errors(api_call, config.API_ERRORS) return api_caller(api_call, this_settings, request) diff --git a/test/test_retry.py b/test/test_retry.py index 8541c73..c316a8e 100644 --- a/test/test_retry.py +++ b/test/test_retry.py @@ -113,6 +113,7 @@ def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code): try: my_callable(None) + self.fail('Should not have been reached') except errors.RetryError as exc: self.assertIsInstance(exc.cause, CustomException) @@ -138,6 +139,7 @@ def test_retryable_aborts_on_unexpected_exception( try: my_callable(None) + self.fail('Should not have been reached') except errors.RetryError as exc: self.assertIsInstance(exc.cause, CustomException) @@ -169,6 +171,7 @@ def api_call(timeout): try: my_callable() + self.fail('Should not have been reached') except errors.RetryError as exc: self.assertIsInstance(exc.cause, CustomException)