Skip to content

Commit

Permalink
Add PubSub system test for fix in googleapis#4174
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffkpayne committed Oct 14, 2017
1 parent 4c86521 commit 133db15
Showing 1 changed file with 64 additions and 1 deletion.
65 changes: 64 additions & 1 deletion pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from __future__ import absolute_import

from datetime import datetime
import time
import uuid

Expand Down Expand Up @@ -103,4 +104,66 @@ def test_subscribe_to_messages():
assert callback.call_count >= 50
finally:
publisher.delete_topic(topic_name)
subscriber.delete_subscription(sub_name)


def test_subscribe_to_messages_async_callbacks():
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_name = _resource_name('topic')
sub_name = _resource_name('subscription')

try:
# Create a topic.
publisher.create_topic(topic_name)

# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(sub_name, topic_name)
subscription = subscriber.subscribe(sub_name)

# Publish some messages.
futures = [publisher.publish(
topic_name,
b'Wooooo! The claaaaaw!',
num=str(i),
) for i in range(0, 2)]

# Make sure the publish completes.
[f.result() for f in futures]

# We want to make sure that the callback was called asynchronously. So
# track when each call happened and make sure below.
call_times = []

def process_message(message):
# list.append() is thread-safe.
call_times.append(datetime.now())
time.sleep(2)
message.ack()

callback = mock.Mock(wraps=process_message)
side_effect = mock.Mock()
callback.side_effect = side_effect

# Actually open the subscription and hold it open for a few seconds.
subscription.open(callback)
for second in range(0, 5):
time.sleep(4)

# The callback should have fired at least two times, but it may
# take some time.
if callback.call_count >= 2:
first = min(call_times[:2])
last = max(call_times[:2])
diff = last - first
# "Ensure" the first two callbacks were executed asynchronously
# (sequentially would have resulted in a difference of 2+
# seconds).
assert diff.days == 0
assert diff.seconds < 2
assert side_effect.call_count >= 2

# Okay, we took too long; fail out.
assert callback.call_count >= 2
finally:
publisher.delete_topic(topic_name)

0 comments on commit 133db15

Please sign in to comment.