diff --git a/dist/s_string.ok b/dist/s_string.ok index e8040db4b..4aac7eb93 100644 --- a/dist/s_string.ok +++ b/dist/s_string.ok @@ -1385,6 +1385,7 @@ update's upg uri uri's +uring uris usec usecs diff --git a/src/btree/bt_sync.c b/src/btree/bt_sync.c index 461acf4ec..9b857455f 100644 --- a/src/btree/bt_sync.c +++ b/src/btree/bt_sync.c @@ -688,6 +688,10 @@ err: if (txn->isolation == WT_ISO_READ_COMMITTED && saved_pinned_id == WT_TXN_NONE) __wt_txn_release_snapshot(session); + /* Sync the file before leave to write io_uring */ + if (ret == 0) + WT_RET(btree->bm->sync(btree->bm, session, true)); + /* Clear the checkpoint flag. */ btree->syncing = WT_BTREE_SYNC_OFF; btree->sync_session = NULL; diff --git a/src/include/os.h b/src/include/os.h index 965baab05..71ad98f6f 100644 --- a/src/include/os.h +++ b/src/include/os.h @@ -140,6 +140,13 @@ struct __wt_file_handle_posix { bool direct_io; /* O_DIRECT configured */ + /* io_uring support */ + struct io_uring ring; + bool ring_initialized; + unsigned io_uring_requests; + uint64_t nsubmit; + uint64_t ncomplete; + /* The memory buffer and variables if we use mmap for I/O */ uint8_t *mmap_buf; bool mmap_file_mappable; diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h index b0e0b4c45..3422afdf0 100644 --- a/src/include/wt_internal.h +++ b/src/include/wt_internal.h @@ -61,6 +61,9 @@ extern "C" { #include #endif +/* io_uring support */ +#include + /* * DO NOT EDIT: automatically built by dist/s_typedef. * Forward type declarations for internal types: BEGIN diff --git a/src/os_posix/os_fs.c b/src/os_posix/os_fs.c index ede22518c..b13a64c91 100644 --- a/src/os_posix/os_fs.c +++ b/src/os_posix/os_fs.c @@ -28,6 +28,12 @@ #include "wt_internal.h" +#define WT_IO_URING_ENTRIES 64 + +static int __posix_io_uring_done(WT_FILE_HANDLE *, WT_SESSION *, bool, bool *); +static int __posix_file_write_io_uring_complete( + WT_SESSION_IMPL *session, WT_FILE_HANDLE_POSIX *pfh, bool wait); + /* * __posix_sync -- * Underlying support function to flush a file descriptor. Fsync calls (or fsync-style calls, @@ -342,12 +348,19 @@ __posix_file_close(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session) WT_DECL_RET; WT_FILE_HANDLE_POSIX *pfh; WT_SESSION_IMPL *session; + bool io_uring_done; session = (WT_SESSION_IMPL *)wt_session; pfh = (WT_FILE_HANDLE_POSIX *)file_handle; __wt_verbose(session, WT_VERB_FILEOPS, "%s, file-close: fd=%d", file_handle->name, pfh->fd); + if (pfh->nsubmit - pfh->ncomplete > 0) { + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG( + session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", file_handle->name); + } + if (pfh->mmap_file_mappable && pfh->mmap_buf != NULL) __wt_unmap_file(file_handle, wt_session); @@ -357,6 +370,14 @@ __posix_file_close(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session) if (ret != 0) __wt_err(session, ret, "%s: handle-close: close", file_handle->name); } + if (pfh->ring_initialized){ + WT_RET(__posix_io_uring_done(file_handle, wt_session, false, &io_uring_done)); + if (!io_uring_done) + __wt_verbose(session, WT_VERB_FILEOPS, "%s, file-close: fd=%d WARNING - IO still pending", + file_handle->name, pfh->fd); + io_uring_queue_exit(&pfh->ring); + pfh->ring_initialized = false; + } __wt_free(session, file_handle->name); __wt_free(session, pfh); @@ -422,6 +443,12 @@ __posix_file_read( (!((uintptr_t)buf & (uintptr_t)(S2C(session)->buffer_alignment - 1)) && len >= S2C(session)->buffer_alignment && len % S2C(session)->buffer_alignment == 0)); + if (pfh->nsubmit - pfh->ncomplete > 0) { + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG( + session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", file_handle->name); + } + /* Break reads larger than 1GB into 1GB chunks. */ for (addr = buf; len > 0; addr += nr, len -= (size_t)nr, offset += nr) { chunk = WT_MIN(len, WT_GIGABYTE); @@ -517,7 +544,13 @@ __posix_file_sync(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session) session = (WT_SESSION_IMPL *)wt_session; pfh = (WT_FILE_HANDLE_POSIX *)file_handle; - return (__posix_sync(session, pfh->fd, file_handle->name, "handle-sync")); + if (pfh->nsubmit - pfh->ncomplete > 0) { + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG( + session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", file_handle->name); + return (0); + } else + return (__posix_sync(session, pfh->fd, file_handle->name, "handle-sync")); } #ifdef HAVE_SYNC_FILE_RANGE @@ -535,6 +568,11 @@ __posix_file_sync_nowait(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session) session = (WT_SESSION_IMPL *)wt_session; pfh = (WT_FILE_HANDLE_POSIX *)file_handle; + if (pfh->nsubmit - pfh->ncomplete > 0) { + io_uring_submit_and_wait(&pfh->ring, pfh->nsubmit - pfh->ncomplete); + pfh->ncomplete = pfh->nsubmit; + } + /* See comment in __posix_sync(): sync cannot be retried or fail. */ WT_SYSCALL(sync_file_range(pfh->fd, (off64_t)0, (off64_t)0, SYNC_FILE_RANGE_WRITE), ret); if (ret == 0) @@ -581,6 +619,56 @@ __posix_file_truncate(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session, wt_of } #endif +/* + * __posix_file_write_io_uring_complete -- + * POSIX io_uring submit and process the completion queue. + */ +static int +__posix_file_write_io_uring_complete(WT_SESSION_IMPL *session, WT_FILE_HANDLE_POSIX *pfh, bool wait) +{ + struct io_uring_cqe *complete; + WT_DECL_RET; + void *data; + + __wt_verbose(session, WT_VERB_WRITE, "%s, io_uring-submit: fd=%d", pfh->iface.name, pfh->fd); + + if (pfh->nsubmit - pfh->ncomplete > 0) { + if (io_uring_submit(&pfh->ring) < 0) + WT_RET_MSG(session, __wt_errno(), "%s: handle-write: io_uring: failed to submit", + pfh->iface.name); + } + + while (pfh->ncomplete < pfh->nsubmit) { + if (wait) + ret = io_uring_wait_cqe(&pfh->ring, &complete); + else { + ret = io_uring_peek_cqe(&pfh->ring, &complete); + if (ret == -EAGAIN) + return (0); + } + + if (ret < 0) + WT_RET_MSG(session, __wt_errno(), + "%s: handle-write: io_uring: failed to wait complete queue", pfh->iface.name); + + /* Retrieve user data from complete */ + data = io_uring_cqe_get_data(complete); + + /* The system call invoked failed */ + if (complete->res < 0) + WT_RET_MSG(session, complete->res, "%s: handle-write: io_uring: system call failed", + pfh->iface.name); + + __wt_free(session, data); + + /* Mark this completion as seen */ + io_uring_cqe_seen(&pfh->ring, complete); + pfh->ncomplete++; + } + + return (0); +} + /* * __posix_file_write -- * POSIX pwrite. @@ -589,14 +677,19 @@ static int __posix_file_write( WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session, wt_off_t offset, size_t len, const void *buf) { + struct io_uring_sqe *sqe; + struct iovec iov; + WT_BTREE *btree; WT_FILE_HANDLE_POSIX *pfh; WT_SESSION_IMPL *session; size_t chunk; ssize_t nw; const uint8_t *addr; + void *image; session = (WT_SESSION_IMPL *)wt_session; pfh = (WT_FILE_HANDLE_POSIX *)file_handle; + btree = S2BT_SAFE(session); __wt_verbose(session, WT_VERB_WRITE, "write: %s, fd=%d, offset=%" PRId64 ", len=%" WT_SIZET_FMT, file_handle->name, pfh->fd, offset, len); @@ -610,11 +703,39 @@ __posix_file_write( /* Break writes larger than 1GB into 1GB chunks. */ for (addr = buf; len > 0; addr += nw, len -= (size_t)nw, offset += nw) { chunk = WT_MIN(len, WT_GIGABYTE); - if ((nw = pwrite(pfh->fd, addr, chunk, offset)) < 0) - WT_RET_MSG(session, __wt_errno(), - "%s: handle-write: pwrite: failed to write %" WT_SIZET_FMT - " bytes at offset %" PRIuMAX, - file_handle->name, chunk, (uintmax_t)offset); + if (S2C(session)->txn_global.checkpoint_running && btree != NULL && + WT_SESSION_BTREE_SYNC(session)) { + sqe = io_uring_get_sqe(&pfh->ring); + while (sqe == NULL) { + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG(session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", + file_handle->name); + sqe = io_uring_get_sqe(&pfh->ring); + } + WT_RET(__wt_memdup(session, addr, chunk, &image)); + iov.iov_base = image; + iov.iov_len = chunk; + nw = (ssize_t)chunk; + + io_uring_prep_writev(sqe, pfh->fd, &iov, 1, offset); + io_uring_sqe_set_data(sqe, image); + pfh->nsubmit++; + + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG(session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", + file_handle->name); + } else { + if (pfh->nsubmit - pfh->ncomplete > 0) { + if (__posix_file_write_io_uring_complete(session, pfh, true) != 0) + WT_RET_MSG(session, WT_ERROR, "%s: handle-write: io_uring: failed to submit", + file_handle->name); + } + if ((nw = pwrite(pfh->fd, addr, chunk, offset)) < 0) + WT_RET_MSG(session, __wt_errno(), + "%s: handle-write: pwrite: failed to write %" WT_SIZET_FMT + " bytes at offset %" PRIuMAX, + file_handle->name, chunk, (uintmax_t)offset); + } } WT_STAT_CONN_INCRV(session, block_byte_write_syscall, len); return (0); @@ -738,6 +859,8 @@ __posix_open_file(WT_FILE_SYSTEM *file_system, WT_SESSION *wt_session, const cha /* Set up error handling. */ pfh->fd = -1; + pfh->ring_initialized = false; + if (file_type == WT_FS_OPEN_FILE_TYPE_DIRECTORY) { f = O_RDONLY; #ifdef O_CLOEXEC @@ -901,6 +1024,12 @@ directory_open: else file_handle->fh_write = __posix_file_write; + WT_ERR(io_uring_queue_init(WT_IO_URING_ENTRIES, &pfh->ring, 0)); + pfh->nsubmit = 0; + pfh->ncomplete = 0; + pfh->io_uring_requests = 0; + pfh->ring_initialized = true; + *file_handlep = file_handle; return (0); @@ -925,6 +1054,68 @@ __posix_terminate(WT_FILE_SYSTEM *file_system, WT_SESSION *wt_session) return (0); } +/* + * __posix_io_uring_done -- + * Sets result as true if all io_uring requests have finished and false otherwise. If wait is + * set, the call will block until all io_uring requests are done. Returns an error if something + * goes wrong while checking the status. Note that result will be set false in case there is an + * error to be returned. + */ +static int +__posix_io_uring_done(WT_FILE_HANDLE *file_handle, WT_SESSION *wt_session, bool wait, bool *result) +{ + struct io_uring_cqe *cqe; + WT_DECL_RET; + WT_FILE_HANDLE_POSIX *pfh; + WT_SESSION_IMPL *session; + unsigned n_requests; + unsigned req_finished; + + session = (WT_SESSION_IMPL *)wt_session; + pfh = (WT_FILE_HANDLE_POSIX *)file_handle; + cqe = NULL; + + WT_UNUSED(session); + + n_requests = pfh->io_uring_requests; + /* Fast path: If there are no requests pending, return now. */ + if (n_requests == 0) { + *result = true; + return 0; + } +retry: + if (wait) { + WT_ERR(io_uring_wait_cqe_nr(&pfh->ring, &cqe, n_requests)); + if (cqe->res == -EAGAIN){ + io_uring_cqe_seen(&pfh->ring, cqe); + goto retry; + } + *result = true; + } else { + req_finished = io_uring_peek_batch_cqe(&pfh->ring, &cqe, n_requests); + if (cqe->res == -EAGAIN){ + ret = 0; + *result = false; + goto err; + } + n_requests -= req_finished; + if (n_requests > 0) { + pfh->io_uring_requests = n_requests; + *result = false; + } else + *result = true; + } + +err: + if (cqe) + io_uring_cqe_seen(&pfh->ring, cqe); + if (ret) + *result = false; + if (*result) + pfh->io_uring_requests = 0; + return ret; +} + /* * __wt_os_posix -- * Initialize a POSIX configuration. diff --git a/test/cursor_order/cursor_order.c b/test/cursor_order/cursor_order.c index 9d9540225..e7e8ebf8a 100644 --- a/test/cursor_order/cursor_order.c +++ b/test/cursor_order/cursor_order.c @@ -34,7 +34,7 @@ static FILE *logfp; /* Log file */ static int handle_error(WT_EVENT_HANDLER *, WT_SESSION *, int, const char *); static int handle_message(WT_EVENT_HANDLER *, WT_SESSION *, const char *); static void onint(int) WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn)); -static void shutdown(void); +static void __shutdown(void); static int usage(void); static void wt_connect(SHARED_CONFIG *, char *); static void wt_shutdown(SHARED_CONFIG *); @@ -146,7 +146,7 @@ main(int argc, char *argv[]) printf(" %d: %" PRIu64 " reverse scanners, %" PRIu64 " writers\n", cnt, cfg->reverse_scanners, cfg->append_inserters); - shutdown(); /* Clean up previous runs */ + __shutdown(); /* Clean up previous runs */ wt_connect(cfg, config_open); /* WiredTiger connection */ @@ -203,7 +203,7 @@ wt_shutdown(SHARED_CONFIG *cfg) * Clean up from previous runs. */ static void -shutdown(void) +__shutdown(void) { testutil_clean_work_dir(home); } @@ -239,7 +239,7 @@ onint(int signo) { (void)(signo); - shutdown(); + __shutdown(); fprintf(stderr, "\n"); exit(EXIT_FAILURE); diff --git a/test/fops/t.c b/test/fops/t.c index db366bfb2..2987c6062 100644 --- a/test/fops/t.c +++ b/test/fops/t.c @@ -42,7 +42,7 @@ static char home[512]; static int handle_error(WT_EVENT_HANDLER *, WT_SESSION *, int, const char *); static int handle_message(WT_EVENT_HANDLER *, WT_SESSION *, const char *); static void onint(int) WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn)); -static void shutdown(void); +static void __shutdown(void); static int usage(void); static void wt_startup(char *); static void wt_shutdown(void); @@ -117,7 +117,7 @@ main(int argc, char *argv[]) printf("%s: process %" PRIu64 "\n", progname, (uint64_t)getpid()); for (cnt = 1; runs == 0 || cnt <= runs; ++cnt) { - shutdown(); /* Clean up previous runs */ + __shutdown(); /* Clean up previous runs */ for (cp = configs; cp->uri != NULL; ++cp) { uri = cp->uri; @@ -171,7 +171,7 @@ wt_shutdown(void) * Clean up from previous runs. */ static void -shutdown(void) +__shutdown(void) { testutil_clean_work_dir(home); } @@ -219,7 +219,7 @@ onint(int signo) { (void)(signo); - shutdown(); + __shutdown(); fprintf(stderr, "\n"); exit(EXIT_FAILURE); diff --git a/test/thread/t.c b/test/thread/t.c index 4ad76f8ac..97a1f6d94 100644 --- a/test/thread/t.c +++ b/test/thread/t.c @@ -42,7 +42,7 @@ static FILE *logfp; /* Log file */ static int handle_error(WT_EVENT_HANDLER *, WT_SESSION *, int, const char *); static int handle_message(WT_EVENT_HANDLER *, WT_SESSION *, const char *); static void onint(int) WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn)); -static void shutdown(void); +static void __shutdown(void); static int usage(void); static void wt_connect(char *); static void wt_shutdown(void); @@ -150,7 +150,7 @@ main(int argc, char *argv[]) for (cnt = 1; runs == 0 || cnt <= runs; ++cnt) { printf(" %d: %u readers, %u writers\n", cnt, readers, writers); - shutdown(); /* Clean up previous runs */ + __shutdown(); /* Clean up previous runs */ wt_connect(config_open); /* WiredTiger connection */ @@ -206,7 +206,7 @@ wt_shutdown(void) * Clean up from previous runs. */ static void -shutdown(void) +__shutdown(void) { testutil_clean_work_dir(home); } @@ -242,7 +242,7 @@ onint(int signo) { (void)(signo); - shutdown(); + __shutdown(); fprintf(stderr, "\n"); exit(EXIT_FAILURE);