summaryrefslogtreecommitdiffstats
path: root/manager/knot_resolver_manager/statistics.py
blob: ae9d98119942c2506298f79b2801c4c9f061281f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
import asyncio
import importlib
import json
import logging
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Tuple

from knot_resolver_manager import compat
from knot_resolver_manager.config_store import ConfigStore, only_on_real_changes_update
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.kresd_controller.registered_workers import (
    command_registered_workers,
    get_registered_workers_kresids,
)
from knot_resolver_manager.utils.functional import Result
from knot_resolver_manager.utils.modeling.parsing import DataFormat

if TYPE_CHECKING:
    from knot_resolver_manager.kresd_controller.interface import KresID

logger = logging.getLogger(__name__)


_prometheus_support = False
if importlib.util.find_spec("prometheus_client"):
    _prometheus_support = True


if _prometheus_support:
    from prometheus_client import exposition  # type: ignore
    from prometheus_client.bridge.graphite import GraphiteBridge  # type: ignore
    from prometheus_client.core import GaugeMetricFamily  # type: ignore
    from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric

    def _counter(name: str, description: str, label: Tuple[str, str], value: float) -> CounterMetricFamily:
        c = CounterMetricFamily(name, description, labels=(label[0],))
        c.add_metric((label[1],), value)  # type: ignore
        return c

    def _gauge(name: str, description: str, label: Tuple[str, str], value: float) -> GaugeMetricFamily:
        c = GaugeMetricFamily(name, description, labels=(label[0],))
        c.add_metric((label[1],), value)  # type: ignore
        return c

    def _histogram(
        name: str, description: str, label: Tuple[str, str], buckets: List[Tuple[str, int]], sum_value: float
    ) -> HistogramMetricFamily:
        c = HistogramMetricFamily(name, description, labels=(label[0],))
        c.add_metric((label[1],), buckets, sum_value=sum_value)  # type: ignore
        return c

    def _parse_resolver_metrics(instance_id: "KresID", metrics: Any) -> Generator[Metric, None, None]:
        sid = str(instance_id)

        # response latency histogram
        BUCKET_NAMES_IN_RESOLVER = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
        BUCKET_NAMES_PROMETHEUS = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")
        yield _histogram(
            "resolver_response_latency",
            "Time it takes to respond to queries in seconds",
            label=("instance_id", sid),
            buckets=[
                (bnp, metrics["answer"][f"{duration}"])
                for bnp, duration in zip(BUCKET_NAMES_PROMETHEUS, BUCKET_NAMES_IN_RESOLVER)
            ],
            sum_value=metrics["answer"]["sum_ms"] / 1_000,
        )

        yield _counter(
            "resolver_request_total",
            "total number of DNS requests (including internal client requests)",
            label=("instance_id", sid),
            value=metrics["request"]["total"],
        )
        yield _counter(
            "resolver_request_internal",
            "number of internal requests generated by Knot Resolver (e.g. DNSSEC trust anchor updates)",
            label=("instance_id", sid),
            value=metrics["request"]["internal"],
        )
        yield _counter(
            "resolver_request_udp",
            "number of external requests received over plain UDP (RFC 1035)",
            label=("instance_id", sid),
            value=metrics["request"]["udp"],
        )
        yield _counter(
            "resolver_request_tcp",
            "number of external requests received over plain TCP (RFC 1035)",
            label=("instance_id", sid),
            value=metrics["request"]["tcp"],
        )
        yield _counter(
            "resolver_request_dot",
            "number of external requests received over DNS-over-TLS (RFC 7858)",
            label=("instance_id", sid),
            value=metrics["request"]["dot"],
        )
        yield _counter(
            "resolver_request_doh",
            "number of external requests received over DNS-over-HTTP (RFC 8484)",
            label=("instance_id", sid),
            value=metrics["request"]["doh"],
        )
        yield _counter(
            "resolver_request_xdp",
            "number of external requests received over plain UDP via an AF_XDP socket",
            label=("instance_id", sid),
            value=metrics["request"]["xdp"],
        )
        yield _counter(
            "resolver_answer_total",
            "total number of answered queries",
            label=("instance_id", sid),
            value=metrics["answer"]["total"],
        )
        yield _counter(
            "resolver_answer_cached",
            "number of queries answered from cache",
            label=("instance_id", sid),
            value=metrics["answer"]["cached"],
        )
        yield _counter(
            "resolver_answer_stale",
            "number of queries that utilized stale data",
            label=("instance_id", sid),
            value=metrics["answer"]["stale"],
        )
        yield _counter(
            "resolver_answer_rcode_noerror",
            "number of NOERROR answers",
            label=("instance_id", sid),
            value=metrics["answer"]["noerror"],
        )
        yield _counter(
            "resolver_answer_rcode_nodata",
            "number of NOERROR answers without any data",
            label=("instance_id", sid),
            value=metrics["answer"]["nodata"],
        )
        yield _counter(
            "resolver_answer_rcode_nxdomain",
            "number of NXDOMAIN answers",
            label=("instance_id", sid),
            value=metrics["answer"]["nxdomain"],
        )
        yield _counter(
            "resolver_answer_rcode_servfail",
            "number of SERVFAIL answers",
            label=("instance_id", sid),
            value=metrics["answer"]["servfail"],
        )
        yield _counter(
            "resolver_answer_flag_aa",
            "number of authoritative answers",
            label=("instance_id", sid),
            value=metrics["answer"]["aa"],
        )
        yield _counter(
            "resolver_answer_flag_tc",
            "number of truncated answers",
            label=("instance_id", sid),
            value=metrics["answer"]["tc"],
        )
        yield _counter(
            "resolver_answer_flag_ra",
            "number of answers with recursion available flag",
            label=("instance_id", sid),
            value=metrics["answer"]["ra"],
        )
        yield _counter(
            "resolver_answer_flags_rd",
            "number of recursion desired (in answer!)",
            label=("instance_id", sid),
            value=metrics["answer"]["rd"],
        )
        yield _counter(
            "resolver_answer_flag_ad",
            "number of authentic data (DNSSEC) answers",
            label=("instance_id", sid),
            value=metrics["answer"]["ad"],
        )
        yield _counter(
            "resolver_answer_flag_cd",
            "number of checking disabled (DNSSEC) answers",
            label=("instance_id", sid),
            value=metrics["answer"]["cd"],
        )
        yield _counter(
            "resolver_answer_flag_do",
            "number of DNSSEC answer OK",
            label=("instance_id", sid),
            value=metrics["answer"]["do"],
        )
        yield _counter(
            "resolver_answer_flag_edns0",
            "number of answers with EDNS0 present",
            label=("instance_id", sid),
            value=metrics["answer"]["edns0"],
        )
        yield _counter(
            "resolver_query_edns",
            "number of queries with EDNS present",
            label=("instance_id", sid),
            value=metrics["query"]["edns"],
        )
        yield _counter(
            "resolver_query_dnssec",
            "number of queries with DNSSEC DO=1",
            label=("instance_id", sid),
            value=metrics["query"]["dnssec"],
        )

        if "predict" in metrics:
            if "epoch" in metrics["predict"]:
                yield _counter(
                    "resolver_predict_epoch",
                    "current prediction epoch (based on time of day and sampling window)",
                    label=("instance_id", sid),
                    value=metrics["predict"]["epoch"],
                )
            yield _counter(
                "resolver_predict_queue",
                "number of queued queries in current window",
                label=("instance_id", sid),
                value=metrics["predict"]["queue"],
            )
            yield _counter(
                "resolver_predict_learned",
                "number of learned queries in current window",
                label=("instance_id", sid),
                value=metrics["predict"]["learned"],
            )

    def _create_resolver_metrics_loaded_gauge(kresid: "KresID", loaded: bool) -> GaugeMetricFamily:
        return _gauge(
            "resolver_metrics_loaded",
            "0 if metrics from resolver instance were not loaded, otherwise 1",
            label=("instance_id", str(kresid)),
            value=int(loaded),
        )

    async def _deny_turning_off_graphite_bridge(old_config: KresConfig, new_config: KresConfig) -> Result[None, str]:
        if old_config.monitoring.graphite and not new_config.monitoring.graphite:
            return Result.err(
                "You can't turn off graphite monitoring dynamically. If you really want this feature, please let the developers know."
            )

        if (
            old_config.monitoring.graphite is not None
            and new_config.monitoring.graphite is not None
            and old_config.monitoring.graphite != new_config.monitoring.graphite
        ):
            return Result.err("Changing graphite exporter configuration in runtime is not allowed.")

        return Result.ok(None)

    _graphite_bridge: Optional[GraphiteBridge] = None

    @only_on_real_changes_update(lambda c: c.monitoring.graphite)
    async def _configure_graphite_bridge(config: KresConfig) -> None:
        """
        Starts graphite bridge if required
        """
        global _graphite_bridge
        if config.monitoring.graphite is not False and _graphite_bridge is None:
            logger.info(
                "Starting Graphite metrics exporter for [%s]:%d",
                str(config.monitoring.graphite.host),
                int(config.monitoring.graphite.port),
            )
            _graphite_bridge = GraphiteBridge(
                (str(config.monitoring.graphite.host), int(config.monitoring.graphite.port))
            )
            _graphite_bridge.start(  # type: ignore
                interval=config.monitoring.graphite.interval.seconds(), prefix=str(config.monitoring.graphite.prefix)
            )


class ResolverCollector:
    def __init__(self, config_store: ConfigStore) -> None:
        self._stats_raw: "Optional[Dict[KresID, object]]" = None
        self._config_store: ConfigStore = config_store
        self._collection_task: "Optional[asyncio.Task[None]]" = None
        self._skip_immediate_collection: bool = False

    if _prometheus_support:

        def collect(self) -> Generator[Metric, None, None]:
            # schedule new stats collection
            self._trigger_stats_collection()

            # if we have no data, return metrics with information about it and exit
            if self._stats_raw is None:
                for kresid in get_registered_workers_kresids():
                    yield _create_resolver_metrics_loaded_gauge(kresid, False)
                return

            # if we have data, parse them
            for kresid in get_registered_workers_kresids():
                success = False
                try:
                    if kresid in self._stats_raw:
                        metrics = self._stats_raw[kresid]
                        yield from _parse_resolver_metrics(kresid, metrics)
                        success = True
                except json.JSONDecodeError:
                    logger.warning(
                        "Failed to load metrics from resolver instance %s: failed to parse statistics", str(kresid)
                    )
                except KeyError as e:
                    logger.warning(
                        "Failed to load metrics from resolver instance %s: attempted to read missing statistic %s",
                        str(kresid),
                        str(e),
                    )

                yield _create_resolver_metrics_loaded_gauge(kresid, success)

        def describe(self) -> List[Metric]:
            # this function prevents the collector registry from invoking the collect function on startup
            return []

    def report_json(self) -> str:
        # schedule new stats collection
        self._trigger_stats_collection()

        # if we have no data, return metrics with information about it and exit
        if self._stats_raw is None:
            no_stats_dict: Dict[str, None] = {}
            for kresid in get_registered_workers_kresids():
                no_stats_dict[str(kresid)] = None
            return DataFormat.JSON.dict_dump(no_stats_dict)

        stats_dict: Dict[str, object] = {}
        for kresid, stats in self._stats_raw.items():
            stats_dict[str(kresid)] = stats

        return DataFormat.JSON.dict_dump(stats_dict)

    async def collect_kresd_stats(self, _triggered_from_prometheus_library: bool = False) -> None:
        if self._skip_immediate_collection:
            # this would happen because we are calling this function first manually before stat generation,
            # and once again immediately afterwards caused by the prometheus library's stat collection
            #
            # this is a code made to solve problem with calling async functions from sync methods
            self._skip_immediate_collection = False
            return

        config = self._config_store.get()

        if config.monitoring.enabled == "manager-only":
            logger.debug("Skipping kresd stat collection due to configuration")
            self._stats_raw = None
            return

        lazy = config.monitoring.enabled == "lazy"
        cmd = "collect_lazy_statistics()" if lazy else "collect_statistics()"
        logger.debug("Collecting kresd stats with method '%s'", cmd)
        stats_raw = await command_registered_workers(cmd)
        self._stats_raw = stats_raw

        # if this function was not called by the prometheus library and calling collect() is imminent,
        # we should block the next collection cycle as it would be useless
        if not _triggered_from_prometheus_library:
            self._skip_immediate_collection = True

    def _trigger_stats_collection(self) -> None:
        # we are running inside an event loop, but in a synchronous function and that sucks a lot
        # it means that we shouldn't block the event loop by performing a blocking stats collection
        # but it also means that we can't yield to the event loop as this function is synchronous
        # therefore we can only start a new task, but we can't wait for it
        # which causes the metrics to be delayed by one collection pass (not the best, but probably good enough)
        #
        # this issue can be prevented by calling the `collect_kresd_stats()` function manually before entering
        # the Prometheus library. We just have to prevent the library from invoking it again. See the mentioned
        # function for details

        if compat.asyncio.is_event_loop_running():
            # when running, we can schedule the new data collection
            if self._collection_task is not None and not self._collection_task.done():
                logger.warning("Statistics collection task is still running. Skipping scheduling of a new one!")
            else:
                self._collection_task = compat.asyncio.create_task(
                    self.collect_kresd_stats(_triggered_from_prometheus_library=True)
                )

        else:
            # when not running, we can start a new loop (we are not in the manager's main thread)
            compat.asyncio.run(self.collect_kresd_stats(_triggered_from_prometheus_library=True))


_resolver_collector: Optional[ResolverCollector] = None


async def _collect_stats() -> None:
    # manually trigger stat collection so that we do not have to wait for it
    if _resolver_collector is not None:
        await _resolver_collector.collect_kresd_stats()
    else:
        raise RuntimeError("Function invoked before initializing the module!")


async def report_stats(prometheus_format: bool = False) -> Optional[bytes]:
    """
    Collects metrics from everything, returns data string in JSON (default) or Prometheus format.
    """

    # manually trigger stat collection so that we do not have to wait for it
    if _resolver_collector is not None:
        await _resolver_collector.collect_kresd_stats()
    else:
        raise RuntimeError("Function invoked before initializing the module!")

    if prometheus_format:
        if _prometheus_support:
            return exposition.generate_latest()  # type: ignore
        return None
    return _resolver_collector.report_json().encode()


async def init_monitoring(config_store: ConfigStore) -> None:
    """
    Initialize monitoring. Must be called before any other function from this module.
    """
    global _resolver_collector
    _resolver_collector = ResolverCollector(config_store)

    if _prometheus_support:
        # register metrics collector
        REGISTRY.register(_resolver_collector)  # type: ignore

        # register graphite bridge
        await config_store.register_verifier(_deny_turning_off_graphite_bridge)
        await config_store.register_on_change_callback(_configure_graphite_bridge)