Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize-vecop-program-execution #736

Merged
merged 2 commits into the base branch from the head branch (branch names not captured)
Jan 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions icicle/backend/cpu/src/field/cpu_vec_ops.cpp
Original file line number | Diff line number | Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <sys/types.h>
#include <vector>

#include "taskflow/taskflow.hpp"
#include "icicle/program/program.h"
#include "cpu_program_executor.h"

Expand Down Expand Up @@ -852,20 +853,36 @@ eIcicleError cpu_execute_program(
<< " parameters";
return eIcicleError::INVALID_ARGUMENT;
}
tf::Taskflow taskflow; // Accumulate tasks
tf::Executor executor; // execute all tasks accumulated on multiple threads
const uint64_t total_nof_operations = size * config.batch_size;
CpuProgramExecutor prog_executor(program);
// init prog_executor to point to data vectors
for (int param_idx = 0; param_idx < program.m_nof_parameters; ++param_idx) {
prog_executor.m_variable_ptrs[param_idx] = data[param_idx];
}

// run over all elements in the arrays and execute the program
for (uint64_t i = 0; i < total_nof_operations; i++) {
prog_executor.execute();
for (int param_idx = 0; param_idx < program.m_nof_parameters; ++param_idx) {
(prog_executor.m_variable_ptrs[param_idx])++;
}
// Divide the work evenly among the workers
const int nof_workers = get_nof_workers(config);
const uint64_t worker_task_size = (total_nof_operations + nof_workers - 1) / nof_workers; // round up

for (uint64_t start_idx = 0; start_idx < total_nof_operations; start_idx += worker_task_size) {
taskflow.emplace([=]() {
CpuProgramExecutor prog_executor(program);
// init prog_executor to point to data vectors
for (int param_idx = 0; param_idx < program.m_nof_parameters; ++param_idx) {
prog_executor.m_variable_ptrs[param_idx] = &(data[param_idx][start_idx]);
}

const uint64_t task_size = std::min(worker_task_size, total_nof_operations - start_idx);
// run over all task elements in the arrays and execute the program
for (uint64_t i = 0; i < task_size; i++) {
prog_executor.execute();
// update the program pointers
for (int param_idx = 0; param_idx < program.m_nof_parameters; ++param_idx) {
(prog_executor.m_variable_ptrs[param_idx])++;
}
}
});
}

executor.run(taskflow).wait();
taskflow.clear();
return eIcicleError::SUCCESS;
}

Expand Down
Loading