Skip to content

Commit

Permalink
[FLASH-461/471] Basic read/write StorageDeltaMerge's table through Ra…
Browse files Browse the repository at this point in the history
…ft (pingcap#217)

* Add enum StorageEngine for indicating which storage engine to use
* `DBGInvoke mock_tidb_table` now support Engine=DeltaMerge
* We can specify default storage engine by raft.storage_engine in tiflash.xml

* Use `IManageableStorage` as interface for Storages synced from TiDB
* TMTStorages now store ptr to IManageableStore instead of StorageMergeTree

* Add `StorageDeltaMerge::deleteRange`
* Support `applySnapshot` by using `StorageDeltaMerge::deleteRange`

* Use ::DB::MutableSupport for constant naming in DeltaMergeDefines.h

* Note that we can NOT read data in KVStore by now, we must flush data to StorageDeltaMerge by using some commands like `DBGInvoke try_flush_region`
  • Loading branch information
JaySon-Huang committed Oct 23, 2019
1 parent 1b2933c commit 2b5380b
Show file tree
Hide file tree
Showing 68 changed files with 1,162 additions and 280 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ config-preprocessed.xml
*.pb.cpp
*.pb.h

# Mac OS
.DS_Store

# Ignore symlink to private repository
/private

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@
\
M(DMWriteBlock) \
M(DMWriteBlockNS) \
M(DMDeleteRange) \
M(DMDeleteRangeNS) \
M(DMAppendDeltaPrepare) \
M(DMAppendDeltaPrepareNS) \
M(DMAppendDeltaCommitMemory) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/DeletingDeletedBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <common/logger_useful.h>
#include <DataStreams/IProfilingBlockInputStream.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/StringStreamBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/dedupUtils.h>
#include <DataTypes/DataTypeString.h>

namespace DB
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/dedupUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Storages/MutableSupport.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnFixedString.h>
#include <DataStreams/IBlockInputStream.h>

#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
Expand Down
26 changes: 22 additions & 4 deletions dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
namespace DB
{

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
} // namespace ErrorCodes

using ColumnInfo = TiDB::ColumnInfo;
using TableInfo = TiDB::TableInfo;
using PartitionInfo = TiDB::PartitionInfo;
Expand Down Expand Up @@ -134,7 +139,7 @@ DatabaseID MockTiDB::newDataBase(const String & database_name)
}

TableID MockTiDB::newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name)
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String & engine_type)
{
std::lock_guard lock(tables_mutex);

Expand All @@ -144,12 +149,12 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
throw Exception("Mock TiDB table " + qualified_name + " already exists", ErrorCodes::TABLE_ALREADY_EXISTS);
}

TableInfo table_info;

if (databases.find(database_name) == databases.end())
{
throw Exception("MockTiDB not found db: " + database_name, ErrorCodes::LOGICAL_ERROR);
}

TableInfo table_info;
table_info.db_id = databases[database_name];
table_info.db_name = database_name;
table_info.id = table_id_allocator++;
Expand All @@ -172,9 +177,22 @@ TableID MockTiDB::newTable(const String & database_name, const String & table_na
table_info.comment = "Mocked.";
table_info.update_timestamp = tso;

// set storage engine type
std::transform(engine_type.begin(), engine_type.end(), engine_type.begin(), [](unsigned char c) { return std::tolower(c); });
if (!(engine_type == "tmt" || engine_type == "dm"))
{
throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);
}
if (engine_type == "tmt")
table_info.engine_type = TiDB::StorageEngine::TMT;
else if (engine_type == "dm")
table_info.engine_type = TiDB::StorageEngine::DM;
else
throw Exception("Unknown engine type : " + engine_type +", must be 'tmt' or 'dm'", ErrorCodes::BAD_ARGUMENTS);

auto table = std::make_shared<Table>(database_name, table_name, std::move(table_info));
tables_by_id.emplace(table->table_info.id, table);
tables_by_name.emplace(database_name + "." + table_name, table);
tables_by_name.emplace(qualified_name, table);

version++;
SchemaDiff diff;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class MockTiDB : public ext::singleton<MockTiDB>

public:
TableID newTable(const String & database_name, const String & table_name,
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name);
const ColumnsDescription & columns, Timestamp tso, const String & handle_pk_name, const String &engine_type);

DatabaseID newDataBase(const String & database_name);

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Debug/dbgFuncMockTiDBTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ extern const int LOGICAL_ERROR;

void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, DBGInvoker::Printer output)
{
if (args.size() != 3 && args.size() != 4)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() != 3 && args.size() != 4 && args.size() != 5)
throw Exception("Args not matched, should be: database-name, table-name, schema-string [, handle_pk_name], [, engine-type(tmt|dm)]", ErrorCodes::BAD_ARGUMENTS);

const String & database_name = typeid_cast<const ASTIdentifier &>(*args[0]).name;
const String & table_name = typeid_cast<const ASTIdentifier &>(*args[1]).name;
Expand All @@ -45,9 +45,14 @@ void MockTiDBTable::dbgFuncMockTiDBTable(Context & context, const ASTs & args, D
throw Exception("Invalid TiDB table schema", ErrorCodes::LOGICAL_ERROR);
ColumnsDescription columns
= InterpreterCreateQuery::getColumnsDescription(typeid_cast<const ASTExpressionList &>(*columns_ast), context);

String engine_type("tmt");
if (args.size() == 4)
engine_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);

auto tso = context.getTMTContext().getPDClient()->getTS();

TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, handle_pk_name);
TableID table_id = MockTiDB::instance().newTable(database_name, table_name, columns, tso, handle_pk_name, engine_type);

std::stringstream ss;
ss << "mock table #" << table_id;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ TableID getTableID(Context & context, const std::string & database_name, const s
}

auto storage = context.getTable(database_name, table_name);
auto * merge_tree = dynamic_cast<StorageMergeTree *>(storage.get());
auto table_info = merge_tree->getTableInfo();
auto managed_storage = std::static_pointer_cast<IManageableStorage>(storage);
auto table_info = managed_storage->getTableInfo();
return table_info.id;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void dbgFuncDumpAllMockRegion(Context & context, const ASTs & args, DBGInvoker::

// Try flush regions
// Usage:
// ./storage-client.sh "DBGInvoke try_flush()"
// ./storage-client.sh "DBGInvoke try_flush([force_flush])"
void dbgFuncTryFlush(Context & context, const ASTs & args, DBGInvoker::Printer output);

// Try flush regions
Expand Down
13 changes: 7 additions & 6 deletions dbms/src/Functions/tests/gtest_funtions_decimal_arith.cpp
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
#include <test_utils/TiflashTestBasic.h>

#include <Interpreters/Context.h>
#include <DataTypes/DataTypeDecimal.h>
#include <Functions/FunctionsArithmetic.h>
#include <Interpreters/Context.h>

namespace DB
{
namespace tests
{

void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr &actual_, ScaleType expected_scale)
void ASSERT_DecimalDataTypeScaleEq(const DataTypePtr & actual_, ScaleType expected_scale)
{
if (auto actual = checkDecimal<Decimal32>(*actual_))
ASSERT_EQ(actual->getScale(), expected_scale);
Expand All @@ -36,10 +36,12 @@ TEST(DataTypeDecimal_test, A)
DataTypePtr lhs = createDecimal(10, 4);
DataTypePtr rhs = createDecimal(10, 6);

const ScaleType scale_max = std::max(typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale(), (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale());
const ScaleType scale_sum = typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale() + (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale();
const ScaleType scale_max = std::max(
typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale(), (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale());
const ScaleType scale_sum
= typeid_cast<const DataTypeDecimal64 *>(lhs.get())->getScale() + (typeid_cast<const DataTypeDecimal64 *>(rhs.get()))->getScale();

Context context = TiFlashTestEnv::getContext();
Context & context = TiFlashTestEnv::getContext();
DataTypes args{lhs, rhs};

// Decimal(10, 4) + Decimal(10, 6)
Expand All @@ -56,7 +58,6 @@ TEST(DataTypeDecimal_test, A)
func = FunctionMultiply::create(context);
return_type = func->getReturnTypeImpl(args);
ASSERT_DecimalDataTypeScaleEq(return_type, scale_sum);

}

} // namespace tests
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1410,12 +1410,13 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & raft_service_address)
const std::string & raft_service_address,
::TiDB::StorageEngine engine)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, raft_service_address);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, raft_service_address, engine);
}

void Context::initializePartPathSelector(std::vector<std::string> && all_normal_path, std::vector<std::string> && all_fast_path)
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Interpreters/ClientInfo.h>
#include <IO/CompressionSettings.h>
#include <Storages/PartPathSelector.h>
#include <Storages/Transaction/StorageEngineType.h>


namespace Poco
Expand Down Expand Up @@ -365,7 +366,8 @@ class Context
const std::string & learner_value,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
const std::string & raft_service_address);
const std::string & raft_service_address,
::TiDB::StorageEngine engine);
RaftService & getRaftService();

void initializeSchemaSyncService();
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
if (!storage_)
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);

// TODO handle if storage_ is a DeltaMerge?
const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage_.get());
if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage_->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR);
Expand All @@ -219,7 +220,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
// Not allow storage schema version greater than query schema version in any case.
if (storage_schema_version > query_schema_version)
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);
throw Exception("Table " + qualified_name + " schema version " + toString(storage_schema_version) + " newer than query schema version " + toString(query_schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

// If schema synced, we must be very recent so we are good as long as storage schema version is no greater than query schema version.
// If schema not synced, we are good if storage schema version is right on query schema version.
Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Parsers/ASTInsertQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ namespace DB
*/
class ASTInsertQuery : public IAST
{
public:
explicit ASTInsertQuery(bool is_import_ = false) : is_import(is_import_) {}
explicit ASTInsertQuery(String database_, String table_, bool is_import_)
: database(std::move(database_)), table(std::move(table_)), is_import(is_import_)
{}

public:
String database;
String table;
Expand Down Expand Up @@ -38,11 +44,20 @@ class ASTInsertQuery : public IAST
auto res = std::make_shared<ASTInsertQuery>(*this);
res->children.clear();

if (columns) { res->columns = columns->clone(); res->children.push_back(res->columns); }
if (select) { res->select = select->clone(); res->children.push_back(res->select); }
if (columns)
{
res->columns = columns->clone();
res->children.push_back(res->columns);
}
if (select)
{
res->select = select->clone();
res->children.push_back(res->select);
}
if (table_function)
{
res->table_function = table_function->clone(); res->children.push_back(res->table_function);
res->table_function = table_function->clone();
res->children.push_back(res->table_function);
}

return res;
Expand All @@ -52,4 +67,4 @@ class ASTInsertQuery : public IAST
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};

}
} // namespace DB
20 changes: 19 additions & 1 deletion dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/Transaction/SchemaSyncer.h>
#include <Storages/Transaction/StorageEngineType.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/registerStorages.h>
#include <TableFunctions/registerTableFunctions.h>
Expand Down Expand Up @@ -350,6 +351,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
std::string kvstore_path = path + "kvstore/";
String flash_server_addr = config().getString("flash.service_addr", "0.0.0.0:3930");

::TiDB::StorageEngine engine_if_empty = ::TiDB::StorageEngine::TMT;
::TiDB::StorageEngine engine = engine_if_empty;

if (config().has("raft"))
{
need_raft_service = true;
Expand Down Expand Up @@ -405,11 +409,25 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
kvstore_path = config().getString("raft.kvstore_path");
}

if (config().has("raft.storage_engine"))
{
String s_engine = config().getString("raft.storage_engine");
std::transform(s_engine.begin(), s_engine.end(), s_engine.begin(),
[](char ch){return std::tolower(ch);});
if (s_engine == "tmt")
engine = ::TiDB::StorageEngine::TMT;
else if (s_engine == "dm")
engine = ::TiDB::StorageEngine::DM;
else
engine = engine_if_empty;
}
}

{
LOG_DEBUG(log, "Default storage engine: " << static_cast<Int64>(engine));
/// create TMTContext
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr);
global_context->createTMTContext(pd_addrs, learner_key, learner_value, ignore_databases, kvstore_path, flash_server_addr, engine);
global_context->getTMTContext().getRegionTable().setTableCheckerThreshold(config().getDouble("flash.overlap_threshold", 0.9));
}

Expand Down
Loading

0 comments on commit 2b5380b

Please sign in to comment.