Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Shutdown the LocalIndexScheduler before shutting down PageStorage/DeltaMergeStore #9712

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,13 @@ struct ContextShared
return;
shutdown_called = true;

// The local index scheduler must be shutdown to stop all
// running tasks before shutting down `global_storage_pool`.
if (global_local_indexer_scheduler)
{
global_local_indexer_scheduler->shutdown();
}

if (global_storage_pool)
{
// shutdown the gc task of global storage pool before
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/executeQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ std::tuple<ASTPtr, BlockIO> executeQueryImpl(

if (elem.read_rows != 0)
{
LOG_INFO(
LOG_DEBUG(
execute_query_logger,
"Read {} rows, {} in {:.3f} sec., {} rows/sec., {}/sec.",
elem.read_rows,
Expand Down Expand Up @@ -421,7 +421,7 @@ void logQueryPipeline(const LoggerPtr & logger, const BlockInputStreamPtr & in)
in->dumpTree(log_buffer);
return log_buffer.toString();
};
LOG_INFO(logger, pipeline_log_str());
LOG_DEBUG(logger, pipeline_log_str());
}

BlockIO executeQuery(const String & query, Context & context, bool internal, QueryProcessingStage::Enum stage)
Expand Down
17 changes: 14 additions & 3 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ LocalIndexerScheduler::LocalIndexerScheduler(const Options & options)
start();
}

LocalIndexerScheduler::~LocalIndexerScheduler()
void LocalIndexerScheduler::shutdown()
{
LOG_INFO(logger, "LocalIndexerScheduler is destroying. Waiting scheduler and tasks to finish...");
LOG_INFO(logger, "LocalIndexerScheduler is shutting down. Waiting scheduler and tasks to finish...");

// First quit the scheduler. Don't schedule more tasks.
is_shutting_down = true;
Expand All @@ -81,7 +81,15 @@ LocalIndexerScheduler::~LocalIndexerScheduler()

// Then wait all running tasks to finish.
pool.reset();
LOG_INFO(logger, "LocalIndexerScheduler is shutdown.");
}

LocalIndexerScheduler::~LocalIndexerScheduler()
{
if (!is_shutting_down)
{
shutdown();
}
LOG_INFO(logger, "LocalIndexerScheduler is destroyed");
}

Expand Down Expand Up @@ -295,7 +303,10 @@ bool LocalIndexerScheduler::tryAddTaskToPool(std::unique_lock<std::mutex> & lock
}
};

RUNTIME_CHECK(pool);
if (is_shutting_down || !pool)
// shutting down, retry again
return false;

if (!pool->trySchedule(real_job))
// Concurrent task limit reached
return false;
Expand Down
15 changes: 11 additions & 4 deletions dbms/src/Storages/DeltaMerge/LocalIndexerScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class LocalIndexerScheduler

~LocalIndexerScheduler();

/**
* @brief Stop the scheduler and wait for running tasks to finish.
* Note that this method won't clear the task pushed.
*/
void shutdown();

/**
* @brief Start the scheduler. In some tests we need to start scheduler
* after some tasks are pushed.
Expand All @@ -101,7 +107,7 @@ class LocalIndexerScheduler

/**
* @brief Blocks until there is no tasks remaining in the queue and there is no running tasks.
* Should be only used in tests.
* **Should be only used in tests**.
*/
void waitForFinish();

Expand All @@ -114,6 +120,7 @@ class LocalIndexerScheduler

/**
* @brief Drop all tasks matching specified keyspace id and table id.
* Note that this method won't drop the running tasks.
*/
size_t dropTasks(KeyspaceID keyspace_id, TableID table_id);

Expand Down Expand Up @@ -147,9 +154,6 @@ class LocalIndexerScheduler
void moveBackReadyTasks(std::unique_lock<std::mutex> & lock);

private:
bool is_started = false;
std::thread scheduler_thread;

/// Try to add a task to the pool. Returns false if the pool is full
/// (for example, reaches concurrent task limit or memory limit).
/// When pool is full, we will not try to schedule any more tasks at this moment.
Expand All @@ -160,6 +164,9 @@ class LocalIndexerScheduler
/// heavy pressure.
bool tryAddTaskToPool(std::unique_lock<std::mutex> & lock, const InternalTaskPtr & task);

std::thread scheduler_thread;
bool is_started = false;

KeyspaceID last_schedule_keyspace_id = 0;
std::map<KeyspaceID, TableID> last_schedule_table_id_by_ks;

Expand Down
40 changes: 25 additions & 15 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,24 +417,34 @@ SegmentPtr Segment::restoreSegment( //
DMContext & context,
PageIdU64 segment_id)
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

ReadBufferFromMemory buf(page.data.begin(), page.data.size());
Segment::SegmentMetaInfo segment_info;
readSegmentMetaInfo(buf, segment_info);
try
{
Page page = context.storage_pool->metaReader()->read(segment_id); // not limit restore

auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);
ReadBufferFromMemory buf(page.data.begin(), page.data.size());
readSegmentMetaInfo(buf, segment_info);

return segment;
auto delta = DeltaValueSpace::restore(context, segment_info.range, segment_info.delta_id);
auto stable = StableValueSpace::restore(context, segment_info.stable_id);
auto segment = std::make_shared<Segment>(
parent_log,
segment_info.epoch,
segment_info.range,
segment_id,
segment_info.next_segment_id,
delta,
stable);

return segment;
}
catch (DB::Exception & e)
{
e.addMessage(fmt::format("while restoreSegment, segment_id={}", segment_id));
e.rethrow();
}
RUNTIME_CHECK_MSG(false, "unreachable");
return {};
}

Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( //
Expand Down
13 changes: 13 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ class Segment
PageIdU64 next_segment_id{};
PageIdU64 delta_id{};
PageIdU64 stable_id{};

String toString() const
{
return fmt::format(
"{{version={} epoch={} range={} segment_id={} next_segment_id={} delta_id={} stable_id={}}}",
version,
epoch,
range.toString(),
segment_id,
next_segment_id,
delta_id,
stable_id);
}
};

using SegmentMetaInfos = std::vector<SegmentMetaInfo>;
Expand Down
13 changes: 3 additions & 10 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ namespace DB::ErrorCodes
extern const int DT_DELTA_INDEX_ERROR;
}

namespace DB
{
namespace DM
{
namespace GC
namespace DB::DM::GC
{
bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
const DMContext & context, //
Expand All @@ -63,7 +59,7 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(
double invalid_data_ratio_threshold,
const LoggerPtr & log);
}
namespace tests
namespace DB::DM::tests
{

class SegmentOperationTest : public SegmentTestBasic
Expand Down Expand Up @@ -1366,7 +1362,4 @@ try
}
CATCH


} // namespace tests
} // namespace DM
} // namespace DB
} // namespace DB::DM::tests
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Checksum.h>
#include <IO/Encryption/MockKeyManager.h>
#include <IO/FileProvider/FileProvider.h>
#include <Interpreters/Context.h>
Expand Down Expand Up @@ -871,7 +872,7 @@ class PageStorageControlV3
ChecksumClass digest;
digest.update(buffer, size);
auto checksum = digest.checksum();
fmt::print("checksum: 0x{:X}\n", checksum);
fmt::println("checksum: 0x{:X}", checksum);

auto hex_str = Redact::keyToHexString(buffer, size);
delete[] buffer;
Expand Down