Skip to content

Commit

Permalink
improve nesting of tracing for invalidations
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jan 4, 2024
1 parent 8449b15 commit 522c814
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 183 deletions.
5 changes: 4 additions & 1 deletion crates/turbo-tasks-memory/src/memory_backend_with_pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,10 @@ impl<P: PersistedGraph> Backend for MemoryBackendWithPersistedGraph<P> {
)
}
};
Some(TaskExecutionSpec { future })
Some(TaskExecutionSpec {
future,
span: tracing::Span::none(),
})
}

fn task_execution_result(
Expand Down
95 changes: 65 additions & 30 deletions crates/turbo-tasks-memory/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ use rustc_hash::FxHasher;
use smallvec::SmallVec;
use stats::TaskStats;
use tokio::task_local;
use tracing::Span;
use turbo_tasks::{
backend::{PersistentTaskType, TaskExecutionSpec},
event::{Event, EventListener},
get_invalidator, registry, CellId, Invalidator, RawVc, StatsType, TaskId, TaskIdSet,
TraitTypeId, TurboTasksBackendApi, ValueTypeId,
get_invalidator, registry, CellId, Invalidator, NativeFunction, RawVc, StatsType, TaskId,
TaskIdSet, TraitType, TraitTypeId, TurboTasksBackendApi, ValueTypeId,
};

use crate::{
Expand Down Expand Up @@ -122,7 +123,9 @@ impl Display for TaskType {
enum PrepareTaskType {
#[default]
None,
Native(NativeTaskFn),
Resolve(&'static NativeFunction),
ResolveTrait(&'static TraitType),
Native(&'static NativeFunction, NativeTaskFn),
}

/// A Task is an instantiation of an Function with some arguments.
Expand Down Expand Up @@ -675,8 +678,7 @@ impl Task {
) -> Option<TaskExecutionSpec> {
let mut aggregation_context = TaskAggregationContext::new(turbo_tasks, backend);
let dependencies;
let future;
{
let (future, span) = {
let mut state = self.full_state_mut();
match state.state_type {
Done { .. } | InProgress { .. } | InProgressDirty { .. } => {
Expand Down Expand Up @@ -707,11 +709,11 @@ impl Task {
)
}
};
future = self.make_execution_future(state, backend, turbo_tasks);
}
self.make_execution_future(state, backend, turbo_tasks)
};
aggregation_context.apply_queued_updates();
self.clear_dependencies(dependencies, backend, turbo_tasks);
Some(TaskExecutionSpec { future })
Some(TaskExecutionSpec { future, span })
}

/// Prepares task execution and returns a future that will execute the task.
Expand All @@ -720,52 +722,85 @@ impl Task {
mut state: FullTaskWriteGuard<'_>,
_backend: &MemoryBackend,
turbo_tasks: &dyn TurboTasksBackendApi<MemoryBackend>,
) -> Pin<Box<dyn Future<Output = Result<RawVc>> + Send>> {
) -> (Pin<Box<dyn Future<Output = Result<RawVc>> + Send>>, Span) {
match &self.ty {
TaskType::Root(bound_fn) => {
drop(state);
bound_fn()
(bound_fn(), tracing::trace_span!("turbo_tasks::root_task"))
}
TaskType::Once(mutex) => {
drop(state);
mutex.lock().take().expect("Task can only be executed once")
(
mutex.lock().take().expect("Task can only be executed once"),
tracing::trace_span!("turbo_tasks::once_task"),
)
}
TaskType::Persistent { ty, .. } => match &**ty {
PersistentTaskType::Native(native_fn, inputs) => {
let future = if let PrepareTaskType::Native(bound_fn) = &state.prepared_type {
bound_fn()
} else {
let bound_fn = registry::get_function(*native_fn).bind(inputs);
let future = bound_fn();
state.prepared_type = PrepareTaskType::Native(bound_fn);
future
};
let result =
if let PrepareTaskType::Native(func, bound_fn) = &state.prepared_type {
let span = func.span();
let entered = span.enter();
let future = bound_fn();
drop(entered);
(future, span)
} else {
let func = registry::get_function(*native_fn);
let span = func.span();
let entered = span.enter();
let bound_fn = func.bind(inputs);
let future = bound_fn();
drop(entered);
state.prepared_type = PrepareTaskType::Native(func, bound_fn);
(future, span)
};
drop(state);
future
result
}
PersistentTaskType::ResolveNative(ref native_fn, inputs) => {
PersistentTaskType::ResolveNative(ref native_fn_id, inputs) => {
let native_fn_id = *native_fn_id;
let span = if let &PrepareTaskType::Resolve(func) = &state.prepared_type {
func.resolve_span()
} else {
let func = registry::get_function(native_fn_id);
state.prepared_type = PrepareTaskType::Resolve(func);
func.resolve_span()
};
drop(state);
let native_fn = *native_fn;
let entered = span.enter();
let inputs = inputs.clone();
let turbo_tasks = turbo_tasks.pin();
Box::pin(PersistentTaskType::run_resolve_native(
native_fn,
let future = Box::pin(PersistentTaskType::run_resolve_native(
native_fn_id,
inputs,
turbo_tasks,
))
));
drop(entered);
(future, span)
}
PersistentTaskType::ResolveTrait(trait_type, name, inputs) => {
PersistentTaskType::ResolveTrait(trait_type_id, name, inputs) => {
let trait_type_id = *trait_type_id;
let span =
if let PrepareTaskType::ResolveTrait(trait_type) = &state.prepared_type {
trait_type.resolve_span(name)
} else {
let trait_type = registry::get_trait(trait_type_id);
state.prepared_type = PrepareTaskType::ResolveTrait(trait_type);
trait_type.resolve_span(name)
};
drop(state);
let trait_type = *trait_type;
let entered = span.enter();
let name = name.clone();
let inputs = inputs.clone();
let turbo_tasks = turbo_tasks.pin();
Box::pin(PersistentTaskType::run_resolve_trait(
trait_type,
let future = Box::pin(PersistentTaskType::run_resolve_trait(
trait_type_id,
name,
inputs,
turbo_tasks,
))
));
drop(entered);
(future, span)
}
},
}
Expand Down
105 changes: 45 additions & 60 deletions crates/turbo-tasks/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
use anyhow::{anyhow, bail, Result};
use auto_hash_map::AutoMap;
use serde::{Deserialize, Serialize};
use tracing::Instrument;
use tracing::Span;

pub use crate::id::BackendJobId;
use crate::{
Expand Down Expand Up @@ -130,6 +130,7 @@ impl PersistentTaskType {

pub struct TaskExecutionSpec {
pub future: Pin<Box<dyn Future<Output = Result<RawVc>> + Send>>,
pub span: Span,
}

// TODO technically CellContent is already indexed by the ValueTypeId, so we
Expand Down Expand Up @@ -355,22 +356,14 @@ impl PersistentTaskType {
mut inputs: Vec<ConcreteTaskInput>,
turbo_tasks: Arc<dyn TurboTasksBackendApi<B>>,
) -> Result<RawVc> {
let span = tracing::trace_span!(
"turbo_tasks::resolve_call",
name = &registry::get_function(fn_id).name.as_str()
);
async move {
for i in 0..inputs.len() {
let input = unsafe { take(inputs.get_unchecked_mut(i)) };
let input = input.resolve().await?;
unsafe {
*inputs.get_unchecked_mut(i) = input;
}
for i in 0..inputs.len() {
let input = unsafe { take(inputs.get_unchecked_mut(i)) };
let input = input.resolve().await?;
unsafe {
*inputs.get_unchecked_mut(i) = input;
}
Ok(turbo_tasks.native_call(fn_id, inputs))
}
.instrument(span)
.await
Ok(turbo_tasks.native_call(fn_id, inputs))
}

pub async fn run_resolve_trait<B: Backend + 'static>(
Expand All @@ -379,56 +372,48 @@ impl PersistentTaskType {
inputs: Vec<ConcreteTaskInput>,
turbo_tasks: Arc<dyn TurboTasksBackendApi<B>>,
) -> Result<RawVc> {
let span = tracing::trace_span!(
"turbo_tasks::resolve_trait_call",
name = format!("{}::{name}", &registry::get_trait(trait_type).name),
);
async move {
let mut resolved_inputs = Vec::with_capacity(inputs.len());
let mut iter = inputs.into_iter();
if let Some(this) = iter.next() {
let this = this.resolve().await?;
let this_value = this.clone().resolve_to_value().await?;
match this_value.get_trait_method(trait_type, name) {
Ok(native_fn) => {
resolved_inputs.push(this);
for input in iter {
resolved_inputs.push(input)
}
Ok(turbo_tasks.dynamic_call(native_fn, resolved_inputs))
let mut resolved_inputs = Vec::with_capacity(inputs.len());
let mut iter = inputs.into_iter();
if let Some(this) = iter.next() {
let this = this.resolve().await?;
let this_value = this.clone().resolve_to_value().await?;
match this_value.get_trait_method(trait_type, name) {
Ok(native_fn) => {
resolved_inputs.push(this);
for input in iter {
resolved_inputs.push(input)
}
Err(name) => {
if !this_value.has_trait(trait_type) {
let traits =
this_value
.traits()
.iter()
.fold(String::new(), |mut out, t| {
let _ = write!(out, " {}", t);
out
});
Err(anyhow!(
"{} doesn't implement {} (only{})",
this_value,
registry::get_trait(trait_type),
traits,
))
} else {
Err(anyhow!(
"{} implements trait {}, but method {} is missing",
this_value,
registry::get_trait(trait_type),
name
))
}
Ok(turbo_tasks.dynamic_call(native_fn, resolved_inputs))
}
Err(name) => {
if !this_value.has_trait(trait_type) {
let traits =
this_value
.traits()
.iter()
.fold(String::new(), |mut out, t| {
let _ = write!(out, " {}", t);
out
});
Err(anyhow!(
"{} doesn't implement {} (only{})",
this_value,
registry::get_trait(trait_type),
traits,
))
} else {
Err(anyhow!(
"{} implements trait {}, but method {} is missing",
this_value,
registry::get_trait(trait_type),
name
))
}
}
} else {
panic!("No arguments for trait call");
}
} else {
panic!("No arguments for trait call");
}
.instrument(span)
.await
}

pub fn run<B: Backend + 'static>(
Expand Down
Loading

0 comments on commit 522c814

Please sign in to comment.