Skip to content

Commit

Permalink
Moved to constant-time event cancellation
Browse files Browse the repository at this point in the history
The previous cancel relied on a naive linear search to find
events to cancel. This provided a simple and robust solution
that avoided issues with in-flight events.

Achieving constant-time cancellation required two modifications:
- Use doubly-linked list for queue
- Encode event addresses in the event id

Encoding the event addresses turned out to be a bit tricky. The
equeue library supports cancelling events after they have been
dispatched. This used an incrementing counter to obtain pseudo-
unique ids.

To fit everything in a single int, the event address is stored
as an offset from the equeue's memory region. A seperate counter is
used for each allocated chunk, taking advantage of the non-coalescing
nature of the underlying allocator. Together these are ored to obtain
a unique id that can be decoded to obtain the underlying event.

Notable performance impact (make prof):
equeue_cancel_many_prof: 132 cycles (+75%)
equeue_alloc_many_size_prof: 56000 bytes (-12%)
equeue_alloc_fragmented_size_prof: 56000 bytes (-12%)
  • Loading branch information
geky committed Jul 30, 2016
1 parent f602cb5 commit 7937a76
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 58 deletions.
107 changes: 57 additions & 50 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,24 @@ int equeue_create(equeue_t *q, size_t size) {
}

int err = equeue_create_inplace(q, size, buffer);
q->buffer = buffer;
q->allocated = buffer;
return err;
}

int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
q->buffer = buffer;
q->allocated = 0;

q->npw2 = 0;
for (unsigned s = size; s; s >>= 1) {
q->npw2++;
}

q->chunks = 0;
q->slab.size = size;
q->slab.data = buffer;
q->chunks = 0;
q->buffer = 0;

q->queue = 0;
q->next_id = 42;
q->break_ = (struct equeue_event){
.id = 0,
.period = -1,
Expand All @@ -45,7 +51,7 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) {
return err;
}

err = equeue_mutex_create(&q->freelock);
err = equeue_mutex_create(&q->memlock);
if (err < 0) {
return err;
}
Expand All @@ -60,21 +66,18 @@ void equeue_destroy(equeue_t *q) {
equeue_dealloc(q, e+1);
}

equeue_mutex_destroy(&q->freelock);
equeue_mutex_destroy(&q->memlock);
equeue_mutex_destroy(&q->queuelock);
equeue_sema_destroy(&q->eventsema);
free(q->buffer);
free(q->allocated);
}

// equeue allocation functions
static void *equeue_mem_alloc(equeue_t *q, size_t size) {
size = size + sizeof(unsigned);
size = (size + sizeof(unsigned)-1) & ~(sizeof(unsigned)-1);
if (size < sizeof(struct equeue_chunk)) {
size = sizeof(struct equeue_chunk);
}
// equeue chunk allocation functions
static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {
size += sizeof(struct equeue_event);
size = (size + sizeof(void*)-1) & ~(sizeof(void*)-1);

equeue_mutex_lock(&q->freelock);
equeue_mutex_lock(&q->memlock);

for (struct equeue_chunk **p = &q->chunks; *p; p = &(*p)->nchunk) {
if ((*p)->size >= size) {
Expand All @@ -85,8 +88,14 @@ static void *equeue_mem_alloc(equeue_t *q, size_t size) {
} else {
*p = c->nchunk;
}
equeue_mutex_unlock(&q->freelock);
return (unsigned *)c + 1;

c->id += 1;
if (c->id >> (8*sizeof(int)-1 - q->npw2)) {
c->id = 1;
}

equeue_mutex_unlock(&q->memlock);
return (struct equeue_event *)c;
}
}

Expand All @@ -95,18 +104,20 @@ static void *equeue_mem_alloc(equeue_t *q, size_t size) {
q->slab.data += size;
q->slab.size -= size;
c->size = size;
equeue_mutex_unlock(&q->freelock);
return (unsigned *)c + 1;
c->id = 1;

equeue_mutex_unlock(&q->memlock);
return (struct equeue_event *)c;
}

equeue_mutex_unlock(&q->freelock);
equeue_mutex_unlock(&q->memlock);
return 0;
}

static void equeue_mem_dealloc(equeue_t *q, void *e) {
struct equeue_chunk *c = (struct equeue_chunk *)((unsigned *)e - 1);
static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) {
struct equeue_chunk *c = (struct equeue_chunk *)e;

equeue_mutex_lock(&q->freelock);
equeue_mutex_lock(&q->memlock);

struct equeue_chunk **p = &q->chunks;
while (*p && (*p)->size < c->size) {
Expand All @@ -121,27 +132,17 @@ static void equeue_mem_dealloc(equeue_t *q, void *e) {
c->nchunk = *p;
}
*p = c;

equeue_mutex_unlock(&q->freelock);
}

// event allocation functions
static inline int equeue_next_id(equeue_t *q) {
int id = q->next_id++;
if (q->next_id < 0) {
q->next_id = 42;
}
return id;
equeue_mutex_unlock(&q->memlock);
}

// equeue allocation functions
void *equeue_alloc(equeue_t *q, size_t size) {
struct equeue_event *e = equeue_mem_alloc(q,
sizeof(struct equeue_event) + size);
struct equeue_event *e = equeue_mem_alloc(q, size);
if (!e) {
return 0;
}

e->id = equeue_next_id(q);
e->target = 0;
e->period = -1;
e->dtor = 0;
Expand Down Expand Up @@ -172,24 +173,23 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
p = &(*p)->next;
}

e->ref = p;
e->next = *p;
if (*p) {
(*p)->ref = &e->next;
}
*p = e;
}

static struct equeue_event *equeue_dequeue(equeue_t *q, int id) {
for (struct equeue_event **p = &q->queue; *p; p = &(*p)->next) {
if ((*p)->id == id) {
struct equeue_event *e = *p;
*p = (*p)->next;
return e;
}
static void equeue_dequeue(equeue_t *q, struct equeue_event *e) {
if (e->next) {
e->next->ref = e->ref;
}

return 0;
*e->ref = e->next;
}

static int equeue_post_in(equeue_t *q, struct equeue_event *e, int ms) {
int id = e->id;
int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer);
if (ms < 0) {
equeue_dealloc(q, e+1);
return id;
Expand All @@ -211,13 +211,19 @@ int equeue_post(equeue_t *q, void (*cb)(void*), void *p) {
}

void equeue_cancel(equeue_t *q, int id) {
struct equeue_event *e = (struct equeue_event *)
&q->buffer[id & ((1 << q->npw2)-1)];

equeue_mutex_lock(&q->queuelock);
struct equeue_event *e = equeue_dequeue(q, id);
if (e->id != id >> q->npw2) {
equeue_mutex_unlock(&q->queuelock);
return;
}

equeue_dequeue(q, e);
equeue_mutex_unlock(&q->queuelock);

if (e) {
equeue_dealloc(q, e+1);
}
equeue_dealloc(q, e+1);
}

void equeue_break(equeue_t *q) {
Expand Down Expand Up @@ -248,6 +254,7 @@ void equeue_dispatch(equeue_t *q, int ms) {
}

struct equeue_event *e = q->queue;
e->id += 1;
q->queue = e->next;

if (e->period >= 0) {
Expand Down
16 changes: 11 additions & 5 deletions equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ extern "C" {

// Definition of the minimum size of an event
// This size fits the events created in the event_call set of functions.
#define EQUEUE_EVENT_SIZE (sizeof(struct equeue_event) + 3*sizeof(void*))
#define EQUEUE_EVENT_SIZE (sizeof(struct equeue_event) + 2*sizeof(void*))

// Event/queue structures
struct equeue_event {
struct equeue_event *next;
unsigned size;
int id;
struct equeue_event *next;
struct equeue_event **ref;

unsigned target;
int period;
void (*dtor)(void *);
Expand All @@ -37,11 +40,14 @@ struct equeue_event {

typedef struct equeue {
struct equeue_event *queue;
int next_id;

void *buffer;
unsigned char *buffer;
unsigned npw2;
void *allocated;

struct equeue_chunk {
unsigned size;
int id;
struct equeue_chunk *next;
struct equeue_chunk *nchunk;
} *chunks;
Expand All @@ -54,7 +60,7 @@ typedef struct equeue {

equeue_sema_t eventsema;
equeue_mutex_t queuelock;
equeue_mutex_t freelock;
equeue_mutex_t memlock;
} equeue_t;


Expand Down
6 changes: 3 additions & 3 deletions tests/prof.c
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void equeue_post_many_prof(int count) {
struct equeue q;
equeue_create(&q, count*EQUEUE_EVENT_SIZE);

for (int i = 0; i < count; i++) {
for (int i = 0; i < count-1; i++) {
equeue_call(&q, no_func, 0);
}

Expand Down Expand Up @@ -224,7 +224,7 @@ void equeue_post_future_many_prof(int count) {
struct equeue q;
equeue_create(&q, count*EQUEUE_EVENT_SIZE);

for (int i = 0; i < count; i++) {
for (int i = 0; i < count-1; i++) {
equeue_call(&q, no_func, 0);
}

Expand Down Expand Up @@ -293,7 +293,7 @@ void equeue_cancel_many_prof(int count) {
struct equeue q;
equeue_create(&q, count*EQUEUE_EVENT_SIZE);

for (int i = 0; i < count; i++) {
for (int i = 0; i < count-1; i++) {
equeue_call(&q, no_func, 0);
}

Expand Down

0 comments on commit 7937a76

Please sign in to comment.