Skip to content

Commit

Permalink
fix: Always remember to release a read only tx (#1613)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-spacetime authored Aug 20, 2024
1 parent 9e178bd commit 8282b67
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/db/datastore/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ impl MutTxId {
ctx,
self.timer,
self.lock_wait_time,
false,
true,
Some(&tx_data),
Some(&committed_state_write_lock),
);
Expand All @@ -780,7 +780,7 @@ impl MutTxId {
ctx,
self.timer,
self.lock_wait_time,
false,
true,
Some(&tx_data),
Some(&committed_state_write_lock),
);
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/execution_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ impl ExecutionContext {
Self::new(database, None, WorkloadType::Update)
}

/// Returns an [ExecutionContext] for an incremental subscription update,
/// where this update is the result of a reducer mutation.
pub fn incremental_update_for_reducer(database: Address, ctx: ReducerContext) -> Self {
Self::new(database, Some(ctx), WorkloadType::Update)
}

/// Returns an [ExecutionContext] for an internal database operation.
pub fn internal(database: Address) -> Self {
Self::new(database, None, WorkloadType::Internal)
Expand Down
36 changes: 25 additions & 11 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,35 @@ impl ModuleSubscriptions {
let subscriptions = self.subscriptions.read();
let stdb = &self.relational_db;

let read_tx = match &mut event.status {
EventStatus::Committed(db_update) => {
let Some((tx_data, read_tx)) = stdb.commit_tx_downgrade(ctx, tx)? else {
return Ok(Err(WriteConflict));
};
*db_update = DatabaseUpdate::from_writes(&tx_data);
read_tx
}
EventStatus::Failed(_) | EventStatus::OutOfEnergy => stdb.rollback_mut_tx_downgrade(ctx, tx),
};
// Downgrade mutable tx.
// Ensure tx is released/cleaned up once out of scope.
let read_tx = scopeguard::guard(
match &mut event.status {
EventStatus::Committed(db_update) => {
let Some((tx_data, read_tx)) = stdb.commit_tx_downgrade(ctx, tx)? else {
return Ok(Err(WriteConflict));
};
*db_update = DatabaseUpdate::from_writes(&tx_data);
read_tx
}
EventStatus::Failed(_) | EventStatus::OutOfEnergy => stdb.rollback_mut_tx_downgrade(ctx, tx),
},
|tx| {
self.relational_db.release_tx(ctx, tx);
},
);
let event = Arc::new(event);

// New execution context for the incremental subscription update.
// TODO: The tx and the ExecutionContext should be coupled together.
let ctx = if let Some(reducer_ctx) = ctx.reducer_context() {
ExecutionContext::incremental_update_for_reducer(stdb.address(), reducer_ctx.clone())
} else {
ExecutionContext::incremental_update(stdb.address())
};

match &event.status {
EventStatus::Committed(_) => {
let ctx = ExecutionContext::incremental_update(stdb.address());
let slow_query_threshold = StVarTable::incr_limit(&ctx, stdb, &read_tx)?.map(Duration::from_millis);
subscriptions.eval_updates(&ctx, stdb, &read_tx, event.clone(), client, slow_query_threshold)
}
Expand Down

2 comments on commit 8282b67

@github-actions
Copy link

@github-actions github-actions bot commented on 8282b67 Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

@github-actions
Copy link

@github-actions github-actions bot commented on 8282b67 Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarking failed. Please check the workflow run for details.

Please sign in to comment.