Skip to content

Commit

Permalink
Cloud Pub/Sub Quickstart V2 (#2004)
Browse files Browse the repository at this point in the history
* Quickstart V2

* Adopts Kir's suggestions

* Adopted Tim's suggestions

* proper resource deletion during teardown
  • Loading branch information
anguillanneuf committed Feb 12, 2019
1 parent 18f766a commit d49312e
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 0 deletions.
73 changes: 73 additions & 0 deletions pubsub/cloud-client/quickstart/pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_pub_all]
import argparse
import time
# [START pubsub_quickstart_pub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_pub_deps]


def get_callback(api_future, data):
"""Wrap message data in the context of the callback function."""

def callback(api_future):
try:
print("Published message {} now has message ID {}".format(
data, api_future.result()))
except Exception:
print("A problem occurred when publishing {}: {}\n".format(
data, api_future.exception()))
raise
return callback


def pub(project_id, topic_name):
"""Publishes a message to a Pub/Sub topic."""
# [START pubsub_quickstart_pub_client]
# Initialize a Publisher client
client = pubsub_v1.PublisherClient()
# [END pubsub_quickstart_pub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/topics/{topic_name}`
topic_path = client.topic_path(project_id, topic_name)

# Data sent to Cloud Pub/Sub must be a bytestring
data = b"Hello, World!"

# When you publish a message, the client returns a future.
api_future = client.publish(topic_path, data=data)
api_future.add_done_callback(get_callback(api_future, data))

# Keep the main thread from exiting until background message
# is processed.
while api_future.running():
time.sleep(0.1)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('topic_name', help='Pub/Sub topic name')

args = parser.parse_args()

pub(args.project_id, args.topic_name)
# [END pubsub_quickstart_pub_all]
61 changes: 61 additions & 0 deletions pubsub/cloud-client/quickstart/pub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pytest

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

import pub

PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-pub-test-topic'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
except AlreadyExists:
pass

yield TOPIC


@pytest.fixture
def to_delete(publisher_client):
doomed = []
yield doomed
for item in doomed:
publisher_client.delete_topic(item)


def test_pub(publisher_client, topic, to_delete, capsys):
pub.pub(PROJECT, topic)

to_delete.append('projects/{}/topics/{}'.format(PROJECT, TOPIC))

out, _ = capsys.readouterr()

assert "Published message b'Hello, World!'" in out
64 changes: 64 additions & 0 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]


def sub(project_id, subscription_name):
"""Receives messages from a Pub/Sub subscription."""
# [START pubsub_quickstart_sub_client]
# Initialize a Subscriber client
client = pubsub_v1.SubscriberClient()
# [END pubsub_quickstart_sub_client]
# Create a fully qualified identifier in the form of
# `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = client.subscription_path(
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
message, message.message_id))
# Acknowledge the message. Unack'ed messages will be redelivered.
message.ack()
print('Acknowledged message of message ID {}\n'.format(
message.message_id))

client.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# Keep the main thread from exiting so the subscriber can
# process messages in the background.
while True:
time.sleep(60)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('subscription_name', help='Pub/Sub subscription name')

args = parser.parse_args()

sub(args.project_id, args.subscription_name)
# [END pubsub_quickstart_sub_all]
113 changes: 113 additions & 0 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1

import sub


PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic_path(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher_client.create_topic(topic_path)
except AlreadyExists:
pass

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic_path):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.create_subscription(subscription_path, topic_path)
except AlreadyExists:
pass

yield SUBSCRIPTION


@pytest.fixture
def to_delete(publisher_client, subscriber_client):
doomed = []
yield doomed
for client, item in doomed:
if 'topics' in item:
publisher_client.delete_topic(item)
if 'subscriptions' in item:
subscriber_client.delete_subscription(item)


def _make_sleep_patch():
real_sleep = time.sleep

def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)

return mock.patch('time.sleep', new=new_sleep)


def test_sub(publisher_client,
topic_path,
subscriber_client,
subscription,
to_delete,
capsys):

publisher_client.publish(topic_path, data=b'Hello, World!')

to_delete.append((publisher_client, topic_path))

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)

to_delete.append((subscriber_client,
'projects/{}/subscriptions/{}'.format(PROJECT,
SUBSCRIPTION)))

out, _ = capsys.readouterr()
assert "Received message" in out
assert "Acknowledged message" in out

0 comments on commit d49312e

Please sign in to comment.