-
Type:
Bug
-
Resolution: Won't Fix
-
Priority:
Minor - P4
-
None
-
Affects Version/s: None
-
Component/s: Not Applicable
-
None
-
3
74.29% [kernel] [k] __pv_queued_spin_lock_slowpath 4.65% [kernel] [k] native_write_msr_safe 3.89% libc-2.17.so [.] __random 1.43% libc-2.17.so [.] __random_r 1.39% [kernel] [k] futex_wake 1.32% [kernel] [k] __switch_to 1.27% [kernel] [k] try_to_wake_up 1.03% [kernel] [k] copy_user_generic_string 1.00% [kernel] [k] __raw_callee_save___pv_queued_spin_unlock 0.80% [kernel] [k] futex_wait_setup 0.53% [kernel] [k] __schedule 0.48% [kernel] [k] _raw_qspin_lock 0.42% libc-2.17.so [.] __lll_lock_wait_private 0.32% [kernel] [k] plist_add 0.27% [kernel] [k] futex_wait 0.26% test_wt [.] rand_str 0.21% [kernel] [k] system_call_after_swapgs 0.21% [kernel] [k] mark_wake_futex 0.20% [kernel] [k] sched_clock_local 0.17% [kernel] [k] get_futex_key 0.17% [kernel] [k] system_call 0.16% [kernel] [k] sysret_check 0.15% [kernel] [k] schedule 0.14% [kernel] [k] __unqueue_futex 0.13% [kernel] [k] sched_clock_cpu 0.13% [kernel] [k] select_task_rq_fair 0.13% [kernel] [k] pvclock_clocksource_read 0.12% [kernel] [k] do_futex 0.12% [kernel] [k] hash_futex 0.12% [kernel] [k] established_get_first.isra.38 0.11% [kernel] [k] __audit_syscall_exit 0.11% [kernel] [k] pick_next_task_fair 0.11% [kernel] [k] check_preempt_wakeup 0.11% [kernel] [k] deactivate_task 0.10% libc-2.17.so [.] rand 0.09% [kernel] [k] rb_erase 0.08% [kernel] [k] sys_futex 0.07% [kernel] [k] dequeue_entity 0.07% [kernel] [k] account_entity_dequeue 0.07% [kernel] [k] dequeue_task_fair 0.06% [kernel] [k] __audit_syscall_entry
my test code
#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdbool.h>
#include <assert.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/stat.h>
#include <wiredtiger_ext.h>
#include <wiredtiger.h>
static char charset[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
typedef struct metric_t
{
char *type;
uint64_tbytes;
uint64_tcount;
intsleep_seconds;
booldone;
} metric;
typedef struct thread_arg_t
{
WT_CONNECTION *conn;
inttid;
size_tk_size;
size_tv_size;
char *prefix;
char *subfix;
uint64_tkv_num;
metric *m;
} thread_arg;
static const char *home;
static int
create_databae_home(const char *data_dir)
{
intret = 0;
DIR *dir = opendir(data_dir);
if (dir)
{
closedir(dir);
}
elseif (ENOENT == errno)
{
mkdir(data_dir, 0644);
}
else
{
ret = -1;
}
returnret;
}
uint32_t jump_consistent_hash(uint64_t key, int32_t num_buckets)
{
int64_tb = -1, j = 0;
while (j < num_buckets)
{
b = j;
key = key * 2862933555777941757ULL + 1;
j = (b + 1) * ((double)(1LL << 31) / (double)((key >> 33) + 1));
}
returnabs(b);
}
int rand_str(char *s, const char *prefix, const char *subfix, size_t n)
{
inti, j;
size_ts_size = 0;
size_tcharset_len = strlen((char *)&charset);
if (prefix != NULL)
{
memcpy(s, prefix, strlen(prefix));
s_size = strlen(prefix);
s[s_size] = '\0';
}
for (i = 0; i < n; i++)
{
//int index = jump_consistent_hash(rand(), charset_len);
intindex = rand()%charset_len;
s[s_size++] = charset[index];
}
if (subfix != NULL)
{
size_tsubfix_size = strlen(subfix);
memcpy(s + s_size, subfix, subfix_size);
s_size = s_size + subfix_size;
}
s[s_size] = '\0';
returns_size;
}
static void *metric_print(void *arg)
{
metric *mt = (metric *)arg;
pthread_mutex_tm = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_tc;
structtimespecto;
pthread_condattr_tattr;
pthread_condattr_init(&attr);
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
pthread_cond_init(&c, &attr);
pthread_mutex_lock(&m);
while (mt->done)
{
clock_gettime(CLOCK_MONOTONIC, &to);
to.tv_sec += mt->sleep_seconds;
pthread_cond_timedwait(&c, &m, &to);
fprintf(stdout, "%s count=%ld,bytes=%ld\n",mt->type,mt->count,mt->bytes);
}
pthread_mutex_unlock(&m);
}
void *
thread_do_put(void *arg)
{
thread_arg *param = (thread_arg *)arg;
WT_SESSION *session = NULL;
WT_CURSOR *cursor = NULL;
intret = param->conn->open_session(param->conn, NULL, NULL, &session);
fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret);
session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S,checksum=off");
ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL, NULL,&cursor);
fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret);
uint64_ti = 0;
size_tfix_sz = 0;
if (param->prefix != NULL)
{
fix_sz += strlen(param->prefix);
}
if (param->subfix != NULL)
{
fix_sz += strlen(param->subfix);
}
chartype[256] = \{'\0'};
snprintf((char *)&type,256,"w-%d",param->tid);
param->m->type = strdup((char *)&type);
pthread_tm_thd;
pthread_create(&m_thd,NULL,(void *)metric_print,(void *)param->m);
char *key_buf = malloc(sizeof(char) * (param->k_size + fix_sz) + 1);
char *val_buf = malloc(sizeof(char) * param->v_size + 1);
for (inti = 0; i < param->kv_num; i++)
{
intk_sz = rand_str(key_buf, param->prefix, param->subfix, param->k_size - fix_sz);
intv_sz = rand_str(val_buf, NULL, NULL, param->v_size);
cursor->set_key(cursor, key_buf);
cursor->set_value(cursor, val_buf);
cursor->insert(cursor);
// usleep(1);
param->m->bytes = param->m->bytes +k_sz+v_sz;
}
pthread_join(m_thd,NULL);
if (key_buf != NULL)
{
free(key_buf);
}
if (val_buf != NULL)
{
free(val_buf);
}
returnNULL;
}
void *
thread_do_get(void *arg)
{
thread_arg *param = (thread_arg *)arg;
WT_SESSION *session = NULL;
WT_CURSOR *cursor = NULL;
intret = param->conn->open_session(param->conn, NULL, NULL, &session);
// fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret);
session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S");
ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL,NULL, &cursor);
// fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret);
size_tfix_sz = 0;
if (param->prefix != NULL)
{
fix_sz += strlen(param->prefix);
}
if (param->subfix != NULL)
{
fix_sz += strlen(param->subfix);
}
size_tline_sz = param->k_size + param->v_size;
char *line_buf = malloc(sizeof(char) * line_sz + 1);
char *key_buf = malloc(sizeof(char) * param->k_size + 1);
char *val_buf = malloc(sizeof(char) * param->v_size + 1);
}
static void
wt_bench(const char *data_dir, int thd_num, uint64_t kv_rec_num, size_t k_size, size_t v_size, char *prefix, char *subfix,int time_interval)
{
WT_CONNECTION *conn;
WT_CURSOR *cursor;
WT_SESSION *session;
constchar *key, *value;
intret;
wiredtiger_open(data_dir, NULL, "create", &conn);
conn->open_session(conn, NULL, NULL, &session);
pthread_t *w_thd = malloc(sizeof(pthread_t) * thd_num);
pthread_t *r_thd = malloc(sizeof(pthread_t) * thd_num);
pthread_t *m_thd = malloc(sizeof(pthread_t));
thread_arg *r_params = malloc(sizeof(thread_arg) * thd_num);
thread_arg *w_params = malloc(sizeof(thread_arg) * thd_num);
// pthread_create(m_thd, NULL, (void *)metric_print, (void *)m);
inti = 0;
for (; i < thd_num; i++)
{
w_params[i].tid = i;
w_params[i].conn = conn;
w_params[i].k_size = k_size;
w_params[i].v_size = v_size;
w_params[i].kv_num = kv_rec_num;
w_params[i].prefix = prefix;
w_params[i].subfix = subfix;
w_params[i].m = calloc(1, sizeof(metric));
w_params[i].m->sleep_seconds=time_interval;
w_params[i].m->done = true;
pthread_create(&w_thd[i], NULL, (void *)&thread_do_put, &w_params[i]);
}
for (i = 0; i < thd_num; i++)
{
pthread_join(w_thd[i], NULL);
free(w_params[i].m->type);
}
for (i = 0; i < thd_num; i++)
{
r_params[i].tid = i;
r_params[i].conn = conn;
r_params[i].k_size = k_size;
r_params[i].v_size = v_size;
r_params[i].prefix = prefix;
r_params[i].subfix = subfix;
pthread_create(&r_thd[i], NULL, (void *)&thread_do_get, &r_params[i]);
}
for (i = 0; i < thd_num; i++)
{
pthread_join(r_thd[i], NULL);
}
// pthread_join(*m_thd, NULL);
conn->close(conn, NULL);
if (r_thd != NULL)
{
free(r_thd);
free(r_params);
}
if (w_thd != NULL)
{
free(w_thd);
free(w_params);
}
}
// wt_bench(const char *data_dir,int thd_num,uint64_t kv_rec_num,size_t k_size,size_t v_size)
void usage(const char *arg) {
if(arg==NULL || strcmp(arg,"--help")==0 || strcmp(arg,"-h")==0||strcmp(arg,"-help")) {
fprintf(stdout,"wt_bench:\n");
fprintf(stdout," -d database home\n");
fprintf(stdout," -t thread count\n");
fprintf(stdout," -n record count\n");
fprintf(stdout," -k key buffer size\n");
fprintf(stdout," -v val buffer size\n");
fprintf(stdout," -p prefix of key\n");
fprintf(stdout," -s subfix of key\n");
fprintf(stdout," -i interval for print metric\n");
exit(-1);
}
}
int main(int argc, char *argv[])
{
intopt;
char *string = "d:t:n:k:v:p:s:i";
char *db_home = NULL;
intthread_num = -1;
longkv_rec_num = -1;
intkey_size = -1;
intval_size = -1;
char *prefix = NULL;
char *subfix = NULL;
inttime_interval = -1;
//usage(argv[1]);
while ((opt = getopt(argc, argv, string)) != -1)
{
if (opt == -1)
{
break;
}
switch (opt)
{
case'd':
db_home = strdup(optarg);
break;
case't':
thread_num = atoi(optarg);
break;
case'n':
kv_rec_num = atol(optarg);
break;
case'k':
key_size = atoi(optarg);
break;
case'v':
val_size = atoi(optarg);
break;
case'p':
prefix = strdup(optarg);
break;
case'i':
time_interval = atoi(optarg);
break;
case's':
subfix = strdup(optarg);
break;
default:
break;
}
}
intdefault_size = 8;
if (db_home == NULL)
{
fprintf(stdout, "db_home is nil\n");
exit(-1);
}
time_interval = (time_interval <0)?1:time_interval;
thread_num = (thread_num < 0) ? 5 : thread_num;
key_size = (key_size < 0) ? default_size : key_size;
val_size = (val_size < 0) ? default_size : val_size;
kv_rec_num = (kv_rec_num < 0) ? 1 : kv_rec_num;
wt_bench(db_home, thread_num, (uint64_t)kv_rec_num, key_size, val_size, prefix, subfix,time_interval);
return0;
}