From 80036dfb03e2e12a2dcb19c0b5e69b19616299ad Mon Sep 17 00:00:00 2001 From: chenyushuo <297086016@qq.com> Date: Thu, 29 Aug 2024 17:34:22 +0800 Subject: [PATCH] fix in worker.cc --- src/agentscope/cpp_server/worker.cc | 66 +++++++++++++++++------------ src/agentscope/cpp_server/worker.h | 4 +- 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/src/agentscope/cpp_server/worker.cc b/src/agentscope/cpp_server/worker.cc index c78b85f6c..f1cab2ddb 100644 --- a/src/agentscope/cpp_server/worker.cc +++ b/src/agentscope/cpp_server/worker.cc @@ -64,7 +64,7 @@ Worker::Worker( _func_ready_sem_prefix("/func_" + port + "_"), _set_result_sem_prefix("/set_result_" + port + "_"), _small_obj_pool_shm_name("/small_obj_pool_shm_" + port), - _small_obj_pool_sem_name("/small_obj_pool_sem_" + port), + _small_obj_pool_filename("./logs/small_obj_pool_" + port), _call_shm_size(1024), _small_obj_max_num(100000), _small_obj_size(1000), @@ -74,6 +74,20 @@ Worker::Worker( _max_timeout_seconds(std::max(max_timeout_seconds, 1u)) { py::gil_scoped_release release; + char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER"); + if (use_logger != nullptr && std::string(use_logger) == "True") + { + _use_logger = true; + } + else + { + _use_logger = false; + } + struct stat info; + if (stat("./logs/", &info) != 0) + { + mkdir("./logs", 0755); + } _small_obj_pool_shm_fd = shm_open(_small_obj_pool_shm_name.c_str(), O_CREAT | O_RDWR, 0666); if (_small_obj_pool_shm_fd == -1) { @@ -88,26 +102,13 @@ Worker::Worker( exit(1); } memset(_small_obj_pool_shm, 0, _small_obj_max_num * _small_obj_shm_size); - _small_obj_pool_sem = sem_open(_small_obj_pool_sem_name.c_str(), O_CREAT, 0666, 1); - if (_small_obj_pool_sem == SEM_FAILED) + _small_obj_pool_fd = open(_small_obj_pool_filename.c_str(), O_RDWR | O_CREAT, 0644); + if (_small_obj_pool_fd == -1) { - perror("Error: sem_open in create _small_obj_pool_sem"); + perror("Error: open in _small_obj_pool_fd"); exit(1); } - char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER"); - if (use_logger != nullptr && std::string(use_logger) == "True") - { - _use_logger = true; - } - else - { - _use_logger = false; - } - struct stat info; - if (stat("./logs/", &info) != 0) - { - mkdir("./logs", 0755); - } + ftruncate(_small_obj_pool_fd, _small_obj_max_num * _small_obj_size); for (int i = 0; i < _num_workers; i++) { string shm_name = _func_call_shm_prefix + to_string(i); @@ -243,7 +244,8 @@ Worker::~Worker() // for main process to release resources close(_small_obj_pool_shm_fd); munmap(_small_obj_pool_shm, _small_obj_max_num * _small_obj_shm_size); shm_unlink(_small_obj_pool_shm_name.c_str()); - sem_unlink(_small_obj_pool_sem_name.c_str()); + close(_small_obj_pool_fd); + remove(_small_obj_pool_filename.c_str()); for (auto iter : _worker_semaphores) { sem_t *worker_avail_sem = iter.first; @@ -322,6 +324,7 @@ int Worker::get_call_id() perror(("Error: sem_open in get_call_id" + set_result_name).c_str()); exit(1); } + sem_close(set_result_sem); return call_id; } @@ -376,23 +379,32 @@ void Worker::set_content(const string &prefix, const int call_id, const string & int pool_idx = call_id % _small_obj_max_num; char *small_obj_shm = (char *)_small_obj_pool_shm + pool_idx * _small_obj_shm_size; int *occupied = (int *)small_obj_shm; - while (true) + struct flock lock; + lock.l_type = F_WRLCK; + lock.l_whence = SEEK_SET; + lock.l_start = pool_idx * _small_obj_shm_size; + lock.l_len = _small_obj_shm_size; + for (bool running = true; running; std::this_thread::sleep_for(std::chrono::milliseconds(1))) { if (*occupied == false) { - sem_wait(_small_obj_pool_sem); + if (fcntl(_small_obj_pool_fd, F_SETLKW, &lock) == -1) + { + perror(("Error: fcntl in set_content (lock): " + prefix + to_string(call_id)).c_str()); + exit(1); + } if (*occupied == false) { *occupied = true; - sem_post(_small_obj_pool_sem); - break; + running = false; } - else + lock.l_type = F_UNLCK; + if (fcntl(_small_obj_pool_fd, F_SETLK, &lock) == -1) { - sem_post(_small_obj_pool_sem); + perror(("Error: fcntl in set_content (unlock): " + prefix + to_string(call_id)).c_str()); + exit(1); } } - std::this_thread::sleep_for(std::chrono::milliseconds(1)); } *(int *)(small_obj_shm + sizeof(int)) = call_id; *(int *)(small_obj_shm + sizeof(int) * 2) = content.size(); @@ -439,7 +451,7 @@ string Worker::get_result(const int call_id) sem_t *set_result_sem = sem_open(set_result_name.c_str(), 0); if (set_result_sem == SEM_FAILED) { - perror(("Error: sem_open in get_result:" + to_string(call_id)).c_str()); + perror(("Error: sem_open in get_result: " + set_result_name).c_str()); exit(1); } sem_wait(set_result_sem); diff --git a/src/agentscope/cpp_server/worker.h b/src/agentscope/cpp_server/worker.h index d7bd433e1..8a087f5b5 100644 --- a/src/agentscope/cpp_server/worker.h +++ b/src/agentscope/cpp_server/worker.h @@ -76,10 +76,10 @@ class Worker const string _func_ready_sem_prefix; const string _set_result_sem_prefix; const string _small_obj_pool_shm_name; - const string _small_obj_pool_sem_name; + const string _small_obj_pool_filename; int _small_obj_pool_shm_fd; + int _small_obj_pool_fd; void *_small_obj_pool_shm; - sem_t *_small_obj_pool_sem; const unsigned int _call_shm_size; const unsigned int _small_obj_max_num; const unsigned int _small_obj_size;