From c4d48397f2b6f998d70cea81de10863dd6ab1e79 Mon Sep 17 00:00:00 2001 From: Christopher Haster Date: Sat, 30 Jul 2016 05:35:58 -0500 Subject: [PATCH] Moved to segmented dispatch Before, the dispatch function iterated over the events, calling the callbacks as it removed the events from the queue. Now the dispatch function dequeues all pending events before dispatching callbacks. This removes the need for a break event, allows better cache coherency, and results in a better organization of the dispatch function. The only downsides are a longer startup in dispatch and increased jitter, although the latter could be fixed by adding fabricated unlock points during the linked-list traversal. Notable performance impact (make prof): equeue_dispatch_prof: 466 cycles (+25%) equeue_alloc_many_prof: 79 cycles (-12%) equeue_post_many_prof: 7681 cycles (+33%) equeue_post_future_many_prof: 7612 cycles (+35%) equeue_dispatch_many_prof: 8353 cycles (+65%) It's interesting to note there is a decrease in performance for the alloc_many test, this may just be because moving the equeue members around caused the slab info to cross a cache boundary. The alloc_many tests is already very fast (~3* the time for mutex lock). --- equeue.c | 136 +++++++++++++++++++++++++++++++++++++------------------ equeue.h | 3 +- 2 files changed, 92 insertions(+), 47 deletions(-) diff --git a/equeue.c b/equeue.c index 9b72c24..ac7fd92 100644 --- a/equeue.c +++ b/equeue.c @@ -35,10 +35,7 @@ int equeue_create_inplace(equeue_t *q, size_t size, void *buffer) { q->slab.data = buffer; q->queue = 0; - q->break_ = (struct equeue_event){ - .id = 0, - .period = -1, - }; + q->breaks = 0; int err; err = equeue_sema_create(&q->eventsema); @@ -172,7 +169,7 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) { while (*p && equeue_tickdiff((*p)->target, e->target) <= 0) { p = &(*p)->next; } - + e->ref = p; e->next = *p; if (*p) { @@ -181,102 +178,151 @@ static void equeue_enqueue(equeue_t *q, struct equeue_event *e, unsigned ms) { *p = e; } -static void equeue_dequeue(equeue_t *q, struct equeue_event *e) { +static void equeue_unqueue(equeue_t *q, struct equeue_event *e) { if (e->next) { e->next->ref = e->ref; } *e->ref = e->next; } -static int equeue_post_in(equeue_t *q, struct equeue_event *e, int ms) { +static struct equeue_event *equeue_dequeue( + equeue_t *q, unsigned target, int *deadline) { + struct equeue_event *head = q->queue; + if (!head || equeue_tickdiff(head->target, target) > 0) { + return 0; + } + + struct equeue_event **p = &q->queue; + while (*p) { + int diff = equeue_tickdiff((*p)->target, target); + if (diff > 0) { + *deadline = diff; + break; + } + + p = &(*p)->next; + } + + if (*p) { + (*p)->ref = &q->queue; + } + q->queue = *p; + *p = 0; + + return head; +} + +int equeue_post(equeue_t *q, void (*cb)(void*), void *p) { + struct equeue_event *e = (struct equeue_event*)p - 1; int id = (e->id << q->npw2) | ((unsigned char *)e - q->buffer); - if (ms < 0) { + e->cb = cb; + + if (e->target < 0) { equeue_dealloc(q, e+1); return id; } equeue_mutex_lock(&q->queuelock); - equeue_enqueue(q, e, ms); + equeue_enqueue(q, e, e->target); equeue_mutex_unlock(&q->queuelock); equeue_sema_release(&q->eventsema); return id; } -int equeue_post(equeue_t *q, void (*cb)(void*), void *p) { - struct equeue_event *e = (struct equeue_event*)p - 1; - e->cb = cb; - int id = equeue_post_in(q, e, e->target); - return id; -} - void equeue_cancel(equeue_t *q, int id) { struct equeue_event *e = (struct equeue_event *) &q->buffer[id & ((1 << q->npw2)-1)]; equeue_mutex_lock(&q->queuelock); + if (e->id == -id >> q->npw2) { + e->cb = 0; + e->period = -1; + } + if (e->id != id >> q->npw2) { equeue_mutex_unlock(&q->queuelock); return; } - equeue_dequeue(q, e); + equeue_unqueue(q, e); equeue_mutex_unlock(&q->queuelock); equeue_dealloc(q, e+1); } void equeue_break(equeue_t *q) { - equeue_post_in(q, &q->break_, 0); + equeue_mutex_lock(&q->queuelock); + q->breaks++; + equeue_mutex_unlock(&q->queuelock); + equeue_sema_release(&q->eventsema); } void equeue_dispatch(equeue_t *q, int ms) { - if (ms >= 0) { - equeue_post_in(q, &q->break_, ms); - } + unsigned timeout = equeue_tick() + ms; while (1) { + // collect all the available events and next deadline + struct equeue_event *es = 0; int deadline = -1; - - while (q->queue) { - deadline = -1; - + if (q->queue) { equeue_mutex_lock(&q->queuelock); - if (!q->queue) { - equeue_mutex_unlock(&q->queuelock); - break; - } + es = equeue_dequeue(q, equeue_tick(), &deadline); - deadline = equeue_tickdiff(q->queue->target, equeue_tick()); - if (deadline > 0) { - equeue_mutex_unlock(&q->queuelock); - break; + // mark events as in-flight + for (struct equeue_event *e = es; e; e = e->next) { + e->id = -e->id; } + equeue_mutex_unlock(&q->queuelock); + } - struct equeue_event *e = q->queue; - e->id += 1; - q->queue = e->next; + // dispatch events + while (es) { + struct equeue_event *e = es; + es = e->next; + // actually dispatch the callbacks + void (*cb)(void *) = e->cb; + if (cb) { + cb(e + 1); + } + + // undirty the id and either dealloc or reenqueue periodic events + e->id = -e->id; if (e->period >= 0) { - // requeue periodic tasks to avoid race conditions - // in equeue_cancel + equeue_mutex_lock(&q->queuelock); equeue_enqueue(q, e, e->period); + equeue_mutex_unlock(&q->queuelock); + } else { + equeue_dealloc(q, e+1); } - equeue_mutex_unlock(&q->queuelock); + } - if (e == &q->break_) { + // check if we should stop dispatching soon + if (ms >= 0) { + int diff = equeue_tickdiff(timeout, equeue_tick()); + if (diff <= 0) { return; } - // actually dispatch the callback - e->cb(e + 1); - - if (e->period < 0) { - equeue_dealloc(q, e+1); + if (deadline < 0 || diff < deadline) { + deadline = diff; } } + // wait for events equeue_sema_wait(&q->eventsema, deadline); + + // check if we were notified to break out of dispatch + if (q->breaks) { + equeue_mutex_lock(&q->queuelock); + if (q->breaks > 0) { + q->breaks--; + equeue_mutex_unlock(&q->queuelock); + return; + } + equeue_mutex_unlock(&q->queuelock); + } } } diff --git a/equeue.h b/equeue.h index f0217e4..8036bd4 100644 --- a/equeue.h +++ b/equeue.h @@ -40,6 +40,7 @@ struct equeue_event { typedef struct equeue { struct equeue_event *queue; + int breaks; unsigned char *buffer; unsigned npw2; @@ -56,8 +57,6 @@ typedef struct equeue { unsigned char *data; } slab; - struct equeue_event break_; - equeue_sema_t eventsema; equeue_mutex_t queuelock; equeue_mutex_t memlock;