feat: Introduce HugeGraphFlinkCDCLoader #291

Merged (11 commits) on Jun 21, 2022
24 changes: 24 additions & 0 deletions hugegraph-loader/assembly/static/bin/get-params.sh
@@ -0,0 +1,24 @@
#!/bin/bash
function get_params() {
echo "params: $*"
engine_params=""
hugegraph_params=""
while (("$#")); do
case "$1" in
--file | --graph | --schema | --host | --port | --username | --token | --protocol | \
--trust-store-file | --trust-store-password | --clear-all-data | --clear-timeout | \
--incremental-mode | --failure-mode | --batch-insert-threads | --single-insert-threads | \
--max-conn | --max-conn-per-route | --batch-size | --max-parse-errors | --max-insert-errors | \
--timeout | --shutdown-timeout | --retry-times | --retry-interval | --check-vertex | \
--cdc-flush-interval | --print-progress | --dry-run | --help)
hugegraph_params="$hugegraph_params $1 $2"
shift 2
;;

*) # preserve positional arguments
engine_params="$engine_params $1"
shift
;;
esac
done
}
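To make the split concrete, a minimal usage sketch (the file path and the -p engine flag are illustrative, not taken from this PR):

source bin/get-params.sh
get_params --host 127.0.0.1 --port 8080 --file ./struct.json -p 4
echo "$hugegraph_params"   # --host 127.0.0.1 --port 8080 --file ./struct.json
echo "$engine_params"      # -p 4

Recognized loader flags (and their values) accumulate in hugegraph_params; everything else falls through the catch-all branch into engine_params for the execution engine.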
20 changes: 20 additions & 0 deletions hugegraph-loader/assembly/static/bin/hugegraph-flinkcdc-loader.sh
@@ -0,0 +1,20 @@
#!/bin/bash

BIN_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR=$(dirname ${BIN_DIR})
LIB_DIR=${APP_DIR}/lib
assemblyJarName=$(find ${LIB_DIR} -name hugegraph-loader*.jar)

# get hugegraph_params and engine_params
source "$BIN_DIR"/get-params.sh
get_params "$@"
echo "engine_params: $engine_params"
echo "hugegraph_params: $hugegraph_params"

CMD="${FLINK_HOME}/bin/flink run \
     ${engine_params} \
     -c com.baidu.hugegraph.loader.flink.HugeGraphFlinkCDCLoader \
     ${assemblyJarName} ${hugegraph_params}"

echo ${CMD}
exec ${CMD}
58 changes: 9 additions & 49 deletions hugegraph-loader/assembly/static/bin/hugegraph-spark-loader.sh
@@ -1,64 +1,24 @@
#!/bin/bash

PARAMS=""
while (( "$#" )); do
case "$1" in
-m|--master)
MASTER=$2
shift 2
;;

-n|--name)
APP_NAME=$2
shift 2
;;

-e|--deploy-mode)
DEPLOY_MODE=$2
shift 2
;;

-c|--conf)
SPARK_CONFIG=${SPARK_CONFIG}" --conf "$2
shift 2
;;

--) # end argument parsing
shift
break
;;

*) # preserve positional arguments
PARAMS="$PARAMS $1"
shift
;;

esac
done

if [ -z ${MASTER} ] || [ -z ${DEPLOY_MODE} ]; then
echo "Error: The following options are required:
[-e | --deploy-mode], [-m | --master]"
usage
exit 0
fi

BIN_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BIN_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
APP_DIR=$(dirname ${BIN_DIR})
LIB_DIR=${APP_DIR}/lib

# get hugegraph_params and engine_params
source "$BIN_DIR"/get-params.sh
get_params "$@"
echo "engine_params: $engine_params"
echo "hugegraph_params: $hugegraph_params"

assemblyJarName=$(find ${LIB_DIR} -name hugegraph-loader*.jar)

DEFAULT_APP_NAME="hugegraph-spark-loader"
APP_NAME=${APP_NAME:-$DEFAULT_APP_NAME}

CMD="${SPARK_HOME}/bin/spark-submit
--name ${APP_NAME} \
--master ${MASTER} \
--deploy-mode ${DEPLOY_MODE} \
--class com.baidu.hugegraph.loader.spark.HugeGraphSparkLoader \
${SPARK_CONFIG}
--jars $(echo ${LIB_DIR}/*.jar | tr ' ' ',') ${assemblyJarName} ${PARAMS}"
${engine_params}
--jars $(echo ${LIB_DIR}/*.jar | tr ' ' ',') ${assemblyJarName} ${hugegraph_params}"

echo ${CMD}
exec ${CMD}
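The Spark wrapper now routes arguments the same way. A placeholder invocation (assumes SPARK_HOME is set; --master and --deploy-mode fall through get_params into engine_params and are consumed by spark-submit):

bin/hugegraph-spark-loader.sh --master yarn --deploy-mode cluster \
    --file ./example/spark/struct.json --host 127.0.0.1 --port 8080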
52 changes: 47 additions & 5 deletions hugegraph-loader/pom.xml
@@ -38,13 +38,55 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>antlr4-runtime</artifactId>
<groupId>org.antlr</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${flinkcdc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
@@ -176,7 +218,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
<version>8.0.16</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
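The new dependency entries reference Maven properties defined outside this hunk. A plausible <properties> block for orientation — scala.version and spark.version follow the hard-coded values they replace, while the Flink versions are assumptions (flink-table-planner-blink implies Flink 1.13 or earlier):

<properties>
    <scala.version>2.12</scala.version>
    <spark.version>3.1.2</spark.version>
    <!-- assumed versions, not visible in this diff -->
    <flink.version>1.13.5</flink.version>
    <flinkcdc.version>2.2.0</flinkcdc.version>
</properties>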
@@ -76,4 +76,9 @@ public final class Constants {
public static final int VERTEX_ID_LIMIT = 128;
public static final String[] SEARCH_LIST = new String[]{":", "!"};
public static final String[] TARGET_LIST = new String[]{"`:", "`!"};

public static final String HOST_PORT_REGEX = ".+://(.+):(\\d+)";
public static final String CDC_DATA = "data";
public static final String CDC_OP = "op";

}
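HOST_PORT_REGEX captures the host in group 1 and the port in group 2. An illustrative use, not taken from the PR (the class name and URL are invented):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HostPortExample {
    public static void main(String[] args) {
        // Split a connector URL of the form "<scheme>://<host>:<port>"
        Matcher matcher = Pattern.compile(Constants.HOST_PORT_REGEX)
                                 .matcher("jdbc:mysql://127.0.0.1:3306");
        if (matcher.matches()) {
            String host = matcher.group(1);                // "127.0.0.1"
            int port = Integer.parseInt(matcher.group(2)); // 3306
            System.out.println(host + ":" + port);
        }
    }
}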
@@ -134,6 +134,10 @@ public class LoadOptions implements Serializable {
description = "The number of lines in each submit")
public int batchSize = 500;

@Parameter(names = {"--cdc-flush-interval"}, arity = 1,
description = "The flush interval (in ms) for Flink CDC")
public int flushIntervalMs = 30000;

@Parameter(names = {"--shutdown-timeout"}, arity = 1,
validateWith = {PositiveValidator.class},
description = "The timeout of awaitTermination in seconds")
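LoadOptions appears to be parsed with JCommander (the library behind @Parameter), so the new flag can be exercised as in this hypothetical sketch (the driver class is invented for illustration):

import com.beust.jcommander.JCommander;

public class CdcFlushIntervalExample {
    public static void main(String[] args) {
        LoadOptions options = new LoadOptions();
        // Parse the new flag into the annotated field
        new JCommander(options).parse("--cdc-flush-interval", "10000");
        System.out.println(options.flushIntervalMs); // 10000 (default is 30000)
    }
}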
@@ -0,0 +1,87 @@
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.baidu.hugegraph.loader.flink;

import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.baidu.hugegraph.loader.constant.Constants;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

import io.debezium.data.Envelope;

public class HugeGraphDeserialization implements DebeziumDeserializationSchema<String> {
private static final Logger LOG = LoggerFactory.getLogger(HugeGraphDeserialization.class);

// Reuse a single ObjectMapper rather than allocating one per record
private static final ObjectMapper MAPPER = new ObjectMapper();

@Override
public void deserialize(SourceRecord sourceRecord,
Collector<String> collector) throws Exception {
ObjectNode result = MAPPER.createObjectNode();

Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String op = operation.code();
Struct value = (Struct) sourceRecord.value();
Struct data;
switch (operation) {
case DELETE:
data = value.getStruct("before");
break;
case CREATE:
case READ:
case UPDATE:
simon824 marked this conversation as resolved.
Show resolved Hide resolved
data = value.getStruct("after");
break;
default:
throw new IllegalArgumentException("The op type must be one of 'c', 'r', 'u' or 'd', but got '" + op + "'");
}
ObjectNode rootNode = MAPPER.createObjectNode();
if (data != null) {
Schema afterSchema = data.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = data.get(field);
// Column values may be NULL in the database; avoid an NPE on toString()
rootNode.put(field.name(), afterValue != null ? afterValue.toString() : null);
}
}

result.set(Constants.CDC_DATA, rootNode);
result.put(Constants.CDC_OP, op);
LOG.debug("Loaded data: {}", result);
collector.collect(result.toString());

}

@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
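For context, a sketch of how this deserializer is typically plugged into a MySQL CDC source. The builder names assume the flink-connector-mysql-cdc 2.x API; the driver class, connection values, and table names are invented placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class FlinkCDCExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")            // placeholder
                .port(3306)
                .databaseList("test_db")          // placeholder
                .tableList("test_db.test_table")  // placeholder
                .username("cdc_user")             // placeholder
                .password("cdc_password")         // placeholder
                .deserializer(new HugeGraphDeserialization())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each record is a JSON string: {"data": {<column>: <value>, ...}, "op": "c"|"r"|"u"|"d"}
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-cdc-source").print();
        env.execute("hugegraph-flink-cdc-example");
    }
}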