Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: subscription updates ordered #2507

Merged
merged 3 commits into from
Oct 9, 2024

Conversation

Larkooo
Copy link
Collaborator

@Larkooo Larkooo commented Oct 8, 2024

Summary by CodeRabbit

  • New Features

    • Introduced unbounded channels for improved communication in entity, event, event message, and indexer services.
    • Enhanced event and update handling processes for better performance and responsiveness.
  • Bug Fixes

    • Improved error handling during event processing and subscriber notifications.
  • Documentation

    • Updated method signatures and descriptions to reflect changes in the service structures and their functionalities.

Copy link

coderabbitai bot commented Oct 8, 2024

Walkthrough

Ohayo, sensei! This pull request introduces modifications across several files related to the Service and Manager structures, implementing unbounded channels for improved communication. Key changes include the addition of entity_sender, event_sender, update_sender, and the restructuring of methods like publish_updates. These updates enhance the asynchronous handling of entity and event updates, allowing for better separation of concerns and more flexible processing of subscriber notifications.

Changes

File Path Change Summary
crates/torii/grpc/src/server/subscriptions/entity.rs - Added entity_sender: UnboundedSender<OptimisticEntity> to Service.
- Updated poll and publish_updates methods for asynchronous entity handling.
- Modified method signatures in EntityManager.
crates/torii/grpc/src/server/subscriptions/event.rs - Added event_sender: UnboundedSender<Event> to Service.
- Restructured publish_updates to process_event.
- Updated poll method for event handling.
crates/torii/grpc/src/server/subscriptions/event_message.rs - Added event_sender: UnboundedSender<OptimisticEventMessage> to Service.
- Introduced publish_updates and process_event_update methods.
- Updated poll method for event message handling.
crates/torii/grpc/src/server/subscriptions/indexer.rs - Added update_sender: UnboundedSender<ContractUpdated> to Service.
- Introduced publish_updates method for update processing.
- Modified poll method for update handling.

Possibly related PRs

  • feat(torii-core): optimistically broadcast entity update #2466: The changes in this PR involve the introduction of OptimisticEntity and its integration into the Service struct, which is directly related to the modifications made in the main PR regarding the EntityManager and Service structures in entity.rs. Both PRs enhance the handling of entity updates and their publication mechanisms.
  • refactor-opt(torii-grpc): subscriptions no db fetch #2455: This PR refactors the subscription handling to eliminate database fetching, which aligns with the main PR's focus on improving the communication mechanism between services and subscribers, particularly through the use of unbounded channels.
  • opt-fix(torii-core): fix and optimize partial updates #2427: Although this PR focuses on optimizing partial updates, it may relate to the overall update handling improvements seen in the main PR, particularly in how updates are processed and structured within the system.

Suggested reviewers

  • glihm

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (4)
crates/torii/grpc/src/server/subscriptions/indexer.rs (3)

12-12: Ohayo sensei! Be cautious with unbounded channels

Using unbounded channels can lead to unbounded memory growth if the producer outpaces the consumer. It's important to ensure that the system can handle this or implement backpressure mechanisms if necessary.


103-110: Sensei, consider enhancing error handling in publish_updates

While logging the error is helpful, consider whether additional error handling or recovery mechanisms are needed if process_update fails repeatedly. This could improve the robustness of your update processing.


149-155: Sensei, handle update_sender send errors more robustly

When this.update_sender.send(update) returns an error, it indicates that the receiver has been dropped. Consider implementing additional actions in this case, such as restarting the publish_updates task or cleaning up resources to ensure the system remains stable.

crates/torii/grpc/src/server/subscriptions/entity.rs (1)

84-92: Ohayo, sensei! Spawning publish_updates within new method may lead to unexpected behavior.

While spawning publish_updates here initializes the background task, starting asynchronous tasks within constructors can sometimes cause issues with error handling and lifecycle management. It's often better to have an explicit method to start background tasks after construction.

Consider refactoring the initialization to start the task outside of the new method for clearer control over the service's lifecycle.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between d039c6d and 79d8110.

📒 Files selected for processing (4)
  • crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/event_message.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (3 hunks)
🧰 Additional context used
🔇 Additional comments (5)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)

85-85: Addition of update_sender looks good, sensei

Adding the update_sender to the Service struct appropriately facilitates communication via the unbounded channel.


90-98: Ohayo sensei! Initialization and task spawning are well done

The unbounded channel is correctly initialized, and the publish_updates task is appropriately spawned. This setup enhances asynchronous handling of updates.

crates/torii/grpc/src/server/subscriptions/event_message.rs (2)

75-75: Ohayo, sensei! Addition of event_sender improves communication

Adding the event_sender field to the Service struct allows for efficient communication between the event producer and the event processing task, enhancing the overall design.


80-88: Ohayo, sensei! Refactored new method enhances asynchronous handling

The introduction of the unbounded channel and the spawning of the publish_updates task in the new method improve the asynchronous processing of event updates, resulting in better separation of concerns and scalability.

crates/torii/grpc/src/server/subscriptions/entity.rs (1)

12-12: Ohayo, sensei! New imports added for unbounded channels.

The inclusion of unbounded_channel, UnboundedReceiver, and UnboundedSender is appropriate for implementing unbounded channels.

Comment on lines 71 to 85
let (event_sender, event_receiver) = unbounded_channel();
let service = Self {
simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()),
event_sender,
};

tokio::spawn(Self::publish_updates(subs_manager, event_receiver));

service
}

async fn publish_updates(
subs: Arc<EventManager>,
mut event_receiver: UnboundedReceiver<Event>,
) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider potential issues with unbounded_channel leading to unbounded memory usage.

Ohayo, sensei! The use of unbounded_channel may result in unbounded memory growth if the event_sender sends events faster than publish_updates can process them. This could lead to memory exhaustion over time. Consider using a bounded channel with an appropriate capacity or implementing backpressure mechanisms to prevent potential memory issues.

Comment on lines +214 to +220
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(event)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.event_sender.send(event) {
error!(target = LOG_TARGET, error = %e, "Sending event update to processor.");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Be cautious with unbounded channels to avoid memory issues

The use of UnboundedSender and UnboundedReceiver may lead to unbounded memory growth if events are produced faster than they are consumed. Consider using a bounded channel (tokio::sync::mpsc::channel) with an appropriate capacity to implement backpressure and prevent potential memory exhaustion.

simple_broker: Pin<Box<dyn Stream<Item = OptimisticEntity> + Send>>,
entity_sender: UnboundedSender<OptimisticEntity>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Potential memory growth due to unbounded channels in Service struct.

Using an unbounded channel (UnboundedSender<OptimisticEntity>) means that if the producer sends messages faster than the consumer can process them, it could lead to increased memory usage. Consider using a bounded channel to prevent potential memory exhaustion.

Would you like assistance in refactoring this to use a bounded channel with appropriate capacity?

Comment on lines +97 to +105
mut entity_receiver: UnboundedReceiver<OptimisticEntity>,
) {
while let Some(entity) = entity_receiver.recv().await {
if let Err(e) = Self::process_entity_update(&subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Processing entity update.");
}
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Ohayo, sensei! Error handling in publish_updates may need improvement.

Currently, errors from process_entity_update are logged but not acted upon. If process_entity_update fails repeatedly, it might indicate a critical issue. You might want to implement a retry mechanism or take corrective action when such errors occur.

Would you like assistance in implementing enhanced error handling or a retry strategy?

Comment on lines +234 to +240
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Ohayo, sensei! Potential issue if entity_sender is closed in poll method.

If entity_sender.send(entity) fails, it could mean that the receiver has been dropped, possibly causing this loop to continue indefinitely without effective processing. Consider handling this case by breaking out of the loop or shutting down the service gracefully.

Here's a suggested change to handle the closed sender:

 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
     let this = self.get_mut();

     while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
-        if let Err(e) = this.entity_sender.send(entity) {
+        if this.entity_sender.send(entity).is_err() {
             error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+            return Poll::Ready(());
         }
     }

     Poll::Pending
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &entity).await {
error!(target = LOG_TARGET, error = %e, "Publishing entity update.");
}
});
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.entity_sender.send(entity) {
error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
}
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
if this.entity_sender.send(entity).is_err() {
error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
return Poll::Ready(());
}
}
Poll::Pending
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (3)
crates/torii/grpc/src/server/subscriptions/indexer.rs (1)

92-100: Ohayo, sensei! Great refactoring of the new method!

The changes here are really solid:

  1. Returning Self is more idiomatic for a constructor-like method.
  2. Creating an unbounded channel for updates is a smart move for handling high-throughput scenarios.
  3. Spawning an async task for update processing allows for better concurrency.

One small suggestion to consider:

Consider adding a comment explaining the purpose of the spawned task. It might help future maintainers understand the design choice quickly. For example:

// Spawn a task to process updates asynchronously, improving responsiveness
tokio::spawn(Self::publish_updates(subs_manager, update_receiver));

Overall, excellent work on improving the service initialization!

crates/torii/grpc/src/server/subscriptions/event.rs (2)

73-80: LGTM! Improved event handling with concurrent processing.

Ohayo, sensei! The changes in Service::new are looking sharp. Creating an unbounded channel and spawning a new task for publish_updates allows for concurrent event processing. Nice touch!

A small suggestion: Consider using Self instead of service when returning:

Self { simple_broker: Box::pin(SimpleBroker::<Event>::subscribe()), event_sender }

This maintains consistency with Rust's idiomatic style.


82-90: LGTM! Effective separation of event reception and processing.

Ohayo, sensei! The new publish_updates method is a great addition. It nicely separates event reception from processing and includes error handling. Excellent work!

A small suggestion to improve error handling:

if let Err(e) = Self::process_event(&subs, &event).await {
    error!(target = LOG_TARGET, error = %e, event = ?event, "Failed to process event update.");
}

This change adds the event details to the error log, which could be helpful for debugging.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 79d8110 and d9a455b.

📒 Files selected for processing (4)
  • crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/event.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/event_message.rs (3 hunks)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/torii/grpc/src/server/subscriptions/entity.rs
  • crates/torii/grpc/src/server/subscriptions/event_message.rs
🧰 Additional context used
🔇 Additional comments (5)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)

12-14: Ohayo, sensei! LGTM on the import additions!

The new imports for UnboundedReceiver and UnboundedSender are spot-on for the unbounded channel implementation. Nice work on keeping the imports tidy and relevant!


87-87: Excellent addition, sensei!

The new update_sender field is a great improvement. It allows for asynchronous and non-blocking update handling, which should enhance the overall performance of the service. Well done on implementing this unbounded channel approach!

crates/torii/grpc/src/server/subscriptions/event.rs (3)

12-14: LGTM! New imports for unbounded channels.

Ohayo, sensei! The new imports from tokio::sync::mpsc are spot on for implementing the unbounded channel. They're essential for the refactoring we're about to see. Keep up the good work!


68-68: Consider potential memory issues with unbounded channels.

Ohayo again, sensei! The addition of event_sender: UnboundedSender<Event> is a smart move for improving event handling. However, I must reiterate a concern: unbounded channels could lead to unbounded memory growth if events are sent faster than they're processed. Consider implementing backpressure mechanisms or using a bounded channel with an appropriate capacity to prevent potential memory exhaustion.

Let's check if there are any safeguards in place:

#!/bin/bash
# Description: Check for potential safeguards against unbounded memory growth

# Test: Search for any backpressure mechanisms or bounded channel usage
rg --type rust 'bounded_channel|Semaphore|RateLimiter' crates/torii/grpc/src/

173-175: LGTM! Enhanced separation of concerns in event handling.

Ohayo once more, sensei! The changes in the Future implementation for Service are spot on. Sending events to the event_sender instead of directly invoking publish_updates nicely decouples event reception from processing. The error handling for the send operation is also a great touch. Keep up the excellent work!

Comment on lines 103 to +112
async fn publish_updates(
subs: Arc<IndexerManager>,
mut update_receiver: UnboundedReceiver<ContractUpdated>,
) {
while let Some(update) = update_receiver.recv().await {
if let Err(e) = Self::process_update(&subs, &update).await {
error!(target = LOG_TARGET, error = %e, "Processing indexer update.");
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Dojo-level implementation, sensei!

The publish_updates method is a great addition:

  1. It effectively separates the concern of update processing.
  2. The use of while let for consuming from the channel is idiomatic and efficient.
  3. Error handling is in place, which is crucial for long-running tasks.

A small suggestion to enhance error handling:

Consider adding more context to the error log. This could help with debugging in the future. For example:

error!(
    target = LOG_TARGET,
    error = %e,
    contract_address = %update.contract_address,
    "Failed to process indexer update"
);

This addition provides more information about which update caused the error.

Overall, excellent work on implementing this asynchronous update processing!

Comment on lines +151 to +157
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
let subs = Arc::clone(&pin.subs_manager);
tokio::spawn(async move {
if let Err(e) = Service::publish_updates(subs, &event).await {
error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
}
});
while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
if let Err(e) = this.update_sender.send(update) {
error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor.");
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Excellent refinement of the poll method!

The changes here are well thought out:

  1. Simplifying std::pin::Pin<&mut Self> to Pin<&mut Self> improves readability.
  2. Sending updates through the update_sender aligns perfectly with the new asynchronous processing design.
  3. Error handling for the send operation is a good practice.

A small suggestion to improve error handling:

Consider breaking the loop when a send error occurs. This could prevent unnecessary CPU usage if the receiver has been dropped. For example:

while let Poll::Ready(Some(update)) = this.simple_broker.poll_next_unpin(cx) {
    if let Err(e) = this.update_sender.send(update) {
        error!(target = LOG_TARGET, error = %e, "Sending indexer update to processor failed. Stopping poll.");
        return Poll::Ready(());
    }
}

This change ensures that the service stops polling if it can't send updates anymore.

Overall, great job on refining this method to work with the new asynchronous architecture!

Copy link

codecov bot commented Oct 8, 2024

Codecov Report

Attention: Patch coverage is 62.22222% with 34 lines in your changes missing coverage. Please review.

Project coverage is 68.84%. Comparing base (d039c6d) to head (d9a455b).

Files with missing lines Patch % Lines
...ates/torii/grpc/src/server/subscriptions/entity.rs 60.86% 9 Missing ⚠️
...rii/grpc/src/server/subscriptions/event_message.rs 60.86% 9 Missing ⚠️
...tes/torii/grpc/src/server/subscriptions/indexer.rs 62.50% 9 Missing ⚠️
...rates/torii/grpc/src/server/subscriptions/event.rs 65.00% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2507      +/-   ##
==========================================
+ Coverage   68.82%   68.84%   +0.01%     
==========================================
  Files         377      377              
  Lines       48930    48978      +48     
==========================================
+ Hits        33677    33717      +40     
- Misses      15253    15261       +8     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
crates/torii/grpc/src/server/subscriptions/entity.rs (2)

88-96: LGTM! Consider adding error handling for the spawned task.

Ohayo, sensei! The changes in the new method look good. The initialization of entity_sender and entity_receiver, along with spawning a new task for publish_updates, aligns well with the refactoring objectives.

Consider adding error handling for the spawned task to gracefully handle any panics or errors:

tokio::spawn(async move {
    if let Err(e) = Self::publish_updates(subs_manager, entity_receiver).await {
        error!(target = LOG_TARGET, error = %e, "Entity update processing task failed");
    }
});

238-244: Improve error handling for closed entity_sender.

Ohayo, sensei! The changes in the poll method look good overall. However, the error handling for entity_sender.send() could be improved. If the send operation fails, it likely means that the receiver has been dropped, which could cause this loop to continue indefinitely without effective processing.

Consider handling this case by breaking out of the loop or shutting down the service gracefully. Here's a suggested change:

 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
     let this = self.get_mut();

     while let Poll::Ready(Some(entity)) = this.simple_broker.poll_next_unpin(cx) {
-        if let Err(e) = this.entity_sender.send(entity) {
-            error!(target = LOG_TARGET, error = %e, "Sending entity update to processor.");
+        if this.entity_sender.send(entity).is_err() {
+            error!(target = LOG_TARGET, "Sending entity update to processor failed. Shutting down service.");
+            return Poll::Ready(());
         }
     }

     Poll::Pending
 }

This change will cause the service to shut down if the entity_sender is closed, preventing potential infinite loops.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between d9a455b and 6e7c0f4.

📒 Files selected for processing (1)
  • crates/torii/grpc/src/server/subscriptions/entity.rs (3 hunks)
🧰 Additional context used
🔇 Additional comments (3)
crates/torii/grpc/src/server/subscriptions/entity.rs (3)

12-14: LGTM! New imports for unbounded channels.

Ohayo, sensei! The new imports from tokio::sync::mpsc are correctly added to support the introduction of unbounded channels in the code. This change aligns well with the refactoring objectives.


83-83: Consider using a bounded channel for entity_sender.

Ohayo, sensei! The use of an unbounded channel (UnboundedSender<OptimisticEntity>) in the Service struct could potentially lead to memory growth issues if the producer sends messages faster than the consumer can process them. To mitigate this risk, consider using a bounded channel with an appropriate capacity.

Would you like assistance in refactoring this to use a bounded channel with a suitable capacity?


101-109: Improve error handling in publish_updates.

Ohayo, sensei! While the changes to use UnboundedReceiver look good, the error handling in publish_updates could be improved. Currently, errors from process_entity_update are logged but not acted upon. If process_entity_update fails repeatedly, it might indicate a critical issue.

Consider implementing a retry mechanism or taking corrective action when such errors occur. Would you like assistance in implementing enhanced error handling or a retry strategy?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants