Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](serde) support presto compatible output format (#37039) #37253

Merged
merged 3 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/service/point_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,10 +379,12 @@ Status PointQueryExecutor::_output_data() {
if (_binary_row_format) {
vectorized::VMysqlResultWriter<true> mysql_writer(nullptr, _reusable->output_exprs(),
nullptr);
RETURN_IF_ERROR(mysql_writer.init(_reusable->runtime_state()));
RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response));
} else {
vectorized::VMysqlResultWriter<false> mysql_writer(nullptr, _reusable->output_exprs(),
nullptr);
RETURN_IF_ERROR(mysql_writer.init(_reusable->runtime_state()));
RETURN_IF_ERROR(_serialize_block(mysql_writer, *_result_block, _response));
}
VLOG_DEBUG << "dump block " << _result_block->dump_data();
Expand Down
2 changes: 2 additions & 0 deletions be/src/service/point_query_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class Reusable {

int64_t mem_size() const;

// Non-owning accessor: returns the raw pointer held by the _runtime_state
// unique_ptr (nullptr if it is empty); ownership stays with this object.
RuntimeState* runtime_state() { return _runtime_state.get(); }

private:
// caching TupleDescriptor, output_expr, etc...
std::unique_ptr<RuntimeState> _runtime_state;
Expand Down
28 changes: 16 additions & 12 deletions be/src/vec/data_types/serde/data_type_array_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ void DataTypeArraySerDe::read_column_from_arrow(IColumn& column, const arrow::Ar
template <bool is_binary_format>
Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx_of_mysql, bool col_const) const {
int row_idx_of_mysql, bool col_const,
const FormatOptions& options) const {
auto& column_array = assert_cast<const ColumnArray&>(column);
auto& offsets = column_array.get_offsets();
auto& data = column_array.get_data();
Expand All @@ -332,21 +333,22 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,
}
}
if (data.is_null_at(j)) {
if (0 != result.push_string(NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()))) {
if (0 != result.push_string(options.null_format, options.null_len)) {
return Status::InternalError("pack mysql buffer failed.");
}
} else {
if (is_nested_string) {
if (0 != result.push_string("\"", 1)) {
if (is_nested_string && options.wrapper_len > 0) {
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
return Status::InternalError("pack mysql buffer failed.");
}
RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, j, false));
if (0 != result.push_string("\"", 1)) {
RETURN_IF_ERROR(
nested_serde->write_column_to_mysql(data, result, j, false, options));
if (0 != result.push_string(options.nested_string_wrapper, options.wrapper_len)) {
return Status::InternalError("pack mysql buffer failed.");
}
} else {
RETURN_IF_ERROR(nested_serde->write_column_to_mysql(data, result, j, false));
RETURN_IF_ERROR(
nested_serde->write_column_to_mysql(data, result, j, false, options));
}
}
}
Expand All @@ -359,14 +361,16 @@ Status DataTypeArraySerDe::_write_column_to_mysql(const IColumn& column,

Status DataTypeArraySerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<true>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeArraySerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<false>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeArraySerDe::write_column_to_orc(const std::string& timezone, const IColumn& column,
Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/data_types/serde/data_type_array_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,11 @@ class DataTypeArraySerDe : public DataTypeSerDe {
int end, const cctz::time_zone& ctz) const override;

Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;

Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
Expand All @@ -102,7 +104,7 @@ class DataTypeArraySerDe : public DataTypeSerDe {
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const;
int row_idx, bool col_const, const FormatOptions& options) const;

DataTypeSerDeSPtr nested_serde;
};
Expand Down
13 changes: 8 additions & 5 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbV
template <bool is_binary_format>
Status DataTypeBitMapSerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const {
int row_idx, bool col_const,
const FormatOptions& options) const {
auto& data_column = assert_cast<const ColumnBitmap&>(column);
if (_return_object_as_string) {
const auto col_index = index_check_const(row_idx, col_const);
Expand All @@ -126,14 +127,16 @@ Status DataTypeBitMapSerDe::_write_column_to_mysql(const IColumn& column,

Status DataTypeBitMapSerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<true>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeBitMapSerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<false>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, const IColumn& column,
Expand Down
8 changes: 5 additions & 3 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
}

Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;

Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
Expand All @@ -88,7 +90,7 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
// Bitmap is binary data which is not shown by mysql.
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const;
int row_idx, bool col_const, const FormatOptions& options) const;
};
} // namespace vectorized
} // namespace doris
21 changes: 12 additions & 9 deletions be/src/vec/data_types/serde/data_type_date64_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,23 +242,24 @@ void DataTypeDate64SerDe::read_column_from_arrow(IColumn& column, const arrow::A
template <bool is_binary_format>
Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const {
int row_idx, bool col_const,
const FormatOptions& options) const {
auto& data = assert_cast<const ColumnVector<Int64>&>(column).get_data();
const auto col_index = index_check_const(row_idx, col_const);
auto time_num = data[col_index];
VecDateTimeValue time_val = binary_cast<Int64, VecDateTimeValue>(time_num);
// _nesting_level >= 2 means this datetime value is nested inside a complex type,
// so it is wrapped with options.nested_string_wrapper (skipped when wrapper_len is 0)
if (_nesting_level >= 2) {
if (UNLIKELY(0 != result.push_string("\"", 1))) {
if (_nesting_level >= 2 && options.wrapper_len > 0) {
if (UNLIKELY(0 != result.push_string(options.nested_string_wrapper, options.wrapper_len))) {
return Status::InternalError("pack mysql buffer failed.");
}
}
if (UNLIKELY(0 != result.push_vec_datetime(time_val))) {
return Status::InternalError("pack mysql buffer failed.");
}
if (_nesting_level >= 2) {
if (UNLIKELY(0 != result.push_string("\"", 1))) {
if (_nesting_level >= 2 && options.wrapper_len > 0) {
if (UNLIKELY(0 != result.push_string(options.nested_string_wrapper, options.wrapper_len))) {
return Status::InternalError("pack mysql buffer failed.");
}
}
Expand All @@ -267,14 +268,16 @@ Status DataTypeDate64SerDe::_write_column_to_mysql(const IColumn& column,

Status DataTypeDate64SerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<true>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeDate64SerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<false>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, const IColumn& column,
Expand Down
10 changes: 6 additions & 4 deletions be/src/vec/data_types/serde/data_type_date64_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
int end, const cctz::time_zone& ctz) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;

Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
Expand All @@ -74,7 +76,7 @@ class DataTypeDate64SerDe : public DataTypeNumberSerDe<Int64> {
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const;
int row_idx, bool col_const, const FormatOptions& options) const;
};

class DataTypeDateTimeSerDe : public DataTypeDate64SerDe {
Expand All @@ -94,4 +96,4 @@ class DataTypeDateTimeSerDe : public DataTypeDate64SerDe {
const FormatOptions& options) const override;
};
} // namespace vectorized
} // namespace doris
} // namespace doris
21 changes: 12 additions & 9 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,24 @@ void DataTypeDateTimeV2SerDe::read_column_from_arrow(IColumn& column,
template <bool is_binary_format>
Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const {
int row_idx, bool col_const,
const FormatOptions& options) const {
const auto& data = assert_cast<const ColumnVector<UInt64>&>(column).get_data();
const auto col_index = index_check_const(row_idx, col_const);
DateV2Value<DateTimeV2ValueType> date_val =
binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(data[col_index]);
// _nesting_level >= 2 means this datetimev2 is nested inside a complex type,
// so it is wrapped with options.nested_string_wrapper (skipped when wrapper_len is 0)
if (_nesting_level >= 2) {
if (UNLIKELY(0 != result.push_string("\"", 1))) {
if (_nesting_level >= 2 && options.wrapper_len > 0) {
if (UNLIKELY(0 != result.push_string(options.nested_string_wrapper, options.wrapper_len))) {
return Status::InternalError("pack mysql buffer failed.");
}
}
if (UNLIKELY(0 != result.push_vec_datetime(date_val, scale))) {
return Status::InternalError("pack mysql buffer failed.");
}
if (_nesting_level >= 2) {
if (UNLIKELY(0 != result.push_string("\"", 1))) {
if (_nesting_level >= 2 && options.wrapper_len > 0) {
if (UNLIKELY(0 != result.push_string(options.nested_string_wrapper, options.wrapper_len))) {
return Status::InternalError("pack mysql buffer failed.");
}
}
Expand All @@ -199,14 +200,16 @@ Status DataTypeDateTimeV2SerDe::_write_column_to_mysql(const IColumn& column,

Status DataTypeDateTimeV2SerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<true>& row_buffer, int row_idx,
bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeDateTimeV2SerDe::write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const);
int row_idx, bool col_const,
const FormatOptions& options) const {
return _write_column_to_mysql(column, row_buffer, row_idx, col_const, options);
}

Status DataTypeDateTimeV2SerDe::write_column_to_orc(const std::string& timezone,
Expand Down
10 changes: 6 additions & 4 deletions be/src/vec/data_types/serde/data_type_datetimev2_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
int end, const cctz::time_zone& ctz) const override;

Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<true>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;
Status write_column_to_mysql(const IColumn& column, MysqlRowBuffer<false>& row_buffer,
int row_idx, bool col_const) const override;
int row_idx, bool col_const,
const FormatOptions& options) const override;

Status write_column_to_orc(const std::string& timezone, const IColumn& column,
const NullMap* null_map, orc::ColumnVectorBatch* orc_col_batch,
Expand All @@ -78,8 +80,8 @@ class DataTypeDateTimeV2SerDe : public DataTypeNumberSerDe<UInt64> {
private:
template <bool is_binary_format>
Status _write_column_to_mysql(const IColumn& column, MysqlRowBuffer<is_binary_format>& result,
int row_idx, bool col_const) const;
int row_idx, bool col_const, const FormatOptions& options) const;
int scale;
};
} // namespace vectorized
} // namespace doris
} // namespace doris
Loading
Loading