From 3cb1cee260d2f425fe57bd6f01365bf3a6d61f51 Mon Sep 17 00:00:00 2001 From: rtso <8248583+rtso@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:20:59 -0400 Subject: [PATCH] custom parallization --- .../src/processors/events/events_storer.rs | 156 +++++++++++++++++- rust/sdk/src/traits/processable.rs | 4 + 2 files changed, 156 insertions(+), 4 deletions(-) diff --git a/rust/sdk-examples/src/processors/events/events_storer.rs b/rust/sdk-examples/src/processors/events/events_storer.rs index 8c4b635..c27984f 100644 --- a/rust/sdk-examples/src/processors/events/events_storer.rs +++ b/rust/sdk-examples/src/processors/events/events_storer.rs @@ -6,17 +6,29 @@ use crate::{ use ahash::AHashMap; use anyhow::Result; use aptos_indexer_processor_sdk::{ - traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + traits::{ + async_step::AsyncRunType, processable::CustomRunType, AsyncStep, IntoRunnableStep, + NamedStep, Processable, RunnableAsyncStep, RunnableStep, + }, types::transaction_context::TransactionContext, - utils::errors::ProcessorError, + utils::{ + errors::ProcessorError, + step_metrics::{StepMetricLabels, StepMetricsBuilder}, + }, }; use async_trait::async_trait; +use bigdecimal::Zero; use diesel::{ pg::{upsert::excluded, Pg}, query_builder::QueryFragment, ExpressionMethods, }; -use tracing::{error, info}; +use instrumented_channel::{ + instrumented_bounded_channel, InstrumentedAsyncReceiver, InstrumentedAsyncSender, +}; +use std::time::{Duration, Instant}; +use tokio::task::JoinHandle; +use tracing::{error, info, warn}; pub struct EventsStorer where @@ -55,7 +67,7 @@ fn insert_events_query( impl Processable for EventsStorer { type Input = EventModel; type Output = EventModel; - type RunType = AsyncRunType; + type RunType = CustomRunType; async fn process( &mut self, @@ -91,3 +103,139 @@ impl NamedStep for EventsStorer { "EventsStorer".to_string() } } + +// This trait implementation is required if you want to customize running the step +impl IntoRunnableStep for EventsStorer { + fn into_runnable_step(self) -> impl RunnableStep { + RunnableEventsStorer::new(self) + } +} + +pub struct RunnableEventsStorer { + pub step: EventsStorer, +} + +impl RunnableEventsStorer { + pub fn new(step: EventsStorer) -> Self { + Self { step } + } +} + +impl RunnableStep for RunnableEventsStorer { + fn spawn( + self, + input_receiver: Option>>, + output_channel_size: usize, + _input_sender: Option>>, + ) -> ( + InstrumentedAsyncReceiver>, + JoinHandle<()>, + ) { + let mut step = self.step; + let step_name = step.name(); + let input_receiver = input_receiver.expect("Input receiver must be set"); + + let (output_sender, output_receiver) = + instrumented_bounded_channel(&step_name, output_channel_size); + + // TIP: You may replace this tokio task with your own code to customize the parallelization of this step + info!(step_name = step_name, "Spawning processing task"); + let handle = tokio::spawn(async move { + loop { + let input_with_context = match input_receiver.recv().await { + Ok(input_with_context) => input_with_context, + Err(e) => { + // If the previous steps have finished and the channels have closed , we should break out of the loop + warn!( + step_name = step_name, + error = e.to_string(), + "No input received from channel" + ); + break; + }, + }; + let processing_duration = Instant::now(); + let output_with_context = match step.process(input_with_context).await { + Ok(output_with_context) => output_with_context, + Err(e) => { + error!( + step_name = step_name, + error = e.to_string(), + "Failed to process input" + ); + break; + }, + }; + if let Some(output_with_context) = output_with_context { + match StepMetricsBuilder::default() + .labels(StepMetricLabels { + step_name: step.name(), + }) + .latest_processed_version(output_with_context.end_version) + .latest_transaction_timestamp( + output_with_context.get_start_transaction_timestamp_unix(), + ) + .num_transactions_processed_count( + output_with_context.get_num_transactions(), + ) + .processing_duration_in_secs(processing_duration.elapsed().as_secs_f64()) + .processed_size_in_bytes(output_with_context.total_size_in_bytes) + .build() + { + Ok(mut metrics) => metrics.log_metrics(), + Err(e) => { + error!( + step_name = step_name, + error = e.to_string(), + "Failed to log metrics" + ); + break; + }, + } + match output_sender.send(output_with_context).await { + Ok(_) => (), + Err(e) => { + error!( + step_name = step_name, + error = e.to_string(), + "Error sending output to channel" + ); + break; + }, + } + } + } + + // Wait for output channel to be empty before ending the task and closing the send channel + loop { + let channel_size = output_sender.len(); + info!( + step_name = step_name, + channel_size = channel_size, + "Waiting for output channel to be empty" + ); + if channel_size.is_zero() { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + info!( + step_name = step_name, + "Output channel is empty. Closing send channel." + ); + }); + + (output_receiver, handle) + } +} + +impl NamedStep for RunnableEventsStorer { + fn name(&self) -> String { + self.step.name() + } + + fn type_name(&self) -> String { + let step_type = std::any::type_name::().to_string(); + format!("{} (via RunnableAsyncStep)", step_type) + } +} diff --git a/rust/sdk/src/traits/processable.rs b/rust/sdk/src/traits/processable.rs index 85784bc..8f677a9 100644 --- a/rust/sdk/src/traits/processable.rs +++ b/rust/sdk/src/traits/processable.rs @@ -11,6 +11,10 @@ pub trait RunnableStepType {} // This is a dummy implementation for the unit type impl RunnableStepType for () {} +pub struct CustomRunType; + +impl RunnableStepType for CustomRunType {} + #[async_trait] pub trait Processable where