Merge branch 'main' into fix-loading
zhanglei1949 authored Jun 14, 2024
2 parents 40ec775 + aa60805 commit d44cc3d
Showing 55 changed files with 721 additions and 84 deletions.
4 changes: 2 additions & 2 deletions charts/gie-standalone/templates/frontend/statefulset.yaml
@@ -127,8 +127,8 @@ spec:
value: {{ .Values.frontendQueryPerSecondLimit | quote }}
- name: GREMLIN_SCRIPT_LANGUAGE_NAME
value: {{ .Values.gremlinScriptLanguageName | quote }}
- name: PHYSICAL_OPT_CONFIG
value: {{ .Values.physicalOptConfig}}
- name: GRAPH_PHYSICAL_OPT
value: {{ .Values.graphPhysicalOpt | quote }}
ports:
- name: gremlin
containerPort: {{ .Values.frontend.service.gremlinPort }}
9 changes: 9 additions & 0 deletions charts/graphscope-store/templates/configmap.yaml
@@ -58,6 +58,14 @@ data:
gremlin.server.port=12312
## disable neo4j when launching groot server by default
neo4j.bolt.server.disabled=true

## GOpt config
graph.planner.is.on={{ .Values.graphPlannerIsOn }}
graph.planner.opt={{ .Values.graphPlannerOpt }}
graph.planner.rules={{ .Values.graphPlannerRules }}
graph.physical.opt={{ .Values.graphPhysicalOpt }}
gremlin.script.language.name={{ .Values.gremlinScriptLanguageName }}
query.execution.timeout.ms={{ .Values.queryExecutionTimeoutMs }}

log4rs.config=LOG4RS_CONFIG
## Auth config
@@ -88,6 +96,7 @@ data:
offsets.persist.interval.ms={{ .Values.offsetsPersistIntervalMs }}
file.meta.store.path={{ .Values.fileMetaStorePath }}
log.recycle.enable={{ .Values.logRecycleEnable }}
collect.statistics={{ .Values.collectStatistics }}
log.recycle.offset.reserve={{ .Values.logRecycleOffsetReserve }}

## Extra Config
11 changes: 11 additions & 0 deletions charts/graphscope-store/values.yaml
@@ -488,6 +488,15 @@ zkBasePath: "/graphscope/groot"
## Frontend Config
# gremlinServerPort: 12312

## GOpt config
## To adopt a CBO planner, set graphPlannerOpt to CBO, set gremlinScriptLanguageName to antlr_gremlin_calcite, and set physicalOptConfig to proto.
graphPlannerIsOn: true
graphPlannerOpt: RBO
graphPlannerRules: FilterIntoJoinRule, FilterMatchRule, ExtendIntersectRule, ExpandGetVFusionRule
gremlinScriptLanguageName: antlr_gremlin_traversal
physicalOptConfig: ffi
queryExecutionTimeoutMs: 600000

## Key-value pair separated by ;
## For example extraConfig="k1=v1;k2=v2"
extraConfig: ""
@@ -532,3 +541,5 @@ uptrace:

distributed:
enabled: false

collectStatistics: false
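
For reference, a minimal sketch of a Helm values override that follows the GOpt comment above to switch this chart to the CBO planner. The configmap template reads `.Values.graphPhysicalOpt`, so that key is used here; the file name and release name are illustrative, not part of this commit:

```yaml
# values-cbo.yaml (hypothetical override file, a sketch only)
graphPlannerIsOn: true
graphPlannerOpt: CBO                              # switch from the default RBO planner
gremlinScriptLanguageName: antlr_gremlin_calcite  # calcite-based Gremlin frontend
graphPhysicalOpt: proto                           # physical plan format consumed by the configmap template
```

Something like `helm install groot ./charts/graphscope-store -f values-cbo.yaml` would apply it.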
28 changes: 26 additions & 2 deletions docs/flex/interactive/data_import.md
@@ -6,6 +6,28 @@ In our guide on [using custom graph data](./custom_graph_data.md), we introduced

Currently, we only support importing data into a graph from local `csv` files or `odps` tables. See the configuration `loading_config.data_source.scheme`.
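
For orientation, a minimal sketch of how the scheme is selected, assuming the layout used by `examples/modern_import_full.yaml` discussed below:

```yaml
loading_config:
  data_source:
    scheme: file   # local csv files; use `odps` to read from an ODPS table
```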

## Column mapping

When importing vertex and edge data into a graph, users must define how the raw data maps to the graph's schema.
This can be done using a YAML configuration, as shown:

```yaml
- column:
    index: 0              # Column index in the data source
    name: col_name        # If a column name is present
  property: property_name # The mapped property name
```
The column mapping requirements differ based on the data source:
#### Import from CSV
You can provide `index`, `name`, or both. If both `index` and `name` are specified, we will check whether they match.
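
For instance, a mapping that supplies both fields could look like the following sketch (the column and property names are purely illustrative):

```yaml
column_mappings:
  - column:
      index: 1          # 0-based position in the csv file
      name: person_name # optional here; if given, it must match the header at index 1
    property: name
```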

#### Import from ODPS Table

You only need to specify the `name` of the `column`, since column names are guaranteed to be unique in an ODPS table; the `index` is disregarded.
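
A corresponding sketch for an ODPS source, supplying only the column name (again, the names are illustrative):

```yaml
column_mappings:
  - column:
      name: vertex_id   # ODPS column name; `index` can be omitted
    property: id
```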

## Sample Configuration for Loading the "Modern" Graph from CSV Files

To illustrate, let's examine the `examples/modern_import_full.yaml` file. This configuration is designed for importing the "modern" graph and showcases the full range of configuration possibilities. We'll dissect each configuration item in the sections that follow.
@@ -67,11 +89,13 @@ edge_mappings:
source_vertex_mappings:
- column:
index: 0
name: id
name: src_id
property: id
destination_vertex_mappings:
- column:
index: 1
name: id
name: dst_id
property: id
column_mappings:
- column:
index: 2
2 changes: 1 addition & 1 deletion flex/interactive/examples/movies/import.yaml
@@ -57,7 +57,7 @@ edge_mappings:
destination_vertex: Movie
column_mappings:
- column:
index: 3
index: 2
name: rating
property: rating
inputs:
33 changes: 26 additions & 7 deletions flex/storages/rt_mutable_graph/loader/csv_fragment_loader.cc
@@ -356,7 +356,6 @@ void CSVFragmentLoader::fillVertexReaderMeta(
// parse all column_names

std::vector<std::string> included_col_names;
std::vector<size_t> included_col_indices;
std::vector<std::string> mapped_property_names;

auto cur_label_col_mapping = loading_config_.GetVertexColumnMappings(v_label);
@@ -383,20 +382,29 @@

for (size_t i = 0; i < read_options.column_names.size(); ++i) {
included_col_names.emplace_back(read_options.column_names[i]);
included_col_indices.emplace_back(i);
// We assume the order of the columns in the file is the same as the
// order of the properties in the schema, except for primary key.
mapped_property_names.emplace_back(property_names[i]);
}
} else {
for (size_t i = 0; i < cur_label_col_mapping.size(); ++i) {
auto& [col_id, col_name, property_name] = cur_label_col_mapping[i];
if (col_name.empty()) {
// use default mapping
if (col_name.empty()){
if (col_id >= read_options.column_names.size() || col_id < 0) {
LOG(FATAL) << "The specified column index: " << col_id
<< " is out of range, please check your configuration";
}
col_name = read_options.column_names[col_id];
}
// check whether index match to the name if col_id is valid
if (col_id >= 0 && col_id < read_options.column_names.size()) {
if (col_name != read_options.column_names[col_id]) {
LOG(FATAL) << "The specified column name: " << col_name
<< " does not match the column name in the file: "
<< read_options.column_names[col_id];
}
}
included_col_names.emplace_back(col_name);
included_col_indices.emplace_back(col_id);
mapped_property_names.emplace_back(property_name);
}
}
@@ -521,10 +529,21 @@ void CSVFragmentLoader::fillEdgeReaderMeta(
for (size_t i = 0; i < cur_label_col_mapping.size(); ++i) {
// TODO: make the property column's names are in same order with schema.
auto& [col_id, col_name, property_name] = cur_label_col_mapping[i];
if (col_name.empty()) {
// use default mapping
if (col_name.empty()){
if (col_id >= read_options.column_names.size() || col_id < 0) {
LOG(FATAL) << "The specified column index: " << col_id
<< " is out of range, please check your configuration";
}
col_name = read_options.column_names[col_id];
}
// check whether index match to the name if col_id is valid
if (col_id >= 0 && col_id < read_options.column_names.size()) {
if (col_name != read_options.column_names[col_id]) {
LOG(FATAL) << "The specified column name: " << col_name
<< " does not match the column name in the file: "
<< read_options.column_names[col_id];
}
}
included_col_names.emplace_back(col_name);
mapped_property_names.emplace_back(property_name);
}
18 changes: 14 additions & 4 deletions flex/storages/rt_mutable_graph/loading_config.cc
@@ -148,16 +148,26 @@ static bool parse_column_mappings(
LOG(ERROR) << "column_mappings should have field [column]";
return false;
}
int32_t column_id;
int32_t column_id = -1;
if (!get_scalar(column_mapping, "index", column_id)) {
LOG(ERROR) << "Expect column index for column mapping";
return false;
VLOG(10) << "Column index for column mapping is not set, skip";
}
else {
if (column_id < 0) {
LOG(ERROR) << "Column index for column mapping should be non-negative";
return false;
}
}
std::string column_name;
std::string column_name = "";
if (!get_scalar(column_mapping, "name", column_name)) {
VLOG(10) << "Column name for col_id: " << column_id
<< " is not set, make it empty";
}
// At least one need to be specified.
if (column_id == -1 && column_name.empty()) {
LOG(ERROR) << "Expect column index or name for column mapping";
return false;
}

std::string property_name; // property name is optional.
if (!get_scalar(node[i], "property", property_name)) {
2 changes: 1 addition & 1 deletion flex/storages/rt_mutable_graph/loading_config.h
@@ -132,7 +132,7 @@ class LoadingConfig {
GetEdgeLoadingMeta() const;

// Get vertex column mappings. Each element in the vector is a pair of
// <column_index, property_name>.
// <column_index, column_name, property_name>.
const std::vector<std::tuple<size_t, std::string, std::string>>&
GetVertexColumnMappings(label_t label_id) const;

13 changes: 12 additions & 1 deletion flex/storages/rt_mutable_graph/schema.cc
@@ -764,6 +764,17 @@ static Status parse_edge_schema(YAML::Node node, Schema& schema) {
RETURN_IF_NOT_OK(parse_edge_properties(node["properties"], edge_label_name,
property_types, prop_names,
schema.GetVersion()));

// TODO(zhanglei): Remove this check after multiple edge properties are
// supported.
if (property_types.size() > 1) {
LOG(ERROR) << "Currently edge can not have "
"more than one property";
return Status(StatusCode::InvalidSchema,
"Currently edge can not have "
"more than one property");
}

if (node["description"]) {
description = node["description"].as<std::string>();
}
@@ -969,7 +980,7 @@ static Status parse_edge_schema(YAML::Node node, Schema& schema) {
}

static Status parse_edges_schema(YAML::Node node, Schema& schema) {
if (node.IsNull()){
if (node.IsNull()) {
LOG(INFO) << "No edge is set";
return Status::OK();
}
2 changes: 2 additions & 0 deletions interactive_engine/assembly/src/bin/groot/store_ctl.sh
@@ -45,6 +45,8 @@ _setup_env() {

mkdir -p ${LOG_DIR}

export OTEL_SDK_DISABLED="${OTEL_SDK_DISABLED:-true}"

export LD_LIBRARY_PATH=${GROOT_HOME}/native:${GROOT_HOME}/native/lib:${LD_LIBRARY_PATH}:/usr/local/lib
libpath="$(echo "${GROOT_HOME}"/lib/*.jar | tr ' ' ':')"
}
2 changes: 1 addition & 1 deletion interactive_engine/assembly/src/conf/groot/config.template
@@ -34,4 +34,4 @@ pegasus.worker.num=2
pegasus.hosts=localhost:8080

kafka.test.cluster.enable=true
OTEL_SDK_DISABLED=true
collect.statistics=false
@@ -76,4 +76,7 @@ public class CommonConfig {
// Only available in multi pod mode.
public static final Config<Boolean> WRITE_HA_ENABLED =
Config.boolConfig("write.ha.enabled", false);

public static final Config<Boolean> COLLECT_STATISTICS =
Config.boolConfig("collect.statistics", false);
}
@@ -20,6 +20,8 @@
public interface SchemaFetcher {
Map<Long, GraphSchema> getSchemaSnapshotPair();

GraphStatistics getStatistics();

int getPartitionNum();

int getVersion();
@@ -18,11 +18,13 @@
import com.alibaba.graphscope.groot.common.schema.api.GraphStatistics;
import com.alibaba.graphscope.groot.common.schema.wrapper.EdgeKind;
import com.alibaba.graphscope.groot.common.schema.wrapper.LabelId;
import com.alibaba.graphscope.proto.groot.Statistics;
import com.google.common.collect.Maps;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -94,4 +96,32 @@ public Long getEdgeTypeCount(

return count == null ? 0L : count;
}

public static DefaultGraphStatistics parseProto(Statistics statistics) {
long vcount = statistics.getNumVertices();
long ecount = statistics.getNumEdges();
Map<LabelId, Long> vertexTypeCounts = new HashMap<>();
Map<EdgeKind, Long> edgeTypeCounts = new HashMap<>();
for (Statistics.VertexTypeStatistics sts : statistics.getVertexTypeStatisticsList()) {
vertexTypeCounts.put(LabelId.parseProto(sts.getLabelId()), sts.getNumVertices());
}
for (Statistics.EdgeTypeStatistics sts : statistics.getEdgeTypeStatisticsList()) {
edgeTypeCounts.put(EdgeKind.parseProto(sts.getEdgeKind()), sts.getNumEdges());
}
return new DefaultGraphStatistics(vertexTypeCounts, edgeTypeCounts, vcount, ecount);
}

@Override
public String toString() {
return "DefaultGraphStatistics{"
+ "vertexTypeCounts="
+ vertexTypeCounts
+ ", edgeTypeCounts="
+ edgeTypeCounts
+ ", totalVertexCount="
+ totalVertexCount
+ ", totalEdgeCount="
+ totalEdgeCount
+ '}';
}
}
2 changes: 1 addition & 1 deletion interactive_engine/compiler/Makefile
@@ -59,7 +59,7 @@ run:
-Dgraph.store=${graph.store} \
-Dpegasus.hosts=${pegasus.hosts} \
-Dgremlin.script.language.name=${gremlin.script.language.name} \
-Dphysical.opt.config=${physical.opt.config} \
-Dgraph.physical.opt=${graph.physical.opt} \
-Dgraph.planner.rules=${graph.planner.rules} \
-Dgraph.planner.opt=${graph.planner.opt} \
-Dgraph.statistics=${graph.statistics} \
2 changes: 1 addition & 1 deletion interactive_engine/compiler/conf/ir.compiler.properties
@@ -57,7 +57,7 @@ calcite.default.charset: UTF-8
# gremlin.script.language.name: antlr_gremlin_traversal

# the output plan format, can be ffi(default) or proto
# physical.opt.config: ffi
# graph.physical.opt: ffi

# set the max capacity of the result streaming buffer for each query
# per.query.stream.buffer.max.capacity: 256
2 changes: 1 addition & 1 deletion interactive_engine/compiler/ir_experimental_advanced_ci.sh
@@ -34,7 +34,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=0 ./start_rp
cd ${base_dir}/../executor/ir/target/release &&
RUST_LOG=info DATA_PATH=/tmp/gstest/ldbc_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 &
sleep 10
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/ldbc_schema.json gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=src/main/resources/statistics/ldbc1_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
sleep 5s
cd ${base_dir} && make pattern_test && make ldbc_test && make simple_test
exit_code=$?
6 changes: 3 additions & 3 deletions interactive_engine/compiler/ir_experimental_ci.sh
@@ -21,7 +21,7 @@ fi

# Test2: run gremlin standard tests on experimental store via calcite-based ir
# restart compiler service
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json &
sleep 5s
# run gremlin standard tests to test calcite-based IR layer
cd ${base_dir} && make gremlin_calcite_test
@@ -42,7 +42,7 @@ RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=0 ./start_
cd ${base_dir}/../executor/ir/target/release &&
RUST_LOG=info DATA_PATH=/tmp/gstest/modern_graph_exp_bin PARTITION_ID=1 ./start_rpc_server --config ${base_dir}/../executor/ir/integrated/config/distributed/server_1 &
# start compiler service
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite physical.opt.config=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 &
cd ${base_dir} && make run gremlin.script.language.name=antlr_gremlin_calcite graph.physical.opt=proto graph.planner.opt=CBO graph.statistics=./src/test/resources/statistics/modern_statistics.json pegasus.hosts:=127.0.0.1:1234,127.0.0.1:1235 &
sleep 5s
cd ${base_dir} && make gremlin_calcite_test
exit_code=$?
@@ -76,7 +76,7 @@ fi

# Test5: run cypher movie tests on experimental store via calcite-based ir
# restart compiler service
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json physical.opt.config=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
cd ${base_dir} && make run graph.schema:=../executor/ir/core/resource/movie_schema.json graph.planner.opt=CBO graph.statistics:=./src/main/resources/statistics/movie_statistics.json graph.physical.opt=proto graph.planner.rules=NotMatchToAntiJoinRule,FilterIntoJoinRule,FilterMatchRule,ExtendIntersectRule,ExpandGetVFusionRule &
sleep 10s
export ENGINE_TYPE=pegasus
cd ${base_dir} && make cypher_test
4 changes: 2 additions & 2 deletions interactive_engine/compiler/set_properties.sh
@@ -34,7 +34,7 @@ frontend_query_per_second_limit="frontend.query.per.second.limit: $FRONTEND_QUER

gremlin_script_language_name="gremlin.script.language.name: $GREMLIN_SCRIPT_LANGUAGE_NAME"

physical_opt_config="physical.opt.config: $PHYSICAL_OPT_CONFIG"
graph_physical_opt="graph.physical.opt: $GRAPH_PHYSICAL_OPT"

count=1;
while (($count<$SERVERSSIZE))
@@ -47,6 +47,6 @@ done

graph_schema="graph.schema: $GRAPH_SCHEMA"

properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$physical_opt_config"
properties="$worker_num\n$timeout\n$batch_size\n$output_capacity\n$hosts\n$server_num\n$graph_schema\n$gremlin_server_port\n$cypher_server_port\n$frontend_query_per_second_limit\n$gremlin_script_language_name\n$graph_physical_opt"

echo -e $properties > ./conf/ir.compiler.properties