Skip to content

Commit

Permalink
fix in worker.cc
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyushuo committed Aug 29, 2024
1 parent 7c7a2bc commit 80036df
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
66 changes: 39 additions & 27 deletions src/agentscope/cpp_server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Worker::Worker(
_func_ready_sem_prefix("/func_" + port + "_"),
_set_result_sem_prefix("/set_result_" + port + "_"),
_small_obj_pool_shm_name("/small_obj_pool_shm_" + port),
_small_obj_pool_sem_name("/small_obj_pool_sem_" + port),
_small_obj_pool_filename("./logs/small_obj_pool_" + port),
_call_shm_size(1024),
_small_obj_max_num(100000),
_small_obj_size(1000),
Expand All @@ -74,6 +74,20 @@ Worker::Worker(
_max_timeout_seconds(std::max(max_timeout_seconds, 1u))
{
py::gil_scoped_release release;
char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER");
if (use_logger != nullptr && std::string(use_logger) == "True")
{
_use_logger = true;
}
else
{
_use_logger = false;
}
struct stat info;
if (stat("./logs/", &info) != 0)
{
mkdir("./logs", 0755);
}
_small_obj_pool_shm_fd = shm_open(_small_obj_pool_shm_name.c_str(), O_CREAT | O_RDWR, 0666);
if (_small_obj_pool_shm_fd == -1)
{
Expand All @@ -88,26 +102,13 @@ Worker::Worker(
exit(1);
}
memset(_small_obj_pool_shm, 0, _small_obj_max_num * _small_obj_shm_size);
_small_obj_pool_sem = sem_open(_small_obj_pool_sem_name.c_str(), O_CREAT, 0666, 1);
if (_small_obj_pool_sem == SEM_FAILED)
_small_obj_pool_fd = open(_small_obj_pool_filename.c_str(), O_RDWR | O_CREAT, 0644);
if (_small_obj_pool_fd == -1)
{
perror("Error: sem_open in create _small_obj_pool_sem");
perror("Error: open in _small_obj_pool_fd");
exit(1);
}
char *use_logger = getenv("AGENTSCOPE_USE_CPP_LOGGER");
if (use_logger != nullptr && std::string(use_logger) == "True")
{
_use_logger = true;
}
else
{
_use_logger = false;
}
struct stat info;
if (stat("./logs/", &info) != 0)
{
mkdir("./logs", 0755);
}
ftruncate(_small_obj_pool_fd, _small_obj_max_num * _small_obj_size);
for (int i = 0; i < _num_workers; i++)
{
string shm_name = _func_call_shm_prefix + to_string(i);
Expand Down Expand Up @@ -243,7 +244,8 @@ Worker::~Worker() // for main process to release resources
close(_small_obj_pool_shm_fd);
munmap(_small_obj_pool_shm, _small_obj_max_num * _small_obj_shm_size);
shm_unlink(_small_obj_pool_shm_name.c_str());
sem_unlink(_small_obj_pool_sem_name.c_str());
close(_small_obj_pool_fd);
remove(_small_obj_pool_filename.c_str());
for (auto iter : _worker_semaphores)
{
sem_t *worker_avail_sem = iter.first;
Expand Down Expand Up @@ -322,6 +324,7 @@ int Worker::get_call_id()
perror(("Error: sem_open in get_call_id" + set_result_name).c_str());
exit(1);
}
sem_close(set_result_sem);
return call_id;
}

Expand Down Expand Up @@ -376,23 +379,32 @@ void Worker::set_content(const string &prefix, const int call_id, const string &
int pool_idx = call_id % _small_obj_max_num;
char *small_obj_shm = (char *)_small_obj_pool_shm + pool_idx * _small_obj_shm_size;
int *occupied = (int *)small_obj_shm;
while (true)
struct flock lock;
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
lock.l_start = pool_idx * _small_obj_shm_size;
lock.l_len = _small_obj_shm_size;
for (bool running = true; running; std::this_thread::sleep_for(std::chrono::milliseconds(1)))
{
if (*occupied == false)
{
sem_wait(_small_obj_pool_sem);
if (fcntl(_small_obj_pool_fd, F_SETLKW, &lock) == -1)
{
perror(("Error: fcntl in set_content (lock): " + prefix + to_string(call_id)).c_str());
exit(1);
}
if (*occupied == false)
{
*occupied = true;
sem_post(_small_obj_pool_sem);
break;
running = false;
}
else
lock.l_type = F_UNLCK;
if (fcntl(_small_obj_pool_fd, F_SETLK, &lock) == -1)
{
sem_post(_small_obj_pool_sem);
perror(("Error: fcntl in set_content (unlock): " + prefix + to_string(call_id)).c_str());
exit(1);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
*(int *)(small_obj_shm + sizeof(int)) = call_id;
*(int *)(small_obj_shm + sizeof(int) * 2) = content.size();
Expand Down Expand Up @@ -439,7 +451,7 @@ string Worker::get_result(const int call_id)
sem_t *set_result_sem = sem_open(set_result_name.c_str(), 0);
if (set_result_sem == SEM_FAILED)
{
perror(("Error: sem_open in get_result:" + to_string(call_id)).c_str());
perror(("Error: sem_open in get_result: " + set_result_name).c_str());
exit(1);
}
sem_wait(set_result_sem);
Expand Down
4 changes: 2 additions & 2 deletions src/agentscope/cpp_server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,10 @@ class Worker
const string _func_ready_sem_prefix;
const string _set_result_sem_prefix;
const string _small_obj_pool_shm_name;
const string _small_obj_pool_sem_name;
const string _small_obj_pool_filename;
int _small_obj_pool_shm_fd;
int _small_obj_pool_fd;
void *_small_obj_pool_shm;
sem_t *_small_obj_pool_sem;
const unsigned int _call_shm_size;
const unsigned int _small_obj_max_num;
const unsigned int _small_obj_size;
Expand Down

0 comments on commit 80036df

Please sign in to comment.