diff --git a/src/third_party/wiredtiger/src/conn/conn_log.c b/src/third_party/wiredtiger/src/conn/conn_log.c index 9d49e36..3521359 100644 --- a/src/third_party/wiredtiger/src/conn/conn_log.c +++ b/src/third_party/wiredtiger/src/conn/conn_log.c @@ -357,100 +357,155 @@ __log_wrlsn_cmp(const void *a, const void *b) return (LOG_CMP(&ae->lsn, &be->lsn)); } -/* - * __log_wrlsn_server -- - * The log wrlsn server thread. - */ -static WT_THREAD_RET -__log_wrlsn_server(void *arg) -{ - WT_CONNECTION_IMPL *conn; - WT_DECL_RET; - WT_LOG *log; +// +// Process written log slots +// +// called from __wt_log_slot_close with the logging spinlock held +// also called (for now) __log_wrlsn_server also with the logging spinlock held +// +// written slots can be freed if they are contiguous with either log->write_lsn, +// or with another written slot's slot_end_lsn +// +// returns a free slot in *free_slot, or NULL if no slots are freeable or already free +// returns earliest free slot - so may be able to tweak this to avoid +// scanning whole array by tracking highest non-free slot (if that matters) +// + +int +__wt_log_wrlsn(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, WT_LOG* log, WT_LOGSLOT** free_slot) { + WT_LOG_WRLSN_ENTRY written[SLOT_POOL]; WT_LOGSLOT *slot; - WT_SESSION_IMPL *session; size_t written_i; uint32_t i, save_i; - int yield; - session = arg; - conn = S2C(session); - log = conn->log; - yield = 0; - while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { - /* - * No need to use the log_slot_lock because the slot pool - * is statically allocated and any slot in the - * WT_LOG_SLOT_WRITTEN state is exclusively ours for now. - */ - i = 0; - written_i = 0; - /* - * Walk the array once saving any slots that are in the - * WT_LOG_SLOT_WRITTEN state. - */ - while (i < SLOT_POOL) { - save_i = i; - slot = &log->slot_pool[i++]; - if (slot->slot_state != WT_LOG_SLOT_WRITTEN) - continue; - written[written_i].slot_index = save_i; - written[written_i++].lsn = slot->slot_release_lsn; - } + i = 0; + written_i = 0; + *free_slot = NULL; + /* + * Walk the array once saving any slots that are in the + * WT_LOG_SLOT_WRITTEN state. + */ + while (i < SLOT_POOL) { + save_i = i; + slot = &log->slot_pool[i++]; + if (!*free_slot && slot->slot_state==WT_LOG_SLOT_FREE) + *free_slot = slot; + if (slot->slot_state != WT_LOG_SLOT_WRITTEN) + continue; + written[written_i].slot_index = save_i; + written[written_i++].lsn = slot->slot_release_lsn; + } + /* + * If we found any written slots process them. We sort them + * based on the release LSN, and then look for them in order. + */ + if (written_i > 0) { + qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY), + __log_wrlsn_cmp); /* - * If we found any written slots process them. We sort them - * based on the release LSN, and then look for them in order. + * We know the written array is sorted by LSN. Go + * through them either advancing write_lsn or coalesce + * contiguous ranges of written slots */ - if (written_i > 0) { - yield = 0; - qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY), - __log_wrlsn_cmp); - /* - * We know the written array is sorted by LSN. Go - * through them either advancing write_lsn or stop - * as soon as one is not in order. - */ - for (i = 0; i < written_i; i++) { + WT_LOGSLOT *coalescing = NULL; + for (i = 0; i < written_i; i++) { + + slot = &log->slot_pool[written[i].slot_index]; + + if (coalescing) { + + if (LOG_CMP(&coalescing->slot_end_lsn, + &written[i].lsn) != 0) { + coalescing = slot; + continue; + } + + /* + * If we get here we have a slot to coalesce and free + */ + coalescing->slot_end_lsn = slot->slot_end_lsn; + + /* + * Signal the close thread if needed. + */ + if (F_ISSET(slot, SLOT_CLOSEFH)) + WT_RET(__wt_cond_signal(session, + conn->log_close_cond)); + WT_RET(__wt_log_slot_free(session, slot)); + if (!*free_slot) + *free_slot = slot; + + } else { + if (LOG_CMP(&log->write_lsn, - &written[i].lsn) != 0) - break; + &written[i].lsn) != 0) { + coalescing = slot; + continue; + } + /* * If we get here we have a slot to process. * Advance the LSN and process the slot. */ - slot = &log->slot_pool[written[i].slot_index]; WT_ASSERT(session, LOG_CMP(&written[i].lsn, - &slot->slot_release_lsn) == 0); + &slot->slot_release_lsn) == 0); log->write_start_lsn = slot->slot_start_lsn; log->write_lsn = slot->slot_end_lsn; - WT_ERR(__wt_cond_signal(session, - log->log_write_cond)); + WT_RET(__wt_cond_signal(session, + log->log_write_cond)); WT_STAT_FAST_CONN_INCR(session, log_write_lsn); /* * Signal the close thread if needed. */ if (F_ISSET(slot, SLOT_CLOSEFH)) - WT_ERR(__wt_cond_signal(session, - conn->log_close_cond)); - WT_ERR(__wt_log_slot_free(session, slot)); + WT_RET(__wt_cond_signal(session, + conn->log_close_cond)); + WT_RET(__wt_log_slot_free(session, slot)); + if (!*free_slot) + *free_slot = slot; } } - /* - * If we saw a later write, we always want to yield because - * we know something is in progress. - */ - if (yield++ < 1000) - __wt_yield(); - else - /* Wait until the next event. */ - WT_ERR(__wt_cond_wait(session, - conn->log_wrlsn_cond, 100000)); + + } + + return 0; +} + + +/* + * __log_wrlsn_server -- + * The log wrlsn server thread. + */ + +// +// don't know if this is needed if __log_wrlsn is going to be called +// from elsewhere. Without it I got a hang, related I think to direct +// writes. Maybe there's somewhere else besides __wt_log_slot_close +// that it needs to be called from +// + +static WT_THREAD_RET +__log_wrlsn_server(void *arg) +{ + WT_CONNECTION_IMPL *conn; + WT_LOG *log; + WT_SESSION_IMPL *session; + WT_LOGSLOT *free_slot; + + session = arg; + conn = S2C(session); + log = conn->log; + + while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) { + __wt_spin_lock(session, &log->log_slot_lock); + __wt_log_wrlsn(session, conn, log, &free_slot); + __wt_spin_unlock(session, &log->log_slot_lock); + __wt_sleep(0, 1000); + } - if (0) -err: __wt_err(session, ret, "log wrlsn server error"); return (WT_THREAD_RET_VALUE); } diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index 59e7958..85946f5 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -237,6 +237,7 @@ extern int __wt_conn_dhandle_discard(WT_SESSION_IMPL *session); extern int __wt_connection_init(WT_CONNECTION_IMPL *conn); extern int __wt_connection_destroy(WT_CONNECTION_IMPL *conn); extern int __wt_log_truncate_files( WT_SESSION_IMPL *session, WT_CURSOR *cursor, const char *cfg[]); +extern int __wt_log_wrlsn(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, WT_LOG* log, WT_LOGSLOT** free); extern int __wt_logmgr_create(WT_SESSION_IMPL *session, const char *cfg[]); extern int __wt_logmgr_open(WT_SESSION_IMPL *session); extern int __wt_logmgr_destroy(WT_SESSION_IMPL *session); diff --git a/src/third_party/wiredtiger/src/include/log.h b/src/third_party/wiredtiger/src/include/log.h index e522a61..66f346e 100644 --- a/src/third_party/wiredtiger/src/include/log.h +++ b/src/third_party/wiredtiger/src/include/log.h @@ -156,7 +156,7 @@ typedef struct { * slot count of one. */ #define SLOT_ACTIVE 1 -#define SLOT_POOL 16 +#define SLOT_POOL 128 uint32_t pool_index; /* Global pool index */ WT_LOGSLOT *slot_array[SLOT_ACTIVE]; /* Active slots */ WT_LOGSLOT slot_pool[SLOT_POOL]; /* Pool of all slots */ diff --git a/src/third_party/wiredtiger/src/log/log_slot.c b/src/third_party/wiredtiger/src/log/log_slot.c index 02b3056..5175884 100644 --- a/src/third_party/wiredtiger/src/log/log_slot.c +++ b/src/third_party/wiredtiger/src/log/log_slot.c @@ -167,6 +167,7 @@ join_slot: return (0); } + /* * __wt_log_slot_close -- * Close a slot and do not allow any other threads to join this slot. @@ -174,6 +175,7 @@ join_slot: * the pool into its place. Set up the size of this group; * Must be called with the logging spinlock held. */ + int __wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) { @@ -181,40 +183,22 @@ __wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot) WT_LOG *log; WT_LOGSLOT *newslot; int64_t old_state; - int32_t yields; - uint32_t pool_i, switch_fails; + int pool_i; conn = S2C(session); log = conn->log; - switch_fails = 0; -retry: + /* * Find an unused slot in the pool. */ - pool_i = log->pool_index; - newslot = &log->slot_pool[pool_i]; - if (++log->pool_index >= SLOT_POOL) - log->pool_index = 0; - if (newslot->slot_state != WT_LOG_SLOT_FREE) { - WT_STAT_FAST_CONN_INCR(session, log_slot_switch_fails); - /* - * If it takes a number of attempts to find an available slot - * it's likely all slots are waiting to be released. This - * churn is used to change how long we pause before closing - * the slot - which leads to more consolidation and less churn. - */ - if (++switch_fails % SLOT_POOL == 0 && slot->slot_churn < 5) - ++slot->slot_churn; + for (;;) { + WT_RET(__wt_log_wrlsn(session, conn, log, &newslot)); + if (newslot) + break; __wt_yield(); - goto retry; - } else if (slot->slot_churn > 0) { - --slot->slot_churn; - WT_ASSERT(session, slot->slot_churn >= 0); } - - /* Pause to allow other threads a chance to consolidate. */ - for (yields = slot->slot_churn; yields >= 0; yields--) - __wt_yield(); + pool_i = newslot - log->slot_pool; + WT_ASSERT(session, newslot->slot_state==WT_LOG_SLOT_FREE); /* * Swap out the slot we're going to use and put a free one in the