Skip to content

Commit

Permalink
WIP: feat: ngdi UDF gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
wey-gu committed Mar 7, 2023
1 parent 66ccbb0 commit 68d2173
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 0 deletions.
14 changes: 14 additions & 0 deletions docs/ngdi_API_Gateway.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

## Calling from ngql

```cypher
RETURN ngdi("pagerank", ["follow"], ["degree"], "compact")
```

## Setup ngdi API Gateway

See: [../examples/ngdi_from_ngql_udf.ipynb](https://github.com/wey-gu/nebulagraph-di/blob/main/examples/ngdi_from_ngql_udf.ipynb)

## UDF build

See https://github.com/wey-gu/nebula/tree/ngdi_udf
166 changes: 166 additions & 0 deletions examples/ngdi_from_ngql_udf.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "07b36477",
"metadata": {},
"source": [
"### Verify that pyspark-jupyter can serve HTTP requests"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "779b98c7",
"metadata": {},
"outputs": [],
"source": [
"import http.server\n",
"import socketserver\n",
"\n",
"PORT = 9999\n",
"\n",
"Handler = http.server.SimpleHTTPRequestHandler\n",
"\n",
"with socketserver.TCPServer((\"\", PORT), Handler) as httpd:\n",
" print(\"serving at port\", PORT)\n",
" httpd.serve_forever()\n",
"\n",
"# listen at 9999\n",
"\n",
"# call from graphd\n",
"\n",
"!docker exec -it nebula-docker-compose_graphd_1 curl http://jupyter:9999\n",
" \n",
"# it worked!"
]
},
{
"cell_type": "markdown",
"id": "ee4221c8",
"metadata": {},
"source": [
"### Let's create an API to take requests from the UDF in GraphD"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a9076c3c",
"metadata": {},
"outputs": [],
"source": [
"!pip install flask"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a94e7d58",
"metadata": {},
"outputs": [],
"source": [
"from ngdi import NebulaReader, NebulaWriter\n",
"from ngdi.config import NebulaGraphConfig\n",
"\n",
"from flask import Flask, request\n",
"app = Flask(__name__)\n",
"\n",
"nebula_config = NebulaGraphConfig()\n",
"\n",
"@app.route('/api/v0/parallel/<algo_name>', methods=['POST'])\n",
"def parallel(algo_name):\n",
" data = request.get_json()\n",
" \n",
" reader = NebulaReader(engine=\"spark\")\n",
" # get read_context\n",
" try:\n",
" read_context = data.get(\"read_context\")\n",
" read_mode = read_context.get(\"read_mode\")\n",
" edges = read_context.get(\"edges\")\n",
" edge_weights = read_context.get(\"edge_weights\")\n",
" \n",
" assert len(edges) == len(edge_weights) and len(edges) > 0, \"edges and edge_weights should have the same length and length > 0\"\n",
" # TBD, it seems that the reader.scan() need to support more than one edge type\n",
" # https://github.com/wey-gu/nebulagraph-di/issues/19\n",
" # need to query all and union them.\n",
" if read_mode == \"scan\":\n",
" reader.scan(edge=edges[0], props=edge_weights[0])\n",
" elif read_mode == \"query\":\n",
" query = read_context.get(\"query\")\n",
" assert query is not None, \"query should not be None\"\n",
" reader.query(query, edge=edges[0], props=edge_weights[0])\n",
" # TODO(wey): need to revisit the query and scan API, to align them.\n",
" # ref: https://github.com/vesoft-inc/nebula-algorithm/blob/master/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala\n",
" # ref: https://github.com/vesoft-inc/nebula-algorithm/blob/master/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala\n",
" # ref: https://github.com/vesoft-inc/nebula-spark-connector/blob/master/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala\n",
" df = reader.read()\n",
"    except Exception as e:\n",
" # TBD, need to return error code, return empty json for now\n",
" print(e)\n",
" return {\"error\": f\"read failed: {e}\"}\n",
" try:\n",
" # ensure the algo_name is supported\n",
" assert algo_name in df.algo.get_all_algo(), f\"{algo_name} is not supported\"\n",
"\n",
" # get algo_context\n",
" algo_context = data.get(\"algo_context\")\n",
" algo_config = algo_context.get(\"algo_config\", {})\n",
" # call df.algo.algo_name(**algo_config)\n",
" algo_result = getattr(df.algo, algo_name)(**algo_config)\n",
" except Exception as e:\n",
" # TBD, need to return error code, return empty json for now\n",
" print(e)\n",
" return {\"error\": f\"algo execution failed: {e}\"}\n",
"\n",
" try:\n",
" # get write_context\n",
" write_context = data.get(\"write_context\")\n",
" write_mode = write_context.get(\"write_mode\")\n",
" properties = write_context.get(\"properties\", {})\n",
" batch_size = write_context.get(\"batch_size\", 256)\n",
" # TBD, need to support more than one edge type\n",
" writer = NebulaWriter(data=algo_result, sink=\"nebulagraph_vertex\", config=nebula_config, engine=\"spark\")\n",
" writer.set_options(\n",
" tag=algo_name,\n",
" vid_field=\"_id\",\n",
" properties=properties,\n",
" batch_size=batch_size,\n",
" write_mode=write_mode,\n",
" )\n",
" response = writer.write()\n",
" except Exception as e:\n",
" # TBD, need to return error code, return empty json for now\n",
" print(e)\n",
" return {\"error\": f\"write failed: {e}\"}\n",
" # return reader result's stats, algo result's stats, writer result\n",
" return {\n",
" \"reader_result_stats\": list(map(lambda r: r.asDict(), df.data.summary().collect())),\n",
" \"algo_result_stats\": list(map(lambda r: r.asDict(), writer.raw_df.summary().collect())),\n",
" \"writer_result\": response is None or response,\n",
" }\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

0 comments on commit 68d2173

Please sign in to comment.