using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Configuration; using log4net; using Docstoc.Infrastructure; using Docstoc.Infrastructure.Parallel; namespace Docstoc.Harvest.DataAccess { abstract class FlushableStore : IFlushable { static readonly ILog _logger = LogManager.GetLogger(typeof(IFlushable)); const int DELAY_S = 5, BATCH = 200; readonly protected ProcessQueue _queue; readonly protected Consumer _writer; readonly Watcher _watcher; readonly TimeSpan _delay = TimeSpan.FromSeconds(DELAY_S), FLUSHABLE_THROTTLE = TimeSpan.Zero; readonly int _batch = BATCH; #region ctor & dtor protected FlushableStore(string configPrefix, bool uniqueQueue) : this(configPrefix, uniqueQueue, 0, TimeSpan.Zero) { } protected FlushableStore(string configPrefix, bool uniqueQueue, int batchSize, TimeSpan flushDelay) { try { if (string.IsNullOrEmpty(configPrefix)) throw (new ArgumentNullException("configPrefix")); if (batchSize <= 0) batchSize = BATCH; if (flushDelay <= TimeSpan.Zero) flushDelay = TimeSpan.FromSeconds(DELAY_S); FLUSHABLE_THROTTLE = ConfigurationManager.AppSettings.ExtractValue("FLUSHABLE_THROTTLE", FLUSHABLE_THROTTLE); _delay = ConfigurationManager.AppSettings.ExtractValue(configPrefix + "BATCH_DELAY", flushDelay); _logger.InfoFormat("{0} BATCH_DELAY=={1} | FLUSHABLE_THROTTLE=={2}", configPrefix, _delay, FLUSHABLE_THROTTLE); _queue = new ProcessQueue(uniqueQueue); _queue.OnDequeue += (count) => CheckResume(); _batch = ConfigurationManager.AppSettings.ExtractValue(configPrefix + "BATCH_SIZE", batchSize); _writer = new Consumer("FlushQW", _queue, FlushToDB, _batch); _writer.Throttle = FLUSHABLE_THROTTLE; _writer.Run(); _writer.Pause(); _watcher = new Watcher(() => true, TimeSpan.FromMilliseconds(250)); //every quater sec _watcher.Executed += (self) => CheckResume(); _watcher.Start(); } catch (Exception ex) { _logger.Fatal("CTOR", ex); throw; } } void CheckResume() { try { if (_writer.IsPaused) { if (_queue.Count >= _batch) _writer.Resume(); else if(_queue.Count > 0) { lock (_slock) { if (DateTime.Now.Subtract(_lastFlush) >= _delay) _writer.Resume(); } } } } catch (Exception ex) { _logger.Error("CheckResume", ex); } } ~FlushableStore() { Dispose(); } int _disposed = 0; public virtual void Dispose() { try { if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) { if (_watcher != null) _watcher.Dispose(); Flush(false); //force wait flush... if (_writer != null) _writer.Dispose(); if (_queue != null) _queue.Dispose(); } } catch (Exception ex) { _logger.Error("Dispose", ex); throw; } } #endregion readonly object _slock = new object(); DateTime _lastFlush = DateTime.MinValue; #region IFlushable Members protected abstract void SaveToDB(List items); void FlushToDB(Consumer caller, List items) { try { if (items.IsNullOrEmpty()) return; SaveToDB(items); //call custom write logic } catch (Exception ex) { _logger.Error("FlushToDB", ex); throw; } finally { lock (_slock) _lastFlush = DateTime.Now; if (caller.Queue.Count == 0) caller.Pause(); } } public virtual void Flush(bool async) { try { if (_queue.Count > 0 && _writer.IsPaused) { _writer.Resume(); if (!async) //spin block... { while (_queue.Count > 0 || _writer.IsProcessing) Thread.Sleep(100); } } } catch (Exception ex) { _logger.Error("Flush(" + async + ")", ex); throw; } } #endregion } }