diff --git a/README.md b/README.md index 00cfbe2..8328bc3 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ pip install ngdi - Spark 2.4, 3.0(not yet tested) - [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) - [NebulaGraph Spark Connector 3.4+](https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/) -- [NebulaGraph Algorithm 3.4+](https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/) +- [NebulaGraph Algorithm 3.1+](https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/) ### NebulaGraph Engine Prerequisites - [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) diff --git a/examples/spark_engine.ipynb b/examples/spark_engine.ipynb index f71f2f9..8d1fbda 100644 --- a/examples/spark_engine.ipynb +++ b/examples/spark_engine.ipynb @@ -1,8 +1,19 @@ { "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "f46fdd40", + "metadata": {}, + "outputs": [], + "source": [ + "# install ngdi\n", + "!pip install ngdi" + ] + }, { "cell_type": "markdown", - "id": "8d81af21", + "id": "5b4e4143", "metadata": {}, "source": [ "## Spark Engine Examples\n", @@ -11,7 +22,7 @@ }, { "cell_type": "markdown", - "id": "8d0a7751", + "id": "f17abcf8", "metadata": {}, "source": [ "In this example, we are leveraging the Spark Engine of NebulaGraph DI Suite, with the Storage Scan mode.\n", @@ -69,7 +80,7 @@ }, { "cell_type": "markdown", - "id": "d29ebd47", + "id": "3617de5f", "metadata": {}, "source": [ "#### Step 2, run Pagerank Algorithm" @@ -78,7 +89,7 @@ { "cell_type": "code", "execution_count": 2, - "id": "adeeacbe", + "id": "90069aaf", "metadata": {}, "outputs": [ { @@ -138,6 +149,13 @@ "only showing top 20 rows\n", "\n" ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "23/02/28 12:13:38 WARN BlockManager: Block rdd_75_4 already exists on this machine; not re-adding it\n" + ] } ], "source": [ @@ -146,7 +164,7 @@ }, { "cell_type": "markdown", - "id": "ef66c9e2", + "id": "66e70ca0", "metadata": {}, "source": [ "#### Step 3, check results of the algorithm\n" @@ -165,11 +183,11 @@ "+---------+-------------------+\n", "| _id| pagerank|\n", "+---------+-------------------+\n", - "|player133|0.18601069183310506|\n", - "|player126|0.18601069183310506|\n", - "|player130| 1.2400712788873671|\n", - "|player108|0.18601069183310506|\n", - "|player102| 1.6602373739502538|\n", + "|player133|0.18601069183310504|\n", + "|player126|0.18601069183310504|\n", + "|player130| 1.240071278887367|\n", + "|player108|0.18601069183310504|\n", + "|player102| 1.6602373739502536|\n", "+---------+-------------------+\n", "only showing top 5 rows\n", "\n" @@ -182,7 +200,7 @@ }, { "cell_type": "markdown", - "id": "4b9cf5fe", + "id": "3eb228f8", "metadata": {}, "source": [ "------------------\n", @@ -198,7 +216,7 @@ { "cell_type": "code", "execution_count": 4, - "id": "b6542994", + "id": "e44ac3e9", "metadata": {}, "outputs": [ { @@ -231,7 +249,7 @@ }, { "cell_type": "markdown", - "id": "d24e5903", + "id": "49becbdb", "metadata": {}, "source": [ "#### Step 2, run Conncted Components Algorithm" @@ -240,7 +258,7 @@ { "cell_type": "code", "execution_count": 5, - "id": "fc5dcc2c", + "id": "cfbcda82", "metadata": {}, "outputs": [ { @@ -300,13 +318,6 @@ "only showing top 20 rows\n", "\n" ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "23/02/27 11:29:58 WARN BlockManager: Block rdd_308_3 already exists on this machine; not re-adding it\n" - ] } ], "source": [ @@ -315,7 +326,7 @@ }, { "cell_type": "markdown", - "id": "88a13923", + "id": "38181d45", 
"metadata": {}, "source": [ "#### Step 3, check results of the algorithm\n" @@ -324,7 +335,7 @@ { "cell_type": "code", "execution_count": 6, - "id": "843b9662", + "id": "bed14375", "metadata": {}, "outputs": [ { @@ -349,13 +360,33 @@ "cc_result.show(5)" ] }, + { + "cell_type": "markdown", + "id": "5bcb02e2", + "metadata": {}, + "source": [ + "## Other algorithm examples" + ] + }, { "cell_type": "code", "execution_count": null, - "id": "a8d97f96", + "id": "ff5a866d", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "# lpa_result = df.algo.label_propagation()\n", + "# louvain_result = df.algo.louvain()\n", + "# k_core_result = df.algo.k_core()\n", + "# degree_statics_result = df.algo.degree_statics()\n", + "# betweenness_centrality_result = df.algo.betweenness_centrality()\n", + "# coefficient_centrality_result = df.algo.coefficient_centrality()\n", + "# bfs_result = df.algo.bfs()\n", + "# hanp_result = df.algo.hanp()\n", + "# jaccard_result = df.algo.jaccard()\n", + "# strong_connected_components_result = df.algo.strong_connected_components()\n", + "# triangle_count_result = df.algo.triangle_count()" + ] } ], "metadata": { diff --git a/ngdi/nebula_algo.py b/ngdi/nebula_algo.py index c7c4d47..d642988 100644 --- a/ngdi/nebula_algo.py +++ b/ngdi/nebula_algo.py @@ -65,6 +65,7 @@ def get_spark_dataframe(self): """ df = self.ndf_obj.data from pyspark.sql.dataframe import DataFrame as pyspark_sql_df + if not isinstance(df, pyspark_sql_df): raise Exception( "The NebulaDataFrameObject is not a spark dataframe", @@ -72,40 +73,41 @@ def get_spark_dataframe(self): ) return df - def pagerank(self, reset_prob: float = 0.15, max_iter: int = 10): + def pagerank( + self, reset_prob: float = 0.15, max_iter: int = 10, weighted: bool = False + ): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "PRConfig", "PageRankAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.PRConfig(max_iter, reset_prob, encode_vertex_id) - result = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.PageRankAlgo.apply(jspark, df._jdf, config, weighted) + return result - def connected_components(self, max_iter: int = 10): + def connected_components(self, max_iter: int = 10, weighted: bool = False): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "CcConfig", "ConnectedComponentsAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.CcConfig(max_iter, encode_vertex_id) result = spark._jvm.ConnectedComponentsAlgo.apply( - jspark, df._jdf, config, False + jspark, df._jdf, config, weighted ) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + return result - def label_propagation(self, max_iter: int = 10): + def label_propagation(self, max_iter: int = 10, weighted: bool = False): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "LPAConfig", "LabelPropagationAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.LPAConfig(max_iter, encode_vertex_id) - result = spark._jvm.LabelPropagationAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.LabelPropagationAlgo.apply( + jspark, df._jdf, config, weighted + ) + return result def louvain(self, max_iter: int = 10, internalIter: int = 10, tol: float = 0.0001): @@ -116,8 +118,7 @@ def 
louvain(self, max_iter: int = 10, internalIter: int = 10, tol: float = 0.000 config = spark._jvm.LouvainConfig(max_iter, internalIter, tol, encode_vertex_id) result = spark._jvm.LouvainAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + return result def k_core(self, max_iter: int = 10, degree: int = 2): @@ -128,59 +129,62 @@ def k_core(self, max_iter: int = 10, degree: int = 2): config = spark._jvm.KCoreConfig(max_iter, degree, encode_vertex_id) - result = spark._jvm.KCoreAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.KCoreAlgo.apply(jspark, df._jdf, config) + return result - def shortest_path(self, landmarks: list): - engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "ShortestPathConfig", "ShortestPathAlgo" - ) - df = self.get_spark_dataframe() + # def shortest_path(self, landmarks: list, weighted: bool = False): + # engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( + # "ShortestPathConfig", "ShortestPathAlgo" + # ) + # # TBD: ShortestPathAlgo is not yet encodeID compatible + # df = self.get_spark_dataframe() - config = spark._jvm.ShortestPathConfig(landmarks, encode_vertex_id) - result = spark._jvm.ShortestPathAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future - return result + # config = spark._jvm.ShortestPathConfig(landmarks, encode_vertex_id) + # result = spark._jvm.ShortestPathAlgo.apply(jspark, df._jdf, config, weighted) + + # return result - def degree_statics(self, degree: bool = True, in_degree: bool = False, out_degree: bool = False): + def degree_statics(self): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "DegreeStaticConfig", "DegreeStaticAlgo" ) df = self.get_spark_dataframe() - config = spark._jvm.DegreeStaticConfig(degree, in_degree, out_degree, encode_vertex_id) - result = spark._jvm.DegreeStaticAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + config = spark._jvm.DegreeStaticConfig(encode_vertex_id) + result = spark._jvm.DegreeStaticAlgo.apply(jspark, df._jdf, config) + return result - def betweenness_centrality(self, max_iter: int = 10, degree: int = 2): + def betweenness_centrality( + self, max_iter: int = 10, degree: int = 2, weighted: bool = False + ): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "BetweennessConfig", "BetweennessCentralityAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.BetweennessConfig(max_iter, encode_vertex_id) - result = spark._jvm.BetweennessCentralityAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.BetweennessCentralityAlgo.apply( + jspark, df._jdf, config, weighted + ) + return result def coefficient_centrality(self, type: str = "local"): # type could be either "local" or "global" - assert type.lower() in ["local", "global"], "type should be either local or global" + assert type.lower() in ["local", "global"], ( + "type should be either local or global" + f"in coefficient_centrality algo. 
Got type: {type}" + ) engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "CoefficientConfig", "CoefficientCentralityAlgo" + "CoefficientConfig", "ClusteringCoefficientAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.CoefficientConfig(type, encode_vertex_id) - result = spark._jvm.CoefficientCentralityAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.ClusteringCoefficientAlgo.apply(jspark, df._jdf, config) + return result def bfs(self, max_depth: int = 10, root: int = 1): @@ -189,56 +193,90 @@ def bfs(self, max_depth: int = 10, root: int = 1): ) df = self.get_spark_dataframe() - config = spark._jvm.BfsConfig(source, max_depth, encode_vertex_id) - result = spark._jvm.BfsAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future - return result - - def dfs(self, max_depth: int = 10, root: int = 1): - engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "DfsConfig", "DfsAlgo" - ) - df = self.get_spark_dataframe() + config = spark._jvm.BfsConfig(max_depth, root, encode_vertex_id) + result = spark._jvm.BfsAlgo.apply(jspark, df._jdf, config) - config = spark._jvm.DfsConfig(source, max_depth, encode_vertex_id) - result = spark._jvm.DfsAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future return result - def hanp(self, hop_attenuation: float = 0.5, max_iter: int = 10, preference: float = 1.0): + # dfs is not yet supported, need to revisit upstream nebula-algorithm + # + # def dfs(self, max_depth: int = 10, root: int = 1): + # engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( + # "DfsConfig", "DfsAlgo" + # ) + # df = self.get_spark_dataframe() + + # config = spark._jvm.DfsConfig(max_depth, root, encode_vertex_id) + # result = spark._jvm.DfsAlgo.apply(jspark, df._jdf, config) + + # return result + + def hanp( + self, + hop_attenuation: float = 0.5, + max_iter: int = 10, + preference: float = 1.0, + weighted: bool = False, + preferences=None, + ): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "HanpConfig", "HanpAlgo" ) df = self.get_spark_dataframe() - config = spark._jvm.HanpConfig(hop_attenuation, max_iter, preference, encode_vertex_id) - result = spark._jvm.HanpAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future - return result - - def node2vec( - self, max_iter: int = 10, lr: float = 0.025, - data_num_partition: int = 10, model_num_partition: int = 10, - dim: int = 10, window: int = 3, walk_length: int = 5, - num_walks: int = 3, p: float = 1.0, q: float = 1.0, - directed: bool = False, degree: int = 30, emb_separator: str = ",", - model_path: str = "hdfs://127.0.0.1:9000/model"): - engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "Node2VecConfig", "Node2VecAlgo" + config = spark._jvm.HanpConfig( + hop_attenuation, max_iter, preference, encode_vertex_id ) - df = self.get_spark_dataframe() - config = spark._jvm.Node2VecConfig( - max_iter, lr, data_num_partition, model_num_partition, dim, window, - walk_length, num_walks, p, q, directed, degree, emb_separator, model_path, encode_vertex_id + result = spark._jvm.HanpAlgo.apply( + jspark, df._jdf, config, weighted, preferences ) - 
result = spark._jvm.Node2VecAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + return result + # def node2vec( + # self, + # max_iter: int = 10, + # lr: float = 0.025, + # data_num_partition: int = 10, + # model_num_partition: int = 10, + # dim: int = 10, + # window: int = 3, + # walk_length: int = 5, + # num_walks: int = 3, + # p: float = 1.0, + # q: float = 1.0, + # directed: bool = False, + # degree: int = 30, + # emb_separator: str = ",", + # model_path: str = "hdfs://127.0.0.1:9000/model", + # weighted: bool = False, + # ): + # engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( + # "Node2vecConfig", "Node2VecAlgo" + # ) + # # TBD: Node2VecAlgo is not yet encodeID compatible + # df = self.get_spark_dataframe() + # config = spark._jvm.Node2vecConfig( + # max_iter, + # lr, + # data_num_partition, + # model_num_partition, + # dim, + # window, + # walk_length, + # num_walks, + # p, + # q, + # directed, + # degree, + # emb_separator, + # model_path, + # encode_vertex_id, + # ) + # result = spark._jvm.Node2VecAlgo.apply(jspark, df._jdf, config, weighted) + + # return result + def jaccard(self, tol: float = 1.0): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "JaccardConfig", "JaccardAlgo" @@ -246,48 +284,43 @@ def jaccard(self, tol: float = 1.0): df = self.get_spark_dataframe() config = spark._jvm.JaccardConfig(tol, encode_vertex_id) - result = spark._jvm.JaccardAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + result = spark._jvm.JaccardAlgo.apply(jspark, df._jdf, config) + return result - def strong_connected_components(self, max_iter: int = 10): + def strong_connected_components(self, max_iter: int = 10, weighted: bool = False): engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( "CcConfig", "StronglyConnectedComponentsAlgo" ) df = self.get_spark_dataframe() config = spark._jvm.CcConfig(max_iter, encode_vertex_id) result = spark._jvm.StronglyConnectedComponentsAlgo.apply( - jspark, df._jdf, config, False + jspark, df._jdf, config, weighted ) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future + return result def triangle_count(self): - # TBD: TriangleCountConfig is not yet supported in nebula-algorithm engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "TriangleCountConfig", "TriangleCountAlgo" + "TriangleConfig", "TriangleCountAlgo" ) df = self.get_spark_dataframe() - config = spark._jvm.TriangleCountConfig(encode_vertex_id) - result = spark._jvm.TriangleCountAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future - return result + config = spark._jvm.TriangleConfig(encode_vertex_id) + result = spark._jvm.TriangleCountAlgo.apply(jspark, df._jdf, config) - def closeness(self, weighted: bool = False): - # TBD: ClosenessConfig is not yet supported in nebula-algorithm - engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( - "ClosenessConfig", "ClosenessAlgo" - ) - df = self.get_spark_dataframe() - config = spark._jvm.ClosenessConfig(weighted, encode_vertex_id) - result = spark._jvm.ClosenessAlgo.apply(jspark, df._jdf, config, False) - # TBD: False means not to use the default partitioner, - # we could make it configurable in the future return 
result + # def closeness(self, weighted: bool = False): + # # TBD: ClosenessAlgo is not yet encodeID compatible + # engine, spark, jspark, encode_vertex_id = self.get_spark_engine_context( + # "ClosenessConfig", "ClosenessAlgo" + # ) + # df = self.get_spark_dataframe() + # config = spark._jvm.ClosenessConfig(weighted, encode_vertex_id) + # result = spark._jvm.ClosenessAlgo.apply(jspark, df._jdf, config, False) + + # return result + class NebulaGraphAlgorithm: """ @@ -312,6 +345,4 @@ def check_engine(self): def pagerank(self, reset_prob=0.15, max_iter=10): self.check_engine() - - g = self.graph - return result + pass diff --git a/ngdi/nebula_data.py b/ngdi/nebula_data.py index 4a34872..c1e4edc 100644 --- a/ngdi/nebula_data.py +++ b/ngdi/nebula_data.py @@ -20,6 +20,7 @@ def get_engine(self): @property def algo(self): from ngdi.nebula_algo import NebulaAlgorithm as NebulaAlgorithmImpl + return NebulaAlgorithmImpl(self) def get_nx_graph(self): @@ -81,6 +82,7 @@ def get_engine(self): @property def algo(self): from ngdi.nebula_algo import NebulaAlgorithm as NebulaAlgorithmImpl + return NebulaAlgorithmImpl(self) def to_spark_df(self): diff --git a/pyproject.toml b/pyproject.toml index ce94aad..917ba0a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ [project] name = "ngdi" -version = "0.1.5" +version = "0.1.6" description = "NebulaGraph Data Intelligence Suite" authors = [ {name = "Wey Gu", email = "weyl.gu@gmail.com"},
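A minimal usage sketch of the revised algorithm API follows, assuming `df` is the NebulaDataFrameObject read in the notebook's Step 1 (that setup cell is not included in these hunks); the keyword arguments mirror the new signatures in ngdi/nebula_algo.py, and the result handling copies the notebook cells (`pr_result.show(5)`, `cc_result.show(5)`). Anything beyond those signatures is illustrative only.

    # Sketch only: `df` is assumed to be the NebulaDataFrameObject from the
    # notebook's Step 1 read; keyword arguments follow the new signatures in
    # ngdi/nebula_algo.py, where `weighted` is forwarded to the corresponding
    # nebula-algorithm apply() call instead of the previous hard-coded False.
    pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10, weighted=False)
    pr_result.show(5)

    cc_result = df.algo.connected_components(max_iter=10, weighted=False)
    lpa_result = df.algo.label_propagation(max_iter=10, weighted=False)

    # These algorithms now take no extra flag beyond their config parameters.
    k_core_result = df.algo.k_core(max_iter=10, degree=2)
    degree_result = df.algo.degree_statics()
    triangle_result = df.algo.triangle_count()
    local_coefficient = df.algo.coefficient_centrality(type="local")

shortest_path, node2vec, dfs, and closeness are commented out in this change (not yet encodeID compatible, or pending support in upstream nebula-algorithm), so they are left out of the sketch.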