summaryrefslogtreecommitdiff
path: root/block/io_uring.c
blob: 00a3ee9fb852c82295aace92f71517afa45bf9be (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
/*
 * Linux io_uring support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 * Copyright (C) 2019 Aarushi Mehta
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include <liburing.h>
#include "qemu-common.h"
#include "block/aio.h"
#include "qemu/queue.h"
#include "block/block.h"
#include "block/raw-aio.h"
#include "qemu/coroutine.h"
#include "qapi/error.h"
#include "trace.h"

/* io_uring ring size */
#define MAX_ENTRIES 128

typedef struct LuringAIOCB {
    Coroutine *co;
    struct io_uring_sqe sqeq;
    ssize_t ret;
    QEMUIOVector *qiov;
    bool is_read;
    QSIMPLEQ_ENTRY(LuringAIOCB) next;

    /*
     * Buffered reads may require resubmission, see
     * luring_resubmit_short_read().
     */
    int total_read;
    QEMUIOVector resubmit_qiov;
} LuringAIOCB;

typedef struct LuringQueue {
    int plugged;
    unsigned int in_queue;
    unsigned int in_flight;
    bool blocked;
    QSIMPLEQ_HEAD(, LuringAIOCB) submit_queue;
} LuringQueue;

typedef struct LuringState {
    AioContext *aio_context;

    struct io_uring ring;

    /* io queue for submit at batch.  Protected by AioContext lock. */
    LuringQueue io_q;

    /* I/O completion processing.  Only runs in I/O thread.  */
    QEMUBH *completion_bh;
} LuringState;

/**
 * luring_resubmit:
 *
 * Resubmit a request by appending it to submit_queue.  The caller must ensure
 * that ioq_submit() is called later so that submit_queue requests are started.
 */
static void luring_resubmit(LuringState *s, LuringAIOCB *luringcb)
{
    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
}

/**
 * luring_resubmit_short_read:
 *
 * Before Linux commit 9d93a3f5a0c ("io_uring: punt short reads to async
 * context") a buffered I/O request with the start of the file range in the
 * page cache could result in a short read.  Applications need to resubmit the
 * remaining read request.
 *
 * This is a slow path but recent kernels never take it.
 */
static void luring_resubmit_short_read(LuringState *s, LuringAIOCB *luringcb,
                                       int nread)
{
    QEMUIOVector *resubmit_qiov;
    size_t remaining;

    trace_luring_resubmit_short_read(s, luringcb, nread);

    /* Update read position */
    luringcb->total_read = nread;
    remaining = luringcb->qiov->size - luringcb->total_read;

    /* Shorten qiov */
    resubmit_qiov = &luringcb->resubmit_qiov;
    if (resubmit_qiov->iov == NULL) {
        qemu_iovec_init(resubmit_qiov, luringcb->qiov->niov);
    } else {
        qemu_iovec_reset(resubmit_qiov);
    }
    qemu_iovec_concat(resubmit_qiov, luringcb->qiov, luringcb->total_read,
                      remaining);

    /* Update sqe */
    luringcb->sqeq.off = nread;
    luringcb->sqeq.addr = (__u64)(uintptr_t)luringcb->resubmit_qiov.iov;
    luringcb->sqeq.len = luringcb->resubmit_qiov.niov;

    luring_resubmit(s, luringcb);
}

/**
 * luring_process_completions:
 * @s: AIO state
 *
 * Fetches completed I/O requests, consumes cqes and invokes their callbacks
 * The function is somewhat tricky because it supports nested event loops, for
 * example when a request callback invokes aio_poll().
 *
 * Function schedules BH completion so it  can be called again in a nested
 * event loop.  When there are no events left  to complete the BH is being
 * canceled.
 *
 */
static void luring_process_completions(LuringState *s)
{
    struct io_uring_cqe *cqes;
    int total_bytes;
    /*
     * Request completion callbacks can run the nested event loop.
     * Schedule ourselves so the nested event loop will "see" remaining
     * completed requests and process them.  Without this, completion
     * callbacks that wait for other requests using a nested event loop
     * would hang forever.
     *
     * This workaround is needed because io_uring uses poll_wait, which
     * is woken up when new events are added to the uring, thus polling on
     * the same uring fd will block unless more events are received.
     *
     * Other leaf block drivers (drivers that access the data themselves)
     * are networking based, so they poll sockets for data and run the
     * correct coroutine.
     */
    qemu_bh_schedule(s->completion_bh);

    while (io_uring_peek_cqe(&s->ring, &cqes) == 0) {
        LuringAIOCB *luringcb;
        int ret;

        if (!cqes) {
            break;
        }

        luringcb = io_uring_cqe_get_data(cqes);
        ret = cqes->res;
        io_uring_cqe_seen(&s->ring, cqes);
        cqes = NULL;

        /* Change counters one-by-one because we can be nested. */
        s->io_q.in_flight--;
        trace_luring_process_completion(s, luringcb, ret);

        /* total_read is non-zero only for resubmitted read requests */
        total_bytes = ret + luringcb->total_read;

        if (ret < 0) {
            if (ret == -EINTR) {
                luring_resubmit(s, luringcb);
                continue;
            }
        } else if (!luringcb->qiov) {
            goto end;
        } else if (total_bytes == luringcb->qiov->size) {
            ret = 0;
        /* Only read/write */
        } else {
            /* Short Read/Write */
            if (luringcb->is_read) {
                if (ret > 0) {
                    luring_resubmit_short_read(s, luringcb, ret);
                    continue;
                } else {
                    /* Pad with zeroes */
                    qemu_iovec_memset(luringcb->qiov, total_bytes, 0,
                                      luringcb->qiov->size - total_bytes);
                    ret = 0;
                }
            } else {
                ret = -ENOSPC;
            }
        }
end:
        luringcb->ret = ret;
        qemu_iovec_destroy(&luringcb->resubmit_qiov);

        /*
         * If the coroutine is already entered it must be in ioq_submit()
         * and will notice luringcb->ret has been filled in when it
         * eventually runs later. Coroutines cannot be entered recursively
         * so avoid doing that!
         */
        if (!qemu_coroutine_entered(luringcb->co)) {
            aio_co_wake(luringcb->co);
        }
    }
    qemu_bh_cancel(s->completion_bh);
}

static int ioq_submit(LuringState *s)
{
    int ret = 0;
    LuringAIOCB *luringcb, *luringcb_next;

    while (s->io_q.in_queue > 0) {
        /*
         * Try to fetch sqes from the ring for requests waiting in
         * the overflow queue
         */
        QSIMPLEQ_FOREACH_SAFE(luringcb, &s->io_q.submit_queue, next,
                              luringcb_next) {
            struct io_uring_sqe *sqes = io_uring_get_sqe(&s->ring);
            if (!sqes) {
                break;
            }
            /* Prep sqe for submission */
            *sqes = luringcb->sqeq;
            QSIMPLEQ_REMOVE_HEAD(&s->io_q.submit_queue, next);
        }
        ret = io_uring_submit(&s->ring);
        trace_luring_io_uring_submit(s, ret);
        /* Prevent infinite loop if submission is refused */
        if (ret <= 0) {
            if (ret == -EAGAIN || ret == -EINTR) {
                continue;
            }
            break;
        }
        s->io_q.in_flight += ret;
        s->io_q.in_queue  -= ret;
    }
    s->io_q.blocked = (s->io_q.in_queue > 0);

    if (s->io_q.in_flight) {
        /*
         * We can try to complete something just right away if there are
         * still requests in-flight.
         */
        luring_process_completions(s);
    }
    return ret;
}

static void luring_process_completions_and_submit(LuringState *s)
{
    aio_context_acquire(s->aio_context);
    luring_process_completions(s);

    if (!s->io_q.plugged && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
    aio_context_release(s->aio_context);
}

static void qemu_luring_completion_bh(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static void qemu_luring_completion_cb(void *opaque)
{
    LuringState *s = opaque;
    luring_process_completions_and_submit(s);
}

static bool qemu_luring_poll_cb(void *opaque)
{
    LuringState *s = opaque;

    if (io_uring_cq_ready(&s->ring)) {
        luring_process_completions_and_submit(s);
        return true;
    }

    return false;
}

static void ioq_init(LuringQueue *io_q)
{
    QSIMPLEQ_INIT(&io_q->submit_queue);
    io_q->plugged = 0;
    io_q->in_queue = 0;
    io_q->in_flight = 0;
    io_q->blocked = false;
}

void luring_io_plug(BlockDriverState *bs, LuringState *s)
{
    trace_luring_io_plug(s);
    s->io_q.plugged++;
}

void luring_io_unplug(BlockDriverState *bs, LuringState *s)
{
    assert(s->io_q.plugged);
    trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (--s->io_q.plugged == 0 &&
        !s->io_q.blocked && s->io_q.in_queue > 0) {
        ioq_submit(s);
    }
}

/**
 * luring_do_submit:
 * @fd: file descriptor for I/O
 * @luringcb: AIO control block
 * @s: AIO state
 * @offset: offset for request
 * @type: type of request
 *
 * Fetches sqes from ring, adds to pending queue and preps them
 *
 */
static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s,
                            uint64_t offset, int type)
{
    int ret;
    struct io_uring_sqe *sqes = &luringcb->sqeq;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_uring_prep_writev(sqes, fd, luringcb->qiov->iov,
                             luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_uring_prep_readv(sqes, fd, luringcb->qiov->iov,
                            luringcb->qiov->niov, offset);
        break;
    case QEMU_AIO_FLUSH:
        io_uring_prep_fsync(sqes, fd, IORING_FSYNC_DATASYNC);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type, aborting 0x%x.\n",
                        __func__, type);
        abort();
    }
    io_uring_sqe_set_data(sqes, luringcb);

    QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next);
    s->io_q.in_queue++;
    trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged,
                           s->io_q.in_queue, s->io_q.in_flight);
    if (!s->io_q.blocked &&
        (!s->io_q.plugged ||
         s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) {
        ret = ioq_submit(s);
        trace_luring_do_submit_done(s, ret);
        return ret;
    }
    return 0;
}

int coroutine_fn luring_co_submit(BlockDriverState *bs, LuringState *s, int fd,
                                  uint64_t offset, QEMUIOVector *qiov, int type)
{
    int ret;
    LuringAIOCB luringcb = {
        .co         = qemu_coroutine_self(),
        .ret        = -EINPROGRESS,
        .qiov       = qiov,
        .is_read    = (type == QEMU_AIO_READ),
    };
    trace_luring_co_submit(bs, s, &luringcb, fd, offset, qiov ? qiov->size : 0,
                           type);
    ret = luring_do_submit(fd, &luringcb, s, offset, type);

    if (ret < 0) {
        return ret;
    }

    if (luringcb.ret == -EINPROGRESS) {
        qemu_coroutine_yield();
    }
    return luringcb.ret;
}

void luring_detach_aio_context(LuringState *s, AioContext *old_context)
{
    aio_set_fd_handler(old_context, s->ring.ring_fd, false, NULL, NULL, NULL,
                       s);
    qemu_bh_delete(s->completion_bh);
    s->aio_context = NULL;
}

void luring_attach_aio_context(LuringState *s, AioContext *new_context)
{
    s->aio_context = new_context;
    s->completion_bh = aio_bh_new(new_context, qemu_luring_completion_bh, s);
    aio_set_fd_handler(s->aio_context, s->ring.ring_fd, false,
                       qemu_luring_completion_cb, NULL, qemu_luring_poll_cb, s);
}

LuringState *luring_init(Error **errp)
{
    int rc;
    LuringState *s = g_new0(LuringState, 1);
    struct io_uring *ring = &s->ring;

    trace_luring_init_state(s, sizeof(*s));

    rc = io_uring_queue_init(MAX_ENTRIES, ring, 0);
    if (rc < 0) {
        error_setg_errno(errp, errno, "failed to init linux io_uring ring");
        g_free(s);
        return NULL;
    }

    ioq_init(&s->io_q);
    return s;

}

void luring_cleanup(LuringState *s)
{
    io_uring_queue_exit(&s->ring);
    trace_luring_cleanup_state(s);
    g_free(s);
}