Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More fluent GraphTraversal API #4598

Merged
merged 1 commit into from
May 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 43 additions & 19 deletions crates/turbo-tasks/src/graph/graph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::collections::HashSet;

/// A graph store is a data structure that will be built up during a graph
/// traversal. It is used to store the results of the traversal.
pub trait GraphStore<Node>: Default {
pub trait GraphStore {
type Node;
type Handle: Clone;

// TODO(alexkirsz) An `entry(from_handle) -> Entry` API would be more
Expand All @@ -13,48 +14,71 @@ pub trait GraphStore<Node>: Default {
fn insert(
&mut self,
from_handle: Option<Self::Handle>,
node: Node,
) -> Option<(Self::Handle, &Node)>;
node: GraphNode<Self::Node>,
) -> Option<(Self::Handle, &Self::Node)>;
}

/// Utility type to ensure that GraphStore::insert can only ever be called from
/// within this module, as a GraphNode can't be constructed outside of it.
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash, Ord, PartialOrd)]
pub struct GraphNode<Node>(pub(super) Node);

impl<Node> GraphNode<Node> {
/// Consumes this `GraphNode` and returns the underlying node.
pub fn into_node(self) -> Node {
self.0
}

/// Returns a reference the underlying node.
pub fn node(&self) -> &Node {
&self.0
}

/// Returns a mutable reference the underlying node.
pub fn node_mut(&mut self) -> &mut Node {
&mut self.0
}
}

/// A [`GraphStore`] wrapper that skips nodes that have already been
/// visited. This is necessary to avoid repeated work when traversing non-tree
/// graphs (i.e. where a node can have more than one incoming edge).
#[derive(Debug)]
pub struct SkipDuplicates<StoreImpl, Node>
pub struct SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
store: StoreImpl,
visited: HashSet<Node>,
visited: HashSet<StoreImpl::Node>,
}

impl<StoreImpl, Node> Default for SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
fn default() -> Self {
pub fn new(store: StoreImpl) -> Self {
Self {
store: Default::default(),
store: store,
visited: Default::default(),
}
}
}

impl<StoreImpl, Node> GraphStore<Node> for SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> GraphStore for SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
Node: Eq + std::hash::Hash + Clone,
StoreImpl: GraphStore,
StoreImpl::Node: Eq + std::hash::Hash + Clone,
{
type Node = StoreImpl::Node;
type Handle = StoreImpl::Handle;

fn insert(
&mut self,
from_handle: Option<Self::Handle>,
node: Node,
) -> Option<(Self::Handle, &Node)> {
if !self.visited.contains(&node) {
self.visited.insert(node.clone());
node: GraphNode<StoreImpl::Node>,
) -> Option<(Self::Handle, &StoreImpl::Node)> {
if !self.visited.contains(node.node()) {
self.visited.insert(node.node().clone());
self.store.insert(from_handle, node)
} else {
// Always insert the node into the store, even if we've already
Expand All @@ -66,9 +90,9 @@ where
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SkipDuplicates could implement skip_duplicates and return self

Copy link
Contributor Author

@alexkirsz alexkirsz May 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the way the traits are set up, this would require specialization, so I'm going to skip this for now.


impl<StoreImpl, Node> SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
/// Consumes the wrapper and returns the underlying store.
pub fn into_inner(self) -> StoreImpl {
Expand Down
81 changes: 51 additions & 30 deletions crates/turbo-tasks/src/graph/graph_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,54 @@ use std::{future::Future, pin::Pin};
use anyhow::Result;
use futures::{stream::FuturesUnordered, Stream};

use super::{graph_store::GraphStore, with_future::With, Visit, VisitControlFlow};
use super::{
graph_store::{GraphNode, GraphStore},
with_future::With,
SkipDuplicates, Visit, VisitControlFlow,
};

/// [`GraphTraversal`] is a utility type that can be used to traverse a graph of
/// nodes, where each node can have a variable number of outgoing edges. The
/// traversal is done in parallel, and the order of the nodes in the traversal
/// result is determined by the [`GraphStore`] parameter.
pub struct GraphTraversal<Store> {
_store: std::marker::PhantomData<Store>,
pub trait GraphTraversal: GraphStore + Sized {
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
self,
root_edges: RootEdgesIt,
visit: VisitImpl,
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl>
where
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>;

fn skip_duplicates(self) -> SkipDuplicates<Self>;
}

impl<Store> GraphTraversal<Store> {
impl<Store> GraphTraversal for Store
where
Store: GraphStore,
{
/// Visits the graph starting from the given `roots`, and returns a future
/// that will resolve to the traversal result.
pub fn visit<Node, RootEdgesIt, VisitImpl, Abort, Impl>(
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
mut self,
root_edges: RootEdgesIt,
mut visit: VisitImpl,
) -> GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>,
{
let mut store = Store::default();
let futures = FuturesUnordered::new();
for edge in root_edges {
match visit.visit(edge) {
VisitControlFlow::Continue(node) => {
if let Some((parent_handle, node_ref)) = store.insert(None, node) {
if let Some((parent_handle, node_ref)) = self.insert(None, GraphNode(node)) {
futures.push(With::new(visit.edges(node_ref), parent_handle));
}
}
VisitControlFlow::Skip(node) => {
store.insert(None, node);
self.insert(None, GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
return GraphTraversalFuture {
Expand All @@ -46,42 +61,46 @@ impl<Store> GraphTraversal<Store> {
}
GraphTraversalFuture {
state: GraphTraversalState::Running(GraphTraversalRunningState {
store,
store: self,
futures,
visit,
}),
}
}

fn skip_duplicates(self) -> SkipDuplicates<Self> {
SkipDuplicates::new(self)
}
}

/// A future that resolves to a [`GraphStore`] containing the result of a graph
/// traversal.
pub struct GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
pub struct GraphTraversalFuture<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
state: GraphTraversalState<Node, Store, VisitImpl, Abort, Impl>,
state: GraphTraversalState<Store, VisitImpl, Abort, Impl>,
}

#[derive(Default)]
enum GraphTraversalState<Node, Store, VisitImpl, Abort, Impl>
enum GraphTraversalState<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
#[default]
Completed,
Aborted {
abort: Abort,
},
Running(GraphTraversalRunningState<Node, Store, VisitImpl, Abort, Impl>),
Running(GraphTraversalRunningState<Store, VisitImpl, Abort, Impl>),
}

struct GraphTraversalRunningState<Node, Store, VisitImpl, Abort, Impl>
struct GraphTraversalRunningState<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
store: Store,
futures: FuturesUnordered<With<VisitImpl::EdgesFuture, Store::Handle>>,
Expand All @@ -102,11 +121,10 @@ impl<Completed> GraphTraversalResult<Completed, !> {
}
}

impl<Node, Store, VisitImpl, Abort, Impl> Future
for GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
impl<Store, VisitImpl, Abort, Impl> Future for GraphTraversalFuture<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
type Output = GraphTraversalResult<Result<Store>, Abort>;

Expand All @@ -132,8 +150,9 @@ where
for edge in edges {
match running.visit.visit(edge) {
VisitControlFlow::Continue(node) => {
if let Some((node_handle, node_ref)) =
running.store.insert(Some(parent_handle.clone()), node)
if let Some((node_handle, node_ref)) = running
.store
.insert(Some(parent_handle.clone()), GraphNode(node))
{
running.futures.push(With::new(
running.visit.edges(node_ref),
Expand All @@ -142,7 +161,9 @@ where
}
}
VisitControlFlow::Skip(node) => {
running.store.insert(Some(parent_handle.clone()), node);
running
.store
.insert(Some(parent_handle.clone()), GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
break 'outer (
Expand Down
15 changes: 11 additions & 4 deletions crates/turbo-tasks/src/graph/non_deterministic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::graph_store::GraphStore;
use super::graph_store::{GraphNode, GraphStore};

/// A graph traversal that does not guarantee any particular order, and may not
/// return the same order every time it is run.
Expand All @@ -8,19 +8,26 @@ pub struct NonDeterministic<T> {

impl<T> Default for NonDeterministic<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> NonDeterministic<T> {
pub fn new() -> Self {
Self { output: Vec::new() }
}
}

impl<T> GraphStore<T> for NonDeterministic<T> {
impl<T> GraphStore for NonDeterministic<T> {
type Node = T;
type Handle = ();

fn insert(
&mut self,
_from_handle: Option<Self::Handle>,
node: T,
node: GraphNode<T>,
) -> Option<(Self::Handle, &T)> {
self.output.push(node);
self.output.push(node.into_node());
Some(((), self.output.last().unwrap()))
}
}
Expand Down
20 changes: 15 additions & 5 deletions crates/turbo-tasks/src/graph/reverse_topological.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};

use super::graph_store::GraphStore;
use super::graph_store::{GraphNode, GraphStore};

/// A graph traversal that returns nodes in reverse topological order.
pub struct ReverseTopological<T>
Expand All @@ -16,20 +16,30 @@ where
T: Eq + std::hash::Hash + Clone,
{
fn default() -> Self {
Self::new()
}
}

impl<T> ReverseTopological<T>
where
T: Eq + std::hash::Hash + Clone,
{
pub fn new() -> Self {
Self {
adjacency_map: HashMap::new(),
roots: Vec::new(),
}
}
}

impl<T> GraphStore<T> for ReverseTopological<T>
impl<T> GraphStore for ReverseTopological<T>
where
T: Eq + std::hash::Hash + Clone,
{
type Node = T;
type Handle = T;

fn insert(&mut self, from_handle: Option<T>, node: T) -> Option<(Self::Handle, &T)> {
fn insert(&mut self, from_handle: Option<T>, node: GraphNode<T>) -> Option<(Self::Handle, &T)> {
let vec = if let Some(from_handle) = from_handle {
self.adjacency_map
.entry(from_handle)
Expand All @@ -38,8 +48,8 @@ where
&mut self.roots
};

vec.push(node.clone());
Some((node, vec.last().unwrap()))
vec.push(node.node().clone());
Some((node.into_node(), vec.last().unwrap()))
}
}

Expand Down
21 changes: 10 additions & 11 deletions crates/turbopack-core/src/changed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use turbo_tasks::{
graph::{GraphTraversal, NonDeterministic, SkipDuplicates},
graph::{GraphTraversal, NonDeterministic},
CompletionVc, CompletionsVc,
};

Expand All @@ -20,16 +20,15 @@ async fn get_referenced_assets(parent: AssetVc) -> Result<impl Iterator<Item = A
/// asset graph changes.
#[turbo_tasks::function]
pub async fn any_content_changed(root: AssetVc) -> Result<CompletionVc> {
let completions = GraphTraversal::<SkipDuplicates<NonDeterministic<_>, _>>::visit(
[root],
get_referenced_assets,
)
.await
.completed()?
.into_inner()
.into_iter()
.map(content_changed)
.collect();
let completions = NonDeterministic::new()
.skip_duplicates()
.visit([root], get_referenced_assets)
Comment on lines +23 to +25
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the difference in interface: no more type wrangling to create the traversal.

.await
.completed()?
.into_inner()
.into_iter()
.map(content_changed)
.collect();

Ok(CompletionsVc::cell(completions).completed())
}
Expand Down
Loading