Skip to content

Commit

Permalink
Fix double decremented threadpool->num_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
sergio-nsk authored and nmoinvaz committed Dec 15, 2022
1 parent 2cf376f commit 713cb96
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 28 deletions.
20 changes: 6 additions & 14 deletions threadpool_pthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ struct threadpool_s;

typedef struct threadpool_thread_s {
pthread_t handle;
struct threadpool_s *pool;
struct threadpool_thread_s *next;
} threadpool_thread_s;

Expand Down Expand Up @@ -88,14 +87,10 @@ static threadpool_job_s *threadpool_dequeue_job(threadpool_s *threadpool) {
}

static void *threadpool_do_work(void *arg) {
threadpool_thread_s *thread = (threadpool_thread_s *)arg;
threadpool_s *threadpool = thread->pool;
threadpool_s *threadpool = arg;

LOG_DEBUG("threadpool - worker 0x%" PRIx64 " - started\n", (uint64_t)pthread_self());

if (!thread)
return NULL;

while (true) {
pthread_mutex_lock(&threadpool->queue_mutex);
LOG_DEBUG("threadpool - worker 0x%" PRIx64 " - waiting for job\n", (uint64_t)pthread_self());
Expand Down Expand Up @@ -140,26 +135,23 @@ static void *threadpool_do_work(void *arg) {

LOG_DEBUG("threadpool - worker 0x%" PRIx64 " - stopped\n", (uint64_t)pthread_self());

// Reduce thread count
threadpool->num_threads--;
pthread_cond_signal(&threadpool->lazy_cond);
pthread_mutex_unlock(&threadpool->queue_mutex);
return NULL;
}

static void threadpool_create_thread_on_demand(threadpool_s *threadpool) {
// Create new thread and add it to the list of threads
threadpool_thread_s *thread = (threadpool_thread_s *)calloc(1, sizeof(threadpool_thread_s));
if (!thread)
pthread_t handle = 0;
if (pthread_create(&handle, NULL, threadpool_do_work, threadpool))
return;

thread->pool = threadpool;
threadpool_thread_s *thread = calloc(1, sizeof(threadpool_thread_s));
thread->handle = handle;
thread->next = threadpool->threads;

threadpool->threads = thread;
threadpool->num_threads++;

pthread_create(&thread->handle, NULL, threadpool_do_work, thread);
}

bool threadpool_enqueue(void *ctx, void *user_data, threadpool_job_cb callback) {
Expand Down Expand Up @@ -193,12 +185,12 @@ static void threadpool_delete_threads(threadpool_s *threadpool) {
while (threadpool->threads) {
thread = threadpool->threads;
threadpool->threads = threadpool->threads->next;
threadpool->num_threads--;

pthread_join(thread->handle, NULL);

free(thread);
}
threadpool->num_threads = 0;
}

static void threadpool_delete_jobs(threadpool_s *threadpool) {
Expand Down
22 changes: 8 additions & 14 deletions threadpool_winxp.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ struct threadpool_s;
typedef struct threadpool_thread_s {
HANDLE handle;
uint32_t id;
struct threadpool_s *pool;
struct threadpool_thread_s *next;
} threadpool_thread_s;

Expand Down Expand Up @@ -92,14 +91,10 @@ static threadpool_job_s *threadpool_dequeue_job(threadpool_s *threadpool) {
}

static uint32_t __stdcall threadpool_do_work(void *arg) {
threadpool_thread_s *thread = (threadpool_thread_s *)arg;
threadpool_s *threadpool = thread->pool;
threadpool_s *threadpool = arg;

LOG_DEBUG("threadpool - worker 0x%" PRIx32 " - started\n", thread->id);

if (!thread)
return 1;

while (true) {
mutex_lock(threadpool->queue_lock);
LOG_DEBUG("threadpool - worker 0x%" PRIx32 " - waiting for job\n", thread->id);
Expand Down Expand Up @@ -146,26 +141,25 @@ static uint32_t __stdcall threadpool_do_work(void *arg) {

LOG_DEBUG("threadpool - worker 0x%" PRIx32 " - stopped\n", thread->id);

// Reduce thread count
threadpool->num_threads--;
event_set(threadpool->lazy_cond);
mutex_unlock(threadpool->queue_lock);
return 0;
}

static void threadpool_create_thread_on_demand(threadpool_s *threadpool) {
// Create new thread and add it to the list of threads
threadpool_thread_s *thread = (threadpool_thread_s *)calloc(1, sizeof(threadpool_thread_s));
if (!thread)
uint32_t id = 0;
HANDLE handle = (HANDLE)_beginthreadex(NULL, 0, threadpool_do_work, threadpool, 0, &id);
if (handle == -1)
return;

thread->pool = threadpool;
threadpool_thread_s *thread = calloc(1, sizeof(threadpool_thread_s));
thread->handle = handle;
thread->id = id;
thread->next = threadpool->threads;

threadpool->threads = thread;
threadpool->num_threads++;

thread->handle = (HANDLE)_beginthreadex(NULL, 0, threadpool_do_work, thread, 0, &thread->id);
}

bool threadpool_enqueue(void *ctx, void *user_data, threadpool_job_cb callback) {
Expand Down Expand Up @@ -209,7 +203,6 @@ static void threadpool_delete_threads(threadpool_s *threadpool) {
while (threadpool->threads) {
thread = threadpool->threads;
threadpool->threads = threadpool->threads->next;
threadpool->num_threads--;

// Wait for thread to exit
while (true) {
Expand All @@ -222,6 +215,7 @@ static void threadpool_delete_threads(threadpool_s *threadpool) {

free(thread);
}
threadpool->num_threads = 0;
}

static void threadpool_delete_jobs(threadpool_s *threadpool) {
Expand Down

0 comments on commit 713cb96

Please sign in to comment.