author     Marek Vavruša <marek.vavrusa@nic.cz>    2015-07-17 18:19:29 +0200
committer  Marek Vavruša <marek.vavrusa@nic.cz>    2015-07-17 18:19:29 +0200
commit     3d54a7493eb320752a3167aa13c3af1409e19493
tree       f34abaed05e60465d48e86caf61c112bce80940a /modules/prefetch/prefetch.lua
parent     modules/stats: log frequent and expiring queries
modules/prefetch: multisampling, expiring prefetch, configurable window
Diffstat (limited to 'modules/prefetch/prefetch.lua')
-rw-r--r--  modules/prefetch/prefetch.lua | 116
1 file changed, 82 insertions(+), 34 deletions(-)
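
The reworked current_epoch() maps wall-clock time onto one of period slots of window minutes each, wrapping with a modulo so the sampling log stays bounded. Below is a standalone sketch of that arithmetic under the new defaults introduced in this commit (window = 10, period = 24); the helper name epoch_of and the example times are illustrative only, while the formula itself is taken from the diff:

-- Standalone sketch of the epoch bucketing introduced below (illustrative only;
-- the real code reads prefetch.window and prefetch.period and uses os.date()).
local window = 10   -- minutes per slot (new default in this commit)
local period = 24   -- slots kept, i.e. 24 * 10 min = a 4-hour cycle

-- Map a wall-clock time to a 1-based slot, wrapping after period slots.
local function epoch_of(hour, min)
	return (hour * (60 / window) + math.floor(min / window)) % period + 1
end

print(epoch_of(0, 0))    -- 1   (midnight falls into slot 1)
print(epoch_of(0, 35))   -- 4   (fourth 10-minute window of the day)
print(epoch_of(14, 37))  -- 16  (87th window of the day, wrapped modulo 24)
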
diff --git a/modules/prefetch/prefetch.lua b/modules/prefetch/prefetch.lua
index ff3b3166..4da6e3ec 100644
--- a/modules/prefetch/prefetch.lua
+++ b/modules/prefetch/prefetch.lua
@@ -5,26 +5,22 @@
-- @field window length of the coalescing window
local prefetch = {
queue = {},
+ queue_len = 0,
batch = 0,
epoch = 0,
- period = 4 * 24,
- window = 15,
+ period = 24,
+ window = 10,
+ sample = 0,
log = {}
}
--- Calculate current epoch (number of quarter-hours today)
+-- Calculate current epoch (which window fits current time)
local function current_epoch()
- return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1
+ return (os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window)) % prefetch.period + 1
end
-- Resolve queued records and flush the queue
function prefetch.dispatch(ev)
- -- Defer prefetching if the server is loaded
- if worker.stats().concurrent > 10 then
- event.after(minute, prefetch.dispatch)
- prefetch.batch = prefetch.batch + prefetch.batch / 2
- return 0
- end
local deleted = 0
for key, val in pairs(prefetch.queue) do
worker.resolve(string.sub(key, 2), string.byte(key))
@@ -34,55 +30,107 @@ function prefetch.dispatch(ev)
prefetch.queue[key] = nil
end
deleted = deleted + 1
- if deleted == prefetch.batch then
+ if deleted >= prefetch.batch then
break
end
end
if deleted > 0 then
- event.after(minute, prefetch.dispatch)
+ event.after((prefetch.window * 6) * sec, prefetch.dispatch)
end
+ prefetch.queue_len = prefetch.queue_len - deleted
+ stats['predict.queue'] = prefetch.queue_len
+ collectgarbage()
return 0
end
--- Process current epoch
-function prefetch.process(ev)
- -- Process current learning epoch
+-- Sample current epoch, return number of sampled queries
+local function sample(epoch_now)
+ local queries = stats.frequent()
+ stats.clear_frequent()
local start = os.clock()
- local recent_queries = stats.queries()
- stats.queries_clear()
- local current = {}
- for i = 1, #recent_queries do
- local entry = recent_queries[i]
+ local current = prefetch.log[prefetch.epoch]
+ if prefetch.epoch ~= epoch_now then
+ current = {}
+ end
+ local nr_samples = #queries
+ for i = 1, nr_samples do
+ local entry = queries[i]
local key = string.char(entry.type)..entry.name
current[key] = entry.count
- -- print('.. learning', entry.name, entry.type)
end
- print (string.format('[prob] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+ print (string.format('[prob] sampling epoch: %d/%d, %.2f sec (%d items)', prefetch.epoch, prefetch.sample, os.clock() - start, #queries))
prefetch.log[prefetch.epoch] = current
- prefetch.epoch = prefetch.epoch % prefetch.period + 1
- -- Predict queries for the next epoch based on the usage patterns
+ prefetch.sample = prefetch.sample + 1
+ return nr_samples
+end
+
+-- Prefetch soon-to-expire records
+local function refresh()
+ local queries = stats.expiring()
+ stats.clear_expiring()
+ local nr_samples = #queries
+ for i = 1, nr_samples do
+ local entry = queries[i]
+ local key = string.char(entry.type)..entry.name
+ prefetch.queue[key] = 1
+ end
+ print (string.format('[prob] prefetching epoch: %d/%d (%d items)', prefetch.epoch, prefetch.sample, nr_samples))
+ return nr_samples
+end
+
+-- Predict queries for the next epoch based on past usage, return number queued
+local function predict(epoch_now)
+ local start = os.clock()
+ local queued = 0
for i = 1, prefetch.period / 2 - 1 do
- current = prefetch.log[prefetch.epoch - i]
- local past = prefetch.log[prefetch.epoch - 2*i]
+ local current = prefetch.log[epoch_now - i]
+ local past = prefetch.log[epoch_now - 2*i]
if current and past then
for k, v in pairs(current) do
- if past[k] ~= nil then
- prefetch.queue[k] = v
+ if past[k] ~= nil and not prefetch.queue[k] then
+ queued = queued + 1
+ prefetch.queue[k] = 1
end
end
end
end
- print (string.format('[prob] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
- -- TODO: batch in soon-expiring queries
- -- TODO: clusterize records often found together
+ print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', prefetch.epoch, os.clock() - start, queued))
+ return queued
+end
+
+-- Process current epoch
+function prefetch.process(ev)
+ -- Start a new epoch, or continue sampling
+ local epoch_now = current_epoch()
+ local nr_learned = sample(epoch_now)
+ local nr_queued = 0
+ -- End of epoch, predict next
+ if prefetch.epoch ~= epoch_now then
+ prefetch.queue = {}
+ prefetch.queue_len = 0
+ prefetch.epoch = epoch_now
+ prefetch.sample = 0
+ nr_queued = nr_queued + predict(epoch_now)
+ end
+ -- Prefetch expiring records
+ nr_queued = nr_queued + refresh()
-- Dispatch prefetch requests
- prefetch.batch = #prefetch.queue / prefetch.window
- event.after(0, prefetch.dispatch)
+ if nr_queued > 0 then
+ prefetch.queue_len = prefetch.queue_len + nr_queued
+ prefetch.batch = prefetch.queue_len / 10
+ event.after(0, prefetch.dispatch)
+ end
+ event.after(prefetch.window * minute, prefetch.process)
+ stats['predict.epoch'] = epoch_now
+ stats['predict.queue'] = prefetch.queue_len
+ stats['predict.learned'] = nr_learned
+ collectgarbage()
end
function prefetch.init(module)
prefetch.epoch = current_epoch()
- event.recurrent(prefetch.window * minute, prefetch.process)
+ event.after(prefetch.window * minute, prefetch.process)
end
function prefetch.deinit(module)
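
Stepping outside the diff, a hypothetical kresd configuration snippet showing how the now-configurable window and period might be tuned; the module name is real, but the chosen values and the assign-after-load style are assumptions for illustration, not part of this commit:

-- Hypothetical configuration sketch (assumption, not taken from this commit).
modules = { 'prefetch' }   -- load the prefetch module
prefetch.window = 20       -- sample in 20-minute windows instead of 10
prefetch.period = 18       -- keep 18 windows (6 hours) of history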