Skip to content

Commit

Permalink
document composition/runner (#2181)
Browse files Browse the repository at this point in the history
  • Loading branch information
loshz committed Sep 27, 2024
1 parent b474c05 commit 276ca5c
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 4 deletions.
62 changes: 59 additions & 3 deletions src/composition/runner.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,30 @@
//! A [`Runner`] provides methods for configuring and handling background tasks for producing
//! composition events based of supergraph config changes.
//!
//! ```rust,ignore
//! use apollo_federation_types::config::SupergraphConfig;
//! use tokio_stream::wrappers::UnboundedReceiverStream;
//!
//! use crate::composition::{
//! events::CompositionEvent,
//! runner::Runner,
//! supergraph::binary::SupergraphBinary,
//! };
//!
//! let supergraph_config = SupergraphConfig::new();
//! let supergraph_binary = SupergraphBinary::new();
//!
//! let runner = Runner::new(supergraph_config, supergraph_binary);
//! let stream = runner.run().await.unwrap();
//! while let Some(event) = stream.next().await {
//! match event {
//! CompositionEvent::Started => println!("composition started"),
//! CompositionEvent::Success(_) => println!("composition success"),
//! CompositionEvent::Error(_) => println!("composition serror"),
//! }
//! }
//! ```
#![warn(missing_docs)]
use std::collections::HashMap;

use apollo_federation_types::config::SupergraphConfig;
Expand Down Expand Up @@ -28,13 +55,16 @@ use super::{
},
};

/// A struct for configuring and running subtasks for watching for both supergraph and subgraph
/// change events.
// TODO: handle retry flag for subgraphs (see rover dev help)
pub struct Runner {
supergraph_config: FinalSupergraphConfig,
supergraph_binary: SupergraphBinary,
}

impl Runner {
/// Produces a new Runner from a supergraph config and binary.
pub fn new(
supergraph_config: FinalSupergraphConfig,
supergraph_binary: SupergraphBinary,
Expand All @@ -45,7 +75,11 @@ impl Runner {
}
}

/// Start subtask watchers for both supergraph and subgraph configs, sending composition events on
/// the returned stream.
pub async fn run(self) -> RoverResult<UnboundedReceiverStream<CompositionEvent>> {
// Attempt to get a supergraph config stream and file based watcher subtask for receiving
// change events.
let (supergraph_config_stream, supergraph_config_subtask) =
match self.supergraph_config_subtask() {
Some((supergraph_diff_stream, supergraph_config_subtask)) => (
Expand All @@ -55,20 +89,30 @@ impl Runner {
None => (empty().boxed(), None),
};

// Construct watchers based on subgraph definitions in the given supergraph config.
let subgraph_config_watchers = SubgraphWatchers::new(self.supergraph_config.clone().into());
// Create a new subtask to handle events from the given subgraph watchers, receiving
// messages on the returned stream.
let (subgraph_changed_messages, subgraph_config_watchers_subtask) =
Subtask::new(subgraph_config_watchers);

// Create a handler for supergraph composition events.
let composition_handler = RunComposition::builder()
.supergraph_config(self.supergraph_config)
.supergraph_binary(self.supergraph_binary)
.exec_command(TokioCommand::default())
.read_file(FsReadFile::default())
.build();
let (composition_messages, composition_subtask) = Subtask::new(composition_handler);

// Create a new subtask for the composition handler, passing in a stream of subgraph change
// events in order to trigger recomposition.
let (composition_messages, composition_subtask) = Subtask::new(composition_handler);
composition_subtask.run(subgraph_changed_messages.boxed());

// Start subgraph watchers, listening for events from the supergraph change stream.
subgraph_config_watchers_subtask.run(supergraph_config_stream);

// Start the supergraph watcher subtask.
if let Some(supergraph_config_subtask) = supergraph_config_subtask {
supergraph_config_subtask.run();
}
Expand All @@ -84,6 +128,10 @@ impl Runner {
)> {
let supergraph_config: SupergraphConfig = self.supergraph_config.clone().into();

// If the supergraph config was passed as a file, we can configure a watcher for change
// events.
// We could return None here if we received a supergraph config directly from stdin. In
// that case, we don't want to configure a watcher.
if let Some(origin_path) = self.supergraph_config.origin_path() {
let f = FileWatcher::new(origin_path.clone());
let watcher = SupergraphConfigWatcher::new(f, supergraph_config.clone());
Expand All @@ -105,6 +153,7 @@ struct SubgraphWatchers {
}

impl SubgraphWatchers {
/// Create a set of watchers from the subgraph definitions of a supergraph config.
pub fn new(supergraph_config: SupergraphConfig) -> SubgraphWatchers {
let watchers = supergraph_config
.into_iter()
Expand All @@ -130,6 +179,11 @@ impl SubtaskHandleStream for SubgraphWatchers {
) -> AbortHandle {
tokio::task::spawn(async move {
let mut abort_handles: HashMap<String, (AbortHandle, AbortHandle)> = HashMap::new();
// Start a background task for each of the subtask watchers that listens for change
// events and send each event to the parent sender to be consumed by the composition
// handler.
// We also collect the abort handles for each background task in order to gracefully
// shut down.
for (subgraph_name, (mut messages, subtask)) in self.watchers.into_iter() {
let sender = sender.clone();
let messages_abort_handle = tokio::task::spawn(async move {
Expand All @@ -144,9 +198,10 @@ impl SubtaskHandleStream for SubgraphWatchers {
abort_handles.insert(subgraph_name, (messages_abort_handle, subtask_abort_handle));
}

// for supergraph diff events
// Wait for supergraph diff events received from the input stream.
while let Some(diff) = input.next().await {
// for new subgraphs added to the session
// If we detect additional diffs, start a new subgraph subtask.
// Adding the abort handle to the currentl collection of handles.
for (name, subgraph_config) in diff.added() {
if let Ok((mut messages, subtask)) =
SubgraphWatcher::try_from(subgraph_config.schema.clone())
Expand Down Expand Up @@ -174,6 +229,7 @@ impl SubtaskHandleStream for SubgraphWatchers {
);
}
}
// If we detect removal diffs, stop the subtask for the removed subgraph.
for name in diff.removed() {
if let Some((messages_abort_handle, subtask_abort_handle)) =
abort_handles.get(name)
Expand Down
2 changes: 1 addition & 1 deletion src/composition/watchers/subtask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use tokio::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// A trait whose implementation will be able send events
/// A trait whose implementation will be able to send events
pub trait SubtaskHandleUnit {
type Output;
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle;
Expand Down

0 comments on commit 276ca5c

Please sign in to comment.