From 962d032104916351fef02b3200a240a3d39bec0a Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 12 Aug 2022 12:26:05 +0200 Subject: [PATCH 1/6] implement map_deferred_response in rhai we need a specific function to map over deferred responses, so we can make the distinction with the first one, where we have access to headers --- apollo-router/src/plugins/rhai.rs | 296 +++++++++++++++++++++++++++++- 1 file changed, 294 insertions(+), 2 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index 48ab13e275..f1b164eb39 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -96,6 +96,7 @@ mod router { pub(crate) type Request = RouterRequest; pub(crate) type Response = RhaiRouterResponse; + pub(crate) type DeferredResponse = RhaiRouterDeferredResponse; pub(crate) type Service = BoxService; } @@ -112,6 +113,7 @@ mod execution { pub(crate) type Request = ExecutionRequest; pub(crate) type Response = RhaiExecutionResponse; + pub(crate) type DeferredResponse = RhaiExecutionDeferredResponse; pub(crate) type Service = BoxService; } @@ -242,6 +244,20 @@ mod router_plugin_mod { Ok(obj.with_mut(|response| response.response.body().clone())) } + #[rhai_fn(get = "body", pure, return_raw)] + pub(crate) fn get_originating_body_router_deferred_response( + obj: &mut SharedMut, + ) -> Result> { + Ok(obj.with_mut(|response| response.response.clone())) + } + + #[rhai_fn(get = "body", pure, return_raw)] + pub(crate) fn get_originating_body_execution_deferred_response( + obj: &mut SharedMut, + ) -> Result> { + Ok(obj.with_mut(|response| response.response.clone())) + } + #[rhai_fn(set = "headers", return_raw)] pub(crate) fn set_originating_headers_router_response( obj: &mut SharedMut, @@ -296,6 +312,24 @@ mod router_plugin_mod { Ok(()) } + #[rhai_fn(set = "body", return_raw)] + pub(crate) fn set_originating_body_router_deferred_response( + obj: &mut SharedMut, + body: Response, + ) -> Result<(), Box> { + obj.with_mut(|response| response.response = body); + Ok(()) + } + + #[rhai_fn(set = "body", return_raw)] + pub(crate) fn set_originating_body_execution_deferred_response( + obj: &mut SharedMut, + body: Response, + ) -> Result<(), Box> { + obj.with_mut(|response| response.response = body); + Ok(()) + } + pub(crate) fn map_request(rhai_service: &mut RhaiService, callback: FnPtr) { rhai_service .service @@ -307,6 +341,12 @@ mod router_plugin_mod { .service .map_response(rhai_service.clone(), callback) } + + pub(crate) fn map_deferred_response(rhai_service: &mut RhaiService, callback: FnPtr) { + rhai_service + .service + .map_deferred_response(rhai_service.clone(), callback) + } } /// Plugin which implements Rhai functionality @@ -571,11 +611,21 @@ pub(crate) struct RhaiExecutionResponse { response: http_ext::Response, } +pub(crate) struct RhaiExecutionDeferredResponse { + context: Context, + response: Response, +} + pub(crate) struct RhaiRouterResponse { context: Context, response: http_ext::Response, } +pub(crate) struct RhaiRouterDeferredResponse { + context: Context, + response: Response, +} + macro_rules! if_subgraph { ( subgraph => $subgraph: block else $not_subgraph: block ) => { $subgraph @@ -911,7 +961,6 @@ impl ServiceStep { let RhaiRouterResponse { context, response } = response_opt.unwrap(); let (parts, body) = http::Response::from(response).into_parts(); - //FIXME we should also map over the stream of future responses let response = http::Response::from_parts( parts, once(ready(body)).chain(rest).boxed(), @@ -1011,7 +1060,6 @@ impl ServiceStep { let RhaiExecutionResponse { context, response } = response_opt.unwrap(); let (parts, body) = http::Response::from(response).into_parts(); - //FIXME we should also map over the stream of future responses let response = http::Response::from_parts( parts, once(ready(body)).chain(rest).boxed(), @@ -1027,6 +1075,220 @@ impl ServiceStep { } } } + + fn map_deferred_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { + match self { + ServiceStep::Router(service) => { + service.replace(|service| { + BoxService::new(service.and_then( + |router_response: RouterResponse| async move { + // Let's define a local function to build an error response + // XXX: This isn't ideal. We already have a response, so ideally we'd + // like to append this error into the existing response. However, + // the significantly different treatment of errors in different + // response types makes this extremely painful. This needs to be + // re-visited at some point post GA. + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> RouterResponse { + let res = RouterResponse::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build() + .expect("can't fail to build our error message"); + res + } + + // we split the response stream into headers+first response, then a stream of deferred responses + // for which we will implement mapping later + let RouterResponse { response, context } = router_response; + let (parts, stream) = http::Response::from(response).into_parts(); + let (first, rest) = stream.into_future().await; + + if first.is_none() { + return Ok(failure_message( + context, + "rhai execution error: empty response".to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + let first = first.unwrap(); + let ctx = context.clone(); + + let mapped_stream = rest.filter_map(move |deferred_response| { + let rhai_service = rhai_service.clone(); + let context = context.clone(); + let callback = callback.clone(); + async move { + let response = RhaiRouterDeferredResponse { + context, + response: deferred_response, + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + + let result: Result = if callback.is_curried() { + callback + .call( + &rhai_service.engine, + &rhai_service.ast, + (shared_response.clone(),), + ) + .map_err(|err| err.to_string()) + } else { + let mut scope = rhai_service.scope.clone(); + rhai_service + .engine + .call_fn( + &mut scope, + &rhai_service.ast, + callback.fn_name(), + (shared_response.clone(),), + ) + .map_err(|err| err.to_string()) + }; + if let Err(error) = result { + tracing::error!( + "map_deferred_response callback failed: {error}" + ); + return None; + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let RhaiRouterDeferredResponse { response, .. } = + response_opt.unwrap(); + Some(response) + } + }); + + let response = http::Response::from_parts( + parts, + once(ready(first)).chain(mapped_stream).boxed(), + ) + .into(); + Ok(RouterResponse { + context: ctx, + response, + }) + }, + )) + }) + } + ServiceStep::QueryPlanner(_service) => { + tracing::error!("rhai execution error: map_deferred_response is not supported for query planner plugins"); + } + ServiceStep::Execution(service) => { + service.replace(|service| { + service + .and_then(|execution_response: ExecutionResponse| async move { + // Let's define a local function to build an error response + // XXX: This isn't ideal. We already have a response, so ideally we'd + // like to append this error into the existing response. However, + // the significantly different treatment of errors in different + // response types makes this extremely painful. This needs to be + // re-visited at some point post GA. + fn failure_message( + context: Context, + msg: String, + status: StatusCode, + ) -> ExecutionResponse { + ExecutionResponse::error_builder() + .errors(vec![Error { + message: msg, + ..Default::default() + }]) + .status_code(status) + .context(context) + .build() + .expect("can't fail to build our error message") + } + + // we split the response stream into headers+first response, then a stream of deferred responses + // for which we will implement mapping later + let ExecutionResponse { response, context } = execution_response; + let (parts, stream) = http::Response::from(response).into_parts(); + let (first, rest) = stream.into_future().await; + + if first.is_none() { + return Ok(failure_message( + context, + "rhai execution error: empty response".to_string(), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + let first = first.unwrap(); + let ctx = context.clone(); + + let mapped_stream = rest.filter_map(move |deferred_response| { + let rhai_service = rhai_service.clone(); + let context = context.clone(); + let callback = callback.clone(); + async move { + let response = RhaiExecutionDeferredResponse { + context, + response: deferred_response, + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + let result: Result = if callback.is_curried() { + callback + .call( + &rhai_service.engine, + &rhai_service.ast, + (shared_response.clone(),), + ) + .map_err(|err| err.to_string()) + } else { + let mut scope = rhai_service.scope.clone(); + rhai_service + .engine + .call_fn( + &mut scope, + &rhai_service.ast, + callback.fn_name(), + (shared_response.clone(),), + ) + .map_err(|err| err.to_string()) + }; + if let Err(error) = result { + tracing::error!( + "map_deferred_response callback failed: {error}" + ); + return None; + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let RhaiExecutionDeferredResponse { response, .. } = + response_opt.unwrap(); + + Some(response) + } + }); + + let response = http::Response::from_parts( + parts, + once(ready(first)).chain(mapped_stream).boxed(), + ) + .into(); + Ok(ExecutionResponse { + context: ctx, + response, + }) + }) + .boxed() + }) + } + ServiceStep::Subgraph(_service) => { + tracing::error!("rhai execution error: map_deferred_response is not supported for subgraph plugins"); + } + } + } } #[derive(Clone, Debug)] @@ -1385,6 +1647,36 @@ impl Rhai { }, ); + engine + .register_get_result( + "context", + |obj: &mut SharedMut| { + Ok(obj.with_mut(|response| response.context.clone())) + }, + ) + .register_set_result( + "context", + |obj: &mut SharedMut, context: Context| { + obj.with_mut(|response| response.context = context); + Ok(()) + }, + ); + + engine + .register_get_result( + "context", + |obj: &mut SharedMut| { + Ok(obj.with_mut(|response| response.context.clone())) + }, + ) + .register_set_result( + "context", + |obj: &mut SharedMut, context: Context| { + obj.with_mut(|response| response.context = context); + Ok(()) + }, + ); + engine } From 9897190586644f24c33d872ddcc0a3d4638ef9d7 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 16 Aug 2022 11:16:24 +0200 Subject: [PATCH 2/6] factor some code --- apollo-router/src/plugins/rhai.rs | 159 ++++++++---------------------- 1 file changed, 39 insertions(+), 120 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index c15b1d1ba5..3f9c3b7924 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -26,6 +26,7 @@ use rhai::Dynamic; use rhai::Engine; use rhai::EvalAltResult; use rhai::FnPtr; +use rhai::FuncArgs; use rhai::Instant; use rhai::Map; use rhai::Scope; @@ -735,26 +736,9 @@ impl ServiceStep { Ok(ControlFlow::Break(res)) } let shared_request = Shared::new(Mutex::new(Some(request))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_request.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_request.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = + execute(&rhai_service, &callback, (shared_request.clone(),)); + if let Err(error) = result { tracing::error!("map_request callback failed: {error}"); let mut guard = shared_request.lock().unwrap(); @@ -799,26 +783,9 @@ impl ServiceStep { Ok(ControlFlow::Break(res)) } let shared_request = Shared::new(Mutex::new(Some(request))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_request.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_request.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = + execute(&rhai_service, &callback, (shared_request.clone(),)); + if let Err(error) = result { tracing::error!("map_request callback failed: {error}"); let mut guard = shared_request.lock().unwrap(); @@ -897,26 +864,8 @@ impl ServiceStep { }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = + execute(&rhai_service, &callback, (shared_response.clone(),)); if let Err(error) = result { tracing::error!("map_response callback failed: {error}"); let mut guard = shared_response.lock().unwrap(); @@ -996,26 +945,9 @@ impl ServiceStep { .into(), }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = + execute(&rhai_service, &callback, (shared_response.clone(),)); + if let Err(error) = result { tracing::error!("map_response callback failed: {error}"); let mut guard = shared_response.lock().unwrap(); @@ -1104,26 +1036,11 @@ impl ServiceStep { }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = execute( + &rhai_service, + &callback, + (shared_response.clone(),), + ); if let Err(error) = result { tracing::error!( "map_deferred_response callback failed: {error}" @@ -1207,26 +1124,11 @@ impl ServiceStep { response: deferred_response, }; let shared_response = Shared::new(Mutex::new(Some(response))); - let result: Result = if callback.is_curried() { - callback - .call( - &rhai_service.engine, - &rhai_service.ast, - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - } else { - let mut scope = rhai_service.scope.clone(); - rhai_service - .engine - .call_fn( - &mut scope, - &rhai_service.ast, - callback.fn_name(), - (shared_response.clone(),), - ) - .map_err(|err| err.to_string()) - }; + let result = execute( + &rhai_service, + &callback, + (shared_response.clone(),), + ); if let Err(error) = result { tracing::error!( "map_deferred_response callback failed: {error}" @@ -1263,6 +1165,23 @@ impl ServiceStep { } } +fn execute( + rhai_service: &RhaiService, + callback: &FnPtr, + args: impl FuncArgs, +) -> Result { + if callback.is_curried() { + callback + .call(&rhai_service.engine, &rhai_service.ast, args) + .map_err(|err| err.to_string()) + } else { + let mut scope = rhai_service.scope.clone(); + rhai_service + .engine + .call_fn(&mut scope, &rhai_service.ast, callback.fn_name(), args) + .map_err(|err| err.to_string()) + } +} #[derive(Clone, Debug)] pub(crate) struct RhaiService { scope: Scope<'static>, From d64abb90da803ec561fcc46b0733ea5b4a4eeb36 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Tue, 16 Aug 2022 14:47:21 +0200 Subject: [PATCH 3/6] changelog --- NEXT_CHANGELOG.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 30b9eef3f8..a8d2a03613 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -350,6 +350,16 @@ See the API documentation for an example. (It can be built with `cargo doc --ope By [@SimonSapin](https://github.com/SimonSapin) +### Remove telemetry configuration hot reloading ([PR #1501](https://github.com/apollographql/router/pull/1501)) + +The `map_deferred_response` method is now available for the router service and execution +service in Rhai. When using the `@defer` directive, we get the data in a serie of graphql +responses. The first one is available with the `map_response` method, where the HTTP headers +and the response body can be modified. The following responses are available through +`map_deferred_response`, which only has access to the response body. + +By [@geal](https://github.com/geal) in https://github.com/apollographql/router/pull/1501 + ## 🐛 Fixes ## 🛠 Maintenance From 33cbf523a5b86697b9800e911039f04eccd51e66 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Thu, 18 Aug 2022 14:50:20 +0200 Subject: [PATCH 4/6] use map_response for deffered responses too --- apollo-router/src/plugins/rhai.rs | 198 ++++++------------------------ 1 file changed, 37 insertions(+), 161 deletions(-) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index e92b744a99..46a072ac96 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -324,12 +324,6 @@ mod router_plugin_mod { .service .map_response(rhai_service.clone(), callback) } - - pub(crate) fn map_deferred_response(rhai_service: &mut RhaiService, callback: FnPtr) { - rhai_service - .service - .map_deferred_response(rhai_service.clone(), callback) - } } /// Plugin which implements Rhai functionality @@ -813,7 +807,6 @@ impl ServiceStep { fn map_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { match self { ServiceStep::Router(service) => { - // gen_map_response!(router, service, rhai_service, callback); service.replace(|service| { BoxService::new(service.and_then( |router_response: SupergraphResponse| async move { @@ -883,147 +876,6 @@ impl ServiceStep { response_opt.unwrap(); let (parts, body) = http::Response::from(response).into_parts(); - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(rest).boxed(), - ) - .into(); - Ok(SupergraphResponse { context, response }) - }, - )) - }) - } - ServiceStep::QueryPlanner(service) => { - gen_map_response!(query_planner, service, rhai_service, callback); - } - ServiceStep::Execution(service) => { - //gen_map_response!(execution, service, rhai_service, callback); - service.replace(|service| { - service - .and_then(|execution_response: ExecutionResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> ExecutionResponse { - ExecutionResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message") - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let ExecutionResponse { response, context } = execution_response; - let (parts, stream) = http::Response::from(response).into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let response = RhaiExecutionResponse { - context, - response: http::Response::from_parts( - parts, - first.expect("already checked"), - ) - .into(), - }; - let shared_response = Shared::new(Mutex::new(Some(response))); - let result = - execute(&rhai_service, &callback, (shared_response.clone(),)); - - if let Err(error) = result { - tracing::error!("map_response callback failed: {error}"); - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - return Ok(failure_message( - response_opt.unwrap().context, - format!("rhai execution error: '{}'", error), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - - let mut guard = shared_response.lock().unwrap(); - let response_opt = guard.take(); - let RhaiExecutionResponse { context, response } = response_opt.unwrap(); - let (parts, body) = http::Response::from(response).into_parts(); - - let response = http::Response::from_parts( - parts, - once(ready(body)).chain(rest).boxed(), - ) - .into(); - Ok(ExecutionResponse { context, response }) - }) - .boxed() - }) - } - ServiceStep::Subgraph(service) => { - gen_map_response!(subgraph, service, rhai_service, callback); - } - } - } - - fn map_deferred_response(&mut self, rhai_service: RhaiService, callback: FnPtr) { - match self { - ServiceStep::Router(service) => { - service.replace(|service| { - BoxService::new(service.and_then( - |supergraph_response: SupergraphResponse| async move { - // Let's define a local function to build an error response - // XXX: This isn't ideal. We already have a response, so ideally we'd - // like to append this error into the existing response. However, - // the significantly different treatment of errors in different - // response types makes this extremely painful. This needs to be - // re-visited at some point post GA. - fn failure_message( - context: Context, - msg: String, - status: StatusCode, - ) -> SupergraphResponse { - let res = SupergraphResponse::error_builder() - .errors(vec![Error { - message: msg, - ..Default::default() - }]) - .status_code(status) - .context(context) - .build() - .expect("can't fail to build our error message"); - res - } - - // we split the response stream into headers+first response, then a stream of deferred responses - // for which we will implement mapping later - let SupergraphResponse { response, context } = supergraph_response; - let (parts, stream) = http::Response::from(response).into_parts(); - let (first, rest) = stream.into_future().await; - - if first.is_none() { - return Ok(failure_message( - context, - "rhai execution error: empty response".to_string(), - StatusCode::INTERNAL_SERVER_ERROR, - )); - } - let first = first.unwrap(); let ctx = context.clone(); let mapped_stream = rest.filter_map(move |deferred_response| { @@ -1043,9 +895,7 @@ impl ServiceStep { (shared_response.clone(),), ); if let Err(error) = result { - tracing::error!( - "map_deferred_response callback failed: {error}" - ); + tracing::error!("map_response callback failed: {error}"); return None; } @@ -1059,7 +909,7 @@ impl ServiceStep { let response = http::Response::from_parts( parts, - once(ready(first)).chain(mapped_stream).boxed(), + once(ready(body)).chain(mapped_stream).boxed(), ) .into(); Ok(SupergraphResponse { @@ -1070,8 +920,8 @@ impl ServiceStep { )) }) } - ServiceStep::QueryPlanner(_service) => { - tracing::error!("rhai execution error: map_deferred_response is not supported for query planner plugins"); + ServiceStep::QueryPlanner(service) => { + gen_map_response!(query_planner, service, rhai_service, callback); } ServiceStep::Execution(service) => { service.replace(|service| { @@ -1112,7 +962,35 @@ impl ServiceStep { StatusCode::INTERNAL_SERVER_ERROR, )); } - let first = first.unwrap(); + + let response = RhaiExecutionResponse { + context, + response: http::Response::from_parts( + parts, + first.expect("already checked"), + ) + .into(), + }; + let shared_response = Shared::new(Mutex::new(Some(response))); + let result = + execute(&rhai_service, &callback, (shared_response.clone(),)); + + if let Err(error) = result { + tracing::error!("map_response callback failed: {error}"); + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + return Ok(failure_message( + response_opt.unwrap().context, + format!("rhai execution error: '{}'", error), + StatusCode::INTERNAL_SERVER_ERROR, + )); + } + + let mut guard = shared_response.lock().unwrap(); + let response_opt = guard.take(); + let RhaiExecutionResponse { context, response } = response_opt.unwrap(); + let (parts, body) = http::Response::from(response).into_parts(); + let ctx = context.clone(); let mapped_stream = rest.filter_map(move |deferred_response| { @@ -1131,9 +1009,7 @@ impl ServiceStep { (shared_response.clone(),), ); if let Err(error) = result { - tracing::error!( - "map_deferred_response callback failed: {error}" - ); + tracing::error!("map_response callback failed: {error}"); return None; } @@ -1148,7 +1024,7 @@ impl ServiceStep { let response = http::Response::from_parts( parts, - once(ready(first)).chain(mapped_stream).boxed(), + once(ready(body)).chain(mapped_stream).boxed(), ) .into(); Ok(ExecutionResponse { @@ -1159,8 +1035,8 @@ impl ServiceStep { .boxed() }) } - ServiceStep::Subgraph(_service) => { - tracing::error!("rhai execution error: map_deferred_response is not supported for subgraph plugins"); + ServiceStep::Subgraph(service) => { + gen_map_response!(subgraph, service, rhai_service, callback); } } } From e352cbed74c6b95de5bc51ebcf6d83c80f3f003b Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Thu, 18 Aug 2022 18:01:37 +0200 Subject: [PATCH 5/6] add the headers_are_available function --- apollo-router/src/plugins/rhai.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index 46a072ac96..f8a77d320a 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -1146,6 +1146,22 @@ impl Rhai { Err(_e) => false, } }) + .register_fn( + "headers_are_available", + |_: &mut SharedMut| -> bool { true }, + ) + .register_fn( + "headers_are_available", + |_: &mut SharedMut| -> bool { false }, + ) + .register_fn( + "headers_are_available", + |_: &mut SharedMut| -> bool { true }, + ) + .register_fn( + "headers_are_available", + |_: &mut SharedMut| -> bool { false }, + ) // Register a HeaderMap indexer so we can get/set headers .register_indexer_get_result(|x: &mut HeaderMap, key: &str| { let search_name = From 22b59b067648f06094efaf7bec53aebdc560c9b6 Mon Sep 17 00:00:00 2001 From: Geoffroy Couprie Date: Fri, 19 Aug 2022 15:23:31 +0200 Subject: [PATCH 6/6] use default function implementations throwing an error this will be a better UX than calling an absent method --- apollo-router/src/plugins/rhai.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/apollo-router/src/plugins/rhai.rs b/apollo-router/src/plugins/rhai.rs index f8a77d320a..62eb8f9672 100644 --- a/apollo-router/src/plugins/rhai.rs +++ b/apollo-router/src/plugins/rhai.rs @@ -192,6 +192,13 @@ mod router_plugin_mod { Ok(obj.with_mut(|response| response.response.headers().clone())) } + #[rhai_fn(get = "headers", pure, return_raw)] + pub(crate) fn get_originating_headers_router_deferred_response( + _obj: &mut SharedMut, + ) -> Result> { + Err("cannot access headers on a deferred response".into()) + } + #[rhai_fn(get = "headers", pure, return_raw)] pub(crate) fn get_originating_headers_execution_response( obj: &mut SharedMut, @@ -199,6 +206,13 @@ mod router_plugin_mod { Ok(obj.with_mut(|response| response.response.headers().clone())) } + #[rhai_fn(get = "headers", pure, return_raw)] + pub(crate) fn get_originating_headers_execution_deferred_response( + _obj: &mut SharedMut, + ) -> Result> { + Err("cannot access headers on a deferred response".into()) + } + #[rhai_fn(get = "headers", pure, return_raw)] pub(crate) fn get_originating_headers_subgraph_response( obj: &mut SharedMut, @@ -250,6 +264,14 @@ mod router_plugin_mod { Ok(()) } + #[rhai_fn(set = "headers", return_raw)] + pub(crate) fn set_originating_headers_router_deferred_response( + _obj: &mut SharedMut, + _headers: HeaderMap, + ) -> Result<(), Box> { + Err("cannot access headers on a deferred response".into()) + } + #[rhai_fn(set = "headers", return_raw)] pub(crate) fn set_originating_headers_execution_response( obj: &mut SharedMut, @@ -259,6 +281,14 @@ mod router_plugin_mod { Ok(()) } + #[rhai_fn(set = "headers", return_raw)] + pub(crate) fn set_originating_headers_execution_deferred_response( + _obj: &mut SharedMut, + _headers: HeaderMap, + ) -> Result<(), Box> { + Err("cannot access headers on a deferred response".into()) + } + #[rhai_fn(set = "headers", return_raw)] pub(crate) fn set_originating_headers_subgraph_response( obj: &mut SharedMut,