Skip to content

Commit

Permalink
Fixed small corner cases dispatch logic
Browse files Browse the repository at this point in the history
- Moved event id updates to deallocation. Dirty events are atomically
  updated directly to new ids instead of transitioning through a
  non-dirty state. This fixes behaviour when cancelling events after
  dispatch and avoids race conditions when cancelling in-flight events.

- Added semaphore signal to requeueing of period events. This avoids
  missing deadline updates from periodic events.

- Fixed fabricated range limitation in the posix equeue_sema_wait caused
  by using the equeue delay type for absolute time values.

Tests have been added for the above bugs where possible.
  • Loading branch information
geky committed Aug 2, 2016
1 parent 43e609e commit eb6eee1
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 14 deletions.
19 changes: 13 additions & 6 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {
*p = e->next;
}

e->id += 1;
if (e->id >> (8*sizeof(int)-1 - q->npw2)) {
e->id = 1;
}

equeue_mutex_unlock(&q->memlock);
return e;
}
Expand Down Expand Up @@ -240,6 +235,14 @@ static struct equeue_event *equeue_dequeue(equeue_t *q, int *deadline) {
return head;
}

static inline int equeue_incid(equeue_t *q, int id) {
if ((id+1) >> (8*sizeof(int)-1 - q->npw2)) {
return 1;
}

return id+1;
}

int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
struct equeue_event *e = (struct equeue_event*)p - 1;
int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
Expand Down Expand Up @@ -274,6 +277,7 @@ void equeue_cancel(equeue_t *q, int id) {
}

equeue_unqueue(q, e);
e->id = equeue_incid(q, e->id);
equeue_mutex_unlock(&q->queuelock);

equeue_dealloc(q, e+1);
Expand Down Expand Up @@ -316,12 +320,15 @@ void equeue_dispatch(equeue_t *q, int ms) {
}

// undirty the id and either dealloc or reenqueue periodic events
e->id = -e->id;
if (e->period >= 0) {
equeue_mutex_lock(&q->queuelock);
e->id = -e->id;
equeue_enqueue(q, e, e->period);
equeue_mutex_unlock(&q->queuelock);

equeue_sema_signal(&q->eventsema);
} else {
e->id = equeue_incid(q, -e->id);
equeue_dealloc(q, e+1);
}
}
Expand Down
10 changes: 7 additions & 3 deletions equeue_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <time.h>
#include <sys/time.h>
#include <errno.h>


// Tick operations
Expand Down Expand Up @@ -57,11 +58,14 @@ bool equeue_sema_wait(equeue_sema_t *s, int ms) {
if (ms < 0) {
return !sem_wait(s);
} else {
ms += equeue_tick();
struct timeval tv;
gettimeofday(&tv, 0);

struct timespec ts = {
.tv_sec = ms/1000,
.tv_nsec = ms*1000000,
.tv_sec = ms/1000 + tv.tv_sec,
.tv_nsec = ms*1000000 + tv.tv_usec*1000,
};

return !sem_timedwait(s, &ts);
}
}
Expand Down
54 changes: 49 additions & 5 deletions tests/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void pass_func(void *eh) {
}

void simple_func(void *p) {
*(bool *)p = true;
(*(int *)p)++;
}

struct indirect {
Expand Down Expand Up @@ -113,10 +113,10 @@ void simple_call_in_test(void) {
test_assert(!err);

bool touched = false;
int id = equeue_call_in(&q, 5, simple_func, &touched);
int id = equeue_call_in(&q, 10, simple_func, &touched);
test_assert(id);

equeue_dispatch(&q, 10);
equeue_dispatch(&q, 15);
test_assert(touched);

equeue_destroy(&q);
Expand All @@ -128,10 +128,10 @@ void simple_call_every_test(void) {
test_assert(!err);

bool touched = false;
int id = equeue_call_every(&q, 5, simple_func, &touched);
int id = equeue_call_every(&q, 10, simple_func, &touched);
test_assert(id);

equeue_dispatch(&q, 10);
equeue_dispatch(&q, 15);
test_assert(touched);

equeue_destroy(&q);
Expand Down Expand Up @@ -227,6 +227,34 @@ void cancel_test(int N) {
equeue_destroy(&q);
}

void cancel_unnecessarily_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
test_assert(!err);

int id = equeue_call(&q, pass_func, 0);
for (int i = 0; i < 5; i++) {
equeue_cancel(&q, id);
}

id = equeue_call(&q, pass_func, 0);
equeue_dispatch(&q, 0);
for (int i = 0; i < 5; i++) {
equeue_cancel(&q, id);
}

bool touched = false;
equeue_call(&q, simple_func, &touched);
for (int i = 0; i < 5; i++) {
equeue_cancel(&q, id);
}

equeue_dispatch(&q, 0);
test_assert(touched);

equeue_destroy(&q);
}

void loop_protect_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
Expand Down Expand Up @@ -262,6 +290,20 @@ void break_test(void) {
equeue_destroy(&q);
}

void period_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
test_assert(!err);

int count = 0;
equeue_call_every(&q, 10, simple_func, &count);

equeue_dispatch(&q, 55);
test_assert(count == 5);

equeue_destroy(&q);
}

// Barrage tests
void simple_barrage_test(int N) {
equeue_t q;
Expand Down Expand Up @@ -365,8 +407,10 @@ int main() {
test_run(destructor_test);
test_run(allocation_failure_test);
test_run(cancel_test, 20);
test_run(cancel_unnecessarily_test);
test_run(loop_protect_test);
test_run(break_test);
test_run(period_test);
test_run(simple_barrage_test, 20);
test_run(fragmenting_barrage_test, 20);
test_run(multithreaded_barrage_test, 20);
Expand Down

0 comments on commit eb6eee1

Please sign in to comment.