diff --git a/pubsub/cloud-client/quickstart/pub.py b/pubsub/cloud-client/quickstart/pub.py new file mode 100644 index 000000000000..9617b34ea846 --- /dev/null +++ b/pubsub/cloud-client/quickstart/pub.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_pub_all] +import argparse +import time +# [START pubsub_quickstart_pub_deps] +from google.cloud import pubsub_v1 +# [END pubsub_quickstart_pub_deps] + + +def get_callback(api_future, data): + """Wrap message data in the context of the callback function.""" + + def callback(api_future): + try: + print("Published message {} now has message ID {}".format( + data, api_future.result())) + except Exception: + print("A problem occurred when publishing {}: {}\n".format( + data, api_future.exception())) + raise + return callback + + +def pub(project_id, topic_name): + """Publishes a message to a Pub/Sub topic.""" + # [START pubsub_quickstart_pub_client] + # Initialize a Publisher client + client = pubsub_v1.PublisherClient() + # [END pubsub_quickstart_pub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/topics/{topic_name}` + topic_path = client.topic_path(project_id, topic_name) + + # Data sent to Cloud Pub/Sub must be a bytestring + data = b"Hello, World!" + + # When you publish a message, the client returns a future. + api_future = client.publish(topic_path, data=data) + api_future.add_done_callback(get_callback(api_future, data)) + + # Keep the main thread from exiting until background message + # is processed. + while api_future.running(): + time.sleep(0.1) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('project_id', help='Google Cloud project ID') + parser.add_argument('topic_name', help='Pub/Sub topic name') + + args = parser.parse_args() + + pub(args.project_id, args.topic_name) +# [END pubsub_quickstart_pub_all] diff --git a/pubsub/cloud-client/quickstart/pub_test.py b/pubsub/cloud-client/quickstart/pub_test.py new file mode 100644 index 000000000000..09443364a3f6 --- /dev/null +++ b/pubsub/cloud-client/quickstart/pub_test.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pytest + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 + +import pub + +PROJECT = os.environ['GCLOUD_PROJECT'] +TOPIC = 'quickstart-pub-test-topic' + + +@pytest.fixture(scope='module') +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope='module') +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.create_topic(topic_path) + except AlreadyExists: + pass + + yield TOPIC + + +@pytest.fixture +def to_delete(publisher_client): + doomed = [] + yield doomed + for item in doomed: + publisher_client.delete_topic(item) + + +def test_pub(publisher_client, topic, to_delete, capsys): + pub.pub(PROJECT, topic) + + to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC)) + + out, _ = capsys.readouterr() + + assert "Published message b'Hello, World!'" in out diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py new file mode 100644 index 000000000000..520803d70a5b --- /dev/null +++ b/pubsub/cloud-client/quickstart/sub.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_sub_all] +import argparse +import time +# [START pubsub_quickstart_sub_deps] +from google.cloud import pubsub_v1 +# [END pubsub_quickstart_sub_deps] + + +def sub(project_id, subscription_name): + """Receives messages from a Pub/Sub subscription.""" + # [START pubsub_quickstart_sub_client] + # Initialize a Subscriber client + client = pubsub_v1.SubscriberClient() + # [END pubsub_quickstart_sub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/subscriptions/{subscription_name}` + subscription_path = client.subscription_path( + project_id, subscription_name) + + def callback(message): + print('Received message {} of message ID {}'.format( + message, message.message_id)) + # Acknowledge the message. Unack'ed messages will be redelivered. + message.ack() + print('Acknowledged message of message ID {}\n'.format( + message.message_id)) + + client.subscribe(subscription_path, callback=callback) + print('Listening for messages on {}..\n'.format(subscription_path)) + + # Keep the main thread from exiting so the subscriber can + # process messages in the background. + while True: + time.sleep(60) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('project_id', help='Google Cloud project ID') + parser.add_argument('subscription_name', help='Pub/Sub subscription name') + + args = parser.parse_args() + + sub(args.project_id, args.subscription_name) +# [END pubsub_quickstart_sub_all] diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py new file mode 100644 index 000000000000..9c70384ed693 --- /dev/null +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import os +import pytest +import time + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 + +import sub + + +PROJECT = os.environ['GCLOUD_PROJECT'] +TOPIC = 'quickstart-sub-test-topic' +SUBSCRIPTION = 'quickstart-sub-test-topic-sub' + + +@pytest.fixture(scope='module') +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope='module') +def topic_path(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.create_topic(topic_path) + except AlreadyExists: + pass + + yield topic_path + + +@pytest.fixture(scope='module') +def subscriber_client(): + yield pubsub_v1.SubscriberClient() + + +@pytest.fixture(scope='module') +def subscription(subscriber_client, topic_path): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION) + + try: + subscriber_client.create_subscription(subscription_path, topic_path) + except AlreadyExists: + pass + + yield SUBSCRIPTION + + +@pytest.fixture +def to_delete(publisher_client, subscriber_client): + doomed = [] + yield doomed + for client, item in doomed: + if 'topics' in item: + publisher_client.delete_topic(item) + if 'subscriptions' in item: + subscriber_client.delete_subscription(item) + + +def _make_sleep_patch(): + real_sleep = time.sleep + + def new_sleep(period): + if period == 60: + real_sleep(10) + raise RuntimeError('sigil') + else: + real_sleep(period) + + return mock.patch('time.sleep', new=new_sleep) + + +def test_sub(publisher_client, + topic_path, + subscriber_client, + subscription, + to_delete, + capsys): + + publisher_client.publish(topic_path, data=b'Hello, World!') + + to_delete.append((publisher_client, topic_path)) + + with _make_sleep_patch(): + with pytest.raises(RuntimeError, match='sigil'): + sub.sub(PROJECT, subscription) + + to_delete.append((subscriber_client, + 'projects/{}/subscriptions/{}'.format(PROJECT, + SUBSCRIPTION))) + + out, _ = capsys.readouterr() + assert "Received message" in out + assert "Acknowledged message" in out