Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for message transforms to Topic and Subscription #1274

Merged
merged 8 commits into from
Jan 30, 2025
6 changes: 6 additions & 0 deletions google/pubsub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from google.pubsub_v1.types.pubsub import GetSubscriptionRequest
from google.pubsub_v1.types.pubsub import GetTopicRequest
from google.pubsub_v1.types.pubsub import IngestionDataSourceSettings
from google.pubsub_v1.types.pubsub import IngestionFailureEvent
from google.pubsub_v1.types.pubsub import JavaScriptUDF
from google.pubsub_v1.types.pubsub import ListSnapshotsRequest
from google.pubsub_v1.types.pubsub import ListSnapshotsResponse
from google.pubsub_v1.types.pubsub import ListSubscriptionsRequest
Expand All @@ -53,6 +55,7 @@
from google.pubsub_v1.types.pubsub import ListTopicSubscriptionsRequest
from google.pubsub_v1.types.pubsub import ListTopicSubscriptionsResponse
from google.pubsub_v1.types.pubsub import MessageStoragePolicy
from google.pubsub_v1.types.pubsub import MessageTransform
from google.pubsub_v1.types.pubsub import ModifyAckDeadlineRequest
from google.pubsub_v1.types.pubsub import ModifyPushConfigRequest
from google.pubsub_v1.types.pubsub import PlatformLogsSettings
Expand Down Expand Up @@ -115,6 +118,8 @@
"GetSubscriptionRequest",
"GetTopicRequest",
"IngestionDataSourceSettings",
"IngestionFailureEvent",
"JavaScriptUDF",
"ListSnapshotsRequest",
"ListSnapshotsResponse",
"ListSubscriptionsRequest",
Expand All @@ -126,6 +131,7 @@
"ListTopicSubscriptionsRequest",
"ListTopicSubscriptionsResponse",
"MessageStoragePolicy",
"MessageTransform",
"ModifyAckDeadlineRequest",
"ModifyPushConfigRequest",
"PlatformLogsSettings",
Expand Down
6 changes: 6 additions & 0 deletions google/pubsub_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from .types.pubsub import GetSubscriptionRequest
from .types.pubsub import GetTopicRequest
from .types.pubsub import IngestionDataSourceSettings
from .types.pubsub import IngestionFailureEvent
from .types.pubsub import JavaScriptUDF
from .types.pubsub import ListSnapshotsRequest
from .types.pubsub import ListSnapshotsResponse
from .types.pubsub import ListSubscriptionsRequest
Expand All @@ -51,6 +53,7 @@
from .types.pubsub import ListTopicSubscriptionsRequest
from .types.pubsub import ListTopicSubscriptionsResponse
from .types.pubsub import MessageStoragePolicy
from .types.pubsub import MessageTransform
from .types.pubsub import ModifyAckDeadlineRequest
from .types.pubsub import ModifyPushConfigRequest
from .types.pubsub import PlatformLogsSettings
Expand Down Expand Up @@ -116,6 +119,8 @@
"GetSubscriptionRequest",
"GetTopicRequest",
"IngestionDataSourceSettings",
"IngestionFailureEvent",
"JavaScriptUDF",
"ListSchemaRevisionsRequest",
"ListSchemaRevisionsResponse",
"ListSchemasRequest",
Expand All @@ -131,6 +136,7 @@
"ListTopicsRequest",
"ListTopicsResponse",
"MessageStoragePolicy",
"MessageTransform",
"ModifyAckDeadlineRequest",
"ModifyPushConfigRequest",
"PlatformLogsSettings",
Expand Down
20 changes: 5 additions & 15 deletions google/pubsub_v1/services/publisher/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1366,11 +1366,7 @@ async def set_iam_policy(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.set_iam_policy,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self.transport._wrapped_methods[self._client._transport.set_iam_policy]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -1491,11 +1487,7 @@ async def get_iam_policy(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.get_iam_policy,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self.transport._wrapped_methods[self._client._transport.get_iam_policy]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down Expand Up @@ -1554,11 +1546,9 @@ async def test_iam_permissions(

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
self._client._transport.test_iam_permissions,
default_timeout=None,
client_info=DEFAULT_CLIENT_INFO,
)
rpc = self.transport._wrapped_methods[
self._client._transport.test_iam_permissions
]

# Certain fields should be provided within the metadata header;
# add these here.
Expand Down
40 changes: 3 additions & 37 deletions google/pubsub_v1/services/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,36 +511,6 @@ def _get_universe_domain(
raise ValueError("Universe Domain cannot be an empty string.")
return universe_domain

@staticmethod
def _compare_universes(
client_universe: str, credentials: ga_credentials.Credentials
) -> bool:
"""Returns True iff the universe domains used by the client and credentials match.

Args:
client_universe (str): The universe domain configured via the client options.
credentials (ga_credentials.Credentials): The credentials being used in the client.

Returns:
bool: True iff client_universe matches the universe in credentials.

Raises:
ValueError: when client_universe does not match the universe in credentials.
"""

default_universe = PublisherClient._DEFAULT_UNIVERSE
credentials_universe = getattr(credentials, "universe_domain", default_universe)

if client_universe != credentials_universe:
raise ValueError(
"The configured universe domain "
f"({client_universe}) does not match the universe domain "
f"found in the credentials ({credentials_universe}). "
"If you haven't configured the universe domain explicitly, "
f"`{default_universe}` is the default."
)
return True

def _validate_universe_domain(self):
"""Validates client's and credentials' universe domains are consistent.

Expand All @@ -550,13 +520,9 @@ def _validate_universe_domain(self):
Raises:
ValueError: If the configured universe domain is not valid.
"""
self._is_universe_domain_valid = (
self._is_universe_domain_valid
or PublisherClient._compare_universes(
self.universe_domain, self.transport._credentials
)
)
return self._is_universe_domain_valid

# NOTE (b/349488459): universe validation is disabled until further notice.
return True

@property
def api_endpoint(self):
Expand Down
9 changes: 9 additions & 0 deletions google/pubsub_v1/services/publisher/transports/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

transport inheritance structure
_______________________________

`PublisherTransport` is the ABC for all transports.
- public child `PublisherGrpcTransport` for sync gRPC transport (defined in `grpc.py`).
- public child `PublisherGrpcAsyncIOTransport` for async gRPC transport (defined in `grpc_asyncio.py`).
- private child `_BasePublisherRestTransport` for base REST transport with inner classes `_BaseMETHOD` (defined in `rest_base.py`).
- public child `PublisherRestTransport` for sync REST transport with inner classes `METHOD` derived from the parent's corresponding `_BaseMETHOD` classes (defined in `rest.py`).
15 changes: 15 additions & 0 deletions google/pubsub_v1/services/publisher/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,21 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.get_iam_policy: gapic_v1.method.wrap_method(
self.get_iam_policy,
default_timeout=None,
client_info=client_info,
),
self.set_iam_policy: gapic_v1.method.wrap_method(
self.set_iam_policy,
default_timeout=None,
client_info=client_info,
),
self.test_iam_permissions: gapic_v1.method.wrap_method(
self.test_iam_permissions,
default_timeout=None,
client_info=client_info,
),
}

def close(self):
Expand Down
46 changes: 37 additions & 9 deletions google/pubsub_v1/services/publisher/transports/grpc_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import warnings
from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union

Expand Down Expand Up @@ -233,6 +234,9 @@ def __init__(
)

# Wrap messages. This must be done after self._grpc_channel exists
self._wrap_with_kind = (
"kind" in inspect.signature(gapic_v1.method_async.wrap_method).parameters
)
self._prep_wrapped_messages(client_info)

@property
Expand Down Expand Up @@ -585,7 +589,7 @@ def test_iam_permissions(
def _prep_wrapped_messages(self, client_info):
"""Precompute the wrapped methods, overriding the base class method to use async wrappers."""
self._wrapped_methods = {
self.create_topic: gapic_v1.method_async.wrap_method(
self.create_topic: self._wrap_method(
self.create_topic,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -599,7 +603,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.update_topic: gapic_v1.method_async.wrap_method(
self.update_topic: self._wrap_method(
self.update_topic,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -613,7 +617,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.publish: gapic_v1.method_async.wrap_method(
self.publish: self._wrap_method(
self.publish,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -633,7 +637,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.get_topic: gapic_v1.method_async.wrap_method(
self.get_topic: self._wrap_method(
self.get_topic,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -649,7 +653,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.list_topics: gapic_v1.method_async.wrap_method(
self.list_topics: self._wrap_method(
self.list_topics,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -665,7 +669,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.list_topic_subscriptions: gapic_v1.method_async.wrap_method(
self.list_topic_subscriptions: self._wrap_method(
self.list_topic_subscriptions,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -681,7 +685,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.list_topic_snapshots: gapic_v1.method_async.wrap_method(
self.list_topic_snapshots: self._wrap_method(
self.list_topic_snapshots,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -697,7 +701,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.delete_topic: gapic_v1.method_async.wrap_method(
self.delete_topic: self._wrap_method(
self.delete_topic,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -711,7 +715,7 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.detach_subscription: gapic_v1.method_async.wrap_method(
self.detach_subscription: self._wrap_method(
self.detach_subscription,
default_retry=retries.AsyncRetry(
initial=0.1,
Expand All @@ -725,10 +729,34 @@ def _prep_wrapped_messages(self, client_info):
default_timeout=60.0,
client_info=client_info,
),
self.get_iam_policy: self._wrap_method(
self.get_iam_policy,
default_timeout=None,
client_info=client_info,
),
self.set_iam_policy: self._wrap_method(
self.set_iam_policy,
default_timeout=None,
client_info=client_info,
),
self.test_iam_permissions: self._wrap_method(
self.test_iam_permissions,
default_timeout=None,
client_info=client_info,
),
}

def _wrap_method(self, func, *args, **kwargs):
if self._wrap_with_kind: # pragma: NO COVER
kwargs["kind"] = self.kind
return gapic_v1.method_async.wrap_method(func, *args, **kwargs)

def close(self):
return self.grpc_channel.close()

@property
def kind(self) -> str:
return "grpc_asyncio"


__all__ = ("PublisherGrpcAsyncIOTransport",)
Loading