Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-470/516] DM learner read and check gc safe point #249

Merged
merged 3 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void dbgFuncRegionSnapshotWithData(Context & context, const ASTs & args, DBGInvo
for (auto it = args_begin; it != args_end; it += len)
{
HandleID handle_id = (HandleID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[0]).value);
Timestamp tso = (UInt8)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[1]).value);
Timestamp tso = (Timestamp)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[1]).value);
UInt8 del = (UInt8)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[2]).value);
{
std::vector<Field> fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,13 @@ try
ASTPtr astptr(new ASTIdentifier("t", ASTIdentifier::Kind::Table));
astptr->children.emplace_back(new ASTIdentifier("col1"));

storage = StorageDeltaMerge::create(
".", /* db_name= */ "default", /* name= */ "t", std::nullopt, ColumnsDescription{names_and_types_list}, astptr, DMTestEnv::getContext());
storage = StorageDeltaMerge::create(".",
/* db_name= */ "default",
/* name= */ "t",
std::nullopt,
ColumnsDescription{names_and_types_list},
astptr,
DMTestEnv::getContext());
storage->startup();
}

Expand All @@ -201,10 +206,14 @@ try
}

// get read stream from DeltaMergeStorage
Context & global_ctx = DMTestEnv::getContext();
QueryProcessingStage::Enum stage2;
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
BlockInputStreamPtr dms = storage->read(column_names, query_info, DMTestEnv::getContext(), stage2, 8192, 1)[0];
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>();
query_info.mvcc_query_info->resolve_locks = global_ctx.getSettingsRef().resolve_locks;
query_info.mvcc_query_info->read_tso = global_ctx.getSettingsRef().read_tso;
BlockInputStreamPtr dms = storage->read(column_names, query_info, global_ctx, stage2, 8192, 1)[0];
dms->readPrefix();

size_t num_rows_read = 0;
Expand Down
136 changes: 131 additions & 5 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TypeMapping.h>

Expand Down Expand Up @@ -238,6 +241,110 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin
}


namespace
{

void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info)
{
std::vector<RegionID> region_ids;
region_ids.reserve(regions_info.size());
for (const auto & info : regions_info)
region_ids.push_back(info.region_id);
throw RegionException(region_ids);
}

inline void doLearnerRead(const TiDB::TableID table_id, //
const MvccQueryInfo::RegionsQueryInfo & regions_query_info, //
TMTContext & tmt, Poco::Logger * log)
{
assert(log != nullptr);

MvccQueryInfo::RegionsQueryInfo regions_info;
if (!regions_query_info.empty())
{
regions_info = regions_query_info;
}
else
{
// Only for test, because regions_query_info should never be empty if query is from TiDB or TiSpark.
auto regions = tmt.getRegionTable().getRegionsByTable(table_id);
regions_info.reserve(regions.size());
for (const auto & [id, region] : regions)
{
if (region == nullptr)
continue;
regions_info.emplace_back(RegionQueryInfo{id, region->version(), region->confVer(), {0, 0}});
}
}

KVStorePtr & kvstore = tmt.getKVStore();
RegionMap kvstore_region;
// check region is not null and store region map.
for (const auto & info : regions_info)
{
auto region = kvstore->getRegion(info.region_id);
if (region == nullptr)
{
LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again");
throwRetryRegion(regions_info);
}
kvstore_region.emplace(info.region_id, std::move(region));
}
// make sure regions are not duplicated.
if (unlikely(kvstore_region.size() != regions_info.size()))
throw Exception("Duplicate region id", ErrorCodes::LOGICAL_ERROR);

auto start_time = Clock::now();
/// Blocking learner read. Note that learner read must be performed ahead of data read,
/// otherwise the desired index will be blocked by the lock of data read.
for (auto && [region_id, region] : kvstore_region)
{
(void)region_id;
region->waitIndex(region->learnerRead());
}
auto end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait index cost " << std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data
start_time = Clock::now();
std::set<RegionID> regions_flushing_in_bg_threads;
auto & region_table = tmt.getRegionTable();
for (auto && [region_id, region] : kvstore_region)
{
(void)region;
bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false);
// If region is flushing by other bg threads, we should mark those regions to wait.
if (!is_flushed)
{
regions_flushing_in_bg_threads.insert(region_id);
LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
}
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " << kvstore_region.size()
<< " to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count() << " ms");

// Maybe there is some data not flush to store yet, we should wait till all regions is flushed.
if (!regions_flushing_in_bg_threads.empty())
{
start_time = Clock::now();
for (const auto & region_id : regions_flushing_in_bg_threads)
{
region_table.waitTillRegionFlushed(region_id);
}
end_time = Clock::now();
LOG_DEBUG(log,
"[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end_time - start_time).count()
<< " ms");
}
}

} // namespace

BlockInputStreams StorageDeltaMerge::read( //
const Names & column_names,
const SelectQueryInfo & query_info,
Expand Down Expand Up @@ -279,11 +386,30 @@ BlockInputStreams StorageDeltaMerge::read( //
return store->readRaw(context, context.getSettingsRef(), to_read, num_streams);
else
{
// read with specify tso
UInt64 max_version = DEFAULT_MAX_READ_TSO;
if (query_info.mvcc_query_info)
max_version = query_info.mvcc_query_info->read_tso;
return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, max_version, max_block_size);
if (unlikely(!query_info.mvcc_query_info))
throw Exception("mvcc query info is null", ErrorCodes::LOGICAL_ERROR);

TMTContext & tmt = context.getTMTContext();
if (unlikely(!tmt.isInitialized()))
throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR);

const auto & mvcc_query_info = *query_info.mvcc_query_info;
// Read with specify tso, check if tso is smaller than TiDB GcSafePoint
const auto safe_point = tmt.getPDClient()->getGCSafePoint();
if (mvcc_query_info.read_tso < safe_point)
throw Exception("query id: " + context.getCurrentQueryId() + ", read tso: " + toString(mvcc_query_info.read_tso)
+ " is smaller than tidb gc safe point: " + toString(safe_point),
ErrorCodes::LOGICAL_ERROR);

// With `no_kvstore` is true, we do not do learner read
if (likely(!select_query.no_kvstore))
{
/// Learner read.
doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log);
}

return store->read(
context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size);
}
}

Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ void RegionTable::tryFlushRegion(RegionID region_id)
tryFlushRegion(region_id, table_id, false);
}

void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist)
bool RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist)
{
const auto func_update_region = [&](std::function<bool(InternalRegion &)> && callback) -> bool {
std::lock_guard<std::mutex> lock(mutex);
Expand Down Expand Up @@ -383,7 +383,7 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo
});

if (!status)
return;
return false;

std::exception_ptr first_exception;

Expand All @@ -406,14 +406,16 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo
if (cache_bytes)
dirty_regions.insert(region_id);
else
dirty_regions.erase(region_id);
clearDirtyFlag(region_id);

internal_region.last_flush_time = Clock::now();
return true;
});

if (first_exception)
std::rethrow_exception(first_exception);

return true;
}

bool RegionTable::tryFlushRegions()
Expand Down Expand Up @@ -452,7 +454,7 @@ bool RegionTable::tryFlushRegions()
if (shouldFlush(region))
{
to_flush = DataToFlush{table_id, region.region_id, true};
dirty_regions.erase(region.region_id);
clearDirtyFlag(region.region_id);
return true;
}
return false;
Expand All @@ -467,6 +469,19 @@ bool RegionTable::tryFlushRegions()
return true;
}

void RegionTable::clearDirtyFlag(RegionID region_id)
{
std::lock_guard lock(dirty_regions_mutex);
dirty_regions.erase(region_id);
dirty_regions_cv.notify_all();
}

void RegionTable::waitTillRegionFlushed(const RegionID region_id)
{
std::unique_lock lock(dirty_regions_mutex);
dirty_regions_cv.wait(lock, [this, region_id]{ return dirty_regions.count(region_id) == 0;});
}

void RegionTable::traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback) const
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <vector>

Expand Down Expand Up @@ -120,6 +122,8 @@ class RegionTable : private boost::noncopyable
TableMap tables;
RegionInfoMap regions;
std::unordered_set<RegionID> dirty_regions;
std::mutex dirty_regions_mutex;
std::condition_variable dirty_regions_cv;

FlushThresholds flush_thresholds;

Expand All @@ -142,6 +146,8 @@ class RegionTable : private boost::noncopyable
void doShrinkRegionRange(const Region & region);
void doUpdateRegion(const Region & region, TableID table_id);

void clearDirtyFlag(RegionID region_id);

public:
RegionTable(Context & context_);
void restore();
Expand Down Expand Up @@ -170,7 +176,9 @@ class RegionTable : private boost::noncopyable
bool tryFlushRegions();

void tryFlushRegion(RegionID region_id);
void tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist);
bool tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist);

void waitTillRegionFlushed(RegionID region_id);

void traverseInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegion &)> && callback) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id) const;
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/test_utils/TiflashTestBasic.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <Interpreters/Context.h>
#include <gtest/gtest.h>

#include <Interpreters/Context.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{
namespace tests
Expand All @@ -20,7 +22,8 @@ class TiFlashTestEnv
}
catch (Exception & e)
{
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "./__tmp_data/regmap", TiDB::StorageEngine::TMT);
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", TiDB::StorageEngine::TMT);
context.getTMTContext().restore();
}
context.getSettingsRef() = settings;
return context;
Expand Down
22 changes: 11 additions & 11 deletions tests/delta-merge-test/raft/read_with_specify_tso.test
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
=> DBGInvoke __region_snapshot_data(
default, test_dm,
4, 0, 1000,
2, 0, 0, 11,
1, 3, 0, 13,
3, 4, 0, 0
2, 20000000, 0, 11,
1, 20000003, 0, 13,
3, 20000004, 0, 0
)
=> DBGInvoke __try_flush_region(4)
=> select * from default.test_dm
Expand All @@ -31,11 +31,11 @@

## raft_insert_row_full(db_name, tbl_name, region_id, handle_id, tso, del, val1, val2, ...)
## upset rowid==3 with col_1 == 10086
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 5, 0, 10086)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 20000005, 0, 10086)
## del rowid == 2
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 5, 1, 0)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 20000005, 1, 0)
## insert rowid==4
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 5, 0, 1234)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 20000005, 0, 1234)
=> DBGInvoke __try_flush_region(4)
=> select (*) from default.test_dm
┌─col_1─┬─_tidb_rowid─┐
Expand All @@ -45,32 +45,32 @@
└───────┴─────────────┘

## read with specify tso
=> select * from default.test_dm " --read_tso "0
=> select * from default.test_dm " --read_tso "20000000
┌─col_1─┬─_tidb_rowid─┐
│ 11 │ 2 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "3
=> select * from default.test_dm " --read_tso "20000003
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 11 │ 2 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "4
=> select * from default.test_dm " --read_tso "20000004
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 11 │ 2 │
│ 0 │ 3 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "5
=> select * from default.test_dm " --read_tso "20000005
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 10086 │ 3 │
│ 1234 │ 4 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "100000
=> select * from default.test_dm " --read_tso "90000000
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 10086 │ 3 │
Expand Down