Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process: make sure type annotations pass with mypy #542

Merged
merged 21 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion google/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

try:
import pkg_resources

pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil

__path__ = pkgutil.extend_path(__path__, __name__)
__path__: List[str] = pkgutil.extend_path(__path__, __name__) # type: ignore
4 changes: 2 additions & 2 deletions google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import absolute_import

import concurrent.futures
from typing import Any, NoReturn
from typing import Any, NoReturn, Optional

import google.api_core.future

Expand Down Expand Up @@ -47,7 +47,7 @@ def set_result(self, result: Any):
"""
return super().set_result(result=result)

def set_exception(self, exception: Exception):
def set_exception(self, exception: Optional[BaseException]):
"""Set the result of the future as being the given exception.

Do not use this method, it should only be used internally by the library and its
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/_batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __len__(self):

@staticmethod
@abc.abstractmethod
def make_lock() -> None: # pragma: NO COVER
def make_lock(): # pragma: NO COVER
"""Return a lock in the chosen concurrency model.

Returns:
Expand Down
17 changes: 8 additions & 9 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import threading
import time
import typing
from typing import Any, Callable, Optional, Sequence
from typing import Any, Callable, List, Optional, Sequence

import google.api_core.exceptions
from google.api_core import gapic_v1
Expand All @@ -28,11 +28,10 @@
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from google import api_core
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1 import PublisherClient

from google.cloud.pubsub_v1.publisher import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
Expand Down Expand Up @@ -93,8 +92,8 @@ def __init__(
settings: "types.BatchSettings",
batch_done_callback: Callable[[bool], Any] = None,
commit_when_full: bool = True,
commit_retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand All @@ -108,8 +107,8 @@ def __init__(
# _futures list should remain unchanged after batch
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures = []
self._messages = []
self._futures: List[futures.Future] = []
self._messages: List[gapic_types.PubsubMessage] = []
self._status = base.BatchStatus.ACCEPTING_MESSAGES

# The initial size is not zero, we need to account for the size overhead
Expand Down Expand Up @@ -368,7 +367,7 @@ def publish(
), "Publish after stop() or publish error."

if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
return
return None

size_increase = gapic_types.PublishRequest(
messages=[message]
Expand Down
7 changes: 2 additions & 5 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@

if typing.TYPE_CHECKING: # pragma: NO COVER
from concurrent import futures
from google.api_core import retry
from google.pubsub_v1.services.publisher.client import OptionalRetry


class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
sequences messages to be published.
"""

@staticmethod
@abc.abstractmethod
def is_finished(self) -> bool: # pragma: NO COVER
""" Whether the sequencer is finished and should be cleaned up.
Expand All @@ -40,7 +39,6 @@ def is_finished(self) -> bool: # pragma: NO COVER
"""
raise NotImplementedError

@staticmethod
@abc.abstractmethod
def unpause(self) -> None: # pragma: NO COVER
""" Unpauses this sequencer.
Expand All @@ -51,12 +49,11 @@ def unpause(self) -> None: # pragma: NO COVER
"""
raise NotImplementedError

@staticmethod
@abc.abstractmethod
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = None,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
) -> "futures.Future": # pragma: NO COVER
""" Publish message for this ordering key.
Expand Down
25 changes: 13 additions & 12 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@

import enum
import collections
import concurrent.futures as futures
import threading
import typing
from typing import Iterable, Sequence
from typing import Deque, Iterable, Sequence

from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.api_core import retry
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import _batch
from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry


class _OrderedSequencerStatus(str, enum.Enum):
Expand Down Expand Up @@ -101,7 +102,7 @@ def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
# Batches ordered from first (head/left) to last (right/tail).
# Invariant: always has at least one batch after the first publish,
# unless paused or stopped.
self._ordered_batches = collections.deque()
self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
# See _OrderedSequencerStatus for valid state transitions.
self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

Expand Down Expand Up @@ -237,8 +238,8 @@ def unpause(self) -> None:

def _create_batch(
self,
commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "_batch.thread.Batch":
""" Create a new batch using the client's batch class and other stored
settings.
Expand All @@ -262,8 +263,8 @@ def _create_batch(
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> futures.Future:
""" Publish message for this ordering key.

Expand All @@ -289,12 +290,12 @@ def publish(
"""
with self._state_lock:
if self._state == _OrderedSequencerStatus.PAUSED:
future = futures.Future()
errored_future = futures.Future()
exception = exceptions.PublishToPausedOrderingKeyException(
self._ordering_key
)
future.set_exception(exception)
return future
errored_future.set_exception(exception)
return errored_future

# If waiting to be cleaned-up, convert to accepting messages to
# prevent this sequencer from being cleaned-up only to have another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
# limitations under the License.

import typing
from typing import Optional

from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from concurrent import futures
from google.api_core import retry
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.publisher import _batch
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry

from google.cloud.pubsub_v1 import types


class UnorderedSequencer(base.Sequencer):
Expand All @@ -35,7 +38,7 @@ class UnorderedSequencer(base.Sequencer):
def __init__(self, client: "PublisherClient", topic: str):
self._client = client
self._topic = topic
self._current_batch = None
self._current_batch: Optional["_batch.thread.Batch"] = None
self._stopped = False

def is_finished(self) -> bool:
Expand Down Expand Up @@ -88,8 +91,8 @@ def unpause(self) -> typing.NoReturn:

def _create_batch(
self,
commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "_batch.thread.Batch":
""" Create a new batch using the client's batch class and other stored
settings.
Expand All @@ -113,8 +116,8 @@ def _create_batch(
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "futures.Future":
""" Batch message into existing or new batch.

Expand Down
31 changes: 18 additions & 13 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import threading
import time
import typing
from typing import Any, Sequence, Type, Union
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials
from google.oauth2 import service_account
from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
Expand All @@ -46,10 +46,9 @@
__version__ = "0.0"

if typing.TYPE_CHECKING: # pragma: NO COVER
from google import api_core
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher._sequencer.base import Sequencer
from google.cloud.pubsub_v1.publisher import _batch
from google.pubsub_v1.services.publisher.client import OptionalRetry


_LOGGER = logging.getLogger(__name__)
Expand All @@ -62,6 +61,10 @@

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

SequencerType = Union[
ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]


@_gapic.add_methods(publisher_client.PublisherClient, denylist=_DENYLISTED_METHODS)
class Client(object):
Expand Down Expand Up @@ -152,10 +155,10 @@ def __init__(
# messages. One batch exists for each topic.
self._batch_lock = self._batch_class.make_lock()
# (topic, ordering_key) => sequencers object
self._sequencers = {}
self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
self._is_stopped = False
# Thread created to commit all sequencers after a timeout.
self._commit_thread = None
self._commit_thread: Optional[threading.Thread] = None

# The object controlling the message publishing flow
self._flow_controller = FlowController(self.publisher_options.flow_control)
Expand Down Expand Up @@ -196,7 +199,7 @@ def target(self) -> str:
"""
return self._target

def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> "Sequencer":
def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
""" Get an existing sequencer or create a new one given the (topic,
ordering_key) pair.
"""
Expand Down Expand Up @@ -254,8 +257,8 @@ def publish(
topic: str,
data: bytes,
ordering_key: str = "",
retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
**attrs: Union[bytes, str],
) -> "pubsub_v1.publisher.futures.Future":
"""Publish a single message.
Expand Down Expand Up @@ -380,8 +383,10 @@ def on_publish_done(future):
if retry is gapic_v1.method.DEFAULT:
# use the default retry for the publish GRPC method as a base
transport = self.api._transport
retry = transport._wrapped_methods[transport.publish]._retry
retry = retry.with_deadline(2.0 ** 32)
base_retry = transport._wrapped_methods[transport.publish]._retry
retry = base_retry.with_deadline(2.0 ** 32)
else:
retry = retry.with_deadline(2.0 ** 32)

# Delegate the publishing to the sequencer.
sequencer = self._get_or_create_sequencer(topic, ordering_key)
Expand Down Expand Up @@ -490,7 +495,7 @@ def _set_batch_class(self, batch_class: Type) -> None:

# Used only for testing.
def _set_sequencer(
self, topic: str, sequencer: "Sequencer", ordering_key: str = ""
self, topic: str, sequencer: SequencerType, ordering_key: str = ""
) -> None:
sequencer_key = (topic, ordering_key)
self._sequencers[sequencer_key] = sequencer
Loading