path: root/python/knot_resolver/manager/
diff options
Diffstat (limited to 'python/knot_resolver/manager/')
1 files changed, 639 insertions, 0 deletions
diff --git a/python/knot_resolver/manager/ b/python/knot_resolver/manager/
new file mode 100644
index 00000000..972b167f
--- /dev/null
+++ b/python/knot_resolver/manager/
@@ -0,0 +1,639 @@
+import asyncio
+import errno
+import json
+import logging
+import os
+import signal
+import sys
+from functools import partial
+from http import HTTPStatus
+from pathlib import Path
+from time import time
+from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
+from aiohttp import web
+from aiohttp.web import middleware
+from aiohttp.web_app import Application
+from aiohttp.web_response import json_response
+from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
+from knot_resolver.constants import CONFIG_FILE
+from knot_resolver.controller import get_best_controller_implementation
+from knot_resolver.controller.exceptions import SubprocessControllerExecException
+from knot_resolver.controller.registered_workers import command_single_registered_worker
+from knot_resolver.datamodel import kres_config_json_schema
+from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
+from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
+from knot_resolver.datamodel.globals import Context, set_global_validation_context
+from knot_resolver.datamodel.management_schema import ManagementSchema
+from knot_resolver.manager import metrics
+from knot_resolver.utils import custom_atexit as atexit
+from knot_resolver.utils import ignore_exceptions_optional
+from knot_resolver.utils.async_utils import readfile
+from knot_resolver.utils.compat import asyncio as asyncio_compat
+from knot_resolver.utils.etag import structural_etag
+from knot_resolver.utils.functional import Result
+from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataParsingError, DataValidationError
+from knot_resolver.utils.modeling.parsing import DataFormat, try_to_parse
+from knot_resolver.utils.modeling.query import query
+from knot_resolver.utils.modeling.types import NoneType
+from knot_resolver.utils.systemd_notify import systemd_notify
+from .config_store import ConfigStore
+from .constants import PID_FILE_NAME, init_user_constants
+from .exceptions import KresManagerException
+from .logging import logger_init
+from .manager import KresManager
+logger = logging.getLogger(__name__)
+async def error_handler(request: web.Request, handler: Any) -> web.Response:
+ """
+ Generic error handler for route handlers.
+ If an exception is thrown during request processing, this middleware catches it
+ and responds accordingly.
+ """
+ try:
+ return await handler(request)
+ except DataValidationError as e:
+ return web.Response(text=f"validation of configuration failed:\n{e}", status=HTTPStatus.BAD_REQUEST)
+ except DataParsingError as e:
+ return web.Response(text=f"request processing error:\n{e}", status=HTTPStatus.BAD_REQUEST)
+ except KresManagerException as e:
+ return web.Response(text=f"request processing failed:\n{e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
+def from_mime_type(mime_type: str) -> DataFormat:
+ formats = {
+ "application/json": DataFormat.JSON,
+ "application/octet-stream": DataFormat.JSON, # default in aiohttp
+ }
+ if mime_type not in formats:
+ raise DataParsingError(f"unsupported MIME type '{mime_type}', expected: {str(formats)[1:-1]}")
+ return formats[mime_type]
+def parse_from_mime_type(data: str, mime_type: str) -> Any:
+ return from_mime_type(mime_type).parse_to_dict(data)
+class Server:
+ # pylint: disable=too-many-instance-attributes
+ # This is top-level class containing pretty much everything. Instead of global
+ # variables, we use instance attributes. That's why there are so many and it's
+ # ok.
+ def __init__(self, store: ConfigStore, config_path: Optional[Path]):
+ # config store & server dynamic reconfiguration
+ self.config_store = store
+ # HTTP server
+ = Application(middlewares=[error_handler])
+ self.runner = AppRunner(
+ self.listen: Optional[ManagementSchema] = None
+ Union[NoneType, TCPSite, UnixSite] = None
+ self.listen_lock = asyncio.Lock()
+ self._config_path: Optional[Path] = config_path
+ self._exit_code: int = 0
+ self._shutdown_event = asyncio.Event()
+ async def _reconfigure(self, config: KresConfig) -> None:
+ await self._reconfigure_listen_address(config)
+ async def _deny_management_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+ if !=
+ return Result.err(
+ "/server/management: Changing management API address/uTruenix-socket dynamically is not allowed as it's really dangerous."
+ " If you really need this feature, please contact the developers and explain why. Technically,"
+ " there are no problems in supporting it. We are only blocking the dynamic changes because"
+ " we think the consequences of leaving this footgun unprotected are worse than its usefulness."
+ )
+ return Result.ok(None)
+ async def _reload_config(self) -> None:
+ if self._config_path is None:
+ logger.warning("The manager was started with inlined configuration - can't reload")
+ else:
+ try:
+ data = await readfile(self._config_path)
+ config = KresConfig(try_to_parse(data))
+ await self.config_store.update(config)
+"Configuration file successfully reloaded")
+ except FileNotFoundError:
+ logger.error(
+ f"Configuration file was not found at '{self._config_path}'."
+ " Something must have happened to it while we were running."
+ )
+ logger.error("Configuration have NOT been changed.")
+ except (DataParsingError, DataValidationError) as e:
+ logger.error(f"Failed to parse the updated configuration file: {e}")
+ logger.error("Configuration have NOT been changed.")
+ except KresManagerException as e:
+ logger.error(f"Reloading of the configuration file failed: {e}")
+ logger.error("Configuration have NOT been changed.")
+ async def sigint_handler(self) -> None:
+"Received SIGINT, triggering graceful shutdown")
+ self.trigger_shutdown(0)
+ async def sigterm_handler(self) -> None:
+"Received SIGTERM, triggering graceful shutdown")
+ self.trigger_shutdown(0)
+ async def sighup_handler(self) -> None:
+"Received SIGHUP, reloading configuration file")
+ systemd_notify(RELOADING="1")
+ await self._reload_config()
+ systemd_notify(READY="1")
+ @staticmethod
+ def all_handled_signals() -> Set[signal.Signals]:
+ return {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}
+ def bind_signal_handlers(self):
+ asyncio_compat.add_async_signal_handler(signal.SIGTERM, self.sigterm_handler)
+ asyncio_compat.add_async_signal_handler(signal.SIGINT, self.sigint_handler)
+ asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler)
+ def unbind_signal_handlers(self):
+ asyncio_compat.remove_signal_handler(signal.SIGTERM)
+ asyncio_compat.remove_signal_handler(signal.SIGINT)
+ asyncio_compat.remove_signal_handler(signal.SIGHUP)
+ async def start(self) -> None:
+ self._setup_routes()
+ await self.runner.setup()
+ await self.config_store.register_verifier(self._deny_management_changes)
+ await self.config_store.register_on_change_callback(self._reconfigure)
+ async def wait_for_shutdown(self) -> None:
+ await self._shutdown_event.wait()
+ def trigger_shutdown(self, exit_code: int) -> None:
+ self._shutdown_event.set()
+ self._exit_code = exit_code
+ async def _handler_index(self, _request: web.Request) -> web.Response:
+ """
+ Dummy index handler to indicate that the server is indeed running...
+ """
+ return json_response(
+ {
+ "msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
+ "status": "RUNNING",
+ }
+ )
+ async def _handler_config_query(self, request: web.Request) -> web.Response:
+ """
+ Route handler for changing resolver configuration
+ """
+ # There are a lot of local variables in here, but they are usually immutable (almost SSA form :) )
+ # pylint: disable=too-many-locals
+ # parse the incoming data
+ if request.method == "GET":
+ update_with: Optional[Dict[str, Any]] = None
+ else:
+ update_with = parse_from_mime_type(await request.text(), request.content_type)
+ document_path = request.match_info["path"]
+ getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
+ etags = getheaders("if-match")
+ not_etags = getheaders("if-none-match")
+ current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
+ # stop processing if etags
+ def strip_quotes(s: str) -> str:
+ return s.strip('"')
+ # WARNING: this check is prone to race conditions. When changing, make sure that the current config
+ # is really the latest current config (i.e. no await in between obtaining the config and the checks)
+ status = HTTPStatus.NOT_MODIFIED if request.method in ("GET", "HEAD") else HTTPStatus.PRECONDITION_FAILED
+ if etags is not None and structural_etag(current_config) not in map(strip_quotes, etags):
+ return web.Response(status=status)
+ if not_etags is not None and structural_etag(current_config) in map(strip_quotes, not_etags):
+ return web.Response(status=status)
+ # run query
+ op = cast(Literal["get", "delete", "patch", "put"], request.method.lower())
+ new_config, to_return = query(current_config, op, document_path, update_with)
+ # update the config
+ if request.method != "GET":
+ # validate
+ config_validated = KresConfig(new_config)
+ # apply
+ await self.config_store.update(config_validated)
+ # serialize the response (the `to_return` object is a Dict/list/scalar, we want to return json)
+ resp_text: Optional[str] = json.dumps(to_return) if to_return is not None else None
+ # create the response and return it
+ res = web.Response(status=HTTPStatus.OK, text=resp_text, content_type="application/json")
+ res.headers.add("ETag", f'"{structural_etag(new_config)}"')
+ return res
+ async def _handler_metrics(self, request: web.Request) -> web.Response:
+ raise web.HTTPMovedPermanently("/metrics/json")
+ async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
+ config = self.config_store.get()
+ return web.Response(
+ body=await metrics.report_json(config),
+ content_type="application/json",
+ charset="utf8",
+ )
+ async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
+ metrics_report = await metrics.report_prometheus()
+ if not metrics_report:
+ raise web.HTTPNotFound()
+ return web.Response(
+ body=metrics_report,
+ content_type="text/plain",
+ charset="utf8",
+ )
+ async def _handler_cache_clear(self, request: web.Request) -> web.Response:
+ data = parse_from_mime_type(await request.text(), request.content_type)
+ try:
+ config = CacheClearRPCSchema(data)
+ except (AggregateDataValidationError, DataValidationError) as e:
+ return web.Response(
+ body=e,
+ status=HTTPStatus.BAD_REQUEST,
+ content_type="text/plain",
+ charset="utf8",
+ )
+ _, result = await command_single_registered_worker(config.render_lua())
+ return web.Response(
+ body=json.dumps(result),
+ content_type="application/json",
+ charset="utf8",
+ )
+ async def _handler_schema(self, _request: web.Request) -> web.Response:
+ return web.json_response(
+ kres_config_json_schema(), headers={"Access-Control-Allow-Origin": "*"}, dumps=partial(json.dumps, indent=4)
+ )
+ async def _handle_view_schema(self, _request: web.Request) -> web.Response:
+ """
+ Provides a UI for visuallising and understanding JSON schema.
+ The feature in the Knot Resolver Manager to render schemas is unwanted, as it's completely
+ out of scope. However, it can be convinient. We therefore rely on a public web-based viewers
+ and provide just a redirect. If this feature ever breaks due to disapearance of the public
+ service, we can fix it. But we are not guaranteeing, that this will always work.
+ """
+ return web.Response(
+ text="""
+ <html>
+ <head><title>Redirect to schema viewer</title></head>
+ <body>
+ <script>
+ // we are using JS in order to use proper host
+ let protocol = window.location.protocol;
+ let host =;
+ let url = encodeURIComponent(`${protocol}//${host}/schema`);
+ window.location.replace(`${url}`);
+ </script>
+ <h1>JavaScript required for a dynamic redirect...</h1>
+ </body>
+ </html>
+ """,
+ content_type="text/html",
+ )
+ async def _handler_stop(self, _request: web.Request) -> web.Response:
+ """
+ Route handler for shutting down the server (and whole manager)
+ """
+ self._shutdown_event.set()
+"Shutdown event triggered...")
+ return web.Response(text="Shutting down...")
+ async def _handler_reload(self, _request: web.Request) -> web.Response:
+ """
+ Route handler for reloading the server
+ """
+"Reloading event triggered...")
+ await self._reload_config()
+ return web.Response(text="Reloading...")
+ def _setup_routes(self) -> None:
+ [
+ web.get("/", self._handler_index),
+ web.get(r"/v1/config{path:.*}", self._handler_config_query),
+ web.put(r"/v1/config{path:.*}", self._handler_config_query),
+ web.delete(r"/v1/config{path:.*}", self._handler_config_query),
+ web.patch(r"/v1/config{path:.*}", self._handler_config_query),
+"/stop", self._handler_stop),
+"/reload", self._handler_reload),
+ web.get("/schema", self._handler_schema),
+ web.get("/schema/ui", self._handle_view_schema),
+ web.get("/metrics", self._handler_metrics),
+ web.get("/metrics/json", self._handler_metrics_json),
+ web.get("/metrics/prometheus", self._handler_metrics_prometheus),
+"/cache/clear", self._handler_cache_clear),
+ ]
+ )
+ async def _reconfigure_listen_address(self, config: KresConfig) -> None:
+ async with self.listen_lock:
+ mgn =
+ # if the listen address did not change, do nothing
+ if self.listen == mgn:
+ return
+ # start the new listen address
+ nsite: Union[web.TCPSite, web.UnixSite]
+ if mgn.unix_socket:
+ nsite = web.UnixSite(self.runner, str(mgn.unix_socket))
+"Starting API HTTP server on http+unix://{mgn.unix_socket}")
+ elif mgn.interface:
+ nsite = web.TCPSite(self.runner, str(mgn.interface.addr), int(mgn.interface.port))
+"Starting API HTTP server on http://{mgn.interface.addr}:{mgn.interface.port}")
+ else:
+ raise KresManagerException("Requested API on unsupported configuration format.")
+ await nsite.start()
+ # stop the old listen
+ assert (self.listen is None) == ( is None)
+ if self.listen is not None and is not None:
+ if self.listen.unix_socket:
+"Stopping API HTTP server on http+unix://{mgn.unix_socket}")
+ elif self.listen.interface:
+ f"Stopping API HTTP server on http://{self.listen.interface.addr}:{self.listen.interface.port}"
+ )
+ await
+ # save new state
+ self.listen = mgn
+ = nsite
+ async def shutdown(self) -> None:
+ if is not None:
+ await
+ await self.runner.cleanup()
+ def get_exit_code(self) -> int:
+ return self._exit_code
+async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any]:
+ # Initial configuration of the manager
+ if isinstance(config, Path):
+ if not config.exists():
+ raise KresManagerException(
+ f"Manager is configured to load config file at {config} on startup, but the file does not exist."
+ )
+ else:
+"Loading configuration from '{config}' file.")
+ config = try_to_parse(await readfile(config))
+ # validate the initial configuration
+ assert isinstance(config, dict)
+ return config
+async def _load_config(config: Dict[str, Any]) -> KresConfig:
+ config_validated = KresConfig(config)
+ return config_validated
+async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
+ config_validated = await _load_config(config)
+ config_store = ConfigStore(config_validated)
+ return config_store
+async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
+ """
+ Called asynchronously when the application initializes.
+ """
+ # Instantiate subprocess controller (if we wanted to, we could switch it at this point)
+ controller = await get_best_controller_implementation(config_store.get())
+ # Create KresManager. This will perform autodetection of available service managers and
+ # select the most appropriate to use (or use the one configured directly)
+ manager = await KresManager.create(controller, config_store, server.trigger_shutdown)
+"Initial configuration applied. Process manager initialized...")
+ return manager
+async def _deny_working_directory_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
+ if config_old.rundir != config_new.rundir:
+ return Result.err("Changing manager's `rundir` during runtime is not allowed.")
+ return Result.ok(None)
+def _set_working_directory(config_raw: Dict[str, Any]) -> None:
+ try:
+ rundir = get_rundir_without_validation(config_raw)
+ except ValueError as e:
+ raise DataValidationError(str(e), "/rundir") from e
+ logger.debug(f"Changing working directory to '{rundir.to_path().absolute()}'.")
+ os.chdir(rundir.to_path())
+def _lock_working_directory(attempt: int = 0) -> None:
+ # the following syscall is atomic, it's essentially the same as acquiring a lock
+ try:
+ pidfile_fd =, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
+ except OSError as e:
+ if e.errno == errno.EEXIST and attempt == 0:
+ # the pid file exists, let's check PID
+ with open(PID_FILE_NAME, "r", encoding="utf-8") as f:
+ pid = int(
+ try:
+ os.kill(pid, 0)
+ except OSError as e2:
+ if e2.errno == errno.ESRCH:
+ os.unlink(PID_FILE_NAME)
+ _lock_working_directory(attempt=attempt + 1)
+ return
+ raise KresManagerException(
+ "Another manager is running in the same working directory."
+ f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
+ ) from e
+ else:
+ raise KresManagerException(
+ "Another manager is running in the same working directory."
+ f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
+ ) from e
+ # now we know that we are the only manager running in this directory
+ # write PID to the pidfile and close it afterwards
+ pidfile = os.fdopen(pidfile_fd, "w")
+ pid = os.getpid()
+ pidfile.write(f"{pid}\n")
+ pidfile.close()
+ # make sure that the file is deleted on shutdown
+ atexit.register(lambda: os.unlink(PID_FILE_NAME))
+async def _sigint_while_shutting_down():
+ logger.warning(
+ "Received SIGINT while already shutting down. Ignoring."
+ " If you want to forcefully stop the manager right now, use SIGTERM."
+ )
+async def _sigterm_while_shutting_down():
+ logger.warning("Received SIGTERM. Invoking dirty shutdown!")
+ sys.exit(128 + signal.SIGTERM)
+async def start_server(config: Path = CONFIG_FILE) -> int:
+ # This function is quite long, but it describes how manager runs. So let's silence pylint
+ # pylint: disable=too-many-statements
+ start_time = time()
+ working_directory_on_startup = os.getcwd()
+ manager: Optional[KresManager] = None
+ # Block signals during initialization to force their processing once everything is ready
+ signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
+ # before starting server, initialize the subprocess controller, config store, etc. Any errors during inicialization
+ # are fatal
+ try:
+ # Make sure that the config path does not change meaning when we change working directory
+ config = config.absolute()
+ # Preprocess config - load from file or in general take it to the last step before validation.
+ config_raw = await _load_raw_config(config)
+ # before processing any configuration, set validation context
+ # - resolve_root = root against which all relative paths will be resolved
+ set_global_validation_context(Context(config.parent, True))
+ # We want to change cwd as soon as possible. Some parts of the codebase are using os.getcwd() to get the
+ # working directory.
+ #
+ # If we fail to read rundir from unparsed config, the first config validation error comes from here
+ _set_working_directory(config_raw)
+ # We don't want more than one manager in a single working directory. So we lock it with a PID file.
+ # Warning - this does not prevent multiple managers with the same naming of kresd service.
+ _lock_working_directory()
+ # set_global_validation_context(Context(config.parent))
+ # After the working directory is set, we can initialize proper config store with a newly parsed configuration.
+ config_store = await _init_config_store(config_raw)
+ # Some "constants" need to be loaded from the initial config, some need to be stored from the initial run conditions
+ await init_user_constants(config_store, working_directory_on_startup)
+ # This behaviour described above with paths means, that we MUST NOT allow `rundir` change after initialization.
+ # It would cause strange problems because every other path configuration depends on it. Therefore, we have to
+ # add a check to the config store, which disallows changes.
+ await config_store.register_verifier(_deny_working_directory_changes)
+ # Up to this point, we have been logging to memory buffer. But now, when we have the configuration loaded, we
+ # can flush the buffer into the proper place
+ await logger_init(config_store)
+ # With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
+ # started, therefore before initializing manager
+ await metrics.init_prometheus(config_store)
+ # prepare instance of the server (no side effects)
+ server = Server(config_store, config)
+ # After we have loaded the configuration, we can start worring about subprocess management.
+ manager = await _init_manager(config_store, server)
+ except SubprocessControllerExecException as e:
+ # if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
+ # be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
+ # the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
+ # we'll exec will invoke us again.
+"Exec requested with arguments: %s", str(e.exec_args))
+ # unblock signals, this could actually terminate us straight away
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
+ # run exit functions
+ atexit.run_callbacks()
+ # and finally exec what we were told to exec
+ os.execl(*e.exec_args)
+ except KresManagerException as e:
+ # We caught an error with a pretty error message. Just print it and exit.
+ logger.error(e)
+ return 1
+ except BaseException:
+ logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
+ return 1
+ # At this point, all backend functionality-providing components are initialized. It's therefore save to start
+ # the API server.
+ try:
+ await server.start()
+ except OSError as e:
+ if e.errno in (errno.EADDRINUSE, errno.EADDRNOTAVAIL):
+ # fancy error reporting of network binding errors
+ logger.error(str(e))
+ await manager.stop()
+ return 1
+ raise
+ # At this point, pretty much everything is ready to go. We should just make sure the user can shut
+ # the manager down with signals.
+ server.bind_signal_handlers()
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
+"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
+ # notify systemd/anything compatible that we are ready
+ systemd_notify(READY="1")
+ await server.wait_for_shutdown()
+ # notify systemd that we are shutting down
+ systemd_notify(STOPPING="1")
+ # Ok, now we are tearing everything down.
+ # First of all, let's block all unwanted interruptions. We don't want to be reconfiguring kresd's while
+ # shutting down.
+ signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
+ server.unbind_signal_handlers()
+ # on the other hand, we want to immediatelly stop when the user really wants us to stop
+ asyncio_compat.add_async_signal_handler(signal.SIGTERM, _sigterm_while_shutting_down)
+ asyncio_compat.add_async_signal_handler(signal.SIGINT, _sigint_while_shutting_down)
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGTERM, signal.SIGINT})
+ # After triggering shutdown, we neet to clean everything up
+"Stopping API service...")
+ await server.shutdown()
+"Stopping kresd manager...")
+ await manager.stop()
+"The manager run for {round(time() - start_time)} seconds...")
+ return server.get_exit_code()