diff --git a/pydatastructs/graphs/__init__.py b/pydatastructs/graphs/__init__.py index 8fd88ea7b..6c2f3ba90 100644 --- a/pydatastructs/graphs/__init__.py +++ b/pydatastructs/graphs/__init__.py @@ -8,7 +8,8 @@ from . import algorithms from .algorithms import ( - breadth_first_search + breadth_first_search, + breadth_first_search_parallel ) __all__.extend(algorithms.__all__) diff --git a/pydatastructs/graphs/algorithms.py b/pydatastructs/graphs/algorithms.py index 8a0bbd320..3f253f3b6 100644 --- a/pydatastructs/graphs/algorithms.py +++ b/pydatastructs/graphs/algorithms.py @@ -5,9 +5,11 @@ # TODO: REPLACE COLLECTIONS QUEUE WITH PYDATASTRUCTS QUEUE from collections import deque as Queue from pydatastructs.utils.misc_util import AdjacencyListGraphNode +from concurrent.futures import ThreadPoolExecutor __all__ = [ 'breadth_first_search', + 'breadth_first_search_parallel' ] def breadth_first_search( @@ -90,3 +92,98 @@ def _breadth_first_search_adjacency_list( return None _breadth_first_search_adjacency_matrix = _breadth_first_search_adjacency_list + +def breadth_first_search_parallel( + graph, source_node, num_threads, operation, *args, **kwargs): + """ + Parallel implementation of breadth first search on graphs. + + Parameters + ========== + + graph: Graph + The graph on which BFS is to be performed. + source_node: str + The name of the source node from where the BFS is + to be initiated. + num_threads: int + Number of threads to be used for computation. + operation: function + The function which is to be applied + on every node when it is visited. + The prototype which is to be followed is, + `function_name(curr_node, next_node, + arg_1, arg_2, . . ., arg_n)`. + Here, the first two arguments denote, the + current node and the node next to current node. + The rest of the arguments are optional and you can + provide your own stuff there. + + Note + ==== + + You should pass all the arguments which you are going + to use in the prototype of your `operation` after + passing the operation function. + + Examples + ======== + + >>> from pydatastructs import Graph, AdjacencyListGraphNode + >>> V1 = AdjacencyListGraphNode("V1") + >>> V2 = AdjacencyListGraphNode("V2") + >>> V3 = AdjacencyListGraphNode("V3") + >>> G = Graph(V1, V2, V3) + >>> from pydatastructs import breadth_first_search_parallel + >>> def f(curr_node, next_node, dest_node): + ... return curr_node != dest_node + ... + >>> G.add_edge(V1.name, V2.name) + >>> G.add_edge(V2.name, V3.name) + >>> breadth_first_search_parallel(G, V1.name, 3, f, V3.name) + """ + import pydatastructs.graphs.algorithms as algorithms + func = "_breadth_first_search_parallel_" + graph._impl + if not hasattr(algorithms, func): + raise NotImplementedError( + "Currently breadth first search isn't implemented for " + "%s graphs."%(graph._impl)) + return getattr(algorithms, func)( + graph, source_node, num_threads, operation, *args, **kwargs) + +def _generate_layer(**kwargs): + _args, _kwargs = kwargs.get('args'), kwargs.get('kwargs') + (graph, curr_node, next_layer, visited, operation) = _args[0:5] + op_args, op_kwargs = _args[5:], _kwargs + next_nodes = graph.neighbors(curr_node) + status = True + if len(next_nodes) != 0: + for next_node in next_nodes: + if visited.get(next_node, False) is False: + status = status and operation(curr_node, next_node.name, *op_args, **op_kwargs) + next_layer.add(next_node.name) + visited[next_node.name] = True + else: + status = status and operation(curr_node, "", *op_args, **op_kwargs) + return status + +def _breadth_first_search_parallel_adjacency_list( + graph, source_node, num_threads, operation, *args, **kwargs): + visited, layers = dict(), dict() + layers[0] = set() + layers[0].add(source_node) + visited[source_node] = True + layer = 0 + while len(layers[layer]) != 0: + layers[layer+1] = set() + with ThreadPoolExecutor(max_workers=num_threads) as Executor: + for node in layers[layer]: + status = Executor.submit( + _generate_layer, args= + (graph, node, layers[layer+1], visited, + operation, *args), kwargs=kwargs).result() + layer += 1 + if not status: + return None + +_breadth_first_search_parallel_adjacency_matrix = _breadth_first_search_parallel_adjacency_list diff --git a/pydatastructs/graphs/tests/test_algorithms.py b/pydatastructs/graphs/tests/test_algorithms.py index 208108120..065fa2e94 100644 --- a/pydatastructs/graphs/tests/test_algorithms.py +++ b/pydatastructs/graphs/tests/test_algorithms.py @@ -1,4 +1,5 @@ -from pydatastructs import breadth_first_search, Graph +from pydatastructs import (breadth_first_search, Graph, +breadth_first_search_parallel) def test_breadth_first_search(): @@ -72,3 +73,51 @@ def path_finder(curr_node, next_node, dest_node, parent, path): _test_breadth_first_search("List", "adjacency_list") _test_breadth_first_search("Matrix", "adjacency_matrix") + +def test_breadth_first_search_parallel(): + + def _test_breadth_first_search_parallel(ds, impl): + import pydatastructs.utils.misc_util as utils + GraphNode = getattr(utils, "Adjacency" + ds + "GraphNode") + + V1 = GraphNode(0) + V2 = GraphNode(1) + V3 = GraphNode(2) + V4 = GraphNode(3) + V5 = GraphNode(4) + V6 = GraphNode(5) + V7 = GraphNode(6) + V8 = GraphNode(7) + + + G1 = Graph(V1, V2, V3, V4, V5, V6, V7, V8, implementation=impl) + + edges = [ + (V1.name, V2.name), + (V1.name, V3.name), + (V1.name, V4.name), + (V2.name, V5.name), + (V2.name, V6.name), + (V3.name, V6.name), + (V3.name, V7.name), + (V4.name, V7.name), + (V4.name, V8.name) + ] + + for edge in edges: + G1.add_edge(*edge) + + parent = dict() + def bfs_tree(curr_node, next_node, parent): + if next_node != "": + parent[next_node] = curr_node + return True + + breadth_first_search_parallel(G1, V1.name, 5, bfs_tree, parent) + assert (parent[V2.name] == V1.name and parent[V3.name] == V1.name and + parent[V4.name] == V1.name and parent[V5.name] == V2.name and + (parent[V6.name] in (V2.name, V3.name)) and + (parent[V7.name] in (V3.name, V4.name)) and (parent[V8.name] == V4.name)) + + _test_breadth_first_search_parallel("List", "adjacency_list") + _test_breadth_first_search_parallel("Matrix", "adjacency_matrix")