Skip to content

Commit

Permalink
Entry point improvements
Browse files Browse the repository at this point in the history
`ApolloRouterBuilder` has been migrated to `buildstructor` for consistency with other code.
Calls to `ApolloRouterBuilder::default()` should be migrated to `ApolloRouter::builder`.
`FederatedServerHandle` has been renamed to `ApolloRouterHandle`.

Removed functionality:
* The ability to supply your own `RouterServiceFactory`. This may be added back if there is a concrete use case for it.
* `StateListener`. This made the internal state machine unnecessarily complex. `ready()` remains on `ApolloRouterHandle`.
* `ApolloRouterHandle#shutdown()` has been removed. Instead dropping `ApolloRouterHandle` will cause the router to shutdown.
  • Loading branch information
bryn committed Jun 13, 2022
1 parent cb5ae71 commit 5811493
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 277 deletions.
12 changes: 12 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ This is a breaking change since slightly invalid input might have validated befo

By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/1211

### Entry point improvements ([PR #1227](https://github.com/apollographql/router/pull/1227))
`ApolloRouterBuilder` has been migrated to `buildstructor` for consistency with other code.
Calls to `ApolloRouterBuilder::default()` should be migrated to `ApolloRouter::builder`.
`FederatedServerHandle` has been renamed to `ApolloRouterHandle`.

Removed functionality:
* The ability to supply your own `RouterServiceFactory`. This may be added back if there is a concrete use case for it.
* `StateListener`. This made the internal state machine unnecessarily complex. `ready()` remains on `ApolloRouterHandle`.
* `ApolloRouterHandle#shutdown()` has been removed. Instead dropping `ApolloRouterHandle` will cause the router to shutdown.

By [@bryncooke](https://github.com/bryncooke) in https://github.com/apollographql/router/pull/1227

## 🚀 Features
### Helm chart now has the option to use an existing Secret for API Key [PR #1196](https://github.com/apollographql/router/pull/1196)
This change allows the use an already existing Secret for the graph API Key.
Expand Down
6 changes: 2 additions & 4 deletions apollo-router/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,13 @@ pub async fn rt_main() -> Result<()> {
}
};

let server = ApolloRouter::builder()
let router = ApolloRouter::builder()
.configuration(configuration)
.schema(schema)
.shutdown(ShutdownKind::CtrlC)
.build();
let mut server_handle = server.serve();
server_handle.with_default_state_receiver().await;

if let Err(err) = server_handle.await {
if let Err(err) = router.serve().await {
tracing::error!("{}", err);
return Err(err.into());
}
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![cfg_attr(feature = "failfast", allow(unreachable_code))]

extern crate core;
macro_rules! failfast_debug {
($($tokens:tt)+) => {{
tracing::debug!($($tokens)+);
Expand Down
188 changes: 51 additions & 137 deletions apollo-router/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use crate::axum_http_server_factory::AxumHttpServerFactory;
use crate::configuration::validate_configuration;
use crate::configuration::{Configuration, ListenAddr};
use crate::configuration::Configuration;
use crate::reload::Error as ReloadError;
use crate::router_factory::{RouterServiceFactory, YamlRouterServiceFactory};
use crate::state_machine::StateMachine;
use derivative::Derivative;
use derive_more::{Display, From};
use displaydoc::Display as DisplayDoc;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use futures::prelude::*;
use futures::FutureExt;
use std::fmt::{Display, Formatter};
use std::fs;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::spawn;
use tracing::subscriber::SetGlobalDefaultError;
use url::Url;
Expand Down Expand Up @@ -324,14 +325,13 @@ impl ShutdownKind {
///
/// ```
/// use apollo_router::prelude::*;
/// use apollo_router::ApolloRouterBuilder;
/// use apollo_router::{ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::{ApolloRouter, ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::configuration::Configuration;
///
/// async {
/// let configuration = serde_yaml::from_str::<Configuration>("Config").unwrap();
/// let schema: apollo_router::Schema = "schema".parse().unwrap();
/// let server = ApolloRouterBuilder::default()
/// let server = ApolloRouter::builder()
/// .configuration(ConfigurationKind::Instance(Box::new(configuration)))
/// .schema(SchemaKind::Instance(Box::new(schema)))
/// .shutdown(ShutdownKind::CtrlC)
Expand All @@ -343,20 +343,19 @@ impl ShutdownKind {
/// Shutdown via handle.
/// ```
/// use apollo_router::prelude::*;
/// use apollo_router::ApolloRouterBuilder;
/// use apollo_router::{ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::{ApolloRouter, ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::configuration::Configuration;
///
/// async {
/// let configuration = serde_yaml::from_str::<Configuration>("Config").unwrap();
/// let schema: apollo_router::Schema = "schema".parse().unwrap();
/// let server = ApolloRouterBuilder::default()
/// let server = ApolloRouter::builder()
/// .configuration(ConfigurationKind::Instance(Box::new(configuration)))
/// .schema(SchemaKind::Instance(Box::new(schema)))
/// .shutdown(ShutdownKind::CtrlC)
/// .build();
/// let handle = server.serve();
/// handle.shutdown().await;
/// drop(handle);
/// };
/// ```
///
Expand Down Expand Up @@ -412,121 +411,39 @@ pub(crate) enum Event {
Shutdown,
}

/// Public state that the client can be notified with via state listener
/// This is useful for waiting until the server is actually serving requests.
#[derive(Debug, PartialEq)]
pub enum State {
/// The server is starting up.
Startup,

/// The server is running on a particular address.
Running { address: ListenAddr, schema: String },

/// The server has stopped.
Stopped,

/// The server has errored.
Errored,
}

impl Display for State {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
State::Startup => write!(f, "startup"),
State::Running { .. } => write!(f, "running"),
State::Stopped => write!(f, "stopped"),
State::Errored => write!(f, "errored"),
}
}
}

/// A handle that allows the client to await for various server events.
pub struct FederatedServerHandle {
pub(crate) result: Pin<Box<dyn Future<Output = Result<(), FederatedServerError>> + Send>>,
pub(crate) shutdown_sender: oneshot::Sender<()>,
pub(crate) state_receiver: Option<mpsc::Receiver<State>>,
pub struct RouterHandle {
result: Pin<Box<dyn Future<Output = Result<(), FederatedServerError>> + Send>>,
ready: Arc<RwLock<bool>>,
shutdown_sender: Option<oneshot::Sender<()>>,
}

impl FederatedServerHandle {
/// Wait until the server is ready and return the socket address that it is listening on.
/// If the socket address has been configured to port zero the OS will choose the port.
/// The socket address returned is the actual port that was bound.
///
/// This method can only be called once, and is not designed for use in dynamic configuration
/// scenarios.
///
/// returns: Option<SocketAddr>
pub async fn ready(&mut self) -> Option<ListenAddr> {
self.state_receiver()
.map(|state| {
if let State::Running { address, .. } = state {
Some(address)
} else {
None
}
})
.filter(|socket| future::ready(socket != &None))
.map(|s| s.unwrap())
.next()
.boxed()
.await
}

/// Return a receiver of lifecycle events for the server. This method may only be called once.
///
/// returns: mspc::Receiver<State>
pub fn state_receiver(&mut self) -> mpsc::Receiver<State> {
self.state_receiver.take().expect(
"State listener has already been taken. 'ready' or 'state' may be called once only.",
)
}

/// Trigger and wait until the server has shut down.
///
/// returns: Result<(), FederatedServerError>
pub async fn shutdown(mut self) -> Result<(), FederatedServerError> {
self.maybe_close_state_receiver();
if self.shutdown_sender.send(()).is_err() {
tracing::error!("Failed to send shutdown event")
impl RouterHandle {
/// Returns the server handle (self) when the router is ready to receive requests.
/// Note that the router may not start
pub async fn ready(self) -> Result<Self, FederatedServerError> {
if *self.ready.read().await {
Ok(self)
} else {
Err(FederatedServerError::StartupError)
}
self.result.await
}
}

/// If the state receiver has not been set it must be closed otherwise it'll block the
/// state machine from progressing.
fn maybe_close_state_receiver(&mut self) {
if let Some(mut state_receiver) = self.state_receiver.take() {
state_receiver.close();
}
}

/// State receiver that prints out basic lifecycle events.
pub async fn with_default_state_receiver(&mut self) {
self.state_receiver()
.for_each(|state| {
match state {
State::Startup => {
tracing::info!("starting Apollo Router")
}
State::Running { .. } => {}
State::Stopped => {
tracing::info!("stopped")
}
State::Errored => {
tracing::info!("stopped with error")
}
}
future::ready(())
})
.await
impl Drop for RouterHandle {
fn drop(&mut self) {
let _ = self
.shutdown_sender
.take()
.expect("shutdown sender mut be present")
.send(());
}
}

impl Future for FederatedServerHandle {
impl Future for RouterHandle {
type Output = Result<(), FederatedServerError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.maybe_close_state_receiver();
self.result.poll_unpin(cx)
}
}
Expand All @@ -537,14 +454,11 @@ where
{
/// Start the federated server on a separate thread.
///
/// The returned handle allows the user to await until the server is ready and shutdown.
/// Alternatively the user can await on the server handle itself to wait for shutdown via the
/// configured shutdown mechanism.
/// Dropping the server handle will shutdown the server.
///
/// returns: FederatedServerHandle
/// returns: RouterHandle
///
pub fn serve(self) -> FederatedServerHandle {
let (state_listener, state_receiver) = mpsc::channel::<State>(1);
pub fn serve(self) -> RouterHandle {
let server_factory = AxumHttpServerFactory::new();
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let event_stream = Self::generate_event_stream(
Expand All @@ -554,8 +468,8 @@ where
shutdown_receiver,
);

let state_machine =
StateMachine::new(server_factory, Some(state_listener), self.router_factory);
let state_machine = StateMachine::new(server_factory, self.router_factory);
let ready = state_machine.ready.clone();
let result = spawn(async move { state_machine.process_events(event_stream).await })
.map(|r| match r {
Ok(Ok(ok)) => Ok(ok),
Expand All @@ -564,10 +478,10 @@ where
})
.boxed();

FederatedServerHandle {
RouterHandle {
result,
shutdown_sender,
state_receiver: Some(state_receiver),
shutdown_sender: Some(shutdown_sender),
ready,
}
}

Expand Down Expand Up @@ -603,7 +517,7 @@ mod tests {
use std::env::temp_dir;
use test_log::test;

fn init_with_server() -> FederatedServerHandle {
fn init_with_server() -> RouterHandle {
let configuration =
serde_yaml::from_str::<Configuration>(include_str!("testdata/supergraph_config.yaml"))
.unwrap();
Expand All @@ -615,28 +529,28 @@ mod tests {
.serve()
}

#[test(tokio::test)]
#[tokio::test(flavor = "multi_thread")]
async fn basic_request() {
let mut server_handle = init_with_server();
let listen_addr = server_handle.ready().await.expect("Server never ready");
assert_federated_response(&listen_addr, r#"{ topProducts { name } }"#).await;
server_handle.shutdown().await.expect("Could not shutdown");
let router_handle = init_with_server()
.ready()
.await
.expect("router failed to start");
assert_federated_response(r#"{ topProducts { name } }"#).await;
drop(router_handle);
}

async fn assert_federated_response(listen_addr: &ListenAddr, request: &str) {
async fn assert_federated_response(request: &str) {
let request = Request::builder().query(request).build();
let expected = query(listen_addr, &request).await.unwrap();
let expected = query(&request).await.unwrap();

let response = to_string_pretty(&expected).unwrap();
assert!(!response.is_empty());
}

async fn query(
listen_addr: &ListenAddr,
request: &crate::Request,
) -> Result<crate::Response, crate::FetchError> {
async fn query(request: &crate::Request) -> Result<crate::Response, crate::FetchError> {
Ok(reqwest::Client::new()
.post(format!("{}/", listen_addr))
.post("http://127.0.0.1:4000")
.timeout(Duration::from_secs(10))
.json(request)
.send()
.await
Expand Down
Loading

0 comments on commit 5811493

Please sign in to comment.