diff --git a/dbms/src/Debug/dbgFuncRegion.cpp b/dbms/src/Debug/dbgFuncRegion.cpp index 96efb186823..b88e0e9d741 100644 --- a/dbms/src/Debug/dbgFuncRegion.cpp +++ b/dbms/src/Debug/dbgFuncRegion.cpp @@ -127,7 +127,7 @@ void dbgFuncRegionSnapshotWithData(Context & context, const ASTs & args, DBGInvo for (auto it = args_begin; it != args_end; it += len) { HandleID handle_id = (HandleID)safeGet(typeid_cast(*it[0]).value); - Timestamp tso = safeGet(typeid_cast(*it[1]).value); + Timestamp tso = (Timestamp)safeGet(typeid_cast(*it[1]).value); UInt8 del = (UInt8)safeGet(typeid_cast(*it[2]).value); { std::vector fields; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 8afa78013d3..65734d43c80 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -185,8 +185,13 @@ try ASTPtr astptr(new ASTIdentifier("t", ASTIdentifier::Kind::Table)); astptr->children.emplace_back(new ASTIdentifier("col1")); - storage = StorageDeltaMerge::create( - ".", /* db_name= */ "default", /* name= */ "t", std::nullopt, ColumnsDescription{names_and_types_list}, astptr, DMTestEnv::getContext()); + storage = StorageDeltaMerge::create(".", + /* db_name= */ "default", + /* name= */ "t", + std::nullopt, + ColumnsDescription{names_and_types_list}, + astptr, + DMTestEnv::getContext()); storage->startup(); } @@ -201,10 +206,14 @@ try } // get read stream from DeltaMergeStorage + Context & global_ctx = DMTestEnv::getContext(); QueryProcessingStage::Enum stage2; SelectQueryInfo query_info; - query_info.query = std::make_shared(); - BlockInputStreamPtr dms = storage->read(column_names, query_info, DMTestEnv::getContext(), stage2, 8192, 1)[0]; + query_info.query = std::make_shared(); + query_info.mvcc_query_info = std::make_unique(); + query_info.mvcc_query_info->resolve_locks = global_ctx.getSettingsRef().resolve_locks; + query_info.mvcc_query_info->read_tso = global_ctx.getSettingsRef().read_tso; + BlockInputStreamPtr dms = storage->read(column_names, query_info, global_ctx, stage2, 8192, 1)[0]; dms->readPrefix(); size_t num_rows_read = 0; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 4f3d6364372..c4b9bdff4ee 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include #include #include @@ -238,6 +241,110 @@ BlockOutputStreamPtr StorageDeltaMerge::write(const ASTPtr & query, const Settin } +namespace +{ + +void throwRetryRegion(const MvccQueryInfo::RegionsQueryInfo & regions_info) +{ + std::vector region_ids; + region_ids.reserve(regions_info.size()); + for (const auto & info : regions_info) + region_ids.push_back(info.region_id); + throw RegionException(region_ids); +} + +inline void doLearnerRead(const TiDB::TableID table_id, // + const MvccQueryInfo::RegionsQueryInfo & regions_query_info, // + TMTContext & tmt, Poco::Logger * log) +{ + assert(log != nullptr); + + MvccQueryInfo::RegionsQueryInfo regions_info; + if (!regions_query_info.empty()) + { + regions_info = regions_query_info; + } + else + { + // Only for test, because regions_query_info should never be empty if query is from TiDB or TiSpark. + auto regions = tmt.getRegionTable().getRegionsByTable(table_id); + regions_info.reserve(regions.size()); + for (const auto & [id, region] : regions) + { + if (region == nullptr) + continue; + regions_info.emplace_back(RegionQueryInfo{id, region->version(), region->confVer(), {0, 0}}); + } + } + + KVStorePtr & kvstore = tmt.getKVStore(); + RegionMap kvstore_region; + // check region is not null and store region map. + for (const auto & info : regions_info) + { + auto region = kvstore->getRegion(info.region_id); + if (region == nullptr) + { + LOG_WARNING(log, "[region " << info.region_id << "] is not found in KVStore, try again"); + throwRetryRegion(regions_info); + } + kvstore_region.emplace(info.region_id, std::move(region)); + } + // make sure regions are not duplicated. + if (unlikely(kvstore_region.size() != regions_info.size())) + throw Exception("Duplicate region id", ErrorCodes::LOGICAL_ERROR); + + auto start_time = Clock::now(); + /// Blocking learner read. Note that learner read must be performed ahead of data read, + /// otherwise the desired index will be blocked by the lock of data read. + for (auto && [region_id, region] : kvstore_region) + { + (void)region_id; + region->waitIndex(region->learnerRead()); + } + auto end_time = Clock::now(); + LOG_DEBUG(log, + "[Learner Read] wait index cost " << std::chrono::duration_cast(end_time - start_time).count() << " ms"); + + // After raft index is satisfied, we flush region to StorageDeltaMerge so that we can read all data + start_time = Clock::now(); + std::set regions_flushing_in_bg_threads; + auto & region_table = tmt.getRegionTable(); + for (auto && [region_id, region] : kvstore_region) + { + (void)region; + bool is_flushed = region_table.tryFlushRegion(region_id, table_id, false); + // If region is flushing by other bg threads, we should mark those regions to wait. + if (!is_flushed) + { + regions_flushing_in_bg_threads.insert(region_id); + LOG_DEBUG(log, "[Learner Read] region " << region_id << " is flushing by other thread."); + } + } + end_time = Clock::now(); + LOG_DEBUG(log, + "[Learner Read] flush " << kvstore_region.size() - regions_flushing_in_bg_threads.size() << " regions of " << kvstore_region.size() + << " to StorageDeltaMerge cost " + << std::chrono::duration_cast(end_time - start_time).count() << " ms"); + + // Maybe there is some data not flush to store yet, we should wait till all regions is flushed. + if (!regions_flushing_in_bg_threads.empty()) + { + start_time = Clock::now(); + for (const auto & region_id : regions_flushing_in_bg_threads) + { + region_table.waitTillRegionFlushed(region_id); + } + end_time = Clock::now(); + LOG_DEBUG(log, + "[Learner Read] wait bg flush " << regions_flushing_in_bg_threads.size() << " regions to StorageDeltaMerge cost " + << std::chrono::duration_cast(end_time - start_time).count() + << " ms"); + } +} + +} // namespace + BlockInputStreams StorageDeltaMerge::read( // const Names & column_names, const SelectQueryInfo & query_info, @@ -279,11 +386,30 @@ BlockInputStreams StorageDeltaMerge::read( // return store->readRaw(context, context.getSettingsRef(), to_read, num_streams); else { - // read with specify tso - UInt64 max_version = DEFAULT_MAX_READ_TSO; - if (query_info.mvcc_query_info) - max_version = query_info.mvcc_query_info->read_tso; - return store->read(context, context.getSettingsRef(), to_read, ranges, num_streams, max_version, max_block_size); + if (unlikely(!query_info.mvcc_query_info)) + throw Exception("mvcc query info is null", ErrorCodes::LOGICAL_ERROR); + + TMTContext & tmt = context.getTMTContext(); + if (unlikely(!tmt.isInitialized())) + throw Exception("TMTContext is not initialized", ErrorCodes::LOGICAL_ERROR); + + const auto & mvcc_query_info = *query_info.mvcc_query_info; + // Read with specify tso, check if tso is smaller than TiDB GcSafePoint + const auto safe_point = tmt.getPDClient()->getGCSafePoint(); + if (mvcc_query_info.read_tso < safe_point) + throw Exception("query id: " + context.getCurrentQueryId() + ", read tso: " + toString(mvcc_query_info.read_tso) + + " is smaller than tidb gc safe point: " + toString(safe_point), + ErrorCodes::LOGICAL_ERROR); + + // With `no_kvstore` is true, we do not do learner read + if (likely(!select_query.no_kvstore)) + { + /// Learner read. + doLearnerRead(tidb_table_info.id, mvcc_query_info.regions_query_info, tmt, log); + } + + return store->read( + context, context.getSettingsRef(), to_read, ranges, num_streams, /*max_version=*/mvcc_query_info.read_tso, max_block_size); } } diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 8d86a0d4d73..029f50c8b6b 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -361,7 +361,7 @@ void RegionTable::tryFlushRegion(RegionID region_id) tryFlushRegion(region_id, table_id, false); } -void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist) +bool RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist) { const auto func_update_region = [&](std::function && callback) -> bool { std::lock_guard lock(mutex); @@ -397,7 +397,7 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo }); if (!status) - return; + return false; std::exception_ptr first_exception; @@ -420,7 +420,7 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo if (cache_bytes) dirty_regions.insert(region_id); else - dirty_regions.erase(region_id); + clearDirtyFlag(region_id); internal_region.last_flush_time = Clock::now(); return true; @@ -428,6 +428,8 @@ void RegionTable::tryFlushRegion(RegionID region_id, TableID table_id, const boo if (first_exception) std::rethrow_exception(first_exception); + + return true; } bool RegionTable::tryFlushRegions() @@ -466,7 +468,7 @@ bool RegionTable::tryFlushRegions() if (shouldFlush(region)) { to_flush = DataToFlush{table_id, region.region_id, true}; - dirty_regions.erase(region.region_id); + clearDirtyFlag(region.region_id); return true; } return false; @@ -481,6 +483,19 @@ bool RegionTable::tryFlushRegions() return true; } +void RegionTable::clearDirtyFlag(RegionID region_id) +{ + std::lock_guard lock(dirty_regions_mutex); + dirty_regions.erase(region_id); + dirty_regions_cv.notify_all(); +} + +void RegionTable::waitTillRegionFlushed(const RegionID region_id) +{ + std::unique_lock lock(dirty_regions_mutex); + dirty_regions_cv.wait(lock, [this, region_id]{ return dirty_regions.count(region_id) == 0;}); +} + void RegionTable::handleInternalRegionsByTable(const TableID table_id, std::function && callback) const { std::lock_guard lock(mutex); diff --git a/dbms/src/Storages/Transaction/RegionTable.h b/dbms/src/Storages/Transaction/RegionTable.h index 1cf5722b661..58ee9a52d2c 100644 --- a/dbms/src/Storages/Transaction/RegionTable.h +++ b/dbms/src/Storages/Transaction/RegionTable.h @@ -1,6 +1,8 @@ #pragma once +#include #include +#include #include #include @@ -120,6 +122,8 @@ class RegionTable : private boost::noncopyable TableMap tables; RegionInfoMap regions; std::unordered_set dirty_regions; + std::mutex dirty_regions_mutex; + std::condition_variable dirty_regions_cv; std::unordered_set table_to_clean; FlushThresholds flush_thresholds; @@ -143,6 +147,8 @@ class RegionTable : private boost::noncopyable void doShrinkRegionRange(const Region & region); void doUpdateRegion(const Region & region, TableID table_id); + void clearDirtyFlag(RegionID region_id); + public: RegionTable(Context & context_); void restore(); @@ -173,7 +179,9 @@ class RegionTable : private boost::noncopyable bool tryFlushRegions(); void tryFlushRegion(RegionID region_id); - void tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist); + bool tryFlushRegion(RegionID region_id, TableID table_id, const bool try_persist); + + void waitTillRegionFlushed(RegionID region_id); void handleInternalRegionsByTable(const TableID table_id, std::function && callback) const; std::vector> getRegionsByTable(const TableID table_id) const; diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index be4d77a5c46..8467c2347e8 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -1,8 +1,10 @@ #pragma once -#include #include +#include +#include + namespace DB { namespace tests @@ -20,7 +22,8 @@ class TiFlashTestEnv } catch (Exception & e) { - context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", "./__tmp_data/regmap", TiDB::StorageEngine::TMT); + context.createTMTContext({}, "", "", {"default"}, "./__tmp_data/kvstore", TiDB::StorageEngine::TMT); + context.getTMTContext().restore(); } context.getSettingsRef() = settings; return context; diff --git a/tests/delta-merge-test/raft/read_with_specify_tso.test b/tests/delta-merge-test/raft/read_with_specify_tso.test index e099c4c9320..e88384315b9 100644 --- a/tests/delta-merge-test/raft/read_with_specify_tso.test +++ b/tests/delta-merge-test/raft/read_with_specify_tso.test @@ -17,9 +17,9 @@ => DBGInvoke __region_snapshot_data( default, test_dm, 4, 0, 1000, - 2, 0, 0, 11, - 1, 3, 0, 13, - 3, 4, 0, 0 + 2, 20000000, 0, 11, + 1, 20000003, 0, 13, + 3, 20000004, 0, 0 ) => DBGInvoke __try_flush_region(4) => select * from default.test_dm @@ -31,11 +31,11 @@ ## raft_insert_row_full(db_name, tbl_name, region_id, handle_id, tso, del, val1, val2, ...) ## upset rowid==3 with col_1 == 10086 -=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 5, 0, 10086) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 3, 20000005, 0, 10086) ## del rowid == 2 -=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 5, 1, 0) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 2, 20000005, 1, 0) ## insert rowid==4 -=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 5, 0, 1234) +=> DBGInvoke __raft_insert_row_full(default, test_dm, 4, 4, 20000005, 0, 1234) => DBGInvoke __try_flush_region(4) => select (*) from default.test_dm ┌─col_1─┬─_tidb_rowid─┐ @@ -45,32 +45,32 @@ └───────┴─────────────┘ ## read with specify tso -=> select * from default.test_dm " --read_tso "0 +=> select * from default.test_dm " --read_tso "20000000 ┌─col_1─┬─_tidb_rowid─┐ │ 11 │ 2 │ └───────┴─────────────┘ -=> select * from default.test_dm " --read_tso "3 +=> select * from default.test_dm " --read_tso "20000003 ┌─col_1─┬─_tidb_rowid─┐ │ 13 │ 1 │ │ 11 │ 2 │ └───────┴─────────────┘ -=> select * from default.test_dm " --read_tso "4 +=> select * from default.test_dm " --read_tso "20000004 ┌─col_1─┬─_tidb_rowid─┐ │ 13 │ 1 │ │ 11 │ 2 │ │ 0 │ 3 │ └───────┴─────────────┘ -=> select * from default.test_dm " --read_tso "5 +=> select * from default.test_dm " --read_tso "20000005 ┌─col_1─┬─_tidb_rowid─┐ │ 13 │ 1 │ │ 10086 │ 3 │ │ 1234 │ 4 │ └───────┴─────────────┘ -=> select * from default.test_dm " --read_tso "100000 +=> select * from default.test_dm " --read_tso "90000000 ┌─col_1─┬─_tidb_rowid─┐ │ 13 │ 1 │ │ 10086 │ 3 │