diff --git a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs index 2d486715a6..936ada0ed8 100644 --- a/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs +++ b/crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs @@ -760,7 +760,7 @@ impl MutTxId { ctx, self.timer, self.lock_wait_time, - false, + true, Some(&tx_data), Some(&committed_state_write_lock), ); @@ -780,7 +780,7 @@ impl MutTxId { ctx, self.timer, self.lock_wait_time, - false, + true, Some(&tx_data), Some(&committed_state_write_lock), ); diff --git a/crates/core/src/execution_context.rs b/crates/core/src/execution_context.rs index 8cdbe86b8b..bb2fcbc1ea 100644 --- a/crates/core/src/execution_context.rs +++ b/crates/core/src/execution_context.rs @@ -212,6 +212,12 @@ impl ExecutionContext { Self::new(database, None, WorkloadType::Update) } + /// Returns an [ExecutionContext] for an incremental subscription update, + /// where this update is the result of a reducer mutation. + pub fn incremental_update_for_reducer(database: Address, ctx: ReducerContext) -> Self { + Self::new(database, Some(ctx), WorkloadType::Update) + } + /// Returns an [ExecutionContext] for an internal database operation. pub fn internal(database: Address) -> Self { Self::new(database, None, WorkloadType::Internal) diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index da5a46de13..42692f826b 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -168,21 +168,35 @@ impl ModuleSubscriptions { let subscriptions = self.subscriptions.read(); let stdb = &self.relational_db; - let read_tx = match &mut event.status { - EventStatus::Committed(db_update) => { - let Some((tx_data, read_tx)) = stdb.commit_tx_downgrade(ctx, tx)? else { - return Ok(Err(WriteConflict)); - }; - *db_update = DatabaseUpdate::from_writes(&tx_data); - read_tx - } - EventStatus::Failed(_) | EventStatus::OutOfEnergy => stdb.rollback_mut_tx_downgrade(ctx, tx), - }; + // Downgrade mutable tx. + // Ensure tx is released/cleaned up once out of scope. + let read_tx = scopeguard::guard( + match &mut event.status { + EventStatus::Committed(db_update) => { + let Some((tx_data, read_tx)) = stdb.commit_tx_downgrade(ctx, tx)? else { + return Ok(Err(WriteConflict)); + }; + *db_update = DatabaseUpdate::from_writes(&tx_data); + read_tx + } + EventStatus::Failed(_) | EventStatus::OutOfEnergy => stdb.rollback_mut_tx_downgrade(ctx, tx), + }, + |tx| { + self.relational_db.release_tx(ctx, tx); + }, + ); let event = Arc::new(event); + // New execution context for the incremental subscription update. + // TODO: The tx and the ExecutionContext should be coupled together. + let ctx = if let Some(reducer_ctx) = ctx.reducer_context() { + ExecutionContext::incremental_update_for_reducer(stdb.address(), reducer_ctx.clone()) + } else { + ExecutionContext::incremental_update(stdb.address()) + }; + match &event.status { EventStatus::Committed(_) => { - let ctx = ExecutionContext::incremental_update(stdb.address()); let slow_query_threshold = StVarTable::incr_limit(&ctx, stdb, &read_tx)?.map(Duration::from_millis); subscriptions.eval_updates(&ctx, stdb, &read_tx, event.clone(), client, slow_query_threshold) }