diff options
author | Marek Vavruša <marek.vavrusa@nic.cz> | 2015-07-17 18:19:29 +0200 |
---|---|---|
committer | Marek Vavruša <marek.vavrusa@nic.cz> | 2015-07-17 18:19:29 +0200 |
commit | 3d54a7493eb320752a3167aa13c3af1409e19493 (patch) | |
tree | f34abaed05e60465d48e86caf61c112bce80940a /modules/prefetch/prefetch.lua | |
parent | modules/stats: log frequent and expiring queries (diff) | |
download | knot-resolver-3d54a7493eb320752a3167aa13c3af1409e19493.tar.xz knot-resolver-3d54a7493eb320752a3167aa13c3af1409e19493.zip |
modules/prefetch: multisampling, expiring prefetch, configurable window
Diffstat (limited to 'modules/prefetch/prefetch.lua')
-rw-r--r-- | modules/prefetch/prefetch.lua | 116 |
1 files changed, 82 insertions, 34 deletions
diff --git a/modules/prefetch/prefetch.lua b/modules/prefetch/prefetch.lua index ff3b3166..4da6e3ec 100644 --- a/modules/prefetch/prefetch.lua +++ b/modules/prefetch/prefetch.lua @@ -5,26 +5,22 @@ -- @field window length of the coalescing window local prefetch = { queue = {}, + queue_len = 0, batch = 0, epoch = 0, - period = 4 * 24, - window = 15, + period = 24, + window = 10, + sample = 0, log = {} } --- Calculate current epoch (number of quarter-hours today) +-- Calculate current epoch (which window fits current time) local function current_epoch() - return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1 + return (os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window)) % prefetch.period + 1 end -- Resolve queued records and flush the queue function prefetch.dispatch(ev) - -- Defer prefetching if the server is loaded - if worker.stats().concurrent > 10 then - event.after(minute, prefetch.dispatch) - prefetch.batch = prefetch.batch + prefetch.batch / 2 - return 0 - end local deleted = 0 for key, val in pairs(prefetch.queue) do worker.resolve(string.sub(key, 2), string.byte(key)) @@ -34,55 +30,107 @@ function prefetch.dispatch(ev) prefetch.queue[key] = nil end deleted = deleted + 1 - if deleted == prefetch.batch then + if deleted >= prefetch.batch then break end end if deleted > 0 then - event.after(minute, prefetch.dispatch) + event.after((prefetch.window * 6) * sec, prefetch.dispatch) end + prefetch.queue_len = prefetch.queue_len - deleted + stats['predict.queue'] = prefetch.queue_len + collectgarbage() return 0 end --- Process current epoch -function prefetch.process(ev) - -- Process current learning epoch +-- Sample current epoch, return number of sampled queries +local function sample(epoch_now) + local queries = stats.frequent() + stats.clear_frequent() local start = os.clock() - local recent_queries = stats.queries() - stats.queries_clear() - local current = {} - for i = 1, #recent_queries do - local entry = recent_queries[i] + local current = prefetch.log[prefetch.epoch] + if prefetch.epoch ~= epoch_now then + current = {} + end + local nr_samples = #queries + for i = 1, nr_samples do + local entry = queries[i] local key = string.char(entry.type)..entry.name current[key] = entry.count - -- print('.. learning', entry.name, entry.type) end - print (string.format('[prob] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start)) + print (string.format('[prob] sampling epoch: %d/%d, %.2f sec (%d items)', prefetch.epoch, prefetch.sample, os.clock() - start, #queries)) prefetch.log[prefetch.epoch] = current - prefetch.epoch = prefetch.epoch % prefetch.period + 1 - -- Predict queries for the next epoch based on the usage patterns + prefetch.sample = prefetch.sample + 1 + return nr_samples +end + +-- Prefetch soon-to-expire records +local function refresh() + local queries = stats.expiring() + stats.clear_expiring() + local nr_samples = #queries + for i = 1, nr_samples do + local entry = queries[i] + local key = string.char(entry.type)..entry.name + prefetch.queue[key] = 1 + end + print (string.format('[prob] prefetching epoch: %d/%d (%d items)', prefetch.epoch, prefetch.sample, nr_samples)) + return nr_samples +end + +-- Sample current epoch, return number of sampled queries +local function predict(epoch_now) + local start = os.clock() + local queued = 0 for i = 1, prefetch.period / 2 - 1 do - current = prefetch.log[prefetch.epoch - i] - local past = prefetch.log[prefetch.epoch - 2*i] + local current = prefetch.log[epoch_now - i] + local past = prefetch.log[epoch_now - 2*i] if current and past then for k, v in pairs(current) do - if past[k] ~= nil then - prefetch.queue[k] = v + if past[k] ~= nil and not prefetch.queue[k] then + queued = queued + 1 + prefetch.queue[k] = 1 end end end end - print (string.format('[prob] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start)) - -- TODO: batch in soon-expiring queries - -- TODO: clusterize records often found together + print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', prefetch.epoch, os.clock() - start, queued)) + return queued +end + +-- Process current epoch +function prefetch.process(ev) + -- Start a new epoch, or continue sampling + local epoch_now = current_epoch() + local nr_learned = sample(epoch_now) + local nr_queued = 0 + -- End of epoch, predict next + if prefetch.epoch ~= epoch_now then + prefetch.queue = {} + prefetch.queue_len = 0 + prefetch.epoch = epoch_now + prefetch.sample = 0 + nr_queued = nr_queued + predict(epoch_now) + prefetch.queue_len = prefetch.queue_len + nr_queued + end + -- Prefetch expiring records + nr_queued = nr_queued + refresh() -- Dispatch prefetch requests - prefetch.batch = #prefetch.queue / prefetch.window - event.after(0, prefetch.dispatch) + if nr_queued > 0 then + prefetch.queue_len = prefetch.queue_len + nr_queued + prefetch.batch = prefetch.queue_len / 10 + event.after(0, prefetch.dispatch) + end + event.after(prefetch.window * minute, prefetch.process) + stats['predict.epoch'] = epoch_now + stats['predict.queue'] = prefetch.queue_len + stats['predict.learned'] = nr_learned + collectgarbage() end function prefetch.init(module) prefetch.epoch = current_epoch() - event.recurrent(prefetch.window * minute, prefetch.process) + event.after(prefetch.window * minute, prefetch.process) end function prefetch.deinit(module) |