Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cloud Pub/Sub Quickstart V2 #2004

Merged
merged 10 commits into from
Feb 12, 2019
74 changes: 74 additions & 0 deletions pubsub/cloud-client/quickstart/pub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_pub_all]
import argparse
import time
# [START pubsub_quickstart_pub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_pub_deps]


def get_callback(api_future, data):
"""Wraps message data in the context of the callback function."""

def callback(api_future):
try:
print("Published message {} now has message ID {}".format(
data, api_future.result()))
except Exception:
print("A problem occurred when publishing {}: {}\n".format(
data, api_future.exception()))
raise
return callback


def pub(project_id, topic_name):
"""Publishes a message to a Pub/Sub topic."""
# [START pubsub_quickstart_pub_client]
# Initializes the Publisher client
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
client = pubsub_v1.PublisherClient()
# [END pubsub_quickstart_pub_client]
# Creates a fully qualified identifier in the form of
# `projects/{project_id}/topics/{topic_name}`
topic_path = client.topic_path(project_id, topic_name)

# Data sent to Cloud Pub/Sub must be a bytestring
data = "Hello, World!"
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
data = data.encode('utf-8')

# When you publish a message, the client returns a future.
api_future = client.publish(topic_path, data=data)
api_future.add_done_callback(get_callback(api_future, data))

# Keeps the main thread from exiting to handle message processing
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
# in the background.
while api_future.running():
time.sleep(0.1)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('topic_name', help='Pub/Sub topic name')

args = parser.parse_args()

pub(args.project_id, args.topic_name)
# [END pubsub_quickstart_pub_all]
52 changes: 52 additions & 0 deletions pubsub/cloud-client/quickstart/pub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import pytest

from google.cloud import pubsub_v1

import pub

PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-pub-test-topic'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
publisher_client.delete_topic(topic_path)
except Exception:
pass

publisher_client.create_topic(topic_path)

yield TOPIC


def test_pub(topic, capsys):
pub.pub(PROJECT, topic)

out, _ = capsys.readouterr()

assert "Published message b'Hello, World!'" in out
64 changes: 64 additions & 0 deletions pubsub/cloud-client/quickstart/sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_quickstart_sub_all]
import argparse
import time
# [START pubsub_quickstart_sub_deps]
from google.cloud import pubsub_v1
# [END pubsub_quickstart_sub_deps]


def sub(project_id, subscription_name):
"""Receives messages from a Pub/Sub subscription."""
# [START pubsub_quickstart_sub_client]
# Initializes the Subscriber client
client = pubsub_v1.SubscriberClient()
# [END pubsub_quickstart_sub_client]
# Creates a fully qualified identifier in the form of
# `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = client.subscription_path(
project_id, subscription_name)

def callback(message):
print('Received message {} of message ID {}'.format(
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
message, message.message_id))
# Acknowledges the message. Unack'ed messages will be redelivered.
message.ack()
print('Acknolwedged message of message ID {}\n'.format(
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
message.message_id))

client.subscribe(subscription_path, callback=callback)
print('Listening for messages on {}..\n'.format(subscription_path))

# The subscriber is non-blocking. We keep the main thread from exiting
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
# so it can process messages asynchronously in the background.
while True:
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
time.sleep(60)


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Google Cloud project ID')
parser.add_argument('subscription_name', help='Pub/Sub subscription name')

args = parser.parse_args()

sub(args.project_id, args.subscription_name)
# [END pubsub_quickstart_sub_all]
98 changes: 98 additions & 0 deletions pubsub/cloud-client/quickstart/sub_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
#!/usr/bin/env python

# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import os
import pytest
import time

from google.cloud import pubsub_v1

import sub


PROJECT = os.environ['GCLOUD_PROJECT']
TOPIC = 'quickstart-sub-test-topic'
SUBSCRIPTION = 'quickstart-sub-test-topic-sub'


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
publisher_client.delete_topic(topic_path)
except Exception:
pass

publisher_client.create_topic(topic_path)

yield topic_path


@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.delete_subscription(subscription_path)
except Exception:
pass

subscriber_client.create_subscription(subscription_path, topic=topic)

yield SUBSCRIPTION


def _publish_messages(publisher_client, topic):
data = u'Hello, World!'.encode('utf-8')
publisher_client.publish(topic, data=data)


def _make_sleep_patch():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer using the monkeypatch fixture for pytest. It can clean up after your tests so that the sleep function is restored after the test is done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need some help with this. In the meanwhile, since other tests are also written using _make_sleep_patch in the same repo, shall we start another PR just for this purpose?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the pattern I'm describing: https://stackoverflow.com/a/29110609/101923,

Now that I have a closer look, mock.patch does the same thing, but this still seems much more complicated than it needs to be.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried monkeypatch but it created an infinite loop.

def mock_sleep(sleep):
    time.sleep(10)
    raise RuntimeError('sigil')

monkeypatch.setattr(time, 'sleep', mock_sleep)

while True: time.sleep(60) would call time.sleep(10), which calls time.sleep(10) and never reaches raise RuntimeError('sigil').

real_sleep = time.sleep

def new_sleep(period):
if period == 60:
real_sleep(10)
raise RuntimeError('sigil')
else:
real_sleep(period)

return mock.patch('time.sleep', new=new_sleep)


def test_sub(publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
sub.sub(PROJECT, subscription)

out, _ = capsys.readouterr()

assert "Received message" in out