Skip to content

Commit

Permalink
Replace all remaining usage of lock() with with_lock()
Browse files Browse the repository at this point in the history
Only review this commit if you are very interested in the details.
  • Loading branch information
garypen committed Jun 6, 2024
1 parent 1f2fb32 commit a0963c4
Show file tree
Hide file tree
Showing 34 changed files with 416 additions and 439 deletions.
4 changes: 3 additions & 1 deletion apollo-router/src/axum_factory/axum_http_server_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,9 @@ impl<'a> Drop for CancelHandler<'a> {
self.span
.in_scope(|| tracing::error!("broken pipe: the client closed the connection"));
}
self.context.extensions().lock().insert(CanceledRequest);
self.context
.extensions()
.with_lock(|mut lock| lock.insert(CanceledRequest));
}
}
}
Expand Down
7 changes: 6 additions & 1 deletion apollo-router/src/context/extensions/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ impl ExtensionsMutex {
/// Doing so may cause performance degradation or even deadlocks.
///
/// See related clippy lint for examples: <https://rust-lang.github.io/rust-clippy/master/index.html#/await_holding_lock>
#[deprecated]
pub fn lock(&self) -> ExtensionsGuard {
ExtensionsGuard {
#[cfg(debug_assertions)]
Expand All @@ -37,7 +38,11 @@ impl ExtensionsMutex {
///
/// The lock will be dropped once the closure completes.
pub fn with_lock<'a, T, F: FnOnce(ExtensionsGuard<'a>) -> T>(&'a self, func: F) -> T {
let locked = self.lock();
let locked = ExtensionsGuard {
#[cfg(debug_assertions)]
start: Instant::now(),
guard: self.extensions.lock(),
};
func(locked)
}
}
Expand Down
15 changes: 7 additions & 8 deletions apollo-router/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ impl Context {
#[doc(hidden)]
pub fn unsupported_executable_document(&self) -> Option<Arc<Valid<ExecutableDocument>>> {
self.extensions()
.lock()
.get::<ParsedDocument>()
.map(|d| d.executable.clone())
.with_lock(|lock| lock.get::<ParsedDocument>().map(|d| d.executable.clone()))
}
}

Expand Down Expand Up @@ -421,10 +419,11 @@ mod test {
fn context_extensions() {
// This is mostly tested in the extensions module.
let c = Context::new();
let mut extensions = c.extensions().lock();
extensions.insert(1usize);
let v = extensions.get::<usize>();
assert_eq!(v, Some(&1usize));
c.extensions().with_lock(|mut lock| lock.insert(1usize));
let v = c
.extensions()
.with_lock(|lock| lock.get::<usize>().cloned());
assert_eq!(v, Some(1usize));
}

#[test]
Expand Down Expand Up @@ -455,7 +454,7 @@ mod test {
let document =
Query::parse_document("{ me }", None, &schema, &Configuration::default()).unwrap();
assert!(c.unsupported_executable_document().is_none());
c.extensions().lock().insert(document);
c.extensions().with_lock(|mut lock| lock.insert(document));
assert!(c.unsupported_executable_document().is_some());
}
}
14 changes: 8 additions & 6 deletions apollo-router/src/plugins/authentication/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,9 @@ impl SubgraphAuth {
ServiceBuilder::new()
.map_request(move |req: SubgraphRequest| {
let signing_params = signing_params.clone();
req.context.extensions().lock().insert(signing_params);
req.context
.extensions()
.with_lock(|mut lock| lock.insert(signing_params));
req
})
.service(service)
Expand Down Expand Up @@ -644,11 +646,11 @@ mod test {
request: &SubgraphRequest,
service_name: String,
) -> hyper::Request<hyper::Body> {
let signing_params = {
let ctx = request.context.extensions().lock();
let sp = ctx.get::<Arc<SigningParamsConfig>>();
sp.cloned().unwrap()
};
let signing_params = request
.context
.extensions()
.with_lock(|lock| lock.get::<Arc<SigningParamsConfig>>().cloned())
.unwrap();

let http_request = request
.clone()
Expand Down
12 changes: 7 additions & 5 deletions apollo-router/src/plugins/authorization/authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1661,11 +1661,13 @@ mod tests {
.unwrap();*/
let mut headers: MultiMap<TryIntoHeaderName, TryIntoHeaderValue> = MultiMap::new();
headers.insert("Accept".into(), "multipart/mixed;deferSpec=20220824".into());
context.extensions().lock().insert(ClientRequestAccepts {
multipart_defer: true,
multipart_subscription: true,
json: true,
wildcard: true,
context.extensions().with_lock(|mut lock| {
lock.insert(ClientRequestAccepts {
multipart_defer: true,
multipart_subscription: true,
json: true,
wildcard: true,
})
});
let request = supergraph::Request::fake_builder()
.query("query { orga(id: 1) { id creatorUser { id } ... @defer { nonNullId } } }")
Expand Down
10 changes: 6 additions & 4 deletions apollo-router/src/plugins/authorization/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,12 @@ impl AuthorizationPlugin {
.unwrap_or_default();
policies.sort();

context.extensions().lock().insert(CacheKeyMetadata {
is_authenticated,
scopes,
policies,
context.extensions().with_lock(|mut lock| {
lock.insert(CacheKeyMetadata {
is_authenticated,
scopes,
policies,
})
});
}

Expand Down
29 changes: 17 additions & 12 deletions apollo-router/src/plugins/cache/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ impl Plugin for EntityCache {
fn supergraph_service(&self, service: supergraph::BoxService) -> supergraph::BoxService {
ServiceBuilder::new()
.map_response(|mut response: supergraph::Response| {
if let Some(cache_control) = {
let lock = response.context.extensions().lock();
let cache_control = lock.get::<CacheControl>().cloned();
cache_control
} {
if let Some(cache_control) = response
.context
.extensions()
.with_lock(|lock| lock.get::<CacheControl>().cloned())
{
let _ = cache_control.to_headers(response.response.headers_mut());
}

Expand Down Expand Up @@ -437,7 +437,10 @@ async fn cache_lookup_root(

match cache_result {
Some(value) => {
request.context.extensions().lock().insert(value.0.control);
request
.context
.extensions()
.with_lock(|mut lock| lock.insert(value.0.control));

Ok(ControlFlow::Break(
subgraph::Response::builder()
Expand Down Expand Up @@ -519,12 +522,14 @@ async fn cache_lookup_entities(
}

fn update_cache_control(context: &Context, cache_control: &CacheControl) {
if let Some(c) = context.extensions().lock().get_mut::<CacheControl>() {
*c = c.merge(cache_control);
return;
}
//FIXME: race condition. We need an Entry API for private entries
context.extensions().lock().insert(cache_control.clone());
context.extensions().with_lock(|mut lock| {
if let Some(c) = lock.get_mut::<CacheControl>() {
*c = c.merge(cache_control);
} else {
//FIXME: race condition. We need an Entry API for private entries
lock.insert(cache_control.clone());
}
})
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ mod tests {
.unwrap();

let ctx = Context::new();
ctx.extensions().lock().insert::<ParsedDocument>(query);
ctx.extensions()
.with_lock(|mut lock| lock.insert::<ParsedDocument>(query));

let planner_res = planner
.call(QueryPlannerRequest::new(query_str.to_string(), None, ctx))
Expand Down
45 changes: 17 additions & 28 deletions apollo-router/src/plugins/demand_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,9 @@ pub(crate) struct DemandControl {

impl DemandControl {
fn report_operation_metric(context: Context) {
let guard = context.extensions().lock();
let cost_context = guard.get::<CostContext>();
let result = cost_context.map_or("NO_CONTEXT", |c| c.result);
let result = context
.extensions()
.with_lock(|lock| lock.get::<CostContext>().map_or("NO_CONTEXT", |c| c.result));
u64_counter!(
"apollo.router.operations.demand_control",
"Total operations with demand control enabled",
Expand Down Expand Up @@ -237,7 +237,9 @@ impl Plugin for DemandControl {
let strategy = self.strategy_factory.create();
ServiceBuilder::new()
.checkpoint(move |req: execution::Request| {
req.context.extensions().lock().insert(strategy.clone());
req.context
.extensions()
.with_lock(|mut lock| lock.insert(strategy.clone()));
// On the request path we need to check for estimates, checkpoint is used to do this, short-circuiting the request if it's too expensive.
Ok(match strategy.on_execution_request(&req) {
Ok(_) => ControlFlow::Continue(req),
Expand All @@ -258,13 +260,9 @@ impl Plugin for DemandControl {
.context
.unsupported_executable_document()
.expect("must have document");
let strategy = resp
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = resp.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});
let context = resp.context.clone();

// We want to sequence this code to run after all the subgraph responses have been scored.
Expand Down Expand Up @@ -323,13 +321,9 @@ impl Plugin for DemandControl {
} else {
ServiceBuilder::new()
.checkpoint(move |req: subgraph::Request| {
let strategy = req
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = req.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});

// On the request path we need to check for estimates, checkpoint is used to do this, short-circuiting the request if it's too expensive.
Ok(match strategy.on_subgraph_request(&req) {
Expand All @@ -355,13 +349,9 @@ impl Plugin for DemandControl {
},
|req: Arc<Valid<ExecutableDocument>>, fut| async move {
let resp: subgraph::Response = fut.await?;
let strategy = resp
.context
.extensions()
.lock()
.get::<Strategy>()
.expect("must have strategy")
.clone();
let strategy = resp.context.extensions().with_lock(|lock| {
lock.get::<Strategy>().expect("must have strategy").clone()
});
Ok(match strategy.on_subgraph_response(req.as_ref(), &resp) {
Ok(_) => resp,
Err(err) => subgraph::Response::builder()
Expand Down Expand Up @@ -556,7 +546,7 @@ mod test {
let strategy = plugin.strategy_factory.create();

let ctx = context();
ctx.extensions().lock().insert(strategy);
ctx.extensions().with_lock(|mut lock| lock.insert(strategy));
let mut req = subgraph::Request::fake_builder()
.subgraph_name("test")
.context(ctx)
Expand All @@ -582,8 +572,7 @@ mod test {
};
let ctx = Context::new();
ctx.extensions()
.lock()
.insert(ParsedDocument::new(parsed_document));
.with_lock(|mut lock| lock.insert(ParsedDocument::new(parsed_document)));
ctx
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,20 @@ impl StrategyImpl for StaticEstimated {
self.cost_calculator
.planned(&request.query_plan)
.and_then(|cost| {
let mut extensions = request.context.extensions().lock();
let cost_result = extensions.get_or_default_mut::<CostContext>();
cost_result.estimated = cost;
if cost > self.max {
Err(
cost_result.result(DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
}),
)
} else {
Ok(())
}
request.context.extensions().with_lock(|mut lock| {
let cost_result = lock.get_or_default_mut::<CostContext>();
cost_result.estimated = cost;
if cost > self.max {
Err(
cost_result.result(DemandControlError::EstimatedCostTooExpensive {
estimated_cost: cost,
max_cost: self.max,
}),
)
} else {
Ok(())
}
})
})
}

Expand All @@ -56,9 +57,9 @@ impl StrategyImpl for StaticEstimated {
) -> Result<(), DemandControlError> {
if response.data.is_some() {
let cost = self.cost_calculator.actual(request, response)?;
let mut extensions = context.extensions().lock();
let cost_result = extensions.get_or_default_mut::<CostContext>();
cost_result.actual = cost;
context
.extensions()
.with_lock(|mut lock| lock.get_or_default_mut::<CostContext>().actual = cost);
}
Ok(())
}
Expand Down
Loading

0 comments on commit a0963c4

Please sign in to comment.