Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Extensions with_lock() to try and avoid timing issues #5360

Merged
merged 8 commits into from
Jun 10, 2024
7 changes: 7 additions & 0 deletions .changesets/feat_garypen_router_340_extensions_with_lock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Add Extensions with_lock() to try and avoid timing issues ([PR #5360](https://github.com/apollographql/router/pull/5360))

It's easy to trip over issues when interacting with Extensions because we inadvertently hold locks for too long. This can be a source of bugs in the router and can cause a lot of tests to be flaky.

`with_lock()` avoids this kind of problem by explicitly restricting the lifetime of the Extensions lock to the closure passed to it: the lock is dropped once the closure completes.

By [@garypen](https://github.com/garypen) in https://github.com/apollographql/router/pull/5360
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ impl<'a> Drop for CancelHandler<'a> {
self.span
.in_scope(|| tracing::error!("broken pipe: the client closed the connection"));
}
self.context.extensions().lock().insert(CanceledRequest);
self.context
.extensions()
.with_lock(|mut lock| lock.insert(CanceledRequest));
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions apollo-router/src/context/extensions/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,29 @@ impl ExtensionsMutex {
/// It is CRITICAL to avoid holding on to the mutex guard for too long, particularly across async calls.
/// Doing so may cause performance degradation or even deadlocks.
///
/// DEPRECATED: prefer with_lock()
///
/// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
#[deprecated]
Copy link
Member

@abernix abernix Jun 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Q: Is there a reason we don't use the `deprecated` attribute's parameters on things like this? Doesn't that provide a better user experience? For example:

#[deprecated(since = "1.49.0", note = "Users should instead use with_lock")]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It definitely renders nicer in the documentation!

pub fn lock(&self) -> ExtensionsGuard {
// Wrap the lock taken on `self.extensions` in an `ExtensionsGuard` and hand it to the
// caller, who then controls how long it is held.
ExtensionsGuard {
// Debug builds only: capture the instant at which the guard is created.
// NOTE(review): presumably used to measure how long the guard is held — confirm
// against the `ExtensionsGuard` implementation.
#[cfg(debug_assertions)]
start: Instant::now(),
guard: self.extensions.lock(),
}
}

/// Runs `func` with the extensions locked and returns whatever `func` returns.
///
/// The guard is created here and moved into the closure, so the caller never holds it
/// directly; it is dropped when the closure is done with it.
pub fn with_lock<'a, T, F: FnOnce(ExtensionsGuard<'a>) -> T>(&'a self, func: F) -> T {
    // Build the guard directly in the call so there is no named binding that could
    // outlive the closure invocation.
    func(ExtensionsGuard {
        // Debug builds only: capture the instant at which the guard is created.
        #[cfg(debug_assertions)]
        start: Instant::now(),
        guard: self.extensions.lock(),
    })
}
}

pub struct ExtensionsGuard<'a> {
Expand Down
15 changes: 7 additions & 8 deletions apollo-router/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ impl Context {
#[doc(hidden)]
pub fn unsupported_executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
self.extensions()
.lock()
.get::<ParsedDocument>()
.map(|d| d.executable.clone())
.with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
}
}

Expand Down Expand Up @@ -421,10 +419,11 @@ mod test {
fn context_extensions() {
// This is mostly tested in the extensions module.
let c = Context::new();
let mut extensions = c.extensions().lock();
extensions.insert(1usize);
let v = extensions.get::<usize>();
assert_eq!(v, Some(&1usize));
c.extensions().with_lock(|mut lock| lock.insert(1usize));
let v = c
.extensions()
.with_lock(|lock| lock.get::<usize>().cloned());
assert_eq!(v, Some(1usize));
}

#[test]
Expand Down Expand Up @@ -455,7 +454,7 @@ mod test {
let document =
Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();
assert!(c.unsupported_executable_document().is_none());
c.extensions().lock().insert(document);
c.extensions().with_lock(|mut lock| lock.insert(document));
assert!(c.unsupported_executable_document().is_some());
}
}
14 changes: 8 additions & 6 deletions apollo-router/src/plugins/authentication/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ impl SubgraphAuth {
ServiceBuilder::new()
.map_request(move |req: SubgraphRequest| {
let signing_params = signing_params.clone();
req.context.extensions().lock().insert(signing_params);
req.context
.extensions()
.with_lock(|mut lock| lock.insert(signing_params));
req
})
.service(service)
Expand Down Expand Up @@ -644,11 +646,11 @@ mod test {
request: &SubgraphRequest,
service_name: String,
) -> hyper::Request<hyper::Body> {
let signing_params = {
let ctx = request.context.extensions().lock();
let sp = ctx.get::<Arc<SigningParamsConfig>>();
sp.cloned().unwrap()
};
let signing_params = request
.context
.extensions()
.with_lock(|lock| lock.get::<Arc<SigningParamsConfig>>().cloned())
.unwrap();

let http_request = request
.clone()
Expand Down
12 changes: 7 additions & 5 deletions apollo-router/src/plugins/authorization/authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1661,11 +1661,13 @@ mod tests {
.unwrap();*/
let mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue> = MultiMap::new();
headers.insert("Accept".into(), "multipart/mixed;deferSpec=20220824".into());
context.extensions().lock().insert(ClientRequestAccepts {
multipart_defer: true,
multipart_subscription: true,
json: true,
wildcard: true,
context.extensions().with_lock(|mut lock| {
lock.insert(ClientRequestAccepts {
multipart_defer: true,
multipart_subscription: true,
json: true,
wildcard: true,
})
});
let request = supergraph::Request::fake_builder()
.query("query { orga(id: 1) { id creatorUser { id } ... @defer { nonNullId } } }")
Expand Down
10 changes: 6 additions & 4 deletions apollo-router/src/plugins/authorization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,12 @@ impl AuthorizationPlugin {
.unwrap_or_default();
policies.sort();

context.extensions().lock().insert(CacheKeyMetadata {
is_authenticated,
scopes,
policies,
context.extensions().with_lock(|mut lock| {
lock.insert(CacheKeyMetadata {
is_authenticated,
scopes,
policies,
})
});
}

Expand Down
29 changes: 17 additions & 12 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ impl Plugin for EntityCache {
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
ServiceBuilder::new()
.map_response(|mut response: supergraph::Response| {
if let Some(cache_control) = {
let lock = response.context.extensions().lock();
let cache_control = lock.get::<CacheControl>().cloned();
cache_control
} {
if let Some(cache_control) = response
.context
.extensions()
.with_lock(|lock| lock.get::<CacheControl>().cloned())
{
let _ = cache_control.to_headers(response.response.headers_mut());
}

Expand Down Expand Up @@ -437,7 +437,10 @@ async fn cache_lookup_root(

match cache_result {
Some(value) => {
request.context.extensions().lock().insert(value.0.control);
request
.context
.extensions()
.with_lock(|mut lock| lock.insert(value.0.control));

Ok(ControlFlow::Break(
subgraph::Response::builder()
Expand Down Expand Up @@ -519,12 +522,14 @@ async fn cache_lookup_entities(
}

fn update_cache_control(context: &Context, cache_control: &CacheControl) {
if let Some(c) = context.extensions().lock().get_mut::<CacheControl>() {
*c = c.merge(cache_control);
return;
}
//FIXME: race condition. We need an Entry API for private entries
context.extensions().lock().insert(cache_control.clone());
context.extensions().with_lock(|mut lock| {
if let Some(c) = lock.get_mut::<CacheControl>() {
*c = c.merge(cache_control);
} else {
//FIXME: race condition. We need an Entry API for private entries
lock.insert(cache_control.clone());
}
})
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ mod tests {
.unwrap();

let ctx = Context::new();
ctx.extensions().lock().insert::<ParsedDocument>(query);
ctx.extensions()
.with_lock(|mut lock| lock.insert::<ParsedDocument>(query));

let planner_res = planner
.call(QueryPlannerRequest::new(query_str.to_string(), None, ctx))
Expand Down
45 changes: 17 additions & 28 deletions apollo-router/src/plugins/demand_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ pub(crate) struct DemandControl {

impl DemandControl {
fn report_operation_metric(context: Context) {
let guard = context.extensions().lock();
let cost_context = guard.get::<CostContext>();
let result = cost_context.map_or("NO_CONTEXT", |c| c.result);
let result = context
.extensions()
.with_lock(|lock| lock.get::<CostContext>().map_or("NO_CONTEXT", |c| c.result));
u64_counter!(
"apollo.router.operations.demand_control",
"Total operations with demand control enabled",
Expand Down Expand Up @@ -237,7 +237,9 @@ impl Plugin for DemandControl {
let strategy = self.strategy_factory.create();
ServiceBuilder::new()
.checkpoint(move |req: execution::Request| {
req.context.extensions().lock().insert(strategy.clone());
req.context
.extensions()
.with_lock(|mut lock| lock.insert(strategy.clone()));
// On the request path we need to check for estimates, checkpoint is used to do this, short-circuiting the request if it's too expensive.
Ok(match strategy.on_execution_request(&req) {
Ok(_) => ControlFlow::Continue(req),
Expand All @@ -258,13 +260,9 @@ impl Plugin for DemandControl {
.context
.unsupported_executable_document()
.expect("must have document");
let strategy = resp
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = resp.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});
let context = resp.context.clone();

// We want to sequence this code to run after all the subgraph responses have been scored.
Expand Down Expand Up @@ -323,13 +321,9 @@ impl Plugin for DemandControl {
} else {
ServiceBuilder::new()
.checkpoint(move |req: subgraph::Request| {
let strategy = req
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = req.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});

// On the request path we need to check for estimates, checkpoint is used to do this, short-circuiting the request if it's too expensive.
Ok(match strategy.on_subgraph_request(&req) {
Expand All @@ -355,13 +349,9 @@ impl Plugin for DemandControl {
},
|req: Arc<Valid<ExecutableDocument>>, fut| async move {
let resp: subgraph::Response = fut.await?;
let strategy = resp
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = resp.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});
Ok(match strategy.on_subgraph_response(req.as_ref(), &resp) {
Ok(_) => resp,
Err(err) => subgraph::Response::builder()
Expand Down Expand Up @@ -556,7 +546,7 @@ mod test {
let strategy = plugin.strategy_factory.create();

let ctx = context();
ctx.extensions().lock().insert(strategy);
ctx.extensions().with_lock(|mut lock| lock.insert(strategy));
let mut req = subgraph::Request::fake_builder()
.subgraph_name("test")
.context(ctx)
Expand All @@ -582,8 +572,7 @@ mod test {
};
let ctx = Context::new();
ctx.extensions()
.lock()
.insert(ParsedDocument::new(parsed_document));
.with_lock(|mut lock| lock.insert(ParsedDocument::new(parsed_document)));
ctx
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ impl StrategyImpl for StaticEstimated {
self.cost_calculator
.planned(&request.query_plan)
.and_then(|cost| {
let mut extensions = request.context.extensions().lock();
let cost_result = extensions.get_or_default_mut::<CostContext>();
cost_result.estimated = cost;
if cost > self.max {
Err(
cost_result.result(DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
}),
)
} else {
Ok(())
}
request.context.extensions().with_lock(|mut lock| {
let cost_result = lock.get_or_default_mut::<CostContext>();
cost_result.estimated = cost;
if cost > self.max {
Err(
cost_result.result(DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
}),
)
} else {
Ok(())
}
})
})
}

Expand All @@ -56,9 +57,9 @@ impl StrategyImpl for StaticEstimated {
) -> Result<(), DemandControlError> {
if response.data.is_some() {
let cost = self.cost_calculator.actual(request, response)?;
let mut extensions = context.extensions().lock();
let cost_result = extensions.get_or_default_mut::<CostContext>();
cost_result.actual = cost;
context
.extensions()
.with_lock(|mut lock| lock.get_or_default_mut::<CostContext>().actual = cost);
}
Ok(())
}
Expand Down
Loading
Loading