Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add stop method #9365

Merged
merged 15 commits into from
Nov 7, 2019
13 changes: 1 addition & 12 deletions pubsub/google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ def __init__(self, client, topic, settings, autocommit=True):
self._state_lock = threading.Lock()
# These members are all communicated between threads; ensure that
# any writes to them use the "state lock" to remain atomic.
self._futures = []
# _futures list should remain unchanged after batch
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures = []
self._messages = []
self._size = 0
self._status = base.BatchStatus.ACCEPTING_MESSAGES
Expand Down Expand Up @@ -140,17 +140,6 @@ def status(self):
"""
return self._status

def wait(self):
"""If commit is in progress, waits until all of the futures resolved.

.. note::

This method blocks until all futures of this batch resolved.
"""
if self._status != base.BatchStatus.ACCEPTING_MESSAGES:
for future in self._futures:
future.result()

def commit(self):
"""Actually publish all of the messages on the active batch.

Expand Down
30 changes: 20 additions & 10 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def __init__(self, batch_settings=(), **kwargs):
# messages. One batch exists for each topic.
self._batch_lock = self._batch_class.make_lock()
self._batches = {}
self._is_stopped = False

@classmethod
def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
Expand Down Expand Up @@ -242,7 +243,14 @@ def publish(self, topic, data, **attrs):
instance that conforms to Python Standard library's
:class:`~concurrent.futures.Future` interface (but not an
instance of that class).

Raises:
ValueError:
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
If called after publisher has been stopped
by a `stop()` method call.
plamut marked this conversation as resolved.
Show resolved Hide resolved
"""
if self._is_stopped:
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("Cannot publish on a stopped publisher.")
# Sanity check: Is the data being sent as a bytestring?
# If it is literally anything else, complain loudly about it.
if not isinstance(data, six.binary_type):
Expand Down Expand Up @@ -276,20 +284,22 @@ def publish(self, topic, data, **attrs):
return future

def stop(self):
"""Immediately publish all outstanding batches.
"""Immediately publish all outstanding messages.

This asynchronously pushes all outstanding messages
and waits until all futures resolved. Method should be
invoked prior to deleting this Client object in order
to ensure that no pending messages are lost.
Asynchronously sends all outstanding messages and
prevents future calls to `publish()`. Method should
be invoked prior to deleting this `Client()` object
in order to ensure that no pending messages are lost.

.. note::

This method blocks until all futures of all
batches resolved.
This method is non-blocking. Use `Future()` objects
returned by `publish()` to make sure all messages
sent or failed to.
IlyaFaer marked this conversation as resolved.
Show resolved Hide resolved
"""
for topic in self._batches:
self._batches[topic].commit()
if self._is_stopped:
raise ValueError("Cannot stop a publisher already stopped.")

self._is_stopped = True
for topic in self._batches:
self._batches[topic].wait()
self._batches[topic].commit()
25 changes: 0 additions & 25 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,31 +252,6 @@ def test_block__commmit_api_error():
assert future.exception() == error


def test_wait():
batch = create_batch()
with mock.patch(
"google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True
) as result_mock:
futures = (batch.publish({"data": b"msg"}),)

batch._status = BatchStatus.IN_PROGRESS
futures[0]._completed.set()
batch.wait()

result_mock.assert_called()


def test_wait_not_started():
batch = create_batch()
with mock.patch(
"google.cloud.pubsub_v1.publisher.futures.Future.result", return_value=True
) as result_mock:
batch.publish({"data": b"msg"})
batch.wait()

result_mock.assert_not_called()


def test_block__commmit_retry_error():
batch = create_batch()
futures = (
Expand Down
21 changes: 11 additions & 10 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,24 +210,25 @@ def test_stop():

pubsub_msg = types.PubsubMessage(data=b"msg")

cp = mock.patch.object(batch, "commit")
wp = mock.patch.object(batch, "wait")
cp2 = mock.patch.object(batch2, "commit")
wp2 = mock.patch.object(batch2, "wait")
patch = mock.patch.object(batch, "commit")
patch2 = mock.patch.object(batch2, "commit")

with cp as c_mock, cp2 as c_mock2, wp as w_mock, wp2 as w_mock2:
with patch as commit_mock, patch2 as commit_mock2:
batch.publish(pubsub_msg)
batch2.publish(pubsub_msg)

client.stop()

# check if commit() called
c_mock.assert_called()
c_mock2.assert_called()
commit_mock.assert_called()
commit_mock2.assert_called()

# check if wait() called
w_mock.assert_called()
w_mock2.assert_called()
# check that closed publisher doesn't accept new messages
with pytest.raises(ValueError):
client.publish("topic1", pubsub_msg)

with pytest.raises(ValueError):
client.stop()


def test_gapic_instance_method():
Expand Down