diff --git a/src/include/connection.h b/src/include/connection.h
index 959448554..4e343117b 100644
--- a/src/include/connection.h
+++ b/src/include/connection.h
@@ -667,34 +667,35 @@ struct __wt_connection_impl {
     uint32_t server_flags;
 
 /* AUTOMATIC FLAG VALUE GENERATION START 0 */
-#define WT_CONN_BACKUP_PARTIAL_RESTORE 0x0000001u
-#define WT_CONN_CACHE_CURSORS 0x0000002u
-#define WT_CONN_CACHE_POOL 0x0000004u
-#define WT_CONN_CALL_LOG_ENABLED 0x0000008u
-#define WT_CONN_CKPT_GATHER 0x0000010u
-#define WT_CONN_CKPT_SYNC 0x0000020u
-#define WT_CONN_CLOSING 0x0000040u
-#define WT_CONN_CLOSING_CHECKPOINT 0x0000080u
-#define WT_CONN_CLOSING_NO_MORE_OPENS 0x0000100u
-#define WT_CONN_COMPATIBILITY 0x0000200u
-#define WT_CONN_DATA_CORRUPTION 0x0000400u
-#define WT_CONN_EVICTION_RUN 0x0000800u
-#define WT_CONN_HS_OPEN 0x0001000u
-#define WT_CONN_INCR_BACKUP 0x0002000u
-#define WT_CONN_IN_MEMORY 0x0004000u
-#define WT_CONN_LEAK_MEMORY 0x0008000u
-#define WT_CONN_LSM_MERGE 0x0010000u
-#define WT_CONN_MINIMAL 0x0020000u
-#define WT_CONN_OPTRACK 0x0040000u
-#define WT_CONN_PANIC 0x0080000u
-#define WT_CONN_READONLY 0x0100000u
-#define WT_CONN_READY 0x0200000u
-#define WT_CONN_RECONFIGURING 0x0400000u
-#define WT_CONN_RECOVERING 0x0800000u
-#define WT_CONN_RECOVERY_COMPLETE 0x1000000u
-#define WT_CONN_SALVAGE 0x2000000u
-#define WT_CONN_TIERED_FIRST_FLUSH 0x4000000u
-#define WT_CONN_WAS_BACKUP 0x8000000u
+#define WT_CONN_BACKUP_PARTIAL_RESTORE 0x00000001u
+#define WT_CONN_CACHE_CURSORS 0x00000002u
+#define WT_CONN_CACHE_POOL 0x00000004u
+#define WT_CONN_CALL_LOG_ENABLED 0x00000008u
+#define WT_CONN_CKPT_GATHER 0x00000010u
+#define WT_CONN_CKPT_SYNC 0x00000020u
+#define WT_CONN_CLOSING 0x00000040u
+#define WT_CONN_CLOSING_CHECKPOINT 0x00000080u
+#define WT_CONN_CLOSING_NO_MORE_OPENS 0x00000100u
+#define WT_CONN_COMPATIBILITY 0x00000200u
+#define WT_CONN_DATA_CORRUPTION 0x00000400u
+#define WT_CONN_EVICTION_RUN 0x00000800u
+#define WT_CONN_HS_OPEN 0x00001000u
+#define WT_CONN_INCR_BACKUP 0x00002000u
+#define WT_CONN_IN_MEMORY 0x00004000u
+#define WT_CONN_LEAK_MEMORY 0x00008000u
+#define WT_CONN_LSM_MERGE 0x00010000u
+#define WT_CONN_MINIMAL 0x00020000u
+#define WT_CONN_OPTRACK 0x00040000u
+#define WT_CONN_PANIC 0x00080000u
+#define WT_CONN_READONLY 0x00100000u
+#define WT_CONN_READY 0x00200000u
+#define WT_CONN_RECONFIGURING 0x00400000u
+#define WT_CONN_RECOVERING 0x00800000u
+#define WT_CONN_RECOVERY_COMPLETE 0x01000000u
+#define WT_CONN_RTS_THREAD_RUN 0x02000000u
+#define WT_CONN_SALVAGE 0x04000000u
+#define WT_CONN_TIERED_FIRST_FLUSH 0x08000000u
+#define WT_CONN_WAS_BACKUP 0x10000000u
 /* AUTOMATIC FLAG VALUE GENERATION STOP 32 */
     uint32_t flags;
 };
diff --git a/src/include/extern.h b/src/include/extern.h
index bbb967e01..21ae0c499 100644
--- a/src/include/extern.h
+++ b/src/include/extern.h
@@ -31,6 +31,8 @@ extern bool __wt_page_evict_urgent(WT_SESSION_IMPL *session, WT_REF *ref)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern bool __wt_read_cell_time_window(WT_CURSOR_BTREE *cbt, WT_TIME_WINDOW *tw)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern bool __wt_rts_thread_chk(WT_SESSION_IMPL *session)
+  WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern bool __wt_rts_visibility_has_stable_update(WT_UPDATE *upd)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern bool __wt_rts_visibility_page_needs_abort(WT_SESSION_IMPL *session, WT_REF *ref,
@@ -1343,6 +1345,8 @@ extern int __wt_rts_btree_walk_btree(WT_SESSION_IMPL *session, wt_timestamp_t ro
 extern int __wt_rts_btree_walk_btree_apply(
   WT_SESSION_IMPL *session, const char *uri, const char *config, wt_timestamp_t rollback_timestamp)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_rts_btree_work_unit(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry)
+  WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern int __wt_rts_check(WT_SESSION_IMPL *session)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern int __wt_rts_history_btree_hs_truncate(WT_SESSION_IMPL *session, uint32_t btree_id)
@@ -1351,6 +1355,12 @@ extern int __wt_rts_history_delete_hs(WT_SESSION_IMPL *session, WT_ITEM *key, wt
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern int __wt_rts_history_final_pass(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_rts_push_work(WT_SESSION_IMPL *session, const char *uri,
+  wt_timestamp_t rollback_timestamp) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_rts_thread_run(WT_SESSION_IMPL *session, WT_THREAD *thread)
+  WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
+extern int __wt_rts_thread_stop(WT_SESSION_IMPL *session, WT_THREAD *thread)
+  WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern int __wt_rwlock_init(WT_SESSION_IMPL *session, WT_RWLOCK *l)
   WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result));
 extern int __wt_salvage(WT_SESSION_IMPL *session, const char *cfg[])
@@ -1910,6 +1920,8 @@ extern void __wt_ref_out(WT_SESSION_IMPL *session, WT_REF *ref);
 extern void __wt_rollback_to_stable_init(WT_CONNECTION_IMPL *conn);
 extern void __wt_root_ref_init(
   WT_SESSION_IMPL *session, WT_REF *root_ref, WT_PAGE *root, bool is_recno);
+extern void __wt_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp);
+extern void __wt_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry);
 extern void __wt_rwlock_destroy(WT_SESSION_IMPL *session, WT_RWLOCK *l);
 extern void __wt_schema_destroy_colgroup(WT_SESSION_IMPL *session, WT_COLGROUP **colgroupp);
 extern void __wt_scr_discard(WT_SESSION_IMPL *session);
diff --git a/src/include/rollback_to_stable.h b/src/include/rollback_to_stable.h
index 0e75d2bc8..2b8158cd6 100644
--- a/src/include/rollback_to_stable.h
+++ b/src/include/rollback_to_stable.h
@@ -62,6 +62,16 @@
         WT_STAT_CONN_DATA_INCR(session, stat); \
     } while (0)
 
+/*
+ * WT_RTS_WORK_UNIT --
+ *     A queued unit of rollback to stable work: one tree, named by URI, to be rolled back.
+ */
+struct __wt_rts_work_unit {
+    TAILQ_ENTRY(__wt_rts_work_unit) q; /* Work unit queue linkage */
+    char *uri;                         /* Tree to roll back; allocated copy, freed with the unit */
+    wt_timestamp_t rollback_timestamp; /* Timestamp handed to the tree walk */
+};
+
 /*
  * WT_ROLLBACK_TO_STABLE --
  *     Rollback to stable singleton, contains the interface to rollback to stable along
@@ -72,6 +82,16 @@ struct __wt_rollback_to_stable {
     int (*rollback_to_stable_one)(WT_SESSION_IMPL *, const char *, bool *);
     int (*rollback_to_stable)(WT_SESSION_IMPL *, const char *[], bool);
 
+    /* RTS thread information. */
+    WT_CONDVAR *rts_cond;        /* RTS thread condition, signalled on push and on shutdown */
+    WT_THREAD_GROUP rts_threads; /* RTS worker thread group */
+    uint32_t rts_threads_max;    /* Max rts threads */
+    uint32_t rts_threads_min;    /* Min rts threads */
+
+    /* Locked: rts work unit queue, modified only while holding rts_lock. */
+    TAILQ_HEAD(__wt_rts_qh, __wt_rts_work_unit) rtsqh;
+    WT_SPINLOCK rts_lock; /* RTS work queue spinlock */
+
     /* Configuration. */
     bool dryrun;
 };
diff --git a/src/include/wt_internal.h b/src/include/wt_internal.h
index 8d1a6be55..d4d830b5a 100644
--- a/src/include/wt_internal.h
+++ b/src/include/wt_internal.h
@@ -321,6 +321,8 @@ struct __wt_rollback_to_stable;
 typedef struct __wt_rollback_to_stable WT_ROLLBACK_TO_STABLE;
 struct __wt_row;
 typedef struct __wt_row WT_ROW;
+struct __wt_rts_work_unit;
+typedef struct __wt_rts_work_unit WT_RTS_WORK_UNIT;
 struct __wt_rwlock;
 typedef struct __wt_rwlock WT_RWLOCK;
 struct __wt_salvage_cookie;
@@ -427,9 +429,10 @@ typedef uint64_t wt_timestamp_t;
 #include "misc.h"
 #include "mutex.h"
 
-#include "stat.h"      /* required by dhandle.h */
-#include "dhandle.h"   /* required by btree.h */
-#include "timestamp.h" /* required by reconcile.h */
+#include "stat.h"         /* required by dhandle.h */
+#include "dhandle.h"      /* required by btree.h */
+#include "timestamp.h"    /* required by reconcile.h */
+#include "thread_group.h" /* required by rollback_to_stable.h */
 
 #include "api.h"
 #include "block.h"
@@ -453,7 +456,6 @@ typedef uint64_t wt_timestamp_t;
 #include "reconcile.h"
 #include "rollback_to_stable.h"
 #include "schema.h"
-#include "thread_group.h"
#include "tiered.h"
 #include "txn.h"
 #include "verbose.h"
diff --git a/src/rollback_to_stable/rts.c b/src/rollback_to_stable/rts.c
index 099152c6c..e3c60b0d4 100644
--- a/src/rollback_to_stable/rts.c
+++ b/src/rollback_to_stable/rts.c
@@ -96,6 +96,134 @@ __rts_progress_msg(WT_SESSION_IMPL *session, struct timespec rollback_start,
     }
 }
 
+/*
+ * __wt_rts_thread_chk --
+ *     Check to decide if the rts thread should continue running.
+ */
+bool
+__wt_rts_thread_chk(WT_SESSION_IMPL *session)
+{
+    return (F_ISSET(S2C(session), WT_CONN_RTS_THREAD_RUN));
+}
+
+/*
+ * __wt_rts_thread_run --
+ *     Entry function for an rts thread. This is called repeatedly from the thread group code so it
+ *     does not need to loop itself.
+ */
+int
+__wt_rts_thread_run(WT_SESSION_IMPL *session, WT_THREAD *thread)
+{
+    WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
+
+    WT_UNUSED(thread);
+
+    /* Mark the session as a rollback to stable session. */
+    F_SET(session, WT_SESSION_ROLLBACK_TO_STABLE);
+
+    __wt_rts_pop_work(session, &entry);
+    if (entry == NULL)
+        return (0);
+
+    /* The popped unit belongs to this thread: free it whether or not the rollback succeeded. */
+    ret = __wt_rts_btree_work_unit(session, entry);
+    __wt_rts_work_free(session, entry);
+    if (ret != 0)
+        WT_RET_PANIC(session, ret, "rts thread error");
+    return (0);
+}
+
+/*
+ * __wt_rts_thread_stop --
+ *     Shutdown function for an rts thread.
+ */
+int
+__wt_rts_thread_stop(WT_SESSION_IMPL *session, WT_THREAD *thread)
+{
+    if (thread->id != 0)
+        return (0);
+
+    /* Clear the rollback to stable session flag. */
+    F_CLR(session, WT_SESSION_ROLLBACK_TO_STABLE);
+
+    __wt_verbose(session, WT_VERB_RTS, "%s", "rts thread exiting");
+    return (0);
+}
+
+/*
+ * __wt_rts_thread_create --
+ *     Start rts threads.
+ */
+static int
+__wt_rts_thread_create(WT_SESSION_IMPL *session)
+{
+    WT_CONNECTION_IMPL *conn;
+    uint32_t session_flags;
+
+    conn = S2C(session);
+    conn->rts->rts_threads_min = conn->rts->rts_threads_max = 1;
+
+    /* Set first, the thread might run before we finish up. */
+    F_SET(conn, WT_CONN_RTS_THREAD_RUN);
+
+    TAILQ_INIT(&conn->rts->rtsqh); /* RTS work unit list */
+    WT_RET(__wt_spin_init(session, &conn->rts->rts_lock, "rts work unit list"));
+    WT_RET(__wt_cond_auto_alloc(
+      session, "rts threads", 10 * WT_THOUSAND, WT_MILLION, &conn->rts->rts_cond));
+
+    /*
+     * Create the rts thread group. Set the group size to the maximum allowed sessions.
+     */
+    session_flags = WT_THREAD_CAN_WAIT | WT_THREAD_PANIC_FAIL;
+    WT_RET(__wt_thread_group_create(session, &conn->rts->rts_threads, "rts-threads",
+      conn->rts->rts_threads_min, conn->rts->rts_threads_max, session_flags, __wt_rts_thread_chk,
+      __wt_rts_thread_run, __wt_rts_thread_stop));
+
+    return (0);
+}
+
+/*
+ * __wt_rts_thread_destroy --
+ *     Destroy the rts threads.
+ */
+static int
+__wt_rts_thread_destroy(WT_SESSION_IMPL *session)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
+
+    conn = S2C(session);
+
+    /* Wait for any rts thread group changes to stabilize. */
+    __wt_writelock(session, &conn->rts->rts_threads.lock);
+
+    /*
+     * Signal the threads to finish and stop populating the queue.
+     */
+    F_CLR(conn, WT_CONN_RTS_THREAD_RUN);
+    __wt_cond_signal(session, conn->rts->rts_cond);
+
+    __wt_verbose(session, WT_VERB_RTS, "%s", "waiting for helper threads");
+
+    /*
+     * We call the destroy function still holding the write lock. It assumes it is called locked.
+     */
+    WT_TRET(__wt_thread_group_destroy(session, &conn->rts->rts_threads));
+
+    /* No thread group is left to consume the queue: discard any units still on it. */
+    while ((entry = TAILQ_FIRST(&conn->rts->rtsqh)) != NULL) {
+        TAILQ_REMOVE(&conn->rts->rtsqh, entry, q);
+        __wt_rts_work_free(session, entry);
+    }
+
+    __wt_spin_destroy(session, &conn->rts->rts_lock);
+    __wt_cond_destroy(session, &conn->rts->rts_cond);
+
+    return (ret);
+}
+
 /*
  * __wt_rts_btree_apply_all --
  *     Perform rollback to stable to all files listed in the metadata, apart from the metadata and
@@ -107,6 +235,7 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times
     struct timespec rollback_timer;
     WT_CURSOR *cursor;
     WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
     uint64_t rollback_count, rollback_msg_count;
     const char *config, *uri;
 
@@ -115,7 +244,10 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times
     rollback_count = 0;
     rollback_msg_count = 0;
+    cursor = NULL;
 
-    WT_RET(__wt_metadata_cursor(session, &cursor));
+    WT_RET(__wt_rts_thread_create(session));
+
+    WT_ERR(__wt_metadata_cursor(session, &cursor));
     while ((ret = cursor->next(cursor)) == 0) {
         /* Log a progress message. */
         __rts_progress_msg(session, rollback_timer, rollback_count, &rollback_msg_count);
@@ -143,10 +275,29 @@ __wt_rts_btree_apply_all(WT_SESSION_IMPL *session, wt_timestamp_t rollback_times
     }
     WT_ERR_NOTFOUND_OK(ret, false);
 
-    if (F_ISSET(S2C(session), WT_CONN_RECOVERING))
-        WT_ERR(__wt_rts_history_final_pass(session, rollback_timestamp));
+    /*
+     * Help drain the work queue: process any unit the worker threads have not picked up yet.
+     */
+    while (!TAILQ_EMPTY(&S2C(session)->rts->rtsqh)) {
+        __wt_rts_pop_work(session, &entry);
+        if (entry == NULL)
+            break;
+        ret = __wt_rts_btree_work_unit(session, entry);
+        __wt_rts_work_free(session, entry);
+        if (ret != 0)
+            goto err;
+    }
+
 err:
-    WT_TRET(__wt_metadata_cursor_release(session, &cursor));
+    /*
+     * Destroy the worker threads before the history store final pass, it must not start while a
+     * worker may still be processing a work unit it popped from the queue.
+     */
+    WT_TRET(__wt_rts_thread_destroy(session));
+    if (ret == 0 && F_ISSET(S2C(session), WT_CONN_RECOVERING))
+        ret = __wt_rts_history_final_pass(session, rollback_timestamp);
+    if (cursor != NULL)
+        WT_TRET(__wt_metadata_cursor_release(session, &cursor));
     return (ret);
 }
 
diff --git a/src/rollback_to_stable/rts_btree_walk.c b/src/rollback_to_stable/rts_btree_walk.c
index c220c33b0..b7752df99 100644
--- a/src/rollback_to_stable/rts_btree_walk.c
+++ b/src/rollback_to_stable/rts_btree_walk.c
@@ -120,6 +120,99 @@ __rts_btree_walk(WT_SESSION_IMPL *session, wt_timestamp_t rollback_timestamp)
     return (ret);
 }
 
+/*
+ * __wt_rts_work_free --
+ *     Free a work unit and the URI string it owns.
+ */
+void
+__wt_rts_work_free(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry)
+{
+    __wt_free(session, entry->uri);
+    __wt_free(session, entry);
+}
+
+/*
+ * __wt_rts_pop_work --
+ *     Pop a work unit from the queue, *entryp is set to NULL if the queue is empty.
+ */
+void
+__wt_rts_pop_work(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT **entryp)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_RTS_WORK_UNIT *entry;
+
+    *entryp = entry = NULL;
+
+    conn = S2C(session);
+    if (TAILQ_EMPTY(&conn->rts->rtsqh))
+        return;
+
+    __wt_spin_lock(session, &conn->rts->rts_lock);
+
+    /* Recheck under the lock: the queue may have been emptied since the unlocked test. */
+    if (TAILQ_EMPTY(&conn->rts->rtsqh)) {
+        __wt_spin_unlock(session, &conn->rts->rts_lock);
+        return;
+    }
+
+    entry = TAILQ_FIRST(&conn->rts->rtsqh);
+    TAILQ_REMOVE(&conn->rts->rtsqh, entry, q);
+    *entryp = entry;
+
+    __wt_spin_unlock(session, &conn->rts->rts_lock);
+    return;
+}
+
+/*
+ * __wt_rts_push_work --
+ *     Push a work unit to the queue.
+ */
+int
+__wt_rts_push_work(WT_SESSION_IMPL *session, const char *uri, wt_timestamp_t rollback_timestamp)
+{
+    WT_CONNECTION_IMPL *conn;
+    WT_DECL_RET;
+    WT_RTS_WORK_UNIT *entry;
+
+    conn = S2C(session);
+
+    WT_RET(__wt_calloc_one(session, &entry));
+    WT_ERR(__wt_strdup(session, uri, &entry->uri));
+    entry->rollback_timestamp = rollback_timestamp;
+
+    __wt_spin_lock(session, &conn->rts->rts_lock);
+    TAILQ_INSERT_TAIL(&conn->rts->rtsqh, entry, q);
+    __wt_spin_unlock(session, &conn->rts->rts_lock);
+    __wt_cond_signal(session, conn->rts->rts_cond);
+
+    return (0);
+err:
+    __wt_free(session, entry);
+    return (ret);
+}
+
+/*
+ * __wt_rts_btree_work_unit --
+ *     Open the work unit's tree, roll it back to the unit's timestamp and release the handle.
+ */
+int
+__wt_rts_btree_work_unit(WT_SESSION_IMPL *session, WT_RTS_WORK_UNIT *entry)
+{
+    WT_DECL_RET;
+
+    /*
+     * Open a handle; we're potentially opening a lot of handles and there's no reason to cache all
+     * of them for future unknown use, discard on close.
+     */
+    ret = __wt_session_get_dhandle(session, entry->uri, NULL, NULL, WT_DHANDLE_DISCARD);
+    if (ret != 0)
+        WT_RET_MSG(session, ret, "%s: unable to open handle%s", entry->uri,
+          ret == EBUSY ? ", error indicates handle is unavailable due to concurrent use" : "");
+    ret = __wt_rts_btree_walk_btree(session, entry->rollback_timestamp);
+    WT_TRET(__wt_session_release_dhandle(session));
+    return (ret);
+}
+
 /*
  * __wt_rts_btree_walk_btree_apply --
  *     Perform rollback to stable on a single file.
@@ -233,30 +326,19 @@ __wt_rts_btree_walk_btree_apply(
 
     if (modified || max_durable_ts > rollback_timestamp || prepared_updates || !durable_ts_found ||
       has_txn_updates_gt_than_ckpt_snap) {
-        /*
-         * Open a handle; we're potentially opening a lot of handles and there's no reason to cache
-         * all of them for future unknown use, discard on close.
-         */
-        ret = __wt_session_get_dhandle(session, uri, NULL, NULL, WT_DHANDLE_DISCARD);
-        if (ret != 0)
-            WT_ERR_MSG(session, ret, "%s: unable to open handle%s", uri,
-              ret == EBUSY ? ", error indicates handle is unavailable due to concurrent use" : "");
-        dhandle_allocated = true;
-
         __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
           WT_RTS_VERB_TAG_TREE
           "tree rolled back. modified=%s, durable_timestamp=%s > stable_timestamp=%s: %s, "
           "has_prepared_updates=%s, durable_timestamp_not_found=%s, txnid=%" PRIu64
           " > recovery_checkpoint_snap_min=%" PRIu64 ": %s",
-          S2BT(session)->modified ? "true" : "false",
-          __wt_timestamp_to_string(max_durable_ts, ts_string[0]),
+          modified ? "true" : "false", __wt_timestamp_to_string(max_durable_ts, ts_string[0]),
           __wt_timestamp_to_string(rollback_timestamp, ts_string[1]),
           max_durable_ts > rollback_timestamp ? "true" : "false",
           prepared_updates ? "true" : "false", !durable_ts_found ? "true" : "false",
           rollback_txnid, S2C(session)->recovery_ckpt_snap_min,
           has_txn_updates_gt_than_ckpt_snap ? "true" : "false");
-        WT_ERR(__wt_rts_btree_walk_btree(session, rollback_timestamp));
+        WT_ERR(__wt_rts_push_work(session, uri, rollback_timestamp));
     } else
         __wt_verbose_multi(session, WT_VERB_RECOVERY_RTS(session),
           WT_RTS_VERB_TAG_TREE_SKIP
diff --git a/src/support/thread_group.c b/src/support/thread_group.c
index 0f3e3084b..c420f814f 100644
--- a/src/support/thread_group.c
+++ b/src/support/thread_group.c
@@ -182,7 +182,7 @@ __thread_group_resize(WT_SESSION_IMPL *session, WT_THREAD_GROUP *group, uint32_t
         /* Threads get their own session. */
         session_flags = LF_ISSET(WT_THREAD_CAN_WAIT) ? WT_SESSION_CAN_WAIT : 0;
-        WT_ERR(
-          __wt_open_internal_session(conn, group->name, false, session_flags, 0, &thread->session));
+        WT_ERR(__wt_open_internal_session(
+          conn, group->name, false, session_flags, session->lock_flags, &thread->session));
         if (LF_ISSET(WT_THREAD_PANIC_FAIL))
             F_SET(thread, WT_THREAD_PANIC_FAIL);
         thread->id = i;
diff --git a/test/suite/test_rollback_to_stable43.py b/test/suite/test_rollback_to_stable43.py
new file mode 100755
index 000000000..6937dfb91
--- /dev/null
+++ b/test/suite/test_rollback_to_stable43.py
@@ -0,0 +1,159 @@
+#!/usr/bin/env python
+#
+# Public Domain 2014-present MongoDB, Inc.
+# Public Domain 2008-2014 WiredTiger, Inc.
+#
+# This is free and unencumbered software released into the public domain.
+#
+# Anyone is free to copy, modify, publish, use, compile, sell, or
+# distribute this software, either in source code form or as a compiled
+# binary, for any purpose, commercial or non-commercial, and by any
+# means.
+#
+# In jurisdictions that recognize copyright laws, the author or authors
+# of this software dedicate any and all copyright interest in the
+# software to the public domain. We make this dedication for the benefit
+# of the public at large and to the detriment of our heirs and
+# successors. We intend this dedication to be an overt act of
+# relinquishment in perpetuity of all present and future rights to this
+# software under copyright law.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+# OTHER DEALINGS IN THE SOFTWARE.
+
+import wttest
+from wtdataset import SimpleDataSet
+from wiredtiger import stat
+from wtscenario import make_scenarios
+from rollback_to_stable_util import test_rollback_to_stable_base
+
+# test_rollback_to_stable43.py
+# Test that rollback to stable restores the stable values across a large number of tables.
+class test_rollback_to_stable43(test_rollback_to_stable_base):
+
+    # For FLCS, set the page size down. Otherwise for the in-memory scenarios we get enough
+    # updates on the page that the in-memory page footprint exceeds the default maximum
+    # in-memory size, and that in turn leads to pathological behavior where the page gets
+    # force-evicted over and over again trying to resolve/condense the updates. But they
+    # don't (for in-memory, they can't be moved to the history store) so this leads to a
+    # semi-livelock state that makes the test some 20x slower than it needs to be.
+    #
+    # FUTURE: it would be better if the system adjusted on its own, but it's not critical
+    # and this workload (with every entry on the page modified repeatedly) isn't much like
+    # anything that happens in production.
+    format_values = [
+        ('column', dict(key_format='r', value_format='S', extraconfig='')),
+        ('column_fix', dict(key_format='r', value_format='8t', extraconfig=',leaf_page_max=4096')),
+        ('row_integer', dict(key_format='i', value_format='S', extraconfig='')),
+    ]
+
+    in_memory_values = [
+        ('no_inmem', dict(in_memory=False)),
+        ('inmem', dict(in_memory=True))
+    ]
+
+    dryrun_values = [
+        ('no_dryrun', dict(dryrun=False)),
+        ('dryrun', dict(dryrun=True))
+    ]
+
+    scenarios = make_scenarios(format_values, in_memory_values, dryrun_values)
+
+    def conn_config(self):
+        config = 'cache_size=100MB,statistics=(all)'
+        if self.in_memory:
+            config += ',in_memory=true'
+        return config
+
+    def test_rollback_to_stable(self):
+        nrows = 10000
+        ntables = 100
+
+        # Create the tables.
+        for i in range(1, ntables + 1):
+            uri = "table:rollback_to_stable43" + str(i)
+            ds_config = self.extraconfig
+            ds_config += ',log=(enabled=false)' if self.in_memory else ''
+            ds = SimpleDataSet(self, uri, 0,
+                key_format=self.key_format, value_format=self.value_format, config=ds_config)
+            ds.populate()
+
+        if self.value_format == '8t':
+            valuea = 97
+            valueb = 98
+            valuec = 99
+            valued = 100
+        else:
+            valuea = "aaaaa" * 100
+            valueb = "bbbbb" * 100
+            valuec = "ccccc" * 100
+            valued = "ddddd" * 100
+
+        # Pin oldest and stable to timestamp 1.
+        self.conn.set_timestamp('oldest_timestamp=' + self.timestamp_str(1) +
+            ',stable_timestamp=' + self.timestamp_str(1))
+
+        for i in range(1, ntables + 1):
+            uri = "table:rollback_to_stable43" + str(i)
+            self.large_updates(uri, valuea, ds, nrows, None, 10)
+            # Check that all updates are seen.
+            self.check(valuea, uri, nrows, None, 10)
+
+            self.large_updates(uri, valueb, ds, nrows, None, 20)
+            # Check that the new updates are only seen after the update timestamp.
+            self.check(valueb, uri, nrows, None, 20)
+
+            self.large_updates(uri, valuec, ds, nrows, None, 30)
+            # Check that the new updates are only seen after the update timestamp.
+            self.check(valuec, uri, nrows, None, 30)
+
+            self.large_updates(uri, valued, ds, nrows, None, 40)
+            # Check that the new updates are only seen after the update timestamp.
+            self.check(valued, uri, nrows, None, 40)
+
+        # Pin stable to timestamp 20.
+        self.conn.set_timestamp('stable_timestamp=' + self.timestamp_str(20))
+
+        # Checkpoint to ensure that all the data is flushed.
+        if not self.in_memory:
+            self.session.checkpoint()
+
+        self.conn.rollback_to_stable('dryrun={}'.format('true' if self.dryrun else 'false'))
+
+        # Verify every table, not only the last one created: a dry run changes nothing.
+        for i in range(1, ntables + 1):
+            uri = "table:rollback_to_stable43" + str(i)
+            if self.dryrun:
+                self.check(valued, uri, nrows, None, 40)
+            else:
+                self.check(valueb, uri, nrows, None, 40)
+
+            self.check(valueb, uri, nrows, None, 20)
+            self.check(valuea, uri, nrows, None, 10)
+
+        stat_cursor = self.session.open_cursor('statistics:', None, None)
+        calls = stat_cursor[stat.conn.txn_rts][2]
+        upd_aborted = (stat_cursor[stat.conn.txn_rts_upd_aborted][2] +
+            stat_cursor[stat.conn.txn_rts_hs_removed][2])
+        keys_removed = stat_cursor[stat.conn.txn_rts_keys_removed][2]
+        keys_restored = stat_cursor[stat.conn.txn_rts_keys_restored][2]
+        pages_visited = stat_cursor[stat.conn.txn_rts_pages_visited][2]
+        stat_cursor.close()
+
+        self.assertEqual(calls, 1)
+        self.assertEqual(keys_removed, 0)
+        self.assertEqual(keys_restored, 0)
+        self.assertGreater(pages_visited, 0)
+
+        if self.dryrun:
+            self.assertEqual(upd_aborted, 0)
+        else:
+            self.assertGreaterEqual(upd_aborted, nrows * 2 * ntables)
+
+if __name__ == '__main__':
+    wttest.run()