diff --git a/.github/workflows/spark.yaml b/.github/workflows/spark.yaml
index 2ec25b3d5..6967f0a86 100644
--- a/.github/workflows/spark.yaml
+++ b/.github/workflows/spark.yaml
@@ -76,3 +76,32 @@ jobs:
           # stop and clean
           popd
+
+      - name: Run Nebula2GraphAr example
+        run: |
+          export JAVA_HOME=${JAVA_HOME_11_X64}
+          pushd spark
+          scripts/get-spark-to-home.sh
+          export SPARK_HOME="${HOME}/spark-3.2.2-bin-hadoop3.2"
+          export PATH="${SPARK_HOME}/bin":"${PATH}"
+
+          scripts/get-nebula-to-home.sh
+
+          scripts/deploy-nebula-default-data.sh
+
+          scripts/build.sh
+
+          scripts/run-nebula2graphar.sh
+
+          # clean the data
+          docker run \
+            --rm \
+            --name nebula-console-loader \
+            --network nebula-docker-env_nebula-net \
+            vesoft/nebula-console:nightly -addr 172.28.3.1 -port 9669 -u root -p nebula -e "use basketballplayer; clear space basketballplayer;"
+
+          # import from GraphAr
+          scripts/run-graphar2nebula.sh
+
+          # stop and clean
+          popd
diff --git a/spark/README.md b/spark/README.md
index a959d0a90..6715624a7 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -135,12 +135,88 @@ echo "match (a) -[r] -> () delete a, r;match (a) delete a;" | cypher-shell -u ${
 ```
 
 Then run the example:
+
 ```bash
 scripts/run-graphar2neo4j.sh
 ```
 
 The example will import the movie graph from GraphAr to Neo4j and you can check the result in the Neo4j browser.
 
+## Running NebulaGraph to GraphAr example
+
+Running this example requires `Docker`; if it is not installed, follow [this link](https://docs.docker.com/engine/install/). Run `docker version` to check it.
+
+Spark provides a simple example to convert NebulaGraph data to GraphAr data.
+The example is located in the directory ``spark/src/main/scala/com/alibaba/graphar/example/``.
+
+To run the example, download Spark and NebulaGraph first.
+
+### Spark 3.2.x
+
+Spark 3.2.x is the recommended runtime and the rest of the instructions assume it.
+
+To place Spark under `${HOME}`:
+
+```bash
+scripts/get-spark-to-home.sh
+export SPARK_HOME="${HOME}/spark-3.2.2-bin-hadoop3.2"
+export PATH="${SPARK_HOME}/bin":"${PATH}"
+```
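+
+To verify the installation, print the Spark version (it should report 3.2.2):
+
+```bash
+spark-submit --version
+```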
+
+### NebulaGraph
+
+To place the NebulaGraph `docker-compose.yaml` under `${HOME}/nebula-docker-env`:
+
+```bash
+scripts/get-nebula-to-home.sh
+```
+
+Start the NebulaGraph server via Docker and load the `basketballplayer` dataset:
+
+```bash
+scripts/deploy-nebula-default-data.sh
+```
+
+Use [NebulaGraph Studio](https://docs.nebula-graph.com.cn/master/nebula-studio/deploy-connect/st-ug-deploy/#docker_studio) to check the graph data. The username is ``root`` and the password is ``nebula``.
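+
+You can also check the deployment from the command line with the Nebula console (the address and network below match the docker-compose environment used by the scripts):
+
+```bash
+docker run \
+  --rm \
+  --network nebula-docker-env_nebula-net \
+  vesoft/nebula-console:nightly -addr 172.28.3.1 -port 9669 -u root -p nebula -e "SHOW SPACES;"
+```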
+
+### Building the project
+
+Run:
+
+```bash
+scripts/build.sh
+```
+
+### Running the Nebula2GraphAr example
+
+```bash
+scripts/run-nebula2graphar.sh
+```
+
+The example will convert the basketballplayer data in NebulaGraph to GraphAr data and save it to the directory ``/tmp/graphar/nebula2graphar``.
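+
+If the conversion succeeds, the output directory contains the graph information file together with the vertex and edge chunks. A quick listing looks roughly like the following (the file names are illustrative; the exact layout depends on the graph name and chunk sizes):
+
+```bash
+ls /tmp/graphar/nebula2graphar
+# basketballplayergraph.graph.yml  player.vertex.yml  team.vertex.yml
+# player_follow_player.edge.yml    player_serve_team.edge.yml
+# vertex/  edge/
+```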
+
+### Running the GraphAr2Nebula example
+
+We can also import the basketballplayer graph from GraphAr to NebulaGraph.
+
+First, clear NebulaGraph's `basketballplayer` graph space so that the result of the import can be checked cleanly:
+
+```bash
+docker run \
+  --rm \
+  --name nebula-console-loader \
+  --network nebula-docker-env_nebula-net \
+  vesoft/nebula-console:nightly -addr 172.28.3.1 -port 9669 -u root -p nebula -e "use basketballplayer; clear space basketballplayer;"
+```
+
+Then run the example:
+
+```bash
+scripts/run-graphar2nebula.sh
+```
+
+The example will import the basketballplayer graph from GraphAr to NebulaGraph and you can check the result in NebulaGraph Studio.
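+
+Besides NebulaGraph Studio, you can verify the import by collecting the space statistics with the console (the stats job runs asynchronously, so `SHOW STATS` may need to be re-run after a moment):
+
+```bash
+docker run \
+  --rm \
+  --network nebula-docker-env_nebula-net \
+  vesoft/nebula-console:nightly -addr 172.28.3.1 -port 9669 -u root -p nebula \
+  -e "USE basketballplayer; SUBMIT JOB STATS; SHOW STATS;"
+```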
+
 ## How to use
 
 Please refer to our [GraphAr Spark Library Documentation](https://alibaba.github.io/GraphAr/user-guide/spark-lib.html).
diff --git a/spark/pom.xml b/spark/pom.xml
index ac7e8a9a4..3c8f8fb32 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -101,6 +101,16 @@
       <artifactId>neo4j-connector-apache-spark_2.12</artifactId>
       <version>5.0.0_for_spark_3</version>
     </dependency>
+    <dependency>
+      <groupId>com.vesoft</groupId>
+      <artifactId>nebula-spark-connector_3.0</artifactId>
+      <version>3.6.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang.modules</groupId>
+      <artifactId>scala-collection-compat_2.12</artifactId>
+      <version>2.1.1</version>
+    </dependency>
diff --git a/spark/scripts/deploy-nebula-default-data.sh b/spark/scripts/deploy-nebula-default-data.sh
new file mode 100755
index 000000000..5f7dc61ee
--- /dev/null
+++ b/spark/scripts/deploy-nebula-default-data.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+set -eu
+
+nebula_env_dir="${HOME}/nebula-docker-env"
+cd "${nebula_env_dir}"
+
+docker compose up -d
+# wait for the NebulaGraph services to become ready
+sleep 30
+
+# load the default basketballplayer dataset via the Nebula console
+docker run \
+  --rm \
+  --name nebula-console-loader \
+  --network nebula-docker-env_nebula-net \
+  vesoft/nebula-console:nightly -addr 172.28.3.1 -port 9669 -u root -p nebula -e ":play basketballplayer"
diff --git a/spark/scripts/get-nebula-to-home.sh b/spark/scripts/get-nebula-to-home.sh
new file mode 100755
index 000000000..e047aae69
--- /dev/null
+++ b/spark/scripts/get-nebula-to-home.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+set -eu
+cd "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+
+nebula_env_dir="${HOME}/nebula-docker-env"
+if [[ ! -d ${nebula_env_dir} ]]; then
+  mkdir "${nebula_env_dir}"
+else
+  echo "${nebula_env_dir} already exists."
+fi
+cd "${nebula_env_dir}"
+
+# fetch the docker-compose file used by the nebula-spark-connector tests
+curl -s \
+  -o docker-compose.yaml \
+  https://raw.githubusercontent.com/vesoft-inc/nebula-spark-connector/master/nebula-spark-connector_3.0/src/test/resources/docker-compose.yaml
diff --git a/spark/scripts/run-graphar2nebula.sh b/spark/scripts/run-graphar2nebula.sh
new file mode 100755
index 000000000..93ef43ce7
--- /dev/null
+++ b/spark/scripts/run-graphar2nebula.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+set -eu
+
+cur_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+jar_file="${cur_dir}/../target/graphar-0.1.0-SNAPSHOT-shaded.jar"
+
+# the graph information file produced by run-nebula2graphar.sh
+graph_info_path="${GRAPH_INFO_PATH:-/tmp/graphar/nebula2graphar/basketballplayergraph.graph.yml}"
+spark-submit --class com.alibaba.graphar.example.GraphAr2Nebula "${jar_file}" \
+  "${graph_info_path}"
diff --git a/spark/scripts/run-nebula2graphar.sh b/spark/scripts/run-nebula2graphar.sh
new file mode 100755
index 000000000..534d9f2ff
--- /dev/null
+++ b/spark/scripts/run-nebula2graphar.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to You under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+set -eu
+
+cur_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
+jar_file="${cur_dir}/../target/graphar-0.1.0-SNAPSHOT-shaded.jar"
+
+vertex_chunk_size=100
+edge_chunk_size=1024
+file_type="parquet"
+spark-submit --class com.alibaba.graphar.example.Nebula2GraphAr "${jar_file}" \
+  "/tmp/graphar/nebula2graphar" ${vertex_chunk_size} ${edge_chunk_size} ${file_type}
diff --git a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala
new file mode 100644
index 000000000..7138d19bc
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala
@@ -0,0 +1,160 @@
+/**
+ * Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.alibaba.graphar.example
+
+import com.alibaba.graphar.{GeneralParams, GraphInfo}
+import com.alibaba.graphar.graph.GraphReader
+import com.alibaba.graphar.util.Utils
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
+import com.vesoft.nebula.connector.{
+  NebulaConnectionConfig,
+  WriteMode,
+  WriteNebulaEdgeConfig,
+  WriteNebulaVertexConfig
+}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.LoggerFactory
+
+object GraphAr2Nebula {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private val DEFAULT_GRAPH_SPACE = "basketballplayer"
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf
+    sparkConf
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+
+    val spark = SparkSession
+      .builder()
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // path to the graph information file
+    val graphInfoPath: String = args(0)
+    val graphInfo = GraphInfo.loadGraphInfo(graphInfoPath, spark)
+
+    // The edge data needs its src and dst converted to vertex IDs,
+    // so we read the vertex data with an extra index column.
+    val graphData =
+      GraphReader.read(graphInfoPath, spark, addVertexIndex = true)
+    val vertexData = graphData._1
+    val edgeData = graphData._2
+    putVertexDataIntoNebula(graphInfo, vertexData)
+    putEdgeDataIntoNebula(graphInfo, vertexData, edgeData)
+  }
+
+  private def putVertexDataIntoNebula(
+      graphInfo: GraphInfo,
+      vertexData: Map[String, DataFrame]
+  ): Unit = {
+    // write each vertex tag to Nebula
+    vertexData.foreach { case (tag, df) =>
+      val newDF = df.drop(GeneralParams.vertexIndexCol)
+      val vertexInfo = graphInfo.getVertexInfo(tag)
+      writeVertex(tag, vertexInfo.getPrimaryKey(), newDF)
+    }
+  }
+
+  def putEdgeDataIntoNebula(
+      graphInfo: GraphInfo,
+      vertexData: Map[String, DataFrame],
+      edgeData: Map[(String, String, String), Map[String, DataFrame]]
+  ): Unit = {
+    // write each edge type to Nebula
+    edgeData.foreach { case (srcEdgeDstLabels, orderMap) =>
+      val sourceTag = srcEdgeDstLabels._1
+      val edgeType = srcEdgeDstLabels._2
+      val targetTag = srcEdgeDstLabels._3
+      val sourcePrimaryKey = graphInfo.getVertexInfo(sourceTag).getPrimaryKey()
+      val targetPrimaryKey = graphInfo.getVertexInfo(targetTag).getPrimaryKey()
+      val sourceDF = vertexData(sourceTag)
+      val targetDF = vertexData(targetTag)
+
+      // convert the source and target index columns to the primary key columns,
+      // using the first DataFrame of the (adj_list_type_str, DataFrame) map
+      val df = Utils.joinEdgesWithVertexPrimaryKey(
+        orderMap.head._2,
+        sourceDF,
+        targetDF,
+        sourcePrimaryKey,
+        targetPrimaryKey
+      )
+
+      writeEdge(edgeType, "src", "dst", "_rank", df)
+    }
+  }
+
+  private def writeVertex(
+      tag: String,
+      idFieldName: String,
+      df: DataFrame
+  ): Unit = {
+    val config = getNebulaConnectionConfig
+
+    val nebulaWriteVertexConfig: WriteNebulaVertexConfig =
+      WriteNebulaVertexConfig
+        .builder()
+        .withSpace(DEFAULT_GRAPH_SPACE)
+        .withTag(tag)
+        .withVidField(idFieldName)
+        .withWriteMode(WriteMode.INSERT)
+        .withVidAsProp(false)
+        .withBatch(100)
+        .build()
+
+    df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
+  }
+
+  private def writeEdge(
+      edgeType: String,
+      srcField: String,
+      dstField: String,
+      rankField: String,
+      df: DataFrame
+  ): Unit = {
+    val config = getNebulaConnectionConfig
+    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
+      .builder()
+      .withSpace(DEFAULT_GRAPH_SPACE)
+      .withEdge(edgeType)
+      .withSrcIdField(srcField)
+      .withDstIdField(dstField)
+      .withRankField(rankField)
+      .withSrcAsProperty(false)
+      .withDstAsProperty(false)
+      .withRankAsProperty(false)
+      .withBatch(1000)
+      .build()
+
+    df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
+  }
+
+  private def getNebulaConnectionConfig: NebulaConnectionConfig = {
+    NebulaConnectionConfig
+      .builder()
+      .withMetaAddress("127.0.0.1:9559")
+      .withGraphAddress("127.0.0.1:9669")
+      .withConenctionRetry(3) // (sic: spelling matches the connector API)
+      .build()
+  }
+}
diff --git a/spark/src/main/scala/com/alibaba/graphar/example/Nebula2GraphAr.scala b/spark/src/main/scala/com/alibaba/graphar/example/Nebula2GraphAr.scala
new file mode 100644
index 000000000..ff8461c24
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/example/Nebula2GraphAr.scala
@@ -0,0 +1,162 @@
+/**
+ * Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.alibaba.graphar.example
+
+import com.alibaba.graphar.graph.GraphWriter
+import com.facebook.thrift.protocol.TCompactProtocol
+import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
+import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.slf4j.LoggerFactory
+
+object Nebula2GraphAr {
+
+  private val LOG = LoggerFactory.getLogger(this.getClass)
+
+  private val DEFAULT_GRAPH_SPACE = "basketballplayer"
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf
+    sparkConf
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .registerKryoClasses(Array[Class[_]](classOf[TCompactProtocol]))
+
+    val spark = SparkSession
+      .builder()
+      .appName("NebulaGraph to GraphAr for basketballplayer Graph")
+      .master("local")
+      .config(sparkConf)
+      .getOrCreate()
+
+    // initialize a graph writer
+    val writer: GraphWriter = new GraphWriter
+
+    // put basketballplayer graph data into the writer
+    readAndPutDataIntoWriter(writer, spark)
+
+    // output directory
+    val outputPath: String = args(0)
+    // vertex chunk size
+    val vertexChunkSize: Long = args(1).toLong
+    // edge chunk size
+    val edgeChunkSize: Long = args(2).toLong
+    // file type
+    val fileType: String = args(3)
+
+    // write out in GraphAr format; the graph name "basketballplayergraph"
+    // matches the info file expected by run-graphar2nebula.sh
+    writer.write(
+      outputPath,
+      spark,
+      DEFAULT_GRAPH_SPACE + "graph",
+      vertexChunkSize,
+      edgeChunkSize,
+      fileType
+    )
+
+    spark.close()
+  }
+
+  def readAndPutDataIntoWriter(
+      writer: GraphWriter,
+      spark: SparkSession
+  ): Unit = {
+    // read vertices with tag "player" from NebulaGraph as a DataFrame
+    val playerDF = readVertexDF(spark, "player")
+    LOG.info("player vertices count: " + playerDF.count())
+    assert(playerDF.count() == 51)
+    // put into writer, vertex tag is "player"
+    writer.PutVertexData("player", playerDF)
+
+    // read vertices with tag "team" from NebulaGraph as a DataFrame
+    val teamDF = readVertexDF(spark, "team")
+    LOG.info("team vertices count: " + teamDF.count())
+    assert(teamDF.count() == 30)
+    // put into writer, vertex tag is "team"
+    writer.PutVertexData("team", teamDF)
+
+    // read edges with type "player"->"follow"->"player" from NebulaGraph
+    val followDF = readEdgeDF(spark, "follow")
+    LOG.info("follow edges count: " + followDF.count())
+    assert(followDF.count() == 81)
+    // put into writer: source vertex tag is "player", edge type is "follow",
+    // target vertex tag is "player"
+    writer.PutEdgeData(("player", "follow", "player"), followDF)
+
+    // read edges with type "player"->"serve"->"team" from NebulaGraph
+    val serveDF = readEdgeDF(spark, "serve")
+    LOG.info("serve edges count: " + serveDF.count())
+    assert(serveDF.count() == 152)
+    // put into writer: source vertex tag is "player", edge type is "serve",
+    // target vertex tag is "team"
+    writer.PutEdgeData(("player", "serve", "team"), serveDF)
+  }
+
+  private def readVertexDF(
+      spark: SparkSession,
+      vertexTag: String
+  ): DataFrame = {
+    val config =
+      NebulaConnectionConfig
+        .builder()
+        .withMetaAddress("127.0.0.1:9559")
+        .withConenctionRetry(3) // (sic: spelling matches the connector API)
+        .build()
+
+    val nebulaReadVertexConfig = ReadNebulaConfig
+      .builder()
+      .withSpace(DEFAULT_GRAPH_SPACE)
+      .withLabel(vertexTag)
+      .withNoColumn(false) // also read the property columns
+      .withReturnCols(List()) // an empty list returns all properties
+      .withLimit(10)
+      .withPartitionNum(10)
+      .build()
+
+    val vertices = spark.read
+      .nebula(config, nebulaReadVertexConfig)
+      .loadVerticesToDF()
+    vertices
+  }
+
+  def readEdgeDF(
+      spark: SparkSession,
+      edgeType: String
+  ): DataFrame = {
+    val config =
+      NebulaConnectionConfig
+        .builder()
+        .withMetaAddress("127.0.0.1:9559")
+        .withTimeout(6000)
+        .withConenctionRetry(3)
+        .build()
+
+    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
+      .builder()
+      .withSpace(DEFAULT_GRAPH_SPACE)
+      .withLabel(edgeType)
+      .withNoColumn(false)
+      .withReturnCols(List())
+      .withLimit(10)
+      .withPartitionNum(10)
+      .build()
+
+    val edges = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
+    edges
+  }
+}