Skip to content

Commit

Permalink
Shutdown the LocalIndexScheduler before shutting down PageStorage/Del…
Browse files Browse the repository at this point in the history
…taMergeStore

Signed-off-by: JaySon-Huang <tshent@qq.com>
  • Loading branch information
JaySon-Huang committed Dec 13, 2024
1 parent c2c041c commit e69d4dd
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 37 deletions.
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ struct ContextShared
return;
shutdown_called = true;

// The local index scheduler must be shutdown to stop all
// running tasks before shutting down `global_storage_pool`.
if (global_local_indexer_scheduler)
{
global_local_indexer_scheduler->shutdown();
}

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
start();
}

LocalIndexerScheduler::~LocalIndexerScheduler()
void LocalIndexerScheduler::shutdown()
{
LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish...");
LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish...");

// First quit the scheduler. Don't schedule more tasks.
is_shutting_down = true;
Expand All @@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler()

// Then wait all running tasks to finish.
pool.reset();
LOG_INFO(logger, "LocalIndexerScheduler is shutdown.");
}

LocalIndexerScheduler::~LocalIndexerScheduler()
{
if (!is_shutting_down)
{
shutdown();
}
LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
}

Expand Down
43 changes: 24 additions & 19 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,6 @@ class LocalIndexerScheduler
bool auto_start = true;
};

private:
struct InternalTask
{
const Task user_task;
Stopwatch created_at{};
Stopwatch scheduled_at{};
};

using InternalTaskPtr = std::shared_ptr<InternalTask>;

public:
static LocalIndexerSchedulerPtr create(const Options & options)
{
Expand All @@ -94,16 +84,16 @@ class LocalIndexerScheduler
~LocalIndexerScheduler();

/**
* @brief Start the scheduler. In some tests we need to start scheduler
* after some tasks are pushed.
* @brief Stop the scheduler and wait for running tasks to finish.
* Note that this method won't clear the task pushed.
*/
void start();
void shutdown();

/**
* @brief Blocks until there is no tasks remaining in the queue and there is no running tasks.
* Should be only used in tests.
* @brief Start the scheduler. In some tests we need to start scheduler
* after some tasks are pushed.
*/
void waitForFinish();
void start();

/**
* @brief Push a task to the pool. The task may not be scheduled immediately.
Expand All @@ -114,10 +104,25 @@ class LocalIndexerScheduler

/**
* @brief Drop all tasks matching specified keyspace id and table id.
* Note that this method won't drop the running tasks.
*/
size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);

/**
* @brief Blocks until there is no tasks remaining in the queue and there is no running tasks.
* **Should be only used in tests**.
*/
void waitForFinish();

private:
struct InternalTask
{
const Task user_task;
Stopwatch created_at{};
Stopwatch scheduled_at{};
};
using InternalTaskPtr = std::shared_ptr<InternalTask>;

struct FileIDHasher
{
std::size_t operator()(const FileID & id) const
Expand Down Expand Up @@ -147,9 +152,6 @@ class LocalIndexerScheduler
void moveBackReadyTasks(std::unique_lock<std::mutex> & lock);

private:
bool is_started = false;
std::thread scheduler_thread;

/// Try to add a task to the pool. Returns false if the pool is full
/// (for example, reaches concurrent task limit or memory limit).
/// When pool is full, we will not try to schedule any more tasks at this moment.
Expand All @@ -160,6 +162,9 @@ class LocalIndexerScheduler
/// heavy pressure.
bool tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);

std::thread scheduler_thread;
bool is_started = false;

KeyspaceID last_schedule_keyspace_id = 0;
std::map<KeyspaceID, TableID> last_schedule_table_id_by_ks;

Expand Down
40 changes: 25 additions & 15 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( //
DMContext & context,
PageIdU64 segment_id)
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Segment::SegmentMetaInfo segment_info;
readSegmentMetaInfo(buf, segment_info);
try
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
readSegmentMetaInfo(buf, segment_info);

return segment;
auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);

return segment;
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
return {};
}

Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Checksum.h>
#include <IO/Encryption/MockKeyManager.h>
#include <IO/FileProvider/FileProvider.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -871,7 +872,7 @@ class PageStorageControlV3
ChecksumClass digest;
digest.update(buffer, size);
auto checksum = digest.checksum();
fmt::print("checksum: 0x{:X}\n", checksum);
fmt::println("checksum: 0x{:X}", checksum);

auto hex_str = Redact::keyToHexString(buffer, size);
delete[] buffer;
Expand Down

0 comments on commit e69d4dd

Please sign in to comment.