Skip to content

Commit

Permalink
Added slotting for sibling events in queue
Browse files Browse the repository at this point in the history
The queue data-structure has been molded into a 2d double linked-list
with slots for each set of events that expire at a given time. Other
than having little downsides, the biggest benifit is a constant-time
enqueueing of events with no delay while still maintaining ordering.

To reduce the memory overhead for each event, the queue is not
doubly-linked in both directions. Instead, noting that no more
than 1 reference is needed for each event, the queue is stored as
a 2d single linked-list with generalized pointers to whatever
references events.

To maintain constant-enqueueing without a tail pointer for each slot,
events are inserted as a stack and reversed on dequeueing. This does
not impact amortized complexity as each event already has to be marked
dirty.

Notable performance impact (make prof):
equeue_post_many_prof: 202 cycles (+98%)
equeue_post_future_many_prof: 207 cycles (+98%)
equeue_alloc_size_prof: 56 bytes (-16%)
equeue_alloc_many_size_prof: 64000 bytes (-14%)
equeue_alloc_fragmented_size_prof: 64000 bytes (-14%)
  • Loading branch information
geky committed Jul 30, 2016
1 parent c4d4839 commit e638d20
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 54 deletions.
124 changes: 76 additions & 48 deletions equeue.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,59 +76,57 @@ static struct equeue_event *equeue_mem_alloc(equeue_t *q, size_t size) {

equeue_mutex_lock(&q->memlock);

for (struct equeue_chunk **p = &q->chunks; *p; p = &(*p)->nchunk) {
for (struct equeue_event **p = &q->chunks; *p; p = &(*p)->next) {
if ((*p)->size >= size) {
struct equeue_chunk *c = *p;
if (c->next) {
*p = c->next;
(*p)->nchunk = c->nchunk;
struct equeue_event *e = *p;
if (e->sibling) {
*p = e->sibling;
(*p)->next = e->next;
} else {
*p = c->nchunk;
*p = e->next;
}

c->id += 1;
if (c->id >> (8*sizeof(int)-1 - q->npw2)) {
c->id = 1;
e->id += 1;
if (e->id >> (8*sizeof(int)-1 - q->npw2)) {
e->id = 1;
}

equeue_mutex_unlock(&q->memlock);
return (struct equeue_event *)c;
return e;
}
}

if (q->slab.size >= size) {
struct equeue_chunk *c = (struct equeue_chunk *)q->slab.data;
struct equeue_event *e = (struct equeue_event *)q->slab.data;
q->slab.data += size;
q->slab.size -= size;
c->size = size;
c->id = 1;
e->size = size;
e->id = 1;

equeue_mutex_unlock(&q->memlock);
return (struct equeue_event *)c;
return e;
}

equeue_mutex_unlock(&q->memlock);
return 0;
}

static void equeue_mem_dealloc(equeue_t *q, struct equeue_event *e) {
struct equeue_chunk *c = (struct equeue_chunk *)e;

equeue_mutex_lock(&q->memlock);

struct equeue_chunk **p = &q->chunks;
while (*p && (*p)->size < c->size) {
p = &(*p)->nchunk;
struct equeue_event **p = &q->chunks;
while (*p && (*p)->size < e->size) {
p = &(*p)->next;
}

if (*p && (*p)->size == c->size) {
c->next = *p;
c->nchunk = (*p)->nchunk;
if (*p && (*p)->size == e->size) {
e->sibling = *p;
e->next = (*p)->next;
} else {
c->next = 0;
c->nchunk = *p;
e->sibling = 0;
e->next = *p;
}
*p = c;
*p = e;

equeue_mutex_unlock(&q->memlock);
}
Expand Down Expand Up @@ -166,48 +164,78 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) {
e->target = equeue_tick() + ms;

struct equeue_event **p = &q->queue;
while (*p && equeue_tickdiff((*p)->target, e->target) <= 0) {
while (*p && equeue_tickdiff((*p)->target, e->target) < 0) {
p = &(*p)->next;
}

e->ref = p;
e->next = *p;
if (*p) {
(*p)->ref = &e->next;
if (*p && (*p)->target == e->target) {
if (*p) {
(*p)->ref = &e->sibling;
}
e->sibling = *p;

if ((*p)->next) {
(*p)->next->ref = &e->next;
}
e->next = (*p)->next;
} else {
if (*p) {
(*p)->ref = &e->next;
}
e->next = *p;

e->sibling = 0;
}

e->ref = p;
*p = e;
}

static void equeue_unqueue(equeue_t *q, struct equeue_event *e) {
if (e->next) {
e->next->ref = e->ref;
if (e->sibling) {
if (e->next) {
e->next->ref = &e->sibling->next;
}
e->sibling->next = e->next;

e->sibling->ref = e->ref;
*e->ref = e->sibling;
} else {
if (e->next) {
e->next->ref = e->ref;
}
*e->ref = e->next;
}
*e->ref = e->next;
}

static struct equeue_event *equeue_dequeue(
equeue_t *q, unsigned target, int *deadline) {
struct equeue_event *head = q->queue;
if (!head || equeue_tickdiff(head->target, target) > 0) {
return 0;
}
static struct equeue_event *equeue_dequeue(equeue_t *q, int *deadline) {
unsigned target = equeue_tick();
struct equeue_event *head = 0;
struct equeue_event **tail = &head;

struct equeue_event **p = &q->queue;
while (*p) {
int diff = equeue_tickdiff((*p)->target, target);
while (q->queue) {
int diff = equeue_tickdiff(q->queue->target, target);
if (diff > 0) {
*deadline = diff;
break;
}

p = &(*p)->next;
struct equeue_event *es = q->queue;
q->queue = es->next;

struct equeue_event *prev = 0;
for (struct equeue_event *e = es; e; e = e->sibling) {
e->next = prev;
prev = e;
}

*tail = prev;
tail = &es->next;
}

if (*p) {
(*p)->ref = &q->queue;
if (q->queue) {
q->queue->ref = &q->queue;
}
q->queue = *p;
*p = 0;

return head;
}
Expand Down Expand Up @@ -267,7 +295,7 @@ void equeue_dispatch(equeue_t *q, int ms) {
int deadline = -1;
if (q->queue) {
equeue_mutex_lock(&q->queuelock);
es = equeue_dequeue(q, equeue_tick(), &deadline);
es = equeue_dequeue(q, &deadline);

// mark events as in-flight
for (struct equeue_event *e = es; e; e = e->next) {
Expand Down
8 changes: 2 additions & 6 deletions equeue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct equeue_event {
unsigned size;
int id;
struct equeue_event *next;
struct equeue_event *sibling;
struct equeue_event **ref;

unsigned target;
Expand All @@ -46,12 +47,7 @@ typedef struct equeue {
unsigned npw2;
void *allocated;

struct equeue_chunk {
unsigned size;
int id;
struct equeue_chunk *next;
struct equeue_chunk *nchunk;
} *chunks;
struct equeue_event *chunks;
struct equeue_slab {
size_t size;
unsigned char *data;
Expand Down

0 comments on commit e638d20

Please sign in to comment.