From 69802005dcded40dbbab77d071c69055149c9c0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=98=9F=E3=81=8C=E9=99=8D=E3=82=89=E3=81=AA=E3=81=84?=
 =?UTF-8?q?=E8=A1=97?= <106635619+0130w@users.noreply.github.com>
Date: Mon, 23 Sep 2024 16:13:32 +0800
Subject: [PATCH] [Feature](tvf) Support using tvf to read
 sequence_file/rc_file in local/hdfs/s3 (#41080)

## Proposed changes

Issue Number: #30669

This change supports reading the contents of external file tables from rcbinary, rctext, and sequence files via the JNI connector.

Todo list:
- [x] Support reading rc_binary files using local tvf
- [x] Support reading rc_text/sequence files using local tvf
- [x] Support using s3/hdfs tvf

Example:

**sequence file:**

input:
```mysql
select * from local(
    "file_path" = "test/test.seq",
    "format" = "sequence",
    "backend_id" = "10011",
    "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.07 sec)
```

**rc_binary file:**

input:
```mysql
select * from local(
    "file_path" = "test/test.rcbinary",
    "format" = "rc_binary",
    "backend_id" = "10011",
    "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
| k1   | k2   | k3   | k4          | k5   | k6   | k7     | k8   | k9         | k10       | k11  | k12                 | k13        | k14             | k15              | k16                           |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
|    1 |    2 |    3 | 10000000000 | 1.23 | 3.14 | 100.50 | you  | are        | beautiful |    0 | 2023-10-29 02:00:00 | 2023-10-29 | ["D", "E", "F"] | {"k2":5, "k1":3} | {"name":"chandler", "age":54} |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
1 row in set (0.12 sec)
```

**rc_text file:**

input:
```mysql
select * from local(
    "file_path" = "test/test.rctext",
    "format" = "rc_text",
    "backend_id" = "10011",
    "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.06 sec)
```
---
 .../vec/exec/format/hive/hive_jni_reader.cpp  | 102 ++++++
 be/src/vec/exec/format/hive/hive_jni_reader.h |  84 +++++
 be/src/vec/exec/jni_connector.cpp             |  78 +++++
 be/src/vec/exec/jni_connector.h               |   2 +
 be/src/vec/exec/scan/vfile_scanner.cpp        |  10 +
 build.sh                                      |   2 +
 fe/be-java-extensions/hive-scanner/pom.xml    | 102 ++++++
 .../apache/doris/hive/HiveColumnValue.java    | 311 ++++++++++++++++++
 .../apache/doris/hive/HiveFileContext.java    |  52 +++
 .../org/apache/doris/hive/HiveJNIScanner.java | 259 +++++++++++++++
 .../org/apache/doris/hive/HiveProperties.java |  49 +++
 .../java/org/apache/doris/hive/S3Utils.java   | 102 ++++++
 .../src/main/resources/package.xml            |  41 +++
 fe/be-java-extensions/pom.xml                 |   1 +
 .../common/util/FileFormatConstants.java      |  11 +-
 .../doris/common/util/FileFormatUtils.java    | 234 ++++++++++---
 .../ExternalFileTableValuedFunction.java      |  18 +
 fe/pom.xml                                    |   5 +
 gensrc/thrift/PlanNodes.thrift                |   5 +-
 19 files changed, 1413 insertions(+), 55 deletions(-)
 create mode 100644 be/src/vec/exec/format/hive/hive_jni_reader.cpp
 create mode 100644 be/src/vec/exec/format/hive/hive_jni_reader.h
 create mode 100644 fe/be-java-extensions/hive-scanner/pom.xml
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java
 create mode 100644 fe/be-java-extensions/hive-scanner/src/main/resources/package.xml

diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.cpp b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
new file mode 100644
index 000000000000000..1b00cee1678be57
--- /dev/null
+++ b/be/src/vec/exec/format/hive/hive_jni_reader.cpp
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "hive_jni_reader.h" + +#include +#include + +#include "common/logging.h" +#include "runtime/descriptors.h" +#include "runtime/types.h" + +namespace doris::vectorized { + +HiveJNIReader::HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector& file_slot_descs, + const TFileRangeDesc& range) + : JniReader(file_slot_descs, state, profile), _params(params), _range(range) {} + +HiveJNIReader::~HiveJNIReader() = default; + +TFileType::type HiveJNIReader::get_file_type() { + TFileType::type type; + if (_range.__isset.file_type) { + type = _range.file_type; + } else { + type = _params.file_type; + } + return type; +} + +Status HiveJNIReader::init_fetch_table_reader( + std::unordered_map* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + std::ostringstream required_fields; + std::ostringstream columns_types; + std::vector column_names; + int index = 0; + for (auto& desc : _file_slot_descs) { + std::string field = desc->col_name(); + column_names.emplace_back(field); + std::string type = JniConnector::get_jni_type_v2(desc->type()); + if (index == 0) { + required_fields << field; + columns_types << type; + } else { + required_fields << "," << field; + columns_types << "#" << type; + } + index++; + } + + TFileType::type type = get_file_type(); + std::map required_params = { + {"uri", _range.path}, + {"file_type", std::to_string(type)}, + {"file_format", std::to_string(_params.format_type)}, + {"required_fields", required_fields.str()}, + {"columns_types", columns_types.str()}, + {"split_start_offset", std::to_string(_range.start_offset)}, + {"split_size", std::to_string(_range.size)}}; + if (type == TFileType::FILE_S3) { + required_params.insert(_params.properties.begin(), _params.properties.end()); + } + _jni_connector = std::make_unique("org/apache/doris/hive/HiveJNIScanner", + required_params, column_names); + RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} + +Status HiveJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof)); + if (*eof) { + RETURN_IF_ERROR(_jni_connector->close()); + } + return Status::OK(); +} + +Status HiveJNIReader::get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) { + for (auto& desc : _file_slot_descs) { + name_to_type->emplace(desc->col_name(), desc->type()); + } + return Status::OK(); +} + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/hive/hive_jni_reader.h b/be/src/vec/exec/format/hive/hive_jni_reader.h new file mode 100644 index 000000000000000..14051c320e09151 --- /dev/null +++ b/be/src/vec/exec/format/hive/hive_jni_reader.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include + +#include "common/status.h" +#include "exec/olap_common.h" +#include "vec/exec/format/jni_reader.h" + +namespace doris { + +class RuntimeProfile; + +class RuntimeState; + +class SlotDescriptor; + +namespace vectoried { + +class Block; + +} // namespace vectoried +struct TypeDescriptor; +} // namespace doris + +namespace doris::vectorized { + +/** + * Read hive-format file: rcbinary, rctext, sequencefile + */ +class HiveJNIReader : public JniReader { + ENABLE_FACTORY_CREATOR(HiveJNIReader); + +public: + /** + * Call java side by jni to get table data + */ + HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params, + const std::vector& file_slot_descs, const TFileRangeDesc& range); + + ~HiveJNIReader() override; + + Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map* name_to_type, + std::unordered_set* missing_cols) override; + + Status init_fetch_table_reader( + std::unordered_map* colname_to_value_range); + + TFileType::type get_file_type(); + +private: + const TFileScanRangeParams _params; + const TFileRangeDesc _range; + std::string _column_names; + std::string _column_types; + std::unordered_map* _colname_to_value_range = nullptr; +}; + +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/jni_connector.cpp b/be/src/vec/exec/jni_connector.cpp index 0c2485ada3bbedb..1bba35a85dff154 100644 --- a/be/src/vec/exec/jni_connector.cpp +++ b/be/src/vec/exec/jni_connector.cpp @@ -519,6 +519,84 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) { } } +std::string JniConnector::get_jni_type_v2(const TypeDescriptor& desc) { + std::ostringstream buffer; + switch (desc.type) { + case TYPE_BOOLEAN: + return "boolean"; + case TYPE_TINYINT: + return "tinyint"; + case TYPE_SMALLINT: + return "smallint"; + case TYPE_INT: + return "int"; + case TYPE_BIGINT: + return "bigint"; + case TYPE_LARGEINT: + return "largeint"; + case TYPE_FLOAT: + return "float"; + case TYPE_DOUBLE: + return "double"; + case TYPE_VARCHAR: { + buffer << "varchar(" << desc.len << ")"; + return buffer.str(); + } + case TYPE_DATE: + [[fallthrough]]; + case TYPE_DATEV2: + return "date"; + case TYPE_DATETIME: + [[fallthrough]]; + case TYPE_TIME: + [[fallthrough]]; + case TYPE_DATETIMEV2: + [[fallthrough]]; + case TYPE_TIMEV2: + return "timestamp"; + case TYPE_BINARY: + return "binary"; + case TYPE_CHAR: { + buffer << "char(" << desc.len << ")"; + return buffer.str(); + } + case TYPE_STRING: + return "string"; + case TYPE_DECIMALV2: + [[fallthrough]]; + case TYPE_DECIMAL32: + [[fallthrough]]; + case TYPE_DECIMAL64: + [[fallthrough]]; + case TYPE_DECIMAL128I: { + buffer << "decimal(" << desc.precision << "," << desc.scale << ")"; + return buffer.str(); + } + case TYPE_STRUCT: { + buffer << "struct<"; + for (int 
i = 0; i < desc.children.size(); ++i) { + if (i != 0) { + buffer << ","; + } + buffer << desc.field_names[i] << ":" << get_jni_type(desc.children[i]); + } + buffer << ">"; + return buffer.str(); + } + case TYPE_ARRAY: { + buffer << "array<" << get_jni_type(desc.children[0]) << ">"; + return buffer.str(); + } + case TYPE_MAP: { + buffer << "map<" << get_jni_type(desc.children[0]) << "," << get_jni_type(desc.children[1]) + << ">"; + return buffer.str(); + } + default: + return "unsupported"; + } +} + std::string JniConnector::get_jni_type(const TypeDescriptor& desc) { std::ostringstream buffer; switch (desc.type) { diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h index 52a3fb2e7782ca6..7a1c5a1df4968c9 100644 --- a/be/src/vec/exec/jni_connector.h +++ b/be/src/vec/exec/jni_connector.h @@ -261,6 +261,8 @@ class JniConnector : public ProfileCollector { /** * Map PrimitiveType to hive type. */ + static std::string get_jni_type_v2(const TypeDescriptor& desc); + static std::string get_jni_type(const TypeDescriptor& desc); static Status to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments, diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 52aa752935e88d0..58f520e693baa8f 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -56,6 +56,7 @@ #include "vec/exec/format/arrow/arrow_stream_reader.h" #include "vec/exec/format/avro/avro_jni_reader.h" #include "vec/exec/format/csv/csv_reader.h" +#include "vec/exec/format/hive/hive_jni_reader.h" #include "vec/exec/format/json/new_json_reader.h" #include "vec/exec/format/orc/vorc_reader.h" #include "vec/exec/format/parquet/vparquet_reader.h" @@ -945,6 +946,15 @@ Status VFileScanner::_get_next_reader() { ->init_fetch_table_reader(_colname_to_value_range); break; } + case TFileFormatType::FORMAT_SEQUENCE: + case TFileFormatType::FORMAT_RCTEXT: + case TFileFormatType::FORMAT_RCBINARY: { + _cur_reader = HiveJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs, + range); + init_status = ((HiveJNIReader*)(_cur_reader.get())) + ->init_fetch_table_reader(_colname_to_value_range); + break; + } case TFileFormatType::FORMAT_WAL: { _cur_reader.reset(new WalReader(_state)); init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc); diff --git a/build.sh b/build.sh index c8aa2bf2c43626d..9ad36f7bbaad0ba 100755 --- a/build.sh +++ b/build.sh @@ -534,6 +534,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then modules+=("be-java-extensions/max-compute-scanner") modules+=("be-java-extensions/avro-scanner") modules+=("be-java-extensions/lakesoul-scanner") + modules+=("be-java-extensions/hive-scanner") modules+=("be-java-extensions/preload-extensions") # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES @@ -819,6 +820,7 @@ EOF extensions_modules+=("max-compute-scanner") extensions_modules+=("avro-scanner") extensions_modules+=("lakesoul-scanner") + extensions_modules+=("hive-scanner") extensions_modules+=("preload-extensions") if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then diff --git a/fe/be-java-extensions/hive-scanner/pom.xml b/fe/be-java-extensions/hive-scanner/pom.xml new file mode 100644 index 000000000000000..64b4f7f0342b66e --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/pom.xml @@ -0,0 +1,102 @@ + + + + + be-java-extensions + org.apache.doris + ${revision} + + 4.0.0 + + hive-scanner + + + 8 + 8 + UTF-8 + + + + + 
org.apache.doris + java-common + ${project.version} + + + org.apache.hadoop + hadoop-client + provided + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-hdfs + provided + + + org.apache.doris + hive-catalog-shade + provided + + + org.apache.hive + hive-serde + provided + + + org.apache.hive + hive-common + provided + + + + + hive-scanner + + + org.apache.maven.plugins + maven-assembly-plugin + + src/main/resources/package.xml + + + + + + + + + make-assembly + package + + single + + + + + + + \ No newline at end of file diff --git a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java new file mode 100644 index 000000000000000..03c716677f7f615 --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveColumnValue.java @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hive; + +import org.apache.doris.common.jni.vec.ColumnValue; + +import org.apache.hadoop.hive.common.type.Date; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.Timestamp; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map.Entry; + +public class HiveColumnValue implements ColumnValue { + + private static final Logger LOG = LogManager.getLogger(HiveColumnValue.class); + private final Object fieldData; + private final ObjectInspector fieldInspector; + + public HiveColumnValue(ObjectInspector fieldInspector, Object fieldData) { + this.fieldInspector = fieldInspector; + this.fieldData = fieldData; + } + + private Object inspectObject() { + if (fieldData == null) { + return null; + } + 
if (fieldInspector instanceof PrimitiveObjectInspector) { + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) fieldInspector; + return poi.getPrimitiveJavaObject(fieldData); + } + return fieldData; + } + + @Override + public boolean canGetStringAsBytes() { + return fieldInspector instanceof BinaryObjectInspector; + } + + @Override + public boolean isNull() { + return fieldData == null || inspectObject() == null; + } + + @Override + public boolean getBoolean() { + Object value = inspectObject(); + return value != null && ((Boolean) value); + } + + @Override + public byte getByte() { + Object value = inspectObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).byteValue(); + } + return Byte.parseByte(value.toString()); + } + + @Override + public short getShort() { + Object value = inspectObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).shortValue(); + } + return Short.parseShort(value.toString()); + } + + @Override + public int getInt() { + Object value = inspectObject(); + if (value == null) { + return 0; + } + if (value instanceof Number) { + return ((Number) value).intValue(); + } + return Integer.parseInt(value.toString()); + } + + @Override + public float getFloat() { + Object value = inspectObject(); + if (value == null) { + return 0.0f; + } + if (value instanceof Number) { + return ((Number) value).floatValue(); + } + return Float.parseFloat(value.toString()); + } + + @Override + public long getLong() { + Object value = inspectObject(); + if (value == null) { + return 0L; + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + return Long.parseLong(value.toString()); + } + + @Override + public double getDouble() { + Object value = inspectObject(); + if (value == null) { + return 0.0d; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + return Double.parseDouble(value.toString()); + } + + @Override + public BigInteger getBigInteger() { + Object value = inspectObject(); + if (value == null) { + return null; + } + if (value instanceof BigInteger) { + return (BigInteger) value; + } else if (value instanceof Number) { + return BigInteger.valueOf(((Number) value).longValue()); + } + return new BigInteger(value.toString()); + } + + @Override + public BigDecimal getDecimal() { + Object value = inspectObject(); + if (value == null) { + return null; + } + if (value instanceof HiveDecimal) { + return ((HiveDecimal) value).bigDecimalValue(); + } else if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + return new BigDecimal(value.toString()); + } + + @Override + public String getString() { + Object value = inspectObject(); + if (value == null) { + return null; + } + return value.toString(); + } + + @Override + public byte[] getStringAsBytes() { + if (fieldData == null) { + return null; + } + if (fieldInspector instanceof BinaryObjectInspector) { + BytesWritable bw = ((BinaryObjectInspector) fieldInspector).getPrimitiveWritableObject(fieldData); + return bw.copyBytes(); + } else if (fieldInspector instanceof StringObjectInspector) { + String str = getString(); + return str != null ? 
str.getBytes() : null; + } + return null; + } + + @Override + public LocalDate getDate() { + if (fieldData == null) { + return null; + } + if (fieldInspector instanceof DateObjectInspector) { + DateObjectInspector doi = (DateObjectInspector) fieldInspector; + Date hiveDate = doi.getPrimitiveJavaObject(fieldData); + if (hiveDate != null) { + return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), hiveDate.getDay()); + } + } else if (fieldInspector instanceof PrimitiveObjectInspector) { + Object value = inspectObject(); + if (value instanceof Date) { + Date hiveDate = (Date) value; + return LocalDate.of(hiveDate.getYear(), hiveDate.getMonth(), hiveDate.getDay()); + } + } + return null; + } + + @Override + public LocalDateTime getDateTime() { + if (fieldData == null) { + return null; + } + if (fieldInspector instanceof TimestampObjectInspector) { + TimestampObjectInspector toi = (TimestampObjectInspector) fieldInspector; + Timestamp hiveTimestamp = toi.getPrimitiveJavaObject(fieldData); + if (hiveTimestamp != null) { + // Convert Hive Timestamp to LocalDateTime + return LocalDateTime.of( + hiveTimestamp.getYear(), + hiveTimestamp.getMonth(), + hiveTimestamp.getDay(), + hiveTimestamp.getHours(), + hiveTimestamp.getMinutes(), + hiveTimestamp.getSeconds(), + hiveTimestamp.getNanos() + ); + } + } else if (fieldInspector instanceof DateObjectInspector) { + LocalDate date = getDate(); + if (date != null) { + return date.atStartOfDay(); + } + } + return null; + } + + @Override + public byte[] getBytes() { + return getStringAsBytes(); + } + + @Override + public void unpackArray(List values) { + if (fieldData == null) { + return; + } + ListObjectInspector listInspector = (ListObjectInspector) fieldInspector; + List items = listInspector.getList(fieldData); + ObjectInspector itemInspector = listInspector.getListElementObjectInspector(); + for (Object item : items) { + ColumnValue cv = item != null ? new HiveColumnValue(itemInspector, item) : null; + values.add(cv); + } + } + + @Override + public void unpackMap(List keys, List values) { + if (fieldData == null) { + return; + } + MapObjectInspector mapInspector = (MapObjectInspector) fieldInspector; + ObjectInspector keyInspector = mapInspector.getMapKeyObjectInspector(); + ObjectInspector valueInspector = mapInspector.getMapValueObjectInspector(); + for (Entry entry : mapInspector.getMap(fieldData).entrySet()) { + ColumnValue key = entry.getKey() != null ? new HiveColumnValue(keyInspector, entry.getKey()) : null; + ColumnValue value = entry.getValue() != null ? new HiveColumnValue(valueInspector, entry.getValue()) : null; + keys.add(key); + values.add(value); + } + } + + @Override + public void unpackStruct(List structFieldIndex, List values) { + if (fieldData == null) { + return; + } + StructObjectInspector structInspector = (StructObjectInspector) fieldInspector; + List fields = structInspector.getAllStructFieldRefs(); + for (Integer idx : structFieldIndex) { + if (idx != null && idx >= 0 && idx < fields.size()) { + StructField sf = fields.get(idx); + Object fieldObj = structInspector.getStructFieldData(fieldData, sf); + ColumnValue cv = fieldObj != null ? 
new HiveColumnValue(sf.getFieldObjectInspector(), fieldObj) : null; + values.add(cv); + } else { + values.add(null); + } + } + } +} diff --git a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java new file mode 100644 index 000000000000000..0b13935a5fcc272 --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveFileContext.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hive; + +import org.apache.doris.thrift.TFileFormatType; + +public class HiveFileContext { + private final String serde; + private final String inputFormat; + + HiveFileContext(TFileFormatType fileFormatType) { + switch (fileFormatType) { + case FORMAT_RCBINARY: + serde = HiveProperties.RC_BINARY_SERDE_CLASS; + inputFormat = HiveProperties.RC_BINARY_INPUT_FORMAT; + break; + case FORMAT_RCTEXT: + serde = HiveProperties.RC_TEXT_SERDE_CLASS; + inputFormat = HiveProperties.RC_TEXT_INPUT_FORMAT; + break; + case FORMAT_SEQUENCE: + serde = HiveProperties.SEQUENCE_SERDE_CLASS; + inputFormat = HiveProperties.SEQUENCE_INPUT_FORMAT; + break; + default: + throw new UnsupportedOperationException("Unrecognized file format " + fileFormatType); + } + } + + String getSerde() { + return serde; + } + + String getInputFormat() { + return inputFormat; + } +} diff --git a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java new file mode 100644 index 000000000000000..ae98bb28a10766a --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveJNIScanner.java @@ -0,0 +1,259 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.hive; + +import org.apache.doris.avro.S3Utils; +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.TableSchema; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileType; + +import io.trino.spi.classloader.ThreadContextClassLoader; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +public class HiveJNIScanner extends JniScanner { + + private static final Logger LOG = LogManager.getLogger(HiveJNIScanner.class); + private final ClassLoader classLoader; + private final TFileType fileType; + private final int fetchSize; + private final Map requiredParams; + private final String[] columnTypes; + private final String[] requiredFields; + private final ColumnType[] requiredTypes; + private final int[] requiredColumnIds; + private final TFileFormatType fileFormat; + private final StructField[] structFields; + private final ObjectInspector[] fieldInspectors; + private final Long splitStartOffset; + private final Long splitSize; + private String uri; + private StructObjectInspector rowInspector; + private Deserializer deserializer; + private RecordReader reader; + private Writable key; + private Writable value; + private HiveFileContext hiveFileContext; + + public HiveJNIScanner(int fetchSize, Map requiredParams) { + this.classLoader = this.getClass().getClassLoader(); + this.fetchSize = fetchSize; + this.requiredParams = requiredParams; + this.fileType = TFileType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_TYPE))); + this.fileFormat = TFileFormatType.findByValue(Integer.parseInt(requiredParams.get(HiveProperties.FILE_FORMAT))); + this.columnTypes = requiredParams.get(HiveProperties.COLUMNS_TYPES) + .split(HiveProperties.COLUMNS_TYPE_DELIMITER); + this.requiredFields = requiredParams.get(HiveProperties.REQUIRED_FIELDS).split(HiveProperties.FIELDS_DELIMITER); + this.requiredTypes = new ColumnType[requiredFields.length]; + this.requiredColumnIds = new int[requiredFields.length]; + this.uri = requiredParams.get(HiveProperties.URI); + this.splitStartOffset = Long.parseLong(requiredParams.get(HiveProperties.SPLIT_START_OFFSET)); + this.splitSize = Long.parseLong(requiredParams.get(HiveProperties.SPLIT_SIZE)); + this.structFields = new StructField[requiredFields.length]; + this.fieldInspectors = new ObjectInspector[requiredFields.length]; + } + + private void processS3Conf(String accessKey, 
String secretKey, String endpoint, + String region, JobConf jobConf) { + if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) { + jobConf.set(HiveProperties.FS_S3A_ACCESS_KEY, accessKey); + jobConf.set(HiveProperties.FS_S3A_SECRET_KEY, secretKey); + } + jobConf.set(HiveProperties.FS_S3A_ENDPOINT, endpoint); + jobConf.set(HiveProperties.FS_S3A_REGION, region); + } + + private String processS3Uri(String uri) throws IOException { + S3Utils.parseURI(uri); + uri = "s3a://" + S3Utils.getBucket() + "/" + S3Utils.getKey(); + return uri; + } + + private void initReader() throws Exception { + this.hiveFileContext = new HiveFileContext(fileFormat); + Properties properties = createProperties(); + JobConf jobConf = makeJobConf(properties); + switch (fileType) { + case FILE_LOCAL: + case FILE_HDFS: + break; + case FILE_S3: + String accessKey = requiredParams.get(HiveProperties.S3_ACCESS_KEY); + String secretKey = requiredParams.get(HiveProperties.S3_SECRET_KEY); + String endpoint = requiredParams.get(HiveProperties.S3_ENDPOINT); + String region = requiredParams.get(HiveProperties.S3_REGION); + processS3Conf(accessKey, secretKey, endpoint, region, jobConf); + uri = processS3Uri(uri); + break; + default: + throw new Exception("Unsupported " + fileType.getValue() + " file type."); + } + Path path = new Path(uri); + FileSplit fileSplit = new FileSplit(path, splitStartOffset, splitSize, (String[]) null); + InputFormat inputFormatClass = createInputFormat(jobConf, hiveFileContext.getInputFormat()); + reader = (RecordReader) inputFormatClass.getRecordReader(fileSplit, jobConf, Reporter.NULL); + deserializer = getDeserializer(jobConf, properties, hiveFileContext.getSerde()); + rowInspector = getTableObjectInspector(deserializer); + for (int i = 0; i < requiredFields.length; i++) { + StructField field = rowInspector.getStructFieldRef(requiredFields[i]); + structFields[i] = field; + fieldInspectors[i] = field.getFieldObjectInspector(); + } + } + + private InputFormat createInputFormat(Configuration conf, String inputFormat) throws Exception { + Class clazz = conf.getClassByName(inputFormat); + Class> cls = + (Class>) clazz.asSubclass(InputFormat.class); + return ReflectionUtils.newInstance(cls, conf); + } + + private StructObjectInspector getTableObjectInspector(Deserializer deserializer) throws Exception { + ObjectInspector inspector = deserializer.getObjectInspector(); + return (StructObjectInspector) inspector; + } + + private Properties createProperties() { + Properties properties = new Properties(); + properties.setProperty(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, + Arrays.stream(this.requiredColumnIds).mapToObj(String::valueOf).collect(Collectors.joining(","))); + properties.setProperty(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, String.join(",", requiredFields)); + properties.setProperty(HiveProperties.COLUMNS, String.join(",", requiredFields)); + properties.setProperty(HiveProperties.COLUMNS2TYPES, String.join(",", columnTypes)); + properties.setProperty(serdeConstants.SERIALIZATION_LIB, hiveFileContext.getSerde()); + return properties; + } + + private JobConf makeJobConf(Properties properties) { + Configuration conf = new Configuration(); + JobConf jobConf = new JobConf(conf); + jobConf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + properties.stringPropertyNames().forEach(name -> jobConf.set(name, properties.getProperty(name))); + return jobConf; + } + + private Deserializer getDeserializer(Configuration configuration, Properties properties, String name) + 
throws Exception { + Class deserializerClass = Class.forName(name, true, JavaUtils.getClassLoader()) + .asSubclass(Deserializer.class); + Deserializer deserializer = deserializerClass.getConstructor().newInstance(); + deserializer.initialize(configuration, properties); + return deserializer; + } + + @Override + public void open() throws IOException { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + parseRequiredTypes(); + initTableInfo(requiredTypes, requiredFields, fetchSize); + initReader(); + } catch (Exception e) { + close(); + LOG.error("Failed to open the hive reader.", e); + throw new IOException("Failed to open the hive reader.", e); + } + } + + @Override + public void close() throws IOException { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + if (reader != null) { + reader.close(); + } + } catch (IOException e) { + LOG.error("Failed to close the hive reader.", e); + throw new IOException("Failed to close the hive reader.", e); + } + } + + @Override + public int getNext() throws IOException { + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + key = reader.createKey(); + value = reader.createValue(); + int numRows = 0; + for (; numRows < getBatchSize(); numRows++) { + if (!reader.next(key, value)) { + break; + } + Object rowData = deserializer.deserialize(value); + for (int i = 0; i < requiredFields.length; i++) { + Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]); + if (fieldData == null) { + appendData(i, null); + } else { + HiveColumnValue fieldValue = new HiveColumnValue(fieldInspectors[i], fieldData); + appendData(i, fieldValue); + } + } + } + return numRows; + } catch (Exception e) { + close(); + LOG.error("Failed to get next row of data.", e); + throw new IOException("Failed to get next row of data.", e); + } + } + + @Override + protected TableSchema parseTableSchema() throws UnsupportedOperationException { + return null; + } + + private void parseRequiredTypes() { + HashMap hiveColumnNameToIndex = new HashMap<>(); + HashMap hiveColumnNameToType = new HashMap<>(); + for (int i = 0; i < requiredFields.length; i++) { + hiveColumnNameToIndex.put(requiredFields[i], i); + hiveColumnNameToType.put(requiredFields[i], columnTypes[i]); + } + + for (int i = 0; i < requiredFields.length; i++) { + String fieldName = requiredFields[i]; + requiredColumnIds[i] = hiveColumnNameToIndex.get(fieldName); + String typeStr = hiveColumnNameToType.get(fieldName); + requiredTypes[i] = ColumnType.parseType(fieldName, typeStr); + } + } +} diff --git a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java new file mode 100644 index 000000000000000..3949c84dd314d72 --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/HiveProperties.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.hive; + +public class HiveProperties { + + protected static final String COLUMNS_TYPE_DELIMITER = "#"; + protected static final String FIELDS_DELIMITER = ","; + + protected static final String COLUMNS_TYPES = "columns_types"; + protected static final String REQUIRED_FIELDS = "required_fields"; + protected static final String FILE_TYPE = "file_type"; + protected static final String FILE_FORMAT = "file_format"; + protected static final String URI = "uri"; + protected static final String S3_ACCESS_KEY = "s3.access_key"; + protected static final String S3_SECRET_KEY = "s3.secret_key"; + protected static final String S3_ENDPOINT = "s3.endpoint"; + protected static final String S3_REGION = "s3.region"; + protected static final String COLUMNS = "columns"; + protected static final String COLUMNS2TYPES = "columns.types"; + protected static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key"; + protected static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key"; + protected static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint"; + protected static final String FS_S3A_REGION = "fs.s3a.region"; + protected static final String SPLIT_START_OFFSET = "split_start_offset"; + protected static final String SPLIT_SIZE = "split_size"; + protected static final String RC_BINARY_SERDE_CLASS + = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"; + protected static final String RC_BINARY_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; + protected static final String RC_TEXT_SERDE_CLASS = "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"; + protected static final String RC_TEXT_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.RCFileInputFormat"; + protected static final String SEQUENCE_SERDE_CLASS = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"; + protected static final String SEQUENCE_INPUT_FORMAT = "org.apache.hadoop.mapred.SequenceFileInputFormat"; +} diff --git a/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java new file mode 100644 index 000000000000000..45845af3c0392e3 --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/java/org/apache/doris/hive/S3Utils.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.avro; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; + +public class S3Utils { + private static final String SCHEMA_S3 = "s3"; + private static final String SCHEMA_HTTP = "http"; + private static final String SCHEMA_HTTPS = "https"; + private static final String SCHEME_DELIM = "://"; + private static final String PATH_DELIM = "/"; + private static final String QUERY_DELIM = "\\?"; + private static final String FRAGMENT_DELIM = "#"; + private static String bucket; + private static String key; + + /** + * eg: + * s3: s3://bucket1/path/to/file.txt + * http: http://10.10.10.1:9000/bucket1/to/file.txt + * https: https://10.10.10.1:9000/bucket1/to/file.txt + *

+ * schema: s3,http,https + * bucket: bucket1 + * key: path/to/file.txt + */ + public static void parseURI(String uri) throws IOException { + if (StringUtils.isEmpty(uri)) { + throw new IOException("s3 uri is empty."); + } + String[] schemeSplit = uri.split(SCHEME_DELIM); + String rest; + if (schemeSplit.length == 2) { + if (schemeSplit[0].equalsIgnoreCase(SCHEMA_S3)) { + // has scheme, eg: s3://bucket1/path/to/file.txt + rest = schemeSplit[1]; + String[] authoritySplit = rest.split(PATH_DELIM, 2); + if (authoritySplit.length < 1) { + throw new IOException("Invalid S3 URI. uri=" + uri); + } + bucket = authoritySplit[0]; + // support s3://bucket1 + key = authoritySplit.length == 1 ? "/" : authoritySplit[1]; + } else if (schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTP) || schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTPS)) { + // has scheme, eg: http(s)://host/bucket1/path/to/file.txt + rest = schemeSplit[1]; + String[] authoritySplit = rest.split(PATH_DELIM, 3); + if (authoritySplit.length != 3) { + throw new IOException("Invalid S3 HTTP URI: uri=" + uri); + } + // authority_split[1] is host + bucket = authoritySplit[1]; + key = authoritySplit[2]; + } else { + throw new IOException("Invalid S3 HTTP URI: uri=" + uri); + } + + } else if (schemeSplit.length == 1) { + // no scheme, eg: path/to/file.txt + bucket = ""; // unknown + key = uri; + } else { + throw new IOException("Invalid S3 URI. uri=" + uri); + } + + key = key.trim(); + if (StringUtils.isEmpty(key)) { + throw new IOException("Invalid S3 URI. uri=" + uri); + } + // Strip query and fragment if they exist + String[] querySplit = key.split(QUERY_DELIM); + String[] fragmentSplit = querySplit[0].split(FRAGMENT_DELIM); + key = fragmentSplit[0]; + } + + public static String getBucket() { + return bucket; + } + + public static String getKey() { + return key; + } + +} diff --git a/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml new file mode 100644 index 000000000000000..d9ebe58202b9edb --- /dev/null +++ b/fe/be-java-extensions/hive-scanner/src/main/resources/package.xml @@ -0,0 +1,41 @@ + + + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + **/Log4j2Plugins.dat + + + + + diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml index bbe056739d51eca..8123c5c63d905cf 100644 --- a/fe/be-java-extensions/pom.xml +++ b/fe/be-java-extensions/pom.xml @@ -31,6 +31,7 @@ under the License. 
lakesoul-scanner preload-extensions trino-connector-scanner + hive-scanner diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java index bdb2e97b9f25185..bdc12226600de14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatConstants.java @@ -34,6 +34,9 @@ public class FileFormatConstants { public static final String FORMAT_AVRO = "avro"; public static final String FORMAT_WAL = "wal"; public static final String FORMAT_ARROW = "arrow"; + public static final String FORMAT_RC_BINARY = "rc_binary"; + public static final String FORMAT_RC_TEXT = "rc_text"; + public static final String FORMAT_SEQUENCE = "sequence"; public static final String PROP_FORMAT = "format"; public static final String PROP_COLUMN_SEPARATOR = "column_separator"; @@ -47,6 +50,7 @@ public class FileFormatConstants { public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes"; public static final String PROP_SKIP_LINES = "skip_lines"; public static final String PROP_CSV_SCHEMA = "csv_schema"; + public static final String PROP_HIVE_SCHEMA = "hive_schema"; public static final String PROP_COMPRESS = "compress"; public static final String PROP_COMPRESS_TYPE = "compress_type"; public static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys"; @@ -55,5 +59,10 @@ public class FileFormatConstants { public static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("decimal\\((\\d+),(\\d+)\\)"); // datetime(p) public static final Pattern DATETIME_TYPE_PATTERN = Pattern.compile("datetime\\((\\d+)\\)"); - + // timestamp(p) + public static final Pattern TIMESTAMP_TYPE_PATTERN = Pattern.compile("timestamp\\((\\d+)\\)"); + // char(len) + public static final Pattern CHAR_TYPE_PATTERN = Pattern.compile("char\\((\\d+)\\)"); + // varchar(len) + public static final Pattern VARCHAR_TYPE_PATTERN = Pattern.compile("varchar\\((\\d+)\\)"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java index 0b646a00b164d1a..c79647a9c36a8e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/FileFormatUtils.java @@ -17,14 +17,19 @@ package org.apache.doris.common.util; +import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeNameFormat; import com.google.common.base.Strings; +import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; @@ -37,72 +42,195 @@ public static boolean isCsv(String formatStr) { || FileFormatConstants.FORMAT_HIVE_TEXT.equalsIgnoreCase(formatStr); } + public static boolean isHiveFormat(String formatStr) { + return FileFormatConstants.FORMAT_RC_BINARY.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_RC_TEXT.equalsIgnoreCase(formatStr) + || FileFormatConstants.FORMAT_SEQUENCE.equalsIgnoreCase(formatStr); + } + // public for unit test public static void parseCsvSchema(List csvSchema, String 
csvSchemaStr) throws AnalysisException { if (Strings.isNullOrEmpty(csvSchemaStr)) { return; } - // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6)" + // the schema str is like: "k1:int;k2:bigint;k3:varchar(20);k4:datetime(6);" + + // "k5:array;k6:map,k7:struct" String[] schemaStrs = csvSchemaStr.split(";"); try { for (String schemaStr : schemaStrs) { - String[] kv = schemaStr.replace(" ", "").split(":"); - if (kv.length != 2) { - throw new AnalysisException("invalid csv schema: " + csvSchemaStr); + schemaStr = schemaStr.replace(" ", ""); + int colonIndex = schemaStr.indexOf(":"); + if (colonIndex == -1) { + throw new AnalysisException("invalid schema: " + csvSchemaStr); } - Column column = null; - String name = kv[0].toLowerCase(); + String name = schemaStr.substring(0, colonIndex).toLowerCase(); + String type = schemaStr.substring(colonIndex + 1).toLowerCase(); FeNameFormat.checkColumnName(name); - String type = kv[1].toLowerCase(); - if (type.equals("tinyint")) { - column = new Column(name, PrimitiveType.TINYINT, true); - } else if (type.equals("smallint")) { - column = new Column(name, PrimitiveType.SMALLINT, true); - } else if (type.equals("int")) { - column = new Column(name, PrimitiveType.INT, true); - } else if (type.equals("bigint")) { - column = new Column(name, PrimitiveType.BIGINT, true); - } else if (type.equals("largeint")) { - column = new Column(name, PrimitiveType.LARGEINT, true); - } else if (type.equals("float")) { - column = new Column(name, PrimitiveType.FLOAT, true); - } else if (type.equals("double")) { - column = new Column(name, PrimitiveType.DOUBLE, true); - } else if (type.startsWith("decimal")) { - // regex decimal(p, s) - Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid decimal type: " + type); - } - int precision = Integer.parseInt(matcher.group(1)); - int scale = Integer.parseInt(matcher.group(2)); - column = new Column(name, ScalarType.createDecimalV3Type(precision, scale), false, null, true, null, - ""); - } else if (type.equals("date")) { - column = new Column(name, ScalarType.createDateType(), false, null, true, null, ""); - } else if (type.startsWith("datetime")) { - int scale = 0; - if (!type.equals("datetime")) { - // regex datetime(s) - Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(type); - if (!matcher.find()) { - throw new AnalysisException("invalid datetime type: " + type); - } - scale = Integer.parseInt(matcher.group(1)); - } - column = new Column(name, ScalarType.createDatetimeV2Type(scale), false, null, true, null, ""); - } else if (type.equals("string")) { - column = new Column(name, PrimitiveType.STRING, true); - } else if (type.equals("boolean")) { - column = new Column(name, PrimitiveType.BOOLEAN, true); - } else { - throw new AnalysisException("unsupported column type: " + type); - } + + Type columnType = parseType(type); + Column column = new Column(name, columnType, false, null, true, null, ""); + csvSchema.add(column); } } catch (Exception e) { - throw new AnalysisException("invalid csv schema: " + e.getMessage()); + throw new AnalysisException("invalid schema: " + e.getMessage()); + } + } + + private static Type parseType(String typeStr) throws AnalysisException { + typeStr = typeStr.trim().toLowerCase(); + if (typeStr.equals("tinyint")) { + return ScalarType.TINYINT; + } else if (typeStr.equals("smallint")) { + return ScalarType.SMALLINT; + } else if (typeStr.equals("int")) { + return ScalarType.INT; + } 
else if (typeStr.equals("bigint")) { + return ScalarType.BIGINT; + } else if (typeStr.equals("largeint")) { + return ScalarType.LARGEINT; + } else if (typeStr.equals("float")) { + return ScalarType.FLOAT; + } else if (typeStr.equals("double")) { + return ScalarType.DOUBLE; + } else if (typeStr.startsWith("decimal")) { + // Parse decimal(p, s) + Matcher matcher = FileFormatConstants.DECIMAL_TYPE_PATTERN.matcher(typeStr); + if (!matcher.find()) { + throw new AnalysisException("Invalid decimal type: " + typeStr); + } + int precision = Integer.parseInt(matcher.group(1)); + int scale = Integer.parseInt(matcher.group(2)); + return ScalarType.createDecimalV3Type(precision, scale); + } else if (typeStr.equals("date")) { + return ScalarType.createDateType(); + } else if (typeStr.startsWith("timestamp")) { + int scale = 0; + if (!typeStr.equals("timestamp")) { + // Parse timestamp(s) + Matcher matcher = FileFormatConstants.TIMESTAMP_TYPE_PATTERN.matcher(typeStr); + if (!matcher.find()) { + throw new AnalysisException("Invalid timestamp type: " + typeStr); + } + scale = Integer.parseInt(matcher.group(1)); + } + return ScalarType.createDatetimeV2Type(scale); + } else if (typeStr.startsWith("datetime")) { + int scale = 0; + if (!typeStr.equals("datetime")) { + // Parse datetime(s) + Matcher matcher = FileFormatConstants.DATETIME_TYPE_PATTERN.matcher(typeStr); + if (!matcher.find()) { + throw new AnalysisException("Invalid datetime type: " + typeStr); + } + scale = Integer.parseInt(matcher.group(1)); + } + return ScalarType.createDatetimeV2Type(scale); + } else if (typeStr.equals("string")) { + return ScalarType.createStringType(); + } else if (typeStr.equals("boolean")) { + return ScalarType.BOOLEAN; + } else if (typeStr.startsWith("char")) { + // Parse char(len) + Matcher matcher = FileFormatConstants.CHAR_TYPE_PATTERN.matcher(typeStr); + if (matcher.matches()) { + int len = Integer.parseInt(matcher.group(1)); + return ScalarType.createChar(len); + } + throw new AnalysisException("Invalid char type: " + typeStr); + } else if (typeStr.startsWith("varchar")) { + // Parse varchar(len) + Matcher matcher = FileFormatConstants.VARCHAR_TYPE_PATTERN.matcher(typeStr); + if (matcher.matches()) { + int len = Integer.parseInt(matcher.group(1)); + return ScalarType.createVarcharType(len); + } + throw new AnalysisException("Invalid varchar type: " + typeStr); + } else if (typeStr.startsWith("array")) { + // Parse array + if (typeStr.indexOf('<') == 5 && typeStr.endsWith(">")) { + String elementTypeStr = typeStr.substring(6, typeStr.length() - 1); + Type elementType = parseType(elementTypeStr); + return new ArrayType(elementType); + } + throw new AnalysisException("Invalid array type: " + typeStr); + } else if (typeStr.startsWith("map")) { + // Parse map + if (typeStr.indexOf('<') == 3 && typeStr.endsWith(">")) { + String keyValueStr = typeStr.substring(4, typeStr.length() - 1); + int commaIndex = findCommaOutsideBrackets(keyValueStr); + if (commaIndex == -1) { + throw new AnalysisException("Invalid map type: " + typeStr); + } + String keyTypeStr = keyValueStr.substring(0, commaIndex).trim(); + String valueTypeStr = keyValueStr.substring(commaIndex + 1).trim(); + Type keyType = parseType(keyTypeStr); + Type valueType = parseType(valueTypeStr); + return new MapType(keyType, valueType); + } + throw new AnalysisException("Invalid map type: " + typeStr); + } else if (typeStr.startsWith("struct")) { + // Parse struct + if (typeStr.indexOf('<') == 6 && typeStr.endsWith(">")) { + String fieldStr = typeStr.substring(7, 
typeStr.length() - 1); + List fieldDefs = splitStructFields(fieldStr); + ArrayList structFields = new ArrayList<>(); + for (String fieldDef : fieldDefs) { + int colonIndex = fieldDef.indexOf(":"); + if (colonIndex == -1) { + throw new AnalysisException("Invalid struct field: " + fieldDef); + } + String fieldName = fieldDef.substring(0, colonIndex).trim(); + String fieldTypeStr = fieldDef.substring(colonIndex + 1).trim(); + Type fieldType = parseType(fieldTypeStr); + StructField structField = new StructField(fieldName, fieldType); + structFields.add(structField); + } + return new StructType(structFields); + } + throw new AnalysisException("Invalid struct type: " + typeStr); + } else { + throw new AnalysisException("Unsupported type: " + typeStr); + } + } + + private static int findCommaOutsideBrackets(String s) { + int level = 0; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == '<') { + level++; + } else if (c == '>') { + level--; + } else if (c == ',' && level == 0) { + return i; + } + } + return -1; + } + + private static List splitStructFields(String s) throws AnalysisException { + List fields = new ArrayList<>(); + int level = 0; + int start = 0; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + if (c == '<') { + level++; + } else if (c == '>') { + level--; + } else if (c == ',' && level == 0) { + fields.add(s.substring(start, i).trim()); + start = i + 1; + } + } + if (start < s.length()) { + fields.add(s.substring(start).trim()); + } + if (level != 0) { + throw new AnalysisException("Unmatched angle brackets in struct definition."); } + return fields; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index e137d5d200cc842..52d0477e581c86a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -211,6 +211,15 @@ protected Map parseCommonProperties(Map properti case "wal": this.fileFormatType = TFileFormatType.FORMAT_WAL; break; + case "sequence": + this.fileFormatType = TFileFormatType.FORMAT_SEQUENCE; + break; + case "rc_binary": + this.fileFormatType = TFileFormatType.FORMAT_RCBINARY; + break; + case "rc_text": + this.fileFormatType = TFileFormatType.FORMAT_RCTEXT; + break; default: throw new AnalysisException("format:" + formatString + " is not supported."); } @@ -259,6 +268,15 @@ protected Map parseCommonProperties(Map properti } } + // When parsing rc_binary/rc_text/sequence files, reuse parseCsvSchema to parse column names and types + if (FileFormatUtils.isHiveFormat(formatString)) { + FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps, + FileFormatConstants.PROP_HIVE_SCHEMA, "")); + if (LOG.isDebugEnabled()) { + LOG.debug("get hive schema: {}", csvSchema); + } + } + pathPartitionKeys = Optional.ofNullable( getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null)) .map(str -> Arrays.stream(str.split(",")) diff --git a/fe/pom.xml b/fe/pom.xml index 29b2b61530a935e..02150a03301b619 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -1468,6 +1468,11 @@ under the License. 
mariadb-java-client ${mariadb-java-client.version} + + org.apache.hive + hive-serde + ${hive.version} + org.apache.hive hive-common diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c77ab48b2d404cc..6760bf54875b82c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -120,7 +120,10 @@ enum TFileFormatType { FORMAT_CSV_LZ4BLOCK, FORMAT_CSV_SNAPPYBLOCK, FORMAT_WAL, - FORMAT_ARROW + FORMAT_ARROW, + FORMAT_RCBINARY, + FORMAT_RCTEXT, + FORMAT_SEQUENCE, } // In previous versions, the data compression format and file format were stored together, as TFileFormatType,
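
Note: the examples in the description above all go through the `local` tvf. For the "Support using s3/hdfs tvf" item, a minimal sketch of an s3 invocation is shown below as an assumption, not output from this patch: the bucket, endpoint, and credential values are placeholders, and the `uri`/`s3.*` property names are inferred from the existing s3 tvf and the `s3.access_key`/`s3.secret_key`/`s3.endpoint`/`s3.region` keys declared in `HiveProperties`.

```mysql
-- Hypothetical s3 tvf call for a sequence file; all values are placeholders.
select * from s3(
    "uri" = "https://my-bucket.example-endpoint.com/test/test.seq",
    "s3.access_key" = "<access_key>",
    "s3.secret_key" = "<secret_key>",
    "s3.endpoint" = "example-endpoint.com",
    "s3.region" = "us-east-1",
    "format" = "sequence",
    "hive_schema" = "k1:tinyint;k2:smallint;k3:int;k4:bigint");
```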