Skip to content

Commit

Permalink
Flash-577, Flash-578 dag codec enhance (#299)
Browse files Browse the repository at this point in the history
* dag codec enhance

* format
  • Loading branch information
windtalker authored and zanmato1984 committed Oct 31, 2019
1 parent b3ee5fe commit e643a21
Show file tree
Hide file tree
Showing 15 changed files with 335 additions and 66 deletions.
13 changes: 12 additions & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TypeMapping.h>
Expand Down Expand Up @@ -159,7 +161,16 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
int i = 1;
for (auto & column : columns.getAllPhysical())
{
table_info.columns.emplace_back(reverseGetColumnInfo(column, i++, Field()));
Field default_value;
auto it = columns.defaults.find(column.name);
if (it != columns.defaults.end())
{
const auto * func = typeid_cast<const ASTFunction *>(it->second.expression.get());
const auto * value_ptr
= typeid_cast<const ASTLiteral *>(typeid_cast<const ASTExpressionList *>(func->arguments.get())->children[0].get());
default_value = value_ptr->value;
}
table_info.columns.emplace_back(reverseGetColumnInfo(column, i++, default_value));
if (handle_pk_name == column.name)
{
if (!column.type->isInteger() && !column.type->isUnsignedInteger())
Expand Down
29 changes: 26 additions & 3 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,29 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter
compileExpr(input, ast, cond, referred_columns, col_ref_map);
}

// Mock-test helper: rewrites the TiDB type recorded in `ci` so that types which
// cannot be declared directly in the mock schema can still be exercised.
// The encoding conventions are:
//   1. Datetime with decimal (fsp) == 5                        => Timestamp
//   2. signed LongLong whose origin default value is 1024      => Time
//   3. unsigned LongLong whose origin default value is in
//      [1, 64]                                                 => Bit, with flen
//      set to that default value
// Columns that match none of the rules are left untouched.
void hijackTiDBTypeForMockTest(ColumnInfo & ci)
{
    // Rule 1. A datetime column can never match the LongLong rules below.
    if (ci.tp == TiDB::TypeDatetime)
    {
        if (ci.decimal == 5)
            ci.tp = TiDB::TypeTimestamp;
        return;
    }

    // Rules 2 and 3 only apply to LongLong columns carrying a default value.
    if (ci.tp != TiDB::TypeLongLong || ci.origin_default_value.isEmpty())
        return;

    auto marker = ci.origin_default_value.convert<Int64>();
    if (marker == 1024)
    {
        // Rule 2: only the signed flavour is turned into Time.
        if (!ci.hasUnsignedFlag())
            ci.tp = TiDB::TypeTime;
    }
    else if (marker >= 1 && marker <= 64)
    {
        // Rule 3: only the unsigned flavour is turned into Bit; the default
        // value doubles as the bit length.
        if (ci.hasUnsignedFlag())
        {
            ci.tp = TiDB::TypeBit;
            ci.flen = marker;
        }
    }
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type)
{
Expand Down Expand Up @@ -360,9 +383,9 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context,
ci.flen = column_info.flen;
ci.decimal = column_info.decimal;
ci.elems = column_info.elems;
// a hack to test timestamp type in mock test
if (column_info.tp == TiDB::TypeDatetime && ci.decimal == 5)
ci.tp = TiDB::TypeTimestamp;
ci.default_value = column_info.default_value;
ci.origin_default_value = column_info.origin_default_value;
hijackTiDBTypeForMockTest(ci);
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(
Expand Down
156 changes: 127 additions & 29 deletions dbms/src/Flash/Coprocessor/ArrowColCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataTypes/DataTypeDecimal.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeMyDate.h>
#include <DataTypes/DataTypeMyDateTime.h>
#include <DataTypes/DataTypeNullable.h>
Expand All @@ -21,6 +22,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_EXCEPTION;
extern const int NOT_IMPLEMENTED;
} // namespace ErrorCodes

const IColumn * getNestedCol(const IColumn * flash_col)
Expand Down Expand Up @@ -90,9 +92,7 @@ void flashDecimalColToArrowCol(
|| flashDecimalColToArrowColInternal<Decimal128, is_nullable>(dag_column, flash_col_untyped, start_index, end_index, data_type)
|| flashDecimalColToArrowColInternal<Decimal256, is_nullable>(
dag_column, flash_col_untyped, start_index, end_index, data_type)))
throw Exception("Error while trying to convert flash col to DAG col, "
"column name "
+ flash_col_untyped->getName(),
throw Exception("Error while trying to convert flash col to DAG col, column name " + flash_col_untyped->getName(),
ErrorCodes::UNKNOWN_EXCEPTION);
}

Expand Down Expand Up @@ -143,10 +143,8 @@ void flashDoubleColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col
}
return;
}
throw Exception("Error while trying to convert flash col to DAG col, "
"column name "
+ flash_col_untyped->getName(),
ErrorCodes::UNKNOWN_EXCEPTION);
throw Exception(
"Error while trying to convert flash col to DAG col, column name " + flash_col_untyped->getName(), ErrorCodes::UNKNOWN_EXCEPTION);
}

template <bool is_nullable>
Expand All @@ -160,9 +158,7 @@ void flashIntegerColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_co
|| flashIntegerColToArrowColInternal<Int16, is_nullable>(dag_column, flash_col_untyped, start_index, end_index)
|| flashIntegerColToArrowColInternal<Int32, is_nullable>(dag_column, flash_col_untyped, start_index, end_index)
|| flashIntegerColToArrowColInternal<Int64, is_nullable>(dag_column, flash_col_untyped, start_index, end_index)))
throw Exception("Error while trying to convert flash col to DAG col, "
"column name "
+ flash_col_untyped->getName(),
throw Exception("Error while trying to convert flash col to DAG col, column name " + flash_col_untyped->getName(),
ErrorCodes::UNKNOWN_EXCEPTION);
}

Expand Down Expand Up @@ -210,6 +206,53 @@ void flashStringColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col
}
}

// Appends rows [start_index, end_index) of a UInt64 flash column to `dag_column`
// as TiDB bit values.
//
// is_nullable       - when true, each row is checked with isNullAt() on the
//                     outer column and null rows are appended as NULL.
// flash_col_untyped - source column; its nested column (see getNestedCol) is
//                     read as ColumnVector<UInt64>.
// field_type        - supplies flen; the second TiDBBit constructor argument is
//                     (flen + 7) >> 3, or the -1 sentinel when flen is negative.
//
// NOTE(review): the result of checkAndGetColumn is not checked here; this
// presumably relies on the caller having validated the column type — confirm.
template <bool is_nullable>
void flashBitColToArrowCol(
    TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index, const tipb::FieldType & field_type)
{
    const IColumn * inner_col = getNestedCol(flash_col_untyped);
    auto * bit_col = checkAndGetColumn<ColumnVector<UInt64>>(inner_col);

    // Same expression (and therefore same deduced type) as evaluated per row
    // before; it does not depend on the row, so compute it once.
    const auto byte_size = field_type.flen() < 0 ? -1 : (field_type.flen() + 7u) >> 3u;

    for (size_t row = start_index; row < end_index; ++row)
    {
        bool row_is_null = false;
        if constexpr (is_nullable)
            row_is_null = flash_col_untyped->isNullAt(row);

        if (row_is_null)
        {
            dag_column.appendNull();
        }
        else
        {
            TiDBBit bit(bit_col->getElement(row), byte_size);
            dag_column.append(bit);
        }
    }
}

// Appends rows [start_index, end_index) of an Enum16 flash column to
// `dag_column` as TiDB enum values (numeric value plus its name).
//
// is_nullable       - when true, each row is checked with isNullAt() on the
//                     outer column and null rows are appended as NULL.
// flash_col_untyped - source column; its nested column (see getNestedCol) is
//                     read as ColumnVector<DataTypeEnum16::FieldType>.
// data_type         - read as DataTypeEnum16; used for the number of declared
//                     values and for value -> name lookup.
//
// Throws LOGICAL_ERROR when a stored value is 0 or greater than the number of
// values declared by the enum type.
//
// NOTE(review): neither checkAndGetColumn nor checkAndGetDataType results are
// checked here; presumably the caller validates the type first — confirm.
template <bool is_nullable>
void flashEnumColToArrowCol(
    TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index, const IDataType * data_type)
{
    const IColumn * inner_col = getNestedCol(flash_col_untyped);
    auto * enum_col = checkAndGetColumn<ColumnVector<DataTypeEnum16::FieldType>>(inner_col);
    const auto * enum16_type = checkAndGetDataType<DataTypeEnum16>(data_type);
    const size_t declared_values = enum16_type->getValues().size();

    for (size_t row = start_index; row < end_index; ++row)
    {
        bool row_is_null = false;
        if constexpr (is_nullable)
            row_is_null = flash_col_untyped->isNullAt(row);

        if (row_is_null)
        {
            dag_column.appendNull();
            continue;
        }

        // Widen to unsigned first so that negative stored values fail the
        // upper-bound check below.
        const auto widened = static_cast<UInt64>(enum_col->getElement(row));
        const bool in_range = widened != 0 && widened <= declared_values;
        if (!in_range)
            throw Exception("number of enum overflow enum boundary", ErrorCodes::LOGICAL_ERROR);

        TiDBEnum ti_enum(widened, enum16_type->getNameForValue(static_cast<DataTypeEnum16::FieldType>(widened)));
        dag_column.append(ti_enum);
    }
}

void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & flash_col, const tipb::FieldType & field_type,
size_t start_index, size_t end_index)
{
Expand All @@ -232,10 +275,10 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
case TiDB::TypeLong:
case TiDB::TypeLongLong:
case TiDB::TypeYear:
case TiDB::TypeTime:
if (!type->isInteger())
throw Exception("Type un-matched during arrow encode, target col type is integer and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is integer and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (type->isUnsignedInteger() != tidb_column_info.hasUnsignedFlag())
throw Exception("Flash column and TiDB column has different unsigned flag", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -246,9 +289,8 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
break;
case TiDB::TypeFloat:
if (!checkDataType<DataTypeFloat32>(type))
throw Exception("Type un-matched during arrow encode, target col type is float32 and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is float32 and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashDoubleColToArrowCol<Float32, false>(dag_column, col, start_index, end_index);
Expand All @@ -257,9 +299,8 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
break;
case TiDB::TypeDouble:
if (!checkDataType<DataTypeFloat64>(type))
throw Exception("Type un-matched during arrow encode, target col type is float64 and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is float64 and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashDoubleColToArrowCol<Float64, false>(dag_column, col, start_index, end_index);
Expand All @@ -270,9 +311,8 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
case TiDB::TypeDatetime:
case TiDB::TypeTimestamp:
if (!type->isDateOrDateTime())
throw Exception("Type un-matched during arrow encode, target col type is datetime and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is datetime and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashDateOrDateTimeColToArrowCol<false>(dag_column, col, start_index, end_index, field_type);
Expand All @@ -281,9 +321,8 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
break;
case TiDB::TypeNewDecimal:
if (!type->isDecimal())
throw Exception("Type un-matched during arrow encode, target col type is datetime and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is datetime and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashDecimalColToArrowCol<false>(dag_column, col, start_index, end_index, type);
Expand All @@ -298,15 +337,32 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
case TiDB::TypeMediumBlob:
case TiDB::TypeTinyBlob:
if (!checkDataType<DataTypeString>(type))
throw Exception("Type un-matched during arrow encode, target col type is string and source column"
" type is "
+ type->getName(),
throw Exception(
"Type un-matched during arrow encode, target col type is string and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashStringColToArrowCol<false>(dag_column, col, start_index, end_index);
else
flashStringColToArrowCol<true>(dag_column, col, start_index, end_index);
break;
case TiDB::TypeBit:
if (!checkDataType<DataTypeUInt64>(type))
throw Exception("Type un-matched during arrow encode, target col type is bit and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashBitColToArrowCol<false>(dag_column, col, start_index, end_index, field_type);
else
flashBitColToArrowCol<true>(dag_column, col, start_index, end_index, field_type);
break;
case TiDB::TypeEnum:
if (!checkDataType<DataTypeEnum16>(type))
throw Exception("Type un-matched during arrow encode, target col type is bit and source column type is " + type->getName(),
ErrorCodes::LOGICAL_ERROR);
if (tidb_column_info.hasNotNullFlag())
flashEnumColToArrowCol<false>(dag_column, col, start_index, end_index, type);
else
flashEnumColToArrowCol<true>(dag_column, col, start_index, end_index, type);
break;
default:
throw Exception("Unsupported field type " + field_type.DebugString() + " when try to convert flash col to DAG col",
ErrorCodes::NOT_IMPLEMENTED);
Expand Down Expand Up @@ -341,6 +397,39 @@ const char * arrowStringColToFlashCol(const char * pos, UInt8, UInt32 null_count
return pos + offsets[length];
}

const char * arrowEnumColToFlashCol(const char * pos, UInt8, UInt32 null_count, const std::vector<UInt8> & null_bitmap,
const std::vector<UInt64> & offsets, const ColumnWithTypeAndName & col, const ColumnInfo &, UInt32 length)
{
for (UInt32 i = 0; i < length; i++)
{
if (checkNull(i, null_count, null_bitmap, col))
continue;
auto enum_value = (const Int64)toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos + offsets[i])));
col.column->assumeMutable()->insert(Field(enum_value));
}
return pos + offsets[length];
}

const char * arrowBitColToFlashCol(const char * pos, UInt8, UInt32 null_count, const std::vector<UInt8> & null_bitmap,
const std::vector<UInt64> & offsets, const ColumnWithTypeAndName & col, const ColumnInfo &, UInt32 length)
{
for (UInt32 i = 0; i < length; i++)
{
if (checkNull(i, null_count, null_bitmap, col))
continue;
const String value = String(pos + offsets[i], pos + offsets[i + 1]);
if (value.length() == 0)
col.column->assumeMutable()->insert(Field(UInt64(0)));
UInt64 result = 0;
for (auto & c : value)
{
result = (result << 8u) | (UInt8)c;
}
col.column->assumeMutable()->insert(Field(result));
}
return pos + offsets[length];
}

template <typename T>
T toCHDecimal(UInt8 digits_int, UInt8 digits_frac, bool negative, const Int32 * word_buf)
{
Expand Down Expand Up @@ -493,6 +582,10 @@ const char * arrowNumColToFlashCol(const char * pos, UInt8 field_length, UInt32
col.column->assumeMutable()->insert(Field(i64));
}
break;
case TiDB::TypeTime:
i64 = toLittleEndian(*(reinterpret_cast<const Int64 *>(pos)));
col.column->assumeMutable()->insert(Field(i64));
break;
case TiDB::TypeFloat:
u32 = toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos)));
std::memcpy(&f32, &u32, sizeof(Float32));
Expand Down Expand Up @@ -523,6 +616,7 @@ const char * arrowColToFlashCol(const char * pos, UInt8 field_length, UInt32 nul
case TiDB::TypeYear:
case TiDB::TypeFloat:
case TiDB::TypeDouble:
case TiDB::TypeTime:
return arrowNumColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length);
case TiDB::TypeDatetime:
case TiDB::TypeDate:
Expand All @@ -538,8 +632,12 @@ const char * arrowColToFlashCol(const char * pos, UInt8 field_length, UInt32 nul
case TiDB::TypeMediumBlob:
case TiDB::TypeLongBlob:
return arrowStringColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length);
case TiDB::TypeBit:
return arrowBitColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length);
case TiDB::TypeEnum:
return arrowEnumColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length);
default:
throw Exception("Not supported yet: field tp = " + std::to_string(col_info.tp));
throw Exception("Not supported yet: field tp = " + std::to_string(col_info.tp), ErrorCodes::NOT_IMPLEMENTED);
}
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType,
std::vector<tipb::FieldType> && result_field_types_, Block header_)
std::vector<tipb::FieldType> && result_field_types_, Block && header_)
: dag_response(dag_response_),
result_field_types(result_field_types_),
result_field_types(std::move(result_field_types_)),
header(std::move(header_)),
records_per_chunk(records_per_chunk_),
current_records_num(0)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <DataTypes/IDataType.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <common/logger_useful.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <tipb/select.pb.h>
Expand All @@ -20,7 +21,7 @@ class DAGBlockOutputStream : public IBlockOutputStream
{
public:
DAGBlockOutputStream(tipb::SelectResponse & response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
std::vector<tipb::FieldType> && result_field_types, Block header_);
std::vector<tipb::FieldType> && result_field_types, Block && header_);

Block getHeader() const override { return header; }
void write(const Block & block) override;
Expand Down
Loading

0 comments on commit e643a21

Please sign in to comment.