Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not mark spill if there is no data to spill in auto spill mode (#8906) #8912

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion dbms/src/Interpreters/AggSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ bool AggSpillContext::updatePerThreadRevocableMemory(Int64 new_value, size_t thr
if (!in_spillable_stage || !isSpillEnabled())
return false;
per_thread_revocable_memories[thread_num] = new_value;
if (new_value == 0)
// new_value == 0 means no agg data to spill
return false;
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
Expand Down Expand Up @@ -97,6 +100,8 @@ Int64 AggSpillContext::triggerSpillImpl(Int64 expected_released_memories)
for (; checked_thread < per_thread_revocable_memories.size(); ++checked_thread)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
if (per_thread_revocable_memories[checked_thread] < MIN_SPILL_THRESHOLD)
continue;
if (per_thread_auto_spill_status[checked_thread].compare_exchange_strong(
old_value,
AutoSpillStatus::NEED_AUTO_SPILL))
Expand Down Expand Up @@ -146,7 +151,7 @@ void AggSpillContext::finishOneSpill(size_t thread_num)

bool AggSpillContext::markThreadForAutoSpill(size_t thread_num)
{
if (in_spillable_stage && isSpillEnabled())
if (in_spillable_stage && isSpillEnabled() && per_thread_revocable_memories[thread_num] > 0)
{
auto old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
return per_thread_auto_spill_status[thread_num].compare_exchange_strong(
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ bool Aggregator::executeOnBlock(AggProcessInfo & agg_process_info, AggregatedDat
LOG_TRACE(log, "Revocable bytes after insert one block {}, thread {}", revocable_bytes, thread_num);
if (agg_spill_context->updatePerThreadRevocableMemory(revocable_bytes, thread_num))
{
assert(!result.empty());
result.tryMarkNeedSpill();
}

Expand Down Expand Up @@ -1331,7 +1332,7 @@ inline void Aggregator::insertAggregatesIntoColumns(
for (size_t destroy_i = 0; destroy_i < params.aggregates_size; ++destroy_i)
{
/// If ownership was not transferred to ColumnAggregateFunction.
if (!(destroy_i < insert_i && aggregate_functions[destroy_i]->isState()))
if (destroy_i >= insert_i || !aggregate_functions[destroy_i]->isState())
aggregate_functions[destroy_i]->destroy(mapped + offsets_of_aggregate_states[destroy_i]);
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/HashJoinSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ bool HashJoinSpillContext::updatePartitionRevocableMemory(size_t partition_id, I
return false;
bool is_spilled = (*partition_is_spilled)[partition_id];
(*partition_revocable_memories)[partition_id] = new_value;
if (new_value == 0)
return false;
if (operator_spill_threshold > 0)
{
auto force_spill = is_spilled && operator_spill_threshold > 0
Expand Down Expand Up @@ -244,7 +246,7 @@ Int64 HashJoinSpillContext::triggerSpillImpl(Int64 expected_released_memories)
});
for (const auto & pair : partition_index_to_revocable_memories)
{
if (pair.second.second <= 0)
if (pair.second.second < MIN_SPILL_THRESHOLD)
continue;
if (!in_build_stage && !isPartitionSpilled(pair.first))
/// no new partition spill is allowed if not in build stage
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Interpreters/SortSpillContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ bool SortSpillContext::updateRevocableMemory(Int64 new_value)
if (!in_spillable_stage || !isSpillEnabled())
return false;
revocable_memory = new_value;
if (new_value == 0)
return false;
if (auto_spill_mode)
{
AutoSpillStatus old_value = AutoSpillStatus::NEED_AUTO_SPILL;
Expand Down Expand Up @@ -69,9 +71,12 @@ bool SortSpillContext::updateRevocableMemory(Int64 new_value)

Int64 SortSpillContext::triggerSpillImpl(DB::Int64 expected_released_memories)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - revocable_memory, 0);
if (revocable_memory >= MIN_SPILL_THRESHOLD)
{
AutoSpillStatus old_value = AutoSpillStatus::NO_NEED_AUTO_SPILL;
auto_spill_status.compare_exchange_strong(old_value, AutoSpillStatus::NEED_AUTO_SPILL);
expected_released_memories = std::max(expected_released_memories - revocable_memory, 0);
}
return expected_released_memories;
}

Expand Down
32 changes: 32 additions & 0 deletions dbms/src/Interpreters/tests/gtest_operator_spill_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,38 @@ try
}
CATCH

TEST_F(TestOperatorSpillContext, AutoTriggerSpillOnEmptyData)
try
{
auto agg_spill_context = std::make_shared<AggSpillContext>(2, *spill_config_ptr, 0, logger);
agg_spill_context->setAutoSpillMode();
agg_spill_context->updatePerThreadRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD, 0);
agg_spill_context->updatePerThreadRevocableMemory(OperatorSpillContext::MIN_SPILL_THRESHOLD, 1);
ASSERT_TRUE(agg_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD) == 0);
ASSERT_TRUE(agg_spill_context->isThreadMarkedForAutoSpill(0));
ASSERT_FALSE(agg_spill_context->isThreadMarkedForAutoSpill(1));

auto sort_spill_context = std::make_shared<SortSpillContext>(*spill_config_ptr, 0, logger);
sort_spill_context->setAutoSpillMode();
ASSERT_FALSE(sort_spill_context->updateRevocableMemory(0));
ASSERT_TRUE(
sort_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD)
== OperatorSpillContext::MIN_SPILL_THRESHOLD);

auto join_spill_context = std::make_shared<HashJoinSpillContext>(*spill_config_ptr, *spill_config_ptr, 0, logger);
join_spill_context->setAutoSpillMode();
join_spill_context->init(2);
ASSERT_TRUE(
join_spill_context->updatePartitionRevocableMemory(0, OperatorSpillContext::MIN_SPILL_THRESHOLD) == false);
ASSERT_TRUE(join_spill_context->updatePartitionRevocableMemory(1, 0) == false);
ASSERT_TRUE(
join_spill_context->triggerSpill(OperatorSpillContext::MIN_SPILL_THRESHOLD * 2)
== OperatorSpillContext::MIN_SPILL_THRESHOLD);
ASSERT_TRUE(join_spill_context->isPartitionMarkedForAutoSpill(0));
ASSERT_FALSE(join_spill_context->isPartitionMarkedForAutoSpill(1));
}
CATCH

TEST_F(TestOperatorSpillContext, SortMarkSpill)
try
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Operators/AggregateContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ bool AggregateContext::isTaskMarkedForSpill(size_t task_index)
return true;
if (getAggSpillContext()->updatePerThreadRevocableMemory(many_data[task_index]->revocableBytes(), task_index))
{
assert(!many_data[task_index]->empty());
return many_data[task_index]->tryMarkNeedSpill();
}
return false;
Expand Down