
Internal: message processing is now event-based #500

Merged
merged 1 commit into from
Nov 3, 2023

Conversation

odesenfans
Contributor

Problem: the pending message fetcher and processor use a polling loop to look for messages to fetch/process. This leads to some latency when the pending_messages table is empty as the task sleeps while waiting for new pending messages.

Solution: add an exchange + queue in RabbitMQ to signal the arrival of new messages. To avoid modifying the message processor too much, and to avoid depending on coherency between the DB and RabbitMQ, the fetcher and processor each spawn a new task that looks for messages on the queue and sets an asyncio Event object. The main fetching/processing loop waits on this event (with a timeout).
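A minimal sketch of the wait-on-event-with-timeout pattern described above, not the code from this PR. An `asyncio.Queue` stands in for the RabbitMQ queue so the example runs without a broker, and the names `signal_watcher`, `wait_for_work` and `demo` are hypothetical.

```python
import asyncio
import time


async def signal_watcher(signals: asyncio.Queue, new_message: asyncio.Event) -> None:
    """Set the event each time a signal arrives (stand-in for an MQ consumer)."""
    while True:
        await signals.get()
        new_message.set()


async def wait_for_work(new_message: asyncio.Event, timeout: float) -> bool:
    """Block until the event is set or the timeout expires.

    Returns True if the event woke us up, False if the timeout did.
    """
    try:
        await asyncio.wait_for(new_message.wait(), timeout=timeout)
        woken_by_event = True
    except asyncio.TimeoutError:
        woken_by_event = False
    # Clear in both cases so the next iteration waits for a fresh signal.
    new_message.clear()
    return woken_by_event


async def demo() -> tuple[bool, bool, float]:
    signals: asyncio.Queue = asyncio.Queue()
    new_message = asyncio.Event()
    watcher = asyncio.create_task(signal_watcher(signals, new_message))

    # Case 1: a signal arrives after 50 ms, well before the 1 s timeout.
    start = time.monotonic()
    asyncio.get_running_loop().call_later(0.05, signals.put_nowait, "new")
    first = await wait_for_work(new_message, timeout=1.0)
    elapsed = time.monotonic() - start

    # Case 2: no signal at all, so the loop falls back to the timeout.
    second = await wait_for_work(new_message, timeout=0.1)

    watcher.cancel()
    return first, second, elapsed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch the event is only a wake-up hint: the loop would query the database on every iteration regardless of why it woke, so a lost or duplicated signal costs at most one timeout period.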

Note that this system is not used for retries as this would require another task that posts messages to the MQ on their next attempt. Retried messages simply wait for the next iteration of the loop (every second).
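As an illustration of why retries need no MQ signal, the sketch below picks up every message whose next attempt is due on each loop iteration. The `PendingMessage` class, its `next_attempt` field and the `due_messages` helper are assumptions made for this example; the real table schema and query are not shown in this PR description.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class PendingMessage:
    item_hash: str          # hypothetical identifier
    next_attempt: datetime  # hypothetical: earliest time the message may be tried


def due_messages(pending: list[PendingMessage], now: datetime) -> list[PendingMessage]:
    """Return the messages that may be processed at `now`.

    New messages and retried messages are selected the same way, so a retry
    is simply picked up by whichever loop iteration runs after its deadline.
    """
    return [m for m in pending if m.next_attempt <= now]


now = datetime(2023, 11, 3, 10, 0, tzinfo=timezone.utc)
pending = [
    PendingMessage("new", now),                            # just arrived
    PendingMessage("retry-due", now - timedelta(seconds=5)),
    PendingMessage("retry-later", now + timedelta(seconds=30)),
]
ready = [m.item_hash for m in due_messages(pending, now)]
```

Under these assumptions, a retried message waits at most one timeout period past its deadline before the loop sees it.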

This solution has the following advantages (+) and drawbacks (-):

  + No more arbitrary latency when processing new messages.
  + No major modification of the pipeline: even if the MQ system fails for some reason, the pending message processor will still process messages every second.
  + No dependency on the state of the message queue: if the RabbitMQ queue is deleted for any reason, the processor will keep working.
  - RabbitMQ overhead (one more exchange + queue).

@odesenfans odesenfans force-pushed the od-event-based-message-processor branch from e357a60 to f95ae30 Compare November 2, 2023 15:17
@odesenfans odesenfans merged commit c4543fa into dev Nov 3, 2023
2 checks passed
@odesenfans odesenfans deleted the od-event-based-message-processor branch November 3, 2023 10:47
hoh pushed a commit that referenced this pull request Dec 8, 2023