Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub: How to flush unsent messages on program exit? #6883

Closed
mckingho opened this issue Dec 10, 2018 · 5 comments · Fixed by #9365
Closed

Pubsub: How to flush unsent messages on program exit? #6883

mckingho opened this issue Dec 10, 2018 · 5 comments · Fixed by #9365
Assignees
Labels
api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue.

Comments

@mckingho
Copy link

I have set caching configuration which may cache some messages and messages are not sent to pubsub yet. Is there a flush method when I want to exit the program.

@tseaver tseaver added type: question Request for information or clarification. Not an issue. api: pubsub Issues related to the Pub/Sub API. labels Dec 10, 2018
@tseaver tseaver changed the title [pubsub] flush message on program exits [pubsub] How to flush unsent messages on program exit? Dec 11, 2018
@tseaver tseaver changed the title [pubsub] How to flush unsent messages on program exit? [Pubsub] How to flush unsent messages on program exit? Dec 11, 2018
@tseaver tseaver changed the title [Pubsub] How to flush unsent messages on program exit? Pubsub: How to flush unsent messages on program exit? Dec 11, 2018
@anguillanneuf
Copy link
Contributor

A workaround is to use Seek and seek to the current timestamp. This way, messages that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged.

@sduskis
Copy link
Contributor

sduskis commented May 15, 2019

@plamut do you have any advice about this question?

@plamut
Copy link
Contributor

plamut commented May 16, 2019

I'm not 100% sure if I understand the question correctly, but does "caching" refer to the batch settings used when publishing messages? E.g the max_latency setting that sets how long the publisher client should wait before actually publishing a batch of messages to the server (for messages added by the publisher.publish() method)?

If so, one way of forcing the publisher to actually send the messages to the server (e.g. on program exit) is to manually invoke the batch.commit() method.

The publisher client internally stores message batches for each topic here, thus forcing a batch to commit messages early could be done with the following:

topic_batch = publisher._batches.get(TOPIC_PATH)
if topic_batch is not None:
    topic_batch.commit()

(invoking commit() is thread-safe, and will simply do nothing if batch commit is already in progress)


@mckingho Does that answer your question? Or maybe what @anguillanneuf wrote earlier? If not and "caching configuration" refers to something else, we would need a bit more info.

Looking forward to hearing again from you!

@sduskis
Copy link
Contributor

sduskis commented May 20, 2019

@plamut: I think the user wants to have a method to ensure that all messages have successfully been sent... i.e. all futures have completed.

@sduskis
Copy link
Contributor

sduskis commented May 24, 2019

We may need documentation (or a new method) for how to wait for shut down until all futures are complete.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. type: question Request for information or clarification. Not an issue.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants