Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
format

format

fix

fix

add test

test

test

fix

fix

lower case

fix

format

fix
  • Loading branch information
wuwenchi committed Nov 19, 2024
1 parent cd94cf1 commit f8a398d
Show file tree
Hide file tree
Showing 24 changed files with 341 additions and 98 deletions.
34 changes: 23 additions & 11 deletions be/src/io/fs/file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,58 +25,70 @@ namespace io {

Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}

Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}

Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}

Status FileSystem::delete_file(const Path& file) {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}

Status FileSystem::delete_directory(const Path& dir) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}

Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
abs_files.push_back(absolute_path(file));
Path abs_file;
RETURN_IF_ERROR(absolute_path(file, abs_file));
abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}

Status FileSystem::exists(const Path& path, bool* res) const {
auto fs_path = absolute_path(path);
Path fs_path;
RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}

Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
auto path = absolute_path(file);
Path path;
RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}

Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
auto path = absolute_path(dir);
Path path;
RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}

Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
auto orig_path = absolute_path(orig_name);
auto new_path = absolute_path(new_name);
Path orig_path;
RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
Path new_path;
RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/io/fs/file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FileSystem {

// FIMXE(plat1ko): The implementation and semantics of this function are not completely
// consistent, which is confused.
virtual Path absolute_path(const Path& path) const = 0;
virtual Status absolute_path(const Path& path, Path& abs_path) const = 0;

FileSystem(std::string id, FileSystemType type) : _id(std::move(id)), _type(type) {}

Expand Down
42 changes: 42 additions & 0 deletions be/src/io/fs/local_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,46 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}

Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
// valid path include:
// 1. abc/def while return abc/def
// 2. /abc/def while return /abc/def
// 3. file:/abc/def while return /abc/def
// 4. file://<authority>/abc/def while return /abc/def
std::string path_str = input_path_str;
size_t slash = path_str.find('/');
if (slash == 0) {
abs_path = input_path_str;
return Status::OK();
}

// Initialize scheme and authority
std::string scheme;
size_t start = 0;

// Parse URI scheme
size_t colon = path_str.find(':');
if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
// Has a scheme
scheme = path_str.substr(0, colon);
if (scheme != "file") {
return Status::InternalError(
"Only supports `file` type scheme, like 'file:///path', 'file:/path'.");
}
start = colon + 1;
}

// Parse URI authority, if any
if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
// Has authority
size_t next_slash = path_str.find('/', start + 2);
size_t auth_end = (next_slash != std::string::npos) ? next_slash : path_str.length();
start = auth_end;
}

// URI path is the rest of the string
abs_path = path_str.substr(start);
return Status::OK();
}

} // namespace doris::io
6 changes: 5 additions & 1 deletion be/src/io/fs/local_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class LocalFileSystem final : public FileSystem {
public:
~LocalFileSystem() override;

static Status convert_to_abs_path(const Path& path, Path& abs_path);

/// hard link dest file to src file
Status link_file(const Path& src, const Path& dest);

Expand Down Expand Up @@ -104,7 +106,9 @@ class LocalFileSystem final : public FileSystem {

// `LocalFileSystem` always use absolute path as arguments
// FIXME(plat1ko): Eliminate this method
Path absolute_path(const Path& path) const override { return path; }
Status absolute_path(const Path& path, Path& abs_path) const override {
return convert_to_abs_path(path, abs_path);
}

friend const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
};
Expand Down
10 changes: 7 additions & 3 deletions be/src/io/fs/remote_file_system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,25 @@
namespace doris::io {

Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
auto dest_path = absolute_path(dest_file);
Path dest_path;
RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}

Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
remote_paths.push_back(absolute_path(path));
Path abs_path;
RETURN_IF_ERROR(absolute_path(path, abs_path));
remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}

Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
auto remote_path = absolute_path(remote_file);
Path remote_path;
RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}

Expand Down
8 changes: 5 additions & 3 deletions be/src/io/fs/remote_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ class RemoteFileSystem : public FileSystem {
virtual Status open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) = 0;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.is_absolute()) {
return path;
abs_path = path;
} else {
abs_path = _root_path / path;
}
return _root_path / path;
return Status::OK();
}

Path _root_path;
Expand Down
7 changes: 4 additions & 3 deletions be/src/io/fs/s3_file_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,17 @@ class S3FileSystem final : public RemoteFileSystem {
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;

Path absolute_path(const Path& path) const override {
Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
return path;
abs_path = path;
} else {
// path with no schema
return _root_path / path;
abs_path = _root_path / path;
}
return Status::OK();
}

private:
Expand Down
17 changes: 17 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
if (_fields[i].field_id != -1) {
_field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
}
}

if (_next_schema_pos != t_schemas.size()) {
Expand All @@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}

const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
auto const it = _field_id_name_mapping.find(id);
if (it == _field_id_name_mapping.end()) {
return {};
}
return {it->second.data()};
}

Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
Expand All @@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
Expand All @@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}

std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
Expand Down Expand Up @@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
Expand Down Expand Up @@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;

return Status::OK();
}
Expand Down Expand Up @@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;

return Status::OK();
}
Expand All @@ -619,6 +635,7 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
Expand Down
8 changes: 8 additions & 0 deletions be/src/vec/exec/format/parquet/schema_desc.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "common/status.h"
#include "runtime/types.h"
#include "util/slice.h"

namespace doris::vectorized {

Expand Down Expand Up @@ -56,6 +57,8 @@ struct FieldSchema {
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;

int32_t field_id;
};

class FieldDescriptor {
Expand All @@ -68,6 +71,7 @@ class FieldDescriptor {
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
std::unordered_map<int, std::string> _field_id_name_mapping;

void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
Expand Down Expand Up @@ -128,6 +132,10 @@ class FieldDescriptor {
std::string debug_string() const;

int32_t size() const { return _fields.size(); }

bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }

const doris::Slice get_column_name_from_field_id(int32_t id) const;
};

} // namespace doris::vectorized
6 changes: 2 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
return Status::OK();
}

// Get iceberg col id to col name map stored in parquet metadata key values.
// This is for iceberg schema evolution.
std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
return _t_metadata->key_value_metadata;
const FieldDescriptor ParquetReader::get_file_metadata_schema() {
return _file_metadata->schema();
}

Status ParquetReader::open() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/parquet/vparquet_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class ParquetReader : public GenericReader {
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;

std::vector<tparquet::KeyValue> get_metadata_key_values();
const FieldDescriptor get_file_metadata_schema();
void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
_table_col_to_file_col = map;
}
Expand Down
Loading

0 comments on commit f8a398d

Please sign in to comment.