Skip to content

Commit

Permalink
ioq: Probe for supported io_uring operations
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Feb 28, 2024
1 parent b83343f commit 8bc72d6
Showing 1 changed file with 67 additions and 18 deletions.
85 changes: 67 additions & 18 deletions src/ioq.c
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,17 @@ static void ioq_batch_push(struct ioqq *ioqq, struct ioq_batch *batch, struct io
/** Sentinel stop command. */
static struct ioq_ent IOQ_STOP;

#if BFS_USE_LIBURING
/**
* Supported io_uring operations.
*/
enum ioq_ring_ops {
IOQ_RING_OPENAT = 1 << 0,
IOQ_RING_CLOSE = 1 << 1,
IOQ_RING_STATX = 1 << 2,
};
#endif

/** I/O queue thread-specific data. */
struct ioq_thread {
/** The thread handle. */
Expand All @@ -470,6 +481,8 @@ struct ioq_thread {
struct io_uring ring;
/** Any error that occurred initializing the ring. */
int ring_err;
/** Bitmask of supported io_uring operations. */
enum ioq_ring_ops ring_ops;
#endif
};

Expand Down Expand Up @@ -553,6 +566,8 @@ struct ioq_ring_state {
struct ioq *ioq;
/** The io_uring. */
struct io_uring *ring;
/** Supported io_uring operations. */
enum ioq_ring_ops ops;
/** Number of prepped, unsubmitted SQEs. */
size_t prepped;
/** Number of submitted, unreaped SQEs. */
Expand All @@ -564,40 +579,48 @@ struct ioq_ring_state {
};

/** Dispatch a single request asynchronously. */
static struct io_uring_sqe *ioq_dispatch_async(struct io_uring *ring, struct ioq_ent *ent) {
static struct io_uring_sqe *ioq_dispatch_async(struct ioq_ring_state *state, struct ioq_ent *ent) {
struct io_uring *ring = state->ring;
enum ioq_ring_ops ops = state->ops;
struct io_uring_sqe *sqe = NULL;

switch (ent->op) {
case IOQ_CLOSE:
case IOQ_CLOSE:
if (ops & IOQ_RING_CLOSE) {
sqe = io_uring_get_sqe(ring);
io_uring_prep_close(sqe, ent->close.fd);
return sqe;
}
return sqe;

case IOQ_OPENDIR: {
case IOQ_OPENDIR:
if (ops & IOQ_RING_OPENAT) {
sqe = io_uring_get_sqe(ring);
struct ioq_opendir *args = &ent->opendir;
int flags = O_RDONLY | O_CLOEXEC | O_DIRECTORY;
io_uring_prep_openat(sqe, args->dfd, args->path, flags, 0);
return sqe;
}
return sqe;

case IOQ_CLOSEDIR:
case IOQ_CLOSEDIR:
#if BFS_USE_UNWRAPDIR
if (ops & IOQ_RING_CLOSE) {
sqe = io_uring_get_sqe(ring);
io_uring_prep_close(sqe, bfs_unwrapdir(ent->closedir.dir));
}
#endif
return sqe;
return sqe;

case IOQ_STAT: {
case IOQ_STAT:
#if BFS_USE_STATX
if (ops & IOQ_RING_STATX) {
sqe = io_uring_get_sqe(ring);
struct ioq_stat *args = &ent->stat;
int flags = bfs_statx_flags(args->flags);
unsigned int mask = STATX_BASIC_STATS | STATX_BTIME;
io_uring_prep_statx(sqe, args->dfd, args->path, flags, mask, args->xbuf);
#endif
return sqe;
}
#endif
return sqe;
}

bfs_bug("Unknown ioq_op %d", (int)ent->op);
Expand All @@ -617,7 +640,7 @@ static void ioq_prep_sqe(struct ioq_ring_state *state, struct ioq_ent *ent) {
return;
}

struct io_uring_sqe *sqe = ioq_dispatch_async(state->ring, ent);
struct io_uring_sqe *sqe = ioq_dispatch_async(state, ent);
if (sqe) {
io_uring_sqe_set_data(sqe, ent);
++state->prepped;
Expand Down Expand Up @@ -743,6 +766,7 @@ static void ioq_ring_work(struct ioq_thread *thread) {
struct ioq_ring_state state = {
.ioq = thread->parent,
.ring = &thread->ring,
.ops = thread->ring_ops,
};

while (ioq_ring_prep(&state)) {
Expand Down Expand Up @@ -824,14 +848,39 @@ static int ioq_ring_init(struct ioq *ioq, struct ioq_thread *thread) {
return -1;
}

if (!prev) {
// Limit the number of io_uring workers
unsigned int values[] = {
ioq->nthreads, // [IO_WQ_BOUND]
0, // [IO_WQ_UNBOUND]
};
io_uring_register_iowq_max_workers(&thread->ring, values);
if (prev) {
// Initial setup already complete
return 0;
}

// Check for supported operations
struct io_uring_probe *probe = io_uring_get_probe_ring(&thread->ring);
if (probe) {
if (io_uring_opcode_supported(probe, IORING_OP_OPENAT)) {
thread->ring_ops |= IOQ_RING_OPENAT;
}
if (io_uring_opcode_supported(probe, IORING_OP_CLOSE)) {
thread->ring_ops |= IOQ_RING_CLOSE;
}
#if BFS_USE_STATX
if (io_uring_opcode_supported(probe, IORING_OP_STATX)) {
thread->ring_ops |= IOQ_RING_STATX;
}
#endif
io_uring_free_probe(probe);
}
if (!thread->ring_ops) {
io_uring_queue_exit(&thread->ring);
thread->ring_err = ENOTSUP;
return -1;
}

// Limit the number of io_uring workers
unsigned int values[] = {
ioq->nthreads, // [IO_WQ_BOUND]
0, // [IO_WQ_UNBOUND]
};
io_uring_register_iowq_max_workers(&thread->ring, values);
#endif

return 0;
Expand Down

0 comments on commit 8bc72d6

Please sign in to comment.