diff --git a/hugegraph-dist/scripts/dependency/known-dependencies.txt b/hugegraph-dist/scripts/dependency/known-dependencies.txt
index c71c7e018..57ba9bf1d 100644
--- a/hugegraph-dist/scripts/dependency/known-dependencies.txt
+++ b/hugegraph-dist/scripts/dependency/known-dependencies.txt
@@ -179,6 +179,24 @@ hadoop-yarn-client-3.1.1.jar
hadoop-yarn-common-3.1.1.jar
hamcrest-2.1.jar
hamcrest-core-1.3.jar
+hbase-client-2.2.3.jar
+hbase-common-2.2.3.jar
+hbase-hadoop-compat-2.2.3.jar
+hbase-hadoop2-compat-2.2.3.jar
+hbase-http-2.2.3.jar
+hbase-mapreduce-2.2.3.jar
+hbase-metrics-2.2.3.jar
+hbase-metrics-api-2.2.3.jar
+hbase-procedure-2.2.3.jar
+hbase-protocol-2.2.3.jar
+hbase-protocol-shaded-2.2.3.jar
+hbase-replication-2.2.3.jar
+hbase-server-2.2.3.jar
+hbase-shaded-client-byo-hadoop-2.2.3.jar
+hbase-shaded-miscellaneous-2.2.1.jar
+hbase-shaded-netty-2.2.1.jar
+hbase-shaded-protobuf-2.2.1.jar
+hbase-zookeeper-2.2.3.jar
hibernate-validator-6.0.17.Final.jar
hive-classification-3.1.2.jar
hive-classification-3.1.3.jar
@@ -225,6 +243,7 @@ hk2-utils-3.0.1.jar
hppc-0.7.2.jar
htrace-core4-4.0.1-incubating.jar
htrace-core4-4.1.0-incubating.jar
+htrace-core4-4.2.0-incubating.jar
httpclient-4.5.13.jar
httpclient-4.5.2.jar
httpclient-4.5.9.jar
@@ -578,3 +597,11 @@ zookeeper-3.4.9.jar
zookeeper-3.6.2.jar
zookeeper-jute-3.6.2.jar
zstd-jni-1.5.0-4.jar
+disruptor-3.3.6.jar
+findbugs-annotations-1.3.9-1.jar
+jamon-runtime-2.4.1.jar
+javax.el-3.0.1-b12.jar
+javax.servlet.jsp-2.3.2.jar
+javax.servlet.jsp-api-2.3.1.jar
+jcodings-1.0.18.jar
+joni-2.1.11.jar
diff --git a/hugegraph-loader/assembly/static/bin/get-params.sh b/hugegraph-loader/assembly/static/bin/get-params.sh
index c73ebbbd6..e0302399c 100644
--- a/hugegraph-loader/assembly/static/bin/get-params.sh
+++ b/hugegraph-loader/assembly/static/bin/get-params.sh
@@ -27,7 +27,7 @@ function get_params() {
--incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
--max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
--timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
- --print-progress | --dry-run | --help)
+ --print-progress | --dry-run | --sink-type | --vertex-partitions | --edge-partitions | --help)
HUGEGRAPH_PARAMS="$HUGEGRAPH_PARAMS $1 $2"
shift 2
;;
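
For context, a hypothetical invocation of the Spark loader that exercises the new flags might look like the sketch below (script name and paths are assumptions for illustration, not part of this patch):

    # Hypothetical usage sketch: bulk-load into HBase instead of writing through the server API
    sh bin/hugegraph-spark-loader.sh --graph hugegraph --host 127.0.0.1 --port 8080 \
       --file example/spark/struct.json --schema example/spark/schema.groovy \
       --sink-type false --vertex-partitions 64 --edge-partitions 64
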
diff --git a/hugegraph-loader/assembly/static/example/spark/edge_created.json b/hugegraph-loader/assembly/static/example/spark/edge_created.json
new file mode 100644
index 000000000..ba093eab1
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/edge_created.json
@@ -0,0 +1,4 @@
+{"source_name": "marko", "target_id": 1, "date": "2017-12-10", "weight": 0.4}
+{"source_name": "josh", "target_id": 1, "date": "2009-11-11", "weight": 0.4}
+{"source_name": "josh", "target_id": 2, "date": "2017-12-10", "weight": 1.0}
+{"source_name": "peter", "target_id": 1, "date": "2017-03-24", "weight": 0.2}
diff --git a/hugegraph-loader/assembly/static/example/spark/edge_knows.json b/hugegraph-loader/assembly/static/example/spark/edge_knows.json
new file mode 100644
index 000000000..0c2af9ad7
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/edge_knows.json
@@ -0,0 +1,2 @@
+{"source_name": "marko", "target_name": "vadas", "date": "20160110", "weight": 0.5}
+{"source_name": "marko", "target_name": "josh", "date": "20130220", "weight": 1.0}
diff --git a/hugegraph-loader/assembly/static/example/spark/schema.groovy b/hugegraph-loader/assembly/static/example/spark/schema.groovy
new file mode 100644
index 000000000..15e829d31
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/schema.groovy
@@ -0,0 +1,33 @@
+// Define schema
+schema.propertyKey("name").asText().ifNotExist().create();
+schema.propertyKey("age").asInt().ifNotExist().create();
+schema.propertyKey("city").asText().ifNotExist().create();
+schema.propertyKey("weight").asDouble().ifNotExist().create();
+schema.propertyKey("lang").asText().ifNotExist().create();
+schema.propertyKey("date").asText().ifNotExist().create();
+schema.propertyKey("price").asDouble().ifNotExist().create();
+
+schema.vertexLabel("person")
+ .properties("name", "age", "city")
+ .primaryKeys("name")
+ .nullableKeys("age", "city")
+ .ifNotExist()
+ .create();
+schema.vertexLabel("software")
+ .properties("name", "lang", "price")
+ .primaryKeys("name")
+ .ifNotExist()
+ .create();
+
+schema.edgeLabel("knows")
+ .sourceLabel("person")
+ .targetLabel("person")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create();
+schema.edgeLabel("created")
+ .sourceLabel("person")
+ .targetLabel("software")
+ .properties("date", "weight")
+ .ifNotExist()
+ .create();
diff --git a/hugegraph-loader/assembly/static/example/spark/struct.json b/hugegraph-loader/assembly/static/example/spark/struct.json
new file mode 100644
index 000000000..275bdbd32
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/struct.json
@@ -0,0 +1,57 @@
+{
+ "vertices": [
+ {
+ "label": "person",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_person.json",
+ "format": "JSON",
+ "header": ["name", "age", "city"],
+ "charset": "UTF-8",
+ "skipped_line": {
+ "regex": "(^#|^//).*"
+ }
+ },
+ "id": "name",
+ "null_values": ["NULL", "null", ""]
+ },
+ {
+ "label": "software",
+ "input": {
+ "type": "file",
+ "path": "example/spark/vertex_software.json",
+ "format": "JSON",
+ "header": ["id","name", "lang", "price","ISBN"],
+ "charset": "GBK"
+ },
+ "id": "name",
+ "ignored": ["ISBN"]
+ }
+ ],
+ "edges": [
+ {
+ "label": "knows",
+ "source": ["source_name"],
+ "target": ["target_name"],
+ "input": {
+ "type": "file",
+ "path": "example/spark/edge_knows.json",
+ "format": "JSON",
+ "date_format": "yyyyMMdd",
+ "header": ["source_name","target_name", "date", "weight"]
+ },
+ "field_mapping": {
+ "source_name": "name",
+ "target_name": "name"
+ }
+ }
+ ],
+ "backendStoreInfo":
+ {
+ "edge_tablename": "hugegraph:g_oe",
+ "vertex_tablename": "hugegraph:g_v",
+ "hbase_zookeeper_quorum": "127.0.0.1",
+ "hbase_zookeeper_property_clientPort": "2181",
+ "zookeeper_znode_parent": "/hbase"
+ }
+}
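
A brief note on how this example mapping is meant to be used: when --sink-type is false, the backendStoreInfo block above tells the loader which HBase tables and ZooKeeper quorum to bulk-load into. A hedged sketch of submitting the Spark loader with this mapping (jar name, master and paths are assumptions; only the main class and mapping file come from this patch):

    # Hypothetical spark-submit sketch for the direct-to-HBase path
    spark-submit --master yarn --deploy-mode cluster \
        --class com.baidu.hugegraph.loader.spark.HugeGraphSparkLoader \
        hugegraph-loader.jar \
        --file example/spark/struct.json --graph hugegraph \
        --host 127.0.0.1 --port 8080 --sink-type false
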
diff --git a/hugegraph-loader/assembly/static/example/spark/vertex_person.json b/hugegraph-loader/assembly/static/example/spark/vertex_person.json
new file mode 100644
index 000000000..e018df411
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/vertex_person.json
@@ -0,0 +1,6 @@
+{"name": "marko", "age": "29", "city": "Beijing"}
+{"name": "vadas", "age": "27", "city": "Hongkong"}
+{"name": "josh", "age": "32", "city": "Beijing"}
+{"name": "peter", "age": "35", "city": "Shanghai"}
+{"name": "li,nary", "age": "26", "city": "Wu,han"}
+{"name": "tom", "age": "null", "city": "NULL"}
diff --git a/hugegraph-loader/assembly/static/example/spark/vertex_software.json b/hugegraph-loader/assembly/static/example/spark/vertex_software.json
new file mode 100644
index 000000000..cd9dbf3cb
--- /dev/null
+++ b/hugegraph-loader/assembly/static/example/spark/vertex_software.json
@@ -0,0 +1,2 @@
+{ "name": "lop", "lang": "java","price": "328","ISBN": "ISBN978-7-107-18618-5"}
+{ "name": "ripple", "lang": "java","price": "199","ISBN": "ISBN978-7-100-13678-5"}
diff --git a/hugegraph-loader/pom.xml b/hugegraph-loader/pom.xml
index b959395f8..2b2ace69d 100644
--- a/hugegraph-loader/pom.xml
+++ b/hugegraph-loader/pom.xml
@@ -173,6 +173,117 @@
            <version>30.0-jre</version>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.github.stephenc.findbugs</groupId>
+                    <artifactId>findbugs-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey</groupId>
+                    <artifactId>jersey-json</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-shaded-client-byo-hadoop</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-ipc</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-mapreduce</artifactId>
+            <version>${hbase.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-auth</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>javax.ws.rs</groupId>
+                    <artifactId>javax.ws.rs-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                    <artifactId>jersey-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                    <artifactId>jersey-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.glassfish.jersey.core</groupId>
+                    <artifactId>jersey-server</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
@@ -342,10 +453,18 @@
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-hadoop-bundle</artifactId>
                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-common</artifactId>
+                </exclusion>
                <exclusion>
                    <groupId>org.apache.parquet</groupId>
                    <artifactId>parquet-column</artifactId>
                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-hdfs</artifactId>
+                </exclusion>
                <exclusion>
                    <groupId>org.apache.thrift</groupId>
                    <artifactId>libfb303</artifactId>
@@ -354,6 +473,14 @@
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-distcp</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-mapreduce-client-core</artifactId>
+                </exclusion>
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
index e3d411ef0..e384e2084 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/EdgeBuilder.java
@@ -37,6 +37,7 @@
import com.baidu.hugegraph.structure.schema.VertexLabel;
import org.apache.hugegraph.util.E;
import com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Row;
public class EdgeBuilder extends ElementBuilder<Edge> {
@@ -109,6 +110,53 @@ public List<Edge> build(String[] names, Object[] values) {
}
return edges;
}
+
+ @Override
+ public List<Edge> build(Row row) {
+ String[] names = row.schema().fieldNames();
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ if (this.vertexIdsIndex == null ||
+ !Arrays.equals(this.lastNames, names)) {
+ this.vertexIdsIndex = this.extractVertexIdsIndex(names);
+ }
+
+ this.lastNames = names;
+ EdgeKVPairs kvPairs = this.newEdgeKVPairs();
+ kvPairs.source.extractFromEdge(names, values,
+ this.vertexIdsIndex.sourceIndexes);
+ kvPairs.target.extractFromEdge(names, values,
+ this.vertexIdsIndex.targetIndexes);
+ kvPairs.extractProperties(names, values);
+
+ List<Vertex> sources = kvPairs.source.buildVertices(false);
+ List<Vertex> targets = kvPairs.target.buildVertices(false);
+ if (sources.isEmpty() || targets.isEmpty()) {
+ return ImmutableList.of();
+ }
+ E.checkArgument(sources.size() == 1 || targets.size() == 1 ||
+ sources.size() == targets.size(),
+ "The elements number of source and target must be: " +
+ "1 to n, n to 1, n to n");
+ int size = Math.max(sources.size(), targets.size());
+ List<Edge> edges = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ Vertex source = i < sources.size() ?
+ sources.get(i) : sources.get(0);
+ Vertex target = i < targets.size() ?
+ targets.get(i) : targets.get(0);
+ Edge edge = new Edge(this.mapping.label());
+ edge.source(source);
+ edge.target(target);
+ // Add properties
+ this.addProperties(edge, kvPairs.properties);
+ this.checkNonNullableKeys(edge);
+ edges.add(edge);
+ }
+ return edges;
+ }
private EdgeKVPairs newEdgeKVPairs() {
EdgeKVPairs kvPairs = new EdgeKVPairs();
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
index aff8e7be0..72002336c 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/ElementBuilder.java
@@ -54,6 +54,7 @@
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.LongEncoding;
import com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.Row;
public abstract class ElementBuilder<GE extends GraphElement> {
@@ -75,6 +76,8 @@ public ElementBuilder(LoadContext context, InputStruct struct) {
public abstract List<GE> build(String[] names, Object[] values);
+ public abstract List<GE> build(Row row);
+
public abstract SchemaLabel schemaLabel();
protected abstract Collection<String> nonNullableKeys();
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
index 2366bc2c1..633bdea53 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/builder/VertexBuilder.java
@@ -28,6 +28,8 @@
import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.structure.schema.SchemaLabel;
import com.baidu.hugegraph.structure.schema.VertexLabel;
+
+import org.apache.spark.sql.Row;
import org.apache.hugegraph.util.E;
public class VertexBuilder extends ElementBuilder<Vertex> {
@@ -59,6 +61,19 @@ public List<Vertex> build(String[] names, Object[] values) {
return kvPairs.buildVertices(true);
}
+ @Override
+ public List<Vertex> build(Row row) {
+ VertexKVPairs kvPairs = this.newKVPairs(this.vertexLabel,
+ this.mapping.unfold());
+ String[] names = row.schema().fieldNames();
+ Object[] values = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+ values[i] = row.get(i);
+ }
+ kvPairs.extractFromVertex(names, values);
+ return kvPairs.buildVertices(true);
+ }
+
@Override
public SchemaLabel schemaLabel() {
return this.vertexLabel;
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
index 6f577f00c..fb42312ae 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/constant/Constants.java
@@ -79,4 +79,8 @@ public final class Constants {
public static final String CDC_DATA = "data";
public static final String CDC_OP = "op";
+ public static final String HBASE_COL_FAMILY = "f";
+ public static final String LOAD_DATA_PARSE_SUFFIX = "parse";
+ public static final String LOAD_DATA_SER_SUFFIX = "ser";
+ public static final String LOAD_DATA_INSERT_SUFFIX = "insert";
}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java
new file mode 100644
index 000000000..c197f90bc
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/DirectLoader.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.EdgeBuilder;
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.executor.LoadContext;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.io.Serializable;
+import java.util.LinkedList;
+import java.util.List;
+
+public abstract class DirectLoader<T, R> implements Serializable {
+
+ LoadOptions loadOptions;
+ InputStruct struct;
+
+ public DirectLoader(LoadOptions loadOptions,
+ InputStruct struct) {
+ this.loadOptions = loadOptions;
+ this.struct = struct;
+ }
+
+ public final void bulkload(Dataset<Row> ds) {
+ JavaPairRDD<T, R> javaPairRDD = buildVertexAndEdge(ds);
+ String path = generateFiles(javaPairRDD);
+ loadFiles(path);
+ }
+
+ protected List<ElementBuilder> getElementBuilders() {
+ LoadContext context = new LoadContext(loadOptions);
+ context.schemaCache().updateAll();
+ List<ElementBuilder> buildersForGraphElement = new LinkedList<>();
+ for (VertexMapping vertexMapping : struct.vertices()) {
+ buildersForGraphElement.add(
+ new VertexBuilder(context, struct, vertexMapping)
+ );
+ }
+ for (EdgeMapping edgeMapping : struct.edges()) {
+ buildersForGraphElement.add(new EdgeBuilder(context, struct, edgeMapping));
+ }
+ context.close();
+ return buildersForGraphElement;
+ }
+
+ abstract JavaPairRDD<T, R> buildVertexAndEdge(Dataset<Row> ds);
+
+ abstract String generateFiles(JavaPairRDD<T, R> buildAndSerRdd);
+
+ abstract void loadFiles(String path);
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java
new file mode 100644
index 000000000..b2c10af92
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/loader/HBaseDirectLoader.java
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.loader;
+
+import com.baidu.hugegraph.loader.builder.ElementBuilder;
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.direct.util.SinkToHBase;
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
+import com.baidu.hugegraph.loader.util.HugeClientHolder;
+import com.baidu.hugegraph.serializer.direct.HBaseSerializer;
+import com.baidu.hugegraph.structure.GraphElement;
+import com.baidu.hugegraph.structure.graph.Edge;
+import com.baidu.hugegraph.structure.graph.Vertex;
+
+import org.apache.hugegraph.util.Log;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+public class HBaseDirectLoader extends DirectLoader<ImmutableBytesWritable, KeyValue> {
+
+ private SinkToHBase sinkToHBase;
+ private LoadDistributeMetrics loadDistributeMetrics;
+
+ public static final Logger LOG = Log.logger(HBaseDirectLoader.class);
+
+ public HBaseDirectLoader(LoadOptions loadOptions,
+ InputStruct struct,
+ LoadDistributeMetrics loadDistributeMetrics) {
+ super(loadOptions, struct);
+ this.loadDistributeMetrics = loadDistributeMetrics;
+ this.sinkToHBase = new SinkToHBase(loadOptions);
+ }
+
+ public String getTableName() {
+ String tableName = null;
+ if (struct.edges().size() > 0) {
+ tableName = this.loadOptions.edgeTablename;
+ } else if (struct.vertices().size() > 0) {
+ tableName = this.loadOptions.vertexTablename;
+ }
+ return tableName;
+ }
+
+ public Integer getTablePartitions() {
+ return struct.edges().size() > 0 ?
+ loadOptions.edgePartitions :
+ loadOptions.vertexPartitions;
+ }
+
+ @Override
+ public JavaPairRDD<ImmutableBytesWritable, KeyValue> buildVertexAndEdge(Dataset<Row> ds) {
+ LOG.info("Start to build vertices and edges");
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> tuple2KeyValueJavaPairRDD =
+ ds.toJavaRDD().mapPartitionsToPair(
+ new PairFlatMapFunction<Iterator<Row>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(
+ Iterator<Row> rowIterator) throws Exception {
+ HBaseSerializer serializer = new HBaseSerializer(
+ HugeClientHolder.create(loadOptions),
+ loadOptions.vertexPartitions, loadOptions.edgePartitions);
+ List<ElementBuilder> buildersForGraphElement = getElementBuilders();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result = new LinkedList<>();
+ while (rowIterator.hasNext()) {
+ Row row = rowIterator.next();
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> serList =
+ buildAndSer(serializer, row, buildersForGraphElement);
+ result.addAll(serList);
+ }
+ serializer.close();
+ return result.iterator();
+ }
+ }
+ );
+ return tuple2KeyValueJavaPairRDD;
+ }
+
+ @Override
+ String generateFiles(JavaPairRDD<ImmutableBytesWritable, KeyValue> buildAndSerRdd) {
+ LOG.info("Start to generate HFiles");
+ try {
+ Tuple2 tuple =
+ sinkToHBase.getPartitionerByTableName(getTablePartitions(), getTableName());
+ Partitioner partitioner = (Partitioner) tuple._1;
+ TableDescriptor tableDescriptor = (TableDescriptor) tuple._2;
+
+ JavaPairRDD<ImmutableBytesWritable, KeyValue> repartitionedRdd =
+ buildAndSerRdd.repartitionAndSortWithinPartitions(partitioner);
+ Configuration conf = sinkToHBase.getHBaseConfiguration().get();
+ Job job = Job.getInstance(conf);
+ HFileOutputFormat2.configureIncrementalLoadMap(job, tableDescriptor);
+ conf.set("hbase.mapreduce.hfileoutputformat.table.name",
+ tableDescriptor.getTableName().getNameAsString());
+ String path = getHFilePath(job.getConfiguration());
+ repartitionedRdd.saveAsNewAPIHadoopFile(
+ path,
+ ImmutableBytesWritable.class,
+ KeyValue.class,
+ HFileOutputFormat2.class,
+ conf
+ );
+ LOG.info("Saved HFiles to: '{}'", path);
+ flushPermission(conf, path);
+ return path;
+ } catch (IOException e) {
+ LOG.error("Failed to generate files", e);
+ }
+ return Constants.EMPTY_STR;
+ }
+
+ public String getHFilePath(Configuration conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ long timestamp = System.currentTimeMillis();
+ String pathStr = fs.getWorkingDirectory().toString() + "/hfile-gen/" + timestamp + "/";
+ Path hfileGenPath = new Path(pathStr);
+ if (fs.exists(hfileGenPath)) {
+ LOG.info("Delete the existing HFile output path: {}", pathStr);
+ fs.delete(hfileGenPath, true);
+ }
+ return pathStr;
+ }
+
+ @Override
+ public void loadFiles(String path) {
+ try {
+ // Bulk load the generated HFiles into the HBase table
+ sinkToHBase.loadHfiles(path, getTableName());
+ } catch (Exception e) {
+ LOG.error("Failed to load HFiles", e);
+ }
+ }
+
+ private void flushPermission(Configuration conf, String path) {
+ FsShell shell = new FsShell(conf);
+ try {
+ LOG.info("Chmod HFile directory permission");
+ shell.run(new String[]{"-chmod", "-R", "777", path});
+ shell.close();
+ } catch (Exception e) {
+ LOG.error("Couldn't change the file permissions: " + e +
+ ", please run command: " +
+ "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles " + path +
+ " '" + getTableName() + "'" +
+ " to load the generated HFiles into the HBase table");
+ }
+ }
+
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> buildAndSer(HBaseSerializer serializer,
+ Row row,
+ List<ElementBuilder> builders) {
+ List<GraphElement> elements;
+ List<Tuple2<ImmutableBytesWritable, KeyValue>> result = new LinkedList<>();
+
+ for (ElementBuilder builder : builders) {
+ ElementMapping elementMapping = builder.mapping();
+ if (elementMapping.skip()) {
+ continue;
+ }
+ if ("".equals(row.mkString())) {
+ break;
+ }
+ switch (struct.input().type()) {
+ case FILE:
+ case HDFS:
+ elements = builder.build(row);
+ break;
+ default:
+ throw new AssertionError(String.format(
+ "Unsupported input source '%s'",
+ struct.input().type()));
+ }
+
+ boolean isVertex = builder.mapping().type().isVertex();
+ if (isVertex) {
+ for (Vertex vertex : (List<Vertex>) (Object) elements) {
+ LOG.debug("Vertex built: {}", vertex);
+ Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+ vertexSerialize(serializer, vertex);
+ loadDistributeMetrics.increaseDisVertexInsertSuccess(builder.mapping());
+ result.add(tuple2);
+ }
+ } else {
+ for (Edge edge : (List<Edge>) (Object) elements) {
+ LOG.debug("Edge built: {}", edge);
+ Tuple2<ImmutableBytesWritable, KeyValue> tuple2 =
+ edgeSerialize(serializer, edge);
+ loadDistributeMetrics.increaseDisEdgeInsertSuccess(builder.mapping());
+ result.add(tuple2);
+ }
+ }
+ }
+ return result;
+ }
+
+ private Tuple2<ImmutableBytesWritable, KeyValue> edgeSerialize(HBaseSerializer serializer,
+ Edge edge) {
+ LOG.debug("Start to serialize edge: {}", edge);
+ byte[] rowkey = serializer.getKeyBytes(edge);
+ byte[] values = serializer.getValueBytes(edge);
+ ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+ rowKey.set(rowkey);
+ KeyValue keyValue = new KeyValue(rowkey,
+ Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+ Bytes.toBytes(Constants.EMPTY_STR),
+ values);
+ return new Tuple2<>(rowKey, keyValue);
+ }
+
+ private Tuple2<ImmutableBytesWritable, KeyValue> vertexSerialize(HBaseSerializer serializer,
+ Vertex vertex) {
+ LOG.debug("Start to serialize vertex: {}", vertex);
+ byte[] rowkey = serializer.getKeyBytes(vertex);
+ byte[] values = serializer.getValueBytes(vertex);
+ ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+ rowKey.set(rowkey);
+ KeyValue keyValue = new KeyValue(rowkey,
+ Bytes.toBytes(Constants.HBASE_COL_FAMILY),
+ Bytes.toBytes(Constants.EMPTY_STR),
+ values);
+ return new Tuple2<>(rowKey, keyValue);
+ }
+}
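
If the automatic bulk load step fails, the error message in flushPermission() above points at loading the generated HFiles manually; a hedged sketch of that fallback (the HFile path and table name below are placeholders):

    # Placeholder path/table: manually bulk-load previously generated HFiles into HBase
    hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles \
        /user/hadoop/hfile-gen/1700000000000/ hugegraph:g_v
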
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java
new file mode 100644
index 000000000..99ea1d781
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/direct/util/SinkToHBase.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.direct.util;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HRegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hugegraph.util.Log;
+import org.apache.spark.Partitioner;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.loader.executor.LoadOptions;
+
+import scala.Tuple2;
+
+public class SinkToHBase implements Serializable {
+
+ private LoadOptions loadOptions;
+ public static final Logger LOG = Log.logger(SinkToHBase.class);
+
+ public SinkToHBase(LoadOptions loadOptions) {
+ this.loadOptions = loadOptions;
+ }
+
+ public Optional<Configuration> getHBaseConfiguration() {
+ Configuration baseConf = HBaseConfiguration.create();
+ baseConf.set("hbase.zookeeper.quorum", this.loadOptions.hbaseZKQuorum);
+ baseConf.set("hbase.zookeeper.property.clientPort", this.loadOptions.hbaseZKPort);
+ baseConf.set("zookeeper.znode.parent", this.loadOptions.hbaseZKParent);
+ return Optional.ofNullable(baseConf);
+ }
+
+ private Optional<Connection> getConnection() {
+ Optional<Configuration> baseConf = getHBaseConfiguration();
+ Connection conn = null;
+ try {
+ conn = ConnectionFactory.createConnection(baseConf.get());
+ } catch (IOException e) {
+ LOG.error("Failed to get HBase connection", e);
+ }
+ return Optional.ofNullable(conn);
+ }
+
+ public Tuple2<IntPartitioner, TableDescriptor>
+ getPartitionerByTableName(int numPartitions, String tableName) throws IOException {
+ Optional<Connection> optionalConnection = getConnection();
+ TableDescriptor descriptor = optionalConnection
+ .get()
+ .getTable(TableName.valueOf(tableName))
+ .getDescriptor();
+ LOG.debug("Get TableDescriptor '{}' by table name '{}'",
+ descriptor.getTableName(), tableName);
+ optionalConnection.get().close();
+ return new Tuple2<>(
+ new IntPartitioner(numPartitions, tableName), descriptor);
+ }
+
+ public void loadHfiles(String path, String tableName) throws Exception {
+ Connection conn = getConnection().get();
+ Table table = conn.getTable(TableName.valueOf(tableName));
+ Configuration conf = conn.getConfiguration();
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
+ bulkLoadHFilesTool.bulkLoad(table.getName(), new Path(path));
+ table.close();
+ conn.close();
+ }
+
+ public class IntPartitioner extends Partitioner {
+
+ private final int numPartitions;
+ public Map<List<String>, Integer> rangeMap = new HashMap<>();
+ private String tableName;
+
+ public IntPartitioner(int numPartitions, String tableName) throws IOException {
+ this.numPartitions = numPartitions;
+ this.rangeMap = getRangeMap(tableName);
+ this.tableName = tableName;
+ }
+
+ private Map<List<String>, Integer> getRangeMap(String tableName) throws IOException {
+ Connection conn = getConnection().get();
+ HRegionLocator locator =
+ (HRegionLocator) conn.getRegionLocator(TableName.valueOf(tableName));
+ Pair<byte[][], byte[][]> startEndKeys = locator.getStartEndKeys();
+ Map<List<String>, Integer> rangeMap = new HashMap<>();
+ for (int i = 0; i < startEndKeys.getFirst().length; i++) {
+ String startKey = Bytes.toString(startEndKeys.getFirst()[i]);
+ String endKey = Bytes.toString(startEndKeys.getSecond()[i]);
+ rangeMap.put(new ArrayList<>(Arrays.asList(startKey, endKey)), i);
+ }
+ conn.close();
+ return rangeMap;
+ }
+
+ @Override
+ public int numPartitions() {
+ return numPartitions;
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ if (key instanceof ImmutableBytesWritable) {
+ try {
+ ImmutableBytesWritable immutableBytesWritableKey = (ImmutableBytesWritable) key;
+ if (rangeMap == null || rangeMap.isEmpty()) {
+ rangeMap = getRangeMap(this.tableName);
+ }
+ String keyString = Bytes.toString(immutableBytesWritableKey.get());
+ for (List<String> range : rangeMap.keySet()) {
+ if (keyString.compareToIgnoreCase(range.get(0)) >= 0 &&
+ ((keyString.compareToIgnoreCase(range.get(1)) < 0) ||
+ range.get(1).equals(""))) {
+ return rangeMap.get(range);
+ }
+ }
+ LOG.error("Can't find a proper partition for key '{}', default to partition 0", keyString);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Failed to get the partition id of key '{}', default to partition 0", key, e);
+ return 0;
+ }
+ } else {
+ LOG.error("The key '{}' is not an ImmutableBytesWritable, default to partition 0", key);
+ return 0;
+ }
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
index c90b1d746..118b97373 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/executor/LoadOptions.java
@@ -23,6 +23,7 @@
import java.io.Serializable;
import java.util.Set;
+import com.baidu.hugegraph.loader.mapping.BackendStoreInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -206,6 +207,34 @@ public class LoadOptions implements Serializable {
description = "Print usage of HugeGraphLoader")
public boolean help;
+ @Parameter(names = {"--sink-type"}, arity = 1,
+ description = "Sink to different storage")
+ public boolean sinkType = true;
+
+ @Parameter(names = {"--edge-partitions"}, arity = 1,
+ description = "The number of partitions of the HBase edge table")
+ public int edgePartitions = 64;
+
+ @Parameter(names = {"--vertex-partitions"}, arity = 1,
+ description = "The number of partitions of the HBase vertex table")
+ public int vertexPartitions = 64;
+
+ @Parameter(names = {"edgeTablename"}, arity = 1,
+ description = "edgeTablename")
+ public String edgeTablename;
+ @Parameter(names = {"vertexTablename"}, arity = 1,
+ description = "vertexTablename")
+ public String vertexTablename;
+ @Parameter(names = {"hbaseZKQuorum"}, arity = 1,
+ description = "hbaseZKQuorum")
+ public String hbaseZKQuorum;
+ @Parameter(names = {"hbaseZKPort"}, arity = 1,
+ description = "hbaseZKPort")
+ public String hbaseZKPort;
+ @Parameter(names = {"hbaseZKParent"}, arity = 1,
+ description = "hbaseZKParent")
+ public String hbaseZKParent;
+
public String workModeString() {
if (this.incrementalMode) {
return "INCREMENTAL MODE";
@@ -263,6 +292,15 @@ public static LoadOptions parseOptions(String[] args) {
return options;
}
+ public void copyBackendStoreInfo(BackendStoreInfo backendStoreInfo) {
+ E.checkArgument(backendStoreInfo != null, "The backendStoreInfo can't be null");
+ this.edgeTablename = backendStoreInfo.getEdgeTablename();
+ this.vertexTablename = backendStoreInfo.getVertexTablename();
+ this.hbaseZKParent = backendStoreInfo.getHbaseZKParent();
+ this.hbaseZKPort = backendStoreInfo.getHbaseZKPort();
+ this.hbaseZKQuorum = backendStoreInfo.getHbaseZKQuorum();
+ }
+
public static class UrlValidator implements IParameterValidator {
@Override
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java
new file mode 100644
index 000000000..1b533262d
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/BackendStoreInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.mapping;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+
+@JsonPropertyOrder({"edge_tablename", "vertex_tablename", "hbase_zookeeper_quorum",
+ "hbase_zookeeper_property_clientPort", "zookeeper_znode_parent"})
+public class BackendStoreInfo {
+
+ @JsonProperty("edge_tablename")
+ private String edgeTablename;
+ @JsonProperty("vertex_tablename")
+ private String vertexTablename;
+ @JsonProperty("hbase_zookeeper_quorum")
+ private String hbaseZKQuorum;
+ @JsonProperty("hbase_zookeeper_property_clientPort")
+ private String hbaseZKPort;
+ @JsonProperty("zookeeper_znode_parent")
+ private String hbaseZKParent;
+
+ public String getEdgeTablename() {
+ return edgeTablename;
+ }
+
+ public void setEdgeTablename(String edgeTablename) {
+ this.edgeTablename = edgeTablename;
+ }
+
+ public String isVertexTablename() {
+ return vertexTablename;
+ }
+
+ public void setVertexTablename(String vertexTablename) {
+ this.vertexTablename = vertexTablename;
+ }
+
+ public String getVertexTablename() {
+ return vertexTablename;
+ }
+
+ public String getHbaseZKQuorum() {
+ return hbaseZKQuorum;
+ }
+
+ public void setHbaseZKQuorum(String hbaseZKQuorum) {
+ this.hbaseZKQuorum = hbaseZKQuorum;
+ }
+
+ public String getHbaseZKPort() {
+ return hbaseZKPort;
+ }
+
+ public void setHbaseZKPort(String hbaseZKPort) {
+ this.hbaseZKPort = hbaseZKPort;
+ }
+
+ public String getHbaseZKParent() {
+ return hbaseZKParent;
+ }
+
+ public void setHbaseZKParent(String hbaseZKParent) {
+ this.hbaseZKParent = hbaseZKParent;
+ }
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
index 48f5beca3..c3afd7367 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/mapping/LoadMapping.java
@@ -53,6 +53,12 @@ public class LoadMapping implements Checkable {
private String version;
@JsonProperty("structs")
private List<InputStruct> structs;
+ @JsonProperty("backendStoreInfo")
+ private BackendStoreInfo backendStoreInfo;
+
+ public BackendStoreInfo getBackendStoreInfo() {
+ return backendStoreInfo;
+ }
public static LoadMapping of(String filePath) {
File file = FileUtils.getFile(filePath);
@@ -81,6 +87,14 @@ public LoadMapping(@JsonProperty("structs") List structs) {
this.structs = structs;
}
+ @JsonCreator
+ public LoadMapping(@JsonProperty("structs") List structs,
+ @JsonProperty("backendStoreInfo") BackendStoreInfo backendStoreInfo) {
+ this.version = Constants.V2_STRUCT_VERSION;
+ this.structs = structs;
+ this.backendStoreInfo = backendStoreInfo;
+ }
+
@Override
public void check() throws IllegalArgumentException {
E.checkArgument(!StringUtils.isEmpty(this.version),
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java
new file mode 100644
index 000000000..3345cd169
--- /dev/null
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/metrics/LoadDistributeMetrics.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.loader.metrics;
+
+import com.baidu.hugegraph.loader.constant.Constants;
+import com.baidu.hugegraph.loader.mapping.EdgeMapping;
+import com.baidu.hugegraph.loader.mapping.ElementMapping;
+import com.baidu.hugegraph.loader.mapping.InputStruct;
+import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import org.apache.spark.SparkContext;
+import org.apache.spark.util.LongAccumulator;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public final class LoadDistributeMetrics implements Serializable {
+
+ private final InputStruct struct;
+ private Map<String, Metrics> vertexDisMetrics;
+ private Map<String, Metrics> edgeDisMetrics;
+
+ public LoadDistributeMetrics(InputStruct struct) {
+ this.struct = struct;
+ this.vertexDisMetrics = new HashMap<>();
+ this.edgeDisMetrics = new HashMap<>();
+ for (VertexMapping mapping : struct.vertices()) {
+ this.vertexDisMetrics.put(mapping.label(), new Metrics());
+ }
+ for (EdgeMapping mapping : struct.edges()) {
+ this.edgeDisMetrics.put(mapping.label(), new Metrics());
+ }
+ }
+
+ public void init(SparkContext sc) {
+ for (VertexMapping mapping : this.struct.vertices()) {
+ Metrics metrics = this.vertexDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR + Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR + Constants.LOAD_DATA_PARSE_SUFFIX);
+ }
+ for (EdgeMapping mapping : this.struct.edges()) {
+ Metrics metrics = this.edgeDisMetrics.get(mapping.label());
+ metrics.insertSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR + Constants.LOAD_DATA_INSERT_SUFFIX);
+ metrics.parseSuccess = sc.longAccumulator(mapping.label() +
+ Constants.UNDERLINE_STR + Constants.LOAD_DATA_PARSE_SUFFIX);
+ }
+ }
+
+ public void increaseDisVertexParseSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void plusDisVertexParseSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisVertexInsertSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisVertexInsertSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public void increaseDisEdgeParseSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).parseSuccess.add(1);
+ }
+
+ public void plusDisEdgeParseSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).parseSuccess.add(count);
+ }
+
+ public void increaseDisEdgeInsertSuccess(ElementMapping mapping) {
+ this.disMetrics(mapping).insertSuccess.add(1);
+ }
+
+ public void plusDisEdgeInsertSuccess(ElementMapping mapping, Long count) {
+ this.disMetrics(mapping).insertSuccess.add(count);
+ }
+
+ public Long readVertexInsertSuccess() {
+ Long totalCnt = 0L;
+ Collection<Metrics> values = vertexDisMetrics.values();
+ for (Metrics metrics : values) {
+ totalCnt += metrics.insertSuccess();
+ }
+ return totalCnt;
+ }
+
+ public Long readEdgeInsertSuccess() {
+ Long totalCnt = 0L;
+ Collection<Metrics> values = edgeDisMetrics.values();
+ for (Metrics metrics : values) {
+ totalCnt += metrics.insertSuccess();
+ }
+ return totalCnt;
+ }
+
+ private Metrics disMetrics(ElementMapping mapping) {
+ if (mapping.type().isVertex()) {
+ return this.vertexDisMetrics.get(mapping.label());
+ } else {
+ return this.edgeDisMetrics.get(mapping.label());
+ }
+ }
+
+ public static class Metrics implements Serializable {
+
+ private LongAccumulator parseSuccess;
+ private LongAccumulator parseFailure;
+ private LongAccumulator insertSuccess;
+ private LongAccumulator insertFailure;
+
+ public Metrics() {
+ }
+
+ public long parseSuccess() {
+ return this.parseSuccess.value();
+ }
+
+ public long parseFailure() {
+ return this.parseFailure.value();
+ }
+
+ public long insertSuccess() {
+ return this.insertSuccess.value();
+ }
+
+ public long insertFailure() {
+ return this.insertFailure.value();
+ }
+ }
+}
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
index ae8213030..acb188aee 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/spark/HugeGraphSparkLoader.java
@@ -23,6 +23,7 @@
import com.baidu.hugegraph.loader.builder.EdgeBuilder;
import com.baidu.hugegraph.loader.builder.ElementBuilder;
import com.baidu.hugegraph.loader.builder.VertexBuilder;
+import com.baidu.hugegraph.loader.direct.loader.HBaseDirectLoader;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.mapping.EdgeMapping;
@@ -30,6 +31,7 @@
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.mapping.LoadMapping;
import com.baidu.hugegraph.loader.mapping.VertexMapping;
+import com.baidu.hugegraph.loader.metrics.LoadDistributeMetrics;
import com.baidu.hugegraph.loader.source.InputSource;
import com.baidu.hugegraph.loader.source.file.Compression;
import com.baidu.hugegraph.loader.source.file.FileFilter;
@@ -46,11 +48,14 @@
import com.baidu.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.Log;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructField;
+import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import java.io.Serializable;
@@ -89,29 +94,92 @@ public HugeGraphSparkLoader(String[] args) {
public void load() {
LoadMapping mapping = LoadMapping.of(this.loadOptions.file);
List structs = mapping.structs();
+ boolean sinkType = this.loadOptions.sinkType;
+ if (!sinkType) {
+ this.loadOptions.copyBackendStoreInfo(mapping.getBackendStoreInfo());
+ }
+ // Use Kryo serialization and require explicit class registration
+ SparkConf conf = new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrationRequired", "true");
+ try {
+ conf.registerKryoClasses(new Class[] {
+ org.apache.hadoop.hbase.io.ImmutableBytesWritable.class,
+ org.apache.hadoop.hbase.KeyValue.class,
+ org.apache.spark.sql.types.StructType.class,
+ StructField[].class,
+ StructField.class,
+ org.apache.spark.sql.types.LongType$.class,
+ org.apache.spark.sql.types.Metadata.class,
+ org.apache.spark.sql.types.StringType$.class,
+ Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"),
+ Class.forName("scala.reflect.ClassTag$$anon$1"),
+ Class.forName("scala.collection.immutable.Set$EmptySet$"),
+ Class.forName("org.apache.spark.sql.types.DoubleType$")
+ });
+ } catch (ClassNotFoundException e) {
+ LOG.error("spark kryo serialized registration failed");
+ }
+ SparkSession session = SparkSession.builder()
+ .config(conf)
+ .getOrCreate();
+ SparkContext sc = session.sparkContext();
- SparkSession session = SparkSession.builder().getOrCreate();
+ LongAccumulator totalInsertSuccess = sc.longAccumulator("totalInsertSuccess");
for (InputStruct struct : structs) {
+ LOG.info("\n Initializes the accumulator corresponding to the {} ",
+ struct.input().asFileSource().path());
+ LoadDistributeMetrics loadDistributeMetrics = new LoadDistributeMetrics(struct);
+ loadDistributeMetrics.init(sc);
+ LOG.info("\n Start to load data, data info is: \t {} ",
+ struct.input().asFileSource().path());
Dataset ds = read(session, struct);
- ds.foreachPartition((Iterator p) -> {
- LoadContext context = initPartition(this.loadOptions, struct);
- p.forEachRemaining((Row row) -> {
- loadRow(struct, row, p, context);
+ if (sinkType) {
+ LOG.info("\n Start to load data using spark apis \n");
+ ds.foreachPartition((Iterator p) -> {
+ LoadContext context = initPartition(this.loadOptions, struct);
+ p.forEachRemaining((Row row) -> {
+ loadRow(struct, row, p, context);
+ });
+ context.close();
});
- context.close();
- });
+ } else {
+ LOG.info("Start to load data using Spark bulkload");
+ // Generate HFiles and bulk-load them into HBase
+ HBaseDirectLoader directLoader = new HBaseDirectLoader(loadOptions,
+ struct, loadDistributeMetrics);
+ directLoader.bulkload(ds);
+ }
+ collectLoadMetrics(loadDistributeMetrics, totalInsertSuccess);
+ LOG.info("Finished loading data from: {}",
+ struct.input().asFileSource().path());
}
+ Long totalInsertSuccessCnt = totalInsertSuccess.value();
+ LOG.info("The data loading task is complete, total insert success count: {}",
+ totalInsertSuccessCnt);
+
+ sc.stop();
session.close();
session.stop();
}
+ private void collectLoadMetrics(LoadDistributeMetrics loadMetrics,
+ LongAccumulator totalInsertSuccess) {
+ Long edgeInsertSuccess = loadMetrics.readEdgeInsertSuccess();
+ Long vertexInsertSuccess = loadMetrics.readVertexInsertSuccess();
+ totalInsertSuccess.add(edgeInsertSuccess);
+ totalInsertSuccess.add(vertexInsertSuccess);
+ }
+
private LoadContext initPartition(
LoadOptions loadOptions, InputStruct struct) {
LoadContext context = new LoadContext(loadOptions);
for (VertexMapping vertexMapping : struct.vertices()) {
- this.builders.put(
- new VertexBuilder(context, struct, vertexMapping),
- new ArrayList<>());
+ this.builders.put(new VertexBuilder(context, struct, vertexMapping),
+ new ArrayList<>());
}
for (EdgeMapping edgeMapping : struct.edges()) {
this.builders.put(new EdgeBuilder(context, struct, edgeMapping),
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
index f38e4b20d..1b9af2949 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/struct/GraphStructV1.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.stream.Collectors;
+import com.baidu.hugegraph.loader.mapping.BackendStoreInfo;
import org.apache.commons.collections.ListUtils;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
@@ -56,6 +57,9 @@ public class GraphStructV1 implements Checkable {
@JsonProperty("edges")
private final List<EdgeStructV1> edgeStructs;
+ @JsonProperty("backendStoreInfo")
+ private BackendStoreInfo backendStoreInfo;
+
public GraphStructV1() {
this.vertexStructs = new ArrayList<>();
this.edgeStructs = new ArrayList<>();
@@ -76,6 +80,10 @@ public static GraphStructV1 of(LoadContext context) {
}
}
+ public BackendStoreInfo getBackendStoreInfo() {
+ return backendStoreInfo;
+ }
+
@Override
public void check() throws IllegalArgumentException {
LOG.info("Checking vertex mapping descriptions");
diff --git a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
index b43d8b554..a9667806f 100644
--- a/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
+++ b/hugegraph-loader/src/main/java/com/baidu/hugegraph/loader/util/MappingUtil.java
@@ -129,7 +129,7 @@ private static LoadMapping parseV1(String json) {
inputStruct.id(String.valueOf(++id));
inputStructs.add(inputStruct);
}
- return new LoadMapping(inputStructs);
+ return new LoadMapping(inputStructs, graphStruct.getBackendStoreInfo());
}
private static ElementMapping convertV1ToV2(ElementStructV1 origin) {
diff --git a/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java b/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
index c937daed0..c2a14b018 100644
--- a/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
+++ b/hugegraph-loader/src/test/java/com/baidu/hugegraph/loader/test/unit/MappingConverterTest.java
@@ -73,7 +73,15 @@ public void testConvertV1ToV2() throws IOException {
" \"Rating\": \"rate\"" +
" }" +
" }" +
- " ]" +
+ " ]," +
+ " \"backendStoreInfo\":" +
+ " {" +
+ " \"edge_tablename\": \"hugegraph:g_oe\"," +
+ " \"vertex_tablename\": \"hugegraph:g_v\"," +
+ " \"hbase_zookeeper_quorum\": \"127.0.0.1\"," +
+ " \"hbase_zookeeper_property_clientPort\": \"2181\"," +
+ " \"zookeeper_znode_parent\": \"/hbase\"" +
+ " }" +
"}";
String input = "struct.json";
File inputFile = new File(input);
@@ -117,7 +125,13 @@ public void testConvertV1ToV2() throws IOException {
"\"field_mapping\":{\"UserID\":\"id\",\"MovieID\":\"id\"," +
"\"Rating\":\"rate\"},\"value_mapping\":{},\"selected\":[]," +
"\"ignored\":[\"Timestamp\"],\"null_values\":[\"\"]," +
- "\"update_strategies\":{},\"batch_size\":500}]}]}";
+ "\"update_strategies\":{},\"batch_size\":500}]}]," +
+ "\"backendStoreInfo\":{" +
+ "\"edge_tablename\":\"hugegraph:g_oe\"," +
+ "\"vertex_tablename\":\"hugegraph:g_v\"," +
+ "\"hbase_zookeeper_quorum\":\"127.0.0.1\"," +
+ "\"hbase_zookeeper_property_clientPort\":\"2181\"," +
+ "\"zookeeper_znode_parent\":\"/hbase\"}}";
Assert.assertEquals(expectV2Json, actualV2Json);
FileUtils.forceDelete(inputFile);
diff --git a/pom.xml b/pom.xml
index d8181f909..53d26db0f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
hugegraph
${project.name}
${project.version}
+        <hbase.version>2.2.3</hbase.version>