From e9a484bd5a4cf0e7fe8d4e608b3fd80ff221caad Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sun, 16 Jun 2024 20:28:36 +0800 Subject: [PATCH] Fix sparse expression parsing memory leak (#1338) ### What problem does this PR solve? ``` ================================================================= ==58327==ERROR: LeakSanitizer: detected memory leaks Direct leak of 576 byte(s) in 24 object(s) allocated from: #0 0x55be02292f41 in operator new(unsigned long) (/infinity/cmake-build-debug/src/infinity+0x707f41) (BuildId: a8a47e2552e9916f1f7d2b289e095233935d70f4) #1 0x55be051ac8d0 in sqlparse(void*, infinity::ParserResult*) /infinity/cmake-build-debug/parser.y:2942:39 #2 0x55be050f73b8 in infinity::SQLParser::Parse(std::__cxx11::basic_string, std::allocator> const&, infinity::ParserResult*) /infinity/src/parser/sql_parser.cpp:39:9 #3 0x55be02a13141 in infinity::QueryContext@query_context::Query(std::__cxx11::basic_string, std::allocator> const&) /infinity/src/main/query_context.cpp:90:14 #4 0x55be02454cbb in infinity::Connection@connection::HandlerSimpleQuery(infinity::QueryContext@query_context*) /infinity/src/network/connection.cpp:155:41 #5 0x55be024541b3 in infinity::Connection@connection::HandleRequest() /infinity/src/network/connection.cpp:131:13 #6 0x55be0245219b in infinity::Connection@connection::Run() /infinity/src/network/connection.cpp:70:13 #7 0x55be0284b0ac in infinity::PGServer@pg_server::StartConnection(std::shared_ptr&)::$_0::operator()() /infinity/src/network/pg_server.cpp:81:25 #8 0x55be0284b004 in void std::__invoke_impl&)::$_0>(std::__invoke_other, infinity::PGServer@pg_server::StartConnection(std::shared_ptr&)::$_0&&) /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61:14 #9 0x55be0284afc4 in std::__invoke_result&)::$_0>::type std::__invoke&)::$_0>(infinity::PGServer@pg_server::StartConnection(std::shared_ptr&)::$_0&&) /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96:14 #10 0x55be0284af9c in void std::thread::_Invoker&)::$_0>>::_M_invoke<0ul>(std::_Index_tuple<0ul>) /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:292:13 #11 0x55be0284af74 in std::thread::_Invoker&)::$_0>>::operator()() /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:299:11 #12 0x55be0284ade8 in std::thread::_State_impl&)::$_0>>>::_M_run() /usr/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_thread.h:244:13 #13 0x55be05869602 in execute_native_thread_routine thread.o ``` ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) Signed-off-by: Jin Hai --- CMakeLists.txt | 18 +- benchmark/polling_scheduler/CMakeLists.txt | 29 -- benchmark/polling_scheduler/buffer.h | 39 --- benchmark/polling_scheduler/buffer_queue.h | 41 --- benchmark/polling_scheduler/fragment.cpp | 42 --- benchmark/polling_scheduler/fragment.h | 99 ------- benchmark/polling_scheduler/operator.h | 76 ------ benchmark/polling_scheduler/poller_queue.h | 58 ---- .../polling_scheduler/scheduler_benchmark.cpp | 156 ----------- benchmark/polling_scheduler/task.cpp | 241 ----------------- benchmark/polling_scheduler/task.h | 254 ------------------ benchmark/polling_scheduler/task_queue.h | 33 --- src/parser/expr/match_sparse_expr.cpp | 2 + .../knn/sparse/test_knn_sparse_integer.slt | 2 +- 14 files changed, 16 insertions(+), 1074 deletions(-) delete mode 100644 benchmark/polling_scheduler/CMakeLists.txt delete mode 100644 benchmark/polling_scheduler/buffer.h delete mode 100644 benchmark/polling_scheduler/buffer_queue.h delete mode 100644 benchmark/polling_scheduler/fragment.cpp delete mode 100644 benchmark/polling_scheduler/fragment.h delete mode 100644 benchmark/polling_scheduler/operator.h delete mode 100644 benchmark/polling_scheduler/poller_queue.h delete mode 100644 benchmark/polling_scheduler/scheduler_benchmark.cpp delete mode 100644 benchmark/polling_scheduler/task.cpp delete mode 100644 benchmark/polling_scheduler/task.h delete mode 100644 benchmark/polling_scheduler/task_queue.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d4b26183..0926b8b95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,11 +92,19 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug") set(CMAKE_CXX_FLAGS "-O0 -g") set(CMAKE_C_FLAGS "-O0 -g") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -fno-stack-protector -fno-omit-frame-pointer -fno-var-tracking ") - # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=memory -fno-omit-frame-pointer") - # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=thread -fno-omit-frame-pointer") - # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=undefined -fno-omit-frame-pointer") - # set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=cfi -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-stack-protector -fno-var-tracking ") + add_compile_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak) + add_link_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak) + + add_compile_options("-fno-omit-frame-pointer") + add_link_options("-fno-omit-frame-pointer") + +# add_compile_options("-fsanitize=undefined") +# add_link_options("-fsanitize=undefined") + +# add_compile_options("-fsanitize=thread") +# add_link_options("-fsanitize=thread") + set(CMAKE_DEBUG_POSTFIX "") else () diff --git a/benchmark/polling_scheduler/CMakeLists.txt b/benchmark/polling_scheduler/CMakeLists.txt deleted file mode 100644 index 568d8b6a9..000000000 --- a/benchmark/polling_scheduler/CMakeLists.txt +++ /dev/null @@ -1,29 +0,0 @@ - -file(GLOB_RECURSE - scheduler_benchmark_files - CONFIGURE_DEPENDS - scheduler_benchmark.cpp - task.cpp - task.h - pipeline.cpp - fragment.h - fragment.cpp - poller_queue.h - ) - -add_executable(polling_scheduler_benchmark - ${scheduler_benchmark_files} - ) - -target_include_directories(polling_scheduler_benchmark PUBLIC "${CMAKE_SOURCE_DIR}/third_party/concurrentqueue") -target_include_directories(polling_scheduler_benchmark PUBLIC "${CMAKE_SOURCE_DIR}/src") - -target_link_libraries( - polling_scheduler_benchmark - infinity_core - benchmark_profiler -) - -if(ENABLE_JEMALLOC) - target_link_libraries(polling_scheduler_benchmark jemalloc.a) -endif() diff --git a/benchmark/polling_scheduler/buffer.h b/benchmark/polling_scheduler/buffer.h deleted file mode 100644 index b492fddb4..000000000 --- a/benchmark/polling_scheduler/buffer.h +++ /dev/null @@ -1,39 +0,0 @@ -// -// Created by jinhai on 23-5-11. -// - -#pragma once - -#include -#include - -namespace infinity { - -class Buffer { -public: - explicit - Buffer(size_t size) : size_(size) { - buffer_ = std::make_unique(size); - } - - inline void - Append(const char* str) const { - size_t len = std::strlen(str); - if(len + offset_ >= size_) { - throw; - } - memcpy(buffer_.get() + offset_, str, len); - } - - inline char* - Get() const { - return buffer_.get(); - } - -private: - std::unique_ptr buffer_{nullptr}; - size_t size_{}; - size_t offset_{}; -}; - -} diff --git a/benchmark/polling_scheduler/buffer_queue.h b/benchmark/polling_scheduler/buffer_queue.h deleted file mode 100644 index f622b32d1..000000000 --- a/benchmark/polling_scheduler/buffer_queue.h +++ /dev/null @@ -1,41 +0,0 @@ -// -// Created by jinhai on 23-5-9. -// - -#pragma once - -#include "mpsc_queue.h" -#include "concurrentqueue.h" -#include - -namespace infinity { - -struct ConcurrentQueue { - bool - TryEnqueue(std::shared_ptr buffer) { - return queue_.try_enqueue(std::move(buffer)); - } - - bool - TryDequeue(std::shared_ptr& buffer) { - return queue_.try_dequeue(buffer); - } - - moodycamel::ConcurrentQueue> queue_{}; -}; - -struct WaitFreeQueue { - void - TryEnqueue(std::shared_ptr buffer) { - queue_.enqueue(std::move(buffer)); - } - - bool - TryDequeue(std::shared_ptr& buffer) { - return queue_.dequeue(buffer); - } - - MPSCQueue> queue_{}; -}; - -} diff --git a/benchmark/polling_scheduler/fragment.cpp b/benchmark/polling_scheduler/fragment.cpp deleted file mode 100644 index 3536d8e12..000000000 --- a/benchmark/polling_scheduler/fragment.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// -// Created by jinhai on 23-5-9. -// - -#include "fragment.h" - -namespace infinity { - -std::vector> -Fragment::BuildTask(uint64_t parallel_size) { - assert(parallel_size > 0); - assert(this->source_ != nullptr); - assert(this->sink_ != nullptr); - - std::vector> child_tasks; - if(this->child_ != nullptr) { - child_tasks = this->child_->BuildTask(parallel_size); - } - - std::vector> result; - - size_t task_count = 0; - if(fragment_type_ == FragmentType::kSerial) { - task_count = 1; - } else if(fragment_type_ == FragmentType::kParallel) { - task_count = parallel_size; - } - - for(size_t idx = 0; idx < task_count; ++idx) { - result.emplace_back(std::make_shared()); - PipelineTask* the_task = (PipelineTask*)(result[idx].get()); - the_task->AddSink(sink_.get()); - the_task->AddSource(source_.get(), !child_tasks.empty()); - for(auto& op: this->operators_) { - the_task->AddOperator(op.get()); - } - the_task->SetChildren(child_tasks); - } - return result; -} - -} \ No newline at end of file diff --git a/benchmark/polling_scheduler/fragment.h b/benchmark/polling_scheduler/fragment.h deleted file mode 100644 index 9d0be96a1..000000000 --- a/benchmark/polling_scheduler/fragment.h +++ /dev/null @@ -1,99 +0,0 @@ -// -// Created by jinhai on 23-5-7. -// - -#pragma once - -#include "task.h" -#include "operator.h" -#include -#include -#include -#include - -namespace infinity { -#if 0 -class Pipeline { -public: - inline explicit - Pipeline(uint64_t fragment_id, uint64_t pipeline_id): fragment_id_(fragment_id), pipeline_id_(pipeline_id) {} - - inline void - Add(std::unique_ptr op) { - operators_.emplace_back(std::move(op)); - - // 128 bytes buffer for the operators input - buffers_.emplace_back(128); - - SizeT last_index = operators_.size() - 1; - operators_[last_index]->SetInput(&buffers_[last_index]); - if(last_index > 0) { - operators_[last_index]->SetOutput(&buffers_[last_index - 1]); - } - } - - [[nodiscard]] inline SizeT - size() const { - return operators_.size(); - } - - inline void - ConnectBuffer(Buffer* buffer) { - operators_[0]->SetInput(buffer); - } - - inline void - PrintOutput() const { - printf("Pipeline Output: %s", buffers_.back().Get()); - } - -private: - std::vector> operators_{}; - std::vector buffers_{}; - uint64_t fragment_id_{}; - uint64_t pipeline_id_{}; -}; -#endif -enum class FragmentType { - kParallel, - kSerial, - kInvalid -}; - -class Fragment { -public: - inline explicit - Fragment(uint64_t id, FragmentType type) : id_(id), fragment_type_(type) {} - - std::vector> - BuildTask(uint64_t parallel_size); - - inline void - AddOperator(std::unique_ptr op) { - operators_.emplace_back(std::move(op)); - } - - inline void - SetChild(std::unique_ptr child) { - child_ = std::move(child); - } - - inline void - AddSource(std::unique_ptr op) { - source_ = std::move(op); - } - - inline void - AddSink(std::unique_ptr op) { - sink_ = std::move(op); - } -private: - uint64_t id_{}; - FragmentType fragment_type_{FragmentType::kInvalid}; - std::unique_ptr source_{}; - std::vector> operators_{}; - std::unique_ptr sink_{}; - std::unique_ptr child_{}; -}; - -} diff --git a/benchmark/polling_scheduler/operator.h b/benchmark/polling_scheduler/operator.h deleted file mode 100644 index 4f59034c5..000000000 --- a/benchmark/polling_scheduler/operator.h +++ /dev/null @@ -1,76 +0,0 @@ -// -// Created by jinhai on 23-5-8. -// - -#pragma once - -#include "buffer.h" -#include "buffer_queue.h" -#include - -namespace infinity { - -class Operator { -public: - explicit - Operator(const std::string& name) : op_name_(std::make_unique(name)) {} - - inline void - Run(const Buffer* input_buffer, Buffer* output_buffer) { -// printf("Operator::Run(): %s\n", op_name_->c_str()); - } - -private: - std::unique_ptr op_name_; -}; - -class Sink { -public: - explicit - Sink(const std::string& name) : op_name_(std::make_unique(name)) {} - - inline void - Run(const Buffer* input_buffer, std::vector& output_buffers) { -// printf("Sink::Run(): %s\n", op_name_->c_str()); - // Read all input buffer and send to output buffer -// output_buffer_->Append(input_buffer_->Get()); - } -private: - std::unique_ptr op_name_; -}; - -enum class SourceType { - kScan, // each pipeline has its own source - kExchange, // all pipelines share one source -}; - -class Source { -public: - explicit - Source(const std::string& name, SourceType source_type) : op_name_(std::make_unique(name)), type_(source_type) {} - - inline void - Run(ConcurrentQueue* input_queue, const Buffer* input_buffer, std::shared_ptr& output_buffer) { - // Send read file request to file reader -// printf("Source::Run(): %s\n", op_name_->c_str()); - if(input_queue == nullptr) { - std::string id_str = std::to_string(0); - output_buffer->Append(id_str.c_str()); -// memcpy((void*)(source_buffer_.get()), id_str.c_str(), id_str.size()); - } else { -// printf("Get data from input queue\n"); - input_queue->TryDequeue(output_buffer); - } - // Or read data from the input buffer, and put it to output_buffer - } - - SourceType - type() const { - return type_; - } -private: - std::unique_ptr op_name_; - SourceType type_; -}; - -} diff --git a/benchmark/polling_scheduler/poller_queue.h b/benchmark/polling_scheduler/poller_queue.h deleted file mode 100644 index 61dfea51d..000000000 --- a/benchmark/polling_scheduler/poller_queue.h +++ /dev/null @@ -1,58 +0,0 @@ -// -// Created by jinhai on 23-5-14. -// - -#pragma once - -#include "blockingconcurrentqueue.h" -#include -#include - -namespace infinity { - -struct Task; - -class PollerQueue { -public: - explicit - PollerQueue(size_t capacity = 1024) : capacity_(capacity) { - } - - void - Enqueue(Task* task_ptr) { - std::unique_lock lock(queue_mutex_); - full_cv_.wait(lock, [this] { return queue_.size() < capacity_; }); - queue_.push_back(task_ptr); - empty_cv_.notify_one(); - } - - void - DequeueBulk(std::list& output_queue) { - std::unique_lock lock(queue_mutex_); - empty_cv_.wait(lock, [this] { return !queue_.empty(); }); - output_queue.splice(output_queue.end(), queue_); - full_cv_.notify_one(); - } - - size_t - Size() const { - std::lock_guard lock(queue_mutex_); - return queue_.size(); - } - - bool - Empty() const { - std::lock_guard lock(queue_mutex_); - return queue_.empty(); - } - -protected: - mutable std::mutex queue_mutex_{}; - std::condition_variable full_cv_{}; - std::condition_variable empty_cv_{}; - std::list queue_{}; - size_t capacity_{32}; -}; - -} - diff --git a/benchmark/polling_scheduler/scheduler_benchmark.cpp b/benchmark/polling_scheduler/scheduler_benchmark.cpp deleted file mode 100644 index 54a49740a..000000000 --- a/benchmark/polling_scheduler/scheduler_benchmark.cpp +++ /dev/null @@ -1,156 +0,0 @@ -// -// Created by jinhai on 23-5-7. -// - - -#include "task.h" -#include "fragment.h" -#include "base_profiler.h" -#include "ctpl.h" -#include - -using namespace infinity; - -static BaseProfiler profiler; -std::atomic_long long_atomic{0}; - -void -execute_task(int64_t id, Task* task, int64_t task_count) { -// printf("execute task by thread: %ld\n", id); - if(task->type() == TaskType::kPipeline) { - PipelineTask* root_task = (PipelineTask*)(task); - root_task->Init(); - - std::queue queue; - queue.push(root_task); - while(!queue.empty()) { - PipelineTask* task_node = queue.front(); - queue.pop(); - if(task_node->children().empty()) { - task_node->set_state(TaskState::kReady); - } else { - task_node->set_state(TaskState::kPending); - } - - NewScheduler::RunTask(task_node); - for(const auto& child_task: task_node->children()) { - queue.push((PipelineTask*)child_task.get()); - } - } - - root_task->GetResult(); - ++long_atomic; - if(long_atomic > task_count) { - printf("time cost: %ld ms\n", profiler.Elapsed() / 1000000); - } - } -} - -void -start_scheduler() { -// const std::unordered_set cpu_mask{1, 3, 5, 7, 9, 11, 13, 15}; - const std::unordered_set cpu_mask{1, 2, 3, 5, 6, 7, 9, 10, 11, 13, 14, 15}; -// const std::unordered_set cpu_mask{1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15}; -// const std::unordered_set cpu_mask{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}; -// const std::unordered_set cpu_mask{1, 3, 5, 7}; -// const std::unordered_set cpu_mask; -// total_query_count = 16; - - int64_t cpu_count = std::thread::hardware_concurrency(); - std::unordered_set cpu_set; - for(int64_t idx = 0; idx < cpu_count; ++idx) { - if(!cpu_mask.contains(idx)) { - cpu_set.insert(idx); - } - } - - NewScheduler::Init(cpu_set); -} - -void -stop_scheduler() { - NewScheduler::Uninit(); -} - -std::unique_ptr -build_fragment0(uint64_t id, const std::string& name) { - // sink->op2->op1->scan - - std::unique_ptr fragment = std::make_unique(id, FragmentType::kParallel); - - std::unique_ptr source = std::make_unique(name, SourceType::kScan); - fragment->AddSource(std::move(source)); - - std::unique_ptr op1 = std::make_unique(name); - fragment->AddOperator(std::move(op1)); -// std::unique_ptr op2 = std::make_unique(name); -// fragment->AddOperator(std::move(op2)); - - std::unique_ptr sink = std::make_unique(name); - fragment->AddSink(std::move(sink)); - - return fragment; -} - -std::unique_ptr -build_fragment1(uint64_t id, const std::string& name) { - // sink->op2->op1->exchange - std::unique_ptr root_fragment = std::make_unique(id, FragmentType::kSerial); - - { - std::unique_ptr source = std::make_unique(name, SourceType::kExchange); - root_fragment->AddSource(std::move(source)); - - std::unique_ptr op1 = std::make_unique(name); - root_fragment->AddOperator(std::move(op1)); - std::unique_ptr op2 = std::make_unique(name); - root_fragment->AddOperator(std::move(op2)); - - std::unique_ptr sink = std::make_unique(name); - root_fragment->AddSink(std::move(sink)); - } - - std::unique_ptr child_fragment = std::make_unique(id, FragmentType::kParallel); - - { - std::unique_ptr source = std::make_unique(name, SourceType::kScan); - child_fragment->AddSource(std::move(source)); - - std::unique_ptr op1 = std::make_unique(name); - child_fragment->AddOperator(std::move(op1)); - std::unique_ptr op2 = std::make_unique(name); - child_fragment->AddOperator(std::move(op2)); - - std::unique_ptr sink = std::make_unique(name); - child_fragment->AddSink(std::move(sink)); - } - - root_fragment->SetChild(std::move(child_fragment)); - - return root_fragment; -} - -auto -main() -> int { - -// uint64_t parallel_size = std::thread::hardware_concurrency(); - uint64_t parallel_size = 65536 * 50; -// uint64_t parallel_size = 1; - - start_scheduler(); - - ctpl::thread_pool pool(32); - - std::unique_ptr frag0 = build_fragment0(0, "test"); -// std::unique_ptr frag0 = build_fragment1(0, "test"); - std::vector> root_tasks = frag0->BuildTask(parallel_size); -// std::shared_ptr source_buffer_ = std::make_unique(BUFFER_SIZE); -#if 1 - profiler.Begin(); - for(const auto& task: root_tasks) { - pool.push(execute_task, task.get(), parallel_size - 1); - } -#endif - sleep(7); - stop_scheduler(); -} diff --git a/benchmark/polling_scheduler/task.cpp b/benchmark/polling_scheduler/task.cpp deleted file mode 100644 index 565c53a6e..000000000 --- a/benchmark/polling_scheduler/task.cpp +++ /dev/null @@ -1,241 +0,0 @@ -// -// Created by jinhai on 23-5-11. -// - - -#include "task.h" -#include "threadutil.h" -#include - -#if defined(__GNUC__) && (defined(__x86_64__) || defined(__i386__)) -#include -#elif defined(__GNUC__) && defined(__aarch64__) -#include -#endif - -namespace infinity { - -std::unordered_set NewScheduler::cpu_set{}; -std::unordered_map> NewScheduler::task_queues{}; -std::unordered_map> NewScheduler::workers{}; - -std::unique_ptr NewScheduler::ready_queue{}; -std::unique_ptr NewScheduler::coordinator{}; - -std::unique_ptr NewScheduler::poller_queue{}; -std::unique_ptr NewScheduler::poller{}; - -std::vector NewScheduler::cpu_array{}; -uint64_t NewScheduler::current_cpu_id{}; - -void -NewScheduler::PollerLoop(int64_t cpu_id) { - std::list local_task_list{}; - std::vector local_ready_queue{}; - bool running{true}; - printf("start poller on CPU: %ld\n", cpu_id); - size_t spin_count{0}; - while(running) { - poller_queue->DequeueBulk(local_task_list); - auto task_iter = local_task_list.begin(); - while(!local_task_list.empty()) { - Task*& task_ptr = (*task_iter); - if(task_ptr->type() == TaskType::kTerminate) { - running = false; - local_ready_queue.emplace_back(task_ptr); - local_task_list.erase(task_iter++); -// ++ task_iter; - } else { - TaskState task_state = task_ptr->state_.load(); - switch(task_state) { - case TaskState::kRunning: - case TaskState::kCancelled: - case TaskState::kFinished: - case TaskState::kReady: { - local_task_list.erase(task_iter++); - local_ready_queue.push_back(task_ptr); - break; - } - case TaskState::kPending: { - ++task_iter; - } - } - } - - if(local_ready_queue.empty()) { - spin_count += 1; - } else { - spin_count = 0; - - ready_queue->EnqueueBulk(local_ready_queue.begin(), local_ready_queue.size()); - local_ready_queue.clear(); - } - - if(spin_count != 0 && spin_count % 32 == 0) { - _mm_pause(); - } - if(spin_count == 6400) { - spin_count = 0; - sched_yield(); - } - if(task_iter == local_task_list.end()) { - task_iter = local_task_list.begin(); - } - } - } - printf("stop poller on CPU: %ld\n", cpu_id); -} - -void -NewScheduler::CoordinatorLoop(int64_t cpu_id) { - Task* input_task{nullptr}; - bool running{true}; - printf("start coordinator on CPU: %ld\n", cpu_id); - while(running) { - NewScheduler::ready_queue->Dequeue(input_task); - if(input_task == nullptr) { - printf("coordinator: null task\n"); - continue; - } - - switch(input_task->type()) { - case TaskType::kTerminate: { - printf("terminate coordinator on CPU: %ld\n", cpu_id); - running = false; - break; - } - case TaskType::kDummy: { - DummyTask* dummy_task = (DummyTask*)input_task; - if(dummy_task->last_worker_id_ == -1) { - // Select an available cpu - NewScheduler::DispatchTask(current_cpu_id % cpu_array.size(), dummy_task); - } else { - NewScheduler::DispatchTask(dummy_task->last_worker_id_, dummy_task); - } - break; - } - case TaskType::kPipeline: { -// printf("coordinator receives PIPELINE TASK on CPU: %ld\n", cpu_id); - PipelineTask* pipeline_task = (PipelineTask*)input_task; - // Construct pipeline task and schedule it. - if(pipeline_task->last_worker_id_ == -1) { - // Select an available cpu - current_cpu_id = current_cpu_id % cpu_array.size(); -// printf("Dispatched to CPU: %ld\n", cpu_array[current_cpu_id]); - NewScheduler::DispatchTask(cpu_array[current_cpu_id], pipeline_task); - ++current_cpu_id; - } else { - NewScheduler::DispatchTask(pipeline_task->last_worker_id_, pipeline_task); - } - break; - } - case TaskType::kInvalid: { - printf("receive invalid type of task, terminate coordinator on CPU: %ld\n", cpu_id); - running = false; - break; - } - } - if(input_task->type() == TaskType::kTerminate) { - running = false; - } else { -// task->run(worker_id); - } - } -} - -void -NewScheduler::WorkerLoop(BlockingQueue* task_queue, int64_t worker_id) { - Task* task{nullptr}; - bool running{true}; - printf("start worker on CPU: %ld\n", worker_id); - while(running) { - task_queue->Dequeue(task); - if(task == nullptr) { - printf("worker %ld: null task\n", worker_id); - continue; - } - switch(task->type()) { - case TaskType::kTerminate: { - printf("terminate worker on CPU: %ld\n", worker_id); - running = false; - break; - } - case TaskType::kDummy: - case TaskType::kPipeline: { - task->Run(worker_id); - break; - } - case TaskType::kInvalid: { - printf("receive invalid type of task, terminate worker on CPU: %ld\n", worker_id); - running = false; - break; - } - } - } -} - -void -NewScheduler::Init(const std::unordered_set& input_cpu_set) { - if(!cpu_set.empty()) { - std::cerr << "scheduler was initialized before" << std::endl; - return; - } - cpu_set = input_cpu_set; - - cpu_array.reserve(cpu_set.size()); - for(int64_t cpu_id: cpu_set) { - cpu_array.emplace_back(cpu_id); - - std::unique_ptr task_queue = std::make_unique(); - std::unique_ptr task_thread = std::make_unique(WorkerLoop, task_queue.get(), cpu_id); - - // Pin the thread to specific cpu - ThreadUtil::pin(*task_thread, cpu_id); - - task_queues.emplace(cpu_id, std::move(task_queue)); - workers.emplace(cpu_id, std::move(task_thread)); - } - - // Start coordinator - ready_queue = std::make_unique(); - coordinator = std::make_unique(CoordinatorLoop, 0); - - poller_queue = std::make_unique(); - poller = std::make_unique(PollerLoop, 0); - ThreadUtil::pin(*coordinator, 0); - ThreadUtil::pin(*poller, 0); -} - -int64_t -NewScheduler::GetAvailableCPU() { - assert(false); - return 0; -} - -void -NewScheduler::Uninit() { - std::unique_ptr terminate_task = std::make_unique(); - poller_queue->Enqueue(terminate_task.get()); - poller->join(); - coordinator->join(); - for(int64_t cpu_id: cpu_set) { - task_queues[cpu_id]->Enqueue(terminate_task.get()); - workers[cpu_id]->join(); - } -} - -void -NewScheduler::RunTask(Task* task) { - poller_queue->Enqueue(task); -} - -void -NewScheduler::DispatchTask(int64_t worker_id, Task* task) { - if(!task_queues.contains(worker_id)) { - printf("Can't use this CPU: %ld\n", worker_id); - assert(false); - } - task_queues[worker_id]->Enqueue(task); -} - -} diff --git a/benchmark/polling_scheduler/task.h b/benchmark/polling_scheduler/task.h deleted file mode 100644 index 8dd774a2c..000000000 --- a/benchmark/polling_scheduler/task.h +++ /dev/null @@ -1,254 +0,0 @@ -// -// Created by jinhai on 23-5-7. -// - -#pragma once - -#include "operator.h" -#include "buffer_queue.h" -#include "task_queue.h" -#include "poller_queue.h" -#include -#include -#include -#include - -namespace infinity { - -struct Task; - -class NewScheduler { -public: - static void - Init(const std::unordered_set& cpu_set); - - static void - Uninit(); - - static void - RunTask(Task* task); - -private: - static void - DispatchTask(int64_t worker_id, Task* task); - - static void - PollerLoop(int64_t cpu_id); - - static void - CoordinatorLoop(int64_t cpu_id); - - static void - WorkerLoop(BlockingQueue* task_queue, int64_t worker_id); - - static int64_t - GetAvailableCPU(); - -private: - static std::unordered_set cpu_set; - - static std::unordered_map> task_queues; - static std::unordered_map> workers; - - static std::unique_ptr ready_queue; - static std::unique_ptr coordinator; - - static std::unique_ptr poller_queue; - static std::unique_ptr poller; - - static std::vector cpu_array; - static uint64_t current_cpu_id; -}; - - -#define BUFFER_SIZE 128 - -enum class TaskType { - kTerminate, - kDummy, - kPipeline, - kInvalid, -}; - -enum class TaskState { - kReady, - kFinished, - kCancelled, - kPending, - kRunning, -}; - -struct Task { - inline explicit - Task(TaskType type) : type_(type) {} - - virtual void - Run(int64_t worker_id) { - // Not implemented - last_worker_id_ = worker_id; - } - - inline void - set_state(TaskState state) { - state_.store(state); - } - - [[nodiscard]] inline TaskType - type() const { - return type_; - } - - TaskType type_{TaskType::kInvalid}; - int64_t last_worker_id_{-1}; - bool ready_{false}; - std::atomic state_{TaskState::kPending}; -}; - -struct TerminateTask final : public Task { - inline explicit - TerminateTask() : Task(TaskType::kTerminate) { - ready_ = true; - } -}; - -struct DummyTask final : public Task { - inline explicit - DummyTask() : Task(TaskType::kDummy) { - ready_ = true; - } - - void - Run(int64_t worker_id) override { - last_worker_id_ = worker_id; - printf("Run dummy task by worker: %ld\n", worker_id); - sleep(1); - } -}; - -struct PipelineTask final : public Task { - inline explicit - PipelineTask() : Task(TaskType::kPipeline) {} - - inline void - Init() { - if(parents_.empty()) { - root_task_ = true; - } else { - root_task_ = false; - } - } - - inline void - AddSink(Sink* sink) { - sink_ = sink; - } - - inline void - AddSource(Source* source, bool input_queue) { - source_ = source; - if(input_queue) { - input_queue_ = std::make_unique(); - } - } - - inline void - AddOperator(Operator* op) { - operators_.emplace_back(op); - buffers_.emplace_back(std::make_unique(BUFFER_SIZE)); - } - - inline void - Run(int64_t worker_id) override { - last_worker_id_ = worker_id; -// printf("Run pipeline task by worker: %ld\n", worker_id); - - // Read data from source buffer or input queue - source_buffer_ = std::make_shared(BUFFER_SIZE); - source_->Run(input_queue_.get(), nullptr, source_buffer_); - - // process the data one by one operator and push to next operator - size_t op_count = operators_.size(); - assert(op_count > 0); - operators_[0]->Run(source_buffer_.get(), buffers_[0].get()); - for(size_t idx = 1; idx < op_count; ++idx) { - operators_[idx]->Run(buffers_[idx - 1].get(), buffers_[idx].get()); - } - - // push the data into output queue - sink_->Run(buffers_.back().get(), output_queues_); - - // put the parent task into scheduler - for(Task* parent: parents_) { -// printf("Notify parent to run\n"); - parent->set_state(TaskState::kReady); -// NewScheduler::RunTask(parent); - } - - if(root_task_) { -// wait_flag_.notify_one(); -// printf("Notify result\n"); - std::unique_lock lck(result_lk_); - completed_ = true; - result_cv_.notify_one(); - } -// sleep(1); - } - - inline void - SetChildren(std::vector> children) { - children_ = std::move(children); - for(const std::shared_ptr& child: children_) { - PipelineTask* child_pipeline = (PipelineTask*)child.get(); - child_pipeline->AddOutputQueue(input_queue_.get()); - child_pipeline->AddParent(this); - } - } - - [[nodiscard]] inline const std::vector>& - children() const { - return children_; - } - - inline void - GetResult() { -// wait_flag_.wait(true); - std::unique_lock locker(result_lk_); - result_cv_.wait(locker, [&] { - return completed_; - }); -// printf("Get result\n"); - } - -private: - inline void - AddOutputQueue(ConcurrentQueue* queue) { - output_queues_.emplace_back(queue); - } - - inline void - AddParent(Task* parent) { - parents_.emplace_back(parent); - } -private: - Sink* sink_{}; - std::vector output_queues_; - - std::vector operators_{}; - std::vector> buffers_{}; - - Source* source_{}; - std::shared_ptr source_buffer_ = nullptr; - // Wait-free queue - std::unique_ptr input_queue_{nullptr}; - - std::vector> children_{}; - std::vector parents_{}; - - bool root_task_{false}; - bool completed_{false}; - std::mutex result_lk_; - std::condition_variable result_cv_; -// std::atomic_bool wait_flag_{false}; -}; - -} diff --git a/benchmark/polling_scheduler/task_queue.h b/benchmark/polling_scheduler/task_queue.h deleted file mode 100644 index d3ba69cee..000000000 --- a/benchmark/polling_scheduler/task_queue.h +++ /dev/null @@ -1,33 +0,0 @@ -// -// Created by jinhai on 23-5-9. -// - -#pragma once - -#include "blockingconcurrentqueue.h" - -namespace infinity { - -struct Task; - -struct BlockingQueue { - void - Enqueue(Task* task) { - queue_.enqueue(task); - } - - template - void - EnqueueBulk(It iter, size_t count) { - queue_.enqueue_bulk(std::forward(iter), count); - } - - void - Dequeue(Task*& task) { - queue_.wait_dequeue(task); - } - - moodycamel::BlockingConcurrentQueue queue_; -}; - -} diff --git a/src/parser/expr/match_sparse_expr.cpp b/src/parser/expr/match_sparse_expr.cpp index 873d735cb..4bef404d1 100644 --- a/src/parser/expr/match_sparse_expr.cpp +++ b/src/parser/expr/match_sparse_expr.cpp @@ -49,6 +49,8 @@ void MatchSparseExpr::SetOptParams(size_t topn, std::vector *&o opt_params_[i].reset(param); param = nullptr; } + delete opt_params; + opt_params = nullptr; } std::string MatchSparseExpr::ToString() const { diff --git a/test/sql/dql/knn/sparse/test_knn_sparse_integer.slt b/test/sql/dql/knn/sparse/test_knn_sparse_integer.slt index 17b0cef5e..ad12056f7 100644 --- a/test/sql/dql/knn/sparse/test_knn_sparse_integer.slt +++ b/test/sql/dql/knn/sparse/test_knn_sparse_integer.slt @@ -14,7 +14,7 @@ CREATE TABLE test_knn_sparse_integer(c1 INT, c2 SPARSE(INT, 100)); statement ok COPY test_knn_sparse_integer FROM '/var/infinity/test_data/sparse_knn_integer.csv' WITH (DELIMITER ','); -# mertic ip will order descendingly. The query will return row 4, 2, 1 +# metric ip will be in descending order. The query will return row 4, 2, 1 query I SELECT c1 FROM test_knn_sparse_integer SEARCH MATCH SPARSE (c2, [0:1.0,20:2.0,80:3.0], 'ip', 3); ----