Skip to content

Commit

Permalink
[FLASH-470/516] DM learner read and check gc safe point (#249)
Browse files Browse the repository at this point in the history
* Do learner read before reading data from StorageDeltaMerge
* Check GC safe point
  • Loading branch information
JaySon-Huang committed Oct 22, 2019
1 parent 0ea92c5 commit b0e55ed
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 28 deletions.
2 changes: 1 addition & 1 deletion dbms/src/Debug/dbgFuncRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void dbgFuncRegionSnapshotWithData(Context & context, const ASTs & args, DBGInvo
for (auto it = args_begin; it != args_end; it += len)
{
HandleID handle_id = (HandleID)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[0]).value);
Timestamp tso = safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[1]).value);
Timestamp tso = (Timestamp)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[1]).value);
UInt8 del = (UInt8)safeGet<UInt64>(typeid_cast<const ASTLiteral &>(*it[2]).value);
{
std::vector<Field> fields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,13 @@ try
ASTPtr astptr(new ASTIdentifier("t", ASTIdentifier::Kind::Table));
astptr->children.emplace_back(new ASTIdentifier("col1"));

storage = StorageDeltaMerge::create(
".", /* db_name= */ "default", /* name= */ "t", std::nullopt, ColumnsDescription{names_and_types_list}, astptr, DMTestEnv::getContext());
storage = StorageDeltaMerge::create(".",
/* db_name= */ "default",
/* name= */ "t",
std::nullopt,
ColumnsDescription{names_and_types_list},
astptr,
DMTestEnv::getContext());
storage->startup();
}

Expand All @@ -201,10 +206,14 @@ try
}

// get read stream from DeltaMergeStorage
Context & global_ctx = DMTestEnv::getContext();
QueryProcessingStage::Enum stage2;
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
BlockInputStreamPtr dms = storage->read(column_names, query_info, DMTestEnv::getContext(), stage2, 8192, 1)[0];
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>();
query_info.mvcc_query_info->resolve_locks = global_ctx.getSettingsRef().resolve_locks;
query_info.mvcc_query_info->read_tso = global_ctx.getSettingsRef().read_tso;
BlockInputStreamPtr dms = storage->read(column_names, query_info, global_ctx, stage2, 8192, 1)[0];
dms->readPrefix();

size_t num_rows_read = 0;
Expand Down
136 changes: 131 additions & 5 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/Region.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TypeMapping.h>

Expand Down Expand Up @@ -238,6 +241,110 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin
}


namespace
{

/// Abort the current read by throwing a RegionException that carries the id of
/// every region in `regions_info` (per the function name, so that the caller
/// can retry those regions).
///
/// @param regions_info  Regions involved in the query; the ids of all of them
///                      (not only the one that triggered the failure) are put
///                      into the exception.
/// @throws RegionException  Always. This function never returns normally, which
///                          is why it is marked `[[noreturn]]`: code following a
///                          call to it is unreachable.
[[noreturn]] void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info)
{
    std::vector<RegionID> region_ids;
    region_ids.reserve(regions_info.size());
    for (const auto & info : regions_info)
        region_ids.push_back(info.region_id);
    throw RegionException(region_ids);
}

/// Perform the learner read for a query on `table_id` and push the involved
/// regions' data to StorageDeltaMerge before the actual data read happens.
///
/// Steps, in this order:
///   1. Pick the regions to handle: `regions_query_info` when it is not empty,
///      otherwise every non-null region the region table reports for `table_id`.
///   2. Look every region up in the KVStore and remember it by id.
///   3. For every region, call `waitIndex(learnerRead())`.
///   4. Try to flush every region through the region table, then block on
///      `waitTillRegionFlushed` for each region whose `tryFlushRegion` call
///      returned false.
///
/// @param table_id            Table whose regions are read/flushed.
/// @param regions_query_info  Regions named by the query; may be empty (tests only).
/// @param tmt                 Provides the KVStore and the region table.
/// @param log                 Logger for diagnostics; must not be null.
/// @throws RegionException  (via throwRetryRegion) if a region is not found in the KVStore.
/// @throws Exception        with LOGICAL_ERROR if the region list contains a duplicated id.
inline void doLearnerRead(const TiDB::TableID table_id, //
    const MvccQueryInfo::RegionsQueryInfo & regions_query_info, //
    TMTContext & tmt, Poco::Logger * log)
{
    assert(log != nullptr);

    // Milliseconds elapsed between `since` and the moment of the call.
    const auto elapsed_ms = [](const auto & since) {
        return std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - since).count();
    };

    MvccQueryInfo::RegionsQueryInfo regions_info = regions_query_info;
    if (regions_info.empty())
    {
        // Only for test, because regions_query_info should never be empty if query is from TiDB or TiSpark.
        auto table_regions = tmt.getRegionTable().getRegionsByTable(table_id);
        regions_info.reserve(table_regions.size());
        for (const auto & [id, table_region] : table_regions)
        {
            if (table_region != nullptr)
                regions_info.emplace_back(RegionQueryInfo{id, table_region->version(), table_region->confVer(), {0, 0}});
        }
    }

    // Resolve every requested region in the KVStore; a missing region aborts the read.
    KVStorePtr & store = tmt.getKVStore();
    RegionMap regions_by_id;
    for (const auto & info : regions_info)
    {
        auto found = store->getRegion(info.region_id);
        if (found == nullptr)
        {
            LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again");
            throwRetryRegion(regions_info);
        }
        regions_by_id.emplace(info.region_id, std::move(found));
    }
    // `emplace` ignores an id that is already present, so a size mismatch means duplicates.
    if (unlikely(regions_by_id.size() != regions_info.size()))
        throw Exception("Duplicate region id", ErrorCodes::LOGICAL_ERROR);

    /// Blocking learner read. Note that learner read must be performed ahead of data read,
    /// otherwise the desired index will be blocked by the lock of data read.
    const auto wait_index_begin = Clock::now();
    for (auto && [region_id, region] : regions_by_id)
    {
        (void)region_id;
        region->waitIndex(region->learnerRead());
    }
    const auto wait_index_ms = elapsed_ms(wait_index_begin);
    LOG_DEBUG(log, "[Learner Read] wait index cost " << wait_index_ms << " ms");

    // Once the raft index is satisfied, flush the regions to StorageDeltaMerge so that all data can be read.
    const auto flush_begin = Clock::now();
    std::set<RegionID> pending_bg_flush;
    auto & region_table = tmt.getRegionTable();
    for (auto && [region_id, region] : regions_by_id)
    {
        (void)region;
        if (region_table.tryFlushRegion(region_id, table_id, false))
            continue;
        // Not flushed by this call: remember the region so that we wait for it below.
        pending_bg_flush.insert(region_id);
        LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread.");
    }
    const auto flush_ms = elapsed_ms(flush_begin);
    LOG_DEBUG(log,
        "[Learner Read] flush " << regions_by_id.size() - pending_bg_flush.size() << " regions of " << regions_by_id.size()
                                << " to StorageDeltaMerge cost " << flush_ms << " ms");

    // Nothing left to wait for.
    if (pending_bg_flush.empty())
        return;

    // Some data may not have been flushed to the store yet; wait until all such regions are flushed.
    const auto bg_wait_begin = Clock::now();
    for (const auto & region_id : pending_bg_flush)
        region_table.waitTillRegionFlushed(region_id);
    const auto bg_wait_ms = elapsed_ms(bg_wait_begin);
    LOG_DEBUG(log,
        "[Learner Read] wait bg flush " << pending_bg_flush.size() << " regions to StorageDeltaMerge cost " << bg_wait_ms << " ms");
}

} // namespace

BlockInputStreams StorageDeltaMerge::read( //
const Names & column_names,
const SelectQueryInfo & query_info,
Expand Down Expand Up @@ -279,11 +386,30 @@ BlockInputStreams StorageDeltaMerge::read( //
return store->readRaw(context, context.getSettingsRef(), to_read, num_streams);
else
{
// read with specify tso
UInt64 max_version = DEFAULT_MAX_READ_TSO;
if (query_info.mvcc_query_info)
max_version = query_info.mvcc_query_info->read_tso;
return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, max_version, max_block_size);
if (unlikely(!query_info.mvcc_query_info))
throw Exception("mvcc query info is null", ErrorCodes::LOGICAL_ERROR);

TMTContext & tmt = context.getTMTContext();
if (unlikely(!tmt.isInitialized()))
throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR);

const auto & mvcc_query_info = *query_info.mvcc_query_info;
// Read with the specified tso; reject the query if the tso is smaller than the TiDB GC safe point
const auto safe_point = tmt.getPDClient()->getGCSafePoint();
if (mvcc_query_info.read_tso < safe_point)
throw Exception("query id: " + context.getCurrentQueryId() + ", read tso: " + toString(mvcc_query_info.read_tso)
+ " is smaller than tidb gc safe point: " + toString(safe_point),
ErrorCodes::LOGICAL_ERROR);

// When `no_kvstore` is true, we skip the learner read
if (likely(!select_query.no_kvstore))
{
/// Learner read.
doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log);
}

return store->read(
context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size);
}
}

Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Storages/Transaction/RegionTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ void RegionTable::tryFlushRegion(RegionID region_id)
tryFlushRegion(region_id, table_id, false);
}

void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist)
bool RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist)
{
const auto func_update_region = [&](std::function<bool(InternalRegion &)> && callback) -> bool {
std::lock_guard<std::mutex> lock(mutex);
Expand Down Expand Up @@ -397,7 +397,7 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo
});

if (!status)
return;
return false;

std::exception_ptr first_exception;

Expand All @@ -420,14 +420,16 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo
if (cache_bytes)
dirty_regions.insert(region_id);
else
dirty_regions.erase(region_id);
clearDirtyFlag(region_id);

internal_region.last_flush_time = Clock::now();
return true;
});

if (first_exception)
std::rethrow_exception(first_exception);

return true;
}

bool RegionTable::tryFlushRegions()
Expand Down Expand Up @@ -466,7 +468,7 @@ bool RegionTable::tryFlushRegions()
if (shouldFlush(region))
{
to_flush = DataToFlush{table_id, region.region_id, true};
dirty_regions.erase(region.region_id);
clearDirtyFlag(region.region_id);
return true;
}
return false;
Expand All @@ -481,6 +483,19 @@ bool RegionTable::tryFlushRegions()
return true;
}

/// Remove `region_id` from `dirty_regions` and wake every thread blocked on
/// `dirty_regions_cv` (see waitTillRegionFlushed). Both the erase and the
/// notification happen while `dirty_regions_mutex` is held.
void RegionTable::clearDirtyFlag(RegionID region_id)
{
    std::lock_guard<std::mutex> guard{dirty_regions_mutex};
    dirty_regions.erase(region_id);
    dirty_regions_cv.notify_all();
}

/// Block the calling thread until `region_id` is no longer contained in
/// `dirty_regions`. Returns immediately if the id is not in the set. There is
/// no timeout.
///
/// NOTE(review): this only observes membership in `dirty_regions`; whether an
/// absent id implies the region's data has actually been written depends on
/// when callers invoke clearDirtyFlag — confirm against the flush paths.
void RegionTable::waitTillRegionFlushed(const RegionID region_id)
{
    std::unique_lock<std::mutex> guard(dirty_regions_mutex);
    // Re-check after every wake-up: the notification is shared by all regions
    // and condition variables may wake spuriously.
    while (dirty_regions.count(region_id) != 0)
        dirty_regions_cv.wait(guard);
}

void RegionTable::handleInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegions &)> && callback) const
{
std::lock_guard<std::mutex> lock(mutex);
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/Transaction/RegionTable.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <condition_variable>
#include <functional>
#include <mutex>
#include <optional>
#include <vector>

Expand Down Expand Up @@ -120,6 +122,8 @@ class RegionTable : private boost::noncopyable
TableMap tables;
RegionInfoMap regions;
std::unordered_set<RegionID> dirty_regions;
std::mutex dirty_regions_mutex;
std::condition_variable dirty_regions_cv;
std::unordered_set<TableID> table_to_clean;

FlushThresholds flush_thresholds;
Expand All @@ -143,6 +147,8 @@ class RegionTable : private boost::noncopyable
void doShrinkRegionRange(const Region & region);
void doUpdateRegion(const Region & region, TableID table_id);

void clearDirtyFlag(RegionID region_id);

public:
RegionTable(Context & context_);
void restore();
Expand Down Expand Up @@ -173,7 +179,9 @@ class RegionTable : private boost::noncopyable
bool tryFlushRegions();

void tryFlushRegion(RegionID region_id);
void tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist);
bool tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist);

void waitTillRegionFlushed(RegionID region_id);

void handleInternalRegionsByTable(const TableID table_id, std::function<void(const InternalRegions &)> && callback) const;
std::vector<std::pair<RegionID, RegionPtr>> getRegionsByTable(const TableID table_id) const;
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/test_utils/TiflashTestBasic.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#pragma once

#include <Interpreters/Context.h>
#include <gtest/gtest.h>

#include <Interpreters/Context.h>
#include <Storages/Transaction/TMTContext.h>

namespace DB
{
namespace tests
Expand All @@ -20,7 +22,8 @@ class TiFlashTestEnv
}
catch (Exception & e)
{
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "./__tmp_data/regmap", TiDB::StorageEngine::TMT);
context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", TiDB::StorageEngine::TMT);
context.getTMTContext().restore();
}
context.getSettingsRef() = settings;
return context;
Expand Down
22 changes: 11 additions & 11 deletions tests/delta-merge-test/raft/read_with_specify_tso.test
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
=> DBGInvoke __region_snapshot_data(
default, test_dm,
4, 0, 1000,
2, 0, 0, 11,
1, 3, 0, 13,
3, 4, 0, 0
2, 20000000, 0, 11,
1, 20000003, 0, 13,
3, 20000004, 0, 0
)
=> DBGInvoke __try_flush_region(4)
=> select * from default.test_dm
Expand All @@ -31,11 +31,11 @@

## raft_insert_row_full(db_name, tbl_name, region_id, handle_id, tso, del, val1, val2, ...)
## upsert rowid==3 with col_1 == 10086
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 5, 0, 10086)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 20000005, 0, 10086)
## del rowid == 2
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 5, 1, 0)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 20000005, 1, 0)
## insert rowid==4
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 5, 0, 1234)
=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 20000005, 0, 1234)
=> DBGInvoke __try_flush_region(4)
=> select (*) from default.test_dm
┌─col_1─┬─_tidb_rowid─┐
Expand All @@ -45,32 +45,32 @@
└───────┴─────────────┘

## read with specify tso
=> select * from default.test_dm " --read_tso "0
=> select * from default.test_dm " --read_tso "20000000
┌─col_1─┬─_tidb_rowid─┐
│ 11 │ 2 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "3
=> select * from default.test_dm " --read_tso "20000003
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 11 │ 2 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "4
=> select * from default.test_dm " --read_tso "20000004
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 11 │ 2 │
│ 0 │ 3 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "5
=> select * from default.test_dm " --read_tso "20000005
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 10086 │ 3 │
│ 1234 │ 4 │
└───────┴─────────────┘

=> select * from default.test_dm " --read_tso "100000
=> select * from default.test_dm " --read_tso "90000000
┌─col_1─┬─_tidb_rowid─┐
│ 13 │ 1 │
│ 10086 │ 3 │
Expand Down

0 comments on commit b0e55ed

Please sign in to comment.