// mongo.h typedef void (*mongo_async_send_handler)(const char* pbuf, size_t len, void* arg); #define MONGO_ASYNC_READ_HEAD_LEN (sizeof(mongo_header) + sizeof(mongo_reply_fields)) void mongo_async_init(mongo_async_send_handler pfunc); void mongo_async_find(mongo_connection* conn, const char* ns, const bson* query, bson* fields, int nToReturn, int nToSkip, int options, int id, void* arg); mongo_cursor* mongo_async_response(mongo_connection* conn, const char* ns, mongo_reply* mm, int* id); mongo_reply * mongo_read_async_response_head(mongo_connection* conn, const char* buf, int* data_len/*out*/); // buf size must be MONGO_ASYNC_READ_HEAD_LEN and retu rn left data len mongo_reply* mongo_read_async_response_data(mongo_connection* conn, mongo_reply* out, const char* buf); // mongo.c static mongo_async_send_handler async_handler = NULL; void mongo_message_async_send(mongo_connection* conn, mongo_message* mm, void* arg){ mongo_header head; /* little endian */ bson_little_endian32(&head.len, &mm->head.len); bson_little_endian32(&head.id, &mm->head.id); bson_little_endian32(&head.responseTo, &mm->head.responseTo); bson_little_endian32(&head.op, &mm->head.op); MONGO_TRY{ //looping_write(conn, &head, sizeof(head)); //looping_write(conn, &mm->data, mm->head.len - sizeof(head)); async_handler((char*)&head, sizeof(head), arg); async_handler((char*)&mm->data, mm->head.len - sizeof(head), arg); }MONGO_CATCH{ free(mm); MONGO_RETHROW(); } free(mm); } void mongo_async_init(mongo_async_send_handler pfunc){ if( !pfunc ){ printf("mongodb async init failed\n"); exit(-5); } async_handler = pfunc; } // async mongo find // note: the id must not be zero or mongo_message_create will call rand() to generate void mongo_async_find(mongo_connection* conn, const char* ns, const bson* query, bson* fields, int nToReturn, int nToSkip, int options, int id, void* arg){ char * data; mongo_message * mm = mongo_message_create( 16 + /* header */ 4 + /* options */ strlen( ns ) + 1 + /* ns */ 4 + 4 + /* skip,return */ bson_size( query ) + bson_size( fields ) , id , 0 , mongo_op_query ); data = &mm->data; data = mongo_data_append32( data , &options ); data = mongo_data_append( data , ns , strlen( ns ) + 1 ); data = mongo_data_append32( data , &nToSkip ); data = mongo_data_append32( data , &nToReturn ); data = mongo_data_append( data , query->data , bson_size( query ) ); if ( fields ) data = mongo_data_append( data , fields->data , bson_size( fields ) ); bson_fatal_msg( (data == ((char*)mm) + mm->head.len), "query building fail!" ); mongo_message_async_send(conn, mm, arg); } // buf len must be >= MONGO_ASYNC_READ_HEAD_LEN mongo_reply * mongo_read_async_response_head(mongo_connection* conn, const char* buf, int* data_len/*out*/ ){ mongo_header head; /* header from network */ mongo_reply_fields fields; /* header from network */ mongo_reply * out; /* native endian */ int len; //looping_read(conn, &head, sizeof(head)); //looping_read(conn, &fields, sizeof(fields)); memcpy(&head, buf, sizeof(head)); memcpy(&fields, buf + sizeof(head), sizeof(fields)); bson_little_endian32(&len, &head.len); if (len < sizeof(head)+sizeof(fields) || len > 64*1024*1024) MONGO_THROW(MONGO_EXCEPT_NETWORK); /* most likely corruption */ out = (mongo_reply*)bson_malloc(len); out->head.len = len; bson_little_endian32(&out->head.id, &head.id); bson_little_endian32(&out->head.responseTo, &head.responseTo); bson_little_endian32(&out->head.op, &head.op); bson_little_endian32(&out->fields.flag, &fields.flag); bson_little_endian64(&out->fields.cursorID, &fields.cursorID); bson_little_endian32(&out->fields.start, &fields.start); bson_little_endian32(&out->fields.num, &fields.num); *data_len = out->head.len - MONGO_ASYNC_READ_HEAD_LEN; return out; } // read response data mongo_reply* mongo_read_async_response_data(mongo_connection* conn, mongo_reply* out, const char* buf){ int len = out->head.len; MONGO_TRY{ //looping_read(conn, &out->objs, len-sizeof(head)-sizeof(fields)); memcpy(&out->objs, buf, len - MONGO_ASYNC_READ_HEAD_LEN); }MONGO_CATCH{ free(out); MONGO_RETHROW(); } return out; } mongo_cursor* mongo_async_response(mongo_connection* conn, const char* ns, mongo_reply* mm, int* id){ int sl; volatile mongo_cursor * cursor; /* volatile due to longjmp in mongo exception handler */ cursor = (mongo_cursor*)bson_malloc(sizeof(mongo_cursor)); MONGO_TRY{ cursor->mm = mm; }MONGO_CATCH{ free((mongo_cursor*)cursor); /* cast away volatile, not changing type */ MONGO_RETHROW(); } sl = strlen(ns)+1; cursor->ns = bson_malloc(sl); if (!cursor->ns){ free(cursor->mm); free((mongo_cursor*)cursor); /* cast away volatile, not changing type */ return 0; } memcpy((void*)cursor->ns, ns, sl); /* cast needed to silence GCC warning */ cursor->conn = conn; cursor->current.data = NULL; *id = mm->head.responseTo; // return the id return (mongo_cursor*)cursor; }