Skip to content

Commit

Permalink
Added read/write lock to queue (#433)
Browse files Browse the repository at this point in the history
  • Loading branch information
DerjenigeUberMensch authored Sep 22, 2024
1 parent 320955e commit 198cd32
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion include/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct CQueue
void *data;
uint32_t datalen;
size_t datasize;
pthread_mutex_t mutex;
pthread_rwlock_t mutex;
pthread_mutex_t condmutex;
pthread_cond_t cond;
};
Expand Down
51 changes: 31 additions & 20 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@

#include "queue.h"

static inline int
CQueueLockR(CQueue *q)
{ return pthread_rwlock_rdlock(&q->mutex);
}

static inline int
CQueueLockW(CQueue *q)
{ return pthread_rwlock_wrlock(&q->mutex);
}

static inline int
CQueueLock(CQueue *q)
{ return pthread_mutex_lock(&q->mutex);
CQueueUnlockR(CQueue *q)
{ return pthread_rwlock_unlock(&q->mutex);
}

static inline int
CQueueUnlock(CQueue *q)
{ return pthread_mutex_unlock(&q->mutex);
CQueueUnlockW(CQueue *q)
{ return pthread_rwlock_unlock(&q->mutex);
}

static inline const int
Expand Down Expand Up @@ -44,9 +53,9 @@ CQueueIsFull(CQueue *queue)
{ return 0;
}
uint8_t ret = 0;
CQueueLock(queue);
CQueueLockR(queue);
ret = __CQueue_full_no_lock(queue);
CQueueUnlock(queue);
CQueueUnlockR(queue);
return ret;
}

Expand All @@ -58,9 +67,9 @@ CQueueIsEmpty(CQueue *queue)
}
uint8_t ret = 0;

CQueueLock(queue);
CQueueLockR(queue);
ret = queue->front == -1;
CQueueUnlock(queue);
CQueueUnlockR(queue);
return ret;
}

Expand All @@ -70,10 +79,10 @@ CQueuePop(CQueue *queue, void *fill)
if(!queue)
{ return 0;
}
CQueueLock(queue);
CQueueLockW(queue);
if(queue->front == -1)
{
CQueueUnlock(queue);
CQueueUnlockW(queue);
return 0;
}
if(fill)
Expand All @@ -85,7 +94,7 @@ CQueuePop(CQueue *queue, void *fill)
else
{ queue->front = (queue->front + 1) % queue->datalen;
}
CQueueUnlock(queue);
CQueueUnlockW(queue);
return 1;
}

Expand All @@ -97,10 +106,10 @@ CQueueAdd(CQueue *queue, void *data)
}
uint32_t empty;
int64_t index;
CQueueLock(queue);
CQueueLockW(queue);
if(__CQueue_full_no_lock(queue))
{
CQueueUnlock(queue);
CQueueUnlockW(queue);
return 0;
}
empty = queue->front == -1;
Expand All @@ -110,7 +119,7 @@ CQueueAdd(CQueue *queue, void *data)

memcpy((uint8_t *)queue->data + queue->datasize * index, data, queue->datasize);
pthread_cond_signal(&queue->cond);
CQueueUnlock(queue);
CQueueUnlockW(queue);
return 1;
}

Expand All @@ -121,11 +130,11 @@ CQueueGetFirst(CQueue *queue)
if(!queue)
{ return ret;
}
CQueueLock(queue);
CQueueLockR(queue);
if(queue->front != -1)
{ ret = (uint8_t *)queue->data + queue->rear * queue->datasize;
}
CQueueUnlock(queue);
CQueueUnlockR(queue);
return ret;
}

Expand All @@ -136,11 +145,11 @@ CQueueGetLast(CQueue *queue)
if(!queue)
{ return ret;
}
CQueueLock(queue);
CQueueLockR(queue);
if(queue->rear != -1)
{ ret = (uint8_t *)queue->data + queue->rear * queue->datasize;
}
CQueueUnlock(queue);
CQueueUnlockR(queue);
return ret;
}

Expand All @@ -154,16 +163,18 @@ CQueueCreate(void *data, uint32_t datalen, size_t sizeof_one_item, CQueue *_Q_RE
_Q_RETURN->datalen = datalen;
_Q_RETURN->rear = -1;
_Q_RETURN->front = -1;
_Q_RETURN->mutex = mutex;
_Q_RETURN->condmutex = mutex;
_Q_RETURN->cond = cond;
if(pthread_rwlock_init(&_Q_RETURN->mutex, NULL))
{ return 1;
}
return 0;
}

void
CQueueDestroy(CQueue *queue)
{
pthread_mutex_destroy(&queue->mutex);
pthread_rwlock_destroy(&queue->mutex);
pthread_mutex_destroy(&queue->condmutex);
pthread_cond_destroy(&queue->cond);
}
Expand Down

0 comments on commit 198cd32

Please sign in to comment.