
feat: add tps metric to contracts table #2468

Merged · 16 commits · Oct 8, 2024

Conversation

Larkooo (Collaborator) commented Sep 23, 2024

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced subscription capabilities for indexer updates in the World service.
    • Added a new RPC method, SubscribeIndexer, allowing clients to receive updates related to the indexer.
    • Enhanced transaction processing tracking with new parameters in the set_head method.
    • Added methods for clients to subscribe to StarkNet events and indexer updates.
  • Improvements

    • Updated metadata handling to reflect new naming conventions and types.
    • Improved error handling and subscription management in the gRPC client and server.
    • Added new structures for managing indexer updates and subscriptions.
  • Database Changes

    • Added new columns last_block_timestamp and tps to the contracts table for enhanced transaction metrics (a minimal SQL sketch follows below).
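
A minimal sketch of what these migrations amount to, assuming the SQLite INTEGER column types used in this PR's migration files:

-- Timestamp of the last indexed block, used to compute TPS.
ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER;
-- Transactions per second, recomputed whenever the head advances.
ALTER TABLE contracts ADD COLUMN tps INTEGER;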


codecov bot commented Sep 23, 2024

Codecov Report

Attention: Patch coverage is 30.47619% with 146 lines in your changes missing coverage. Please review.

Project coverage is 68.80%. Comparing base (c291c7a) to head (10c561d).
Report is 7 commits behind head on main.

Files with missing lines (patch coverage % / lines missing):
...tes/torii/grpc/src/server/subscriptions/indexer.rs 10.52% 68 Missing ⚠️
crates/torii/grpc/src/client.rs 0.00% 21 Missing ⚠️
crates/torii/grpc/src/server/mod.rs 17.39% 19 Missing ⚠️
crates/torii/core/src/engine.rs 37.50% 15 Missing ⚠️
crates/torii/client/src/client/mod.rs 0.00% 10 Missing ⚠️
crates/torii/grpc/src/types/mod.rs 0.00% 9 Missing ⚠️
crates/torii/core/src/executor.rs 91.30% 2 Missing ⚠️
crates/torii/core/src/sql.rs 95.65% 1 Missing ⚠️
crates/torii/core/src/types.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2468      +/-   ##
==========================================
- Coverage   68.96%   68.80%   -0.17%     
==========================================
  Files         372      377       +5     
  Lines       48568    48883     +315     
==========================================
+ Hits        33496    33635     +139     
- Misses      15072    15248     +176     


Larkooo marked this pull request as ready for review October 4, 2024 19:15

coderabbitai bot commented Oct 4, 2024

Walkthrough

Ohayo, sensei! This pull request introduces several enhancements across multiple files related to blockchain management and gRPC functionalities. Key changes include the addition of a new enum variant and struct for handling blockchain head updates in the executor.rs file. New structs for indexer updates are introduced in types.rs, along with updates to the world.proto file to support a new subscription RPC method. The database schema is modified to include columns for tracking block timestamps and transactions per second (TPS), collectively enhancing the system's data handling capabilities.

Changes

  • crates/torii/core/src/executor.rs: Added SetHead(ContractUpdated) to the BrokerMessage enum. Introduced the SetHeadQuery struct and modified handle_query_type to process the new query type.
  • crates/torii/core/src/types.rs: Introduced new structs IndexerUpdate and Contract with fields for head, tps, last_block_timestamp, and contract_address.
  • crates/torii/grpc/proto/world.proto: Added the SubscribeIndexer RPC method and updated the WorldMetadata method signatures. Defined new message types for indexer subscription.
  • crates/torii/grpc/src/server/mod.rs: Added a subscribe_indexer method to DojoWorld and renamed metadata to world. Updated the gRPC service implementation for indexer subscriptions.
  • crates/torii/grpc/src/server/subscriptions/indexer.rs: Introduced IndexerSubscriber and IndexerManager for managing subscriptions. Defined methods for adding and removing subscribers.
  • crates/torii/migrations/20240923155431_last_block_timestamp.sql: Added new column last_block_timestamp to the contracts table for TPS calculations.
  • crates/torii/migrations/20240923155431_tps.sql: Added new columns last_block_timestamp and tps to the contracts table for transaction metrics.
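
Taken together, the data-model core of the change can be sketched as two plain types (field names come from the table above; the derives, visibility, and Felt import path are assumptions, not the PR's exact code):

// Sketch only; the PR's actual definitions live in the crates listed above.
use starknet::core::types::Felt;

// Update fanned out to indexer subscribers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexerUpdate {
    pub head: u64,                 // latest indexed block number
    pub tps: u64,                  // transactions per second
    pub last_block_timestamp: u64, // timestamp of the last processed block
    pub contract_address: Felt,    // contract being indexed
}

// Query handed to the executor when the indexer head advances.
#[derive(Debug, Clone)]
pub struct SetHeadQuery {
    pub head: u64,
    pub last_block_timestamp: u64,
    pub txns_count: u64,
    pub contract_address: Felt,
}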

Possibly related PRs

Suggested reviewers

  • glihm

📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 4be7b4d and 10c561d.

📒 Files selected for processing (1)
  • crates/torii/client/src/client/mod.rs (3 hunks)
🧰 Additional context used
🔇 Additional comments (2)
crates/torii/client/src/client/mod.rs (2)

Line range hint 159-168: Well-implemented on_starknet_event method

Ohayo, sensei! This method correctly provides functionality for subscribing to StarkNet events. The implementation is clean and follows the existing patterns.


169-180: Effective implementation of on_indexer_updated

Ohayo, sensei! The method properly handles subscriptions to indexer updates, using the world contract address as a default when none is provided. Great work!
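
To give a feel for how this surfaces to users, here is a hedged usage sketch. The method name follows this review; the concrete client type, its signature, and the shape of the stream items are assumptions:

use futures::StreamExt;

// Hypothetical: log TPS for the world contract via the new subscription.
// `client` is assumed to be a connected torii client exposing `on_indexer_updated`.
async fn watch_tps(client: &Client, world_address: Felt) -> anyhow::Result<()> {
    // Passing None would fall back to the world contract address, per the review above.
    let mut updates = client.on_indexer_updated(Some(world_address)).await?;
    while let Some(update) = updates.next().await {
        println!("head={} tps={}", update.head, update.tps);
    }
    Ok(())
}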




coderabbitai bot left a comment


Actionable comments posted: 6

🧹 Outside diff range and nitpick comments (7)
crates/torii/migrations/20240923155431_last_block_timestamp.sql (1)

1-2: Ohayo, SQL sensei! Your migration looks solid, but let's level up!

The addition of the last_block_timestamp column is a great move for TPS calculations. Here are some thoughts to make it even more sugoi:

  1. Consider adding a NOT NULL constraint with a DEFAULT value. This ensures data integrity for existing and new rows.
  2. If you need precise timing, a TIMESTAMP type might be more appropriate than INTEGER.
  3. Depending on your query patterns, an index on this column could boost performance if it's used in WHERE clauses or joins.

What do you think, sensei? Want to power up this migration even further?

Here's a potential power-up for your consideration:

-- Add last_block_timestamp column for TPS calculation
ALTER TABLE contracts ADD COLUMN last_block_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;
CREATE INDEX idx_contracts_last_block_timestamp ON contracts(last_block_timestamp);

This version uses TIMESTAMP, adds a NOT NULL constraint with a default value, and creates an index for potential performance benefits.

crates/torii/core/src/types.rs (1)

88-94: Ohayo, sensei! The new IndexerUpdate struct looks good!

The addition of the IndexerUpdate struct aligns well with the PR objective of adding a TPS metric to the contracts table. The fields chosen are appropriate for tracking indexer updates.

A few suggestions to consider:

  1. If this struct will be used in database operations or API responses, you might want to derive Serialize, Deserialize, and FromRow traits.
  2. Consider adding documentation comments to explain the purpose of each field, especially tps and head.

Would you like me to provide an example of how to add these traits and documentation, sensei?
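
For example, it might look something like this (a sketch only: FromRow comes from sqlx, and the field types are inferred from the SQL usage elsewhere in this review, i.e. signed SQLite integers and a hex-string address):

use serde::{Deserialize, Serialize};
use sqlx::FromRow;

#[derive(Debug, Clone, Serialize, Deserialize, FromRow)]
pub struct IndexerUpdate {
    /// Latest block number processed by the indexer.
    pub head: i64,
    /// Transactions per second, recomputed on each head update.
    pub tps: i64,
    /// Timestamp of the last processed block (SQLite INTEGER is signed, hence i64).
    pub last_block_timestamp: i64,
    /// Hex-encoded address of the contract being indexed.
    pub contract_address: String,
}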

crates/torii/grpc/proto/world.proto (1)

49-55: Ohayo, sensei! This message is packed with goodies!

The SubscribeIndexerResponse message is well-structured and provides comprehensive indexer status information. Great job including the tps (transactions per second) field - it's a valuable addition that wasn't explicitly mentioned in the PR objectives.

Consider adding a brief comment for each field to explain their purpose, especially for head and tps, as their meanings might not be immediately clear to all developers.

crates/torii/grpc/src/types/mod.rs (2)

19-25: Ohayo, sensei! The IndexerUpdate struct looks great!

The new IndexerUpdate struct is well-designed and contains all the necessary fields for representing indexer updates. The choice of types for each field is appropriate, and the derived traits provide useful functionality.

Consider adding a brief doc comment to explain the purpose of this struct and possibly describe each field. For example:

/// Represents an update from an indexer.
///
/// # Fields
/// * `head` - The current head block number.
/// * `tps` - Transactions per second.
/// * `last_block_timestamp` - Timestamp of the last processed block.
/// * `contract_address` - The address of the contract being indexed.
#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)]
pub struct IndexerUpdate {
    // ... existing fields ...
}

This will help other developers understand the purpose and usage of this struct at a glance.


27-36: Ohayo again, sensei! The From implementation looks solid!

The implementation for converting proto::world::SubscribeIndexerResponse to IndexerUpdate is straightforward and correct. It properly maps all fields and handles the conversion of contract_address from bytes to a Felt.

Consider adding error handling for the contract_address conversion. While it's likely that the input will always be valid, it's generally a good practice to handle potential errors. You could use TryFrom instead of From to allow for error handling. Here's an example:

impl TryFrom<proto::world::SubscribeIndexerResponse> for IndexerUpdate {
    type Error = &'static str;

    fn try_from(value: proto::world::SubscribeIndexerResponse) -> Result<Self, Self::Error> {
        Ok(Self {
            head: value.head,
            tps: value.tps,
            last_block_timestamp: value.last_block_timestamp,
            contract_address: Felt::from_bytes_be_slice(&value.contract_address)
                .map_err(|_| "Invalid contract address")?,
        })
    }
}

This approach allows for more robust error handling and makes it clear that the conversion might fail.

crates/torii/grpc/src/server/subscriptions/indexer.rs (1)

44-46: Clarify Comment Regarding Unlock Issue with Browsers

Ohayo sensei! The comment // NOTE: unlock issue with firefox/safari lacks context, making it difficult for others to understand its significance.

Please expand the comment to explain the specific issue with Firefox and Safari browsers and how sending an initial empty response resolves it. For example:

// NOTE: Sending an initial response to prevent connection hang in Firefox and Safari.
// These browsers expect an immediate response to establish the stream properly.

This clarification aids in maintaining the code by providing necessary background.

crates/torii/core/src/sql.rs (1)

89-89: Ohayo, sensei! Is the async keyword necessary here?

The set_head function is marked as async, but there are no await points within the function. If asynchronous behavior isn't required, consider removing async to avoid unnecessary overhead.
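
A tiny illustration of that point, separate from the crate's code: an async fn with no .await still wraps its body in a future the caller has to poll, whereas a plain fn does the work immediately.

// Plain function: runs as soon as it is called.
fn record(store: &mut Vec<u64>, value: u64) {
    store.push(value);
}

// Async version with no awaits: same work, but callers now receive a
// future they must .await or poll, for no concurrency benefit.
async fn record_async(store: &mut Vec<u64>, value: u64) {
    store.push(value);
}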

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between c291c7a and b12107d.

📒 Files selected for processing (11)
  • crates/torii/core/src/engine.rs (7 hunks)
  • crates/torii/core/src/executor.rs (5 hunks)
  • crates/torii/core/src/sql.rs (2 hunks)
  • crates/torii/core/src/types.rs (1 hunks)
  • crates/torii/grpc/proto/world.proto (2 hunks)
  • crates/torii/grpc/src/client.rs (4 hunks)
  • crates/torii/grpc/src/server/mod.rs (9 hunks)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
  • crates/torii/grpc/src/server/subscriptions/mod.rs (1 hunks)
  • crates/torii/grpc/src/types/mod.rs (1 hunks)
  • crates/torii/migrations/20240923155431_last_block_timestamp.sql (1 hunks)
🔇 Additional comments (34)
crates/torii/grpc/src/server/subscriptions/mod.rs (1)

5-5: Ohayo, sensei! The new module looks good!

The addition of the indexer module is clean and consistent with the existing style. It's likely related to the tps metric mentioned in the PR objective.

Could you enlighten this humble reviewer about the contents of the new indexer module, sensei? It would be most helpful to understand how it relates to the tps metric for the contracts table.

✅ Verification successful

Ohayo, sensei! The indexer module has been successfully verified and aligns perfectly with the PR objectives. No issues found.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check the contents of the new indexer module
cat crates/torii/grpc/src/server/subscriptions/indexer.rs

Length of output: 3966

crates/torii/grpc/proto/world.proto (4)

10-11: Ohayo, sensei! New RPC method looks sugoi!

The addition of the SubscribeIndexer method is a great improvement. It allows clients to receive real-time updates about the indexer, which is super useful for monitoring purposes.


44-47: Sugoi message definition, sensei!

The SubscribeIndexerRequest message looks well-defined. Using bytes for the contract_address is a smart choice, as it provides flexibility for different address formats.


58-63: Konnichiwa again, sensei! Nice consistency in renaming.

The renaming of MetadataRequest and MetadataResponse to WorldMetadataRequest and WorldMetadataResponse respectively is consistent with the earlier RPC method change. This improves the overall clarity of the protocol.


14-14: Konnichiwa, sensei! Nice rename, but let's double-check something.

The update to WorldMetadata method signature is a great improvement for clarity. It now clearly indicates that it's specific to World metadata.

Could you please verify that the MetadataRequest and MetadataResponse messages have been renamed to WorldMetadataRequest and WorldMetadataResponse respectively? Here's a script to help:

✅ Verification successful

ohayo, sensei! It looks like the previous search didn't recognize .proto files. Let's try again by specifically targeting .proto files:


ohayo, sensei! The renaming of MetadataRequest and MetadataResponse to WorldMetadataRequest and WorldMetadataResponse in crates/torii/grpc/proto/world.proto has been successfully verified.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the renaming of metadata messages

# Test: Search for old and new message names
echo "Checking for old message names:"
rg --type proto 'message MetadataRequest|message MetadataResponse'

echo "Checking for new message names:"
rg --type proto 'message WorldMetadataRequest|message WorldMetadataResponse'

Length of output: 353


Script:

#!/bin/bash
# Description: Verify the renaming of metadata messages by searching .proto files

echo "Checking for old message names in .proto files:"
rg -g '*.proto' 'message MetadataRequest|message MetadataResponse'

echo "Checking for new message names in .proto files:"
rg -g '*.proto' 'message WorldMetadataRequest|message WorldMetadataResponse'

Length of output: 492

crates/torii/grpc/src/client.rs (5)

11-15: Well-placed imports for indexer subscription functionality

Ohayo, sensei! The new imports including SubscribeIndexerRequest and SubscribeIndexerResponse are correctly added, ensuring the necessary types are available for the indexer subscription features.


18-20: Correct inclusion of IndexerUpdate in type imports

Ohayo, sensei! The addition of IndexerUpdate to the imports is appropriate and aligns with the new functionality introduced for indexer updates.


74-74: Updated request type in metadata method

Ohayo, sensei! The change to use WorldMetadataRequest instead of MetadataRequest in the .world_metadata call is correct and reflects the updated request structure.


113-127: New subscribe_indexer method is well-implemented

Ohayo, sensei! The subscribe_indexer method is properly defined to subscribe to indexer updates. The usage of contract_address.to_bytes_be().to_vec() correctly converts the address for the request, and the error handling is consistent with the rest of the client implementation.


304-321: Accurate implementation of IndexerUpdateStreaming

Ohayo, sensei! The IndexerUpdateStreaming struct and its Stream trait implementation are correctly defined. This ensures seamless streaming of indexer updates. The poll_next method properly delegates to self.0.poll_next_unpin(cx).

crates/torii/core/src/executor.rs (4)

34-34: Ohayo! Added SetHead variant to BrokerMessage

Sensei, the addition of the SetHead(IndexerUpdate) variant to BrokerMessage enhances the messaging capabilities.


49-56: Ohayo! New SetHeadQuery struct looks good

Sensei, the introduction of SetHeadQuery struct is well-defined and appropriately used.


59-59: Ohayo! Extended QueryType with SetHead variant

Sensei, adding SetHead(SetHeadQuery) to QueryType is a good extension for handling new query types.


329-329: Ohayo! New SetHead variant handled correctly in send_broker_message

Sensei, great job adding the SetHead variant handling in send_broker_message. All message variants are properly covered.

crates/torii/core/src/engine.rs (7)

392-392: Ohayo sensei! Initialization of world_txns_count is appropriate

The variable world_txns_count is correctly initialized to zero for counting world transactions.


441-441: Ohayo sensei! Correctly incrementing world_txns_count

The world_txns_count variable is appropriately incremented after processing a world transaction.


458-460: Ohayo sensei! Verify set_head parameters after processing pending transactions

Again, self.db.set_head is called with data.block_number - 1. Please confirm that this is consistent with your intended logic.


480-480: Ohayo sensei! Assignment of transactions_count

Assigning transactions_count with data.transactions.len() correctly captures the number of transactions processed.


513-521: Ohayo sensei! Verify set_head parameters after processing range

When calling self.db.set_head, ensure that all parameters—data.latest_block_number, last_block_timestamp, transactions_count as u64, self.world.address—are correctly passed and that their values are appropriate.

Run the following script to compare parameters in set_head calls:

#!/bin/bash
# Description: List all calls to `set_head` and their parameters for comparison.

# Expected: Parameters should be consistent and in the correct order.

rg --type rust --no-heading -A 5 'set_head\('

413-420: Ohayo sensei! Verify correctness of set_head parameters during error handling

In the error handling block, self.db.set_head is called with data.block_number - 1. Please verify that decrementing the block number is appropriate in this context.

Run the following script to check similar usages:

✅ Verification successful

Ohayo sensei!

Action Required: Corrected Script for Verifying set_head Parameter Decrement

Please run the following script to accurately identify calls to set_head where data.block_number is decremented by 1:


Ohayo sensei!

Verification Successful: set_head Parameter Decrement is Appropriate

  • The decrementing of data.block_number by 1 in the set_head call within the error handling block is consistent with standard error handling practices to maintain system integrity.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Find calls to `set_head` where the block number is decremented.

# Expected: Ensure that decrementing is consistent with error handling practices.

rg --type rust --no-heading -A 5 'set_head\(\s*.*block_number\s*-\s*1'

Length of output: 2293


Script:

#!/bin/bash
# Description: Find calls to `set_head` where the block number is decremented.

rg --type rust --no-heading -A 5 'set_head\(\s*data\.block_number\s*-\s*1'

Length of output: 502


161-161: Ohayo sensei! Ensure default parameters are appropriate when initializing set_head

Passing zeros for timestamp and world_txns_count in self.db.set_head may require verification to ensure they do not cause unintended side effects.

Run the following script to verify the initialization parameters:

crates/torii/grpc/src/server/mod.rs (12)

20-21: Adding necessary imports

Ohayo, sensei! The required imports for RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventsRequest, RetrieveEventsResponse, SubscribeModelsRequest, and SubscribeModelsResponse have been correctly added.


32-32: Importing IndexerManager

Ohayo, sensei! The addition of use subscriptions::indexer::IndexerManager; is spot on.


51-51: Updated imports for new functionalities

Ohayo, sensei! The imports SubscribeIndexerRequest, SubscribeIndexerResponse, WorldMetadataRequest, and WorldMetadataResponse have been correctly added to accommodate the new methods.


89-89: Adding indexer_manager to DojoWorld struct

Ohayo, sensei! Including indexer_manager: Arc<IndexerManager> in the DojoWorld struct is correctly implemented.


104-104: Initializing indexer_manager in new method

Ohayo, sensei! The indexer_manager is properly initialized using Arc::new(IndexerManager::default()).


121-122: Spawning indexer_manager service

Ohayo, sensei! The indexer_manager service is correctly spawned with tokio::task::spawn.


131-131: Including indexer_manager in DojoWorld instance

Ohayo, sensei! The indexer_manager is properly included in the returned DojoWorld instance.


137-137: Renaming metadata method to world

Ohayo, sensei! Renaming the metadata method to world enhances clarity and aligns with its functionality.


694-701: Implementing new subscribe_indexer method

Ohayo, sensei! The new asynchronous method subscribe_indexer is correctly implemented to handle indexer subscriptions.


1027-1028: Defining SubscribeIndexerResponseStream type alias

Ohayo, sensei! The type alias SubscribeIndexerResponseStream has been properly defined.


1036-1036: Implementing SubscribeIndexerStream in World trait

Ohayo, sensei! The SubscribeIndexerStream type has been correctly implemented in the World trait for DojoWorld.


1040-1049: Updating world_metadata method implementation

Ohayo, sensei! The world_metadata method has been updated to invoke self.world(), reflecting the method rename and ensuring correct functionality.

crates/torii/core/src/sql.rs (1)

18-18: Ohayo, sensei! The import of SetHeadQuery looks good.

Comment on lines 84 to 99
for (idx, sub) in subs.subscribers.read().await.iter() {
    if sub.contract_address != update.contract_address {
        continue;
    }

    let resp = SubscribeIndexerResponse {
        head: update.head,
        tps: update.tps,
        last_block_timestamp: update.last_block_timestamp,
        contract_address: update.contract_address.to_bytes_be().to_vec(),
    };

    if sub.sender.send(Ok(resp)).await.is_err() {
        closed_stream.push(*idx);
    }
}


🛠️ Refactor suggestion

Optimize Subscriber Lookup for Improved Performance

Ohayo sensei! Iterating over all subscribers to match the contract_address can become inefficient as the subscriber count grows.

To enhance performance, consider indexing subscribers by contract_address. Modify the subscribers structure as follows:

-pub struct IndexerManager {
-    subscribers: RwLock<HashMap<usize, IndexerSubscriber>>,
+pub struct IndexerManager {
+    subscribers: RwLock<HashMap<Felt, HashMap<usize, IndexerSubscriber>>>,
}

Update the add_subscriber and publish_updates methods to accommodate this new structure. This change allows for quicker access to relevant subscribers based on the contract_address.
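
For illustration, a self-contained sketch of that indexing idea, with plain std types standing in for the crate's Felt and IndexerSubscriber (the names here are hypothetical):

use std::collections::HashMap;

type Address = u64; // stand-in for Felt

struct Subscriber {
    id: usize,
}

#[derive(Default)]
struct Manager {
    // contract address -> (subscriber id -> subscriber)
    subscribers: HashMap<Address, HashMap<usize, Subscriber>>,
}

impl Manager {
    fn add_subscriber(&mut self, addr: Address, id: usize) {
        self.subscribers.entry(addr).or_default().insert(id, Subscriber { id });
    }

    // Publishing walks only the subscribers of the updated contract,
    // instead of filtering the full subscriber set on every update.
    fn subscriber_ids(&self, addr: Address) -> Vec<usize> {
        self.subscribers
            .get(&addr)
            .map_or_else(Vec::new, |subs| subs.values().map(|s| s.id).collect())
    }
}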

Committable suggestion was skipped due to low confidence.

Comment on lines +191 to +205
QueryType::SetHead(set_head) => {
    let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
        "SELECT last_block_timestamp FROM contracts WHERE id = ?",
    )
    .bind(format!("{:#x}", set_head.contract_address))
    .fetch_one(&mut **tx)
    .await?
    .try_into()
    .map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;

    let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
        set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
    } else {
        set_head.txns_count
    };


⚠️ Potential issue

Ohayo! Potential underflow and division by zero in TPS calculation

Sensei, in the calculation of tps, subtracting previous_block_timestamp from set_head.last_block_timestamp without checking for underflow may lead to incorrect results or panic in debug mode. If set_head.last_block_timestamp is less than previous_block_timestamp, the subtraction will underflow. Consider using checked_sub to safely handle this scenario.

Apply this diff to prevent underflow:

+let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
+let tps: u64 = if time_diff != 0 {
+    set_head.txns_count / time_diff
+} else {
+    set_head.txns_count
+};
📝 Committable suggestion


Suggested change (final form, reindented):

QueryType::SetHead(set_head) => {
    let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
        "SELECT last_block_timestamp FROM contracts WHERE id = ?",
    )
    .bind(format!("{:#x}", set_head.contract_address))
    .fetch_one(&mut **tx)
    .await?
    .try_into()
    .map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;

    let time_diff = set_head.last_block_timestamp.checked_sub(previous_block_timestamp).unwrap_or(0);
    let tps: u64 = if time_diff != 0 {
        set_head.txns_count / time_diff
    } else {
        set_head.txns_count
    };

Comment on lines 1050 to 1059
async fn subscribe_indexer(
    &self,
    request: Request<SubscribeIndexerRequest>,
) -> ServiceResult<Self::SubscribeIndexerStream> {
    let SubscribeIndexerRequest { contract_address } = request.into_inner();
    let rx = self
        .subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice()))
        .await
        .map_err(|e| Status::internal(e.to_string()))?;
    Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream))


⚠️ Potential issue

Handle potential errors during contract_address conversion

Ohayo, sensei! In the subscribe_indexer method, please ensure that any potential errors from Felt::from_bytes_be_slice are properly handled. If the conversion can fail, consider returning an appropriate gRPC Status error to inform the client.

Apply this diff to handle the potential error:

 async fn subscribe_indexer(
     &self,
     request: Request<SubscribeIndexerRequest>,
 ) -> ServiceResult<Self::SubscribeIndexerStream> {
     let SubscribeIndexerRequest { contract_address } = request.into_inner();
+    let contract_felt = Felt::from_bytes_be_slice(contract_address.as_slice())
+        .map_err(|_| Status::invalid_argument("Invalid contract address"))?;
     let rx = self
-        .subscribe_indexer(Felt::from_bytes_be_slice(contract_address.as_slice()))
+        .subscribe_indexer(contract_felt)
         .await
         .map_err(|e| Status::internal(e.to_string()))?;
     Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream))
 }
📝 Committable suggestion


Suggested change (final form, reindented):

async fn subscribe_indexer(
    &self,
    request: Request<SubscribeIndexerRequest>,
) -> ServiceResult<Self::SubscribeIndexerStream> {
    let SubscribeIndexerRequest { contract_address } = request.into_inner();
    let contract_felt = Felt::from_bytes_be_slice(contract_address.as_slice())
        .map_err(|_| Status::invalid_argument("Invalid contract address"))?;
    let rx = self
        .subscribe_indexer(contract_felt)
        .await
        .map_err(|e| Status::internal(e.to_string()))?;
    Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream))
}

Comment on lines +106 to +107
"UPDATE contracts SET head = ?, last_block_timestamp = ? WHERE id = ?".to_string(),
vec![head_arg, last_block_timestamp_arg, id],


⚠️ Potential issue

Ohayo, sensei! Should world_txns_count and contract_address be included in the SQL update?

Currently, the SQL statement only updates head and last_block_timestamp in the contracts table. If world_txns_count and contract_address need to be persisted in the database, consider including them in the update query.


coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (2)
crates/torii/migrations/20240923155431_tps.sql (2)

1-2: Ohayo, sensei! The new column looks good, but let's think about existing data.

The addition of the last_block_timestamp column is a solid move for TPS calculation. However, we might want to consider how this affects existing rows in the contracts table.

Consider adding a DEFAULT value or updating existing rows to ensure consistency across all records. For example:

ALTER TABLE contracts ADD COLUMN last_block_timestamp INTEGER DEFAULT 0;
-- And then update existing rows if needed
UPDATE contracts SET last_block_timestamp = /* some appropriate value */ WHERE last_block_timestamp IS NULL;

3-3: Ohayo again, sensei! The TPS column is a nice addition, but let's make it crystal clear.

Adding the tps column is a great idea for tracking performance metrics. However, we can improve on this a bit.

Consider these enhancements:

  1. Add a comment explaining the purpose of the tps column, similar to the last_block_timestamp column.
  2. Specify a DEFAULT value to handle existing and new rows consistently.
  3. Consider using a more precise data type if decimal precision is needed for TPS.

Here's an example of how you might implement these suggestions:

-- Add TPS (Transactions Per Second) column for performance tracking
ALTER TABLE contracts ADD COLUMN tps INTEGER DEFAULT 0;
-- Or if decimal precision is needed:
-- ALTER TABLE contracts ADD COLUMN tps NUMERIC(10, 2) DEFAULT 0.0;
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between b12107d and 9eb9b2a.

📒 Files selected for processing (4)
  • crates/torii/core/src/executor.rs (5 hunks)
  • crates/torii/core/src/types.rs (1 hunks)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
  • crates/torii/migrations/20240923155431_tps.sql (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/torii/core/src/types.rs
  • crates/torii/grpc/src/server/subscriptions/indexer.rs
🔇 Additional comments (5)
crates/torii/migrations/20240923155431_tps.sql (1)

1-3: Ohayo once more, sensei! Let's zoom out and look at the big picture.

These changes are laying the groundwork for some cool TPS tracking functionality. Nice work! However, let's consider a few things to ensure smooth sailing:

  1. Application Code: Make sure any related application code is updated to use these new columns effectively.
  2. Data Migration: Consider how to handle existing data. You might want to backfill these columns with appropriate values.
  3. Performance Impact: Keep an eye on how these additional columns affect query performance, especially if the contracts table is large or frequently accessed.
  4. Documentation: Update any relevant documentation to reflect these new columns and their purpose.

To ensure we're not missing any important details, let's run a quick check on the existing schema:

This will help us understand the current state of the schema and ensure we're not duplicating any existing functionality.

crates/torii/core/src/executor.rs (4)

17-18: Ohayo! LGTM for the import changes, sensei!

The import statements have been updated correctly to include Contract as ContractUpdated. This change aligns with the new functionality being added.


34-34: Ohayo! New BrokerMessage variant looks good, sensei!

The addition of SetHead(ContractUpdated) to the BrokerMessage enum is spot-on. It's a necessary change to support the new blockchain head update functionality.


49-56: Ohayo! New SetHeadQuery struct is well-crafted, sensei!

The SetHeadQuery struct is a solid addition. It contains all the necessary fields for setting the blockchain head, with appropriate types for each field. Nice work on structuring this data!


59-59: Ohayo! QueryType enum update is on point, sensei!

The addition of SetHead(SetHeadQuery) to the QueryType enum is a perfect complement to the new SetHeadQuery struct. This change enables the system to handle the new query type for setting the blockchain head.

Comment on lines +191 to +199
QueryType::SetHead(set_head) => {
    let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
        "SELECT last_block_timestamp FROM contracts WHERE id = ?",
    )
    .bind(format!("{:#x}", set_head.contract_address))
    .fetch_one(&mut **tx)
    .await?
    .try_into()
    .map_err(|_| anyhow::anyhow!("Last block timestamp doesn't fit in u64"))?;


⚠️ Potential issue

Ohayo! Watch out for that sneaky i64 to u64 conversion, sensei!

The conversion from i64 to u64 for previous_block_timestamp could fail if the value is negative. Consider using a safe conversion method or handling potential errors. Here's a suggestion:

let previous_block_timestamp: u64 = sqlx::query_scalar::<_, i64>(
    "SELECT last_block_timestamp FROM contracts WHERE id = ?"
)
.bind(format!("{:#x}", set_head.contract_address))
.fetch_one(&mut **tx)
.await?
.try_into()
.map_err(|_| anyhow::anyhow!("Last block timestamp is negative or doesn't fit in u64"))?;

This approach will provide a more informative error message if the conversion fails.

Comment on lines +201 to +205
let tps: u64 = if set_head.last_block_timestamp - previous_block_timestamp != 0 {
    set_head.txns_count / (set_head.last_block_timestamp - previous_block_timestamp)
} else {
    set_head.txns_count
};


🛠️ Refactor suggestion

Ohayo! Let's make that TPS calculation more robust, sensei!

The current TPS calculation might not handle edge cases well. Consider using a more precise calculation method that avoids potential issues with integer division. Here's a suggestion:

let time_diff = set_head.last_block_timestamp.saturating_sub(previous_block_timestamp);
let tps = if time_diff > 0 {
    (set_head.txns_count as f64 / time_diff as f64).round() as u64
} else {
    0 // or another appropriate default value
};

This approach uses floating-point division for more precise results and handles the case where time_diff is zero or when set_head.last_block_timestamp is less than previous_block_timestamp.


coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
crates/torii/grpc/src/server/mod.rs (2)

694-701: LGTM with suggestion: New subscribe_indexer method added.

Ohayo, sensei! The new subscribe_indexer method looks good and matches the description in the summary. However, we should improve the error handling when converting the contract_address.

Consider adding explicit error handling for the Felt::from_bytes_be_slice conversion. Here's a suggested improvement:

 async fn subscribe_indexer(
     &self,
     contract_address: Felt,
 ) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error>
 {
-    self.indexer_manager.add_subscriber(&self.pool, contract_address).await
+    self.indexer_manager
+        .add_subscriber(&self.pool, contract_address)
+        .await
+        .map_err(|e| Error::Custom(format!("Failed to add indexer subscriber: {}", e)))
 }

This change will provide more context if the subscription fails.


1027-1059: LGTM with suggestion: New indexer subscription functionality added.

Ohayo once more, sensei! The new type alias and implementation of subscribe_indexer for the World trait look good. They're consistent with the new indexer functionality we're adding. However, we should improve the error handling when converting the contract_address.

Consider adding explicit error handling for the Felt::from_bytes_be_slice conversion. Here's a suggested improvement:

 async fn subscribe_indexer(
     &self,
     request: Request<SubscribeIndexerRequest>,
 ) -> ServiceResult<Self::SubscribeIndexerStream> {
     let SubscribeIndexerRequest { contract_address } = request.into_inner();
+    let contract_felt = Felt::from_bytes_be_slice(&contract_address)
+        .map_err(|e| Status::invalid_argument(&format!("Invalid contract address: {}", e)))?;
     let rx = self
-        .subscribe_indexer(Felt::from_bytes_be_slice(&contract_address))
+        .subscribe_indexer(contract_felt)
         .await
         .map_err(|e| Status::internal(e.to_string()))?;
     Ok(Response::new(Box::pin(ReceiverStream::new(rx)) as Self::SubscribeIndexerStream))
 }

This change will provide more specific error information if the contract address is invalid.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between caf38b2 and 938eabf.

📒 Files selected for processing (6)
  • crates/torii/core/src/executor.rs (5 hunks)
  • crates/torii/core/src/types.rs (1 hunks)
  • crates/torii/grpc/proto/world.proto (2 hunks)
  • crates/torii/grpc/src/server/mod.rs (9 hunks)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
  • crates/torii/grpc/src/types/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • crates/torii/core/src/executor.rs
  • crates/torii/core/src/types.rs
  • crates/torii/grpc/proto/world.proto
  • crates/torii/grpc/src/types/mod.rs
🔇 Additional comments (12)
crates/torii/grpc/src/server/subscriptions/indexer.rs (7)

1-22: LGTM! Imports and constant look good.

Ohayo sensei! The imports cover all the necessary dependencies for the implemented functionality. The LOG_TARGET constant is well-defined for logging purposes.


24-30: LGTM! IndexerSubscriber struct is well-defined.

Ohayo sensei! The IndexerSubscriber struct is nicely designed with a contract_address of type Felt and a sender for responses. This structure should effectively manage individual subscriptions.


77-87: LGTM! Service struct is well-designed.

Ohayo sensei! The Service struct is nicely defined with an Arc<IndexerManager> for shared ownership and a boxed stream for flexibility. The new method provides a clean initialization process.


1-140: Overall, a well-implemented subscription system with room for optimization.

Ohayo sensei! The implementation of the indexer subscription system is solid and makes good use of async Rust features. The code is well-structured and handles concurrency appropriately.

There are a few areas for potential optimization:

  1. Subscriber ID generation could be made deterministic.
  2. Subscriber lookup could be optimized for better performance with large numbers of subscribers.
  3. Concurrency limits could be implemented in the Future implementation to prevent potential resource exhaustion.

These optimizations would further enhance the robustness and scalability of the system. Great work overall!


32-74: ⚠️ Potential issue

Consider using a deterministic ID generation method.

Ohayo sensei! The IndexerManager struct and its implementation look good overall. However, there's a potential issue with the subscriber ID generation:

let id = rand::thread_rng().gen::<usize>();

This random ID generation could lead to collisions, especially as the number of subscribers grows. Consider using a deterministic method, such as an atomic counter:

use std::sync::atomic::{AtomicUsize, Ordering};

static SUBSCRIBER_ID_COUNTER: AtomicUsize = AtomicUsize::new(1);

// In add_subscriber method
let id = SUBSCRIBER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);

This approach ensures unique IDs and avoids potential collisions.


89-120: 🛠️ Refactor suggestion

Consider optimizing subscriber lookup.

Ohayo sensei! The publish_updates method looks good overall, but there's room for performance improvement:

for (idx, sub) in subs.subscribers.read().await.iter() {
    if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address {
        continue;
    }
    // ... rest of the code
}

This approach iterates over all subscribers, which could become inefficient as the number of subscribers grows. Consider indexing subscribers by contract_address to allow for quicker lookups:

pub struct IndexerManager {
    subscribers: RwLock<HashMap<Felt, HashMap<usize, IndexerSubscriber>>>,
}

You'll need to update the add_subscriber and publish_updates methods accordingly, but this change could significantly improve performance for large numbers of subscribers.


123-140: ⚠️ Potential issue

Consider implementing a concurrency limit.

Ohayo sensei! The Future implementation for Service looks good, but there's a potential issue:

while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
    let subs = Arc::clone(&pin.subs_manager);
    tokio::spawn(async move {
        // ... task code
    });
}

Spawning a new task for every event without limit could exhaust system resources if events arrive frequently. Consider implementing a concurrency limit or processing events within a fixed-size thread pool. You could use tokio::spawn with a semaphore or tokio::sync::mpsc channels to control task spawning.

crates/torii/grpc/src/server/mod.rs (5)

20-21: LGTM! New imports added.

Ohayo, sensei! The new imports for RetrieveEntitiesRequest and RetrieveEntitiesResponse look good. They're likely related to new entity retrieval functionality.


32-32: LGTM! New import for IndexerManager.

Ohayo again, sensei! The new import for IndexerManager looks good. It seems we're adding some indexer management functionality to our codebase.


51-51: LGTM! New imports for indexer subscription.

Ohayo once more, sensei! The new imports for SubscribeIndexerRequest and SubscribeIndexerResponse are in line with the indexer functionality we're adding. Everything looks good here.


Line range hint 89-131: LGTM! IndexerManager added to DojoWorld.

Ohayo, sensei! The addition of indexer_manager to the DojoWorld struct and its initialization look good. The new task spawned for the indexer service follows the existing pattern for other managers. These changes are consistent with the new indexer functionality we're implementing.


137-137: LGTM! Method renamed from metadata to world.

Ohayo, sensei! The renaming of the method from metadata to world looks good and aligns with the summary provided. However, we should ensure that all calls to this method are updated throughout the codebase.

Let's verify the usage of this method across the codebase:

✅ Verification successful

Ohayo, sensei! The method rename from metadata to world in crates/torii/grpc/src/server/mod.rs has been successfully verified. All remaining metadata instances are unrelated and do not affect this specific change.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any remaining usage of the old method name 'metadata'

# Search for 'metadata' method calls
echo "Searching for remaining 'metadata' method calls:"
rg '\bmetadata\s*\(' --type rust

# Search for 'world' method calls to confirm the update
echo "Searching for updated 'world' method calls:"
rg '\bworld\s*\(' --type rust

Length of output: 1976

Four earlier review threads on crates/torii/grpc/src/server/subscriptions/indexer.rs are outdated and resolved.

coderabbitai bot left a comment


Actionable comments posted: 4

🧹 Outside diff range and nitpick comments (1)
crates/torii/grpc/src/server/subscriptions/indexer.rs (1)

24-30: Ohayo, sensei! Consider enhancing the comment for contract_address.

The current comment for contract_address could be more descriptive. Consider updating it to provide more context about its purpose and usage.

Here's a suggested improvement:

-    /// Contract address that the subscriber is interested in
+    /// The contract address that the subscriber is monitoring for updates.
+    /// This can be a specific contract or Felt::ZERO to subscribe to all contracts.
     contract_address: Felt,
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 9f57837 and 4be7b4d.

📒 Files selected for processing (1)
  • crates/torii/grpc/src/server/subscriptions/indexer.rs (1 hunks)
🧰 Additional context used
🔇 Additional comments (2)
crates/torii/grpc/src/server/subscriptions/indexer.rs (2)

1-22: Ohayo, sensei! The imports and constant look good!

The imports cover all necessary libraries for the implemented functionality, and the LOG_TARGET constant follows a consistent naming convention.


1-144: Ohayo, sensei! Overall, a solid implementation with room for refinement!

The subscription system for indexer updates is well-structured and utilizes appropriate concurrent programming techniques. However, there are a few areas that could be improved:

  1. Enhance the uniqueness of subscriber IDs to prevent potential collisions.
  2. Strengthen SQL query construction to prevent potential SQL injection vulnerabilities.
  3. Implement non-blocking or timeout mechanisms in the publish_updates method to avoid potential bottlenecks.
  4. Add concurrency control in the poll method to prevent resource exhaustion from unbounded task spawning.

Addressing these points will further improve the robustness and efficiency of the implementation.

Comment on the subscriber ID generation in add_subscriber (partial quote):

    contract_address: Felt,
) -> Result<Receiver<Result<proto::world::SubscribeIndexerResponse, tonic::Status>>, Error>
{
    let id = rand::thread_rng().gen::<usize>();


⚠️ Potential issue

Ohayo, sensei! Be wary of potential ID collisions!

Using rand::thread_rng().gen::<usize>() for subscriber IDs may lead to collisions, especially with a large number of subscribers.

Consider using an atomic counter or a UUID for guaranteed unique IDs:

use std::sync::atomic::{AtomicUsize, Ordering};

static SUBSCRIBER_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

// In add_subscriber method
let id = SUBSCRIBER_ID_COUNTER.fetch_add(1, Ordering::Relaxed);

Comment on lines +47 to +59
let mut statement =
    "SELECT head, tps, last_block_timestamp, contract_address FROM contracts".to_string();

let contracts: Vec<ContractUpdated> = if contract_address != Felt::ZERO {
    statement += " WHERE id = ?";

    sqlx::query_as(&statement)
        .bind(format!("{:#x}", contract_address))
        .fetch_all(pool)
        .await?
} else {
    sqlx::query_as(&statement).fetch_all(pool).await?
};


⚠️ Potential issue

Ohayo, sensei! Watch out for potential SQL injection!

The SQL query construction could be vulnerable to SQL injection if contract_address is not properly sanitized.

Consider using parameterized queries consistently:

let contracts: Vec<ContractUpdated> = if contract_address != Felt::ZERO {
    sqlx::query_as("SELECT head, tps, last_block_timestamp, contract_address FROM contracts WHERE id = ?")
        .bind(format!("{:#x}", contract_address))
        .fetch_all(pool)
        .await?
} else {
    sqlx::query_as("SELECT head, tps, last_block_timestamp, contract_address FROM contracts")
        .fetch_all(pool)
        .await?
};

Comment on lines +93 to +124
async fn publish_updates(
    subs: Arc<IndexerManager>,
    update: &ContractUpdated,
) -> Result<(), Error> {
    let mut closed_stream = Vec::new();
    let contract_address =
        Felt::from_str(&update.contract_address).map_err(ParseError::FromStr)?;

    for (idx, sub) in subs.subscribers.read().await.iter() {
        if sub.contract_address != Felt::ZERO && sub.contract_address != contract_address {
            continue;
        }

        let resp = SubscribeIndexerResponse {
            head: update.head,
            tps: update.tps,
            last_block_timestamp: update.last_block_timestamp,
            contract_address: contract_address.to_bytes_be().to_vec(),
        };

        if sub.sender.send(Ok(resp)).await.is_err() {
            closed_stream.push(*idx);
        }
    }

    for id in closed_stream {
        trace!(target = LOG_TARGET, id = %id, "Closing indexer updates stream.");
        subs.remove_subscriber(id).await
    }

    Ok(())
}


⚠️ Potential issue

Ohayo, sensei! Be mindful of potential blocking in publish_updates!

The send operation on the subscriber's channel could potentially block if the channel is full, affecting the overall performance of the system.

Consider using a non-blocking send operation or implementing a timeout mechanism:

use tokio::time::timeout;
use std::time::Duration;

// In publish_updates method
let send_future = sub.sender.send(Ok(resp));
if timeout(Duration::from_millis(100), send_future).await.is_err() {
    closed_stream.push(*idx);
}

Comment on lines +133 to +140
while let Poll::Ready(Some(event)) = pin.simple_broker.poll_next_unpin(cx) {
    let subs = Arc::clone(&pin.subs_manager);
    tokio::spawn(async move {
        if let Err(e) = Service::publish_updates(subs, &event).await {
            error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
        }
    });
}


⚠️ Potential issue

Ohayo, sensei! Be cautious of unbounded task spawning!

Spawning a new task for every event without limit could potentially exhaust system resources if events are frequent.

Consider implementing a concurrency limit or processing events within a fixed-size thread pool:

use tokio::sync::Semaphore;

// In Service struct
semaphore: Arc<Semaphore>,

// In new method
semaphore: Arc::new(Semaphore::new(10)), // Adjust the number based on your needs

// In poll method
let permit = match pin.semaphore.clone().try_acquire_owned() {
    Ok(permit) => permit,
    Err(_) => continue, // Skip this event if we've reached the concurrency limit
};

tokio::spawn(async move {
    let _permit = permit; // Keep the permit until the task is done
    if let Err(e) = Service::publish_updates(subs, &event).await {
        error!(target = LOG_TARGET, error = %e, "Publishing indexer update.");
    }
});
