Skip to content

Commit

Permalink
add subgraph watcher handle tests (#2172)
Browse files Browse the repository at this point in the history
Add tests for `SubgraphWatcher` handle events.
  • Loading branch information
loshz authored Sep 24, 2024
1 parent 7d1628e commit 64b0272
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions src/composition/watchers/watcher/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ pub enum SubgraphWatcherKind {
/// Don't ever update, schema is only pulled once.
// TODO: figure out what to do with this; is it ever used? can we remove it?
_Once(String),

/// Event specifically used for testing watch handlers.
#[cfg(test)]
TestWatcher,
}

impl TryFrom<SchemaSource> for SubgraphWatcher {
Expand All @@ -51,13 +55,9 @@ impl TryFrom<SchemaSource> for SubgraphWatcher {
// directives from introspection (but not when the source is a file)
fn try_from(schema_source: SchemaSource) -> Result<Self, Self::Error> {
match schema_source {
SchemaSource::File { file } => {
println!("wtf?");

Ok(Self {
watcher: SubgraphWatcherKind::File(FileWatcher::new(file)),
})
}
SchemaSource::File { file } => Ok(Self {
watcher: SubgraphWatcherKind::File(FileWatcher::new(file)),
}),
SchemaSource::SubgraphIntrospection {
subgraph_url,
introspection_headers,
Expand All @@ -74,16 +74,27 @@ impl TryFrom<SchemaSource> for SubgraphWatcher {
}

impl SubgraphWatcherKind {
/// Watch the subgraph for changes based on the kind of watcher attached
/// Watch a subgraph for changes based on the kind of watcher attached.
///
/// Development note: this is a stream of Strings, but in the future we might want something
/// more flexible to get type safety
/// more flexible to get type safety.
async fn watch(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
match self {
Self::File(file_watcher) => file_watcher.clone().watch(),
Self::Introspect(introspection) => introspection.watch(),
// TODO: figure out what this is; sdl? stdin one-off? either way, probs not watching
Self::_Once(_) => unimplemented!(),

// Create a new single buffered channel for testing watch events.
#[cfg(test)]
Self::TestWatcher => {
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;

let (tx, rx) = channel(1);
tx.send("watch event".to_string()).await.unwrap();
ReceiverStream::new(rx).boxed()
}
}
}
}
Expand Down Expand Up @@ -148,7 +159,8 @@ impl SubgraphIntrospection {
}
}

/// A unit struct denoting a change to a subgraph, used by composition to know whether to recompose
/// A unit struct denoting a change to a subgraph, used by composition to know whether to
/// recompose.
pub struct SubgraphChanged;

impl SubtaskHandleUnit for SubgraphWatcher {
Expand All @@ -157,7 +169,7 @@ impl SubtaskHandleUnit for SubgraphWatcher {
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle {
tokio::spawn(async move {
let mut watcher = self.watcher.watch().await;
while let Some(_change) = watcher.next().await {
while watcher.next().await.is_some() {
let _ = sender
.send(SubgraphChanged)
.tap_err(|err| tracing::error!("{:?}", err));
Expand All @@ -166,3 +178,29 @@ impl SubtaskHandleUnit for SubgraphWatcher {
.abort_handle()
}
}

#[cfg(test)]
mod tests {
use futures::StreamExt;

use crate::composition::watchers::subtask::{Subtask, SubtaskRunUnit};

use super::{SubgraphChanged, SubgraphWatcher, SubgraphWatcherKind};

#[tokio::test]
async fn test_subgraphwatcher_handle() {
let watch_handler = SubgraphWatcher {
watcher: SubgraphWatcherKind::TestWatcher,
};

let (mut watch_messages, watch_subtask) = Subtask::new(watch_handler);
let abort_handle = watch_subtask.run();

assert!(matches!(
watch_messages.next().await.unwrap(),
SubgraphChanged
));

abort_handle.abort();
}
}

0 comments on commit 64b0272

Please sign in to comment.