#include <dirent.h>
|
#include <errno.h>
|
#include <fcntl.h>
|
#include <unistd.h>
|
#include <stdlib.h>
|
#include <string.h>
|
#include <stdint.h>
|
#include <stdbool.h>
|
#include <assert.h>
|
#include <pthread.h>
|
#include <fcntl.h>
|
#include <sys/stat.h>
|
#include <sys/stat.h>
|
#include <wiredtiger_ext.h>
|
#include <wiredtiger.h>
|
static char charset[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
typedef struct metric_t
|
{
|
char *type;
|
uint64_tbytes;
|
uint64_tcount;
|
intsleep_seconds;
|
booldone;
|
} metric;
|
|
typedef struct thread_arg_t
|
{
|
WT_CONNECTION *conn;
|
inttid;
|
size_tk_size;
|
size_tv_size;
|
char *prefix;
|
char *subfix;
|
uint64_tkv_num;
|
metric *m;
|
|
} thread_arg;
|
|
static const char *home;
|
|
static int
|
create_databae_home(const char *data_dir)
|
{
|
intret = 0;
|
DIR *dir = opendir(data_dir);
|
if (dir)
|
{
|
closedir(dir);
|
}
|
elseif (ENOENT == errno)
|
{
|
mkdir(data_dir, 0644);
|
}
|
else
|
{
|
ret = -1;
|
}
|
returnret;
|
}
|
|
uint32_t jump_consistent_hash(uint64_t key, int32_t num_buckets)
|
{
|
int64_tb = -1, j = 0;
|
while (j < num_buckets)
|
{
|
b = j;
|
key = key * 2862933555777941757ULL + 1;
|
j = (b + 1) * ((double)(1LL << 31) / (double)((key >> 33) + 1));
|
}
|
returnabs(b);
|
}
|
|
int rand_str(char *s, const char *prefix, const char *subfix, size_t n)
|
{
|
inti, j;
|
size_ts_size = 0;
|
size_tcharset_len = strlen((char *)&charset);
|
if (prefix != NULL)
|
{
|
memcpy(s, prefix, strlen(prefix));
|
s_size = strlen(prefix);
|
s[s_size] = '\0';
|
}
|
|
for (i = 0; i < n; i++)
|
{
|
//int index = jump_consistent_hash(rand(), charset_len);
|
intindex = rand()%charset_len;
|
s[s_size++] = charset[index];
|
}
|
if (subfix != NULL)
|
{
|
size_tsubfix_size = strlen(subfix);
|
memcpy(s + s_size, subfix, subfix_size);
|
s_size = s_size + subfix_size;
|
}
|
s[s_size] = '\0';
|
returns_size;
|
}
|
static void *metric_print(void *arg)
|
{
|
metric *mt = (metric *)arg;
|
pthread_mutex_tm = PTHREAD_MUTEX_INITIALIZER;
|
pthread_cond_tc;
|
structtimespecto;
|
pthread_condattr_tattr;
|
|
pthread_condattr_init(&attr);
|
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
|
pthread_cond_init(&c, &attr);
|
|
pthread_mutex_lock(&m);
|
while (mt->done)
|
{
|
clock_gettime(CLOCK_MONOTONIC, &to);
|
to.tv_sec += mt->sleep_seconds;
|
pthread_cond_timedwait(&c, &m, &to);
|
fprintf(stdout, "%s count=%ld,bytes=%ld\n",mt->type,mt->count,mt->bytes);
|
}
|
pthread_mutex_unlock(&m);
|
}
|
void *
|
thread_do_put(void *arg)
|
{
|
thread_arg *param = (thread_arg *)arg;
|
WT_SESSION *session = NULL;
|
WT_CURSOR *cursor = NULL;
|
intret = param->conn->open_session(param->conn, NULL, NULL, &session);
|
|
fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret);
|
session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S,checksum=off");
|
ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL, NULL,&cursor);
|
fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret);
|
uint64_ti = 0;
|
size_tfix_sz = 0;
|
if (param->prefix != NULL)
|
{
|
fix_sz += strlen(param->prefix);
|
}
|
if (param->subfix != NULL)
|
{
|
fix_sz += strlen(param->subfix);
|
}
|
chartype[256] = \{'\0'};
|
snprintf((char *)&type,256,"w-%d",param->tid);
|
param->m->type = strdup((char *)&type);
|
pthread_tm_thd;
|
pthread_create(&m_thd,NULL,(void *)metric_print,(void *)param->m);
|
char *key_buf = malloc(sizeof(char) * (param->k_size + fix_sz) + 1);
|
char *val_buf = malloc(sizeof(char) * param->v_size + 1);
|
for (inti = 0; i < param->kv_num; i++)
|
{
|
|
intk_sz = rand_str(key_buf, param->prefix, param->subfix, param->k_size - fix_sz);
|
intv_sz = rand_str(val_buf, NULL, NULL, param->v_size);
|
cursor->set_key(cursor, key_buf);
|
cursor->set_value(cursor, val_buf);
|
cursor->insert(cursor);
|
// usleep(1);
|
param->m->bytes = param->m->bytes +k_sz+v_sz;
|
}
|
pthread_join(m_thd,NULL);
|
if (key_buf != NULL)
|
{
|
free(key_buf);
|
}
|
if (val_buf != NULL)
|
{
|
free(val_buf);
|
}
|
returnNULL;
|
}
|
|
void *
|
thread_do_get(void *arg)
|
{
|
|
thread_arg *param = (thread_arg *)arg;
|
WT_SESSION *session = NULL;
|
WT_CURSOR *cursor = NULL;
|
intret = param->conn->open_session(param->conn, NULL, NULL, &session);
|
|
// fprintf(stdout, "thread id=%d,init session=%d\n", param->tid, ret);
|
session->create(session, "table:perrynzhou-wt_bench", "key_format=S,value_format=S");
|
|
ret = session->open_cursor(session, "table:perrynzhou-wt_bench", NULL,NULL, &cursor);
|
// fprintf(stdout, "thread id=%d,init cursor=%d\n", param->tid, ret);
|
|
size_tfix_sz = 0;
|
if (param->prefix != NULL)
|
{
|
fix_sz += strlen(param->prefix);
|
}
|
if (param->subfix != NULL)
|
{
|
fix_sz += strlen(param->subfix);
|
}
|
size_tline_sz = param->k_size + param->v_size;
|
char *line_buf = malloc(sizeof(char) * line_sz + 1);
|
char *key_buf = malloc(sizeof(char) * param->k_size + 1);
|
char *val_buf = malloc(sizeof(char) * param->v_size + 1);
|
}
|
|
static void
|
wt_bench(const char *data_dir, int thd_num, uint64_t kv_rec_num, size_t k_size, size_t v_size, char *prefix, char *subfix,int time_interval)
|
{
|
WT_CONNECTION *conn;
|
WT_CURSOR *cursor;
|
WT_SESSION *session;
|
constchar *key, *value;
|
intret;
|
|
wiredtiger_open(data_dir, NULL, "create", &conn);
|
conn->open_session(conn, NULL, NULL, &session);
|
|
pthread_t *w_thd = malloc(sizeof(pthread_t) * thd_num);
|
pthread_t *r_thd = malloc(sizeof(pthread_t) * thd_num);
|
pthread_t *m_thd = malloc(sizeof(pthread_t));
|
thread_arg *r_params = malloc(sizeof(thread_arg) * thd_num);
|
thread_arg *w_params = malloc(sizeof(thread_arg) * thd_num);
|
|
// pthread_create(m_thd, NULL, (void *)metric_print, (void *)m);
|
inti = 0;
|
for (; i < thd_num; i++)
|
{
|
w_params[i].tid = i;
|
w_params[i].conn = conn;
|
w_params[i].k_size = k_size;
|
w_params[i].v_size = v_size;
|
w_params[i].kv_num = kv_rec_num;
|
w_params[i].prefix = prefix;
|
w_params[i].subfix = subfix;
|
w_params[i].m = calloc(1, sizeof(metric));
|
w_params[i].m->sleep_seconds=time_interval;
|
|
w_params[i].m->done = true;
|
pthread_create(&w_thd[i], NULL, (void *)&thread_do_put, &w_params[i]);
|
}
|
|
for (i = 0; i < thd_num; i++)
|
{
|
pthread_join(w_thd[i], NULL);
|
free(w_params[i].m->type);
|
}
|
|
for (i = 0; i < thd_num; i++)
|
{
|
r_params[i].tid = i;
|
r_params[i].conn = conn;
|
r_params[i].k_size = k_size;
|
r_params[i].v_size = v_size;
|
r_params[i].prefix = prefix;
|
r_params[i].subfix = subfix;
|
pthread_create(&r_thd[i], NULL, (void *)&thread_do_get, &r_params[i]);
|
}
|
for (i = 0; i < thd_num; i++)
|
{
|
pthread_join(r_thd[i], NULL);
|
}
|
// pthread_join(*m_thd, NULL);
|
conn->close(conn, NULL);
|
if (r_thd != NULL)
|
{
|
free(r_thd);
|
free(r_params);
|
}
|
if (w_thd != NULL)
|
{
|
free(w_thd);
|
free(w_params);
|
}
|
}
|
// wt_bench(const char *data_dir,int thd_num,uint64_t kv_rec_num,size_t k_size,size_t v_size)
|
|
void usage(const char *arg) {
|
if(arg==NULL || strcmp(arg,"--help")==0 || strcmp(arg,"-h")==0||strcmp(arg,"-help")) {
|
fprintf(stdout,"wt_bench:\n");
|
fprintf(stdout," -d database home\n");
|
fprintf(stdout," -t thread count\n");
|
fprintf(stdout," -n record count\n");
|
fprintf(stdout," -k key buffer size\n");
|
fprintf(stdout," -v val buffer size\n");
|
fprintf(stdout," -p prefix of key\n");
|
fprintf(stdout," -s subfix of key\n");
|
fprintf(stdout," -i interval for print metric\n");
|
exit(-1);
|
}
|
}
|
int main(int argc, char *argv[])
|
{
|
intopt;
|
char *string = "d:t:n:k:v:p:s:i";
|
char *db_home = NULL;
|
intthread_num = -1;
|
longkv_rec_num = -1;
|
intkey_size = -1;
|
intval_size = -1;
|
char *prefix = NULL;
|
char *subfix = NULL;
|
inttime_interval = -1;
|
//usage(argv[1]);
|
while ((opt = getopt(argc, argv, string)) != -1)
|
{
|
if (opt == -1)
|
{
|
break;
|
}
|
switch (opt)
|
{
|
case'd':
|
db_home = strdup(optarg);
|
break;
|
case't':
|
thread_num = atoi(optarg);
|
break;
|
case'n':
|
kv_rec_num = atol(optarg);
|
break;
|
case'k':
|
key_size = atoi(optarg);
|
break;
|
case'v':
|
val_size = atoi(optarg);
|
break;
|
case'p':
|
prefix = strdup(optarg);
|
break;
|
case'i':
|
time_interval = atoi(optarg);
|
break;
|
case's':
|
subfix = strdup(optarg);
|
break;
|
default:
|
break;
|
}
|
}
|
intdefault_size = 8;
|
if (db_home == NULL)
|
{
|
fprintf(stdout, "db_home is nil\n");
|
exit(-1);
|
}
|
time_interval = (time_interval <0)?1:time_interval;
|
thread_num = (thread_num < 0) ? 5 : thread_num;
|
key_size = (key_size < 0) ? default_size : key_size;
|
|
val_size = (val_size < 0) ? default_size : val_size;
|
kv_rec_num = (kv_rec_num < 0) ? 1 : kv_rec_num;
|
|
wt_bench(db_home, thread_num, (uint64_t)kv_rec_num, key_size, val_size, prefix, subfix,time_interval);
|
return0;
|
}
|