Skip to content

Commit

Permalink
Storages: disaggregated mode supports storage read thread. (#8272)
Browse files Browse the repository at this point in the history
ref #6834
  • Loading branch information
JinheLin authored Nov 4, 2023
1 parent 489efc6 commit cfe64ce
Show file tree
Hide file tree
Showing 21 changed files with 1,137 additions and 629 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ extern const int S3_ERROR = 11000;
extern const int CANNOT_SCHEDULE_TASK = 11001;
extern const int S3_LOCK_CONFLICT = 11002;
extern const int DT_DELTA_INDEX_ERROR = 11003;
extern const int FETCH_PAGES_ERROR = 11004;
} // namespace ErrorCodes

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ namespace DB
F(type_worker_fetch_page, {{"type", "worker_fetch_page"}}, ExpBuckets{0.01, 2, 20}), \
F(type_worker_prepare_stream, {{"type", "worker_prepare_stream"}}, ExpBuckets{0.01, 2, 20}), \
F(type_stream_wait_next_task, {{"type", "stream_wait_next_task"}}, ExpBuckets{0.01, 2, 20}), \
F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20})) \
F(type_stream_read, {{"type", "stream_read"}}, ExpBuckets{0.01, 2, 20}), \
F(type_deserialize_page, {{"type", "deserialize_page"}}, ExpBuckets{0.01, 2, 20})) \
M(tiflash_disaggregated_details, \
"", \
Counter, \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class ColumnFilePersistedSet
/// Thread safe part end
String detailInfo() const { return columnFilesToString(persisted_files); }

const ColumnFilePersisteds & getFiles() const { return persisted_files; }

void saveMeta(WriteBatches & wbs) const;

void recordRemoveColumnFilesPages(WriteBatches & wbs) const;
Expand Down
15 changes: 9 additions & 6 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
#include <Storages/Page/PageDefinesBase.h>


namespace DB
namespace DB::DM
{
namespace DM
namespace tests
{
class SegmentReadTaskTest;
}

using GenPageId = std::function<PageIdU64()>;
class DeltaValueSpace;
class DeltaValueSnapshot;
Expand Down Expand Up @@ -136,9 +139,10 @@ class DeltaValueSpace
persisted_file_set->resetLogger(segment_log);
}

/// The following two methods are just for test purposes
/// The following 3 methods are just for test purposes
MemTableSetPtr getMemTableSet() const { return mem_table_set; }
ColumnFilePersistedSetPtr getPersistedFileSet() const { return persisted_file_set; }
UInt64 getDeltaIndexEpoch() const { return delta_index_epoch; }

String simpleInfo() const { return "<delta_id=" + DB::toString(persisted_file_set->getId()) + ">"; }
String info() const { return fmt::format("{}. {}", mem_table_set->info(), persisted_file_set->info()); }
Expand Down Expand Up @@ -395,7 +399,7 @@ class DeltaValueSnapshot

RowKeyRange getSquashDeleteRange() const;

const auto & getSharedDeltaIndex() { return shared_delta_index; }
const auto & getSharedDeltaIndex() const { return shared_delta_index; }
size_t getDeltaIndexEpoch() const { return delta_index_epoch; }

bool isForUpdate() const { return is_update; }
Expand Down Expand Up @@ -548,5 +552,4 @@ class DeltaValueInputStream : public SkippableBlockInputStream
}
};

} // namespace DM
} // namespace DB
} // namespace DB::DM
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ class MemTableSet
: public std::enable_shared_from_this<MemTableSet>
, private boost::noncopyable
{
#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
// Note that we must update `column_files_count` for outer thread-safe after `column_files` changed
ColumnFiles column_files;
// TODO: check the proper memory_order when use this atomic variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
auto block_slots = pool->getFreeBlockSlots();
LOG_DEBUG(
log,
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
pool->pool_id,
block_slots,
tasks.size(),
read_pools.size(),
sw_add.elapsed(),
sw_do_add.elapsed());
sw_add.elapsed() / 1000.0,
sw_do_add.elapsed() / 1000.0);
}

std::pair<MergedTaskPtr, bool> SegmentReadTaskScheduler::scheduleMergedTask()
Expand Down
48 changes: 48 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h>

namespace DB::DM::Remote
{

IPreparedDMFileTokenPtr DataStoreMock::prepareDMFileByKey(const String & remote_key)
{
return std::make_shared<MockPreparedDMFileToken>(file_provider, remote_key);
}

static std::tuple<String, UInt64> parseDMFilePath(const String & path)
{
// Path likes /disk1/data/t_100/stable/dmf_2.
auto pos = path.find_last_of('_');
RUNTIME_CHECK(pos != std::string::npos, path);
auto file_id = stoul(path.substr(pos + 1));

pos = path.rfind("/dmf_");
RUNTIME_CHECK(pos != std::string::npos, path);
auto parent_path = path.substr(0, pos);
return std::tuple<String, UInt64>{parent_path, file_id};
}

DMFilePtr MockPreparedDMFileToken::restore(DMFile::ReadMetaMode read_mode)
{
auto [parent_path, file_id] = parseDMFilePath(path);
return DMFile::restore(
file_provider,
file_id,
/*page_id*/ 0,
parent_path,
read_mode);
}
} // namespace DB::DM::Remote
78 changes: 78 additions & 0 deletions dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreMock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Encryption/FileProvider.h>
#include <Storages/DeltaMerge/Remote/DataStore/DataStore.h>

namespace DB::DM::Remote
{
class DataStoreMock final : public IDataStore
{
public:
explicit DataStoreMock(FileProviderPtr file_provider_)
: file_provider(file_provider_)
{}

~DataStoreMock() override = default;

void putDMFile(DMFilePtr, const S3::DMFileOID &, bool) override
{
throw Exception("DataStoreMock::putDMFile unsupported");
}

IPreparedDMFileTokenPtr prepareDMFile(const S3::DMFileOID &, UInt64) override
{
throw Exception("DataStoreMock::prepareDMFile unsupported");
}

IPreparedDMFileTokenPtr prepareDMFileByKey(const String & remote_key) override;

bool putCheckpointFiles(const PS::V3::LocalCheckpointFiles &, StoreID, UInt64) override
{
throw Exception("DataStoreMock::putCheckpointFiles unsupported");
}

std::unordered_map<String, DataFileInfo> getDataFilesInfo(const std::unordered_set<String> &) override
{
throw Exception("DataStoreMock::getDataFilesInfo unsupported");
}

void setTaggingsForKeys(const std::vector<String> &, std::string_view) override
{
throw Exception("DataStoreMock::setTaggingsForKeys unsupported");
}

private:
FileProviderPtr file_provider;
};

class MockPreparedDMFileToken : public IPreparedDMFileToken
{
public:
MockPreparedDMFileToken(const FileProviderPtr & file_provider_, const String & path_)
: IPreparedDMFileToken::IPreparedDMFileToken(file_provider_, {}, 0)
, path(path_)
{}

~MockPreparedDMFileToken() override = default;

DMFilePtr restore(DMFile::ReadMetaMode read_mode) override;

private:
String path;
};

} // namespace DB::DM::Remote
Loading

0 comments on commit cfe64ce

Please sign in to comment.