-
Type: Bug
-
Resolution: Unresolved
-
Priority: Minor - P4
-
None
-
Affects Version/s: None
-
Component/s: None
-
3
74.29% [kernel] [k] __pv_queued_spin_lock_slowpath 4.65% [kernel] [k] native_write_msr_safe 3.89% libc-2.17.so [.] __random 1.43% libc-2.17.so [.] __random_r 1.39% [kernel] [k] futex_wake 1.32% [kernel] [k] __switch_to 1.27% [kernel] [k] try_to_wake_up 1.03% [kernel] [k] copy_user_generic_string 1.00% [kernel] [k] __raw_callee_save___pv_queued_spin_unlock 0.80% [kernel] [k] futex_wait_setup 0.53% [kernel] [k] __schedule 0.48% [kernel] [k] _raw_qspin_lock 0.42% libc-2.17.so [.] __lll_lock_wait_private 0.32% [kernel] [k] plist_add 0.27% [kernel] [k] futex_wait 0.26% test_wt [.] rand_str 0.21% [kernel] [k] system_call_after_swapgs 0.21% [kernel] [k] mark_wake_futex 0.20% [kernel] [k] sched_clock_local 0.17% [kernel] [k] get_futex_key 0.17% [kernel] [k] system_call 0.16% [kernel] [k] sysret_check 0.15% [kernel] [k] schedule 0.14% [kernel] [k] __unqueue_futex 0.13% [kernel] [k] sched_clock_cpu 0.13% [kernel] [k] select_task_rq_fair 0.13% [kernel] [k] pvclock_clocksource_read 0.12% [kernel] [k] do_futex 0.12% [kernel] [k] hash_futex 0.12% [kernel] [k] established_get_first.isra.38 0.11% [kernel] [k] __audit_syscall_exit 0.11% [kernel] [k] pick_next_task_fair 0.11% [kernel] [k] check_preempt_wakeup 0.11% [kernel] [k] deactivate_task 0.10% libc-2.17.so [.] rand 0.09% [kernel] [k] rb_erase 0.08% [kernel] [k] sys_futex 0.07% [kernel] [k] dequeue_entity 0.07% [kernel] [k] account_entity_dequeue 0.07% [kernel] [k] dequeue_task_fair 0.06% [kernel] [k] __audit_syscall_entry
my test code
#include <dirent.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #include <string.h> #include <stdint.h> #include <stdbool.h> #include <assert.h> #include <pthread.h> #include <fcntl.h> #include <sys/stat.h> #include <sys/stat.h> #include <wiredtiger_ext.h> #include <wiredtiger.h> static char charset[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; typedef struct metric_t { char *type; uint64_tbytes; uint64_tcount; intsleep_seconds; booldone; } metric; typedef struct thread_arg_t { WT_CONNECTION *conn; inttid; size_tk_size; size_tv_size; char *prefix; char *subfix; uint64_tkv_num; metric *m; } thread_arg; static const char *home; static int create_databae_home(const char *data_dir) { intret = 0; DIR *dir = opendir(data_dir); if (dir) { closedir(dir); } elseif (ENOENT == errno) { mkdir(data_dir, 0644); } else { ret = -1; } returnret; } uint32_t jump_consistent_hash(uint64_t key, int32_t num_buckets) { int64_tb = -1, j = 0; while (j < num_buckets) { b = j; key = key * 2862933555777941757ULL + 1; j = (b + 1) * ((double)(1LL << 31) / (double)((key >> 33) + 1)); } returnabs(b); } int rand_str(char *s, const char *prefix, const char *subfix, size_t n) { inti, j; size_ts_size = 0; size_tcharset_len = strlen((char *)&charset); if (prefix != NULL) { memcpy(s, prefix, strlen(prefix)); s_size = strlen(prefix); s[s_size] = '\0'; } for (i = 0; i < n; i++) { //int index = jump_consistent_hash(rand(), charset_len); intindex = rand()%charset_len; s[s_size++] = charset[index]; } if (subfix != NULL) { size_tsubfix_size = strlen(subfix); memcpy(s + s_size, subfix, subfix_size); s_size = s_size + subfix_size; } s[s_size] = '\0'; returns_size; } static void *metric_print(void *arg) { metric *mt = (metric *)arg; pthread_mutex_tm = PTHREAD_MUTEX_INITIALIZER; pthread_cond_tc; structtimespecto; pthread_condattr_tattr; pthread_condattr_init(&attr); pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); pthread_cond_init(&c, &attr); pthread_mutex_lock(&m); while (mt->done) { clock_gettime(CLOCK_MONOTONIC, &to); to.tv_sec += mt->sleep_seconds; pthread_cond_timedwait(&c, &m, &to); fprintf(stdout, "%s count=%ld,bytes=%ld\n",mt->type,mt->count,mt->bytes); } pthread_mutex_unlock(&m); } void * thread_do_put(void *arg) { thread_arg *param = (thread_arg *)arg; WT_SESSION *session = NULL; WT_CURSOR *cursor = NULL; intret = param->conn->open_session(param->conn, NULL, NULL, &session); fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret); session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S,checksum=off"); ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL, NULL,&cursor); fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret); uint64_ti = 0; size_tfix_sz = 0; if (param->prefix != NULL) { fix_sz += strlen(param->prefix); } if (param->subfix != NULL) { fix_sz += strlen(param->subfix); } chartype[256] = \{'\0'}; snprintf((char *)&type,256,"w-%d",param->tid); param->m->type = strdup((char *)&type); pthread_tm_thd; pthread_create(&m_thd,NULL,(void *)metric_print,(void *)param->m); char *key_buf = malloc(sizeof(char) * (param->k_size + fix_sz) + 1); char *val_buf = malloc(sizeof(char) * param->v_size + 1); for (inti = 0; i < param->kv_num; i++) { intk_sz = rand_str(key_buf, param->prefix, param->subfix, param->k_size - fix_sz); intv_sz = rand_str(val_buf, NULL, NULL, param->v_size); cursor->set_key(cursor, key_buf); cursor->set_value(cursor, val_buf); cursor->insert(cursor); // usleep(1); param->m->bytes = param->m->bytes +k_sz+v_sz; } pthread_join(m_thd,NULL); if (key_buf != NULL) { free(key_buf); } if (val_buf != NULL) { free(val_buf); } returnNULL; } void * thread_do_get(void *arg) { thread_arg *param = (thread_arg *)arg; WT_SESSION *session = NULL; WT_CURSOR *cursor = NULL; intret = param->conn->open_session(param->conn, NULL, NULL, &session); // fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret); session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S"); ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL,NULL, &cursor); // fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret); size_tfix_sz = 0; if (param->prefix != NULL) { fix_sz += strlen(param->prefix); } if (param->subfix != NULL) { fix_sz += strlen(param->subfix); } size_tline_sz = param->k_size + param->v_size; char *line_buf = malloc(sizeof(char) * line_sz + 1); char *key_buf = malloc(sizeof(char) * param->k_size + 1); char *val_buf = malloc(sizeof(char) * param->v_size + 1); } static void wt_bench(const char *data_dir, int thd_num, uint64_t kv_rec_num, size_t k_size, size_t v_size, char *prefix, char *subfix,int time_interval) { WT_CONNECTION *conn; WT_CURSOR *cursor; WT_SESSION *session; constchar *key, *value; intret; wiredtiger_open(data_dir, NULL, "create", &conn); conn->open_session(conn, NULL, NULL, &session); pthread_t *w_thd = malloc(sizeof(pthread_t) * thd_num); pthread_t *r_thd = malloc(sizeof(pthread_t) * thd_num); pthread_t *m_thd = malloc(sizeof(pthread_t)); thread_arg *r_params = malloc(sizeof(thread_arg) * thd_num); thread_arg *w_params = malloc(sizeof(thread_arg) * thd_num); // pthread_create(m_thd, NULL, (void *)metric_print, (void *)m); inti = 0; for (; i < thd_num; i++) { w_params[i].tid = i; w_params[i].conn = conn; w_params[i].k_size = k_size; w_params[i].v_size = v_size; w_params[i].kv_num = kv_rec_num; w_params[i].prefix = prefix; w_params[i].subfix = subfix; w_params[i].m = calloc(1, sizeof(metric)); w_params[i].m->sleep_seconds=time_interval; w_params[i].m->done = true; pthread_create(&w_thd[i], NULL, (void *)&thread_do_put, &w_params[i]); } for (i = 0; i < thd_num; i++) { pthread_join(w_thd[i], NULL); free(w_params[i].m->type); } for (i = 0; i < thd_num; i++) { r_params[i].tid = i; r_params[i].conn = conn; r_params[i].k_size = k_size; r_params[i].v_size = v_size; r_params[i].prefix = prefix; r_params[i].subfix = subfix; pthread_create(&r_thd[i], NULL, (void *)&thread_do_get, &r_params[i]); } for (i = 0; i < thd_num; i++) { pthread_join(r_thd[i], NULL); } // pthread_join(*m_thd, NULL); conn->close(conn, NULL); if (r_thd != NULL) { free(r_thd); free(r_params); } if (w_thd != NULL) { free(w_thd); free(w_params); } } // wt_bench(const char *data_dir,int thd_num,uint64_t kv_rec_num,size_t k_size,size_t v_size) void usage(const char *arg) { if(arg==NULL || strcmp(arg,"--help")==0 || strcmp(arg,"-h")==0||strcmp(arg,"-help")) { fprintf(stdout,"wt_bench:\n"); fprintf(stdout," -d database home\n"); fprintf(stdout," -t thread count\n"); fprintf(stdout," -n record count\n"); fprintf(stdout," -k key buffer size\n"); fprintf(stdout," -v val buffer size\n"); fprintf(stdout," -p prefix of key\n"); fprintf(stdout," -s subfix of key\n"); fprintf(stdout," -i interval for print metric\n"); exit(-1); } } int main(int argc, char *argv[]) { intopt; char *string = "d:t:n:k:v:p:s:i"; char *db_home = NULL; intthread_num = -1; longkv_rec_num = -1; intkey_size = -1; intval_size = -1; char *prefix = NULL; char *subfix = NULL; inttime_interval = -1; //usage(argv[1]); while ((opt = getopt(argc, argv, string)) != -1) { if (opt == -1) { break; } switch (opt) { case'd': db_home = strdup(optarg); break; case't': thread_num = atoi(optarg); break; case'n': kv_rec_num = atol(optarg); break; case'k': key_size = atoi(optarg); break; case'v': val_size = atoi(optarg); break; case'p': prefix = strdup(optarg); break; case'i': time_interval = atoi(optarg); break; case's': subfix = strdup(optarg); break; default: break; } } intdefault_size = 8; if (db_home == NULL) { fprintf(stdout, "db_home is nil\n"); exit(-1); } time_interval = (time_interval <0)?1:time_interval; thread_num = (thread_num < 0) ? 5 : thread_num; key_size = (key_size < 0) ? default_size : key_size; val_size = (val_size < 0) ? default_size : val_size; kv_rec_num = (kv_rec_num < 0) ? 1 : kv_rec_num; wt_bench(db_home, thread_num, (uint64_t)kv_rec_num, key_size, val_size, prefix, subfix,time_interval); return0; }