diff options
Diffstat (limited to 'lib/kru.inc.c')
-rw-r--r-- | lib/kru.inc.c | 506 |
1 files changed, 506 insertions, 0 deletions
diff --git a/lib/kru.inc.c b/lib/kru.inc.c new file mode 100644 index 00000000..5f630fb7 --- /dev/null +++ b/lib/kru.inc.c @@ -0,0 +1,506 @@ +/* Copyright (C) 2024 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +/* +KRU estimates recently pricey inputs + +Authors of the simple agorithm (without aging, multi-choice, etc.): + Metwally, D. Agrawal, and A. E. Abbadi. + Efficient computation of frequent and top-k elements in data streams. + In International Conference on Database Theory, 2005. + +With TABLE_COUNT > 1 we're improving reliability by utilizing the property that +longest buckets (cache-lines) get very much shortened, already by providing two choices: + https://en.wikipedia.org/wiki/2-choice_hashing + +The point is to answer point-queries that estimate if the item has been heavily used recently. +To give more weight to recent usage, we use aging via exponential decay (simple to compute). +That has applications for garbage collection of cache and various limiting scenario +(excessive rate, traffic, CPU, maybe RAM). + +### Choosing parameters + +Size (`loads_bits` = log2 length): + - The KRU takes 64 bytes * length * TABLE_COUNT + some small constants. + As TABLE_COUNT == 2 and loads_bits = capacity_log >> 4, we get capacity * 8 Bytes. + - The length should probably be at least something like the square of the number of utilized CPUs. + But this most likely won't be a limiting factor. +*/ + +#include <stdlib.h> +#include <assert.h> +#include <stdatomic.h> +#include <stdbool.h> +#include <stddef.h> +#include <string.h> + +#include "./kru.h" + +/// Block of loads sharing the same time, so that we're more space-efficient. +/// It's exactly a single cache line. +struct load_cl { + _Atomic uint32_t time; + #define LOADS_LEN 15 + uint16_t ids[LOADS_LEN]; + uint16_t loads[LOADS_LEN]; +} ALIGNED_CPU_CACHE; +static_assert(64 == sizeof(struct load_cl), "bad size of struct load_cl"); + +inline static uint64_t rand_bits(unsigned int bits) { + static _Thread_local uint64_t state = 3723796604792068981ull; + const uint64_t prime1 = 11737314301796036329ull; + const uint64_t prime2 = 3107264277052274849ull; + state = prime1 * state + prime2; + //return state & ((1 << bits) - 1); + return state >> (64 - bits); +} + +#include "./kru-decay.inc.c" + +#include "contrib/ucw/lib.h" +#include "libdnssec/error.h" +#include "libdnssec/random.h" +typedef uint64_t hash_t; +#if USE_AES + /// 4-8 rounds should be an OK choice, most likely. + #define AES_ROUNDS 4 +#else + #include "contrib/openbsd/siphash.h" + + /// 1,3 should be OK choice, probably. + enum { + SIPHASH_RC = 1, + SIPHASH_RF = 3, + }; +#endif + +#if USE_AVX2 || USE_SSE41 || USE_AES + #include <immintrin.h> + #include <x86intrin.h> +#endif + +struct kru { +#if USE_AES + /// Hashing secret. Random but shared by all users of the table. + /// Let's not make it too large, so that header fits into 64 Bytes. + char hash_key[48] ALIGNED(32); +#else + /// Hashing secret. Random but shared by all users of the table. + SIPHASH_KEY hash_key; +#endif + struct decay_config decay; + + /// Length of `loads_cls`, stored as binary logarithm. + uint32_t loads_bits; + + #define TABLE_COUNT 2 + /// These are read-write. Each struct has exactly one cache line. + struct load_cl load_cls[][TABLE_COUNT]; +}; + +/// Convert capacity_log to loads_bits +static inline int32_t capacity2loads(int capacity_log) +{ + static_assert(LOADS_LEN == 15 && TABLE_COUNT == 2, ""); + // So, the pair of cache lines hold up to 2*15 elements. + // Let's say that we can reliably store 16 = 1 << (1+3). + // (probably more but certainly not 1 << 5) + const int shift = 1 + 3; + int loads_bits = capacity_log - shift; + // Let's behave reasonably for weird capacity_log values. + return loads_bits > 0 ? loads_bits : 1; +} + +static size_t kru_get_size(int capacity_log) +{ + uint32_t loads_bits = capacity2loads(capacity_log); + if (8 * sizeof(hash_t) < TABLE_COUNT * loads_bits + + 8 * sizeof(((struct kru *)0)->load_cls[0]->ids[0])) { + assert(false); + return 0; + } + + return offsetof(struct kru, load_cls) + + sizeof(struct load_cl) * TABLE_COUNT * (1 << loads_bits); +} + +static bool kru_initialize(struct kru *kru, int capacity_log, kru_price_t max_decay) +{ + if (!kru) { + return false; + } + + uint32_t loads_bits = capacity2loads(capacity_log); + if (8 * sizeof(hash_t) < TABLE_COUNT * loads_bits + + 8 * sizeof(((struct kru *)0)->load_cls[0]->ids[0])) { + assert(false); + return false; + } + + kru->loads_bits = loads_bits; + + if (dnssec_random_buffer((uint8_t *)&kru->hash_key, sizeof(kru->hash_key)) != DNSSEC_EOK) { + return false; + } + + decay_initialize(&kru->decay, max_decay); + + return true; +} + +struct query_ctx { + struct load_cl *l[TABLE_COUNT]; + uint32_t time_now; + kru_price_t price; + uint16_t price16, limit16; + uint16_t id; + uint16_t final_load_value; // set by kru_limited_update if not blocked + uint16_t *load; +}; + +/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on one 16-byte key. +static inline void kru_limited_prefetch(struct kru *kru, uint32_t time_now, uint8_t key[static 16], kru_price_t price, struct query_ctx *ctx) +{ + // Obtain hash of *buf. + hash_t hash; +#if !USE_AES + hash = SipHash(&kru->hash_key, SIPHASH_RC, SIPHASH_RF, key, 16); +#else + { + __m128i h; /// hashing state + h = _mm_load_si128((__m128i *)key); + // Now do the the hashing itself. + __m128i *aes_key = (void*)kru->hash_key; + for (int i = 0; i < AES_ROUNDS; ++i) { + int key_id = i % (sizeof(kru->hash_key) / sizeof(__m128i)); + h = _mm_aesenc_si128(h, _mm_load_si128(&aes_key[key_id])); + } + memcpy(&hash, &h, sizeof(hash)); + } +#endif + + // Choose the cache-lines to operate on + const uint32_t loads_mask = (1 << kru->loads_bits) - 1; + // Fetch the two cache-lines in parallel before we really touch them. + for (int li = 0; li < TABLE_COUNT; ++li) { + struct load_cl * const l = &kru->load_cls[hash & loads_mask][li]; + __builtin_prefetch(l, 0); // hope for read-only access + hash >>= kru->loads_bits; + ctx->l[li] = l; + } + + ctx->time_now = time_now; + ctx->price = price; + ctx->id = hash; +} + + +/// Phase 1/3 of a query -- hash, prefetch, ctx init. Based on a bit prefix of one 16-byte key. +static inline void kru_limited_prefetch_prefix(struct kru *kru, uint32_t time_now, uint8_t namespace, uint8_t key[static 16], uint8_t prefix, kru_price_t price, struct query_ctx *ctx) +{ + // Obtain hash of *buf. + hash_t hash; + +#if !USE_AES + { + const int rc = SIPHASH_RC, rf = SIPHASH_RF; + + // Hash prefix of key, prefix size, and namespace together. + SIPHASH_CTX hctx; + SipHash_Init(&hctx, &kru->hash_key); + SipHash_Update(&hctx, rc, rf, &namespace, sizeof(namespace)); + SipHash_Update(&hctx, rc, rf, &prefix, sizeof(prefix)); + SipHash_Update(&hctx, rc, rf, key, prefix / 8); + if (prefix % 8) { + const uint8_t masked_byte = key[prefix / 8] & (0xFF00 >> (prefix % 8)); + SipHash_Update(&hctx, rc, rf, &masked_byte, 1); + } + hash = SipHash_End(&hctx, rc, rf); + } +#else + { + + __m128i h; /// hashing state + h = _mm_load_si128((__m128i *)key); + + { // Keep only the prefix. + const uint8_t p = prefix; + + // Prefix mask (1...0) -> little endian byte array (0x00 ... 0x00 0xFF ... 0xFF). + __m128i mask = _mm_set_epi64x( + (p < 64 ? (p == 0 ? 0 : (uint64_t)-1 << (64 - p)) : (uint64_t)-1), // higher 64 bits (1...) -> second half of byte array (... 0xFF) + (p <= 64 ? 0 : (uint64_t)-1 << (128 - p))); // lower 64 bits (...0) -> first half of byte array (0x00 ...) + + // Swap mask endianness (0x11 ... 0x11 0x00 ... 0x00). + mask = _mm_shuffle_epi8(mask, + _mm_set_epi8(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)); + + // Apply mask. + h = _mm_and_si128(h, mask); + } + + // Now do the the hashing itself. + __m128i *aes_key = (void*)kru->hash_key; + { + // Mix namespace and prefix size into the first aes key. + __m128i aes_key1 = _mm_insert_epi16(_mm_load_si128(aes_key), (namespace << 8) | prefix, 0); + h = _mm_aesenc_si128(h, aes_key1); + } + for (int j = 1; j < AES_ROUNDS; ++j) { + int key_id = j % (sizeof(kru->hash_key) / sizeof(__m128i)); + h = _mm_aesenc_si128(h, _mm_load_si128(&aes_key[key_id])); + } + memcpy(&hash, &h, sizeof(hash)); + } +#endif + + // Choose the cache-lines to operate on + const uint32_t loads_mask = (1 << kru->loads_bits) - 1; + // Fetch the two cache-lines in parallel before we really touch them. + for (int li = 0; li < TABLE_COUNT; ++li) { + struct load_cl * const l = &kru->load_cls[hash & loads_mask][li]; + __builtin_prefetch(l, 0); // hope for read-only access + hash >>= kru->loads_bits; + ctx->l[li] = l; + } + + ctx->time_now = time_now; + ctx->price = price; + ctx->id = hash; +} + +/// Phase 2/3 of a query -- returns answer with no state modification (except update_time). +static inline bool kru_limited_fetch(struct kru *kru, struct query_ctx *ctx) +{ + // Compute 16-bit limit and price. + // For 32-bit prices we assume that a 16-bit load value corresponds + // to the 32-bit value extended by low-significant ones and the limit is 2^32 (without ones). + // The 16-bit price is thus rounded up for the comparison with limit, + // but rounded probabilistically for rising the load. + { + const int fract_bits = 8 * sizeof(ctx->price) - 16; + const kru_price_t price = ctx->price; + const kru_price_t fract = price & ((((kru_price_t)1) << fract_bits) - 1); + + ctx->price16 = price >> fract_bits; + ctx->limit16 = -ctx->price16; + + if ((fract_bits > 0) && (fract > 0)) { + ctx->price16 += (rand_bits(fract_bits) < fract); + ctx->limit16--; + } + } + + for (int li = 0; li < TABLE_COUNT; ++li) { + update_time(ctx->l[li], ctx->time_now, &kru->decay); + } + + const uint16_t id = ctx->id; + + // Find matching element. Matching 16 bits in addition to loads_bits. + ctx->load = NULL; +#if !USE_AVX2 + for (int li = 0; li < TABLE_COUNT; ++li) + for (int i = 0; i < LOADS_LEN; ++i) + if (ctx->l[li]->ids[i] == id) { + ctx->load = &ctx->l[li]->loads[i]; + goto load_found; + } +#else + const __m256i id_v = _mm256_set1_epi16(id); + for (int li = 0; li < TABLE_COUNT; ++li) { + static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->ids[0]) == 2, ""); + // unfortunately we can't use aligned load here + __m256i ids_v = _mm256_loadu_si256((__m256i *)((uint16_t *)ctx->l[li]->ids - 1)); + __m256i match_mask = _mm256_cmpeq_epi16(ids_v, id_v); + if (_mm256_testz_si256(match_mask, match_mask)) + continue; // no match of id + int index = _bit_scan_reverse(_mm256_movemask_epi8(match_mask)) / 2 - 1; + // there's a small possibility that we hit equality only on the -1 index + if (index >= 0) { + ctx->load = &ctx->l[li]->loads[index]; + goto load_found; + } + } +#endif + + return false; + +load_found:; + return (*ctx->load >= ctx->limit16); +} + +/// Phase 3/3 of a query -- state update, return value overrides previous answer in case of race. +/// Not needed if blocked by fetch phase. +static inline bool kru_limited_update(struct kru *kru, struct query_ctx *ctx) +{ + _Atomic uint16_t *load_at; + if (!ctx->load) { + // No match, so find position of the smallest load. + int min_li = 0; + int min_i = 0; +#if !USE_SSE41 + for (int li = 0; li < TABLE_COUNT; ++li) + for (int i = 0; i < LOADS_LEN; ++i) + if (ctx->l[li]->loads[i] < ctx->l[min_li]->loads[min_i]) { + min_li = li; + min_i = i; + } +#else + int min_val = 0; + for (int li = 0; li < TABLE_COUNT; ++li) { + // BEWARE: we're relying on the exact memory layout of struct load_cl, + // where the .loads array take 15 16-bit values at the very end. + static_assert((offsetof(struct load_cl, loads) - 2) % 16 == 0, + "bad alignment of struct load_cl::loads"); + static_assert(LOADS_LEN == 15 && sizeof(ctx->l[li]->loads[0]) == 2, ""); + __m128i *l_v = (__m128i *)((uint16_t *)ctx->l[li]->loads - 1); + __m128i l0 = _mm_load_si128(l_v); + __m128i l1 = _mm_load_si128(l_v + 1); + // We want to avoid the first item in l0, so we maximize it. + // (but this function takes a signed integer, so -1 is the maximum) + l0 = _mm_insert_epi16(l0, -1, 0); + + // Only one instruction can find minimum and its position, + // and it works on 8x uint16_t. + __m128i mp0 = _mm_minpos_epu16(l0); + __m128i mp1 = _mm_minpos_epu16(l1); + int min0 = _mm_extract_epi16(mp0, 0); + int min1 = _mm_extract_epi16(mp1, 0); + int min01, min_ix; + if (min0 < min1) { + min01 = min0; + min_ix = _mm_extract_epi16(mp0, 1); + } else { + min01 = min1; + min_ix = 8 + _mm_extract_epi16(mp1, 1); + } + + if (li == 0 || min_val > min01) { + min_li = li; + min_i = min_ix; + min_val = min01; + } + } + // now, min_i (and min_ix) is offset by one due to alignment of .loads + if (min_i != 0) // zero is very unlikely + --min_i; +#endif + + ctx->l[min_li]->ids[min_i] = ctx->id; + load_at = (_Atomic uint16_t *)&ctx->l[min_li]->loads[min_i]; + } else { + load_at = (_Atomic uint16_t *)ctx->load; + } + + static_assert(ATOMIC_CHAR16_T_LOCK_FREE == 2, "insufficient atomics"); + const uint16_t price = ctx->price16; + const uint16_t limit = ctx->limit16; + uint16_t load_orig = atomic_load_explicit(load_at, memory_order_relaxed); + do { + if (load_orig >= limit) + return true; + } while (!atomic_compare_exchange_weak_explicit(load_at, &load_orig, load_orig + price, memory_order_relaxed, memory_order_relaxed)); + + ctx->final_load_value = load_orig + price; + return false; +} + +static bool kru_limited_multi_or(struct kru *kru, uint32_t time_now, uint8_t **keys, kru_price_t *prices, size_t queries_cnt) +{ + struct query_ctx ctx[queries_cnt]; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i); + } + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_fetch(kru, ctx + i)) + return true; + } + bool ret = false; + + for (size_t i = 0; i < queries_cnt; i++) { + ret |= kru_limited_update(kru, ctx + i); + } + + return ret; +} + +static bool kru_limited_multi_or_nobreak(struct kru *kru, uint32_t time_now, uint8_t **keys, kru_price_t *prices, size_t queries_cnt) +{ + struct query_ctx ctx[queries_cnt]; + bool ret = false; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch(kru, time_now, keys[i], prices[i], ctx + i); + } + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_fetch(kru, ctx + i)) + ret = true; + } + if (ret) return true; + + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_update(kru, ctx + i)) + ret = true; + } + + return ret; +} + +static uint8_t kru_limited_multi_prefix_or(struct kru *kru, uint32_t time_now, uint8_t namespace, + uint8_t key[static 16], uint8_t *prefixes, kru_price_t *prices, size_t queries_cnt, uint16_t *max_load_out) +{ + struct query_ctx ctx[queries_cnt]; + + for (size_t i = 0; i < queries_cnt; i++) { + kru_limited_prefetch_prefix(kru, time_now, namespace, key, prefixes[i], prices[i], ctx + i); + } + + for (size_t i = 0; i < queries_cnt; i++) { + if (kru_limited_fetch(kru, ctx + i)) + return prefixes[i]; + } + + for (int i = queries_cnt - 1; i >= 0; i--) { + if (kru_limited_update(kru, ctx + i)) + return prefixes[i]; + } + + if (max_load_out) { + *max_load_out = 0; + for (size_t i = 0; i < queries_cnt; i++) { + *max_load_out = MAX(*max_load_out, ctx[i].final_load_value); + } + } + + return 0; +} + +/// Update limiting and return true iff it hit the limit instead. +static bool kru_limited(struct kru *kru, uint32_t time_now, uint8_t key[static 16], kru_price_t price) +{ + return kru_limited_multi_or(kru, time_now, &key, &price, 1); +} + +#define KRU_API_INITIALIZER { \ + .get_size = kru_get_size, \ + .initialize = kru_initialize, \ + .limited = kru_limited, \ + .limited_multi_or = kru_limited_multi_or, \ + .limited_multi_or_nobreak = kru_limited_multi_or_nobreak, \ + .limited_multi_prefix_or = kru_limited_multi_prefix_or, \ +} |