"""Time a MongoDB ``$avg`` aggregation, run serially or with one thread per day.

Usage: SCRIPT COLLECTION COLUMN THREADED

COLLECTION  name of the main collection; the threaded mode reads the
            collections named ``COLLECTION_day0`` .. ``COLLECTION_dayN``.
COLUMN      field whose average is computed.
THREADED    integer; non-zero selects the threaded mode.
"""
import sys
import threading
import time

import pymongo

if len(sys.argv) < 4:
    # Fail with a readable message instead of a bare IndexError.
    sys.exit("usage: " + sys.argv[0] + " COLLECTION COLUMN THREADED")

# NOTE(review): host and credentials are hard-coded here; consider moving
# them to configuration or environment variables.
mongocli = pymongo.MongoClient(
    host="192.168.99.100",
    username="admin",
    password="admin123",
    authSource="analyse_db",
    maxPoolSize=200,
)
db = mongocli['analyse_db']

collection = sys.argv[1]
column = sys.argv[2]
threaded = int(sys.argv[3])

# Start of each aggregation window (compared against ``time_column``).
start_times = [1587288313, 1587374713, 1587461113, 1587547513,
               1587633514, 1587719914, 1587806314]
# Length of each window, in the same unit as ``start_times``.
interval = 86000
time_column = "timestamp_EP"

col = db[collection]
# A constant ``_id`` puts every input document into a single group.
group_query = {"$group": {"_id": "", "result": {"$avg": "$" + column}}}


def _print_first_result(cursor):
    """Print the first document of *cursor*, or ``no data`` if it is empty."""
    try:
        print(cursor.next())
    except StopIteration:
        # Only an exhausted cursor means "no data"; other errors propagate.
        print("no data")


def aggregate_main_collection():
    """Average ``column`` over each time window of the main collection.

    Runs one ``$match`` + ``$group`` pipeline per entry of ``start_times``,
    sequentially, and prints each result (or ``no data`` when the window
    matches no documents).
    """
    for start_time in start_times:
        end_time = start_time + interval
        window = {"$match": {time_column: {"$gte": start_time,
                                           "$lt": end_time}}}
        _print_first_result(col.aggregate([window, group_query]))


def aggregate_thread(local_col, start_time, end_time, thread_id):
    """Average ``column`` over the whole collection named *local_col*.

    Args:
        local_col: name of the collection to aggregate.
        start_time: accepted for interface compatibility; currently unused
            because no ``$match`` stage is applied.
        end_time: accepted for interface compatibility; currently unused.
        thread_id: label used in the timing line printed at the end.
    """
    local_col_obj = db[local_col]
    thr_start_time = time.time()
    cursor = local_col_obj.aggregate([group_query])
    _print_first_result(cursor)
    print("thread " + str(thread_id) + " completion time: ",
          time.time() - thr_start_time)


def aggregate_threaded():
    """Run ``aggregate_thread`` concurrently, one thread per start time.

    Thread ``i`` aggregates the collection ``<collection>_day<i>``. The
    function returns once every thread has finished.
    """
    thr_list = []
    for i, ftime in enumerate(start_times):
        ttime = ftime + interval
        local_col = collection + "_day" + str(i)
        thr = threading.Thread(target=aggregate_thread,
                               args=[local_col, ftime, ttime, i],
                               daemon=True)
        thr_list.append(thr)
        thr.start()

    for thr in thr_list:
        thr.join()


proc_start = time.time()
if threaded:
    aggregate_threaded()
else:
    aggregate_main_collection()
# Total wall-clock time of the selected mode.
print(time.time() - proc_start)