using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;
using System.Threading;
using log4net;
using Docstoc.Infrastructure;
using Docstoc.Infrastructure.Parallel;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using MongoDB.Driver.Builders;

namespace Docstoc.Harvest.DataAccess.Mongo
{
    sealed class CrawlRequestDB : FlushableStore<CrawlRequest>, ICrawlRequestQueue
    {
        static readonly ILog _logger = LogManager.GetLogger(typeof(CrawlRequestDB));
        static readonly ILog _dequeueLog = LogManager.GetLogger("CrawlDeQueueLog");
        static readonly ILog _enqueueLog = LogManager.GetLogger("CrawlEnQueueLog");

        readonly ProcessQueue<string> _syncq;
        readonly Consumer<string> _synker;
        readonly int MAX_DEQUEUE = 1000;
        readonly bool INSERT_FAIL_UPDATE = false;

        #region ctor & dtor

        public CrawlRequestDB()
            : base(typeof(CrawlRequestDB).Name, true, 300, TimeSpan.FromSeconds(3))
        {
            try
            {
                MAX_DEQUEUE = ConfigurationManager.AppSettings.ExtractValue("CrawlRequestDB_MAX_DEQUEUE", MAX_DEQUEUE);
                INSERT_FAIL_UPDATE = ConfigurationManager.AppSettings.ExtractValue("CrawlRequestDB_INSERT_FAIL_UPDATE", INSERT_FAIL_UPDATE);

                //domain re-sync consumer starts paused and is resumed when work arrives (see On_Enqueue)
                _syncq = new ProcessQueue<string>(true);
                _syncq.OnEnqueue += On_Enqueue;
                _synker = new Consumer<string>("DmSync", _syncq, SyncDomain);
                _synker.Run();
                _synker.Pause();

                const string SL = "Service Start";
                _dequeueLog.Info(SL);
                _enqueueLog.Info(SL);
            }
            catch (Exception ex)
            {
                _logger.Error("CTOR", ex);
                throw;
            }
        }

        void On_Enqueue(int count)
        {
            try
            {
                if (_synker.IsPaused && (count > 0 || _syncq.Count > 0))
                    _synker.Resume();
            }
            catch (Exception ex)
            {
                _logger.Error("On_Enqueue", ex);
            }
        }

        public override void Dispose()
        {
            try
            {
                base.Dispose();
                if (_synker != null)
                    _synker.Dispose();
                if (_syncq != null)
                    _syncq.Dispose();
            }
            catch (Exception ex)
            {
                _logger.Error("Dispose", ex);
                throw;
            }
        }

        #endregion

        #region batch save method

        static BsonDocument ToBSON(CrawlRequest o)
        {
            var k = new BsonDocument
            {
                { "When", o.When },
                { "_urlh", o.UrlHash },
            };
            var d = new BsonDocument
            {
                { "_id", k },
                { "Target", o.Target.ToString() },
                { "Domain", o.Domain },
                { "SubDomainOK", o.SubDomainOK },
                { "Follows", o.Follows },
                { "SubPathOnly", o.SubPathOnly },
                { "SubPathMax", o.SubPathMax },
                { "ForceFileSave", o.ForceFileSave },
                { "Spyder", o.Spyder ?? "UNKNOWN" },
                { "Job", o.Job },
            };
            if (o.Referer != null)
                d.Add("Referer", o.Referer.ToString());
            return d;
        }

        static UpdateBuilder ToUpdate(CrawlRequest o)
        {
            var u = Update
                .Set("Target", o.Target.ToString())
                .Set("Domain", o.Domain)
                .Set("SubDomainOK", o.SubDomainOK)
                .Set("Follows", o.Follows)
                .Set("SubPathOnly", o.SubPathOnly)
                .Set("SubPathMax", o.SubPathMax)
                .Set("ForceFileSave", o.ForceFileSave)
                .Set("Spyder", o.Spyder ?? "UNKNOWN")
                .Set("Job", o.Job);
            if (o.Referer != null)
                u.Set("Referer", o.Referer.ToString());
            return u;
        }
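        // The flush path below persists buffered requests to the crawl-queue collection. Depending
        // on the CrawlRequestDB_INSERT_FAIL_UPDATE app setting it either batch-inserts and falls
        // back to upserts on duplicate keys, or upserts each request directly. For reference, a
        // queued request is stored roughly as the document below (field values are illustrative only):
        //
        //   {
        //     _id: { When: ISODate("..."), _urlh: "<url hash>" },
        //     Target: "http://example.com/", Domain: "example.com",
        //     SubDomainOK: true, Follows: 0, SubPathOnly: false, SubPathMax: 0,
        //     ForceFileSave: false, Spyder: "UNKNOWN", Job: <guid>,
        //     Referer: "..."   // only when a referer is present
        //   }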
        protected override void SaveToDB(List<CrawlRequest> items)
        {
            DateTime started = DateTime.UtcNow;
            if (_logger.IsDebugEnabled)
                _logger.DebugFormat("SaveToDB(count=={0:N0}) begin", items.Count);

            bool ok = INSERT_FAIL_UPDATE ? InsertFailUpdate(items) : Upsert(items);

            if (ok && _enqueueLog.IsInfoEnabled)
                items.ForEach(r => _enqueueLog.Info(r.ToLog()));
            if (_logger.IsInfoEnabled)
                _logger.InfoFormat("SaveToDB of {0:N0} items took {1}", items.Count, DateTime.UtcNow.Subtract(started));
        }

        bool InsertFailUpdate(List<CrawlRequest> items)
        {
            bool ok = false;
            List<BsonDocument> docs = (from o in items
                                       where o != null
                                       let t = ToBSON(o)
                                       where t != null
                                       select t).ToList();
            if (docs.IsNullOrEmpty())
                return ok;

            bool retries = false;
            try
            {
                using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                    ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlQueue,
                        col => col.InsertBatch(docs, SafeMode.True));
                ok = true;
            }
            catch (MongoSafeModeException se)
            {
                if (!se.Message.ToLower().Contains("duplicate key"))
                    throw;
                else
                {
                    retries = true;
                    if (_logger.IsWarnEnabled)
                        _logger.WarnFormat("Duplicate Error, trying to upsert items: {0}", se.Message);
                }
            }
            if (retries)
            {
                ok = Upsert(items);
                _logger.InfoFormat("Duplicate upsert retries SUCCESS!");
            }
            return ok;
        }

        bool Upsert(List<CrawlRequest> items)
        {
            var ups = (from i in items
                       let k = Query.EQ("_id", new BsonDocument
                       {
                           { "When", i.When },
                           { "_urlh", i.UrlHash },
                       })
                       let t = new { Key = k, Update = ToUpdate(i) }
                       select t).ToList();

            using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
            {
                ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlQueue,
                    col => ups.ForEach(u =>
                    {
                        try
                        {
                            col.Update(u.Key, u.Update, UpdateFlags.Upsert, SafeMode.True);
                        }
                        catch (MongoSafeModeException mse)
                        {
                            if (mse.Message.ToLower().Contains("duplicate key"))
                            {
                                _logger.WarnFormat("Upsert: {0}", mse.Message);
                                ma.ReConnect();
                            }
                            else
                                throw;
                        }
                    }));
            }
            return true;
        }

        #endregion

        #region ICrawlRequestQueue Members

        static CrawlRequest FromBSON(BsonDocument d)
        {
            CrawlRequest o = null;
            if (d != null)
            {
                BsonDocument key = d["_id"].AsBsonDocument;
                if (key != null)
                {
                    DateTime when = key["When"].AsDateTime;
                    string hash = key["_urlh"].AsString;
                    o = new CrawlRequest(when, hash);
                    o.Referer = d.Contains("Referer") ? new Uri(d["Referer"].AsString) : null;
                    o.Target = new Uri(d["Target"].AsString);
                    o.SubDomainOK = d["SubDomainOK"].AsBoolean;
                    o.Follows = d["Follows"].AsInt32;
                    o.SubPathOnly = d["SubPathOnly"].AsBoolean;
                    o.SubPathMax = d["SubPathMax"].AsInt32;
                    o.ForceFileSave = d["ForceFileSave"].AsBoolean;
                    o.Spyder = d["Spyder"].AsString;
                    o.Job = d["Job"].AsGuid;
                }
            }
            return o;
        }
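        // Dequeue below claims work in two phases: a key-only scan of due requests (allowed to run
        // against a slave when one is configured), followed by per-key FindAndRemove calls on the
        // master so each request is handed to exactly one consumer. A minimal usage sketch
        // (Process is a hypothetical callback, not part of this class):
        //
        //   using (var queue = new CrawlRequestDB())
        //   {
        //       List<CrawlRequest> due = queue.Dequeue(DateTime.UtcNow, 100); // requests scheduled up to now
        //       due.ForEach(r => Process(r));
        //   }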
        public List<CrawlRequest> Dequeue(DateTime when, int count)
        {
            DateTime started = DateTime.UtcNow;
            if (_logger.IsDebugEnabled)
                _logger.DebugFormat("Dequeue({0} , {1})", when, count);

            bool countOK = count > 0 && count <= MAX_DEQUEUE;
            bool dateOK = when > DateTime.MinValue && when < DateTime.MaxValue;
            if (!countOK && !dateOK)
                throw (new ArgumentOutOfRangeException(string.Format(
                    "when needs to be within date min/max or count needs to be between 1 & {0:N0}", MAX_DEQUEUE)));

            var res = new List<CrawlRequest>();
            try
            {
                var keys = new List<BsonDocument>();
                //fetch keys only, slave is ok...
                using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                {
                    ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlQueue, col =>
                    {
                        MongoCursor<BsonDocument> cursor = dateOK ? col.Find(Query.LTE("_id.When", when)) : col.FindAll();
                        cursor = cursor.SetFields("_id").SetBatchSize(200);
                        if (CONST.HaveSlaves(CONST.Node_CrawlQueue))
                            cursor.SlaveOk = true;
                        if (countOK)
                            cursor.Limit = count;

                        foreach (BsonDocument d in cursor)
                        {
                            if (d == null)
                                continue;
                            keys.Add(d["_id"].AsBsonDocument);
                            if (keys.Count >= MAX_DEQUEUE)
                            {
                                _logger.WarnFormat("Dequeue is not designed to fetch more than {0} rows", MAX_DEQUEUE.ToString("N0"));
                                break;
                            }
                        }
                    });
                }
                if (keys.Count > 0) //optimistic find/remove on master only
                {
                    IMongoSortBy s = SortBy.Ascending("_id.When");
                    List<IMongoQuery> qlist = (from k in keys
                                               where k != null
                                               select Query.EQ("_id", k)).ToList();
                    using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                    {
                        ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlQueue,
                            col => qlist.ForEach(q =>
                            {
                                CrawlRequest cr;
                                FindAndModifyResult r = col.FindAndRemove(q, s);
                                if (r != null && r.Ok && (cr = FromBSON(r.ModifiedDocument)) != null)
                                    res.Add(cr);
                                else if (_logger.IsWarnEnabled && r != null && !string.IsNullOrEmpty(r.ErrorMessage))
                                    _logger.WarnFormat("Dequeue _id {0} failed: {1}", q.ToJson(), r.ErrorMessage);
                            }));
                    }
                }
                if (_logger.IsWarnEnabled && keys.Count != res.Count)
                    _logger.WarnFormat("Dequeue unable to read {0} items of found {1}",
                        (keys.Count - res.Count).ToString("N0"), res.Count.ToString("N0"));
            }
            catch (Exception ex)
            {
                _logger.Error("Dequeue", ex);
                throw;
            }
            finally
            {
                if (_dequeueLog.IsInfoEnabled && !res.IsNullOrEmpty())
                    res.ForEach(r => _dequeueLog.Info(r.ToLog()));
                if (_logger.IsInfoEnabled)
                {
                    _logger.InfoFormat("Dequeue({0} , {1}) found {2} requests, took {3}",
                        when, count, res.Count.ToString("N0"), DateTime.UtcNow.Subtract(started));
                }
            }
            return res;
        }

        public List<CrawlRequest> Dequeue(int count)
        {
            return Dequeue(DateTime.MaxValue, count);
        }

        public CrawlRequest Dequeue()
        {
            return Dequeue(1).FirstOrDefault();
        }

        public void Enqueue(CrawlRequest item)
        {
            _queue.Enqueue(item);
        }

        public void Enqueue(IEnumerable<CrawlRequest> items)
        {
            _queue.Enqueue(items);
        }

        static readonly TimeSpan TOLLERABLE_DIFF = TimeSpan.FromMinutes(3);
        const int MIN_DELAY_MS = 1;

        /// <summary>
        /// Reserve a single request date for crawl queue key
        /// </summary>
        /// <param name="domain">full domain name</param>
        /// <param name="delay">read delay for domain</param>
        /// <returns>reserved timeslot for request</returns>
        public DateTime ReserveRequest(string domain, TimeSpan delay)
        {
            return ReserveRequest(domain, delay, 1).FirstOrDefault();
        }
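        // ReserveRequest hands out per-domain crawl timeslots by atomically incrementing a "ms"
        // counter (a JS-epoch UTC millisecond value) in the CrawlSync collection and mapping the
        // new value back to DateTimes, so concurrent callers never receive overlapping slots.
        // A worked example with hypothetical numbers: delay = 500ms and items = 3 advance the
        // counter by 1,500ms and yield three slots 500ms apart, the last landing on the new
        // counter value. If the counter has drifted more than TOLLERABLE_DIFF behind the current
        // time, the domain is queued for re-synchronization on the DmSync consumer.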
        /// <summary>
        /// Reserve one or more request dates for crawl queue key
        /// </summary>
        /// <param name="domain">full domain name</param>
        /// <param name="delay">read delay for domain</param>
        /// <param name="items">how many items are to be requested</param>
        /// <returns>reserved timeslots for requests</returns>
        public List<DateTime> ReserveRequest(string domain, TimeSpan delay, int items)
        {
            var res = new List<DateTime>();
            try
            {
                DateTime started = DateTime.UtcNow;
                if (string.IsNullOrEmpty(domain))
                    throw (new ArgumentException("domain can not be null or blank", "domain"));
                if (items < 1)
                    throw (new ArgumentException("items can not be less than 1", "items"));
                if (_logger.IsDebugEnabled)
                    _logger.DebugFormat("ReserveRequest(\"{0}\", {1:N0}, {2}) signaled", domain, items, delay);

                domain = domain.Trim().ToLower();
                long delayms = delay.TotalMilliseconds < MIN_DELAY_MS ? MIN_DELAY_MS : (long)delay.TotalMilliseconds;

                IMongoQuery q = Query.EQ("_id", domain);
                IMongoUpdate u = Update.Inc("ms", delayms * items);
                IMongoSortBy s = SortBy.Ascending("_id");

                BsonDocument reserve = null;
                using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                {
                    ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlSync, col =>
                    {
                        //returnNew & upsert: create the domain counter if missing and read the incremented value
                        FindAndModifyResult r = col.FindAndModify(q, s, u, true, true);
                        reserve = r.ModifiedDocument;
                        if (!r.Ok && _logger.IsWarnEnabled && !string.IsNullOrEmpty(r.ErrorMessage))
                            _logger.WarnFormat("ReserveRequest(\"{0}\", {1:N0}, {2}) => {3}", domain, items, delay, r.ErrorMessage);
                    });
                }

                BsonValue v;
                long ms = 0;
                if (reserve != null && reserve.Contains("ms"))
                {
                    ms = (v = reserve["ms"]).IsInt64 ? v.AsInt64 : v.AsInt32;
                    DateTime dt = ms.FromJsUtcDateValue();
                    if (dt == DateTime.MinValue || dt == Infrastructure.Extensions.JS_EPOCH_UTC)
                        throw (new ApplicationException("Unable to properly reserve a request"));

                    if (dt < DateTime.UtcNow.Subtract(TOLLERABLE_DIFF))
                        _syncq.Enqueue(domain);

                    for (int i = items - 1; i >= 0; i--)
                        res.Add(dt - TimeSpan.FromMilliseconds(delayms * i));
#if DEBUG
                    if (res.Count != res.Distinct().Count())
                        throw (new ApplicationException("Fails to reserve unique time slots!"));
#endif
                    if (_logger.IsInfoEnabled)
                        _logger.InfoFormat("ReserveRequest(\"{0}\", {1}, {2}) returned {3:N0} dates, took {4}",
                            domain, items, delay, res.Count, DateTime.UtcNow.Subtract(started));
                }
                else //manual sync & retry again...
                {
                    if (_logger.IsDebugEnabled)
                        _logger.DebugFormat("ReserveRequest(\"{0}\", {1}, {2}) found 0 dates, will manually resync", domain, items, delay);
                    ManualSyncDomain(domain);
                    return ReserveRequest(domain, delay, items);
                }
            }
            catch (Exception ex)
            {
                _logger.Error(string.Format("ReserveRequest(\"{0}\", {1:N0}, {2})", domain, items, delay), ex);
                throw;
            }
            return res;
        }

        readonly object SyncLock = new object();

        DateTime ManualSyncDomain(string domain)
        {
            lock (SyncLock)
            {
                DateTime syncTime = DateTime.UtcNow;
                long newms = syncTime.ToJsUtcDateValue();
                IMongoQuery q = Query.And(
                    Query.EQ("_id", domain),
                    Query.LT("ms", newms - TOLLERABLE_DIFF.TotalMilliseconds));
                IMongoUpdate u = Update.Set("ms", newms);
                using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                    ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlSync,
                        col => col.Update(q, u, SafeMode.True));
                return syncTime;
            }
        }

        void SyncDomain(Consumer<string> caller, List<string> domains)
        {
            try
            {
                if (domains.IsNullOrEmpty())
                    return;
                lock (SyncLock)
                {
                    long newms = DateTime.UtcNow.ToJsUtcDateValue();
                    IMongoQuery q = Query.And(
                        Query.In("_id", BsonArray.Create(domains)),
                        Query.LT("ms", newms - TOLLERABLE_DIFF.TotalMilliseconds));
                    IMongoUpdate u = Update.Set("ms", newms);
                    using (var ma = new MongoAccess(CONST.Node_CrawlQueue))
                        ma.Collection(CONST.DB_CrawlQueue, CONST.COL_CrawlSync,
                            col => col.Update(q, u, SafeMode.True));
                }
            }
            catch (Exception ex)
            {
                _logger.Error("SyncDomain", ex);
            }
            finally
            {
                if (caller.Queue.Count == 0)
                    caller.Pause();
            }
        }

        #endregion
    }
}