summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
author: Lukáš Ondráček <lukas.ondracek@nic.cz> 2024-12-04 17:31:36 +0100
committer: Vladimír Čunát <vladimir.cunat@nic.cz> 2024-12-11 14:20:16 +0100
commit: e9a6537fb605b86478ae45e1a1dce79e0c2e6f84 (patch)
tree: ed8e1d12770cd8ed646a3c2d81523beedcf1f409 /python
parent: daemon/defer: add missing libm dependency (diff)
parent: Merge !1450: manager: subprocess debugging via GDB (diff)
download: knot-resolver-e9a6537fb605b86478ae45e1a1dce79e0c2e6f84.tar.xz
knot-resolver-e9a6537fb605b86478ae45e1a1dce79e0c2e6f84.zip
Merge remote-tracking branch 'origin/master' into defer-wip
Diffstat (limited to 'python')
-rw-r--r-- python/knot_resolver/client/commands/debug.py | 144
-rw-r--r-- python/knot_resolver/client/commands/pids.py | 63
-rw-r--r-- python/knot_resolver/client/main.py | 17
-rw-r--r-- python/knot_resolver/constants.py | 2
-rw-r--r-- python/knot_resolver/controller/interface.py | 62
-rw-r--r-- python/knot_resolver/controller/supervisord/__init__.py | 8
-rw-r--r-- python/knot_resolver/datamodel/rate_limiting_schema.py | 29
-rw-r--r-- python/knot_resolver/datamodel/templates/rate_limiting.lua.j2 | 2
-rw-r--r-- python/knot_resolver/datamodel/types/__init__.py | 2
-rw-r--r-- python/knot_resolver/datamodel/types/types.py | 5
-rw-r--r-- python/knot_resolver/manager/files/__init__.py | 3
-rw-r--r-- python/knot_resolver/manager/files/watchdog.py | 133
-rw-r--r-- python/knot_resolver/manager/manager.py | 25
-rw-r--r-- python/knot_resolver/manager/server.py | 61
14 files changed, 484 insertions, 72 deletions
diff --git a/python/knot_resolver/client/commands/debug.py b/python/knot_resolver/client/commands/debug.py
new file mode 100644
index 00000000..5d9a81df
--- /dev/null
+++ b/python/knot_resolver/client/commands/debug.py
@@ -0,0 +1,144 @@
+import argparse
+import json
+import os
+import sys
+from pathlib import Path
+from typing import List, Optional, Tuple, Type
+
+from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver.utils import which
+from knot_resolver.utils.requests import request
+
+PROCS_TYPE = List
+
+
+@register_command
+class DebugCommand(Command):
+    """Client command that attaches GDB to subprocesses of the manager."""
+
+    def __init__(self, namespace: argparse.Namespace) -> None:
+        self.proc_type: Optional[str] = namespace.proc_type
+        self.sudo: bool = namespace.sudo
+        # `--gdb` defaults to None, which makes `run()` look up plain `gdb` on $PATH.
+        self.gdb: Optional[str] = namespace.gdb
+        self.print_only: bool = namespace.print_only
+        # Additional arguments, appended verbatim to the end of the GDB command line.
+        self.gdb_args: List[str] = namespace.extra
+        super().__init__(namespace)
+
+    @staticmethod
+    def register_args_subparser(
+        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
+        """Register the `debug` subcommand and its arguments on the given subparser."""
+        debug = subparser.add_parser(
+            "debug",
+            help="Run GDB on the manager's subprocesses",
+        )
+        debug.add_argument(
+            "proc_type",
+            help="Optional, the type of process to debug. May be 'kresd' (default), 'gc', or 'all'.",
+            type=str,
+            nargs="?",
+            default="kresd",
+        )
+        debug.add_argument(
+            "--sudo",
+            dest="sudo",
+            help="Run GDB with sudo",
+            action="store_true",
+            default=False,
+        )
+        debug.add_argument(
+            "--gdb",
+            help="Custom GDB executable (may be a command on PATH, or an absolute path)",
+            type=str,
+            default=None,
+        )
+        # NOTE(review): the help text says stderr, but `run()` prints the command line with a
+        # plain `print()`, i.e. to stdout - confirm which of the two is intended.
+        debug.add_argument(
+            "--print-only",
+            help="Prints the GDB command line into stderr as a Python array, does not execute GDB",
+            action="store_true",
+            default=False,
+        )
+        return debug, DebugCommand
+
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
+        """No shell completion is offered for this command."""
+        return {}
+
+    def run(self, args: CommandArgs) -> None:  # noqa: PLR0912, PLR0915
+        """
+        Ask the manager for its subprocesses and start GDB attached to all of them.
+
+        Exits with status 1 when GDB (or sudo) cannot be found, when the manager returns an
+        error or an unexpected payload, or when there is no process to attach to. Unless
+        `--print-only` was given, the current process is replaced by GDB.
+        """
+        # Resolve the GDB executable: default name, custom name on $PATH, or explicit path.
+        if self.gdb is None:
+            try:
+                gdb_cmd = str(which.which("gdb"))
+            except RuntimeError:
+                print("Could not find 'gdb' in $PATH. Is GDB installed?", file=sys.stderr)
+                sys.exit(1)
+        elif "/" not in self.gdb:
+            try:
+                gdb_cmd = str(which.which(self.gdb))
+            except RuntimeError:
+                print(f"Could not find '{self.gdb}' in $PATH.", file=sys.stderr)
+                sys.exit(1)
+        else:
+            gdb_cmd_path = Path(self.gdb).absolute()
+            if not gdb_cmd_path.exists():
+                print(f"Could not find '{self.gdb}'.", file=sys.stderr)
+                sys.exit(1)
+            gdb_cmd = str(gdb_cmd_path)
+
+        response = request(args.socket, "GET", f"processes/{self.proc_type}")
+        if response.status != 200:
+            print(response, file=sys.stderr)
+            sys.exit(1)
+
+        procs = json.loads(response.body)
+        if not isinstance(procs, PROCS_TYPE):
+            print(
+                f"Unexpected response type '{type(procs).__name__}' from manager. Expected '{PROCS_TYPE.__name__}'",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+        if not procs:
+            print(
+                f"There are no processes of type '{self.proc_type}' available to debug",
+                file=sys.stderr,
+            )
+            # Nothing to attach to; continuing would fail on `procs[0]` below.
+            sys.exit(1)
+
+        # Full argv of the program to execute, including argv[0].
+        exec_args: List[str] = []
+
+        # Put `sudo --` at the beginning of the command.
+        if self.sudo:
+            try:
+                sudo_cmd = str(which.which("sudo"))
+            except RuntimeError:
+                print("Could not find 'sudo' in $PATH. Is sudo installed?", file=sys.stderr)
+                sys.exit(1)
+            exec_args.extend([sudo_cmd, "--"])
+
+        # Attach GDB to processes - the processes are attached using the `add-inferior` and `attach` GDB
+        # commands. This way, we can debug multiple processes.
+        # No `--` after the GDB executable: GDB would take it as the end of its options and
+        # treat the following `-init-eval-command` arguments as file names.
+        exec_args.append(gdb_cmd)
+        exec_args.extend(["-init-eval-command", "set detach-on-fork off"])
+        exec_args.extend(["-init-eval-command", "set schedule-multiple on"])
+        exec_args.extend(["-init-eval-command", f'attach {procs[0]["pid"]}'])
+        inferior = 2
+        for proc in procs[1:]:
+            exec_args.extend(["-init-eval-command", "add-inferior"])
+            exec_args.extend(["-init-eval-command", f"inferior {inferior}"])
+            exec_args.extend(["-init-eval-command", f'attach {proc["pid"]}'])
+            inferior += 1
+
+        num_inferiors = inferior - 1
+        if num_inferiors > 1:
+            # Now we switch back to the first process and add additional provided GDB arguments.
+            exec_args.extend(["-init-eval-command", "inferior 1"])
+            exec_args.extend(
+                [
+                    "-init-eval-command",
+                    "echo \\n\\nYou are now debugging multiple Knot Resolver processes. To switch between "
+                    "them, use the 'inferior <n>' command, where <n> is an integer from 1 to "
+                    f"{num_inferiors}.\\n\\n",
+                ]
+            )
+        exec_args.extend(self.gdb_args)
+
+        if self.print_only:
+            print(f"{exec_args}")
+        else:
+            # `exec_args` already starts with argv[0], so it is passed as a whole to execv.
+            # `os.execl(*exec_args)` would instead use the second element as the program name.
+            os.execv(exec_args[0], exec_args)
diff --git a/python/knot_resolver/client/commands/pids.py b/python/knot_resolver/client/commands/pids.py
new file mode 100644
index 00000000..a1ab5f8c
--- /dev/null
+++ b/python/knot_resolver/client/commands/pids.py
@@ -0,0 +1,63 @@
+import argparse
+import json
+import sys
+from typing import Iterable, List, Optional, Tuple, Type
+
+from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
+from knot_resolver.utils.requests import request
+
+PROCESSES_TYPE = Iterable
+
+
+@register_command
+class PidsCommand(Command):
+    """Client command that lists the PIDs of the manager's subprocesses."""
+
+    def __init__(self, namespace: argparse.Namespace) -> None:
+        self.proc_type: Optional[str] = namespace.proc_type
+        # `--json` is registered with `action="store_true"`, so the value is a boolean.
+        self.json: bool = namespace.json
+
+        super().__init__(namespace)
+
+    @staticmethod
+    def register_args_subparser(
+        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
+    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
+        """Register the `pids` subcommand and its arguments on the given subparser."""
+        pids = subparser.add_parser("pids", help="List the PIDs of the Manager's subprocesses")
+        pids.add_argument(
+            "proc_type",
+            help="Optional, the type of process to query. May be 'kresd', 'gc', or 'all' (default).",
+            nargs="?",
+            default="all",
+        )
+        pids.add_argument(
+            "--json",
+            help="Optional, makes the output more verbose, in JSON.",
+            action="store_true",
+            default=False,
+        )
+        return pids, PidsCommand
+
+    @staticmethod
+    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
+        """No shell completion is offered for this command."""
+        return {}
+
+    def run(self, args: CommandArgs) -> None:
+        """
+        Fetch the process list from the manager and print it.
+
+        Prints one PID per line, or the whole response as indented JSON when `--json` was
+        given. Exits with status 1 on a non-200 response or an unexpected payload type.
+        """
+        response = request(args.socket, "GET", f"processes/{self.proc_type}")
+        if response.status != 200:
+            print(response, file=sys.stderr)
+            sys.exit(1)
+
+        processes = json.loads(response.body)
+        if not isinstance(processes, PROCESSES_TYPE):
+            print(
+                f"Unexpected response type '{type(processes).__name__}' from manager. "
+                f"Expected '{PROCESSES_TYPE.__name__}'",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+        if self.json:
+            print(json.dumps(processes, indent=2))
+            return
+
+        for p in processes:
+            print(p["pid"])
diff --git a/python/knot_resolver/client/main.py b/python/knot_resolver/client/main.py
index 75cd6a77..461b7fc4 100644
--- a/python/knot_resolver/client/main.py
+++ b/python/knot_resolver/client/main.py
@@ -1,6 +1,7 @@
import argparse
import importlib
import os
+import sys
from knot_resolver.constants import VERSION
@@ -68,7 +69,21 @@ def main() -> None:
parser = create_main_argument_parser()
install_commands_parsers(parser)
- namespace = parser.parse_args()
+ # TODO: This is broken with unpatched versions of poethepoet, because they drop the `--` pseudo-argument.
+ # Patch submitted at <https://github.com/nat-n/poethepoet/pull/163>.
+ try:
+ pa_index = sys.argv.index("--", 1)
+ argv_to_parse = sys.argv[1:pa_index]
+ argv_extra = sys.argv[(pa_index + 1) :]
+ except ValueError:
+ argv_to_parse = sys.argv[1:]
+ argv_extra = []
+
+ namespace = parser.parse_args(argv_to_parse)
+ if hasattr(namespace, "extra"):
+ raise TypeError("'extra' is already an attribute - this is disallowed for commands")
+ namespace.extra = argv_extra
+
client = KresClient(namespace, parser)
client.execute()
diff --git a/python/knot_resolver/constants.py b/python/knot_resolver/constants.py
index 2acb8660..f37bb2af 100644
--- a/python/knot_resolver/constants.py
+++ b/python/knot_resolver/constants.py
@@ -1,6 +1,6 @@
from pathlib import Path
-VERSION = "6.0.8"
+VERSION = "6.0.9"
USER = "knot-resolver"
GROUP = "knot-resolver"
diff --git a/python/knot_resolver/controller/interface.py b/python/knot_resolver/controller/interface.py
index 0544dac2..49808d01 100644
--- a/python/knot_resolver/controller/interface.py
+++ b/python/knot_resolver/controller/interface.py
@@ -14,7 +14,6 @@ from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
-from knot_resolver.utils.async_utils import writefile
logger = logging.getLogger(__name__)
@@ -110,20 +109,35 @@ class Subprocess(ABC):
self._id = kresid
self._config = config
self._registered_worker: bool = False
+ self._pid: Optional[int] = None
+
+ self._config_file: Optional[Path] = None
+ if self.type is SubprocessType.KRESD:
+ self._config_file = kresd_config_file(self._config, self.id)
+ elif self.type is SubprocessType.POLICY_LOADER:
+ self._config_file = policy_loader_config_file(self._config)
+
+    def _render_lua(self) -> Optional[str]:
+        """Render the Lua configuration matching this subprocess type, or None for other types."""
+        kind = self.type
+        if kind is SubprocessType.KRESD:
+            return self._config.render_lua()
+        return self._config.render_lua_policy() if kind is SubprocessType.POLICY_LOADER else None
+
+    def _write_config(self) -> None:
+        """Write the rendered Lua configuration to the config file, if there is anything to write."""
+        rendered = self._render_lua()
+        if not rendered or not self._config_file:
+            # Nothing rendered for this subprocess type, or no config file path is set.
+            return
+        with open(self._config_file, "w", encoding="utf8") as out:
+            out.write(rendered)
+
+    def _unlink_config(self) -> None:
+        """Remove the config file of this subprocess; a file that is already gone is not an error."""
+        target = self._config_file
+        if not target:
+            return
+        target.unlink(missing_ok=True)
async def start(self, new_config: Optional[KresConfig] = None) -> None:
if new_config:
self._config = new_config
-
- config_file: Optional[Path] = None
- if self.type is SubprocessType.KRESD:
- config_lua = self._config.render_lua()
- config_file = kresd_config_file(self._config, self.id)
- await writefile(config_file, config_lua)
- elif self.type is SubprocessType.POLICY_LOADER:
- config_lua = self._config.render_lua_policy()
- config_file = policy_loader_config_file(self._config)
- await writefile(config_file, config_lua)
+ self._write_config()
try:
await self._start()
@@ -131,8 +145,7 @@ class Subprocess(ABC):
register_worker(self)
self._registered_worker = True
except SubprocessControllerError as e:
- if config_file:
- config_file.unlink()
+ self._unlink_config()
raise e
async def apply_new_config(self, new_config: KresConfig) -> None:
@@ -140,16 +153,7 @@ class Subprocess(ABC):
# update config file
logger.debug(f"Writing config file for {self.id}")
-
- config_file: Optional[Path] = None
- if self.type is SubprocessType.KRESD:
- config_lua = self._config.render_lua()
- config_file = kresd_config_file(self._config, self.id)
- await writefile(config_file, config_lua)
- elif self.type is SubprocessType.POLICY_LOADER:
- config_lua = self._config.render_lua_policy()
- config_file = policy_loader_config_file(self._config)
- await writefile(config_file, config_lua)
+ self._write_config()
# update runtime status
logger.debug(f"Restarting {self.id}")
@@ -166,13 +170,7 @@ class Subprocess(ABC):
Remove temporary files and all traces of this instance running. It is NOT SAFE to call this while
the kresd is running, because it will break automatic restarts (at the very least).
"""
-
- if self.type is SubprocessType.KRESD:
- config_file = kresd_config_file(self._config, self.id)
- config_file.unlink()
- elif self.type is SubprocessType.POLICY_LOADER:
- config_file = policy_loader_config_file(self._config)
- config_file.unlink()
+ self._unlink_config()
def __eq__(self, o: object) -> bool:
return isinstance(o, type(self)) and o.type == self.type and o.id == self.id
@@ -193,6 +191,10 @@ class Subprocess(ABC):
pass
@abstractmethod
+ async def get_pid(self) -> int:
+ pass
+
+ @abstractmethod
def status(self) -> SubprocessStatus:
pass
diff --git a/python/knot_resolver/controller/supervisord/__init__.py b/python/knot_resolver/controller/supervisord/__init__.py
index 347ac1e7..ddb9b29b 100644
--- a/python/knot_resolver/controller/supervisord/__init__.py
+++ b/python/knot_resolver/controller/supervisord/__init__.py
@@ -223,6 +223,14 @@ class SupervisordSubprocess(Subprocess):
fast = _create_fast_proxy(self._config)
fast.startProcess(self.name)
+    @async_in_a_thread
+    def get_pid(self) -> int:
+        """Return the PID of this subprocess, asking supervisord only on the first call."""
+        # NOTE(review): the PID is cached in `self._pid` and never refreshed here - confirm it
+        # is reset elsewhere when the process gets restarted.
+        if self._pid is not None:
+            return self._pid
+        proxy = _create_supervisord_proxy(self._config)
+        self._pid = proxy.getProcessInfo(self.name)["pid"]
+        return self._pid
+
def get_used_config(self) -> KresConfig:
return self._config
diff --git a/python/knot_resolver/datamodel/rate_limiting_schema.py b/python/knot_resolver/datamodel/rate_limiting_schema.py
index d93272da..60994c20 100644
--- a/python/knot_resolver/datamodel/rate_limiting_schema.py
+++ b/python/knot_resolver/datamodel/rate_limiting_schema.py
@@ -1,3 +1,8 @@
+from knot_resolver.datamodel.types import (
+ Int0_32,
+ IntPositive,
+ TimeUnit,
+)
from knot_resolver.utils.modeling import ConfigSchema
@@ -10,26 +15,20 @@ class RateLimitingSchema(ConfigSchema):
rate_limit: Maximal number of allowed queries per second from a single host.
instant_limit: Maximal number of allowed queries at a single point in time from a single host.
slip: Number of restricted responses out of which one is sent as truncated, the others are dropped.
- log_period: Minimal time in msec between two log messages, or zero to disable.
+ log_period: Minimal time between two log messages, or '0s' to disable.
dry_run: Perform only classification and logging but no restrictions.
"""
- capacity: int = 524288
- rate_limit: int
- instant_limit: int = 50
- slip: int = 2
- log_period: int = 0
+ capacity: IntPositive = IntPositive(524288)
+ rate_limit: IntPositive
+ instant_limit: IntPositive = IntPositive(50)
+ slip: Int0_32 = Int0_32(2)
+ log_period: TimeUnit = TimeUnit("0s")
dry_run: bool = False
def _validate(self) -> None:
- max_instant_limit = int(2**32 / 768 - 1)
- if not 1 <= self.instant_limit <= max_instant_limit:
+ max_instant_limit = int(2 ** 32 // 768 - 1)
+ if not int(self.instant_limit) <= max_instant_limit:
raise ValueError(f"'instant-limit' has to be in range 1..{max_instant_limit}")
- if not 1 <= self.rate_limit <= 1000 * self.instant_limit:
+ if not int(self.rate_limit) <= 1000 * int(self.instant_limit):
raise ValueError("'rate-limit' has to be in range 1..(1000 * instant-limit)")
- if not 0 < self.capacity:
- raise ValueError("'capacity' has to be positive")
- if not 0 <= self.slip <= 100:
- raise ValueError("'slip' has to be in range 0..100")
- if not 0 <= self.log_period:
- raise ValueError("'log-period' has to be non-negative")
diff --git a/python/knot_resolver/datamodel/templates/rate_limiting.lua.j2 b/python/knot_resolver/datamodel/templates/rate_limiting.lua.j2
index 4f9547f5..63f92125 100644
--- a/python/knot_resolver/datamodel/templates/rate_limiting.lua.j2
+++ b/python/knot_resolver/datamodel/templates/rate_limiting.lua.j2
@@ -7,6 +7,6 @@ assert(C.ratelimiting_init(
{{ cfg.rate_limiting.instant_limit }},
{{ cfg.rate_limiting.rate_limit }},
{{ cfg.rate_limiting.slip }},
- {{ cfg.rate_limiting.log_period }},
+ {{ cfg.rate_limiting.log_period.millis() }},
{{ boolean(cfg.rate_limiting.dry_run) }}) == 0)
{%- endif %}
diff --git a/python/knot_resolver/datamodel/types/__init__.py b/python/knot_resolver/datamodel/types/__init__.py
index a3d7db3e..d1334b5a 100644
--- a/python/knot_resolver/datamodel/types/__init__.py
+++ b/python/knot_resolver/datamodel/types/__init__.py
@@ -6,6 +6,7 @@ from .types import (
EscapedStr,
EscapedStr32B,
IDPattern,
+ Int0_32,
Int0_512,
Int0_65535,
InterfaceName,
@@ -37,6 +38,7 @@ __all__ = [
"EscapedStr",
"EscapedStr32B",
"IDPattern",
+ "Int0_32",
"Int0_512",
"Int0_65535",
"InterfaceName",
diff --git a/python/knot_resolver/datamodel/types/types.py b/python/knot_resolver/datamodel/types/types.py
index 6cd1e4cb..3c9b9fe1 100644
--- a/python/knot_resolver/datamodel/types/types.py
+++ b/python/knot_resolver/datamodel/types/types.py
@@ -14,6 +14,11 @@ class IntPositive(IntRangeBase):
_min: int = 1
+class Int0_32(IntRangeBase): # noqa: N801
+ # Integer type with bounds 0 and 32; the range check itself is implemented by IntRangeBase
+ # (presumably inclusive on both ends - TODO confirm).
+ _min: int = 0
+ _max: int = 32
+
+
class Int0_512(IntRangeBase): # noqa: N801
_min: int = 0
_max: int = 512
diff --git a/python/knot_resolver/manager/files/__init__.py b/python/knot_resolver/manager/files/__init__.py
new file mode 100644
index 00000000..49700656
--- /dev/null
+++ b/python/knot_resolver/manager/files/__init__.py
@@ -0,0 +1,3 @@
+from .watchdog import init_files_watchdog
+
+__all__ = ["init_files_watchdog"]
diff --git a/python/knot_resolver/manager/files/watchdog.py b/python/knot_resolver/manager/files/watchdog.py
new file mode 100644
index 00000000..64547192
--- /dev/null
+++ b/python/knot_resolver/manager/files/watchdog.py
@@ -0,0 +1,133 @@
+import importlib
+import logging
+from pathlib import Path
+from threading import Timer
+from typing import List, Optional
+
+from knot_resolver.controller.registered_workers import command_registered_workers
+from knot_resolver.datamodel import KresConfig
+from knot_resolver.datamodel.types import File
+from knot_resolver.manager.config_store import ConfigStore, only_on_real_changes_update
+from knot_resolver.utils import compat
+
+_watchdog = False
+if importlib.util.find_spec("watchdog"):
+ _watchdog = True
+
+logger = logging.getLogger(__name__)
+
+
+def tls_cert_paths(config: KresConfig) -> List[str]:
+    """Return the configured TLS certificate and key file paths as strings, skipping unset ones."""
+    tls = config.network.tls
+    candidates: List[Optional[File]] = [tls.cert_file, tls.key_file]
+    return [str(path) for path in candidates if path is not None]
+
+
+if _watchdog:
+ from watchdog.events import (
+ FileSystemEvent,
+ FileSystemEventHandler,
+ )
+ from watchdog.observers import Observer
+
+ _tls_cert_watchdog: Optional["TLSCertWatchDog"] = None
+
+ class TLSCertEventHandler(FileSystemEventHandler):
+ # Filesystem event handler that sends the stored command to the registered workers,
+ # with a 5-second delay, whenever one of the watched files is created or modified.
+ def __init__(self, files: List[Path], cmd: str) -> None:
+ # files: paths whose events are acted upon; cmd: command sent to the registered workers.
+ self._files = files
+ self._cmd = cmd
+ # Timer of the currently pending delayed reload, if any.
+ self._timer: Optional[Timer] = None
+
+ def _reload(self) -> None:
+ # Schedule a delayed reload unless one is already pending.
+ def command() -> None:
+ # Executed by the Timer: schedule the command as a task when an event loop is
+ # reported as running, otherwise run it to completion in a new event loop.
+ if compat.asyncio.is_event_loop_running():
+ compat.asyncio.create_task(command_registered_workers(self._cmd))
+ else:
+ compat.asyncio.run(command_registered_workers(self._cmd))
+ logger.info("Reloading of TLS certificate files has finished")
+
+ # skipping if reload was already triggered
+ if self._timer and self._timer.is_alive():
+ logger.info("Skipping TLS certificate files reloading, reload command was already triggered")
+ return
+ # start a 5sec timer
+ logger.info("Delayed reload of TLS certificate files has started")
+ self._timer = Timer(5, command)
+ self._timer.start()
+
+ def on_created(self, event: FileSystemEvent) -> None:
+ # A watched file appeared: trigger the delayed reload.
+ src_path = Path(str(event.src_path))
+ if src_path in self._files:
+ logger.info(f"Watched file '{src_path}' has been created")
+ self._reload()
+
+ def on_deleted(self, event: FileSystemEvent) -> None:
+ # A watched file, or the parent directory of one, disappeared: cancel any pending reload.
+ src_path = Path(str(event.src_path))
+ if src_path in self._files:
+ logger.warning(f"Watched file '{src_path}' has been deleted")
+ if self._timer:
+ self._timer.cancel()
+ for file in self._files:
+ if file.parent == src_path:
+ logger.warning(f"Watched directory '{src_path}' has been deleted")
+ if self._timer:
+ self._timer.cancel()
+
+ def on_modified(self, event: FileSystemEvent) -> None:
+ # A watched file changed: trigger the delayed reload.
+ src_path = Path(str(event.src_path))
+ if src_path in self._files:
+ logger.info(f"Watched file '{src_path}' has been modified")
+ self._reload()
+
+    class TLSCertWatchDog:
+        """Watches the directories of the TLS certificate and key files using an `Observer`."""
+
+        def __init__(self, cert_file: Path, key_file: Path) -> None:
+            self._observer = Observer()
+
+            # Command handed to the event handler; it is sent to the workers on file changes.
+            reload_cmd = f"net.tls('{cert_file}', '{key_file}')"
+            watched_files: List[Path] = [cert_file, key_file]
+
+            # Schedule each parent directory only once, even if both files share it.
+            watched_dirs: List[Path] = [cert_file.parent]
+            if key_file.parent != cert_file.parent:
+                watched_dirs.append(key_file.parent)
+
+            handler = TLSCertEventHandler(watched_files, reload_cmd)
+            for directory in watched_dirs:
+                self._observer.schedule(handler, str(directory), recursive=False)
+                logger.info(f"Directory '{directory}' scheduled for watching")
+
+        def start(self) -> None:
+            """Start the underlying observer."""
+            self._observer.start()
+
+        def stop(self) -> None:
+            """Stop the underlying observer and wait for it to finish."""
+            self._observer.stop()
+            self._observer.join()
+
+    @only_on_real_changes_update(tls_cert_paths)
+    async def _init_tls_cert_watchdog(config: KresConfig) -> None:
+        """
+        (Re)create the TLS certificate watchdog for the given configuration.
+
+        Any previously running watchdog is stopped first. A new one is started only when
+        both the certificate file and the key file are configured.
+        """
+        global _tls_cert_watchdog
+        if _tls_cert_watchdog:
+            _tls_cert_watchdog.stop()
+            # Forget the stopped instance, so that it is not stopped again on the next change
+            # when the new configuration has no TLS files.
+            _tls_cert_watchdog = None
+
+        tls = config.network.tls
+        if tls.cert_file and tls.key_file:
+            logger.info("Initializing TLS certificate files WatchDog")
+            _tls_cert_watchdog = TLSCertWatchDog(
+                tls.cert_file.to_path(),
+                tls.key_file.to_path(),
+            )
+            _tls_cert_watchdog.start()
+
+
+async def init_files_watchdog(config_store: ConfigStore) -> None:
+    """Register the file watchdog callbacks on the config store, if `watchdog` was found."""
+    if not _watchdog:
+        return
+    # watchdog for TLS certificate files
+    await config_store.register_on_change_callback(_init_tls_cert_watchdog)
diff --git a/python/knot_resolver/manager/manager.py b/python/knot_resolver/manager/manager.py
index 6ce32fcc..df57bd63 100644
--- a/python/knot_resolver/manager/manager.py
+++ b/python/knot_resolver/manager/manager.py
@@ -55,6 +55,14 @@ async def _deny_max_worker_changes(config_old: KresConfig, config_new: KresConfi
return Result.ok(None)
+async def _subprocess_desc(subprocess: Subprocess) -> object:
+    """Build a dictionary with the type name, PID and status name of the given subprocess."""
+    desc = {"type": subprocess.type.name}
+    desc["pid"] = await subprocess.get_pid()
+    desc["status"] = subprocess.status().name
+    return desc
+
+
class KresManager: # pylint: disable=too-many-instance-attributes
"""
Core of the whole operation. Orchestrates individual instances under some
@@ -63,7 +71,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
Instantiate with `KresManager.create()`, not with the usual constructor!
"""
- def __init__(self, shutdown_trigger: Callable[[int], None], _i_know_what_i_am_doing: bool = False):
+ def __init__(self, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
logger.error(
"Trying to create an instance of KresManager using normal constructor. Please use "
@@ -80,19 +88,18 @@ class KresManager: # pylint: disable=too-many-instance-attributes
self._watchdog_task: Optional["asyncio.Task[None]"] = None
self._fix_counter: _FixCounter = _FixCounter()
self._config_store: ConfigStore
- self._shutdown_trigger: Callable[[int], None] = shutdown_trigger
+ self._shutdown_triggers: List[Callable[[int], None]] = []
@staticmethod
async def create(
subprocess_controller: SubprocessController,
config_store: ConfigStore,
- shutdown_trigger: Callable[[int], None],
) -> "KresManager":
"""
Creates new instance of KresManager.
"""
- inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True)
+ inst = KresManager(_i_know_what_i_am_doing=True)
await inst._async_init(subprocess_controller, config_store) # noqa: SLF001
return inst
@@ -211,6 +218,9 @@ class KresManager: # pylint: disable=too-many-instance-attributes
await self._gc.stop()
self._gc = None
+ def add_shutdown_trigger(self, trigger: Callable[[int], None]) -> None:
+ # Append `trigger` to the list of shutdown callbacks kept in `self._shutdown_triggers`.
+ self._shutdown_triggers.append(trigger)
+
async def validate_config(self, _old: KresConfig, new: KresConfig) -> Result[NoneType, str]:
async with self._manager_lock:
if _old.rate_limiting != new.rate_limiting:
@@ -239,6 +249,10 @@ class KresManager: # pylint: disable=too-many-instance-attributes
logger.debug("Canary process test passed.")
return Result.ok(None)
+    async def get_processes(self, proc_type: Optional[SubprocessType]) -> List[object]:
+        """
+        Describe the running subprocesses known to the controller.
+
+        When `proc_type` is None, all subprocesses are included; otherwise only those whose
+        type equals `proc_type`.
+        """
+        running = await self._controller.get_all_running_instances()
+        descriptions: List[object] = []
+        for proc in running:
+            if proc_type is not None and proc.type != proc_type:
+                continue
+            descriptions.append(await _subprocess_desc(proc))
+        return descriptions
+
async def _reload_system_state(self) -> None:
async with self._manager_lock:
self._workers = []
@@ -344,7 +358,8 @@ class KresManager: # pylint: disable=too-many-instance-attributes
logger.warning("Collecting all remaining workers...")
await self._reload_system_state()
logger.warning("Terminating...")
- self._shutdown_trigger(1)
+ for trigger in self._shutdown_triggers:
+ trigger(1)
async def _instability_handler(self) -> None:
if self._fix_counter.is_too_high():
diff --git a/python/knot_resolver/manager/server.py b/python/knot_resolver/manager/server.py
index 90fd4d3b..06fab0cf 100644
--- a/python/knot_resolver/manager/server.py
+++ b/python/knot_resolver/manager/server.py
@@ -21,13 +21,14 @@ from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
from knot_resolver.constants import CONFIG_FILE, USER
from knot_resolver.controller import get_best_controller_implementation
from knot_resolver.controller.exceptions import SubprocessControllerExecError
+from knot_resolver.controller.interface import SubprocessType
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
from knot_resolver.datamodel.globals import Context, set_global_validation_context
from knot_resolver.datamodel.management_schema import ManagementSchema
-from knot_resolver.manager import metrics
+from knot_resolver.manager import files, metrics
from knot_resolver.utils import custom_atexit as atexit
from knot_resolver.utils import ignore_exceptions_optional
from knot_resolver.utils.async_utils import readfile
@@ -60,8 +61,8 @@ async def error_handler(request: web.Request, handler: Any) -> web.Response:
try:
return await handler(request)
- except DataValidationError as e:
- return web.Response(text=f"validation of configuration failed:\n{e}", status=HTTPStatus.BAD_REQUEST)
+ except (AggregateDataValidationError, DataValidationError) as e:
+ return web.Response(text=str(e), status=HTTPStatus.BAD_REQUEST)
except DataParsingError as e:
return web.Response(text=f"request processing error:\n{e}", status=HTTPStatus.BAD_REQUEST)
except KresManagerException as e:
@@ -87,7 +88,7 @@ class Server:
# This is top-level class containing pretty much everything. Instead of global
# variables, we use instance attributes. That's why there are so many and it's
# ok.
- def __init__(self, store: ConfigStore, config_path: Optional[Path]):
+ def __init__(self, store: ConfigStore, config_path: Optional[Path], manager: KresManager):
# config store & server dynamic reconfiguration
self.config_store = store
@@ -100,6 +101,7 @@ class Server:
self._config_path: Optional[Path] = config_path
self._exit_code: int = 0
self._shutdown_event = asyncio.Event()
+ self._manager = manager
async def _reconfigure(self, config: KresConfig) -> None:
await self._reconfigure_listen_address(config)
@@ -262,16 +264,7 @@ class Server:
async def _handler_cache_clear(self, request: web.Request) -> web.Response:
data = parse_from_mime_type(await request.text(), request.content_type)
-
- try:
- config = CacheClearRPCSchema(data)
- except (AggregateDataValidationError, DataValidationError) as e:
- return web.Response(
- body=e,
- status=HTTPStatus.BAD_REQUEST,
- content_type="text/plain",
- charset="utf8",
- )
+ config = CacheClearRPCSchema(data)
_, result = await command_single_registered_worker(config.render_lua())
return web.Response(
@@ -332,6 +325,30 @@ class Server:
await self._reload_config()
return web.Response(text="Reloading...")
+    async def _handler_processes(self, request: web.Request) -> web.Response:
+        """
+        Route handler for listing PIDs of subprocesses
+
+        The optional path suffix selects the process type: `/kresd`, `/gc` or `/all`.
+        An empty suffix lists all processes; any other suffix yields a 400 response.
+        """
+
+        # Maps the path suffix to the requested subprocess type; None means no filtering.
+        known_types: Dict[str, Optional[SubprocessType]] = {
+            "/kresd": SubprocessType.KRESD,
+            "/gc": SubprocessType.GC,
+            "/all": None,
+        }
+
+        proc_type: Optional[SubprocessType] = None
+        ptstr = request.match_info.get("path", "")
+        if ptstr:
+            if ptstr not in known_types:
+                # Same status constant as the other handlers in this module.
+                return web.Response(text=f"Invalid process type '{ptstr}'", status=HTTPStatus.BAD_REQUEST)
+            proc_type = known_types[ptstr]
+
+        return web.json_response(
+            await self._manager.get_processes(proc_type),
+            headers={"Access-Control-Allow-Origin": "*"},
+            dumps=partial(json.dumps, indent=4),
+        )
+
def _setup_routes(self) -> None:
self.app.add_routes(
[
@@ -348,6 +365,7 @@ class Server:
web.get("/metrics/json", self._handler_metrics_json),
web.get("/metrics/prometheus", self._handler_metrics_prometheus),
web.post("/cache/clear", self._handler_cache_clear),
+ web.get("/processes{path:.*}", self._handler_processes),
]
)
@@ -419,7 +437,7 @@ async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
return ConfigStore(config_validated)
-async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
+async def _init_manager(config_store: ConfigStore) -> KresManager:
"""
Called asynchronously when the application initializes.
"""
@@ -429,7 +447,7 @@ async def _init_manager(config_store: ConfigStore, server: Server) -> KresManage
# Create KresManager. This will perform autodetection of available service managers and
# select the most appropriate to use (or use the one configured directly)
- manager = await KresManager.create(controller, config_store, server.trigger_shutdown)
+ manager = await KresManager.create(controller, config_store)
logger.info("Initial configuration applied. Process manager initialized...")
return manager
@@ -566,11 +584,16 @@ async def start_server(config: Path = CONFIG_FILE) -> int: # noqa: PLR0915
# started, therefore before initializing manager
await metrics.init_prometheus(config_store)
+ await files.init_files_watchdog(config_store)
+
+ # After we have loaded the configuration, we can start worrying about subprocess management.
+ manager = await _init_manager(config_store)
+
# prepare instance of the server (no side effects)
- server = Server(config_store, config)
+ server = Server(config_store, config, manager)
- # After we have loaded the configuration, we can start worring about subprocess management.
- manager = await _init_manager(config_store, server)
+ # add Server's shutdown trigger to the manager
+ manager.add_shutdown_trigger(server.trigger_shutdown)
except SubprocessControllerExecError as e:
# if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would