Skip to content

Commit

Permalink
wip: breadth-first topological sort for graph exec
Browse files Browse the repository at this point in the history
  • Loading branch information
KennedyRichard committed May 6, 2024
1 parent 545aece commit 30debe6
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 92 deletions.
22 changes: 22 additions & 0 deletions nodezator/graphman/callablenode/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ def create_execution_support_objects(self):
for param_name in self.var_kind_map
}

### create set to store ids of nodes providing incoming
### connections (inputs) to this one;
###
### also store references to recurrent methods

isi = self.input_source_ids = set()

self.clear_source_ids = isi.clear
self.remove_source_ids = isi.difference_update
self.add_source_id = isi.add

def perform_execution_setup(self):
"""Clear input slots, set finished flag to False.
Expand All @@ -35,6 +46,17 @@ def perform_execution_setup(self):
for param_name in self.var_kind_map:
self.argument_map[param_name] = {}

### update set of ids from nodes that provide input to this one
self.clear_source_ids()

def send_id_to_direct_children(self):
### add id to direct children as source of data
###
### used for indegree tracking

for output_socket in self.output_sockets:
output_socket.add_source_id(self.id)

def check_and_setup_parameters(self):
"""Perform checks/setups on parameters.
Expand Down
129 changes: 40 additions & 89 deletions nodezator/graphman/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

### standard library imports

from collections import deque

from time import time

from io import StringIO
Expand All @@ -14,6 +12,8 @@

from os import linesep

from itertools import chain


### local imports

Expand Down Expand Up @@ -121,20 +121,15 @@ def __init__(self):
setattr(self, attr_name, method)


### create collections to aid in sorting nodes for execution

## create deque used to sort nodes to be executed;
##
## also reference useful recurrent methods

sorted_nodes = self.sorted_nodes = deque()
### create list used to sort groups of nodes (generations) to
### be executed;
###
### also reference useful recurrent methods

self.clear_sorted_nodes = sorted_nodes.clear
self.prepend_node = sorted_nodes.appendleft
node_generations = self.node_generations = []

## create additional set to store nodes in the final stage of
## the sorting process
self.visited_nodes = set()
self.clear_node_generations = node_generations.clear
self.append_node_generation = node_generations.append

### create list of executed nodes
self.executed_nodes = []
Expand Down Expand Up @@ -220,7 +215,6 @@ def execute_graph(self, requested_nodes=None):
add_data_node = self.add_data_node
add_callable_mode_node = self.add_callable_mode_node
add_node_for_execution = self.add_node_for_execution
a_set = add_node_for_execution.__self__

## try iterating over nodes, adding them to specific sets and
## performing extra tasks as needed
Expand Down Expand Up @@ -276,9 +270,7 @@ def execute_graph(self, requested_nodes=None):
### were visited and dealt with already;
###
### now it is time to sort the nodes meant to be executed
###
### while doing so, we also perform setups/checks
self._sort_and_setup_nodes_for_execution()
self._sort_nodes_for_execution()

### reference and clear list to store references of executed nodes

Expand All @@ -299,7 +291,7 @@ def execute_graph(self, requested_nodes=None):
### executing them and perform several other related tasks as needed,
### like sending output to the next nodes, etc.

for node in self.sorted_nodes:
for node in chain.from_iterable(self.node_generations):

### try executing the node

Expand Down Expand Up @@ -528,90 +520,49 @@ def execute_graph(self, requested_nodes=None):

set_status_message(f'Total execution time was {time_for_humans}')

def _sort_and_setup_nodes_for_execution(self):
"""Sort nodes to be executed and perform execution setups/checks.
Uses a depth-first topological sort algorithm based on the pseudocode
documented here:
def _sort_nodes_for_execution(self):
"""Sort nodes to be executed.
https://en.wikipedia.org/wiki/Topological_sorting#Depth-first_search
...which is described in the "Introduction to Algorightms" book
(2nd ed.) by Cormen et al (2001).
However, the algorithm was adapted here to take into account the fact
that we have a subsystem that prevents the formation of cycles.
That is, when the user tries to connect nodes in a way that the output
would cycle back to the same node from which it came, that connection
is never formed.
As a consequence, we removed the extra checks performed in the original
algorithm to detect cycles, since they are unnecessary here. We operate
under the assumption that no cycles exist among the nodes to be sorted.
Uses a breadth-first topological sort algorithm.
"""
### clear deque where references to the nodes to be executed will be
### referenced in order
self.clear_sorted_nodes()

### clear set of visited nodes, used to reference nodes
### in the final stage of the sorting process
self.visited_nodes.clear()
### clear list where references to the node generations to be
### executed will be referenced in order
self.clear_node_generations()

### iterate over nodes to be executed
### reference objects locally for quicker and easier access

for node in self.nodes_for_execution:
## operation to append node generations
append_node_generation = self.append_node_generation

## the remaining ones must be executed, so we can begin
## visiting them one by one, while sorting them and
## performing extra setups/checks
self._visit(node)
## set of nodes for execution
nodes_for_execution = self.nodes_for_execution

### send ids of nodes for execution to their direct children,
### so they can keep track of their indegree

def _visit(self, node):
for node in nodes_for_execution:
node.send_id_to_direct_children()

### if node was already visited, there's nothing else to be
### done, so exit the method right away
### while there's nodes for execution, keep creating and appending
### new node generations from the nodes with indegree == 0, always
### decrementing the indegree of their direct children

if node in self.visited_nodes:
return
while nodes_for_execution:

### iterate over the output sockets, visiting the nodes
### attached to them, if any
node_generation = [
node
for node in nodes_for_execution
if len(node.input_source_ids) == 0
]

for output_socket in node.output_sockets:
append_node_generation(node_generation)

## try retrieving the socket's children

try:
children = output_socket.children

## if they don't exist, just pass
except AttributeError:
pass

## otherwise, the children are input sockets from other
## nodes that are attached to this one downstream (nodes
## that depend on the output of this node)
##
## so visit each of such nodes

else:

for child_input_socket in children:
self._visit(child_input_socket.node)

### add node to set of visited ones
self.visited_nodes.add(node)

### only prepend nodes that are not redirect nodes;
###
### redirect nodes cannot be executed, they are not meant for that;
### they just pass the data to the next nodes on the graph
nodes_for_execution.difference_update(node_generation)

if node not in self.redirect_nodes:
self.prepend_node(node)
ids_to_remove = {node.id for node in node_generation}

for node in nodes_for_execution:
node.remove_source_ids(ids_to_remove)

def execute_with_custom_stdout(self):

Expand Down
27 changes: 24 additions & 3 deletions nodezator/graphman/operatornode/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,35 @@ def __init__(self):
### (the input comes only from other nodes)
self.argument_map = {}

### set the clear method of argument map as the callable
### for the 'perform_execution_setup' routine
self.perform_execution_setup = self.argument_map.clear
### create set to store ids of nodes providing incoming
### connections (inputs) to this one;
###
### also store references to recurrent methods

isi = self.input_source_ids = set()

self.clear_source_ids = isi.clear
self.remove_source_ids = isi.difference_update
self.add_source_id = isi.add

### set an empty function as the 'perform_pre_execution_setups'
### routine
self.perform_pre_execution_setups = empty_function

def perform_execution_setup(self):

### clear argument map
self.argument_map.clear()

### clear set of ids from nodes that provide input to this one
self.clear_source_ids()

def send_id_to_direct_children(self):
### add id to direct children as source of data
###
### used for indegree tracking
self.signature_output_socket.add_source_id(self.id)

def check_and_setup_parameters(self):
"""Check whether parameters lack a source of daa.
Expand Down
3 changes: 3 additions & 0 deletions nodezator/graphman/socket/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ def receive_input(self, data):
self.subparameter_index,
)

def add_source_id(self, source_id):
self.node.add_source_id(source_id)

def svg_repr(self):
""""""
socket_radius_str = str(7 - 1)
Expand Down
7 changes: 7 additions & 0 deletions nodezator/graphman/socket/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ def signal_severance(self):
else:
method(self)

def add_source_id(self, source_id):

if hasattr(self, 'children'):

for child in self.children:
child.add_source_id(source_id)

def svg_repr(self):
""""""

Expand Down
9 changes: 9 additions & 0 deletions nodezator/graphman/socket/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,15 @@ def on_right_mouse_release(self, event):
event.pos,
)

def add_source_id(self, source_id):

out_socket = self.node.output_socket

if hasattr(out_socket, 'children'):

for child in out_socket.children:
child.add_source_id(source_id)

def svg_repr(self):

socket_radius_str = str(7 - 1)
Expand Down

0 comments on commit 30debe6

Please sign in to comment.