Skip to content

Commit

Permalink
[Feature](tvf) Support using tvf to read sequence_file/rc_file in loc…
Browse files Browse the repository at this point in the history
…al/hdfs/s3 (#41080)

Issue Number: #30669

<!--Describe your changes.-->

This change supports reading the contents of external file tables from
rcbinary, rctext, and sequence files via the JNI connector.

todo-lists:
- [x] Support read rc_binary files using local tvf
- [x] Support read rc_text/sequence files using local tvf
- [x] Support using s3/hdfs tvf

Example:

**sequence file:**
input:
``` mysql
select * from local( "file_path" = "test/test.seq", "format" = "sequence", "backend_id" = "10011", "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.07 sec)
```

**rc_binary file:**
input:
```mysql
select * from local( "file_path" = "test/test.rcbinary", "format" = "rc_binary", "backend_id" = "10011", "hive_schema"="k1:tinyint;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:m
ap<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
| k1   | k2   | k3   | k4          | k5   | k6   | k7     | k8   | k9         | k10       | k11  | k12                 | k13        | k14             | k15              | k16                           |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
|    1 |    2 |    3 | 10000000000 | 1.23 | 3.14 | 100.50 | you  | are        | beautiful |    0 | 2023-10-29 02:00:00 | 2023-10-29 | ["D", "E", "F"] | {"k2":5, "k1":3} | {"name":"chandler", "age":54} |
+------+------+------+-------------+------+------+--------+------+------------+-----------+------+---------------------+------------+-----------------+------------------+-------------------------------+
1 row in set (0.12 sec)
```

**rc_text file:**
input:
``` mysql
select * from local( "file_path" = "test/test.rctext", "format" = "rc_text", "backend_id" = "10011", "hive_schema"="k1:tiny
int;k2:smallint;k3:int;k4:bigint;k5:float;k6:double;k7:decimal(10,2);k8:string;k9:char(10);k10:varchar(20);k11:boolean;k12:timestamp;k13:date;k14:array<string>;k15:
map<string,int>;k16:struct<name:string,age:int>");
```
output:
```
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
| k1   | k2   | k3   | k4          | k5   | k6    | k7    | k8    | k9         | k10     | k11  | k12                 | k13        | k14             | k15                  | k16                       |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
|    7 |   13 |   74 | 13000000000 | 6.15 | 4.376 | 57.30 | world | Char       | Varchar |    1 | 2022-01-01 10:00:00 | 2022-01-01 | ["A", "B", "C"] | {"key2":2, "key1":1} | {"name":"John", "age":30} |
+------+------+------+-------------+------+-------+-------+-------+------------+---------+------+---------------------+------------+-----------------+----------------------+---------------------------+
1 row in set (0.06 sec)
```
  • Loading branch information
0130w authored and morningman committed Sep 24, 2024
1 parent 990f18f commit 580f38b
Show file tree
Hide file tree
Showing 19 changed files with 1,413 additions and 55 deletions.
102 changes: 102 additions & 0 deletions be/src/vec/exec/format/hive/hive_jni_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "hive_jni_reader.h"

#include <map>
#include <ostream>

#include "common/logging.h"
#include "runtime/descriptors.h"
#include "runtime/types.h"

namespace doris::vectorized {

HiveJNIReader::HiveJNIReader(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params,
const std::vector<SlotDescriptor*>& file_slot_descs,
const TFileRangeDesc& range)
: JniReader(file_slot_descs, state, profile), _params(params), _range(range) {}

HiveJNIReader::~HiveJNIReader() = default;

TFileType::type HiveJNIReader::get_file_type() {
TFileType::type type;
if (_range.__isset.file_type) {
type = _range.file_type;
} else {
type = _params.file_type;
}
return type;
}

Status HiveJNIReader::init_fetch_table_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
std::ostringstream required_fields;
std::ostringstream columns_types;
std::vector<std::string> column_names;
int index = 0;
for (auto& desc : _file_slot_descs) {
std::string field = desc->col_name();
column_names.emplace_back(field);
std::string type = JniConnector::get_jni_type_v2(desc->type());
if (index == 0) {
required_fields << field;
columns_types << type;
} else {
required_fields << "," << field;
columns_types << "#" << type;
}
index++;
}

TFileType::type type = get_file_type();
std::map<String, String> required_params = {
{"uri", _range.path},
{"file_type", std::to_string(type)},
{"file_format", std::to_string(_params.format_type)},
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
{"split_start_offset", std::to_string(_range.start_offset)},
{"split_size", std::to_string(_range.size)}};
if (type == TFileType::FILE_S3) {
required_params.insert(_params.properties.begin(), _params.properties.end());
}
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/hive/HiveJNIScanner",
required_params, column_names);
RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
return _jni_connector->open(_state, _profile);
}

Status HiveJNIReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
if (*eof) {
RETURN_IF_ERROR(_jni_connector->close());
}
return Status::OK();
}

Status HiveJNIReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
for (auto& desc : _file_slot_descs) {
name_to_type->emplace(desc->col_name(), desc->type());
}
return Status::OK();
}

} // namespace doris::vectorized
84 changes: 84 additions & 0 deletions be/src/vec/exec/format/hive/hive_jni_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <rapidjson/document.h>
#include <stddef.h>

#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "exec/olap_common.h"
#include "vec/exec/format/jni_reader.h"

namespace doris {

class RuntimeProfile;

class RuntimeState;

class SlotDescriptor;

namespace vectoried {

class Block;

} // namespace vectoried
struct TypeDescriptor;
} // namespace doris

namespace doris::vectorized {

/**
* Read hive-format file: rcbinary, rctext, sequencefile
*/
class HiveJNIReader : public JniReader {
ENABLE_FACTORY_CREATOR(HiveJNIReader);

public:
/**
* Call java side by jni to get table data
*/
HiveJNIReader(RuntimeState* state, RuntimeProfile* profile, const TFileScanRangeParams& params,
const std::vector<SlotDescriptor*>& file_slot_descs, const TFileRangeDesc& range);

~HiveJNIReader() override;

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_fetch_table_reader(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

TFileType::type get_file_type();

private:
const TFileScanRangeParams _params;
const TFileRangeDesc _range;
std::string _column_names;
std::string _column_types;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
};

} // namespace doris::vectorized
78 changes: 78 additions & 0 deletions be/src/vec/exec/jni_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,84 @@ std::string JniConnector::get_jni_type(const DataTypePtr& data_type) {
}
}

std::string JniConnector::get_jni_type_v2(const TypeDescriptor& desc) {
std::ostringstream buffer;
switch (desc.type) {
case TYPE_BOOLEAN:
return "boolean";
case TYPE_TINYINT:
return "tinyint";
case TYPE_SMALLINT:
return "smallint";
case TYPE_INT:
return "int";
case TYPE_BIGINT:
return "bigint";
case TYPE_LARGEINT:
return "largeint";
case TYPE_FLOAT:
return "float";
case TYPE_DOUBLE:
return "double";
case TYPE_VARCHAR: {
buffer << "varchar(" << desc.len << ")";
return buffer.str();
}
case TYPE_DATE:
[[fallthrough]];
case TYPE_DATEV2:
return "date";
case TYPE_DATETIME:
[[fallthrough]];
case TYPE_TIME:
[[fallthrough]];
case TYPE_DATETIMEV2:
[[fallthrough]];
case TYPE_TIMEV2:
return "timestamp";
case TYPE_BINARY:
return "binary";
case TYPE_CHAR: {
buffer << "char(" << desc.len << ")";
return buffer.str();
}
case TYPE_STRING:
return "string";
case TYPE_DECIMALV2:
[[fallthrough]];
case TYPE_DECIMAL32:
[[fallthrough]];
case TYPE_DECIMAL64:
[[fallthrough]];
case TYPE_DECIMAL128I: {
buffer << "decimal(" << desc.precision << "," << desc.scale << ")";
return buffer.str();
}
case TYPE_STRUCT: {
buffer << "struct<";
for (int i = 0; i < desc.children.size(); ++i) {
if (i != 0) {
buffer << ",";
}
buffer << desc.field_names[i] << ":" << get_jni_type(desc.children[i]);
}
buffer << ">";
return buffer.str();
}
case TYPE_ARRAY: {
buffer << "array<" << get_jni_type(desc.children[0]) << ">";
return buffer.str();
}
case TYPE_MAP: {
buffer << "map<" << get_jni_type(desc.children[0]) << "," << get_jni_type(desc.children[1])
<< ">";
return buffer.str();
}
default:
return "unsupported";
}
}

std::string JniConnector::get_jni_type(const TypeDescriptor& desc) {
std::ostringstream buffer;
switch (desc.type) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/jni_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ class JniConnector : public ProfileCollector {
/**
* Map PrimitiveType to hive type.
*/
static std::string get_jni_type_v2(const TypeDescriptor& desc);

static std::string get_jni_type(const TypeDescriptor& desc);

static Status to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments,
Expand Down
10 changes: 10 additions & 0 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "vec/exec/format/arrow/arrow_stream_reader.h"
#include "vec/exec/format/avro/avro_jni_reader.h"
#include "vec/exec/format/csv/csv_reader.h"
#include "vec/exec/format/hive/hive_jni_reader.h"
#include "vec/exec/format/json/new_json_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
Expand Down Expand Up @@ -945,6 +946,15 @@ Status VFileScanner::_get_next_reader() {
->init_fetch_table_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_SEQUENCE:
case TFileFormatType::FORMAT_RCTEXT:
case TFileFormatType::FORMAT_RCBINARY: {
_cur_reader = HiveJNIReader::create_unique(_state, _profile, *_params, _file_slot_descs,
range);
init_status = ((HiveJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
}
case TFileFormatType::FORMAT_WAL: {
_cur_reader.reset(new WalReader(_state));
init_status = ((WalReader*)(_cur_reader.get()))->init_reader(_output_tuple_desc);
Expand Down
2 changes: 2 additions & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("be-java-extensions/max-compute-scanner")
modules+=("be-java-extensions/avro-scanner")
modules+=("be-java-extensions/lakesoul-scanner")
modules+=("be-java-extensions/hive-scanner")
modules+=("be-java-extensions/preload-extensions")

# If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
Expand Down Expand Up @@ -819,6 +820,7 @@ EOF
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
extensions_modules+=("lakesoul-scanner")
extensions_modules+=("hive-scanner")
extensions_modules+=("preload-extensions")

if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
Expand Down
Loading

0 comments on commit 580f38b

Please sign in to comment.