Skip to content

Commit

Permalink
ioq: Submit and pop requests in batches
Browse files Browse the repository at this point in the history
The new ioq_submit() function is now necessary to call to ensure the
pending request batch is flushed.
  • Loading branch information
tavianator committed Dec 2, 2024
1 parent 638850e commit 92895bc
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 17 deletions.
1 change: 1 addition & 0 deletions bench/ioq.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ int main(int argc, char *argv[]) {
break;
}
}
ioq_submit(ioq);
}

if (load(&quit, relaxed)) {
Expand Down
5 changes: 5 additions & 0 deletions src/bftw.c
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,7 @@ static int bftw_ioq_pop(struct bftw_state *state, bool block) {
return -1;
}

ioq_submit(ioq);
struct ioq_ent *ent = ioq_pop(ioq, block);
if (!ent) {
return -1;
Expand Down Expand Up @@ -1957,6 +1958,10 @@ static void bftw_flush(struct bftw_state *state) {

bftw_queue_flush(&state->dirq);
bftw_ioq_opendirs(state);

if (state->ioq) {
ioq_submit(state->ioq);
}
}

/** Close the current directory. */
Expand Down
49 changes: 32 additions & 17 deletions src/ioq.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,13 +427,6 @@ static void ioqq_push_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t s
} while (size > 0);
}

/** Pop an entry from the queue. */
static struct ioq_ent *ioqq_pop(struct ioqq *ioqq, bool block) {
size_t i = fetch_add(&ioqq->tail, 1, relaxed);
ioq_slot *slot = &ioqq->slots[i & ioqq->slot_mask];
return ioq_slot_pop(ioqq, slot, block);
}

/** Pop a batch of entries from the queue. */
static void ioqq_pop_batch(struct ioqq *ioqq, struct ioq_ent *batch[], size_t size, bool block) {
size_t mask = ioqq->slot_mask;
Expand Down Expand Up @@ -467,7 +460,7 @@ static void ioq_batch_reset(struct ioq_batch *batch) {

/** Check if a batch is empty. */
static bool ioq_batch_empty(const struct ioq_batch *batch) {
return batch->head == batch->tail;
return batch->head >= batch->tail;
}

/** Send a batch to a queue. */
Expand Down Expand Up @@ -505,6 +498,15 @@ static bool ioq_batch_fill(struct ioqq *ioqq, struct ioq_batch *batch, bool bloc
/** Pop an entry from a batch, filling it first if necessary. */
static struct ioq_ent *ioq_batch_pop(struct ioqq *ioqq, struct ioq_batch *batch, bool block) {
if (ioq_batch_empty(batch)) {
// For non-blocking pops, make sure that each ioq_batch_pop()
// corresponds to a single (amortized) increment of ioqq->head.
// Otherwise, we start skipping many slots and batching ends up
// degrading performance.
if (!block && batch->head < IOQ_BATCH) {
++batch->head;
return NULL;
}

if (!ioq_batch_fill(ioqq, batch, block)) {
return NULL;
}
Expand Down Expand Up @@ -559,11 +561,16 @@ struct ioq {
struct arena xbufs;
#endif

/** Pending I/O requests. */
/** Pending I/O request queue. */
struct ioqq *pending;
/** Ready I/O responses. */
/** Ready I/O response queue. */
struct ioqq *ready;

/** Pending request batch. */
struct ioq_batch pending_batch;
/** Ready request batch. */
struct ioq_batch ready_batch;

/** The number of background threads. */
size_t nthreads;
/** The background threads themselves. */
Expand Down Expand Up @@ -1122,7 +1129,7 @@ int ioq_nop(struct ioq *ioq, enum ioq_nop_type type, void *ptr) {

ent->nop.type = type;

ioqq_push(ioq->pending, ent);
ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}

Expand All @@ -1134,7 +1141,7 @@ int ioq_close(struct ioq *ioq, int fd, void *ptr) {

ent->close.fd = fd;

ioqq_push(ioq->pending, ent);
ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}

Expand All @@ -1150,7 +1157,7 @@ int ioq_opendir(struct ioq *ioq, struct bfs_dir *dir, int dfd, const char *path,
args->path = path;
args->flags = flags;

ioqq_push(ioq->pending, ent);
ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}

Expand All @@ -1162,7 +1169,7 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr) {

ent->closedir.dir = dir;

ioqq_push(ioq->pending, ent);
ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}

Expand All @@ -1186,16 +1193,23 @@ int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags fla
}
#endif

ioqq_push(ioq->pending, ent);
ioq_batch_push(ioq->pending, &ioq->pending_batch, ent);
return 0;
}

void ioq_submit(struct ioq *ioq) {
ioq_batch_flush(ioq->pending, &ioq->pending_batch);
}

struct ioq_ent *ioq_pop(struct ioq *ioq, bool block) {
// Don't forget to submit before popping
bfs_assert(ioq_batch_empty(&ioq->pending_batch));

if (ioq->size == 0) {
return NULL;
}

return ioqq_pop(ioq->ready, block);
return ioq_batch_pop(ioq->ready, &ioq->ready_batch, block);
}

void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {
Expand All @@ -1213,7 +1227,8 @@ void ioq_free(struct ioq *ioq, struct ioq_ent *ent) {

void ioq_cancel(struct ioq *ioq) {
if (!exchange(&ioq->cancel, true, relaxed)) {
ioqq_push(ioq->pending, &IOQ_STOP);
ioq_batch_push(ioq->pending, &ioq->pending_batch, &IOQ_STOP);
ioq_submit(ioq);
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/ioq.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ int ioq_closedir(struct ioq *ioq, struct bfs_dir *dir, void *ptr);
*/
int ioq_stat(struct ioq *ioq, int dfd, const char *path, enum bfs_stat_flags flags, struct bfs_stat *buf, void *ptr);

/**
* Submit any buffered requests.
*/
void ioq_submit(struct ioq *ioq);

/**
* Pop a response from the queue.
*
Expand Down
1 change: 1 addition & 0 deletions tests/ioq.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ static void check_ioq_push_block(void) {
int ret = ioq_opendir(ioq, dir, AT_FDCWD, ".", 0, NULL);
bfs_everify(ret == 0, "ioq_opendir()");
}
ioq_submit(ioq);
bfs_verify(ioq_capacity(ioq) == 0);

// Now cancel the queue, pushing an additional IOQ_STOP message
Expand Down

0 comments on commit 92895bc

Please sign in to comment.