Skip to content

Commit

Permalink
Graph refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alexkirsz committed May 1, 2023
1 parent 9d4faa7 commit b9cfb3b
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 105 deletions.
62 changes: 43 additions & 19 deletions crates/turbo-tasks/src/graph/graph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use std::collections::HashSet;

/// A graph store is a data structure that will be built up during a graph
/// traversal. It is used to store the results of the traversal.
pub trait GraphStore<Node>: Default {
pub trait GraphStore {
type Node;
type Handle: Clone;

// TODO(alexkirsz) An `entry(from_handle) -> Entry` API would be more
Expand All @@ -13,48 +14,71 @@ pub trait GraphStore<Node>: Default {
fn insert(
&mut self,
from_handle: Option<Self::Handle>,
node: Node,
) -> Option<(Self::Handle, &Node)>;
node: GraphNode<Self::Node>,
) -> Option<(Self::Handle, &Self::Node)>;
}

/// Utility type to ensure that GraphStore::insert can only ever be called from
/// within this module, as a GraphNode can't be constructed outside of it.
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash, Ord, PartialOrd)]
pub struct GraphNode<Node>(pub(super) Node);

impl<Node> GraphNode<Node> {
/// Consumes this `GraphNode` and returns the underlying node.
pub fn into_node(self) -> Node {
self.0
}

/// Returns a reference the underlying node.
pub fn node(&self) -> &Node {
&self.0
}

/// Returns a mutable reference the underlying node.
pub fn node_mut(&mut self) -> &mut Node {
&mut self.0
}
}

/// A [`GraphStore`] wrapper that skips nodes that have already been
/// visited. This is necessary to avoid repeated work when traversing non-tree
/// graphs (i.e. where a node can have more than one incoming edge).
#[derive(Debug)]
pub struct SkipDuplicates<StoreImpl, Node>
pub struct SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
store: StoreImpl,
visited: HashSet<Node>,
visited: HashSet<StoreImpl::Node>,
}

impl<StoreImpl, Node> Default for SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
fn default() -> Self {
pub fn new(store: StoreImpl) -> Self {
Self {
store: Default::default(),
store: store,
visited: Default::default(),
}
}
}

impl<StoreImpl, Node> GraphStore<Node> for SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> GraphStore for SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
Node: Eq + std::hash::Hash + Clone,
StoreImpl: GraphStore,
StoreImpl::Node: Eq + std::hash::Hash + Clone,
{
type Node = StoreImpl::Node;
type Handle = StoreImpl::Handle;

fn insert(
&mut self,
from_handle: Option<Self::Handle>,
node: Node,
) -> Option<(Self::Handle, &Node)> {
if !self.visited.contains(&node) {
self.visited.insert(node.clone());
node: GraphNode<StoreImpl::Node>,
) -> Option<(Self::Handle, &StoreImpl::Node)> {
if !self.visited.contains(node.node()) {
self.visited.insert(node.node().clone());
self.store.insert(from_handle, node)
} else {
// Always insert the node into the store, even if we've already
Expand All @@ -66,9 +90,9 @@ where
}
}

impl<StoreImpl, Node> SkipDuplicates<StoreImpl, Node>
impl<StoreImpl> SkipDuplicates<StoreImpl>
where
StoreImpl: GraphStore<Node>,
StoreImpl: GraphStore,
{
/// Consumes the wrapper and returns the underlying store.
pub fn into_inner(self) -> StoreImpl {
Expand Down
81 changes: 51 additions & 30 deletions crates/turbo-tasks/src/graph/graph_traversal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,54 @@ use std::{future::Future, pin::Pin};
use anyhow::Result;
use futures::{stream::FuturesUnordered, Stream};

use super::{graph_store::GraphStore, with_future::With, Visit, VisitControlFlow};
use super::{
graph_store::{GraphNode, GraphStore},
with_future::With,
SkipDuplicates, Visit, VisitControlFlow,
};

/// [`GraphTraversal`] is a utility type that can be used to traverse a graph of
/// nodes, where each node can have a variable number of outgoing edges. The
/// traversal is done in parallel, and the order of the nodes in the traversal
/// result is determined by the [`GraphStore`] parameter.
pub struct GraphTraversal<Store> {
_store: std::marker::PhantomData<Store>,
pub trait GraphTraversal: GraphStore + Sized {
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
self,
root_edges: RootEdgesIt,
visit: VisitImpl,
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl>
where
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>;

fn skip_duplicates(self) -> SkipDuplicates<Self>;
}

impl<Store> GraphTraversal<Store> {
impl<Store> GraphTraversal for Store
where
Store: GraphStore,
{
/// Visits the graph starting from the given `roots`, and returns a future
/// that will resolve to the traversal result.
pub fn visit<Node, RootEdgesIt, VisitImpl, Abort, Impl>(
fn visit<RootEdgesIt, VisitImpl, Abort, Impl>(
mut self,
root_edges: RootEdgesIt,
mut visit: VisitImpl,
) -> GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
) -> GraphTraversalFuture<Self, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
VisitImpl: Visit<Self::Node, Abort, Impl>,
RootEdgesIt: IntoIterator<Item = VisitImpl::Edge>,
{
let mut store = Store::default();
let futures = FuturesUnordered::new();
for edge in root_edges {
match visit.visit(edge) {
VisitControlFlow::Continue(node) => {
if let Some((parent_handle, node_ref)) = store.insert(None, node) {
if let Some((parent_handle, node_ref)) = self.insert(None, GraphNode(node)) {
futures.push(With::new(visit.edges(node_ref), parent_handle));
}
}
VisitControlFlow::Skip(node) => {
store.insert(None, node);
self.insert(None, GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
return GraphTraversalFuture {
Expand All @@ -46,42 +61,46 @@ impl<Store> GraphTraversal<Store> {
}
GraphTraversalFuture {
state: GraphTraversalState::Running(GraphTraversalRunningState {
store,
store: self,
futures,
visit,
}),
}
}

fn skip_duplicates(self) -> SkipDuplicates<Self> {
SkipDuplicates::new(self)
}
}

/// A future that resolves to a [`GraphStore`] containing the result of a graph
/// traversal.
pub struct GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
pub struct GraphTraversalFuture<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
state: GraphTraversalState<Node, Store, VisitImpl, Abort, Impl>,
state: GraphTraversalState<Store, VisitImpl, Abort, Impl>,
}

#[derive(Default)]
enum GraphTraversalState<Node, Store, VisitImpl, Abort, Impl>
enum GraphTraversalState<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
#[default]
Completed,
Aborted {
abort: Abort,
},
Running(GraphTraversalRunningState<Node, Store, VisitImpl, Abort, Impl>),
Running(GraphTraversalRunningState<Store, VisitImpl, Abort, Impl>),
}

struct GraphTraversalRunningState<Node, Store, VisitImpl, Abort, Impl>
struct GraphTraversalRunningState<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
store: Store,
futures: FuturesUnordered<With<VisitImpl::EdgesFuture, Store::Handle>>,
Expand All @@ -102,11 +121,10 @@ impl<Completed> GraphTraversalResult<Completed, !> {
}
}

impl<Node, Store, VisitImpl, Abort, Impl> Future
for GraphTraversalFuture<Node, Store, VisitImpl, Abort, Impl>
impl<Store, VisitImpl, Abort, Impl> Future for GraphTraversalFuture<Store, VisitImpl, Abort, Impl>
where
Store: GraphStore<Node>,
VisitImpl: Visit<Node, Abort, Impl>,
Store: GraphStore,
VisitImpl: Visit<Store::Node, Abort, Impl>,
{
type Output = GraphTraversalResult<Result<Store>, Abort>;

Expand All @@ -132,8 +150,9 @@ where
for edge in edges {
match running.visit.visit(edge) {
VisitControlFlow::Continue(node) => {
if let Some((node_handle, node_ref)) =
running.store.insert(Some(parent_handle.clone()), node)
if let Some((node_handle, node_ref)) = running
.store
.insert(Some(parent_handle.clone()), GraphNode(node))
{
running.futures.push(With::new(
running.visit.edges(node_ref),
Expand All @@ -142,7 +161,9 @@ where
}
}
VisitControlFlow::Skip(node) => {
running.store.insert(Some(parent_handle.clone()), node);
running
.store
.insert(Some(parent_handle.clone()), GraphNode(node));
}
VisitControlFlow::Abort(abort) => {
break 'outer (
Expand Down
15 changes: 11 additions & 4 deletions crates/turbo-tasks/src/graph/non_deterministic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::graph_store::GraphStore;
use super::graph_store::{GraphNode, GraphStore};

/// A graph traversal that does not guarantee any particular order, and may not
/// return the same order every time it is run.
Expand All @@ -8,19 +8,26 @@ pub struct NonDeterministic<T> {

impl<T> Default for NonDeterministic<T> {
fn default() -> Self {
Self::new()
}
}

impl<T> NonDeterministic<T> {
pub fn new() -> Self {
Self { output: Vec::new() }
}
}

impl<T> GraphStore<T> for NonDeterministic<T> {
impl<T> GraphStore for NonDeterministic<T> {
type Node = T;
type Handle = ();

fn insert(
&mut self,
_from_handle: Option<Self::Handle>,
node: T,
node: GraphNode<T>,
) -> Option<(Self::Handle, &T)> {
self.output.push(node);
self.output.push(node.into_node());
Some(((), self.output.last().unwrap()))
}
}
Expand Down
20 changes: 15 additions & 5 deletions crates/turbo-tasks/src/graph/reverse_topological.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};

use super::graph_store::GraphStore;
use super::graph_store::{GraphNode, GraphStore};

/// A graph traversal that returns nodes in reverse topological order.
pub struct ReverseTopological<T>
Expand All @@ -16,20 +16,30 @@ where
T: Eq + std::hash::Hash + Clone,
{
fn default() -> Self {
Self::new()
}
}

impl<T> ReverseTopological<T>
where
T: Eq + std::hash::Hash + Clone,
{
pub fn new() -> Self {
Self {
adjacency_map: HashMap::new(),
roots: Vec::new(),
}
}
}

impl<T> GraphStore<T> for ReverseTopological<T>
impl<T> GraphStore for ReverseTopological<T>
where
T: Eq + std::hash::Hash + Clone,
{
type Node = T;
type Handle = T;

fn insert(&mut self, from_handle: Option<T>, node: T) -> Option<(Self::Handle, &T)> {
fn insert(&mut self, from_handle: Option<T>, node: GraphNode<T>) -> Option<(Self::Handle, &T)> {
let vec = if let Some(from_handle) = from_handle {
self.adjacency_map
.entry(from_handle)
Expand All @@ -38,8 +48,8 @@ where
&mut self.roots
};

vec.push(node.clone());
Some((node, vec.last().unwrap()))
vec.push(node.node().clone());
Some((node.into_node(), vec.last().unwrap()))
}
}

Expand Down
21 changes: 10 additions & 11 deletions crates/turbopack-core/src/changed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use turbo_tasks::{
graph::{GraphTraversal, NonDeterministic, SkipDuplicates},
graph::{GraphTraversal, NonDeterministic},
CompletionVc, CompletionsVc,
};

Expand All @@ -20,16 +20,15 @@ async fn get_referenced_assets(parent: AssetVc) -> Result<impl Iterator<Item = A
/// asset graph changes.
#[turbo_tasks::function]
pub async fn any_content_changed(root: AssetVc) -> Result<CompletionVc> {
let completions = GraphTraversal::<SkipDuplicates<NonDeterministic<_>, _>>::visit(
[root],
get_referenced_assets,
)
.await
.completed()?
.into_inner()
.into_iter()
.map(content_changed)
.collect();
let completions = NonDeterministic::new()
.skip_duplicates()
.visit([root], get_referenced_assets)
.await
.completed()?
.into_inner()
.into_iter()
.map(content_changed)
.collect();

Ok(CompletionsVc::cell(completions).completed())
}
Expand Down
Loading

0 comments on commit b9cfb3b

Please sign in to comment.