diff --git a/examples/ngdi_from_ngql_udf.ipynb b/examples/ngdi_from_ngql_udf.ipynb index 3f37c2f..9275936 100644 --- a/examples/ngdi_from_ngql_udf.ipynb +++ b/examples/ngdi_from_ngql_udf.ipynb @@ -1,46 +1,55 @@ { "cells": [ { + "attachments": {}, "cell_type": "markdown", - "id": "07b36477", - "metadata": {}, - "source": [ - "### Verify that pyspark-jupyter can serve http requests" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "779b98c7", + "id": "ee4221c8", "metadata": {}, - "outputs": [], "source": [ - "import http.server\n", - "import socketserver\n", + "### Run the ngdi API gateway to handle calls over HTTP\n", "\n", - "PORT = 9999\n", + "A call is either a cURL request or a UDF query from ngdi-graphd.\n", "\n", - "Handler = http.server.SimpleHTTPRequestHandler\n", + "- Query from ngdi-graphd\n", "\n", - "with socketserver.TCPServer((\"\", PORT), Handler) as httpd:\n", - " print(\"serving at port\", PORT)\n", - " httpd.serve_forever()\n", + "```cypher\n", + "-- Prepare the write schema\n", + "USE basketballplayer;\n", + "CREATE TAG IF NOT EXISTS pagerank(pagerank string);\n", + ":sleep 20;\n", + "-- Call with ngdi()\n", + "RETURN ngdi(\"pagerank\", [\"follow\"], [\"degree\"], \"spark\", {space: \"basketballplayer\", max_iter: 10}, {write_mode: \"insert\"})\n", + "```\n", "\n", - "# listen at 9999\n", + "Where the parameters are, in order (the trailing `{write_mode: \"insert\"}` argument is the write context, matching `write_context` in the cURL body below):\n", "\n", - "# call from graphd\n", + "- `algo_name`: the name of the algorithm, e.g. `pagerank`\n", + "- `edge_types`: the edge types to be used in the algorithm, e.g. `[\"follow\"]`\n", + "- `edge_weights`: the edge weights to be used in the algorithm, e.g. `[\"degree\"]`\n", + "- `mode`: the mode (engine) to be used in the algorithm, e.g. `spark`, `networkx`\n", + "- `algo_context`: the context to be used in the algorithm, e.g. `{space: \"basketballplayer\", max_iter: 10}`\n", "\n", - "!docker exec -it nebula-docker-compose_graphd_1 curl http://jupyter:9999\n", - " \n", - "# it worked!" 
- ] - }, - { - "cell_type": "markdown", - "id": "ee4221c8", - "metadata": {}, - "source": [ - "### Let's create an API to take request from the UDF from GraphD" + "- Call with cURL\n", + "\n", + "```bash\n", + "curl -X POST \\\n", + " -H \"Content-Type: application/json\" \\\n", + " -d '{\n", + " \"write_context\": {\n", + " \"write_mode\": \"insert\"\n", + " },\n", + " \"read_context\": {\n", + " \"edge_types\": [\"follow\"],\n", + " \"read_mode\": \"scan\",\n", + " \"edge_weights\": [\"degree\"]\n", + " },\n", + " \"algo_context\": {\n", + " \"name\": \"pagerank\",\n", + " \"space\": \"basketballplayer\"\n", + " }\n", + " }' \\\n", + " http://jupyter:9999/api/v0/spark/pagerank\n", + "```\n" ] }, { @@ -50,7 +59,8 @@ "metadata": {}, "outputs": [], "source": [ - "!pip install flask" + "# Install flask and ngdi if they are not installed yet.\n", + "!pip install flask ngdi" ] }, { @@ -60,24 +70,55 @@ "metadata": {}, "outputs": [], "source": [ + "import os\n", + "\n", "from ngdi import NebulaReader, NebulaWriter\n", "from ngdi.config import NebulaGraphConfig\n", "\n", "from flask import Flask, request\n", "app = Flask(__name__)\n", "\n", - "nebula_config = NebulaGraphConfig()\n", "\n", - "@app.route('/api/v0/parallel/', methods=['POST'])\n", + "def get_nebulagraph_config(space=\"basketballplayer\"):\n", + " # get credentials from env\n", + " graphd_hosts = os.getenv(\"GRAPHD_HOSTS\", \"graphd:9669\")\n", + " metad_hosts = os.getenv(\"METAD_HOSTS\", \"metad0:9559,metad1:9559,metad2:9559\")\n", + " user = os.getenv(\"USER\", \"root\")\n", + " password = os.getenv(\"PASSWORD\", \"nebula\")\n", + "\n", + " return NebulaGraphConfig(\n", + " graphd_hosts = graphd_hosts,\n", + " metad_hosts = metad_hosts,\n", + " user = user,\n", + " password = password,\n", + " space = space\n", + " )\n", + "\n", + "@app.route('/api/v0/spark/<algo_name>', methods=['GET'])\n", + "def test(algo_name):\n", + " return {\"status\": \"OK\"}\n", + "\n", + "@app.route('/api/v0/spark/<algo_name>', methods=['POST'])\n", + "def parallel(algo_name):\n", " 
data = request.get_json()\n", - " \n", + "\n", + " try:\n", + " # get algo_context\n", + " algo_context = data.get(\"algo_context\")\n", + " assert algo_context is not None, \"algo_context should not be None\"\n", + " assert algo_context.get(\"space\") is not None, \"space should not be None\"\n", + " except Exception as e:\n", + " print(e)\n", + " return {\"error\": f\"algo context parsing failed: {e}\"}\n", + " space = algo_context.get(\"space\")\n", + " nebula_config = get_nebulagraph_config(space=space)\n", + "\n", " reader = NebulaReader(engine=\"spark\")\n", " # get read_context\n", " try:\n", " read_context = data.get(\"read_context\")\n", " read_mode = read_context.get(\"read_mode\")\n", - " edges = read_context.get(\"edges\")\n", + " edges = read_context.get(\"edge_types\")\n", " edge_weights = read_context.get(\"edge_weights\")\n", " \n", " assert len(edges) == len(edge_weights) and len(edges) > 0, \"edges and edge_weights should have the same length and length > 0\"\n", @@ -95,7 +136,7 @@ " # ref: https://github.com/vesoft-inc/nebula-algorithm/blob/master/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala\n", " # ref: https://github.com/vesoft-inc/nebula-spark-connector/blob/master/example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala\n", " df = reader.read()\n", - " exception Exception as e:\n", + " except Exception as e:\n", " # TBD, need to return error code, return empty json for now\n", " print(e)\n", " return {\"error\": f\"read failed: {e}\"}\n", @@ -103,9 +144,9 @@ " # ensure the algo_name is supported\n", " assert algo_name in df.algo.get_all_algo(), f\"{algo_name} is not supported\"\n", "\n", - " # get algo_context\n", - " algo_context = data.get(\"algo_context\")\n", - " algo_config = algo_context.get(\"algo_config\", {})\n", + " algo_config = dict(algo_context)\n", + " algo_config.pop(\"space\")\n", + " algo_config.pop(\"name\")\n", " # call 
df.algo.algo_name(**algo_config)\n", " algo_result = getattr(df.algo, algo_name)(**algo_config)\n", " except Exception as e:\n", @@ -140,6 +181,29 @@ " \"writer_result\": response is None or response,\n", " }\n" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a62330fa", + "metadata": {}, + "outputs": [], + "source": [ + "app.run(host='0.0.0.0', port=9999)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "id": "8d9fecc1", + "metadata": {}, + "source": [ + "The result of the json(translated into table) is like:\n", + "\n", + "| algo_result_stats.0._id | algo_result_stats.0.pagerank | algo_result_stats.0.summary | algo_result_stats.1._id | algo_result_stats.1.pagerank | algo_result_stats.1.summary | algo_result_stats.2._id | algo_result_stats.2.pagerank | algo_result_stats.2.summary | algo_result_stats.3._id | algo_result_stats.3.pagerank | algo_result_stats.3.summary | algo_result_stats.4._id | algo_result_stats.4.pagerank | algo_result_stats.4.summary | algo_result_stats.5._id | algo_result_stats.5.pagerank | algo_result_stats.5.summary | algo_result_stats.6._id | algo_result_stats.6.pagerank | algo_result_stats.6.summary | algo_result_stats.7._id | algo_result_stats.7.pagerank | algo_result_stats.7.summary | reader_result_stats.0._dstId | reader_result_stats.0._rank | reader_result_stats.0._srcId | reader_result_stats.0.degree | reader_result_stats.0.summary | reader_result_stats.1._dstId | reader_result_stats.1._rank | reader_result_stats.1._srcId | reader_result_stats.1.degree | reader_result_stats.1.summary | reader_result_stats.2._dstId | reader_result_stats.2._rank | reader_result_stats.2._srcId | reader_result_stats.2.degree | reader_result_stats.2.summary | reader_result_stats.3._dstId | reader_result_stats.3._rank | reader_result_stats.3._srcId | reader_result_stats.3.degree | reader_result_stats.3.summary | reader_result_stats.4._dstId | reader_result_stats.4._rank | reader_result_stats.4._srcId | 
reader_result_stats.4.degree | reader_result_stats.4.summary | reader_result_stats.5._dstId | reader_result_stats.5._rank | reader_result_stats.5._srcId | reader_result_stats.5.degree | reader_result_stats.5.summary | reader_result_stats.6._dstId | reader_result_stats.6._rank | reader_result_stats.6._srcId | reader_result_stats.6.degree | reader_result_stats.6.summary | reader_result_stats.7._dstId | reader_result_stats.7._rank | reader_result_stats.7._srcId | reader_result_stats.7.degree | reader_result_stats.7.summary | writer_result |\n", + "| ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ----------------------- | ---------------------------- | --------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | 
----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ---------------------------- | --------------------------- | ---------------------------- | ---------------------------- | ----------------------------- | ------------- |\n", + "| 44 | 44 | count | | 1.0 | mean | | 1.2523434472897175 | stddev | player100 | 0.18601069183310504 | min | | 0.2003842452929359 | 25% | | 0.45392364809815683 | 50% | | 1.0722447015912284 | 75% | player150 | 5.488939515247179 | max | 81 | 81 | 81 | 81 | count | | 0.0 | | 82.44444444444444 | mean | | 0.0 | | 22.10316719386613 | stddev | player100 | 0 | player100 | -1 | min | | 0 | | 80 | 25% | | 0 | | 90 | 50% | | 0 | | 90 | 75% | player150 | 0 | player150 | 100 | max | true |" + ] } ], "metadata": {