//============================================================================================== // FILENAME : MongoDBThread.cpp // AUTHOR : // CREATION : // Copyright (C) MicroStrategy Incorporated 2011 //============================================================================================== #include "StdAfx.h" #include "MongoDBThread.h" #include "client/dbclient.h" namespace MSQLTEST { MongoDBThread::MongoDBThread(INT32 iTestType, int iIndex, int iTestDurationSeconds, const char* iHostName, const char* iNamespace, string& iJsonString) { mTestType = iTestType; mThreadIndex = iIndex; mTestDurationSeconds = iTestDurationSeconds; //mMongoConnection = new DBClientConnection(); string lError; if (!mMongoConnection.connect(iHostName, lError)) { cout << "MongoDB connection error: " << (lError) << endl; } mNameSpace = iNamespace; mJsonString = iJsonString; // mInsertRowCount = 0; // mSelectRowCount = 0; } MongoDBThread::MongoDBThread(Int32 iTestType, int iIndex, SQLTestAppImpl* iTestAppImpl) { mTestType = iTestType; mThreadIndex = iIndex; mApp = iTestAppImpl; mTestDurationSeconds = mApp->GetTestDuration(); mJsonString = iTestAppImpl->GetJsonString(); string lError; if(!mMongoConnection.connect(mApp->GetHostName().c_str(), lError)) { cout << "MongoDB connection error: " << (lError) << endl; } std::string tn = iTestAppImpl->GetCollectionName(); mNameSpace = iTestAppImpl->GetCollectionName(); mJsonString = iTestAppImpl->GetJsonString(); // mInsertRowCount = 0; // mSelectRowCount = 0; // mUpdateRowCount = 0; } MongoDBThread::~MongoDBThread() throw() { } void MongoDBThread::Run () throw() { bool lIfClosedThread = false; try { // get some metadata out of the loop Int32 lNumberOfDoc = mMongoConnection.count(mNameSpace); // generate the data for update BSONObj lUpDate = fromjson(mJsonString); lUpDate = BSONObjBuilder().appendElements( lUpDate).append( "NumOfDoc" , lNumberOfDoc).obj(); clock_t start_time = clock(); clock_t delta_start_time = clock(); // operation interval double lInterval = mApp->GetOperationInterval(); /// mongo read/write/update if(mTestType == 0)// read { //ofstream out_file_select("select_debug" , ios_base::app); while (1) { // set interval if (lInterval > 0) { MBase::Sleep(lInterval); } if(lNumberOfDoc > 0) { unsigned int lId = MBase::GetPseudoRandomInt32(lNumberOfDoc); #ifdef DEBUG //std::wcout << L"Select * where _id = "<< lId << std::endl; #endif auto_ptr cursor = mMongoConnection.query( mNameSpace , BSON("_id" << lId) ); std::string err = mMongoConnection.getLastError(); if(err!="") { std::wcout << L"Unrecoverable error in MongoDB thread #"<< mThreadIndex <more() ) { count++; BSONObj obj = cursor->next(); } //out_file_select << lId << "\t" << count<< endl; mApp->AddSelectInfo(); } // timer to stop the thread clock_t end = clock(); double lTime = (end - start_time)*1.0 / CLOCKS_PER_SEC; if(lTime >= mTestDurationSeconds) // 10s for test { break; } // Timer to record the status if((end - delta_start_time)*1.0 / CLOCKS_PER_SEC >= mApp->GetStatusDuration()) { lNumberOfDoc = mMongoConnection.count(mNameSpace); delta_start_time = end; } } } else if (mTestType == 3) // insert by js { // maybe not easy to set bulk int lC = mNameSpace.find("."); std::string ldb = mNameSpace.substr(0, lC); std::string lcollection = mNameSpace.substr(lC + 1); while (1) { if (lInterval > 0) { MBase::Sleep(lInterval); } int lUserId = mApp->GetInsertjsInfo(); std::stringstream lJsonData; lJsonData << "{" << "_id:" << lUserId << "," <AddInsertjsInfo(); // timer to stop the thread clock_t end = clock(); double lTime = (end - start_time)*1.0 / CLOCKS_PER_SEC; if(lTime >= mTestDurationSeconds) // 10s for test { break; } // Timer to record the status if((end - delta_start_time)*1.0 / CLOCKS_PER_SEC >= mApp->GetStatusDuration()) { lNumberOfDoc = mMongoConnection.count(mNameSpace); delta_start_time = end; } } } else if (mTestType == 4) // upsert by api { while(1) { // set interval if (lInterval > 0) { MBase::Sleep(lInterval); } #ifdef DEBUG //std::wcout << L"insert , _id = "<< mApp->GetInsertInfo() << std::endl; #endif BSONObj res = ParseFronJson(mJsonString); std::stringstream lId; lId << MBase::GetPseudoRandomInt32(4294967295)<AddInsertjsInfo(); // timer to stop the thread clock_t end = clock(); double lTime = (end - start_time)*1.0 / CLOCKS_PER_SEC; if(lTime >= mTestDurationSeconds) // 10s for test { break; } // Timer to record the status if((end - delta_start_time)*1.0 / CLOCKS_PER_SEC >= mApp->GetStatusDuration()) { lNumberOfDoc = mMongoConnection.count(mNameSpace); delta_start_time = end; } } } else if (mTestType == 1) // insert maybe in bulk mode { std::vector lBulkVector; int lCurrBulkSize = 0; int lBulkSize = mApp->GetInertBulkSize(); while(1) { // set interval if (lInterval > 0) { MBase::Sleep(lInterval); } #ifdef DEBUG //std::wcout << L"insert , _id = "<< mApp->GetInsertInfo() << std::endl; #endif BSONObj res = ParseFronJson(mJsonString); BSONObj lData = BSONObjBuilder().appendElements(BSON("_id"<GetInsertInfo())).appendElements(res).obj(); if(lBulkSize <= 0) { mMongoConnection.insert(mNameSpace, lData); std::string err = mMongoConnection.getLastError(); if(err!="") { std::wcout << L"Unrecoverable error in MongoDB thread #"<< mThreadIndex <AddInsertInfo(1); } else { if(lCurrBulkSize < lBulkSize) { lBulkVector.push_back(lData); lCurrBulkSize++; mApp->AddInsertInfo(1); } else { mMongoConnection.insert(mNameSpace, lBulkVector); std::string err = mMongoConnection.getLastError(); if(err!="") { std::wcout << L"Unrecoverable error in MongoDB thread #"<< mThreadIndex <= mTestDurationSeconds) // 10s for test { // flush the vector if(lCurrBulkSize > 0) { mMongoConnection.insert(mNameSpace, lBulkVector); } break; } // Timer to record the status int iiii = mApp->GetStatusDuration(); if((end - delta_start_time)*1.0 / CLOCKS_PER_SEC >= mApp->GetStatusDuration()) { lNumberOfDoc = mMongoConnection.count(mNameSpace); delta_start_time = end; } } } else if( mTestType == 2) // update { //ofstream out_file_update("insert_debug" , ios_base::app); while(1) { // set interval if (lInterval > 0) { MBase::Sleep(lInterval); } // random generate the _id to be updated if(lNumberOfDoc > 0) { unsigned int lId = MBase::GetPseudoRandomInt32(lNumberOfDoc); #ifdef DEBUG //std::wcout << L"update * where _id = "<< lId << std::endl; #endif // data for update BSONObj lData = BSONObjBuilder().appendElements(BSON("_id"<AddUpdateInfo(); //out_file_update << lId << endl; } // timer to stop the thread clock_t end = clock(); double lTime = (end - start_time)*1.0 / CLOCKS_PER_SEC; if(lTime >= mTestDurationSeconds) // 10s for test { break; } // Timer to record the status if((end - delta_start_time)*1.0 / CLOCKS_PER_SEC >= mApp->GetStatusDuration()) { lNumberOfDoc = mMongoConnection.count(mNameSpace); delta_start_time = end; } } } else{} } catch (...) { if (!lIfClosedThread) { } std::wcout << L"Unrecoverable error in client (mongo) thread #"<< mThreadIndex <