Skip to content

Commit

Permalink
Merge branch 'main' into protobufVersion
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu authored Jun 19, 2024
2 parents 0f9f74c + 0b8d7b9 commit 78ee38b
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
".": "2.21.3"
".": "2.21.4"
}

7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
[1]: https://pypi.org/project/google-cloud-pubsub/#history


## [2.21.4](https://github.com/googleapis/python-pubsub/compare/v2.21.3...v2.21.4) (2024-06-18)


### Documentation

* **samples:** Add code sample for optimistic subscribe ([#1182](https://github.com/googleapis/python-pubsub/issues/1182)) ([d8e8aa5](https://github.com/googleapis/python-pubsub/commit/d8e8aa59ab0288fdaf5a1cc5e476581e73d0f82c))

## [2.21.3](https://github.com/googleapis/python-pubsub/compare/v2.21.2...v2.21.3) (2024-06-10)


Expand Down
2 changes: 1 addition & 1 deletion google/pubsub/gapic_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = "2.21.3" # {x-release-please-version}
__version__ = "2.21.4" # {x-release-please-version}
2 changes: 1 addition & 1 deletion google/pubsub_v1/gapic_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
__version__ = "2.21.3" # {x-release-please-version}
__version__ = "2.21.4" # {x-release-please-version}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
],
"language": "PYTHON",
"name": "google-cloud-pubsub",
"version": "2.21.3"
"version": "2.21.4"
},
"snippets": [
{
Expand Down
93 changes: 93 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,86 @@ def create_subscription(project_id: str, topic_id: str, subscription_id: str) ->
# [END pubsub_create_pull_subscription]


def optimistic_subscribe(
project_id: str,
topic_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""Optimistically subscribe to messages instead of making calls to verify existence
of a subscription first and then subscribing to messages from it. This avoids admin
operation calls to verify the existence of a subscription and reduces the probability
of running out of quota for admin operations."""
# [START pubsub_optimistic_subscribe]
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except NotFound:
print(f"Subscription {subscription_path} not found, creating it.")

try:
# If the subscription does not exist, then create it.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription = subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)

if subscription:
print(f"Subscription {subscription.name} created")
else:
raise ValueError("Subscription creation failed.")

# Subscribe on the created subscription.
try:
streaming_pull_future = subscriber.subscribe(
subscription.name, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
except Exception as e:
print(
f"Exception occurred when creating subscription and subscribing to it: {e}"
)
except Exception as e:
print(f"Exception occurred when attempting optimistic subscribe: {e}")
# [END pubsub_optimistic_subscribe]


def create_subscription_with_dead_letter_topic(
project_id: str,
topic_id: str,
Expand Down Expand Up @@ -1161,6 +1241,15 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
remove_dead_letter_policy_parser.add_argument("topic_id")
remove_dead_letter_policy_parser.add_argument("subscription_id")

optimistic_subscribe_parser = subparsers.add_parser(
"optimistic-subscribe", help=optimistic_subscribe.__doc__
)
optimistic_subscribe_parser.add_argument("topic_id")
optimistic_subscribe_parser.add_argument("subscription_id")
optimistic_subscribe_parser.add_argument(
"timeout", default=None, type=float, nargs="?"
)

receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__)
receive_parser.add_argument("subscription_id")
receive_parser.add_argument("timeout", default=None, type=float, nargs="?")
Expand Down Expand Up @@ -1303,6 +1392,10 @@ def callback(message: pubsub_v1.subscriber.message.Message) -> None:
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
elif args.command == "optimistic-subscribe":
optimistic_subscribe(
args.project_id, args.topic_id, args.subscription_id, args.timeout
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_id, args.timeout)
elif args.command == "receive-custom-attributes":
Expand Down
50 changes: 50 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,56 @@ def test_create_subscription(
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_optimistic_subscribe(
subscriber_client: pubsub_v1.SubscriberClient,
topic: str,
publisher_client: pubsub_v1.PublisherClient,
capsys: CaptureFixture[str],
) -> None:
subscription_id = f"subscription_for_optimistic_subscribe-{PY_VERSION}-{UUID}"
subscription_path = subscriber_client.subscription_path(PROJECT_ID, subscription_id)
# Ensure there is no pre-existing subscription.
# So that we can test the case where optimistic subscribe fails.
try:
subscriber_client.delete_subscription(
request={"subscription": subscription_path}
)
except NotFound:
pass

# Invoke optimistic_subscribe when the subscription is not present.
# This tests scenario where optimistic subscribe fails.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)
out, _ = capsys.readouterr()
# Verify optimistic subscription failed.
assert f"Subscription {subscription_path} not found, creating it." in out
# Verify that subscription created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" in out
# Verify that subscription didn't already exist.
assert "Successfully subscribed until the timeout passed." not in out

# Invoke optimistic_subscribe when the subscription is present.
# This tests scenario where optimistic subscribe succeeds.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, subscription_id, 5)

out, _ = capsys.readouterr()
# Verify optimistic subscription succeeded.
assert f"Subscription {subscription_path} not found, creating it." not in out
# Verify that subscription was not created due to optimistic subscribe failure.
assert f"Subscription {subscription_path} created" not in out
# Verify that subscription already existed.
assert "Successfully subscribed until the timeout passed." in out

# Test case where optimistic subscribe throws an exception other than NotFound
# or TimeoutError.
subscriber.optimistic_subscribe(PROJECT_ID, TOPIC, "123", 5)
out, _ = capsys.readouterr()
assert "Exception occurred when attempting optimistic subscribe:" in out

# Clean up resources created during test.
subscriber_client.delete_subscription(request={"subscription": subscription_path})


def test_create_subscription_with_dead_letter_policy(
subscriber_client: pubsub_v1.SubscriberClient,
dead_letter_topic: str,
Expand Down

0 comments on commit 78ee38b

Please sign in to comment.