Skip to content

Commit

Permalink
Added sample for Pub/Sub synchronous pull subscriber [(#1673)](Google…
Browse files Browse the repository at this point in the history
…CloudPlatform/python-docs-samples#1673)

* Added sample for synchronous pull
  • Loading branch information
anguillanneuf authored and plamut committed Jul 10, 2020
1 parent 5e45c05 commit a885414
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 0 deletions.
45 changes: 45 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ def create_push_subscription(project,
def delete_subscription(project, subscription_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand Down Expand Up @@ -138,6 +140,8 @@ def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_async_pull]
# [START pubsub_quickstart_subscriber]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -160,6 +164,8 @@ def callback(message):
def receive_messages_with_custom_attributes(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_sync_pull_custom_attributes]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -186,6 +192,8 @@ def callback(message):
def receive_messages_with_flow_control(project, subscription_name):
"""Receives messages from a pull subscription with flow control."""
# [START pubsub_subscriber_flow_settings]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -207,9 +215,38 @@ def callback(message):
# [END pubsub_subscriber_flow_settings]


def receive_messages_synchronously(project, subscription_name):
"""Pulling messages synchronously."""
# [START pubsub_subscriber_sync_pull]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

# Builds a pull request with a specific number of messages to return.
# `return_immediately` is set to False so that the system waits (for a
# bounded amount of time) until at lease one message is available.
response = subscriber.pull(
subscription_path,
max_messages=3,
return_immediately=False)

ack_ids = []
for received_message in response.received_messages:
print("Received: {}".format(received_message.message.data))
ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)
# [END pubsub_subscriber_sync_pull]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
# project = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand Down Expand Up @@ -281,6 +318,11 @@ def callback(message):
help=receive_messages_with_flow_control.__doc__)
receive_with_flow_control_parser.add_argument('subscription_name')

receive_messages_synchronously_parser = subparsers.add_parser(
'receive-synchronously',
help=receive_messages_synchronously.__doc__)
receive_messages_synchronously_parser.add_argument('subscription_name')

listen_for_errors_parser = subparsers.add_parser(
'listen_for_errors', help=listen_for_errors.__doc__)
listen_for_errors_parser.add_argument('subscription_name')
Expand Down Expand Up @@ -314,5 +356,8 @@ def callback(message):
elif args.command == 'receive-flow-control':
receive_messages_with_flow_control(
args.project, args.subscription_name)
elif args.command == 'receive-synchronously':
receive_messages_synchronously(
args.project, args.subscription_name)
elif args.command == 'listen_for_errors':
listen_for_errors(args.project, args.subscription_name)
15 changes: 15 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,18 @@ def test_receive_with_flow_control(
assert 'Listening' in out
assert subscription in out
assert 'Message 1' in out


def test_receive_synchronously(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_flow_control(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Message 1' in out
assert 'Message 2' in out
assert 'Message 3' in out

0 comments on commit a885414

Please sign in to comment.