diff --git a/README.md b/README.md index da65313..1e4c59c 100644 --- a/README.md +++ b/README.md @@ -1,48 +1,47 @@ -# NebulaGraph Data Intelligence(ngdi) Suite - ![image](https://user-images.githubusercontent.com/1651790/221876073-61ef4edb-adcd-4f10-b3fc-8ddc24918ea1.png) -[![pdm-managed](https://img.shields.io/badge/pdm-managed-blueviolet)](https://pdm.fming.dev) [![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE) [![PyPI version](https://badge.fury.io/py/ngdi.svg)](https://badge.fury.io/py/ngdi) [![Python](https://img.shields.io/badge/python-3.6%2B-blue.svg)](https://www.python.org/downloads/release/python-360/) +

+ Data Intelligence Suite with 4 line code to run Graph Algo on NebulaGraph +

-NebulaGraph Data Intelligence Suite for Python (ngdi) is a powerful Python library that offers a range of APIs for data scientists to effectively read, write, analyze, and compute data in NebulaGraph. This library allows data scientists to perform these operations on a single machine using NetworkX, or in a distributed computing environment using Spark, in unified and intuitive API. With ngdi, data scientists can easily access and process data in NebulaGraph, enabling them to perform advanced analytics and gain valuable insights. +

+ + License + -``` - ┌───────────────────────────────────────────────────┐ - │ Spark Cluster │ - │ .─────. .─────. .─────. .─────. │ - ┌─▶│ : ; : ; : ; : ; │ - │ │ `───' `───' `───' `───' │ -Algorithm │ - Spark └───────────────────────────────────────────────────┘ - Engine ┌────────────────────────────────────────────────────────────────┐ - └──┤ │ - │ NebulaGraph Data Intelligence Suite(ngdi) │ - │ ┌────────┐ ┌──────┐ ┌────────┐ ┌─────┐ │ - │ │ Reader │ │ Algo │ │ Writer │ │ GNN │ │ - │ └────────┘ └──────┘ └────────┘ └─────┘ │ - │ ├────────────┴───┬────────┴─────┐ └──────┐ │ - │ ▼ ▼ ▼ ▼ │ - │ ┌─────────────┐ ┌──────────────┐ ┌──────────┐┌───────────┐ │ - ┌──┤ │ SparkEngine │ │ NebulaEngine │ │ NetworkX ││ DGLEngine │ │ - │ │ └─────────────┘ └──────────────┘ └──────────┘└───────────┘ │ - │ └──────────┬─────────────────────────────────────────────────────┘ - │ │ Spark - │ └────────Reader ────────────┐ -Spark Reader Query Mode │ -Scan Mode ▼ - │ ┌───────────────────────────────────────────────────┐ - │ │ NebulaGraph Graph Engine Nebula-GraphD │ - │ ├──────────────────────────────┬────────────────────┤ - │ │ NebulaGraph Storage Engine │ │ - └─▶│ Nebula-StorageD │ Nebula-Metad │ - └──────────────────────────────┴────────────────────┘ -``` + + PyPI version + + + + Python + + + + pdm-managed + + +

+ +--- + +**Documentation**: https://github.com/wey-gu/nebulagraph-di#documentation + +**Source Code**: https://github.com/wey-gu/nebulagraph-di + +--- + + +NebulaGraph Data Intelligence Suite for Python (ngdi) is a powerful Python library that offers APIs for data scientists to effectively read, write, analyze, and compute data in NebulaGraph. + +With the support of single-machine engine(NetworkX), or distributed computing environment using Spark we could perform Graph Analysis and Algorithms on top of NebulaGraph in less than 10 lines of code, in unified and intuitive API. ## Quick Start in 5 Minutes - Setup env with Nebula-Up following [this guide](https://github.com/wey-gu/nebulagraph-di/blob/main/docs/Environment_Setup.md). - Install ngdi with pip from the Jupyter Notebook with http://localhost:8888 (password: `nebula`). -- Open the demo notebook and run cells with `Shift+Enter` or `Ctrl+Enter`. +- Open the demo notebook and run cells one by one. +- Check the [API Reference](https://github.com/wey-gu/nebulagraph-di/docs/API.md) ## Installation @@ -50,181 +49,115 @@ Scan Mode ▼ pip install ngdi ``` -### Spark Engine Prerequisites -- Spark 2.4, 3.0(not yet tested) -- [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) -- [NebulaGraph Spark Connector 3.4+](https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/) -- [NebulaGraph Algorithm 3.1+](https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/) - -### NebulaGraph Engine Prerequisites -- [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) -- [NebulaGraph Python Client 3.4+](https://github.com/vesoft-inc/nebula-python) -- [NetworkX](https://networkx.org/) - -## Run on PySpark Jupyter Notebook(Spark Engine) - -Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`. +## Usage -```bash -export PYSPARK_PYTHON=python3 -export PYSPARK_DRIVER_PYTHON=jupyter -export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=8888 --no-browser" - -pyspark --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ - --driver-class-path /opt/nebulagraph/ngdi/package/nebula-algo.jar \ - --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ - --jars /opt/nebulagraph/ngdi/package/nebula-algo.jar -``` +### Spark Engine Examples -Then we could access Jupyter Notebook with PySpark and refer to [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/examples/spark_engine.ipynb) +See also: [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/blob/main/examples/spark_engine.ipynb) -## Submit Algorithm job to Spark Cluster(Spark Engine) +Run Algorithm on top of NebulaGraph: -Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`; -We have put the `ngdi-py3-env.zip` in `/opt/nebulagraph/ngdi/package/`. -And we have the following Algorithm job in `pagerank.py`: +> Note, there is also query mode, refer to [examples](https://github.com/wey-gu/nebulagraph-di/blob/main/examples/spark_engine.ipynb) or [docs](https://github.com/wey-gu/nebulagraph-di/docs/API.md) for more details. ```python -from ngdi import NebulaGraphConfig from ngdi import NebulaReader -# set NebulaGraph config -config_dict = { - "graphd_hosts": "graphd:9669", - "metad_hosts": "metad0:9669,metad1:9669,metad2:9669", - "user": "root", - "password": "nebula", - "space": "basketballplayer", -} -config = NebulaGraphConfig(**config_dict) - -# read data with spark engine, query mode +# read data with spark engine, scan mode reader = NebulaReader(engine="spark") -query = """ - MATCH ()-[e:follow]->() - RETURN e LIMIT 100000 -""" -reader.query(query=query, edge="follow", props="degree") +reader.scan(edge="follow", props="degree") df = reader.read() # run pagerank algorithm pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10) ``` -> Note, this could be done by Airflow, or other job scheduler in production. - -Then we can submit the job to Spark cluster: +Write back to NebulaGraph: -```bash -spark-submit --master spark://master:7077 \ - --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ - --driver-class-path /opt/nebulagraph/ngdi/package/nebula-algo.jar \ - --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ - --jars /opt/nebulagraph/ngdi/package/nebula-algo.jar \ - --py-files /opt/nebulagraph/ngdi/package/ngdi-py3-env.zip \ - pagerank.py -``` +```python +from ngdi import NebulaWriter +from ngdi.config import NebulaGraphConfig -## Run ngdi algorithm job from python script(Spark Engine) +config = NebulaGraphConfig() -We have everything ready as above, including the `pagerank.py`. +properties = {"louvain": "cluster_id"} -```python -import subprocess - -subprocess.run(["spark-submit", "--master", "spark://master:7077", - "--driver-class-path", "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar", - "--driver-class-path", "/opt/nebulagraph/ngdi/package/nebula-algo.jar", - "--jars", "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar", - "--jars", "/opt/nebulagraph/ngdi/package/nebula-algo.jar", - "--py-files", "/opt/nebulagraph/ngdi/package/ngdi-py3-env.zip", - "pagerank.py"]) +writer = NebulaWriter( + data=df_result, sink="nebulagraph_vertex", config=config, engine="spark") +writer.set_options( + tag="louvain", vid_field="_id", properties=properties, + batch_size=256, write_mode="insert",) +writer.write() ``` -## Run on single machine(NebulaGraph Engine) +Then we could query the result in NebulaGraph: -Assuming we have NebulaGraph cluster up and running, and we have the following Algorithm job in `pagerank_nebula_engine.py`: +```cypher +MATCH (v:louvain) +RETURN id(v), v.louvain.cluster_id LIMIT 10; +``` -This file is the same as `pagerank.py` except for the following line: +### NebulaGraph Engine Examples(not yet implemented) + +Basically the same as Spark Engine, but with `engine="nebula"`. ```diff - reader = NebulaReader(engine="spark") + reader = NebulaReader(engine="nebula") ``` -Then we can run the job on single machine: - -```bash -python3 pagerank.py -``` - ## Documentation -[API Reference](https://github.com/wey-gu/nebulagraph-di/docs/API.md) - -## Usage +[Environment Setup](https://github.com/wey-gu/nebulagraph-di/blob/main/docs/Environment_Setup.md) -### Spark Engine Examples - -See also: [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/examples/spark_engine.ipynb) - -```python -from ngdi import NebulaReader - -# read data with spark engine, query mode -reader = NebulaReader(engine="spark") -query = """ - MATCH ()-[e:follow]->() - RETURN e LIMIT 100000 -""" -reader.query(query=query, edge="follow", props="degree") -df = reader.read() # this will take some time -df.show(10) - -# read data with spark engine, scan mode -reader = NebulaReader(engine="spark") -reader.scan(edge="follow", props="degree") -df = reader.read() # this will take some time -df.show(10) +[API Reference](https://github.com/wey-gu/nebulagraph-di/docs/API.md) -# read data with spark engine, load mode (not yet implemented) -reader = NebulaReader(engine="spark") -reader.load(source="hdfs://path/to/edge.csv", format="csv", header=True, schema="src: string, dst: string, rank: int") -df = reader.read() # this will take some time -df.show(10) +## How it works -# run pagerank algorithm -pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10) # this will take some time +ngdi is an unified abstraction layer for different engines, the current implementation is based on Spark, NetworkX, DGL and NebulaGraph, but it's easy to extend to other engines like Flink, GraphScope, PyG etc. -# convert dataframe to NebulaGraphObject -graph = reader.to_graphx() # not yet implemented +``` + ┌───────────────────────────────────────────────────┐ + │ Spark Cluster │ + │ .─────. .─────. .─────. .─────. │ + ┌─▶│ : ; : ; : ; : ; │ + │ │ `───' `───' `───' `───' │ +Algorithm │ + Spark └───────────────────────────────────────────────────┘ + Engine ┌────────────────────────────────────────────────────────────────┐ + └──┤ │ + │ NebulaGraph Data Intelligence Suite(ngdi) │ + │ ┌────────┐ ┌──────┐ ┌────────┐ ┌─────┐ │ + │ │ Reader │ │ Algo │ │ Writer │ │ GNN │ │ + │ └────────┘ └──────┘ └────────┘ └─────┘ │ + │ ├────────────┴───┬────────┴─────┐ └──────┐ │ + │ ▼ ▼ ▼ ▼ │ + │ ┌─────────────┐ ┌──────────────┐ ┌──────────┐┌───────────┐ │ + ┌──┤ │ SparkEngine │ │ NebulaEngine │ │ NetworkX ││ DGLEngine │ │ + │ │ └─────────────┘ └──────────────┘ └──────────┘└───────────┘ │ + │ └──────────┬─────────────────────────────────────────────────────┘ + │ │ Spark + │ └────────Reader ────────────┐ +Spark Reader Query Mode │ +Scan Mode ▼ + │ ┌───────────────────────────────────────────────────┐ + │ │ NebulaGraph Graph Engine Nebula-GraphD │ + │ ├──────────────────────────────┬────────────────────┤ + │ │ NebulaGraph Storage Engine │ │ + └─▶│ Nebula-StorageD │ Nebula-Metad │ + └──────────────────────────────┴────────────────────┘ ``` -### NebulaGraph Engine Examples(not yet implemented) +### Spark Engine Prerequisites +- Spark 2.4, 3.0(not yet tested) +- [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) +- [NebulaGraph Spark Connector 3.4+](https://repo1.maven.org/maven2/com/vesoft/nebula-spark-connector/) +- [NebulaGraph Algorithm 3.1+](https://repo1.maven.org/maven2/com/vesoft/nebula-algorithm/) -```python -from ngdi import NebulaReader +### NebulaGraph Engine Prerequisites +- [NebulaGraph 3.4+](https://github.com/vesoft-inc/nebula) +- [NebulaGraph Python Client 3.4+](https://github.com/vesoft-inc/nebula-python) +- [NetworkX](https://networkx.org/) -# read data with nebula engine, query mode -reader = NebulaReader(engine="nebula") -reader.query(""" - MATCH ()-[e:follow]->() - RETURN e.src, e.dst, e.degree LIMIT 100000 -""") -df = reader.read() # this will take some time -df.show(10) - -# read data with nebula engine, scan mode -reader = NebulaReader(engine="nebula") -reader.scan(edge_types=["follow"]) -df = reader.read() # this will take some time -df.show(10) - -# convert dataframe to NebulaGraphObject -graph = reader.to_graph() # this will take some time -graph.nodes.show(10) -graph.edges.show(10) +## License -# run pagerank algorithm -pr_result = graph.algo.pagerank(reset_prob=0.15, max_iter=10) # this will take some time -``` +This project is licensed under the terms of the Apache License 2.0. \ No newline at end of file diff --git a/docs/API.md b/docs/API.md index 9a2023d..1d074c8 100644 --- a/docs/API.md +++ b/docs/API.md @@ -69,6 +69,18 @@ reader.query(query=query, edge="follow", props="degree") df = reader.read() ``` +- Load mode + +> not yet implemented + +```python +# read data with spark engine, load mode (not yet implemented) +reader = NebulaReader(engine="spark") +reader.load(source="hdfs://path/to/edge.csv", format="csv", header=True, schema="src: string, dst: string, rank: int") +df = reader.read() # this will take some time +df.show(10) +``` + ## engines - `ngdi.engines.SparkEngine` is the Spark Engine for `ngdi.NebulaReader`, `ngdi.NebulaWriter` and `ngdi.NebulaAlgorithm`. diff --git a/docs/Environment_Setup.md b/docs/Environment_Setup.md index 4b8df8f..e055cc5 100644 --- a/docs/Environment_Setup.md +++ b/docs/Environment_Setup.md @@ -1,6 +1,15 @@ # Envrionment Setup -## With Nebula-UP +**TOC** + +- [Quick Start in 5 Minutes](#with-nebula-upqiuck-start) +- [Run In Production](#in-production) + - [Run on PySpark Jupyter Notebook](#run-on-pyspark-jupyter-notebook) + - [Submit Algorithm job to Spark Cluster](#submit-algorithm-job-to-spark-cluster) + - [Run ngdi algorithm PySpark job from python script](#run-ngdi-algorithm-pyspark-job-from-python-script) + - [Run on single machine with NebulaGraph engine](#run-on-single-machine-with-nebulagraph-engine) + +## With Nebula-UP(qiuck start) ### Installation @@ -31,3 +40,102 @@ Just visit [http://localhost:7001](http://localhost:7001) in your browser, with: - host: `graphd:9669` - user: `root` - password: `nebula` + +## Rin In Production + +### Run on PySpark Jupyter Notebook + +Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`. + +```bash +export PYSPARK_PYTHON=python3 +export PYSPARK_DRIVER_PYTHON=jupyter +export PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip=0.0.0.0 --port=8888 --no-browser" + +pyspark --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ + --driver-class-path /opt/nebulagraph/ngdi/package/nebula-algo.jar \ + --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ + --jars /opt/nebulagraph/ngdi/package/nebula-algo.jar +``` + +Then we could access Jupyter Notebook with PySpark and refer to [examples/spark_engine.ipynb](https://github.com/wey-gu/nebulagraph-di/blob/main/examples/spark_engine.ipynb) + +### Submit Algorithm job to Spark Cluster + +Assuming we have put the `nebula-spark-connector.jar` and `nebula-algo.jar` in `/opt/nebulagraph/ngdi/package/`; +We have put the `ngdi-py3-env.zip` in `/opt/nebulagraph/ngdi/package/`. +And we have the following Algorithm job in `pagerank.py`: + +```python +from ngdi import NebulaGraphConfig +from ngdi import NebulaReader + +# set NebulaGraph config +config_dict = { + "graphd_hosts": "graphd:9669", + "metad_hosts": "metad0:9669,metad1:9669,metad2:9669", + "user": "root", + "password": "nebula", + "space": "basketballplayer", +} +config = NebulaGraphConfig(**config_dict) + +# read data with spark engine, query mode +reader = NebulaReader(engine="spark") +query = """ + MATCH ()-[e:follow]->() + RETURN e LIMIT 100000 +""" +reader.query(query=query, edge="follow", props="degree") +df = reader.read() + +# run pagerank algorithm +pr_result = df.algo.pagerank(reset_prob=0.15, max_iter=10) +``` + +> Note, this could be done by Airflow, or other job scheduler in production. + +Then we can submit the job to Spark cluster: + +```bash +spark-submit --master spark://master:7077 \ + --driver-class-path /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ + --driver-class-path /opt/nebulagraph/ngdi/package/nebula-algo.jar \ + --jars /opt/nebulagraph/ngdi/package/nebula-spark-connector.jar \ + --jars /opt/nebulagraph/ngdi/package/nebula-algo.jar \ + --py-files /opt/nebulagraph/ngdi/package/ngdi-py3-env.zip \ + pagerank.py +``` + +### Run ngdi algorithm PySpark job from python script + +We have everything ready as above, including the `pagerank.py`. + +```python +import subprocess + +subprocess.run(["spark-submit", "--master", "spark://master:7077", + "--driver-class-path", "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar", + "--driver-class-path", "/opt/nebulagraph/ngdi/package/nebula-algo.jar", + "--jars", "/opt/nebulagraph/ngdi/package/nebula-spark-connector.jar", + "--jars", "/opt/nebulagraph/ngdi/package/nebula-algo.jar", + "--py-files", "/opt/nebulagraph/ngdi/package/ngdi-py3-env.zip", + "pagerank.py"]) +``` + +### Run on single machine with NebulaGraph engine + +Assuming we have NebulaGraph cluster up and running, and we have the following Algorithm job in `pagerank_nebula_engine.py`: + +This file is the same as `pagerank.py` except for the following line: + +```diff +- reader = NebulaReader(engine="spark") ++ reader = NebulaReader(engine="nebula") +``` + +Then we can run the job on single machine: + +```bash +python3 pagerank.py +``` \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index ad5977c..e192d19 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ [project] name = "ngdi" -version = "0.1.9" +version = "0.2.3" description = "NebulaGraph Data Intelligence Suite" authors = [ {name = "Wey Gu", email = "weyl.gu@gmail.com"},