author     Lukáš Ondráček <lukas.ondracek@nic.cz>  2024-10-21 19:48:46 +0200
committer  Lukáš Ondráček <lukas.ondracek@nic.cz>  2024-10-21 19:49:14 +0200
commit     308b61863d52a78292c78e9c637483e2d193e9fb (patch)
tree       54e1847aacb14c6ef3d0c06502fbe9df52da2214 /python/knot_resolver
parent     pytests: change log level from debug to notice (diff)
parent     Merge !1621: python: Ruff linter and formatter (diff)
Merge remote-tracking branch 'master' into rrl-wip
Diffstat (limited to 'python/knot_resolver')
 python/knot_resolver/client/command.py                                    |   2
 python/knot_resolver/client/commands/config.py                            |  26
 python/knot_resolver/controller/__init__.py                               |   3
 python/knot_resolver/controller/exceptions.py                             |   4
 python/knot_resolver/controller/interface.py                              |  14
 python/knot_resolver/controller/registered_workers.py                     |   4
 python/knot_resolver/controller/supervisord/__init__.py                   |  22
 python/knot_resolver/controller/supervisord/config_file.py                |  14
 python/knot_resolver/controller/supervisord/plugin/fast_rpcinterface.py   |  10
 python/knot_resolver/controller/supervisord/plugin/manager_integration.py |   4
 python/knot_resolver/controller/supervisord/plugin/patch_logger.py        |   4
 python/knot_resolver/controller/supervisord/plugin/sd_notify.py           |  14
 python/knot_resolver/datamodel/config_schema.py                           |   2
 python/knot_resolver/datamodel/forward_schema.py                          |   2
 python/knot_resolver/datamodel/globals.py                                 |  10
 python/knot_resolver/datamodel/logging_schema.py                          |   3
 python/knot_resolver/datamodel/network_schema.py                          |   4
 python/knot_resolver/datamodel/types/base_types.py                        |   7
 python/knot_resolver/datamodel/types/files.py                             |  18
 python/knot_resolver/datamodel/types/types.py                             |  18
 python/knot_resolver/exceptions.py                                        |   2
 python/knot_resolver/manager/logging.py                                   |  19
 python/knot_resolver/manager/manager.py                                   |  22
 python/knot_resolver/manager/metrics/collect.py                           |   3
 python/knot_resolver/manager/metrics/prometheus.py                        |  16
 python/knot_resolver/manager/server.py                                    |  45
 python/knot_resolver/utils/__init__.py                                    |   3
 python/knot_resolver/utils/compat/__init__.py                             |   4
 python/knot_resolver/utils/compat/asyncio.py                              |  15
 python/knot_resolver/utils/compat/typing.py                               |   8
 python/knot_resolver/utils/modeling/base_schema.py                        | 163
 python/knot_resolver/utils/modeling/exceptions.py                         |   4
 python/knot_resolver/utils/modeling/json_pointer.py                       |  33
 python/knot_resolver/utils/modeling/parsing.py                            |  12
 python/knot_resolver/utils/modeling/query.py                              |  17
 python/knot_resolver/utils/modeling/renaming.py                           |   5
 python/knot_resolver/utils/modeling/types.py                              |   6
 37 files changed, 280 insertions(+), 282 deletions(-)
diff --git a/python/knot_resolver/client/command.py b/python/knot_resolver/client/command.py
index 960ac1f5..76c0f1d0 100644
--- a/python/knot_resolver/client/command.py
+++ b/python/knot_resolver/client/command.py
@@ -48,7 +48,7 @@ def get_socket_from_config(config: Path, optional_file: bool) -> Optional[Socket
f'http+unix://{quote(management["unix-socket"], safe="")}/',
f'Key "/management/unix-socket" in "{config}" file',
)
- elif "interface" in management:
+ if "interface" in management:
ip = IPAddressPort(management["interface"], object_path=f"/{mkey}/interface")
return SocketDesc(
f"http://{ip.addr}:{ip.port}",
diff --git a/python/knot_resolver/client/commands/config.py b/python/knot_resolver/client/commands/config.py
index c3c976e4..52df39c4 100644
--- a/python/knot_resolver/client/commands/config.py
+++ b/python/knot_resolver/client/commands/config.py
@@ -17,7 +17,7 @@ class Operations(Enum):
def operation_to_method(operation: Operations) -> Literal["PUT", "GET", "DELETE"]:
if operation == Operations.SET:
return "PUT"
- elif operation == Operations.DELETE:
+ if operation == Operations.DELETE:
return "DELETE"
return "GET"
@@ -91,10 +91,10 @@ class ConfigCommand(Command):
config_subparsers = config.add_subparsers(help="operation type")
# GET operation
- get = config_subparsers.add_parser("get", help="Get current configuration from the resolver.")
- get.set_defaults(operation=Operations.GET, format=DataFormat.YAML)
+ get_op = config_subparsers.add_parser("get", help="Get current configuration from the resolver.")
+ get_op.set_defaults(operation=Operations.GET, format=DataFormat.YAML)
- get.add_argument(
+ get_op.add_argument(
"-p",
"--path",
help=path_help,
@@ -102,14 +102,14 @@ class ConfigCommand(Command):
type=str,
default="",
)
- get.add_argument(
+ get_op.add_argument(
"file",
help="Optional, path to the file where to save exported configuration data. If not specified, data will be printed.",
type=str,
nargs="?",
)
- get_formats = get.add_mutually_exclusive_group()
+ get_formats = get_op.add_mutually_exclusive_group()
get_formats.add_argument(
"--json",
help="Get configuration data in JSON format.",
@@ -126,10 +126,10 @@ class ConfigCommand(Command):
)
# SET operation
- set = config_subparsers.add_parser("set", help="Set new configuration for the resolver.")
- set.set_defaults(operation=Operations.SET)
+ set_op = config_subparsers.add_parser("set", help="Set new configuration for the resolver.")
+ set_op.set_defaults(operation=Operations.SET)
- set.add_argument(
+ set_op.add_argument(
"-p",
"--path",
help=path_help,
@@ -138,7 +138,7 @@ class ConfigCommand(Command):
default="",
)
- value_or_file = set.add_mutually_exclusive_group()
+ value_or_file = set_op.add_mutually_exclusive_group()
value_or_file.add_argument(
"file",
help="Optional, path to file with new configuraion.",
@@ -153,11 +153,11 @@ class ConfigCommand(Command):
)
# DELETE operation
- delete = config_subparsers.add_parser(
+ delete_op = config_subparsers.add_parser(
"delete", help="Delete given configuration property or list item at the given index."
)
- delete.set_defaults(operation=Operations.DELETE)
- delete.add_argument(
+ delete_op.set_defaults(operation=Operations.DELETE)
+ delete_op.add_argument(
"-p",
"--path",
help=path_help,
diff --git a/python/knot_resolver/controller/__init__.py b/python/knot_resolver/controller/__init__.py
index 5dc2fc55..bddfe9dd 100644
--- a/python/knot_resolver/controller/__init__.py
+++ b/python/knot_resolver/controller/__init__.py
@@ -86,8 +86,7 @@ async def get_controller_by_name(config: KresConfig, name: str) -> SubprocessCon
if await controller.is_controller_available(config):
logger.info("Selected controller '%s'", str(controller))
return controller
- else:
- raise LookupError("The selected subprocess controller is not available for use on this system.")
+ raise LookupError("The selected subprocess controller is not available for use on this system.")
# run the imports on module load
diff --git a/python/knot_resolver/controller/exceptions.py b/python/knot_resolver/controller/exceptions.py
index 149c2989..99fdad3c 100644
--- a/python/knot_resolver/controller/exceptions.py
+++ b/python/knot_resolver/controller/exceptions.py
@@ -3,11 +3,11 @@ from typing import List
from knot_resolver import KresBaseException
-class SubprocessControllerException(KresBaseException):
+class SubprocessControllerError(KresBaseException):
pass
-class SubprocessControllerExecException(Exception):
+class SubprocessControllerExecError(Exception):
"""
Exception that is used to deliberately terminate system startup
and make exec() of something else. This is used by the subprocess controller
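
The renamed SubprocessControllerExecError carries the argv of the process to exec: _exec_supervisord (further down in this diff) raises it with the supervisord path and arguments, and the top-level startup code in manager/server.py catches it and replaces the current process. A minimal sketch of that flow, assuming only what the diff shows (the argv list is the first constructor argument, so it lands in e.args[0]):

    import os

    from knot_resolver.controller.exceptions import SubprocessControllerExecError

    def run_startup(start) -> None:
        # Illustration only; the real handler lives in start_server().
        try:
            start()
        except SubprocessControllerExecError as e:
            argv = e.args[0]             # e.g. [path, "supervisord", "--configuration", ...]
            os.execv(argv[0], argv[1:])  # replace this process, never returns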
diff --git a/python/knot_resolver/controller/interface.py b/python/knot_resolver/controller/interface.py
index 906592cb..0544dac2 100644
--- a/python/knot_resolver/controller/interface.py
+++ b/python/knot_resolver/controller/interface.py
@@ -10,7 +10,7 @@ from pathlib import Path
from typing import Dict, Iterable, Optional, Type, TypeVar
from weakref import WeakValueDictionary
-from knot_resolver.controller.exceptions import SubprocessControllerException
+from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
@@ -47,8 +47,7 @@ class KresID:
# find free ID closest to zero
for i in itertools.count(start=0, step=1):
if i not in cls._used[typ]:
- res = cls.new(typ, i)
- return res
+ return cls.new(typ, i)
raise RuntimeError("Reached an end of an infinite loop. How?")
@@ -59,10 +58,9 @@ class KresID:
# typed based on subclass. I am not even sure that it's different between subclasses,
# it's probably still the same dict. But we don't really care about it
return cls._used[typ][n] # type: ignore
- else:
- val = cls(typ, n, _i_know_what_i_am_doing=True)
- cls._used[typ][n] = val
- return val
+ val = cls(typ, n, _i_know_what_i_am_doing=True)
+ cls._used[typ][n] = val
+ return val
def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
if not _i_know_what_i_am_doing:
@@ -132,7 +130,7 @@ class Subprocess(ABC):
if self.type is SubprocessType.KRESD:
register_worker(self)
self._registered_worker = True
- except SubprocessControllerException as e:
+ except SubprocessControllerError as e:
if config_file:
config_file.unlink()
raise e
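
For context, the refactored allocation above still scans upward from zero and claims the first ID not in use for the given subprocess type. The same pattern in isolation (a sketch, without the KresID class machinery or weak references):

    import itertools
    from typing import Dict, Set

    _used: Dict[str, Set[int]] = {}

    def alloc_id(typ: str) -> int:
        # Claim the free ID closest to zero for this subprocess type.
        used = _used.setdefault(typ, set())
        for i in itertools.count(start=0, step=1):
            if i not in used:
                used.add(i)
                return i
        raise RuntimeError("Reached an end of an infinite loop. How?")

    assert alloc_id("kresd") == 0
    assert alloc_id("kresd") == 1
    assert alloc_id("gc") == 0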
diff --git a/python/knot_resolver/controller/registered_workers.py b/python/knot_resolver/controller/registered_workers.py
index eed1aded..abc1f1a2 100644
--- a/python/knot_resolver/controller/registered_workers.py
+++ b/python/knot_resolver/controller/registered_workers.py
@@ -2,7 +2,7 @@ import asyncio
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple
-from .exceptions import SubprocessControllerException
+from .exceptions import SubprocessControllerError
if TYPE_CHECKING:
from knot_resolver.controller.interface import KresID, Subprocess
@@ -21,7 +21,7 @@ def get_registered_workers_kresids() -> "List[KresID]":
async def command_single_registered_worker(cmd: str) -> "Tuple[KresID, object]":
for sub in _REGISTERED_WORKERS.values():
return sub.id, await sub.command(cmd)
- raise SubprocessControllerException(
+ raise SubprocessControllerError(
"Unable to execute the command. There is no kresd worker running to execute the command."
"Try start/restart the resolver.",
)
diff --git a/python/knot_resolver/controller/supervisord/__init__.py b/python/knot_resolver/controller/supervisord/__init__.py
index cbc181d2..347ac1e7 100644
--- a/python/knot_resolver/controller/supervisord/__init__.py
+++ b/python/knot_resolver/controller/supervisord/__init__.py
@@ -6,7 +6,7 @@ from xmlrpc.client import Fault, ServerProxy
import supervisor.xmlrpc # type: ignore[import]
-from knot_resolver.controller.exceptions import SubprocessControllerException, SubprocessControllerExecException
+from knot_resolver.controller.exceptions import SubprocessControllerError, SubprocessControllerExecError
from knot_resolver.controller.interface import (
KresID,
Subprocess,
@@ -30,14 +30,14 @@ async def _start_supervisord(config: KresConfig) -> None:
logger.debug("Starting supervisord")
res = await call(["supervisord", "--configuration", str(supervisord_config_file(config).absolute())])
if res != 0:
- raise SubprocessControllerException(f"Supervisord exited with exit code {res}")
+ raise SubprocessControllerError(f"Supervisord exited with exit code {res}")
async def _exec_supervisord(config: KresConfig) -> NoReturn:
logger.debug("Writing supervisord config")
await write_config_file(config)
logger.debug("Execing supervisord")
- raise SubprocessControllerExecException(
+ raise SubprocessControllerExecError(
[
str(which.which("supervisord")),
"supervisord",
@@ -53,7 +53,7 @@ async def _reload_supervisord(config: KresConfig) -> None:
supervisord = _create_supervisord_proxy(config)
supervisord.reloadConfig()
except Fault as e:
- raise SubprocessControllerException("supervisord reload failed") from e
+ raise SubprocessControllerError("supervisord reload failed") from e
@async_in_a_thread
@@ -113,11 +113,10 @@ async def _is_supervisord_running(config: KresConfig) -> bool:
pid = await _get_supervisord_pid(config)
if pid is None:
return False
- elif not _is_process_runinng(pid):
+ if not _is_process_runinng(pid):
supervisord_pid_file(config).unlink()
return False
- else:
- return True
+ return True
def _create_proxy(config: KresConfig) -> ServerProxy:
@@ -164,7 +163,7 @@ def _list_running_subprocesses(config: KresConfig) -> Dict[SupervisordKresID, Su
supervisord = _create_supervisord_proxy(config)
processes: Any = supervisord.getAllProcessInfo()
except Fault as e:
- raise SubprocessControllerException(f"failed to get info from all running processes: {e}") from e
+ raise SubprocessControllerError(f"failed to get info from all running processes: {e}") from e
# there will be a manager process as well, but we don't want to report anything on ourselves
processes = [pr for pr in processes if pr["name"] != "manager"]
@@ -199,7 +198,7 @@ class SupervisordSubprocess(Subprocess):
supervisord = _create_supervisord_proxy(self._config)
status = supervisord.getProcessInfo(self.name)
except Fault as e:
- raise SubprocessControllerException(f"failed to get status from '{self.id}' process: {e}") from e
+ raise SubprocessControllerError(f"failed to get status from '{self.id}' process: {e}") from e
return _convert_subprocess_status(status)
@async_in_a_thread
@@ -210,7 +209,7 @@ class SupervisordSubprocess(Subprocess):
supervisord = _create_fast_proxy(self._config)
supervisord.startProcess(self.name)
except Fault as e:
- raise SubprocessControllerException(f"failed to start '{self.id}'") from e
+ raise SubprocessControllerError(f"failed to start '{self.id}'") from e
@async_in_a_thread
def _stop(self) -> None:
@@ -253,8 +252,7 @@ class SupervisordSubprocessController(SubprocessController):
for id_ in states
if states[id_] == SubprocessStatus.RUNNING
]
- else:
- return []
+ return []
async def initialize_controller(self, config: KresConfig) -> None:
self._controller_config = config
diff --git a/python/knot_resolver/controller/supervisord/config_file.py b/python/knot_resolver/controller/supervisord/config_file.py
index 45a1a83e..05e380dc 100644
--- a/python/knot_resolver/controller/supervisord/config_file.py
+++ b/python/knot_resolver/controller/supervisord/config_file.py
@@ -36,21 +36,19 @@ class SupervisordKresID(KresID):
# the double name is checked because thats how we read it from supervisord
if val in ("cache-gc", "cache-gc:cache-gc"):
return SupervisordKresID.new(SubprocessType.GC, 0)
- elif val in ("policy-loader", "policy-loader:policy-loader"):
+ if val in ("policy-loader", "policy-loader:policy-loader"):
return SupervisordKresID.new(SubprocessType.POLICY_LOADER, 0)
- else:
- val = val.replace("kresd:kresd", "")
- return SupervisordKresID.new(SubprocessType.KRESD, int(val))
+ val = val.replace("kresd:kresd", "")
+ return SupervisordKresID.new(SubprocessType.KRESD, int(val))
def __str__(self) -> str:
if self.subprocess_type is SubprocessType.GC:
return "cache-gc"
- elif self.subprocess_type is SubprocessType.POLICY_LOADER:
+ if self.subprocess_type is SubprocessType.POLICY_LOADER:
return "policy-loader"
- elif self.subprocess_type is SubprocessType.KRESD:
+ if self.subprocess_type is SubprocessType.KRESD:
return f"kresd:kresd{self._id}"
- else:
- raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
+ raise RuntimeError(f"Unexpected subprocess type {self.subprocess_type}")
def kres_cache_gc_args(config: KresConfig) -> str:
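
The two methods above stay symmetric after the rewrite: from_string parses a supervisord process name into a typed ID and __str__ renders it back, so 'kresd:kresd3' round-trips. A standalone sketch of the parsing half, using plain tuples in place of the real SupervisordKresID:

    def parse_process_name(val: str):
        # Mirrors SupervisordKresID.from_string from the hunk above.
        if val in ("cache-gc", "cache-gc:cache-gc"):
            return ("GC", 0)
        if val in ("policy-loader", "policy-loader:policy-loader"):
            return ("POLICY_LOADER", 0)
        return ("KRESD", int(val.replace("kresd:kresd", "")))

    assert parse_process_name("kresd:kresd3") == ("KRESD", 3)
    assert parse_process_name("cache-gc:cache-gc") == ("GC", 0)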
diff --git a/python/knot_resolver/controller/supervisord/plugin/fast_rpcinterface.py b/python/knot_resolver/controller/supervisord/plugin/fast_rpcinterface.py
index c3834784..dc1703fc 100644
--- a/python/knot_resolver/controller/supervisord/plugin/fast_rpcinterface.py
+++ b/python/knot_resolver/controller/supervisord/plugin/fast_rpcinterface.py
@@ -73,7 +73,7 @@ class SupervisorNamespaceRPCInterface:
# RPC API methods
- def _getGroupAndProcess(self, name):
+ def _getGroupAndProcess(self, name): # noqa: N802
# get process to start from name
group_name, process_name = split_namespec(name)
@@ -90,7 +90,7 @@ class SupervisorNamespaceRPCInterface:
return group, process
- def startProcess(self, name, wait=True):
+ def startProcess(self, name, wait=True): # noqa: N802
"""Start a process
@param string name Process name (or ``group:name``, or ``group:*``)
@@ -108,10 +108,10 @@ class SupervisorNamespaceRPCInterface:
# eventually fail
try:
filename, argv = process.get_execv_args()
- except NotFound as why:
- raise RPCError(Faults.NO_FILE, why.args[0])
+ except NotFound as e:
+ raise RPCError(Faults.NO_FILE, e.args[0]) from e
except (BadCommand, NotExecutable, NoPermission) as why:
- raise RPCError(Faults.NOT_EXECUTABLE, why.args[0])
+ raise RPCError(Faults.NOT_EXECUTABLE, why.args[0]) from why
if process.get_state() in RUNNING_STATES:
raise RPCError(Faults.ALREADY_STARTED, name)
diff --git a/python/knot_resolver/controller/supervisord/plugin/manager_integration.py b/python/knot_resolver/controller/supervisord/plugin/manager_integration.py
index 2fc8cf94..6c575b95 100644
--- a/python/knot_resolver/controller/supervisord/plugin/manager_integration.py
+++ b/python/knot_resolver/controller/supervisord/plugin/manager_integration.py
@@ -51,7 +51,7 @@ def check_for_runnning_manager(event: ProcessStateRunningEvent) -> None:
systemd_notify(READY="1", STATUS="Ready")
-def ServerOptions_get_signal(self):
+def get_server_options_signal(self):
sig = self.signal_receiver.get_signal()
if sig == signal.SIGHUP and superd is not None:
superd.options.logger.info("received SIGHUP, forwarding to the process 'manager'")
@@ -77,7 +77,7 @@ def inject(supervisord: Supervisor, **_config: Any) -> Any: # pylint: disable=u
subscribe(ProcessStateRunningEvent, check_for_runnning_manager)
# forward SIGHUP to manager
- ServerOptions.get_signal = ServerOptions_get_signal
+ ServerOptions.get_signal = get_server_options_signal
# this method is called by supervisord when loading the plugin,
# it should return XML-RPC object, which we don't care about
diff --git a/python/knot_resolver/controller/supervisord/plugin/patch_logger.py b/python/knot_resolver/controller/supervisord/plugin/patch_logger.py
index b5f96617..db27f105 100644
--- a/python/knot_resolver/controller/supervisord/plugin/patch_logger.py
+++ b/python/knot_resolver/controller/supervisord/plugin/patch_logger.py
@@ -20,7 +20,7 @@ def empty_function(*args, **kwargs):
FORWARD_MSG_FORMAT: str = "%(name)s[%(pid)d]%(stream)s: %(data)s"
-def POutputDispatcher_log(self: POutputDispatcher, data: bytearray):
+def p_output_dispatcher_log(self: POutputDispatcher, data: bytearray):
if data:
# parse the input
if not isinstance(data, bytes):
@@ -75,7 +75,7 @@ def inject(supervisord: Supervisor, **config: Any) -> Any: # pylint: disable=us
supervisord.options.logger.handlers = supervisord_handlers
# replace output handler for subprocesses
- POutputDispatcher._log = POutputDispatcher_log
+ POutputDispatcher._log = p_output_dispatcher_log # noqa: SLF001
# we forward stdio in all cases, even when logging to syslog. This should prevent the unforturtunate
# case of swallowing an error message leaving the users confused. To make the forwarded lines obvious
diff --git a/python/knot_resolver/controller/supervisord/plugin/sd_notify.py b/python/knot_resolver/controller/supervisord/plugin/sd_notify.py
index ff32828b..4894b6f7 100644
--- a/python/knot_resolver/controller/supervisord/plugin/sd_notify.py
+++ b/python/knot_resolver/controller/supervisord/plugin/sd_notify.py
@@ -1,6 +1,7 @@
# type: ignore
-# pylint: disable=protected-access
+# ruff: noqa: SLF001
# pylint: disable=c-extension-no-member
+
import os
import signal
import time
@@ -46,7 +47,7 @@ class NotifySocketDispatcher:
res: Optional[Tuple[int, bytes]] = notify.read_message(self.fd)
if res is None:
- return None # there was some junk
+ return # there was some junk
pid, data = res
# pylint: disable=undefined-loop-variable
@@ -55,7 +56,7 @@ class NotifySocketDispatcher:
break
else:
logger.warn(f"ignoring ready notification from unregistered PID={pid}")
- return None
+ return
if data.startswith(b"READY=1"):
# handle case, when some process is really ready
@@ -84,7 +85,7 @@ class NotifySocketDispatcher:
else:
# handle case, when we got something unexpected
logger.warn(f"ignoring unrecognized data on $NOTIFY_SOCKET sent from PID={pid}, data='{data!r}'")
- return None
+ return
def handle_write_event(self):
raise ValueError("this dispatcher is not writable")
@@ -186,8 +187,7 @@ def chain(first: Callable[..., U], second: Callable[[U], T]) -> Callable[..., T]
res = first(*args, **kwargs)
if isinstance(res, tuple):
return second(*res)
- else:
- return second(res)
+ return second(res)
return wrapper
@@ -204,7 +204,7 @@ def append(first: Callable[..., T], second: Callable[..., None]) -> Callable[...
def monkeypatch(supervisord: Supervisor) -> None:
"""Inject ourselves into supervisord code"""
- # append notify socket handler to event loopo
+ # append notify socket handler to event loop
supervisord.get_process_map = chain(supervisord.get_process_map, partial(supervisord_get_process_map, supervisord))
# prepend timeout handler to transition method
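
The early-return rewrite in chain() keeps its contract: compose two callables, unpacking a tuple result of the first into the second. A self-contained sketch of that behavior (same logic, type variables dropped):

    def chain(first, second):
        def wrapper(*args, **kwargs):
            res = first(*args, **kwargs)
            if isinstance(res, tuple):
                return second(*res)  # tuple results are splatted into `second`
            return second(res)       # anything else is passed through as-is
        return wrapper

    composed = chain(lambda: (2, 3), lambda a, b: a + b)
    assert composed() == 5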
diff --git a/python/knot_resolver/datamodel/config_schema.py b/python/knot_resolver/datamodel/config_schema.py
index 16adbc16..641032dc 100644
--- a/python/knot_resolver/datamodel/config_schema.py
+++ b/python/knot_resolver/datamodel/config_schema.py
@@ -222,7 +222,7 @@ class KresConfig(ConfigSchema):
# raise all validation errors
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError("/", errs)
def render_lua(self) -> str:
diff --git a/python/knot_resolver/datamodel/forward_schema.py b/python/knot_resolver/datamodel/forward_schema.py
index 3e3af21d..6b693e6b 100644
--- a/python/knot_resolver/datamodel/forward_schema.py
+++ b/python/knot_resolver/datamodel/forward_schema.py
@@ -59,7 +59,7 @@ class ForwardSchema(ConfigSchema):
for server in servers:
if isinstance(server, IPAddressOptionalPort) and server.port:
return int(server.port) != 53
- elif isinstance(server, ForwardServerSchema):
+ if isinstance(server, ForwardServerSchema):
return is_port_custom(server.address.to_std())
return False
diff --git a/python/knot_resolver/datamodel/globals.py b/python/knot_resolver/datamodel/globals.py
index 88f95c2a..044cf475 100644
--- a/python/knot_resolver/datamodel/globals.py
+++ b/python/knot_resolver/datamodel/globals.py
@@ -24,10 +24,14 @@ from typing import Optional
class Context:
resolve_root: Optional[Path]
strict_validation: bool
+ permissions_default: bool
- def __init__(self, resolve_root: Optional[Path], strict_validation: bool = True) -> None:
+ def __init__(
+ self, resolve_root: Optional[Path], strict_validation: bool = True, permissions_default: bool = True
+ ) -> None:
self.resolve_root = resolve_root
self.strict_validation = strict_validation
+ self.permissions_default = permissions_default
_global_context: Context = Context(None)
@@ -59,3 +63,7 @@ def get_resolve_root() -> Path:
def get_strict_validation() -> bool:
return _global_context.strict_validation
+
+
+def get_permissions_default() -> bool:
+ return _global_context.permissions_default
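
The new flag rides along in the validation context exactly like strict_validation; server.py later in this diff constructs Context(config.parent, True, False). A usage sketch, assuming set_global_validation_context is exported from the same module as Context (as its use in server.py suggests):

    from pathlib import Path

    from knot_resolver.datamodel.globals import (
        Context,
        get_permissions_default,
        set_global_validation_context,
    )

    # resolve_root, strict_validation=True, permissions_default=False
    set_global_validation_context(Context(Path("/etc/knot-resolver"), True, False))
    assert get_permissions_default() is False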
diff --git a/python/knot_resolver/datamodel/logging_schema.py b/python/knot_resolver/datamodel/logging_schema.py
index b98039be..6c398a85 100644
--- a/python/knot_resolver/datamodel/logging_schema.py
+++ b/python/knot_resolver/datamodel/logging_schema.py
@@ -129,8 +129,7 @@ class LoggingSchema(ConfigSchema):
if not is_obj_type_valid(target, cast(Type[Any], LogTargetEnum)):
raise ValueError(f"logging target '{target}' read from $KRES_LOGGING_TARGET is invalid")
return cast(LogTargetEnum, target)
- else:
- return raw.target
+ return raw.target
def _validate(self):
if self.groups is None:
diff --git a/python/knot_resolver/datamodel/network_schema.py b/python/knot_resolver/datamodel/network_schema.py
index 3063cef2..a71c006b 100644
--- a/python/knot_resolver/datamodel/network_schema.py
+++ b/python/knot_resolver/datamodel/network_schema.py
@@ -116,10 +116,10 @@ class ListenSchema(ConfigSchema):
if origin.port:
return origin.port
# default port number based on kind
- elif origin.interface:
+ if origin.interface:
if origin.kind == "dot":
return PortNumber(853)
- elif origin.kind in ["doh-legacy", "doh2"]:
+ if origin.kind in ["doh-legacy", "doh2"]:
return PortNumber(443)
return PortNumber(53)
return None
diff --git a/python/knot_resolver/datamodel/types/base_types.py b/python/knot_resolver/datamodel/types/base_types.py
index c2d60312..2dce91a9 100644
--- a/python/knot_resolver/datamodel/types/base_types.py
+++ b/python/knot_resolver/datamodel/types/base_types.py
@@ -1,6 +1,9 @@
+# ruff: noqa: SLF001
+
import re
-from typing import Any, Dict, Pattern, Type
+from typing import Any, Dict, Type
+from knot_resolver.utils.compat.typing import Pattern
from knot_resolver.utils.modeling import BaseValueType
@@ -191,7 +194,7 @@ class UnitBase(StrBase):
val, unit = grouped.groups()
if unit is None:
raise ValueError(f"Missing units. Accepted units are {list(type(self)._units.keys())}", object_path)
- elif unit not in type(self)._units:
+ if unit not in type(self)._units:
raise ValueError(
f"Used unexpected unit '{unit}' for {type(self).__name__}."
f" Accepted units are {list(type(self)._units.keys())}",
diff --git a/python/knot_resolver/datamodel/types/files.py b/python/knot_resolver/datamodel/types/files.py
index c2962729..2d22d075 100644
--- a/python/knot_resolver/datamodel/types/files.py
+++ b/python/knot_resolver/datamodel/types/files.py
@@ -3,11 +3,11 @@ import stat
from enum import Flag, auto
from grp import getgrnam
from pathlib import Path
-from pwd import getpwnam
+from pwd import getpwnam, getpwuid
from typing import Any, Dict, Tuple, Type, TypeVar
from knot_resolver.constants import GROUP, USER
-from knot_resolver.datamodel.globals import get_resolve_root, get_strict_validation
+from knot_resolver.datamodel.globals import get_permissions_default, get_resolve_root, get_strict_validation
from knot_resolver.utils.modeling.base_value_type import BaseValueType
@@ -157,8 +157,14 @@ def _kres_accessible(dest_path: Path, perm_mode: _PermissionMode) -> bool:
_PermissionMode.EXECUTE: [stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH],
}
- user_uid = getpwnam(USER).pw_uid
- user_gid = getgrnam(GROUP).gr_gid
+ if get_permissions_default():
+ user_uid = getpwnam(USER).pw_uid
+ user_gid = getgrnam(GROUP).gr_gid
+ username = USER
+ else:
+ user_uid = os.getuid()
+ user_gid = os.getgid()
+ username = getpwuid(user_uid).pw_name
dest_stat = os.stat(dest_path)
dest_uid = dest_stat.st_uid
@@ -168,13 +174,13 @@ def _kres_accessible(dest_path: Path, perm_mode: _PermissionMode) -> bool:
def accessible(perm: _PermissionMode) -> bool:
if user_uid == dest_uid:
return bool(dest_mode & chflags[perm][0])
- b_groups = os.getgrouplist(os.getlogin(), user_gid)
+ b_groups = os.getgrouplist(username, user_gid)
if user_gid == dest_gid or dest_gid in b_groups:
return bool(dest_mode & chflags[perm][1])
return bool(dest_mode & chflags[perm][2])
# __iter__ for class enum.Flag added in python3.11
- # 'for perm in perm_mode:' failes for <=python3.11
+ # 'for perm in perm_mode:' fails for <=python3.11
for perm in _PermissionMode:
if perm in perm_mode:
if not accessible(perm):
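
With the uid/gid selected above, the accessible() check is a standard owner/group/other mode-bit decision that consults supplementary groups via os.getgrouplist. A condensed read-only sketch of that logic (simplified from _kres_accessible; not the actual helper):

    import os
    import pwd
    import stat
    from pathlib import Path

    def readable_by(dest: Path, uid: int, gid: int, username: str) -> bool:
        st = os.stat(dest)
        if uid == st.st_uid:
            return bool(st.st_mode & stat.S_IRUSR)   # owner read bit
        groups = os.getgrouplist(username, gid)      # supplementary groups
        if gid == st.st_gid or st.st_gid in groups:
            return bool(st.st_mode & stat.S_IRGRP)   # group read bit
        return bool(st.st_mode & stat.S_IROTH)       # other read bit

    me = pwd.getpwuid(os.getuid()).pw_name
    print(readable_by(Path("/etc/hosts"), os.getuid(), os.getgid(), me))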
diff --git a/python/knot_resolver/datamodel/types/types.py b/python/knot_resolver/datamodel/types/types.py
index 1a125720..6cd1e4cb 100644
--- a/python/knot_resolver/datamodel/types/types.py
+++ b/python/knot_resolver/datamodel/types/types.py
@@ -14,12 +14,12 @@ class IntPositive(IntRangeBase):
_min: int = 1
-class Int0_512(IntRangeBase):
+class Int0_512(IntRangeBase): # noqa: N801
_min: int = 0
_max: int = 512
-class Int0_65535(IntRangeBase):
+class Int0_65535(IntRangeBase): # noqa: N801
_min: int = 0
_max: int = 65_535
@@ -147,7 +147,7 @@ class DomainName(StrBase):
object_path,
) from e
- if type(self)._re.match(punycode):
+ if type(self)._re.match(punycode): # noqa: SLF001
self._punycode = punycode
else:
raise ValueError(
@@ -208,9 +208,9 @@ class InterfacePort(StrBase):
try:
self.if_name = InterfaceName(parts[0])
except ValueError as e2:
- raise ValueError(
- f"expected IP address or interface name, got '{parts[0]}'.", object_path
- ) from e1 and e2
+ raise ValueError(f"expected IP address or interface name, got '{parts[0]}'.", object_path) from (
+ e1 and e2
+ )
self.port = PortNumber.from_str(parts[1], object_path)
else:
raise ValueError(f"expected '<ip-address|interface-name>@<port>', got '{source_value}'.", object_path)
@@ -232,9 +232,9 @@ class InterfaceOptionalPort(StrBase):
try:
self.if_name = InterfaceName(parts[0])
except ValueError as e2:
- raise ValueError(
- f"expected IP address or interface name, got '{parts[0]}'.", object_path
- ) from e1 and e2
+ raise ValueError(f"expected IP address or interface name, got '{parts[0]}'.", object_path) from (
+ e1 and e2
+ )
if len(parts) == 2:
self.port = PortNumber.from_str(parts[1], object_path)
else:
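
A note on `raise ... from (e1 and e2)`: both names are bound to exception instances, which are always truthy, so the `and` short-circuits to e2 and the interface-name failure becomes the chained __cause__. The reformatting above only adds parentheses; a minimal demonstration of the semantics:

    def parse(value: str) -> None:
        try:
            raise ValueError("not an IP address")      # stand-in for e1
        except ValueError as e1:
            try:
                raise ValueError("not an interface")   # stand-in for e2
            except ValueError as e2:
                raise ValueError("neither matched") from (e1 and e2)

    try:
        parse("bogus")
    except ValueError as final:
        assert str(final.__cause__) == "not an interface"  # (e1 and e2) is e2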
diff --git a/python/knot_resolver/exceptions.py b/python/knot_resolver/exceptions.py
index 3e90b0bc..c84c5ea6 100644
--- a/python/knot_resolver/exceptions.py
+++ b/python/knot_resolver/exceptions.py
@@ -1,4 +1,4 @@
-class KresBaseException(Exception):
+class KresBaseException(Exception): # noqa: N818
"""
Base class for all custom exceptions we use in Knot Resolver.
"""
diff --git a/python/knot_resolver/manager/logging.py b/python/knot_resolver/manager/logging.py
index c9b44653..39d0c6bf 100644
--- a/python/knot_resolver/manager/logging.py
+++ b/python/knot_resolver/manager/logging.py
@@ -21,16 +21,15 @@ def get_log_format(config: KresConfig) -> str:
if os.environ.get("KRES_SUPRESS_LOG_PREFIX") == "true":
# In this case, we are running under supervisord and it's adding prefixes to our output
return "[%(levelname)s] %(name)s: %(message)s"
- else:
- # In this case, we are running standalone during inicialization and we need to add a prefix to each line
- # by ourselves to make it consistent
- assert config.logging.target != "syslog"
- stream = ""
- if config.logging.target == "stderr":
- stream = " (stderr)"
-
- pid = os.getpid()
- return f"%(asctime)s manager[{pid}]{stream}: [%(levelname)s] %(name)s: %(message)s"
+ # In this case, we are running standalone during inicialization and we need to add a prefix to each line
+ # by ourselves to make it consistent
+ assert config.logging.target != "syslog"
+ stream = ""
+ if config.logging.target == "stderr":
+ stream = " (stderr)"
+
+ pid = os.getpid()
+ return f"%(asctime)s manager[{pid}]{stream}: [%(levelname)s] %(name)s: %(message)s"
async def _set_log_level(config: KresConfig) -> None:
diff --git a/python/knot_resolver/manager/manager.py b/python/knot_resolver/manager/manager.py
index 105b2081..afc5e678 100644
--- a/python/knot_resolver/manager/manager.py
+++ b/python/knot_resolver/manager/manager.py
@@ -7,7 +7,7 @@ from secrets import token_hex
from subprocess import SubprocessError
from typing import Any, Callable, List, Optional
-from knot_resolver.controller.exceptions import SubprocessControllerException
+from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.interface import Subprocess, SubprocessController, SubprocessStatus, SubprocessType
from knot_resolver.controller.registered_workers import command_registered_workers, get_registered_workers_kresids
from knot_resolver.datamodel import KresConfig
@@ -69,7 +69,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
"Trying to create an instance of KresManager using normal constructor. Please use "
"`KresManager.get_instance()` instead"
)
- assert False
+ raise AssertionError
self._workers: List[Subprocess] = []
self._gc: Optional[Subprocess] = None
@@ -93,7 +93,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
"""
inst = KresManager(shutdown_trigger, _i_know_what_i_am_doing=True)
- await inst._async_init(subprocess_controller, config_store) # pylint: disable=protected-access
+ await inst._async_init(subprocess_controller, config_store) # noqa: SLF001
return inst
async def _async_init(self, subprocess_controller: SubprocessController, config_store: ConfigStore) -> None:
@@ -225,7 +225,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
# if it keeps running, the config is valid and others will soon join as well
# if it crashes and the startup fails, then well, it's not running anymore... :)
await self._spawn_new_worker(new)
- except (SubprocessError, SubprocessControllerException):
+ except (SubprocessError, SubprocessControllerError):
logger.error("Kresd with the new config failed to start, rejecting config")
return Result.err("canary kresd process failed to start. Config might be invalid.")
@@ -240,7 +240,6 @@ class KresManager: # pylint: disable=too-many-instance-attributes
await self._collect_already_running_workers()
async def reset_workers_policy_rules(self, _config: KresConfig) -> None:
-
# command all running 'kresd' workers to reset their old policy rules,
# unless the workers have already been started with a new config so reset is not needed
if self._workers_reset_needed and get_registered_workers_kresids():
@@ -256,7 +255,6 @@ class KresManager: # pylint: disable=too-many-instance-attributes
)
async def set_new_tls_sticket_secret(self, config: KresConfig) -> None:
-
if config.network.tls.sticket_secret or config.network.tls.sticket_secret_file:
logger.debug("User-configured TLS resumption secret found - skipping auto-generation.")
return
@@ -283,13 +281,15 @@ class KresManager: # pylint: disable=too-many-instance-attributes
else:
logger.debug("Stopping cache GC")
await self._stop_gc()
- except SubprocessControllerException as e:
+ except SubprocessControllerError as e:
if _noretry:
raise
- elif self._fix_counter.is_too_high():
+ if self._fix_counter.is_too_high():
logger.error(f"Failed to apply config: {e}")
logger.error("There have already been problems recently, refusing to try to fix it.")
- await self.forced_shutdown() # possible improvement - the person who requested this change won't get a response this way
+ await (
+ self.forced_shutdown()
+ ) # possible improvement - the person who requested this change won't get a response this way
else:
logger.error(f"Failed to apply config: {e}")
logger.warning("Reloading system state and trying again.")
@@ -310,7 +310,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
while not self._is_policy_loader_exited():
await asyncio.sleep(1)
- except (SubprocessError, SubprocessControllerException) as e:
+ except (SubprocessError, SubprocessControllerError) as e:
logger.error(f"Failed to load policy rules: {e}")
return Result.err("kresd 'policy-loader' process failed to start. Config might be invalid.")
@@ -358,7 +358,7 @@ class KresManager: # pylint: disable=too-many-instance-attributes
logger.error("Failed attempting to fix an error. Forcefully shutting down.", exc_info=True)
await self.forced_shutdown()
- async def _watchdog(self) -> None: # pylint: disable=too-many-branches
+ async def _watchdog(self) -> None: # pylint: disable=too-many-branches # noqa: PLR0912
while True:
await asyncio.sleep(WATCHDOG_INTERVAL_SEC)
diff --git a/python/knot_resolver/manager/metrics/collect.py b/python/knot_resolver/manager/metrics/collect.py
index cc9a0712..733f40cc 100644
--- a/python/knot_resolver/manager/metrics/collect.py
+++ b/python/knot_resolver/manager/metrics/collect.py
@@ -19,8 +19,7 @@ async def collect_kresd_workers_metrics(config: KresConfig) -> Optional[Dict[Kre
cmd = "collect_lazy_statistics()"
logger.debug(f"Collecting stats from all kresd workers using method '{cmd}'")
- metrics_dict = await command_registered_workers(cmd)
- return metrics_dict
+ return await command_registered_workers(cmd)
async def report_json(config: KresConfig) -> bytes:
diff --git a/python/knot_resolver/manager/metrics/prometheus.py b/python/knot_resolver/manager/metrics/prometheus.py
index ba5f6334..5dd0d171 100644
--- a/python/knot_resolver/manager/metrics/prometheus.py
+++ b/python/knot_resolver/manager/metrics/prometheus.py
@@ -19,11 +19,15 @@ if importlib.util.find_spec("prometheus_client"):
logger = logging.getLogger(__name__)
if _prometheus_client:
-
from prometheus_client import exposition # type: ignore
from prometheus_client.bridge.graphite import GraphiteBridge # type: ignore
- from prometheus_client.core import GaugeMetricFamily # type: ignore
- from prometheus_client.core import REGISTRY, CounterMetricFamily, HistogramMetricFamily, Metric
+ from prometheus_client.core import (
+ REGISTRY,
+ CounterMetricFamily,
+ GaugeMetricFamily, # type: ignore
+ HistogramMetricFamily,
+ Metric,
+ )
_graphite_bridge: Optional[GraphiteBridge] = None
@@ -50,15 +54,15 @@ if _prometheus_client:
sid = str(instance_id)
# response latency histogram
- BUCKET_NAMES_IN_RESOLVER = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
- BUCKET_NAMES_PROMETHEUS = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")
+ bucket_names_in_resolver = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
+ bucket_names_in_prometheus = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")
yield _histogram(
"resolver_response_latency",
"Time it takes to respond to queries in seconds",
label=("instance_id", sid),
buckets=[
(bnp, metrics["answer"][f"{duration}"])
- for bnp, duration in zip(BUCKET_NAMES_PROMETHEUS, BUCKET_NAMES_IN_RESOLVER)
+ for bnp, duration in zip(bucket_names_in_prometheus, bucket_names_in_resolver)
],
sum_value=metrics["answer"]["sum_ms"] / 1_000,
)
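
The renamed tuples pair resolver-side bucket labels with Prometheus upper bounds purely by position, so the zip yields one (bound, cumulative count) pair per bucket. A sketch with hypothetical counter values standing in for the real kresd metrics dict:

    bucket_names_in_resolver = ("1ms", "10ms", "50ms", "100ms", "250ms", "500ms", "1000ms", "1500ms", "slow")
    bucket_names_in_prometheus = ("0.001", "0.01", "0.05", "0.1", "0.25", "0.5", "1.0", "1.5", "+Inf")

    # Hypothetical per-bucket answer counts keyed by resolver bucket name.
    answer = {name: 10 * (i + 1) for i, name in enumerate(bucket_names_in_resolver)}

    buckets = [
        (bnp, answer[f"{duration}"])
        for bnp, duration in zip(bucket_names_in_prometheus, bucket_names_in_resolver)
    ]
    assert buckets[0] == ("0.001", 10)
    assert buckets[-1] == ("+Inf", 90)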
diff --git a/python/knot_resolver/manager/server.py b/python/knot_resolver/manager/server.py
index 972b167f..90fd4d3b 100644
--- a/python/knot_resolver/manager/server.py
+++ b/python/knot_resolver/manager/server.py
@@ -8,6 +8,7 @@ import sys
from functools import partial
from http import HTTPStatus
from pathlib import Path
+from pwd import getpwuid
from time import time
from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
@@ -17,9 +18,9 @@ from aiohttp.web_app import Application
from aiohttp.web_response import json_response
from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
-from knot_resolver.constants import CONFIG_FILE
+from knot_resolver.constants import CONFIG_FILE, USER
from knot_resolver.controller import get_best_controller_implementation
-from knot_resolver.controller.exceptions import SubprocessControllerExecException
+from knot_resolver.controller.exceptions import SubprocessControllerExecError
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
@@ -240,7 +241,6 @@ class Server:
raise web.HTTPMovedPermanently("/metrics/json")
async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
-
config = self.config_store.get()
return web.Response(
@@ -250,7 +250,6 @@ class Server:
)
async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
-
metrics_report = await metrics.report_prometheus()
if not metrics_report:
raise web.HTTPNotFound()
@@ -403,9 +402,8 @@ async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any
raise KresManagerException(
f"Manager is configured to load config file at {config} on startup, but the file does not exist."
)
- else:
- logger.info(f"Loading configuration from '{config}' file.")
- config = try_to_parse(await readfile(config))
+ logger.info(f"Loading configuration from '{config}' file.")
+ config = try_to_parse(await readfile(config))
# validate the initial configuration
assert isinstance(config, dict)
@@ -413,14 +411,12 @@ async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any
async def _load_config(config: Dict[str, Any]) -> KresConfig:
- config_validated = KresConfig(config)
- return config_validated
+ return KresConfig(config)
async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
config_validated = await _load_config(config)
- config_store = ConfigStore(config_validated)
- return config_store
+ return ConfigStore(config_validated)
async def _init_manager(config_store: ConfigStore, server: Server) -> KresManager:
@@ -476,11 +472,10 @@ def _lock_working_directory(attempt: int = 0) -> None:
"Another manager is running in the same working directory."
f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
) from e
- else:
- raise KresManagerException(
- "Another manager is running in the same working directory."
- f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
- ) from e
+ raise KresManagerException(
+ "Another manager is running in the same working directory."
+ f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
+ ) from e
# now we know that we are the only manager running in this directory
@@ -506,7 +501,7 @@ async def _sigterm_while_shutting_down():
sys.exit(128 + signal.SIGTERM)
-async def start_server(config: Path = CONFIG_FILE) -> int:
+async def start_server(config: Path = CONFIG_FILE) -> int: # noqa: PLR0915
# This function is quite long, but it describes how manager runs. So let's silence pylint
# pylint: disable=too-many-statements
@@ -517,6 +512,14 @@ async def start_server(config: Path = CONFIG_FILE) -> int:
# Block signals during initialization to force their processing once everything is ready
signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
+ # Check if we are running under the intended user, if not, log a warning message
+ pw_username = getpwuid(os.getuid()).pw_name
+ if pw_username != USER:
+ logger.warning(
+ f"Knot Resolver does not run as the default '{USER}' user, but as '{pw_username}' instead."
+ " This may or may not affect the configuration validation and the proper functioning of the resolver."
+ )
+
# before starting server, initialize the subprocess controller, config store, etc. Any errors during inicialization
# are fatal
try:
@@ -527,8 +530,10 @@ async def start_server(config: Path = CONFIG_FILE) -> int:
config_raw = await _load_raw_config(config)
# before processing any configuration, set validation context
- # - resolve_root = root against which all relative paths will be resolved
- set_global_validation_context(Context(config.parent, True))
+ # - resolve_root: root against which all relative paths will be resolved
+ # - strict_validation: check for path existence during configuration validation
+ # - permissions_default: validate dirs/files rwx permissions against default user:group in constants
+ set_global_validation_context(Context(config.parent, True, False))
# We want to change cwd as soon as possible. Some parts of the codebase are using os.getcwd() to get the
# working directory.
@@ -567,7 +572,7 @@ async def start_server(config: Path = CONFIG_FILE) -> int:
# After we have loaded the configuration, we can start worring about subprocess management.
manager = await _init_manager(config_store, server)
- except SubprocessControllerExecException as e:
+ except SubprocessControllerExecError as e:
# if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
# be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
# the process tree. So now we stop everything, and exec what we are told to. We are assuming, that the thing
diff --git a/python/knot_resolver/utils/__init__.py b/python/knot_resolver/utils/__init__.py
index edc36fca..428485fa 100644
--- a/python/knot_resolver/utils/__init__.py
+++ b/python/knot_resolver/utils/__init__.py
@@ -24,8 +24,7 @@ def ignore_exceptions_optional(
except BaseException as e:
if isinstance(e, exceptions):
return default
- else:
- raise e
+ raise e
return f
diff --git a/python/knot_resolver/utils/compat/__init__.py b/python/knot_resolver/utils/compat/__init__.py
index 53993f6c..52ffaa9c 100644
--- a/python/knot_resolver/utils/compat/__init__.py
+++ b/python/knot_resolver/utils/compat/__init__.py
@@ -1,3 +1,3 @@
-from . import asyncio
+from . import asyncio, typing
-__all__ = ["asyncio"]
+__all__ = ["asyncio", "typing"]
diff --git a/python/knot_resolver/utils/compat/asyncio.py b/python/knot_resolver/utils/compat/asyncio.py
index 9e10e6c6..4b6c6852 100644
--- a/python/knot_resolver/utils/compat/asyncio.py
+++ b/python/knot_resolver/utils/compat/asyncio.py
@@ -20,11 +20,9 @@ async def to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
return await asyncio.to_thread(func, *args, **kwargs) # type: ignore[attr-defined]
# earlier versions, run with default executor
- else:
- loop = asyncio.get_event_loop()
- pfunc = functools.partial(func, *args, **kwargs)
- res = await loop.run_in_executor(None, pfunc)
- return res
+ loop = asyncio.get_event_loop()
+ pfunc = functools.partial(func, *args, **kwargs)
+ return await loop.run_in_executor(None, pfunc)
def async_in_a_thread(func: Callable[..., T]) -> Callable[..., Coroutine[None, None, T]]:
@@ -45,12 +43,11 @@ def create_task(coro: Awaitable[T], name: Optional[str] = None) -> "asyncio.Task
return asyncio.create_task(coro) # type: ignore[attr-defined,arg-type]
# earlier versions, use older function
- else:
- return asyncio.ensure_future(coro)
+ return asyncio.ensure_future(coro)
def is_event_loop_running() -> bool:
- loop = events._get_running_loop() # pylint: disable=protected-access
+ loop = events._get_running_loop() # noqa: SLF001
return loop is not None and loop.is_running()
@@ -64,7 +61,7 @@ def run(coro: Awaitable[T], debug: Optional[bool] = None) -> T:
return asyncio.run(coro, debug=debug) # type: ignore[attr-defined,arg-type]
# earlier versions, use backported version of the function
- if events._get_running_loop() is not None: # pylint: disable=protected-access
+ if events._get_running_loop() is not None: # noqa: SLF001
raise RuntimeError("asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(coro):
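
Call sites are identical on every supported interpreter; the shim picks asyncio.to_thread on Python >= 3.9 and falls back to run_in_executor otherwise. A small usage sketch, assuming only the to_thread and run wrappers shown in this file:

    import time

    from knot_resolver.utils.compat.asyncio import run, to_thread

    async def main() -> str:
        # The blocking call runs off the event loop on all Python versions.
        return await to_thread(time.strftime, "%Y")

    print(run(main()))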
diff --git a/python/knot_resolver/utils/compat/typing.py b/python/knot_resolver/utils/compat/typing.py
new file mode 100644
index 00000000..15654d34
--- /dev/null
+++ b/python/knot_resolver/utils/compat/typing.py
@@ -0,0 +1,8 @@
+# The 'typing.Pattern' is deprecated since python 3.8 and is removed in version 3.12.
+# https://docs.python.org/3.9/library/typing.html#typing.Pattern
+try:
+ from typing import Pattern
+except ImportError:
+ from re import Pattern
+
+__all__ = ["Pattern"]
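
base_types.py earlier in this diff switches to this shim, so annotations keep working after typing.Pattern's removal in Python 3.12. A typical use, assuming the new module above:

    import re

    from knot_resolver.utils.compat.typing import Pattern

    # Resolves to typing.Pattern where it still exists, re.Pattern otherwise.
    _re: Pattern = re.compile(r"^[0-9]+$")
    assert _re.match("123") is not None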
diff --git a/python/knot_resolver/utils/modeling/base_schema.py b/python/knot_resolver/utils/modeling/base_schema.py
index 13539fe0..73cd1d0a 100644
--- a/python/knot_resolver/utils/modeling/base_schema.py
+++ b/python/knot_resolver/utils/modeling/base_schema.py
@@ -70,20 +70,20 @@ class Serializable(ABC):
if isinstance(obj, Serializable):
return obj.to_dict()
- elif isinstance(obj, (BaseValueType, BaseGenericTypeWrapper)):
+ if isinstance(obj, (BaseValueType, BaseGenericTypeWrapper)):
o = obj.serialize()
# if Serializable.is_serializable(o):
return Serializable.serialize(o)
# return o
- elif isinstance(obj, list):
+ if isinstance(obj, list):
res: List[Any] = [Serializable.serialize(i) for i in cast(List[Any], obj)]
return res
return obj
-class _lazy_default(Generic[T], Serializable):
+class _LazyDefault(Generic[T], Serializable):
"""
Wrapper for default values BaseSchema classes which deffers their instantiation until the schema
itself is being instantiated
@@ -104,7 +104,7 @@ class _lazy_default(Generic[T], Serializable):
def lazy_default(constructor: Callable[..., T], *args: Any, **kwargs: Any) -> T:
"""We use a factory function because you can't lie about the return type in `__new__`"""
- return _lazy_default(constructor, *args, **kwargs) # type: ignore
+ return _LazyDefault(constructor, *args, **kwargs) # type: ignore
def _split_docstring(docstring: str) -> Tuple[str, Optional[str]]:
@@ -169,51 +169,50 @@ def _get_properties_schema(typ: Type[Any]) -> Dict[Any, Any]:
return schema
-def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
+def _describe_type(typ: Type[Any]) -> Dict[Any, Any]: # noqa: PLR0911, PLR0912
# pylint: disable=too-many-branches
if inspect.isclass(typ) and issubclass(typ, BaseSchema):
return typ.json_schema(include_schema_definition=False)
- elif inspect.isclass(typ) and issubclass(typ, BaseValueType):
+ if inspect.isclass(typ) and issubclass(typ, BaseValueType):
return typ.json_schema()
- elif is_generic_type_wrapper(typ):
+ if is_generic_type_wrapper(typ):
wrapped = get_generic_type_wrapper_argument(typ)
return _describe_type(wrapped)
- elif is_none_type(typ):
+ if is_none_type(typ):
return {"type": "null"}
- elif typ == int:
+ if typ is int:
return {"type": "integer"}
- elif typ == bool:
+ if typ is bool:
return {"type": "boolean"}
- elif typ == str:
+ if typ is str:
return {"type": "string"}
- elif is_literal(typ):
+ if is_literal(typ):
lit = get_generic_type_arguments(typ)
return {"type": "string", "enum": lit}
- elif is_optional(typ):
+ if is_optional(typ):
desc = _describe_type(get_optional_inner_type(typ))
if "type" in desc:
desc["type"] = [desc["type"], "null"]
return desc
- else:
- return {"anyOf": [{"type": "null"}, desc]}
+ return {"anyOf": [{"type": "null"}, desc]}
- elif is_union(typ):
+ if is_union(typ):
variants = get_generic_type_arguments(typ)
return {"anyOf": [_describe_type(v) for v in variants]}
- elif is_list(typ):
+ if is_list(typ):
return {"type": "array", "items": _describe_type(get_generic_type_argument(typ))}
- elif is_dict(typ):
+ if is_dict(typ):
key, val = get_generic_type_arguments(typ)
if inspect.isclass(key) and issubclass(key, BaseValueType):
@@ -221,11 +220,11 @@ def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
key.__str__ is not BaseValueType.__str__
), "To support derived 'BaseValueType', __str__ must be implemented."
else:
- assert key == str, "We currently do not support any other keys then strings"
+ assert key is str, "We currently do not support any other keys then strings"
return {"type": "object", "additionalProperties": _describe_type(val)}
- elif inspect.isclass(typ) and issubclass(typ, enum.Enum): # same as our is_enum(typ), but inlined for type checker
+ if inspect.isclass(typ) and issubclass(typ, enum.Enum): # same as our is_enum(typ), but inlined for type checker
return {"type": "string", "enum": [str(v) for v in typ]}
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
@@ -257,7 +256,7 @@ class ObjectMapper:
errs.append(e)
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return tuple(res)
@@ -275,7 +274,7 @@ class ObjectMapper:
errs.append(e)
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
except AttributeError as e:
@@ -303,7 +302,7 @@ class ObjectMapper:
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, child_exceptions=errs)
return res
@@ -311,17 +310,16 @@ class ObjectMapper:
# we are willing to cast any primitive value to string, but no compound values are allowed
if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
return str(obj)
- elif is_obj_type(obj, bool):
+ if is_obj_type(obj, bool):
raise DataValidationError(
"Expected str, found bool. Be careful, that YAML parsers consider even"
' "no" and "yes" as a bool. Search for the Norway Problem for more'
" details. And please use quotes explicitly.",
object_path,
)
- else:
- raise DataValidationError(
- f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
- )
+ raise DataValidationError(
+ f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ )
def _create_int(self, obj: Any, object_path: str) -> int:
# we don't want to make an int out of anything else than other int
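
The Norway Problem mentioned in the error message above: YAML 1.1 treats bare yes/no (and on/off) tokens as booleans, so an unquoted string silently arrives as a bool and must be rejected rather than coerced. A quick illustration, assuming a YAML 1.1 parser such as PyYAML:

    import yaml  # PyYAML

    assert yaml.safe_load("country: no") == {"country": False}   # bool, not "no"
    assert yaml.safe_load('country: "no"') == {"country": "no"}  # quoting keeps the string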
@@ -345,21 +343,18 @@ class ObjectMapper:
inner: Type[Any] = get_optional_inner_type(tp)
if obj is None:
return None
- else:
- return self.map_object(inner, obj, object_path=object_path)
+ return self.map_object(inner, obj, object_path=object_path)
def _create_bool(self, obj: Any, object_path: str) -> bool:
if is_obj_type(obj, bool):
return obj
- else:
- raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
def _create_literal(self, tp: Type[Any], obj: Any, object_path: str) -> Any:
expected = get_generic_type_arguments(tp)
if obj in expected:
return obj
- else:
- raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
def _create_base_schema_object(self, tp: Type[Any], obj: Any, object_path: str) -> "BaseSchema":
if isinstance(obj, (dict, BaseSchema)):
@@ -370,24 +365,22 @@ class ObjectMapper:
if isinstance(obj, tp):
# if we already have a custom value type, just pass it through
return obj
- else:
- # no validation performed, the implementation does it in the constuctor
- try:
- return tp(obj, object_path=object_path)
- except ValueError as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = f"Failed to validate value against {tp} type"
- raise DataValidationError(msg, object_path) from e
+ # no validation performed, the implementation does it in the constuctor
+ try:
+ return tp(obj, object_path=object_path)
+ except ValueError as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = f"Failed to validate value against {tp} type"
+ raise DataValidationError(msg, object_path) from e
def _create_default(self, obj: Any) -> Any:
- if isinstance(obj, _lazy_default):
+ if isinstance(obj, _LazyDefault):
return obj.instantiate() # type: ignore
- else:
- return obj
+ return obj
- def map_object(
+ def map_object( # noqa: PLR0911, PLR0912
self,
tp: Type[Any],
obj: Any,
@@ -409,101 +402,98 @@ class ObjectMapper:
return self._create_default(default)
# NoneType
- elif is_none_type(tp):
+ if is_none_type(tp):
if obj is None:
return None
- else:
- raise DataValidationError(f"expected None, found '{obj}'.", object_path)
+ raise DataValidationError(f"expected None, found '{obj}'.", object_path)
# Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
- elif is_optional(tp):
+ if is_optional(tp):
return self._create_optional(tp, obj, object_path)
# Union[*variants]
- elif is_union(tp):
+ if is_union(tp):
return self._create_union(tp, obj, object_path)
# after this, there is no place for a None object
- elif obj is None:
+ if obj is None:
raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path)
# int
- elif tp == int:
+ if tp is int:
return self._create_int(obj, object_path)
# str
- elif tp == str:
+ if tp is str:
return self._create_str(obj, object_path)
# bool
- elif tp == bool:
+ if tp is bool:
return self._create_bool(obj, object_path)
# float
- elif tp == float:
+ if tp is float:
raise NotImplementedError(
"Floating point values are not supported in the object mapper."
" Please implement them and be careful with type coercions"
)
# Literal[T]
- elif is_literal(tp):
+ if is_literal(tp):
return self._create_literal(tp, obj, object_path)
# Dict[K,V]
- elif is_dict(tp):
+ if is_dict(tp):
return self._create_dict(tp, obj, object_path)
# any Enums (probably used only internally in DataValidator)
- elif is_enum(tp):
+ if is_enum(tp):
if isinstance(obj, tp):
return obj
- else:
- raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
+ raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
# List[T]
- elif is_list(tp):
+ if is_list(tp):
return self._create_list(tp, obj, object_path)
# Tuple[A,B,C,D,...]
- elif is_tuple(tp):
+ if is_tuple(tp):
return self._create_tuple(tp, obj, object_path)
# type of obj and cls type match
- elif is_obj_type(obj, tp):
+ if is_obj_type(obj, tp):
return obj
# when the specified type is Any, just return the given value
# on mypy version 1.11.0 comparison-overlap error started popping up
# https://github.com/python/mypy/issues/17665
- elif tp == Any: # type: ignore[comparison-overlap]
+ if tp == Any: # type: ignore[comparison-overlap]
return obj
# BaseValueType subclasses
- elif inspect.isclass(tp) and issubclass(tp, BaseValueType):
+ if inspect.isclass(tp) and issubclass(tp, BaseValueType):
return self.create_value_type_object(tp, obj, object_path)
# BaseGenericTypeWrapper subclasses
- elif is_generic_type_wrapper(tp):
+ if is_generic_type_wrapper(tp):
inner_type = get_generic_type_wrapper_argument(tp)
obj_valid = self.map_object(inner_type, obj, object_path)
return tp(obj_valid, object_path=object_path) # type: ignore
# nested BaseSchema subclasses
- elif inspect.isclass(tp) and issubclass(tp, BaseSchema):
+ if inspect.isclass(tp) and issubclass(tp, BaseSchema):
return self._create_base_schema_object(tp, obj, object_path)
# if the object matches, just pass it through
- elif inspect.isclass(tp) and isinstance(obj, tp):
+ if inspect.isclass(tp) and isinstance(obj, tp):
return obj
# default error handler
- else:
- raise DataValidationError(
- f"Type {tp} cannot be parsed. This is a implementation error. "
- "Please fix your types in the class or improve the parser/validator.",
- object_path,
- )
+ raise DataValidationError(
+ f"Type {tp} cannot be parsed. This is a implementation error. "
+ "Please fix your types in the class or improve the parser/validator.",
+ object_path,
+ )
def is_obj_type_valid(self, obj: Any, tp: Type[Any]) -> bool:
"""
@@ -582,7 +572,7 @@ class ObjectMapper:
if len(errs) == 1:
raise errs[0]
- elif len(errs) > 1:
+ if len(errs) > 1:
raise AggregateDataValidationError(object_path, errs)
return used_keys
@@ -596,11 +586,10 @@ class ObjectMapper:
if argc == 1:
# it is a static method
return func(source)
- elif argc == 2:
+ if argc == 2:
# it is an instance method
return func(_create_untouchable("obj"), source)
- else:
- raise RuntimeError("Transformation function has wrong number of arguments")
+ raise RuntimeError("Transformation function has wrong number of arguments")
except ValueError as e:
if len(e.args) > 0 and isinstance(e.args[0], str):
msg = e.args[0]
@@ -616,15 +605,14 @@ class ObjectMapper:
worry about a different BaseSchema class, when we want to have dynamically renamed fields.
"""
# As this is a delegated constructor, we must ignore protected access warnings
- # pylint: disable=protected-access
# sanity check
if not isinstance(source, (BaseSchema, dict)): # type: ignore
raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
# construct lower level schema first if configured to do so
- if obj._LAYER is not None:
- source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable
+ if obj._LAYER is not None: # noqa: SLF001
+ source = obj._LAYER(source, object_path=object_path) # pylint: disable=not-callable # noqa: SLF001
# assign fields
used_keys = self._assign_fields(obj, source, object_path)
@@ -641,7 +629,7 @@ class ObjectMapper:
# validate the constructed value
try:
- obj._validate()
+ obj._validate() # noqa: SLF001
except ValueError as e:
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path or "/") from e
@@ -723,10 +711,9 @@ class BaseSchema(Serializable):
def get_unparsed_data(self) -> Dict[str, Any]:
if isinstance(self.__source, BaseSchema):
return self.__source.get_unparsed_data()
- elif isinstance(self.__source, Renamed):
+ if isinstance(self.__source, Renamed):
return self.__source.original()
- else:
- return self.__source
+ return self.__source
def __getitem__(self, key: str) -> Any:
if not hasattr(self, key):
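
The map_object dispatch above resolves an annotation step by step, recursing into containers and raising DataValidationError with the offending object path. A minimal, self-contained sketch of the same early-return dispatch idea (illustrative only: plain ValueError instead of DataValidationError, and only a handful of the types the real mapper covers):

from typing import Any, Dict, List, Union, get_args, get_origin

def map_value(tp: Any, obj: Any, path: str = "/") -> Any:
    """Validate `obj` against the annotation `tp` and return the value."""
    if tp is type(None):  # NoneType
        if obj is None:
            return None
        raise ValueError(f"{path}: expected None, found {obj!r}")
    if tp in (int, str, bool):
        # reject bool where int is expected, to avoid silent coercion
        if isinstance(obj, tp) and not (tp is int and isinstance(obj, bool)):
            return obj
        raise ValueError(f"{path}: expected {tp.__name__}, found {type(obj).__name__}")
    origin = get_origin(tp)
    if origin is Union:  # also covers Optional[T] == Union[T, None]
        for variant in get_args(tp):
            try:
                return map_value(variant, obj, path)
            except ValueError:
                continue
        raise ValueError(f"{path}: no Union variant matched {obj!r}")
    if origin is list:
        (item_tp,) = get_args(tp)
        return [map_value(item_tp, v, f"{path}[{i}]") for i, v in enumerate(obj)]
    if origin is dict:
        key_tp, val_tp = get_args(tp)
        return {map_value(key_tp, k, path): map_value(val_tp, v, f"{path}/{k}")
                for k, v in obj.items()}
    raise ValueError(f"{path}: unsupported annotation {tp}")

assert map_value(Dict[str, List[int]], {"a": [1, 2]}) == {"a": [1, 2]}
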
diff --git a/python/knot_resolver/utils/modeling/exceptions.py b/python/knot_resolver/utils/modeling/exceptions.py
index 478f5488..ef6df357 100644
--- a/python/knot_resolver/utils/modeling/exceptions.py
+++ b/python/knot_resolver/utils/modeling/exceptions.py
@@ -36,8 +36,8 @@ class DataValidationError(DataModelingBaseException):
indentation_level += 1
msg_parts.append("Configuration validation error detected:")
- INDENT = indentation_level * "\t"
- msg_parts.append(f"{INDENT}{self.msg()}")
+ indent = indentation_level * "\t"
+ msg_parts.append(f"{indent}{self.msg()}")
for c in self._child_exceptions:
msg_parts.append(c.recursive_msg(indentation_level + 1))
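
recursive_msg renders the exception tree with one tab of indentation per nesting level. A tiny sketch of that shape (hypothetical TreeError, not the real DataValidationError):

from typing import List, Optional

class TreeError(Exception):
    def __init__(self, msg: str, children: Optional[List["TreeError"]] = None) -> None:
        super().__init__(msg)
        self.children = children or []

    def recursive_msg(self, level: int = 0) -> str:
        indent = level * "\t"
        parts = [f"{indent}{self.args[0]}"]
        # each child renders itself one tab deeper
        parts.extend(child.recursive_msg(level + 1) for child in self.children)
        return "\n".join(parts)

err = TreeError("validation failed", [TreeError("bad port"), TreeError("bad path")])
print(err.recursive_msg())
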
diff --git a/python/knot_resolver/utils/modeling/json_pointer.py b/python/knot_resolver/utils/modeling/json_pointer.py
index a60ba5d1..20801059 100644
--- a/python/knot_resolver/utils/modeling/json_pointer.py
+++ b/python/knot_resolver/utils/modeling/json_pointer.py
@@ -56,25 +56,24 @@ class _JSONPtr:
f"JSON pointer cannot reference nested non-existent object: object at ptr '{current_ptr}' already points to None, cannot nest deeper with token '{token}'"
)
- elif isinstance(current, (bool, int, float, str)):
+ if isinstance(current, (bool, int, float, str)):
raise ValueError(f"object at '{current_ptr}' is a scalar, JSON pointer cannot point into it")
- else:
- parent = current
- if isinstance(current, list):
- if token == "-":
- current = None
- else:
- try:
- token = int(token)
- current = current[token]
- except ValueError as e:
- raise ValueError(
- f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
- ) from e
-
- elif isinstance(current, dict):
- current = current.get(token, None)
+ parent = current
+ if isinstance(current, list):
+ if token == "-":
+ current = None
+ else:
+ try:
+ token_num = int(token)
+ current = current[token_num]
+ except ValueError as e:
+ raise ValueError(
+ f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
+ ) from e
+
+ elif isinstance(current, dict):
+ current = current.get(token, None)
current_ptr += f"/{token}"
diff --git a/python/knot_resolver/utils/modeling/parsing.py b/python/knot_resolver/utils/modeling/parsing.py
index 185a53a1..593f6ca1 100644
--- a/python/knot_resolver/utils/modeling/parsing.py
+++ b/python/knot_resolver/utils/modeling/parsing.py
@@ -58,10 +58,9 @@ class DataFormat(Enum):
# RaiseDuplicatesLoader extends yaml.SafeLoader, so this should be safe
# https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
- elif self is DataFormat.JSON:
+ if self is DataFormat.JSON:
return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
- else:
- raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
+ raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
def dict_dump(self, data: Union[Dict[str, Any], Renamed], indent: Optional[int] = None) -> str:
if isinstance(data, Renamed):
@@ -69,10 +68,9 @@ class DataFormat(Enum):
if self is DataFormat.YAML:
return yaml.safe_dump(data, indent=indent) # type: ignore
- elif self is DataFormat.JSON:
+ if self is DataFormat.JSON:
return json.dumps(data, indent=indent)
- else:
- raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
+ raise NotImplementedError(f"Exporting to '{self}' format is not implemented")
def parse_yaml(data: str) -> Any:
@@ -96,4 +94,4 @@ def try_to_parse(data: str) -> Any:
# and we may not know which one is the actual one.
raise DataParsingError( # pylint: disable=raise-missing-from
f"failed to parse data, JSON: {je}, YAML: {ye}"
- )
+ ) from ye
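
Both parse paths reject duplicate keys, which plain json.loads and yaml.safe_load would silently resolve by letting the last occurrence win. A sketch of the JSON half, assuming a hook along the lines of _json_raise_duplicates:

import json
from typing import Any, Dict, List, Tuple

def _raise_duplicates(pairs: List[Tuple[str, Any]]) -> Dict[str, Any]:
    """object_pairs_hook that fails instead of letting the last key win."""
    result: Dict[str, Any] = {}
    for key, value in pairs:
        if key in result:
            raise ValueError(f"duplicate key detected: {key}")
        result[key] = value
    return result

json.loads('{"a": 1, "b": 2}', object_pairs_hook=_raise_duplicates)  # fine
try:
    json.loads('{"a": 1, "a": 2}', object_pairs_hook=_raise_duplicates)
except ValueError as e:
    print(e)  # duplicate key detected: a
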
diff --git a/python/knot_resolver/utils/modeling/query.py b/python/knot_resolver/utils/modeling/query.py
index 2e378609..520190e9 100644
--- a/python/knot_resolver/utils/modeling/query.py
+++ b/python/knot_resolver/utils/modeling/query.py
@@ -48,7 +48,7 @@ class AddOp(Op):
assert isinstance(token, int)
parent.insert(token, self.value)
else:
- assert False, "never happens"
+ raise AssertionError("never happens")
return fakeroot
@@ -95,8 +95,7 @@ class MoveOp(Op):
newobj = copy.deepcopy(obj)
fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
- fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
- return fakeroot
+ return AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
class CopyOp(Op):
@@ -113,8 +112,7 @@ class CopyOp(Op):
_parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
newobj = copy.deepcopy(obj)
- fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
- return fakeroot
+ return AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
class TestOp(Op):
@@ -152,11 +150,11 @@ def query(
parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
return fakeroot["root"], obj
- elif method == "delete":
+ if method == "delete":
fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
return fakeroot["root"], None
- elif method == "put":
+ if method == "put":
parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
assert parent is not None # we know this due to the fakeroot
if isinstance(parent, list) and token == "-":
@@ -165,7 +163,7 @@ def query(
parent[token] = payload
return fakeroot["root"], None
- elif method == "patch":
+ if method == "patch":
tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
transaction: tp = map_object(tp, payload)
@@ -177,5 +175,4 @@ def query(
return fakeroot["root"], None
- else:
- assert False, "invalid operation, never happens"
+ raise AssertionError("invalid operation, never happens")
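
The query ops above all evaluate against a fakeroot wrapper, so even the document root has a parent and can be replaced like any other node. A reduced sketch of that trick with a single put operation (hypothetical helper, not the real Op classes):

import copy
from typing import Any

def put(document: Any, ptr: str, payload: Any) -> Any:
    """Return a new document with `payload` written at `ptr` ('' means the root)."""
    fakeroot = {"root": copy.deepcopy(document)}  # now the root has a parent, too
    tokens = ("root" + ptr).split("/")
    parent = fakeroot
    for token in tokens[:-1]:
        parent = parent[int(token)] if isinstance(parent, list) else parent[token]
    last = tokens[-1]
    if isinstance(parent, list):
        if last == "-":
            parent.append(payload)  # '-' appends, as in JSON patch
        else:
            parent[int(last)] = payload
    else:
        parent[last] = payload
    return fakeroot["root"]

assert put({"a": [1, 2]}, "/a/-", 3) == {"a": [1, 2, 3]}
assert put({"a": 1}, "", {"b": 2}) == {"b": 2}  # replacing the root itself works
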
diff --git a/python/knot_resolver/utils/modeling/renaming.py b/python/knot_resolver/utils/modeling/renaming.py
index 2420ed04..fd20adf9 100644
--- a/python/knot_resolver/utils/modeling/renaming.py
+++ b/python/knot_resolver/utils/modeling/renaming.py
@@ -81,10 +81,9 @@ class RenamedList(List[V], Renamed): # type: ignore
def renamed(obj: Any) -> Any:
if isinstance(obj, dict):
return RenamedDict(**obj)
- elif isinstance(obj, list):
+ if isinstance(obj, list):
return RenamedList(obj)
- else:
- return obj
+ return obj
__all__ = ["renamed", "Renamed"]
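
renamed() wraps plain dicts and lists so lookups can transparently translate key styles. A reduced sketch of the idea; the dash-to-underscore scheme below is an assumption for illustration, and lists are converted eagerly rather than via RenamedList:

from typing import Any

class RenamedDict(dict):
    """Dict whose lookups also accept snake_case for kebab-case keys."""

    def __getitem__(self, key: str) -> Any:
        if key in self:
            return renamed(super().__getitem__(key))
        # fall back to the kebab-case spelling of the same key
        return renamed(super().__getitem__(key.replace("_", "-")))

def renamed(obj: Any) -> Any:
    if isinstance(obj, dict):
        return RenamedDict(obj)
    if isinstance(obj, list):
        return [renamed(v) for v in obj]
    return obj

cfg = renamed({"unix-socket": {"listen-on": "/tmp/sock"}})
assert cfg["unix_socket"]["listen_on"] == "/tmp/sock"
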
diff --git a/python/knot_resolver/utils/modeling/types.py b/python/knot_resolver/utils/modeling/types.py
index c7452672..110201e8 100644
--- a/python/knot_resolver/utils/modeling/types.py
+++ b/python/knot_resolver/utils/modeling/types.py
@@ -42,8 +42,7 @@ def is_union(tp: Any) -> bool:
def is_literal(tp: Any) -> bool:
if sys.version_info.minor == 6:
return isinstance(tp, type(Literal))
- else:
- return getattr(tp, "__origin__", None) == Literal
+ return getattr(tp, "__origin__", None) == Literal
def is_generic_type_wrapper(tp: Any) -> bool:
@@ -55,8 +54,7 @@ def get_generic_type_arguments(tp: Any) -> List[Any]:
default: List[Any] = []
if sys.version_info.minor == 6 and is_literal(tp):
return getattr(tp, "__values__")
- else:
- return getattr(tp, "__args__", default)
+ return getattr(tp, "__args__", default)
def get_generic_type_argument(tp: Any) -> Any:
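
These helpers paper over typing-module differences: on Python 3.6, Literal kept its values in __values__, while newer versions expose __origin__ and __args__. A sketch of the modern (3.8+) behaviour the fallback branches rely on:

from typing import Any, List, Literal, Union

def is_literal(tp: Any) -> bool:
    # on Python >= 3.8, Literal[...] exposes Literal itself as __origin__
    return getattr(tp, "__origin__", None) is Literal

def get_generic_type_arguments(tp: Any) -> List[Any]:
    return list(getattr(tp, "__args__", []))

assert is_literal(Literal["yes", "no"])
assert not is_literal(List[int])
assert get_generic_type_arguments(Literal["yes", "no"]) == ["yes", "no"]
assert get_generic_type_arguments(List[int]) == [int]
assert get_generic_type_arguments(Union[int, str]) == [int, str]
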