From 456676bed29dfe79af82ee910891c2e742dcb935 Mon Sep 17 00:00:00 2001 From: Wey Gu Date: Sun, 26 Mar 2023 10:04:40 +0800 Subject: [PATCH] feat: phase 2 of networkx/nebula engine, writer design - nebula engine reader, algo passed - API docs - writer design proposed in examples partially-implement: #28 --- docs/API.md | 24 ++- examples/networkx_engine.ipynb | 304 +++++++++++++++++++++++++++++++++ ng_ai/engines.py | 7 +- ng_ai/nebula_algo.py | 12 +- ng_ai/nebula_data.py | 61 +++---- ng_ai/nebula_reader.py | 25 ++- pdm.lock | 14 +- pyproject.toml | 2 +- tests/unit/test_nebula_data.py | 9 - 9 files changed, 383 insertions(+), 75 deletions(-) create mode 100644 examples/networkx_engine.ipynb diff --git a/docs/API.md b/docs/API.md index 43404b6..fefc492 100644 --- a/docs/API.md +++ b/docs/API.md @@ -81,13 +81,31 @@ df = reader.read() # this will take some time df.show(10) ``` +#### NebulaGraph Engine(NetworkX) + +```python +from ng_ai import NebulaReader +from ng_ai.config import NebulaGraphConfig +# read data with spark engine, query mode +config_dict = { + "graphd_hosts": "127.0.0.1:9669", + "user": "root", + "password": "nebula", + "space": "basketballplayer", +} +config = NebulaGraphConfig(**config_dict) +reader = NebulaReader(engine="nebula", config=config) +reader.query(edges=["follow", "serve"], props=[["degree"],[]]) +g = reader.read() +g.show(10) +g.draw() +``` + ## engines - `ng_ai.engines.SparkEngine` is the Spark Engine for `ng_ai.NebulaReader`, `ng_ai.NebulaWriter` and `ng_ai.NebulaAlgorithm`. -- `ng_ai.engines.NebulaEngine` is the NebulaGraph Engine for `ng_ai.NebulaReader`, `ng_ai.NebulaWriter`. - -- `ng_ai.engines.NetworkXEngine` is the NetworkX Engine for `ng_ai.NebulaAlgorithm`. +- `ng_ai.engines.NebulaEngine` is the NebulaGraph Engine for `ng_ai.NebulaReader`, `ng_ai.NebulaWriter` and `ng_ai.NebulaAlgorithm`, which is based on NetworkX and Nebula-Python. 
## `NebulaDataFrameObject` diff --git a/examples/networkx_engine.ipynb b/examples/networkx_engine.ipynb new file mode 100644 index 0000000..bd605f4 --- /dev/null +++ b/examples/networkx_engine.ipynb @@ -0,0 +1,304 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a54fe998", + "metadata": {}, + "source": [ + "![image](https://user-images.githubusercontent.com/1651790/221876073-61ef4edb-adcd-4f10-b3fc-8ddc24918ea1.png)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f46fdd40", + "metadata": {}, + "outputs": [], + "source": [ + "# install ng_ai in the first run\n", + "!pip install ng_ai[networkx]" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "5b4e4143", + "metadata": {}, + "source": [ + "## AI Suite NetworkX Engine Examples\n", + "### read data with NetworkX engine, query mode" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "f17abcf8", + "metadata": {}, + "source": [ + "In this example, we are leveraging the NetworkX Engine of NebulaGraph AI Suite, with the GraphD Query mode.\n", + "\n", + "#### Step 1, get dataframe by Querying the Graph\n", + "\n", + "We will scan all edges of types `follow` and `serve`, with prop `degree` on `follow` and no props on `serve`, into graph: `g`" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e158440f", + "metadata": {}, + "outputs": [], + "source": [ + "from ng_ai import NebulaReader\n", + "from ng_ai.config import NebulaGraphConfig\n", + "\n", + "# read data with nebula engine, query mode\n", + "config_dict = {\n", + " \"graphd_hosts\": \"graphd:9669\",\n", + " \"user\": \"root\",\n", + " \"password\": \"nebula\",\n", + " \"space\": \"basketballplayer\",\n", + "}\n", + "config = NebulaGraphConfig(**config_dict)\n", + "reader = NebulaReader(engine=\"nebula\", config=config)\n", + "reader.query(edges=[\"follow\", \"serve\"], props=[[\"degree\"], []])\n", + "g = reader.read()\n", + "g.show(10)\n", + "g.draw()" + ] + }, + { + 
"cell_type": "markdown", + "id": "3617de5f", + "metadata": {}, + "source": [ + "#### Step 2, run Pagerank Algorithm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "90069aaf", + "metadata": {}, + "outputs": [], + "source": [ + "pr_result = g.algo.pagerank(reset_prob=0.15, max_iter=10)" + ] + }, + { + "cell_type": "markdown", + "id": "66e70ca0", + "metadata": {}, + "source": [ + "#### Step 3, check results of the algorithm\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "abbce2fa", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---------+-------------------+\n", + "| _id| pagerank|\n", + "+---------+-------------------+\n", + "|player133|0.18601069183310504|\n", + "|player126|0.18601069183310504|\n", + "|player130| 1.240071278887367|\n", + "|player108|0.18601069183310504|\n", + "|player102| 1.6602373739502536|\n", + "+---------+-------------------+\n", + "only showing top 5 rows\n", + "\n" + ] + } + ], + "source": [ + "pr_result" + ] + }, + { + "cell_type": "markdown", + "id": "49becbdb", + "metadata": {}, + "source": [ + "#### Step 2, run Connected Components Algorithm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cfbcda82", + "metadata": {}, + "outputs": [], + "source": [ + "cc_result = g.algo.connected_components(max_iter=10)" + ] + }, + { + "cell_type": "markdown", + "id": "38181d45", + "metadata": {}, + "source": [ + "#### Step 3, check results of the algorithm\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "bed14375", + "metadata": {}, + "outputs": [], + "source": [ + "cc_result" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "3d088006", + "metadata": {}, + "source": [ + "### Write back algo result to NebulaGraph\n", + "\n", + "Assume that we have a result `graph_result` computed with `g.algo.pagerank()`:\n", + "\n", + "```python\n", + "{'player102': 0.014770646980811417,\n", + 
'player100': 0.02878478843123552,\n", + " 'player101': 0.020163880830622937,\n", + " 'player129': 0.012381302535422786,\n", + " 'player116': 0.015041184157101154,\n", + " 'player121': 0.012178909379871223,\n", + " 'player128': 0.010197889677928056,\n", + "...\n", + "}\n", + "```\n", + "\n", + "Let's write them back to tag: pagerank(pagerank). So we create a TAG `pagerank` in NebulaGraph on same space with the following schema:\n", + "\n", + "```ngql\n", + "CREATE TAG IF NOT EXISTS pagerank (\n", + " pagerank double NOT NULL\n", + ");\n", + "```\n", + "\n", + "Then, we could write the pagerank result to NebulaGraph, to tag `pagerank` with property `pagerank`:\n", + "\n", + "```python\n", + "properties = [\"pagerank\"]\n", + "```\n", + "And pass it to NebulaWriter in `nebula` engine and `nebulagraph_vertex` sink" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6b43261f", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# Run pagerank Algorithm\n", + "graph_result = g.algo.pagerank()" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "c5bbf9e0", + "metadata": {}, + "outputs": [], + "source": [ + "from ng_ai import NebulaWriter\n", + "from ng_ai.config import NebulaGraphConfig\n", + "\n", + "config = NebulaGraphConfig()\n", + "writer = NebulaWriter(\n", + " data=graph_result, sink=\"nebulagraph_vertex\", config=config, engine=\"nebula\"\n", + ")\n", + "\n", + "# properties to write\n", + "properties = [\"pagerank\"]\n", + "\n", + "writer.set_options(\n", + " tag=\"pagerank\",\n", + " vid_field=\"_id\",\n", + " properties=properties,\n", + " batch_size=256,\n", + " write_mode=\"insert\",\n", + ")\n", + "# write back to NebulaGraph\n", + "writer.write()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "9da30271", + "metadata": {}, + "source": [ + "Then we could query the result in NebulaGraph:\n", + "\n", + "```cypher\n", + "MATCH (v:pagerank)\n", + "RETURN id(v), v.pagerank.pagerank 
LIMIT 10;\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "5bcb02e2", + "metadata": {}, + "source": [ + "## How to run other algorithm examples" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ff5a866d", + "metadata": {}, + "outputs": [], + "source": [ + "# lpa_result = g.algo.label_propagation()\n", + "# louvain_result = g.algo.louvain()\n", + "# k_core_result = g.algo.k_core()\n", + "# degree_statics_result = g.algo.degree_statics()\n", + "# betweenness_centrality_result = g.algo.betweenness_centrality()\n", + "# coefficient_centrality_result = g.algo.coefficient_centrality()\n", + "# bfs_result = g.algo.bfs()\n", + "# hanp_result = g.algo.hanp()\n", + "# jaccard_result = g.algo.jaccard()\n", + "# strong_connected_components_result = g.algo.strong_connected_components()\n", + "# triangle_count_result = g.algo.triangle_count()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/ng_ai/engines.py b/ng_ai/engines.py index 983c756..e4cc2cb 100644 --- a/ng_ai/engines.py +++ b/ng_ai/engines.py @@ -121,8 +121,10 @@ def __init__(self, config=None): import networkx as nx import ng_nx from ng_nx import NebulaReader as NxReader - from ng_nx import NxScanReader, NxWriter - from ng_nx.utils import NxConfig, result_to_df + from ng_nx import NebulaScanReader as NxScanReader + from ng_nx import NebulaWriter as NxWriter + from ng_nx.utils import NebulaGraphConfig as NxConfig + from ng_nx.utils import result_to_df self.nx = nx self.ng_nx = ng_nx self.nx_reader = NxReader self.nx_writer = NxWriter 
self.nx_scan_reader = NxScanReader self._nx_config = NxConfig + self.nx_config = None self.result_to_df = result_to_df diff --git a/ng_ai/nebula_algo.py b/ng_ai/nebula_algo.py index d577a5b..5d7fb4e 100644 --- a/ng_ai/nebula_algo.py +++ b/ng_ai/nebula_algo.py @@ -57,7 +57,7 @@ def check_engine(self): to NebulaGraphObject For spark, we can directly use the NebulaDataFrameObject """ - if self.ndf_obj.engine.type == "networkx": + if self.ndf_obj.engine.type == "nebula": raise Exception( "For NebulaDataFrameObject in networkx engine," "Plz transform it to NebulaGraphObject to run algorithm", @@ -363,10 +363,10 @@ class NebulaGraphAlgorithm: Networkx to run algorithm """ - def __init__(self, graph): - self.graph = graph + def __init__(self, ng_obj: NebulaGraphObjectImpl): + self.ngraph = ng_obj self.algorithms = [] - self.engine = graph.engine + self.engine = ng_obj.engine def register_algo(self, func): self.algorithms.append(func.__name__) @@ -391,7 +391,7 @@ def check_engine(self): "Plz transform it to NebulaDataFrameObject to run algorithm", "For example: df = nebula_graph.to_df; df.algo.pagerank()", ) - if self.engine.type == "networkx": + if self.engine.type == "nebula": return True else: raise Exception("Unsupported engine type") @@ -399,7 +399,7 @@ def check_engine(self): @algo def pagerank(self, reset_prob=0.15, max_iter=10, **kwargs): self.check_engine() - g = self.graph._graph + g = self.ngraph.get_nx_graph() weight = kwargs.get("weight", None) assert type(weight) in [str, type(None)], "weight must be str or None" assert type(reset_prob) == float, "reset_prob must be float" diff --git a/ng_ai/nebula_data.py b/ng_ai/nebula_data.py index 86d9818..13f248f 100644 --- a/ng_ai/nebula_data.py +++ b/ng_ai/nebula_data.py @@ -3,14 +3,17 @@ from __future__ import annotations +from ng_ai.engines import NebulaEngine + class NebulaGraphObject: - def __init__(self, df: NebulaDataFrameObject): - self.engine = df.engine - self.df = df + def __init__(self, engine: NebulaEngine, 
raw_graph, **kwargs): + self.engine = engine # if engine is nebula, self._graph is a networkx graph object # if engine is spark, self._graph is a spark graph object - self._graph = None + self._graph = raw_graph + # let's keep unified interface as NebulaDataFrameObject + self.data = self._graph def get_engine(self): return self.engine @@ -30,10 +33,6 @@ def algo(self): def get_nx_graph(self): if self.engine.type == "nebula": - if self._graph is None: - # convert the graph to a networkx graph - # and return the result - self.to_nx_graph() return self._graph else: # for now the else case will be spark, to networkx is not supported @@ -42,33 +41,29 @@ def get_nx_graph(self): "convert to networkx graph is not supported", ) - def to_networkx(self, update=False): + def stats(self, *keywords, **kwargs): + if self.engine.type == "spark": + raise NotImplementedError + elif self.engine.type == "nebula": + print(self.data) + + def draw(self, with_labels=True, **kwargs): if self.engine.type == "nebula": - # convert the graph to a networkx graph - # and return the result - if self._graph is None or update: - self._graph = self.df.to_networkx() - return self._graph + self.engine.nx.draw(self.data, with_labels=with_labels, **kwargs) else: - # for now the else case will be spark, to networkx is not supported - raise Exception( - "For NebulaGraphObject in spark engine," - "convert to networkx graph is not supported", - ) + raise NotImplementedError - def to_graphx(self, update=False): + def show(self, limit=None): if self.engine.type == "spark": - # convert the graph to a graphx graph - # and return the result - if self._graph is None or update: - self._graph = self.df.to_graphx() - return self._graph - else: - # for now the else case will be nebula, to graphx is not supported - raise Exception( - "For NebulaGraphObject in nebula engine," - "convert to graphx is not supported", - ) + raise NotImplementedError + elif self.engine.type == "nebula": + if limit is None: + limit = 0 + 
for u, v, key, attrs in self.data.edges(keys=True, data=True): + print(f"{u} -> {v} ({key}): {attrs}") + limit -= 1 + if limit == 0: + break class NebulaDataFrameObject: @@ -97,7 +92,7 @@ def algo(self): ) raise NotImplementedError - def to_spark_df(self): + def get_spark_df(self): if self.engine.type == "spark": return self.data else: @@ -134,7 +129,7 @@ def to_graphx(self): ) def to_graph(self): - return NebulaGraphObject(self) + raise NotImplementedError def show(self, *keywords, **kwargs): if self.engine.type == "spark": diff --git a/ng_ai/nebula_reader.py b/ng_ai/nebula_reader.py index 9c2ef30..aeea726 100644 --- a/ng_ai/nebula_reader.py +++ b/ng_ai/nebula_reader.py @@ -3,7 +3,7 @@ from __future__ import annotations from ng_ai.config import NebulaGraphConfig -from ng_ai.nebula_data import NebulaDataFrameObject +from ng_ai.nebula_data import NebulaDataFrameObject, NebulaGraphObject DEFAULT_NEBULA_QUERY_LIMIT = 1000 @@ -54,9 +54,9 @@ def __init__(self, config: NebulaGraphConfig, **kwargs): from ng_ai.engines import NebulaEngine self.engine = NebulaEngine(config) - self.raw_df = None - self.df = None - self.reader = None + self.raw_graph = None + self.graph = None + self.raw_graph_reader = None def scan(self, **kwargs): # Implement the scan method specific to Nebula engine @@ -65,8 +65,7 @@ def scan(self, **kwargs): def query(self, **kwargs): limit = kwargs.get("limit", DEFAULT_NEBULA_QUERY_LIMIT) assert type(limit) == int, "limit should be an integer" - assert "space" in kwargs, "space is required" - space = kwargs["space"] + space = self.config.space assert "edges" in kwargs, "edges is required" edges = kwargs["edges"] assert type(edges) == list, "edges should be a list" @@ -80,28 +79,26 @@ def query(self, **kwargs): assert type(prop) == list, "props should be a list of list" for item in prop: assert type(item) == str, "props should be a list of list of string" - - self.reader = NebulaReader( + self.raw_graph_reader = self.engine.nx_reader( space=space, 
edges=edges, properties=props, - nebula_config=self.engine._nx_config, + nebula_config=self.engine.nx_config, limit=limit, ) - return self.reader - def load(self, **kwargs): # Implement the load method specific to Nebula engine raise NotImplementedError def read(self, **kwargs): - if self.reader is None: + if self.raw_graph_reader is None: raise Exception( "reader is not initialized, please call query or scan first" ) - self._graph = self.reader.read() - return self._graph + self.raw_graph = self.raw_graph_reader.read() + self.graph = NebulaGraphObject(engine=self.engine, raw_graph=self.raw_graph) + return self.graph def show(self, **kwargs): # Implement the show method specific to Nebula engine diff --git a/pdm.lock b/pdm.lock index 485841b..bf190a0 100644 --- a/pdm.lock +++ b/pdm.lock @@ -231,11 +231,11 @@ summary = "Python package for creating and manipulating graphs and networks" [[package]] name = "ng-nx" -version = "0.1.6" -requires_python = ">=3.7,<3.11" +version = "0.1.7" +requires_python = ">=3.7.1,<3.11" summary = "NebulaGraph NetowrkX adaptor" dependencies = [ - "nebula3-python", + "nebula3-python>=3.4.0", "networkx>=2.5.1", "numpy>=1.21.6", "pandas>=1.3.5", @@ -465,7 +465,7 @@ summary = "Backport of pathlib-compatible object wrapper for zip files" [metadata] lock_version = "4.1" -content_hash = "sha256:f1b40b7de51cfc845d4eec5615e89ae39dc225ea6b8391262d94572f6e78fa49" +content_hash = "sha256:51892055a0a3816e79dbfd7a9633ca9097fc6b1be5522816466f4ae2f7ab7b35" [metadata.files] "appnope 0.1.3" = [ @@ -642,9 +642,9 @@ content_hash = "sha256:f1b40b7de51cfc845d4eec5615e89ae39dc225ea6b8391262d94572f6 {url = "https://files.pythonhosted.org/packages/97/ae/7497bc5e1c84af95e585e3f98585c9f06c627fac6340984c4243053e8f44/networkx-2.6.3.tar.gz", hash = "sha256:c0946ed31d71f1b732b5aaa6da5a0388a345019af232ce2f49c766e2d6795c51"}, {url = 
"https://files.pythonhosted.org/packages/e9/93/aa6613aa70d6eb4868e667068b5a11feca9645498fd31b954b6c4bb82fa5/networkx-2.6.3-py3-none-any.whl", hash = "sha256:80b6b89c77d1dfb64a4c7854981b60aeea6360ac02c6d4e4913319e0a313abef"}, ] -"ng-nx 0.1.6" = [ - {url = "https://files.pythonhosted.org/packages/95/f6/75c1170a5ff6d49b34ef32831954cd2ed7299be4a7f11eafe4b6f5c889d9/ng-nx-0.1.6.tar.gz", hash = "sha256:5a33e48124a98542eb0068d40182cbafafb2449080d7a1665d1c93dea387ff01"}, - {url = "https://files.pythonhosted.org/packages/ea/ef/577faa8dc05d28f9a1831b0e9d58e061b4b8e186266578b1cf10c5f371f1/ng_nx-0.1.6-py3-none-any.whl", hash = "sha256:6ca8460500bc9863cc44862d8d38ce0910f009f320f494cf9ce9b8c645eea845"}, +"ng-nx 0.1.7" = [ + {url = "https://files.pythonhosted.org/packages/35/b0/97194a1736f34ca687e391fa211be3856e085298d8d51c922c208620ef77/ng-nx-0.1.7.tar.gz", hash = "sha256:3c0d9c1798988bf886a0e34444ec3aaa6285b0a223d448ae4fbb032df61afd06"}, + {url = "https://files.pythonhosted.org/packages/97/99/d202e5ba18b821b6eed62f8c04e2bc10220933a91d0382d770e06cbdd428/ng_nx-0.1.7-py3-none-any.whl", hash = "sha256:15a49e34eb8ec232497288338b2049a969ed2477576afff84f89f1434302b252"}, ] "numpy 1.21.6" = [ {url = "https://files.pythonhosted.org/packages/06/78/b184f13f5461812a17a90b380d70a93fa3532460f0af9d72b0d93d8bc4ff/numpy-1.21.6-cp37-cp37m-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:67c261d6c0a9981820c3a149d255a76918278a6b03b6a036800359aba1256d46"}, diff --git a/pyproject.toml b/pyproject.toml index b63a001..6403a53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,7 +145,7 @@ build-backend = "pdm.pep517.api" # pyspark 2.4.8 doesn't work with python 3.8+, so we use 3.2.3 spark = ["pyspark>=3.2.3"] networkx = [ - "ng_nx>=0.1.6", + "ng_nx>=0.1.7", "pandas>=1.3.5", "numpy>=1.21.6", "scipy>=1.7.3", diff --git a/tests/unit/test_nebula_data.py b/tests/unit/test_nebula_data.py index 4169902..4c9bbd1 100644 --- a/tests/unit/test_nebula_data.py +++ b/tests/unit/test_nebula_data.py @@ 
-50,12 +50,3 @@ def test_nebula_data_frame_object(spark_df): nebula_df = NebulaDataFrameObject(engine=spark_engine, data=spark_df) assert nebula_df.get_engine().type == "spark" assert_df_equality(nebula_df.data, spark_df) - - -def test_nebula_graph_object(spark_df): - config = NebulaGraphConfig() - spark_engine = SparkEngine(config=config) - nebula_df = NebulaDataFrameObject(engine=spark_engine, data=spark_df) - nebula_graph = NebulaGraphObject(nebula_df) - assert nebula_graph.get_engine().type == "spark" - assert_df_equality(nebula_graph.df.data, spark_df)