Skip to content

Commit

Permalink
Moved to tick + generation counter to mark in-flight events
Browse files Browse the repository at this point in the history
Before this commit, the event-id was negated to mark events that
became in-flight. Marking each event required a O(n) operation in a
critical-section. By using the property that in-flight events must have
expired, events do not need to be manually marked, reducing the
critical section to a O(1) operation per slot.

Unfortunately, relying on the most recent dispatch tick is not
sufficient in cases where events post events during the same tick.
A simple generation counter was added to avoid this issue.

Notable performance impact (make prof):
equeue_dispatch_prof: 468 cycles (+20%)
  • Loading branch information
geky committed Aug 5, 2016
1 parent 02ce85d commit 599964f
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 66 deletions.
146 changes: 82 additions & 64 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
q->slab.data = buffer;

q->queue = 0;
q->tick = equeue_tick();
q->generation = 0;
q->breaks = 0;

int err;
Expand Down Expand Up @@ -158,8 +160,19 @@ static inline int equeue_tickdiff(unsigned a, unsigned b) {
return (int)(a - b);
}

static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
static inline void equeue_incid(equeue_t *q, struct equeue_event *e) {
e->id += 1;
if (e->id >> (8*sizeof(int)-1 - q->npw2)) {
e->id = 1;
}
}

static int equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
e->target = equeue_tick() + ms;
e->generation = q->generation;

equeue_mutex_lock(&q->queuelock);

struct equeue_event **p = &q->queue;
while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
Expand All @@ -185,9 +198,31 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {

*p = e;
e->ref = p;

equeue_mutex_unlock(&q->queuelock);

return id;
}

static void equeue_unqueue(equeue_t *q, struct equeue_event *e) {
static struct equeue_event *equeue_unqueue(equeue_t *q, int id) {
struct equeue_event *e = (struct equeue_event *)
&q->buffer[id & ((1 << q->npw2)-1)];

equeue_mutex_lock(&q->queuelock);
if (e->id != id >> q->npw2) {
equeue_mutex_unlock(&q->queuelock);
return 0;
}

e->cb = 0;
e->period = -1;

int diff = equeue_tickdiff(e->target, q->tick);
if (diff < 0 || (diff == 0 && e->generation != q->generation)) {
equeue_mutex_unlock(&q->queuelock);
return 0;
}

if (e->sibling) {
e->sibling->next = e->next;
if (e->sibling->next) {
Expand All @@ -202,16 +237,41 @@ static void equeue_unqueue(equeue_t *q, struct equeue_event *e) {
e->next->ref = e->ref;
}
}

equeue_incid(q, e);
equeue_mutex_unlock(&q->queuelock);

return e;
}

static struct equeue_event *equeue_dequeue(equeue_t *q) {
unsigned target = equeue_tick();
struct equeue_event *head = 0;
struct equeue_event **tail = &head;
static struct equeue_event *equeue_dequeue(equeue_t *q, unsigned target) {
equeue_mutex_lock(&q->queuelock);

while (q->queue && equeue_tickdiff(q->queue->target, target) <= 0) {
struct equeue_event *es = q->queue;
q->queue = es->next;
q->generation += 1;
if (equeue_tickdiff(q->tick, target) <= 0) {
q->tick = target;
}

struct equeue_event *head = q->queue;
struct equeue_event **p = &head;
while (*p && equeue_tickdiff((*p)->target, target) <= 0) {
p = &(*p)->next;
}

q->queue = *p;
if (q->queue) {
q->queue->ref = &q->queue;
}

*p = 0;

equeue_mutex_unlock(&q->queuelock);

struct equeue_event **tail = &head;
struct equeue_event *ess = head;
while (ess) {
struct equeue_event *es = ess;
ess = es->next;

struct equeue_event *prev = 0;
for (struct equeue_event *e = es; e; e = e->sibling) {
Expand All @@ -223,59 +283,23 @@ static struct equeue_event *equeue_dequeue(equeue_t *q) {
tail = &es->next;
}

if (q->queue) {
q->queue->ref = &q->queue;
}

return head;
}

static inline int equeue_incid(equeue_t *q, int id) {
if ((id+1) >> (8*sizeof(int)-1 - q->npw2)) {
return 1;
}

return id+1;
}

int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
struct equeue_event *e = (struct equeue_event*)p - 1;
int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
e->cb = cb;

if (e->target < 0) {
equeue_dealloc(q, e+1);
return id;
}

equeue_mutex_lock(&q->queuelock);
equeue_enqueue(q, e, e->target);
equeue_mutex_unlock(&q->queuelock);

int id = equeue_enqueue(q, e, e->target);
equeue_sema_signal(&q->eventsema);
return id;
}

void equeue_cancel(equeue_t *q, int id) {
struct equeue_event *e = (struct equeue_event *)
&q->buffer[id & ((1 << q->npw2)-1)];

equeue_mutex_lock(&q->queuelock);
if (e->id == -id >> q->npw2) {
e->cb = 0;
e->period = -1;
}

if (e->id != id >> q->npw2) {
equeue_mutex_unlock(&q->queuelock);
return;
struct equeue_event *e = equeue_unqueue(q, id);
if (e) {
equeue_dealloc(q, e + 1);
}

equeue_unqueue(q, e);
e->id = equeue_incid(q, e->id);
equeue_mutex_unlock(&q->queuelock);

equeue_dealloc(q, e+1);
}

void equeue_break(equeue_t *q) {
Expand All @@ -286,18 +310,12 @@ void equeue_break(equeue_t *q) {
}

void equeue_dispatch(equeue_t *q, int ms) {
unsigned timeout = equeue_tick() + ms;
unsigned tick = equeue_tick();
unsigned timeout = tick + ms;

while (1) {
// collect all the available events and next deadline
equeue_mutex_lock(&q->queuelock);
struct equeue_event *es = equeue_dequeue(q);

// mark events as in-flight
for (struct equeue_event *e = es; e; e = e->next) {
e->id = -e->id;
}
equeue_mutex_unlock(&q->queuelock);
struct equeue_event *es = equeue_dequeue(q, tick);

// dispatch events
while (es) {
Expand All @@ -310,20 +328,17 @@ void equeue_dispatch(equeue_t *q, int ms) {
cb(e + 1);
}

// undirty the id and either dealloc or reenqueue periodic events
// reenqueue periodic events or deallocate
if (e->period >= 0) {
equeue_mutex_lock(&q->queuelock);
e->id = -e->id;
equeue_enqueue(q, e, e->period);
equeue_mutex_unlock(&q->queuelock);
} else {
e->id = equeue_incid(q, -e->id);
equeue_incid(q, e);
equeue_dealloc(q, e+1);
}
}

int deadline = -1;
unsigned tick = equeue_tick();
tick = equeue_tick();

// check if we should stop dispatching soon
if (ms >= 0) {
Expand Down Expand Up @@ -356,6 +371,9 @@ void equeue_dispatch(equeue_t *q, int ms) {
}
equeue_mutex_unlock(&q->queuelock);
}

// update tick for next iteration
tick = equeue_tick();
}
}

Expand Down
9 changes: 7 additions & 2 deletions equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ extern "C" {
#include "equeue_sema.h"

#include <stddef.h>
#include <stdint.h>


// Definition of the minimum size of an event
Expand All @@ -26,7 +27,9 @@ extern "C" {
// Event/queue structures
struct equeue_event {
unsigned size;
int id;
uint8_t id;
uint8_t generation;

struct equeue_event *next;
struct equeue_event *sibling;
struct equeue_event **ref;
Expand All @@ -41,7 +44,9 @@ struct equeue_event {

typedef struct equeue {
struct equeue_event *queue;
int breaks;
unsigned tick;
unsigned breaks;
uint8_t generation;

unsigned char *buffer;
unsigned npw2;
Expand Down
48 changes: 48 additions & 0 deletions tests/tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ void fragment_func(void *p) {
test_assert(id);
}

struct cancel {
equeue_t *q;
int id;
};

void cancel_func(void *p) {
struct cancel *cancel = (struct cancel *)p;
equeue_cancel(cancel->q, cancel->id);
}


// Simple call tests
void simple_call_test(void) {
Expand Down Expand Up @@ -194,6 +204,8 @@ void destructor_test(void) {
for (int i = 0; i < 3; i++) {
equeue_cancel(&q, ids[i]);
}

equeue_dispatch(&q, 0);
test_assert(touched == 3);

touched = 0;
Expand Down Expand Up @@ -251,6 +263,41 @@ void cancel_test(int N) {
equeue_destroy(&q);
}

void cancel_inflight_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
test_assert(!err);

bool touched = false;

int id = equeue_call(&q, simple_func, &touched);
equeue_cancel(&q, id);

equeue_dispatch(&q, 0);
test_assert(!touched);

id = equeue_call(&q, simple_func, &touched);
equeue_cancel(&q, id);

equeue_dispatch(&q, 0);
test_assert(!touched);

struct cancel *cancel = equeue_alloc(&q, sizeof(struct cancel));
test_assert(cancel);
cancel->q = &q;
cancel->id = 0;

id = equeue_post(&q, cancel_func, cancel);
test_assert(id);

cancel->id = equeue_call(&q, simple_func, &touched);

equeue_dispatch(&q, 0);
test_assert(!touched);

equeue_destroy(&q);
}

void cancel_unnecessarily_test(void) {
equeue_t q;
int err = equeue_create(&q, 2048);
Expand Down Expand Up @@ -431,6 +478,7 @@ int main() {
test_run(destructor_test);
test_run(allocation_failure_test);
test_run(cancel_test, 20);
test_run(cancel_inflight_test);
test_run(cancel_unnecessarily_test);
test_run(loop_protect_test);
test_run(break_test);
Expand Down

0 comments on commit 599964f

Please sign in to comment.