Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flash-456] alter for nullable attribute #207

Merged
merged 4 commits into from
Aug 29, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Columns/ColumnNullable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ void ColumnNullable::applyNullMap(const ColumnNullable & other)
/// Verifies that the null map and the nested column contain the same number of rows.
/// Throws Exception with code SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT when the sizes differ.
void ColumnNullable::checkConsistency() const
{
if (null_map->size() != getNestedColumn().size())
throw Exception("Logical error: Sizes of nested column and null map of Nullable column are not equal",
throw Exception("Logical error: Sizes of nested column and null map of Nullable column are not equal: null size is : " + std::to_string(null_map->size()) + " column size is : "+ std::to_string(getNestedColumn().size()),
ErrorCodes::SIZES_OF_NESTED_COLUMNS_ARE_INCONSISTENT);
}

Expand Down
17 changes: 17 additions & 0 deletions dbms/src/DataTypes/DataTypeNumberBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,23 @@ void DataTypeNumberBase<T>::deserializeBinary(IColumn & column, ReadBuffer & ist
template <typename T>
void DataTypeNumberBase<T>::serializeBinaryBulk(const IColumn & column, WriteBuffer & ostr, size_t offset, size_t limit) const
{
if constexpr (std::is_same_v<T, UInt8>)
{
// This is for tmt to write null vector.
const ColumnConst * x = typeid_cast<const ColumnConst *>(&column);
if (x != nullptr)
{
size_t size = x->size();

if (limit == 0 || offset + limit > size)
limit = size - offset;

for(size_t i = 0; i < limit; i++)
writeIntBinary<UInt8>(UInt8(0), ostr);

return;
}
}
const typename ColumnVector<T>::Container & x = typeid_cast<const ColumnVector<T> &>(column).getData();

size_t size = x.size();
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,11 @@ void MockTiDB::modifyColumnInTable(const String & database_name, const String &
ColumnInfo column_info = getColumnInfoFromColumn(column, 0, Field());
if (it->hasUnsignedFlag() != column_info.hasUnsignedFlag())
throw Exception("Modify column " + column.name + " UNSIGNED flag is not allowed", ErrorCodes::LOGICAL_ERROR);
if (it->hasNotNullFlag() != column_info.hasNotNullFlag())
throw Exception("Modify column " + column.name + " NOT NULL flag is not allowed", ErrorCodes::LOGICAL_ERROR);
if (it->tp == column_info.tp)
if (it->tp == column_info.tp && it->hasNotNullFlag() == column_info.hasNotNullFlag())
throw Exception("Column " + column.name + " type not changed", ErrorCodes::LOGICAL_ERROR);

it->tp = column_info.tp;
it->flag = column_info.flag;

version++;
SchemaDiff diff;
Expand Down
42 changes: 33 additions & 9 deletions dbms/src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -948,12 +948,12 @@ void MergeTreeData::checkAlter(const AlterCommands & commands)
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;

createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(), unused_expression, unused_map, unused_bool);
DataPart::Checksums checksums;
createConvertExpression(nullptr, checksums, getColumns().getAllPhysical(), new_columns.getAllPhysical(), unused_expression, unused_map, unused_bool);
}

void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
void MergeTreeData::createConvertExpression(const DataPartPtr & part, DataPart::Checksums & checksums, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata)
{
out_expression = nullptr;
out_rename_map = {};
Expand Down Expand Up @@ -1006,8 +1006,32 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name

if (!new_type->equals(*old_type) && (!part || part->hasColumnFiles(column.name)))
{
// TODO: Asserting that a TXN table never needs data conversion might be arbitrary.
if (isMetadataOnlyConversion(old_type, new_type) || merging_params.mode == MergingParams::Txn)
if (merging_params.mode == MergingParams::Txn)
hanfei1991 marked this conversation as resolved.
Show resolved Hide resolved
{
// Any type conversion for TMT is ignored, except adding nullable property.
// And for adding null map ONLY, i.e. not touching the data file, we do null map writing here in place.
if (part && !old_type->isNullable() && new_type->isNullable())
{
auto null_map_name = column.name + "_null";
auto null_map_type = std::make_shared<DataTypeUInt8>();
Block b;
b.insert({ null_map_type->createColumnConstWithDefaultValue(part->rows_count), null_map_type, null_map_name});
hanfei1991 marked this conversation as resolved.
Show resolved Hide resolved
auto compression_settings = this->context.chooseCompressionSettings(
part->bytes_on_disk,
static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
MergedColumnOnlyOutputStream out(*this, b, part->getFullPath(), true /* sync */, compression_settings, true /* skip_offsets */);
out.write(b);
auto add_checksums = out.writeSuffixAndGetChecksums();
checksums.files[column.name + ".null.bin"] = add_checksums.files[null_map_name + ".bin"];
checksums.files[column.name + ".null.mrk"] = add_checksums.files[null_map_name + ".mrk"];
out_rename_map[null_map_name + ".bin"] = column.name + ".null.bin";
out_rename_map[null_map_name + ".mrk"] = column.name + ".null.mrk";
}
out_force_update_metadata = true;
continue;
}

if (isMetadataOnlyConversion(old_type, new_type))
{
out_force_update_metadata = true;
continue;
Expand Down Expand Up @@ -1110,7 +1134,8 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
ExpressionActionsPtr expression;
AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part.
bool force_update_metadata;
createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata);
DataPart::Checksums new_checksums = part->checksums;
createConvertExpression(part, new_checksums, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata);

size_t num_files_to_modify = transaction->rename_map.size();
size_t num_files_to_remove = 0;
Expand Down Expand Up @@ -1260,12 +1285,11 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
}

/// Update the checksums.
DataPart::Checksums new_checksums = part->checksums;
for (auto it : transaction->rename_map)
{
if (it.second.empty())
new_checksums.files.erase(it.first);
else
else if (add_checksums.files.find(it.first) != add_checksums.files.end())
new_checksums.files[it.second] = add_checksums.files[it.first];
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -651,8 +651,8 @@ class MergeTreeData : public ITableDeclaration
/// for transformation-free changing of Enum values list).
/// Files to be deleted are mapped to an empty string in out_rename_map.
/// If part == nullptr, just checks that all type conversions are possible.
void createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata) const;
void createConvertExpression(const DataPartPtr & part, DataPart::Checksums &, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
ExpressionActionsPtr & out_expression, NameToNameMap & out_rename_map, bool & out_force_update_metadata);

/// Calculates column sizes in compressed form for the current state of data_parts. Call with data_parts mutex locked.
void calculateColumnSizesImpl();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Transaction/Codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,11 @@ inline T getFieldValue(const Field & field)

void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss)
{
if (field.isNull())
{
ss << UInt8(TiDB::CodecFlagNil);
return;
}
ss << UInt8(flag);
switch (flag)
{
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Transaction/Datum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ DatumFlat::DatumFlat(const DB::Field & field, TP tp) : DatumBase(field, tp)
}
}

/// Returns true when the column is flagged NOT NULL yet the original field value is null.
bool DatumFlat::invalidNull(const ColumnInfo & column_info)
{
    // A column without the NOT NULL flag can never hold an invalid null.
    if (!column_info.hasNotNullFlag())
        return false;

    return orig.isNull();
}

bool DatumFlat::overflow(const ColumnInfo & column_info)
{
switch (tp)
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Transaction/Datum.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ class DatumFlat : public DatumBase
public:
DatumFlat(const DB::Field & field, TP tp);

/// Checks whether this is a null value for a NOT NULL column, for schema mismatch detection.
bool invalidNull(const ColumnInfo & column_info);

/// Checks overflow for schema mismatch detection.
bool overflow(const ColumnInfo & column_info);
};
Expand Down
15 changes: 15 additions & 0 deletions dbms/src/Storages/Transaction/RegionBlockReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,21 @@ std::tuple<Block, bool> readRegionBlock(const TableInfo & table_info,

return std::make_tuple(Block(), false);
}
if (datum.invalidNull(column_info))
{
// Null value with non-null type detected, fatal if force_decode is true,
// as schema being newer and with invalid null shouldn't happen.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newer and narrow -> newer and with invalid null

// Otherwise return false to outer, outer should sync schema and try again.
if (force_decode)
{
const auto & data_type = ColumnDataInfoMap::getNameAndTypePair(col_info).type;
throw Exception("Detected invalid null when decoding data " + std::to_string(unflattened.get<UInt64>())
+ " of column " + column_info.name + " with type " + data_type->getName(),
ErrorCodes::LOGICAL_ERROR);
}

return std::make_tuple(Block(), false);
}
auto & mut_col = ColumnDataInfoMap::getMutableColumnPtr(col_info);
mut_col->insert(unflattened);
}
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/Transaction/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i
{
AlterCommands alter_commands;

/// Detect new columns.
// TODO: Detect rename columns.

/// Detect dropped columns.
Expand All @@ -75,6 +74,7 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i
alter_commands.emplace_back(std::move(command));
}

/// Detect new columns.
for (const auto & column_info : table_info.columns)
{
const auto & orig_column_info = std::find_if(orig_table_info.columns.begin(),
Expand Down Expand Up @@ -105,7 +105,8 @@ inline AlterCommands detectSchemaChanges(Logger * log, const TableInfo & table_i
if (column_info_.id == orig_column_info.id && column_info_.name != orig_column_info.name)
LOG_ERROR(log, "detect column " << orig_column_info.name << " rename to " << column_info_.name);

return column_info_.id == orig_column_info.id && column_info_.tp != orig_column_info.tp;
return column_info_.id == orig_column_info.id
&& (column_info_.tp != orig_column_info.tp || column_info_.hasNotNullFlag() != orig_column_info.hasNotNullFlag());
});

AlterCommand command;
Expand Down
68 changes: 68 additions & 0 deletions tests/mutable-test/txn_schema/alter_for_nullable.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@

# Preparation.
=> DBGInvoke __enable_schema_sync_service('false')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
=> DBGInvoke __refresh_schemas()

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Create a table with non-nullable columns and flush two rows.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int8, col_3 Int32')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test', 1, 3)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test', 2, 4)
=> DBGInvoke __try_flush_region(4)

# test add nullable flag and change type at the same time.
=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Nullable(Int32)')
hanfei1991 marked this conversation as resolved.
Show resolved Hide resolved
# test trigger by background worker.
=> DBGInvoke __refresh_schemas()
=> select col_2 from default.test
┌─col_2─┐
│ 1 │
│ 2 │
└───────┘

# test only add nullable.
=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_3 Nullable(Int32)')
hanfei1991 marked this conversation as resolved.
Show resolved Hide resolved

=> DBGInvoke __put_region(5, 100, 150, default, test)
=> DBGInvoke __raft_insert_row(default, test, 5, 100, 'test', 1, NULL)
=> DBGInvoke __raft_insert_row(default, test, 5, 101, 'test', 2, NULL)
# test trigger by flush worker.
=> DBGInvoke __try_flush_region(5)

=> select col_3 from default.test
┌─col_3─┐
│ 3 │
│ 4 │
│ \N │
│ \N │
└───────┘

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

# Test convert nullable type to not-null type.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)')
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test', 1)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test', 2)
=> DBGInvoke __try_flush_region(4)
=> select col_2 from default.test
┌─col_2─┐
│ 1 │
│ 2 │
└───────┘
=> DBGInvoke __modify_column_in_tidb_table(default, test, 'col_2 Int16')
=> DBGInvoke __refresh_schemas()
=> select col_2 from default.test
┌─col_2─┐
│ 1 │
│ 2 │
└───────┘

=> DBGInvoke __refresh_schemas()