diff --git a/src/third_party/wiredtiger/src/conn/conn_log.c b/src/third_party/wiredtiger/src/conn/conn_log.c
index 9d49e36..826aa9b 100644
--- a/src/third_party/wiredtiger/src/conn/conn_log.c
+++ b/src/third_party/wiredtiger/src/conn/conn_log.c
@@ -357,100 +357,153 @@ __log_wrlsn_cmp(const void *a, const void *b)
 	return (LOG_CMP(&ae->lsn, &be->lsn));
 }
 
+/*
+ * __log_wrlsn --
+ *	Process written log slots, advancing log->write_lsn where possible.
+ *
+ *	Called from __wt_log_slot_close and (for now) from __log_wrlsn_server,
+ *	in both cases with the logging spinlock held.
+ *
+ *	A written slot can be freed if it is contiguous with either
+ *	log->write_lsn or with another written slot's slot_end_lsn; in the
+ *	latter case its range is folded into that other slot.
+ *
+ *	Returns 1 if progress was made (slots freed), 0 if not. The return
+ *	value is a progress flag, not an error code, so failures are reported
+ *	through __wt_err instead of being returned.
+ */
+int
+__log_wrlsn(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, WT_LOG *log)
+{
+	WT_DECL_RET;
+	WT_LOG_WRLSN_ENTRY written[SLOT_POOL];
+	WT_LOGSLOT *coalescing, *slot;
+	size_t written_i;
+	uint32_t i, save_i;
+	int progress;
+
+	coalescing = NULL;
+	progress = 0;
+	i = 0;
+	written_i = 0;
+	/*
+	 * Walk the array once saving any slots that are in the
+	 * WT_LOG_SLOT_WRITTEN state.
+	 */
+	while (i < SLOT_POOL) {
+		save_i = i;
+		slot = &log->slot_pool[i++];
+		if (slot->slot_state != WT_LOG_SLOT_WRITTEN)
+			continue;
+		written[written_i].slot_index = save_i;
+		written[written_i++].lsn = slot->slot_release_lsn;
+	}
+	/*
+	 * If we found any written slots process them. We sort them
+	 * based on the release LSN, and then look for them in order.
+	 */
+	if (written_i > 0) {
+		qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY),
+		    __log_wrlsn_cmp);
+		/*
+		 * We know the written array is sorted by LSN. Go through
+		 * them either advancing write_lsn or coalescing contiguous
+		 * ranges of written slots.
+		 */
+		for (i = 0; i < written_i; i++) {
+			slot = &log->slot_pool[written[i].slot_index];
+			if (coalescing != NULL) {
+				/*
+				 * Not contiguous with the pending range:
+				 * this slot becomes the new pending range.
+				 */
+				if (LOG_CMP(&coalescing->slot_end_lsn,
+				    &written[i].lsn) != 0) {
+					coalescing = slot;
+					continue;
+				}
+
+				/*
+				 * If we get here we have a slot to coalesce
+				 * and free.
+				 */
+				coalescing->slot_end_lsn = slot->slot_end_lsn;
+			} else {
+				/*
+				 * First slot that does not line up with
+				 * write_lsn: it starts a pending range.
+				 */
+				if (LOG_CMP(&log->write_lsn,
+				    &written[i].lsn) != 0) {
+					coalescing = slot;
+					continue;
+				}
+
+				/*
+				 * If we get here we have a slot to process.
+				 * Advance the LSN and process the slot.
+				 */
+				WT_ASSERT(session, LOG_CMP(&written[i].lsn,
+				    &slot->slot_release_lsn) == 0);
+				log->write_start_lsn = slot->slot_start_lsn;
+				log->write_lsn = slot->slot_end_lsn;
+				WT_ERR(__wt_cond_signal(session,
+				    log->log_write_cond));
+				WT_STAT_FAST_CONN_INCR(session, log_write_lsn);
+			}
+
+			/*
+			 * Either way the slot is done: signal the close
+			 * thread if needed, then free it.
+			 * NOTE(review): for a coalesced slot this signals
+			 * before write_lsn has advanced past it - confirm.
+			 */
+			if (F_ISSET(slot, SLOT_CLOSEFH))
+				WT_ERR(__wt_cond_signal(session,
+				    conn->log_close_cond));
+			WT_ERR(__wt_log_slot_free(session, slot));
+			progress = 1;
+		}
+	}
+
+err:	if (ret != 0)
+		__wt_err(session, ret, "log wrlsn error");
+	return (progress);
+}
+
 /*
  * __log_wrlsn_server --
  *	The log wrlsn server thread.
  */
+
+/*
+ * XXX
+ * Not sure this thread is still needed if __log_wrlsn is going to be called
+ * from elsewhere. Without it I got a hang, related I think to direct
+ * writes. Maybe there's somewhere else besides __wt_log_slot_close that
+ * __log_wrlsn needs to be called from.
+ */
 static WT_THREAD_RET
 __log_wrlsn_server(void *arg)
 {
 	WT_CONNECTION_IMPL *conn;
-	WT_DECL_RET;
 	WT_LOG *log;
-	WT_LOG_WRLSN_ENTRY written[SLOT_POOL];
-	WT_LOGSLOT *slot;
 	WT_SESSION_IMPL *session;
-	size_t written_i;
-	uint32_t i, save_i;
-	int yield;
 
 	session = arg;
 	conn = S2C(session);
 	log = conn->log;
-	yield = 0;
 	while (F_ISSET(conn, WT_CONN_LOG_SERVER_RUN)) {
-		/*
-		 * No need to use the log_slot_lock because the slot pool
-		 * is statically allocated and any slot in the
-		 * WT_LOG_SLOT_WRITTEN state is exclusively ours for now.
-		 */
-		i = 0;
-		written_i = 0;
-		/*
-		 * Walk the array once saving any slots that are in the
-		 * WT_LOG_SLOT_WRITTEN state.
-		 */
-		while (i < SLOT_POOL) {
-			save_i = i;
-			slot = &log->slot_pool[i++];
-			if (slot->slot_state != WT_LOG_SLOT_WRITTEN)
-				continue;
-			written[written_i].slot_index = save_i;
-			written[written_i++].lsn = slot->slot_release_lsn;
-		}
-		/*
-		 * If we found any written slots process them. We sort them
-		 * based on the release LSN, and then look for them in order.
-		 */
-		if (written_i > 0) {
-			yield = 0;
-			qsort(written, written_i, sizeof(WT_LOG_WRLSN_ENTRY),
-			    __log_wrlsn_cmp);
-			/*
-			 * We know the written array is sorted by LSN. Go
-			 * through them either advancing write_lsn or stop
-			 * as soon as one is not in order.
-			 */
-			for (i = 0; i < written_i; i++) {
-				if (LOG_CMP(&log->write_lsn,
-				    &written[i].lsn) != 0)
-					break;
-				/*
-				 * If we get here we have a slot to process.
-				 * Advance the LSN and process the slot.
-				 */
-				slot = &log->slot_pool[written[i].slot_index];
-				WT_ASSERT(session, LOG_CMP(&written[i].lsn,
-				    &slot->slot_release_lsn) == 0);
-				log->write_start_lsn = slot->slot_start_lsn;
-				log->write_lsn = slot->slot_end_lsn;
-				WT_ERR(__wt_cond_signal(session,
-				    log->log_write_cond));
-				WT_STAT_FAST_CONN_INCR(session, log_write_lsn);
-
-				/*
-				 * Signal the close thread if needed.
-				 */
-				if (F_ISSET(slot, SLOT_CLOSEFH))
-					WT_ERR(__wt_cond_signal(session,
-					    conn->log_close_cond));
-				WT_ERR(__wt_log_slot_free(session, slot));
-			}
-		}
-		/*
-		 * If we saw a later write, we always want to yield because
-		 * we know something is in progress.
-		 */
-		if (yield++ < 1000)
-			__wt_yield();
-		else
-			/* Wait until the next event. */
-			WT_ERR(__wt_cond_wait(session,
-			    conn->log_wrlsn_cond, 100000));
+		/*
+		 * Poll: process written slots under the slot lock, then
+		 * sleep. The progress flag is not needed here.
+		 */
+		__wt_spin_lock(session, &log->log_slot_lock);
+		(void)__log_wrlsn(session, conn, log);
+		__wt_spin_unlock(session, &log->log_slot_lock);
+		__wt_sleep(0, 1000);
 	}
 
-	if (0)
-err:		__wt_err(session, ret, "log wrlsn server error");
 	return (WT_THREAD_RET_VALUE);
 }
 
diff --git a/src/third_party/wiredtiger/src/log/log_slot.c b/src/third_party/wiredtiger/src/log/log_slot.c
index 02b3056..c8a2398 100644
--- a/src/third_party/wiredtiger/src/log/log_slot.c
+++ b/src/third_party/wiredtiger/src/log/log_slot.c
@@ -167,6 +167,10 @@ join_slot:
 	return (0);
 }
 
+/* NOTE(review): print_snap is unused in the visible hunks; debug leftover? */
+extern void print_snap(WT_SESSION_IMPL *session, WT_LOG *log, char* when, int snap);
+
+
 /*
  * __wt_log_slot_close --
  *	Close a slot and do not allow any other threads to join this slot.
@@ -174,6 +178,10 @@ join_slot:
  *	the pool into its place. Set up the size of this group;
  *	Must be called with the logging spinlock held.
  */
+
+/* Defined in conn_log.c; returns 1 if it freed any written slots. */
+extern int __log_wrlsn(WT_SESSION_IMPL *session, WT_CONNECTION_IMPL *conn, WT_LOG *log);
+
 int
 __wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
 {
@@ -181,7 +189,7 @@ __wt_log_slot_close(WT_SESSION_IMPL *session, WT_LOGSLOT *slot)
 	WT_LOG *log;
 	WT_LOGSLOT *newslot;
 	int64_t old_state;
-	int32_t yields;
+	/* int32_t yields; -- unused while the pause loop below is disabled */
 	uint32_t pool_i, switch_fails;
 
 	conn = S2C(session);
@@ -205,7 +213,13 @@ retry:
 		 */
 		if (++switch_fails % SLOT_POOL == 0 && slot->slot_churn < 5)
 			++slot->slot_churn;
-		__wt_yield();
+		/*
+		 * After a full pass with no free slot, process written slots
+		 * ourselves; yield only if that did not free any.
+		 */
+		if (switch_fails % SLOT_POOL == 0 &&
+		    !__log_wrlsn(session, conn, log))
+			__wt_yield();
 		goto retry;
 	} else if (slot->slot_churn > 0) {
 		--slot->slot_churn;
@@ -213,8 +227,11 @@ retry:
 	}
 
 	/* Pause to allow other threads a chance to consolidate. */
-	for (yields = slot->slot_churn; yields >= 0; yields--)
+	/* BDL: I got better results without this - but may depend on workload */
+#if 0
+	for (int yields = slot->slot_churn; yields >= 0; yields--)
 		__wt_yield();
+#endif
 
 	/*
 	 * Swap out the slot we're going to use and put a free one in the