Skip to content

Commit

Permalink
Entry point improvements
Browse files Browse the repository at this point in the history
ApolloRouterBuilder has been migrated to buildstructor for consistency with other code.
Calls to `ApolloRouterBuilder::default()` should be migrated to `ApolloRouter::builder`.
`FederatedServerHandle` has been renamed to `ApolloRouterHandle`.

Removed functionality:
* The ability to supply your own RouterServiceFactory. This was only meant to be used as part of testing.
* StateListener. This made the internal state machine unnecessarily complex. `ready()` remains on `ApolloRouterHandle`.
* `ApolloRouterHandle#shutdown()` has been removed. Instead dropping `ApolloRouterHandle` will cause the router to shutdown.
  • Loading branch information
bryn committed Jun 13, 2022
1 parent cb5ae71 commit 63d3ce3
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 277 deletions.
12 changes: 12 additions & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ This is a breaking change since slightly invalid input might have validated befo

By [@o0Ignition0o](https://github.com/o0Ignition0o) in https://github.com/apollographql/router/pull/1211

### Entry point improvements ([PR #1227](https://github.com/apollographql/router/pull/1227))
ApolloRouterBuilder has been migrated to buildstructor for consistency with other code.
Calls to `ApolloRouterBuilder::default()` should be migrated to `ApolloRouter::builder`.
`FederatedServerHandle` has been renamed to `ApolloRouterHandle`.

Removed functionality:
* The ability to supply your own RouterServiceFactory. This was only meant to be used as part of testing.
* StateListener. This made the internal state machine unnecessarily complex. `ready()` remains on `ApolloRouterHandle`.
* `ApolloRouterHandle#shutdown()` has been removed. Instead dropping `ApolloRouterHandle` will cause the router to shutdown.

By [@bryncooke](https://github.com/bryncooke) in https://github.com/apollographql/router/pull/1227

## 🚀 Features
### Helm chart now has the option to use an existing Secret for API Key [PR #1196](https://github.com/apollographql/router/pull/1196)
This change allows the use an already existing Secret for the graph API Key.
Expand Down
6 changes: 2 additions & 4 deletions apollo-router/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,15 +265,13 @@ pub async fn rt_main() -> Result<()> {
}
};

let server = ApolloRouter::builder()
let router = ApolloRouter::builder()
.configuration(configuration)
.schema(schema)
.shutdown(ShutdownKind::CtrlC)
.build();
let mut server_handle = server.serve();
server_handle.with_default_state_receiver().await;

if let Err(err) = server_handle.await {
if let Err(err) = router.serve().await {
tracing::error!("{}", err);
return Err(err.into());
}
Expand Down
1 change: 1 addition & 0 deletions apollo-router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![cfg_attr(feature = "failfast", allow(unreachable_code))]

extern crate core;
macro_rules! failfast_debug {
($($tokens:tt)+) => {{
tracing::debug!($($tokens)+);
Expand Down
188 changes: 51 additions & 137 deletions apollo-router/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use crate::axum_http_server_factory::AxumHttpServerFactory;
use crate::configuration::validate_configuration;
use crate::configuration::{Configuration, ListenAddr};
use crate::configuration::Configuration;
use crate::reload::Error as ReloadError;
use crate::router_factory::{RouterServiceFactory, YamlRouterServiceFactory};
use crate::state_machine::StateMachine;
use derivative::Derivative;
use derive_more::{Display, From};
use displaydoc::Display as DisplayDoc;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use futures::prelude::*;
use futures::FutureExt;
use std::fmt::{Display, Formatter};
use std::fs;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::sync::RwLock;
use tokio::task::spawn;
use tracing::subscriber::SetGlobalDefaultError;
use url::Url;
Expand Down Expand Up @@ -324,14 +325,13 @@ impl ShutdownKind {
///
/// ```
/// use apollo_router::prelude::*;
/// use apollo_router::ApolloRouterBuilder;
/// use apollo_router::{ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::{ApolloRouter, ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::configuration::Configuration;
///
/// async {
/// let configuration = serde_yaml::from_str::<Configuration>("Config").unwrap();
/// let schema: apollo_router::Schema = "schema".parse().unwrap();
/// let server = ApolloRouterBuilder::default()
/// let server = ApolloRouter::builder()
/// .configuration(ConfigurationKind::Instance(Box::new(configuration)))
/// .schema(SchemaKind::Instance(Box::new(schema)))
/// .shutdown(ShutdownKind::CtrlC)
Expand All @@ -343,20 +343,19 @@ impl ShutdownKind {
/// Shutdown via handle.
/// ```
/// use apollo_router::prelude::*;
/// use apollo_router::ApolloRouterBuilder;
/// use apollo_router::{ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::{ApolloRouter, ConfigurationKind, SchemaKind, ShutdownKind};
/// use apollo_router::configuration::Configuration;
///
/// async {
/// let configuration = serde_yaml::from_str::<Configuration>("Config").unwrap();
/// let schema: apollo_router::Schema = "schema".parse().unwrap();
/// let server = ApolloRouterBuilder::default()
/// let server = ApolloRouter::builder()
/// .configuration(ConfigurationKind::Instance(Box::new(configuration)))
/// .schema(SchemaKind::Instance(Box::new(schema)))
/// .shutdown(ShutdownKind::CtrlC)
/// .build();
/// let handle = server.serve();
/// handle.shutdown().await;
/// drop(handle);
/// };
/// ```
///
Expand Down Expand Up @@ -412,121 +411,39 @@ pub(crate) enum Event {
Shutdown,
}

/// Public state that the client can be notified with via state listener
/// This is useful for waiting until the server is actually serving requests.
#[derive(Debug, PartialEq)]
pub enum State {
/// The server is starting up.
Startup,

/// The server is running on a particular address.
Running { address: ListenAddr, schema: String },

/// The server has stopped.
Stopped,

/// The server has errored.
Errored,
}

impl Display for State {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
State::Startup => write!(f, "startup"),
State::Running { .. } => write!(f, "running"),
State::Stopped => write!(f, "stopped"),
State::Errored => write!(f, "errored"),
}
}
}

/// A handle that allows the client to await for various server events.
pub struct FederatedServerHandle {
pub(crate) result: Pin<Box<dyn Future<Output = Result<(), FederatedServerError>> + Send>>,
pub(crate) shutdown_sender: oneshot::Sender<()>,
pub(crate) state_receiver: Option<mpsc::Receiver<State>>,
pub struct RouterHandle {
result: Pin<Box<dyn Future<Output = Result<(), FederatedServerError>> + Send>>,
ready: Arc<RwLock<bool>>,
shutdown_sender: Option<oneshot::Sender<()>>,
}

impl FederatedServerHandle {
/// Wait until the server is ready and return the socket address that it is listening on.
/// If the socket address has been configured to port zero the OS will choose the port.
/// The socket address returned is the actual port that was bound.
///
/// This method can only be called once, and is not designed for use in dynamic configuration
/// scenarios.
///
/// returns: Option<SocketAddr>
pub async fn ready(&mut self) -> Option<ListenAddr> {
self.state_receiver()
.map(|state| {
if let State::Running { address, .. } = state {
Some(address)
} else {
None
}
})
.filter(|socket| future::ready(socket != &None))
.map(|s| s.unwrap())
.next()
.boxed()
.await
}

/// Return a receiver of lifecycle events for the server. This method may only be called once.
///
/// returns: mspc::Receiver<State>
pub fn state_receiver(&mut self) -> mpsc::Receiver<State> {
self.state_receiver.take().expect(
"State listener has already been taken. 'ready' or 'state' may be called once only.",
)
}

/// Trigger and wait until the server has shut down.
///
/// returns: Result<(), FederatedServerError>
pub async fn shutdown(mut self) -> Result<(), FederatedServerError> {
self.maybe_close_state_receiver();
if self.shutdown_sender.send(()).is_err() {
tracing::error!("Failed to send shutdown event")
impl RouterHandle {
/// Returns the server handle (self) when the router is ready to receive requests.
/// Note that the router may not start
pub async fn ready(self) -> Result<Self, FederatedServerError> {
if *self.ready.read().await {
Ok(self)
} else {
Err(FederatedServerError::StartupError)
}
self.result.await
}
}

/// If the state receiver has not been set it must be closed otherwise it'll block the
/// state machine from progressing.
fn maybe_close_state_receiver(&mut self) {
if let Some(mut state_receiver) = self.state_receiver.take() {
state_receiver.close();
}
}

/// State receiver that prints out basic lifecycle events.
pub async fn with_default_state_receiver(&mut self) {
self.state_receiver()
.for_each(|state| {
match state {
State::Startup => {
tracing::info!("starting Apollo Router")
}
State::Running { .. } => {}
State::Stopped => {
tracing::info!("stopped")
}
State::Errored => {
tracing::info!("stopped with error")
}
}
future::ready(())
})
.await
impl Drop for RouterHandle {
fn drop(&mut self) {
let _ = self
.shutdown_sender
.take()
.expect("shutdown sender mut be present")
.send(());
}
}

impl Future for FederatedServerHandle {
impl Future for RouterHandle {
type Output = Result<(), FederatedServerError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.maybe_close_state_receiver();
self.result.poll_unpin(cx)
}
}
Expand All @@ -537,14 +454,11 @@ where
{
/// Start the federated server on a separate thread.
///
/// The returned handle allows the user to await until the server is ready and shutdown.
/// Alternatively the user can await on the server handle itself to wait for shutdown via the
/// configured shutdown mechanism.
/// Dropping the server handle will shutdown the server.
///
/// returns: FederatedServerHandle
/// returns: RouterHandle
///
pub fn serve(self) -> FederatedServerHandle {
let (state_listener, state_receiver) = mpsc::channel::<State>(1);
pub fn serve(self) -> RouterHandle {
let server_factory = AxumHttpServerFactory::new();
let (shutdown_sender, shutdown_receiver) = oneshot::channel::<()>();
let event_stream = Self::generate_event_stream(
Expand All @@ -554,8 +468,8 @@ where
shutdown_receiver,
);

let state_machine =
StateMachine::new(server_factory, Some(state_listener), self.router_factory);
let state_machine = StateMachine::new(server_factory, self.router_factory);
let ready = state_machine.ready.clone();
let result = spawn(async move { state_machine.process_events(event_stream).await })
.map(|r| match r {
Ok(Ok(ok)) => Ok(ok),
Expand All @@ -564,10 +478,10 @@ where
})
.boxed();

FederatedServerHandle {
RouterHandle {
result,
shutdown_sender,
state_receiver: Some(state_receiver),
shutdown_sender: Some(shutdown_sender),
ready,
}
}

Expand Down Expand Up @@ -603,7 +517,7 @@ mod tests {
use std::env::temp_dir;
use test_log::test;

fn init_with_server() -> FederatedServerHandle {
fn init_with_server() -> RouterHandle {
let configuration =
serde_yaml::from_str::<Configuration>(include_str!("testdata/supergraph_config.yaml"))
.unwrap();
Expand All @@ -615,28 +529,28 @@ mod tests {
.serve()
}

#[test(tokio::test)]
#[tokio::test(flavor = "multi_thread")]
async fn basic_request() {
let mut server_handle = init_with_server();
let listen_addr = server_handle.ready().await.expect("Server never ready");
assert_federated_response(&listen_addr, r#"{ topProducts { name } }"#).await;
server_handle.shutdown().await.expect("Could not shutdown");
let router_handle = init_with_server()
.ready()
.await
.expect("router failed to start");
assert_federated_response(r#"{ topProducts { name } }"#).await;
drop(router_handle);
}

async fn assert_federated_response(listen_addr: &ListenAddr, request: &str) {
async fn assert_federated_response(request: &str) {
let request = Request::builder().query(request).build();
let expected = query(listen_addr, &request).await.unwrap();
let expected = query(&request).await.unwrap();

let response = to_string_pretty(&expected).unwrap();
assert!(!response.is_empty());
}

async fn query(
listen_addr: &ListenAddr,
request: &crate::Request,
) -> Result<crate::Response, crate::FetchError> {
async fn query(request: &crate::Request) -> Result<crate::Response, crate::FetchError> {
Ok(reqwest::Client::new()
.post(format!("{}/", listen_addr))
.post("http://127.0.0.1:4000")
.timeout(Duration::from_secs(10))
.json(request)
.send()
.await
Expand Down
Loading

0 comments on commit 63d3ce3

Please sign in to comment.