author    Vasek Sraier <git@vakabus.cz>  2022-11-14 11:09:19 +0100
committer Vasek Sraier <git@vakabus.cz>  2022-12-03 11:31:10 +0100
commit    243ede9308e22fccb451b0686a8948c30cd30c1d (patch)
tree      2a3a51033f83b10d272bf12ae8423303e384ae22 /manager/knot_resolver_manager
parent    docs: API and manager dev notes (diff)
download  knot-resolver-243ede9308e22fccb451b0686a8948c30cd30c1d.tar.xz
          knot-resolver-243ede9308e22fccb451b0686a8948c30cd30c1d.zip
manager: massive API and modelling updates
- got rid of the ParsedTree class because it did too many things at once
- introduced the Renamed family of data structures (RenamedList, RenamedDict)
- split out etag generation into a standalone procedure
- split out query() into a standalone procedure
- modelling: changed BaseSchema to NoRenameBaseSchema, which works on normal dicts and lists (no ParsedTree dependency)
- modelling: introduced a new BaseSchema (for backwards compatibility) which uses Renamed wrappers to handle configuration renaming
- added a JSON pointer implementation (https://www.rfc-editor.org/rfc/rfc6901)
- API:
  - got rid of the QueryTree class as it was too complicated
  - completely rewrote query() to use JSON Pointer and JSON Patch (https://datatracker.ietf.org/doc/html/rfc6902/)
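
For orientation, here is a hedged sketch of how a client might drive the rewritten `/v1/config` endpoint; the base URL, the `logging/level` path and the use of the `requests` library are illustrative assumptions, not part of this commit:

```python
# Hypothetical client session against the reworked /v1/config API.
# The endpoint URL and config paths are assumptions for illustration.
import json
import requests

BASE = "http://localhost:5000/v1/config"  # assumed HTTP endpoint

# GET selects a subtree via an RFC 6901 JSON pointer in the URL path
print(requests.get(BASE + "/logging/level").text)

# PUT writes the value the pointer addresses
requests.put(BASE + "/logging/level", data=json.dumps("debug"))

# PATCH applies an RFC 6902 JSON Patch document as one transaction
patch = [
    {"op": "test", "path": "/logging/level", "value": "debug"},
    {"op": "replace", "path": "/logging/level", "value": "info"},
]
requests.patch(BASE, data=json.dumps(patch))
```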
Diffstat (limited to 'manager/knot_resolver_manager')
-rw-r--r--  manager/knot_resolver_manager/cli/cmd/config.py               |   2
-rw-r--r--  manager/knot_resolver_manager/datamodel/logging_schema.py     |   4
-rw-r--r--  manager/knot_resolver_manager/server.py                       |  37
-rw-r--r--  manager/knot_resolver_manager/utils/etag.py                   |  10
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/README.md        |   2
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/__init__.py      |   5
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/base_schema.py   | 519
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/json_pointer.py  |  89
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/parsing.py       |  88
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/query.py         | 365
-rw-r--r--  manager/knot_resolver_manager/utils/modeling/renaming.py      |  70
11 files changed, 627 insertions, 564 deletions
diff --git a/manager/knot_resolver_manager/cli/cmd/config.py b/manager/knot_resolver_manager/cli/cmd/config.py
index 029b6f8a..2646c844 100644
--- a/manager/knot_resolver_manager/cli/cmd/config.py
+++ b/manager/knot_resolver_manager/cli/cmd/config.py
@@ -40,7 +40,7 @@ class ConfigCmd(Command):
if not self.path.startswith("/"):
self.path = "/" + self.path
- method: Literal["GET", "POST"] = "GET" if self.replacement_value is None else "POST"
+ method: Literal["GET", "PUT"] = "GET" if self.replacement_value is None else "PUT"
url = f"{args.socket}/v1/config{self.path}"
response = request(method, url, self.replacement_value)
print(response)
diff --git a/manager/knot_resolver_manager/datamodel/logging_schema.py b/manager/knot_resolver_manager/datamodel/logging_schema.py
index daa1397a..1a107019 100644
--- a/manager/knot_resolver_manager/datamodel/logging_schema.py
+++ b/manager/knot_resolver_manager/datamodel/logging_schema.py
@@ -5,7 +5,7 @@ from typing_extensions import Literal
from knot_resolver_manager.datamodel.types import CheckedPath, TimeUnit
from knot_resolver_manager.utils.modeling import BaseSchema
-from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_Valid
+from knot_resolver_manager.utils.modeling.base_schema import is_obj_type_valid
try:
# On Debian 10, the typing_extensions library does not contain TypeAlias.
@@ -132,7 +132,7 @@ class LoggingSchema(BaseSchema):
def _target(self, raw: Raw) -> LogTargetEnum:
if raw.target == "from-env":
target = os.environ.get("KRES_LOGGING_TARGET") or "stdout"
- if not is_obj_type_Valid(target, cast(Type[Any], LogTargetEnum)):
+ if not is_obj_type_valid(target, cast(Type[Any], LogTargetEnum)):
raise ValueError(f"logging target '{target}' read from $KRES_LOGGING_TARGET is invalid")
return cast(LogTargetEnum, target)
else:
diff --git a/manager/knot_resolver_manager/server.py b/manager/knot_resolver_manager/server.py
index c9d3892c..175d522a 100644
--- a/manager/knot_resolver_manager/server.py
+++ b/manager/knot_resolver_manager/server.py
@@ -7,7 +7,7 @@ import sys
from http import HTTPStatus
from pathlib import Path
from time import time
-from typing import Any, List, Optional, Set, Union, cast
+from typing import Any, Dict, List, Optional, Set, Union, cast
from aiohttp import web
from aiohttp.web import middleware
@@ -27,9 +27,11 @@ from knot_resolver_manager.exceptions import CancelStartupExecInsteadException,
from knot_resolver_manager.kresd_controller import get_best_controller_implementation
from knot_resolver_manager.utils import ignore_exceptions_optional
from knot_resolver_manager.utils.async_utils import readfile
+from knot_resolver_manager.utils.etag import structural_etag
from knot_resolver_manager.utils.functional import Result
-from knot_resolver_manager.utils.modeling import ParsedTree, parse, parse_yaml
from knot_resolver_manager.utils.modeling.exceptions import DataParsingError, DataValidationError
+from knot_resolver_manager.utils.modeling.parsing import parse, parse_yaml
+from knot_resolver_manager.utils.modeling.query import query
from knot_resolver_manager.utils.modeling.types import NoneType
from knot_resolver_manager.utils.systemd_notify import systemd_notify
@@ -172,14 +174,14 @@ class Server:
# parse the incoming data
if request.method == "GET":
- update_with: Optional[ParsedTree] = None
+ update_with: Optional[Dict[str, Any]] = None
else:
update_with = parse(await request.text(), request.content_type)
document_path = request.match_info["path"]
getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
etags = getheaders("if-match")
not_etags = getheaders("if-none-match")
- current_config: ParsedTree = self.config_store.get().get_unparsed_data()
+ current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
# stop processing if etags
def strip_quotes(s: str) -> str:
@@ -188,14 +190,14 @@ class Server:
# WARNING: this check is prone to race conditions. When changing, make sure that the current config
# is really the latest current config (i.e. no await in between obtaining the config and the checks)
status = HTTPStatus.NOT_MODIFIED if request.method in ("GET", "HEAD") else HTTPStatus.PRECONDITION_FAILED
- if etags is not None and current_config.etag not in map(strip_quotes, etags):
+ if etags is not None and structural_etag(current_config) not in map(strip_quotes, etags):
return web.Response(status=status)
- if not_etags is not None and current_config.etag in map(strip_quotes, not_etags):
+ if not_etags is not None and structural_etag(current_config) in map(strip_quotes, not_etags):
return web.Response(status=status)
# run query
- op = cast(Literal["get", "post", "delete", "patch", "put"], request.method.lower())
- new_config, to_return = current_config.query(op, document_path, update_with)
+ op = cast(Literal["get", "delete", "patch", "put"], request.method.lower())
+ new_config, to_return = query(current_config, op, document_path, update_with)
# update the config
if request.method != "GET":
@@ -207,7 +209,7 @@ class Server:
# return success
resp_text: Optional[str] = str(to_return) if to_return is not None else None
res = web.Response(status=HTTPStatus.OK, text=resp_text, content_type="application/json")
- res.headers.add("ETag", f'"{new_config.etag}"')
+ res.headers.add("ETag", f'"{structural_etag(new_config)}"')
return res
async def _handler_metrics(self, _request: web.Request) -> web.Response:
@@ -262,11 +264,10 @@ class Server:
self.app.add_routes(
[
web.get("/", self._handler_index),
- web.post(r"/v1/config{path:.*}", self._handler_config_query),
- web.put(r"/v1/config{path:.*}", self._handler_config_query),
- web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.get(r"/v1/config{path:.*}", self._handler_config_query),
+ web.put(r"/v1/config{path:.*}", self._handler_config_query),
web.delete(r"/v1/config{path:.*}", self._handler_config_query),
+ web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.post("/stop", self._handler_stop),
web.get("/schema", self._handler_schema),
web.get("/schema/ui", self._handle_view_schema),
@@ -318,7 +319,7 @@ class Server:
return self._exit_code
-async def _load_raw_config(config: Union[Path, ParsedTree]) -> ParsedTree:
+async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any]:
# Initial configuration of the manager
if isinstance(config, Path):
if not config.exists():
@@ -330,17 +331,17 @@ async def _load_raw_config(config: Union[Path, ParsedTree]) -> ParsedTree:
config = parse_yaml(await readfile(config))
# validate the initial configuration
- assert isinstance(config, ParsedTree)
+ assert isinstance(config, dict)
return config
-async def _load_config(config: ParsedTree) -> KresConfig:
+async def _load_config(config: Dict[str, Any]) -> KresConfig:
logger.info("Validating initial configuration...")
config_validated = KresConfig(config)
return config_validated
-async def _init_config_store(config: ParsedTree) -> ConfigStore:
+async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
config_validated = await _load_config(config)
config_store = ConfigStore(config_validated)
return config_store
@@ -369,7 +370,7 @@ async def _deny_working_directory_changes(config_old: KresConfig, config_new: Kr
return Result.ok(None)
-def _set_working_directory(config_raw: ParsedTree) -> None:
+def _set_working_directory(config_raw: Dict[str, Any]) -> None:
config = KresConfig(config_raw)
if not config.rundir.to_path().exists():
@@ -428,7 +429,7 @@ async def _sigterm_while_shutting_down():
sys.exit(128 + signal.SIGTERM)
-async def start_server(config: Union[Path, ParsedTree] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
+async def start_server(config: Union[Path, Dict[str, Any]] = DEFAULT_MANAGER_CONFIG_FILE) -> int:
# This function is quite long, but it describes how manager runs. So let's silence pylint
# pylint: disable=too-many-statements
diff --git a/manager/knot_resolver_manager/utils/etag.py b/manager/knot_resolver_manager/utils/etag.py
new file mode 100644
index 00000000..bb80700b
--- /dev/null
+++ b/manager/knot_resolver_manager/utils/etag.py
@@ -0,0 +1,10 @@
+import base64
+import json
+from hashlib import blake2b
+from typing import Any
+
+
+def structural_etag(obj: Any) -> str:
+ m = blake2b(digest_size=15)
+ m.update(json.dumps(obj, sort_keys=True).encode("utf8"))
+ return base64.urlsafe_b64encode(m.digest()).decode("utf8")
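
The extracted `structural_etag()` helper hashes a canonical JSON serialization, so the tag is independent of key order; a quick sanity check (not part of the commit):

```python
# structural_etag() serializes with sort_keys=True, so two dicts with the
# same structure but different key order produce the same ETag.
from knot_resolver_manager.utils.etag import structural_etag

a = structural_etag({"logging": {"level": "info"}, "rundir": "/run/kres"})
b = structural_etag({"rundir": "/run/kres", "logging": {"level": "info"}})
assert a == b
```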
diff --git a/manager/knot_resolver_manager/utils/modeling/README.md b/manager/knot_resolver_manager/utils/modeling/README.md
index 5fdaf0b9..eec99e33 100644
--- a/manager/knot_resolver_manager/utils/modeling/README.md
+++ b/manager/knot_resolver_manager/utils/modeling/README.md
@@ -141,7 +141,7 @@ simple-schema:
```
To parse data from YAML format just use `parse_yaml` function or `parse_json` for JSON format.
-Parsed data are represented as `ParsedTree` which is a simple wrapper for dict-like object that takes care of `-`/`_` conversion.
+Parsed data are stored in a dict-like object that takes care of `-`/`_` conversion.
```python
from .modeling import parse_yaml
diff --git a/manager/knot_resolver_manager/utils/modeling/__init__.py b/manager/knot_resolver_manager/utils/modeling/__init__.py
index d174b7ce..3db537dd 100644
--- a/manager/knot_resolver_manager/utils/modeling/__init__.py
+++ b/manager/knot_resolver_manager/utils/modeling/__init__.py
@@ -1,14 +1,11 @@
from .base_schema import BaseSchema
from .base_value_type import BaseValueType
-from .parsing import ParsedTree, parse, parse_json, parse_yaml
-from .query import QueryTree
+from .parsing import parse, parse_json, parse_yaml
__all__ = [
"BaseValueType",
"BaseSchema",
- "ParsedTree",
"parse",
"parse_yaml",
"parse_json",
- "QueryTree",
]
diff --git a/manager/knot_resolver_manager/utils/modeling/base_schema.py b/manager/knot_resolver_manager/utils/modeling/base_schema.py
index ea0c6b99..17f003ee 100644
--- a/manager/knot_resolver_manager/utils/modeling/base_schema.py
+++ b/manager/knot_resolver_manager/utils/modeling/base_schema.py
@@ -1,6 +1,6 @@
import enum
import inspect
-from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union, cast
+from typing import Any, Dict, List, Optional, Set, Tuple, Type, TypeVar, Union, cast
import yaml
@@ -8,7 +8,7 @@ from knot_resolver_manager.utils.functional import all_matches
from .base_value_type import BaseValueType
from .exceptions import AggregateDataValidationError, DataDescriptionError, DataValidationError
-from .parsing import ParsedTree
+from .renaming import Renamed, renamed
from .types import (
NoneType,
get_generic_type_argument,
@@ -185,217 +185,7 @@ def _describe_type(typ: Type[Any]) -> Dict[Any, Any]:
raise NotImplementedError(f"Trying to get JSON schema for type '{typ}', which is not implemented")
-def _validated_tuple(cls: Type[Any], obj: Tuple[Any, ...], object_path: str) -> Tuple[Any, ...]:
- types = get_generic_type_arguments(cls)
- errs: List[DataValidationError] = []
- res: List[Any] = []
- for i, (tp, val) in enumerate(zip(types, obj)):
- try:
- res.append(_validated_object_type(tp, val, object_path=f"{object_path}[{i}]"))
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return tuple(res)
-
-
-def _validated_dict(cls: Type[Any], obj: Dict[Any, Any], object_path: str) -> Dict[Any, Any]:
- key_type, val_type = get_generic_type_arguments(cls)
- try:
- errs: List[DataValidationError] = []
- res: Dict[Any, Any] = {}
- for key, val in obj.items():
- try:
- nkey = _validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
- nval = _validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
- res[nkey] = nval
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return res
- except AttributeError as e:
- raise DataValidationError(
- f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
- ) from e
-
-
-def _validated_list(cls: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
- inner_type = get_generic_type_argument(cls)
- errs: List[DataValidationError] = []
- res: List[Any] = []
- for i, val in enumerate(obj):
- try:
- res.append(_validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
- except DataValidationError as e:
- errs.append(e)
- if len(errs) == 1:
- raise errs[0]
- elif len(errs) > 1:
- raise AggregateDataValidationError(object_path, child_exceptions=errs)
- return res
-
-
-def _validated_object_type(
- cls: Type[Any], obj: Any, default: Any = ..., use_default: bool = False, object_path: str = "/"
-) -> Any:
- """
- Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
- """
-
- # Disabling these checks, because I think it's much more readable as a single function
- # and it's not that large at this point. If it got larger, then we should definitely split it
- # pylint: disable=too-many-branches,too-many-locals,too-many-statements
-
- # default values
- if obj is None and use_default:
- return default
-
- # NoneType
- elif is_none_type(cls):
- if obj is None:
- return None
- else:
- raise DataValidationError(f"expected None, found '{obj}'.", object_path)
-
- # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
- elif is_optional(cls):
- inner: Type[Any] = get_optional_inner_type(cls)
- if obj is None:
- return None
- else:
- return _validated_object_type(inner, obj, object_path=object_path)
-
- # Union[*variants]
- elif is_union(cls):
- variants = get_generic_type_arguments(cls)
- errs: List[DataValidationError] = []
- for v in variants:
- try:
- return _validated_object_type(v, obj, object_path=object_path)
- except DataValidationError as e:
- errs.append(e)
-
- raise DataValidationError("could not parse any of the possible variants", object_path, child_exceptions=errs)
-
- # after this, there is no place for a None object
- elif obj is None:
- raise DataValidationError(f"unexpected value 'None' for type {cls}", object_path)
-
- # int
- elif cls == int:
- # we don't want to make an int out of anything else than other int
- # except for BaseValueType class instances
- if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
- return int(obj)
- raise DataValidationError(f"expected int, found {type(obj)}", object_path)
-
- # str
- elif cls == str:
- # we are willing to cast any primitive value to string, but no compound values are allowed
- if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
- return str(obj)
- elif is_obj_type(obj, bool):
- raise DataValidationError(
- "Expected str, found bool. Be careful, that YAML parsers consider even"
- ' "no" and "yes" as a bool. Search for the Norway Problem for more'
- " details. And please use quotes explicitly.",
- object_path,
- )
- else:
- raise DataValidationError(
- f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
- )
-
- # bool
- elif cls == bool:
- if is_obj_type(obj, bool):
- return obj
- else:
- raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
-
- # float
- elif cls == float:
- raise NotImplementedError(
- "Floating point values are not supported in the parser."
- " Please implement them and be careful with type coercions"
- )
-
- # Literal[T]
- elif is_literal(cls):
- expected = get_generic_type_arguments(cls)
- if obj in expected:
- return obj
- else:
- raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
-
- # Dict[K,V]
- elif is_dict(cls):
- return _validated_dict(cls, obj, object_path)
-
- # any Enums (probably used only internally in DataValidator)
- elif is_enum(cls):
- if isinstance(obj, cls):
- return obj
- else:
- raise DataValidationError(f"unexpected value '{obj}' for enum '{cls}'", object_path)
-
- # List[T]
- elif is_list(cls):
- if isinstance(obj, str):
- raise DataValidationError("expected list, got string", object_path)
- return _validated_list(cls, obj, object_path)
-
- # Tuple[A,B,C,D,...]
- elif is_tuple(cls):
- return _validated_tuple(cls, obj, object_path)
-
- # type of obj and cls type match
- elif is_obj_type(obj, cls):
- return obj
-
- # BaseValueType subclasses
- elif inspect.isclass(cls) and issubclass(cls, BaseValueType):
- if isinstance(obj, cls):
- # if we already have a custom value type, just pass it through
- return obj
- else:
- # no validation performed, the implementation does it in the constuctor
- try:
- return cls(obj, object_path=object_path)
- except ValueError as e:
- if len(e.args) > 0 and isinstance(e.args[0], str):
- msg = e.args[0]
- else:
- msg = f"Failed to validate value against {cls} type"
- raise DataValidationError(msg, object_path) from e
-
- # nested BaseSchema subclasses
- elif inspect.isclass(cls) and issubclass(cls, BaseSchema):
- # we should return DataParser, we expect to be given a dict,
- # because we can construct a DataParser from it
- if isinstance(obj, (dict, BaseSchema)):
- return cls(obj, object_path=object_path) # type: ignore
- raise DataValidationError(f"expected 'dict' or 'BaseSchema' object, found '{type(obj)}'", object_path)
-
- # if the object matches, just pass it through
- elif inspect.isclass(cls) and isinstance(obj, cls):
- return obj
-
- # default error handler
- else:
- raise DataValidationError(
- f"Type {cls} cannot be parsed. This is a implementation error. "
- "Please fix your types in the class or improve the parser/validator.",
- object_path,
- )
-
-
-TSource = Union[NoneType, ParsedTree, "BaseSchema", Dict[str, Any]]
+TSource = Union[NoneType, "NoRenameBaseSchema", Dict[str, Any]]
def _create_untouchable(name: str) -> object:
@@ -409,7 +199,7 @@ def _create_untouchable(name: str) -> object:
return _Untouchable()
-class BaseSchema(Serializable):
+class NoRenameBaseSchema(Serializable):
"""
Base class for modeling configuration schema. It somewhat resembles standard dataclasses with additional
functionality:
@@ -418,8 +208,8 @@ class BaseSchema(Serializable):
* data conversion
To create an instance of this class, you have to provide source data in the form of dict-like object.
- Generally, we expect `ParsedTree`, raw dict or another `BaseSchema` instance. The provided data object
- is traversed, transformed and validated before assigned to the appropriate fields (attributes).
+ Generally, raw dict or another `BaseSchema` instance. The provided data object is traversed, transformed
+ and validated before assigned to the appropriate fields (attributes).
Fields (attributes)
===================
@@ -472,19 +262,19 @@ class BaseSchema(Serializable):
See tests/utils/test_modelling.py for example usage.
"""
- _LAYER: Optional[Type["BaseSchema"]] = None
+ _LAYER: Optional[Type["NoRenameBaseSchema"]] = None
def _assign_default(self, name: str, python_type: Any, object_path: str) -> None:
cls = self.__class__
default = getattr(cls, name, None)
- value = _validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
+ value = type(self).validated_object_type(python_type, default, object_path=f"{object_path}/{name}")
setattr(self, name, value)
def _assign_field(self, name: str, python_type: Any, value: Any, object_path: str) -> None:
- value = _validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
+ value = type(self).validated_object_type(python_type, value, object_path=f"{object_path}/{name}")
setattr(self, name, value)
- def _assign_fields(self, source: Union[ParsedTree, "BaseSchema", NoneType], object_path: str) -> Set[str]:
+ def _assign_fields(self, source: Union[Dict[str, Any], "NoRenameBaseSchema", None], object_path: str) -> Set[str]:
"""
Order of assignment:
1. all direct assignments
@@ -542,26 +332,23 @@ class BaseSchema(Serializable):
def __init__(self, source: TSource = None, object_path: str = ""):
# make sure that all raw data checks passed on the source object
if source is None:
- source = ParsedTree({})
- if isinstance(source, dict):
- source = ParsedTree(source)
+ source = {}
- # save source
- self._source: Union[ParsedTree, BaseSchema] = source
+ if not isinstance(source, (NoRenameBaseSchema, dict)):
+ raise DataValidationError(f"expected dict-like object, found '{type(source)}'", object_path)
+
+ # save source (3 underscores to prevent collisions with any user defined conversion methods or system methods)
+ self.___source: Union[Dict[str, Any], NoRenameBaseSchema] = source
# construct lower level schema first if configured to do so
if self._LAYER is not None:
source = self._LAYER(source, object_path=object_path) # pylint: disable=not-callable
- # prevent failure when user provides a different type than object
- if isinstance(source, ParsedTree) and not source.is_dict():
- raise DataValidationError(f"expected object, found '{source.type()}'", object_path)
-
# assign fields
used_keys = self._assign_fields(source, object_path)
# check for unused keys in the source object
- if source and not isinstance(source, BaseSchema):
+ if source and not isinstance(source, NoRenameBaseSchema):
unused = source.keys() - used_keys
if len(unused) > 0:
keys = ", ".join((f"'{u}'" for u in unused))
@@ -576,11 +363,13 @@ class BaseSchema(Serializable):
except ValueError as e:
raise DataValidationError(e.args[0] if len(e.args) > 0 else "Validation error", object_path) from e
- def get_unparsed_data(self) -> ParsedTree:
- if isinstance(self._source, BaseSchema):
- return self._source.get_unparsed_data()
+ def get_unparsed_data(self) -> Dict[str, Any]:
+ if isinstance(self.___source, NoRenameBaseSchema):
+ return self.___source.get_unparsed_data()
+ elif isinstance(self.___source, Renamed):
+ return self.___source.original()
else:
- return self._source
+ return self.___source
def _get_converted_value(self, key: str, source: TSource, object_path: str) -> Any:
"""
@@ -630,7 +419,7 @@ class BaseSchema(Serializable):
return True
@classmethod
- def json_schema(cls: Type["BaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
+ def json_schema(cls: Type["NoRenameBaseSchema"], include_schema_definition: bool = True) -> Dict[Any, Any]:
if cls._LAYER is not None:
return cls._LAYER.json_schema(include_schema_definition=include_schema_definition)
@@ -653,14 +442,270 @@ class BaseSchema(Serializable):
res[name] = Serializable.serialize(getattr(self, name))
return res
+ @classmethod
+ def _validated_tuple(
+ cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: Tuple[Any, ...], object_path: str
+ ) -> Tuple[Any, ...]:
+ types = get_generic_type_arguments(tp)
+ errs: List[DataValidationError] = []
+ res: List[Any] = []
+ for i, (t, val) in enumerate(zip(types, obj)):
+ try:
+ res.append(cls.validated_object_type(t, val, object_path=f"{object_path}[{i}]"))
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return tuple(res)
-def is_obj_type_Valid(obj: Any, tp: Type[Any]) -> bool:
+ @classmethod
+ def _validated_dict(
+ cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: Dict[Any, Any], object_path: str
+ ) -> Dict[Any, Any]:
+ key_type, val_type = get_generic_type_arguments(tp)
+ try:
+ errs: List[DataValidationError] = []
+ res: Dict[Any, Any] = {}
+ for key, val in obj.items():
+ try:
+ nkey = cls.validated_object_type(key_type, key, object_path=f"{object_path}[{key}]")
+ nval = cls.validated_object_type(val_type, val, object_path=f"{object_path}[{key}]")
+ res[nkey] = nval
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return res
+ except AttributeError as e:
+ raise DataValidationError(
+ f"Expected dict-like object, but failed to access its .items() method. Value was {obj}", object_path
+ ) from e
+
+ @classmethod
+ def _validated_list(cls: Type["NoRenameBaseSchema"], tp: Type[Any], obj: List[Any], object_path: str) -> List[Any]:
+ inner_type = get_generic_type_argument(tp)
+ errs: List[DataValidationError] = []
+ res: List[Any] = []
+ for i, val in enumerate(obj):
+ try:
+ res.append(cls.validated_object_type(inner_type, val, object_path=f"{object_path}[{i}]"))
+ except DataValidationError as e:
+ errs.append(e)
+ if len(errs) == 1:
+ raise errs[0]
+ elif len(errs) > 1:
+ raise AggregateDataValidationError(object_path, child_exceptions=errs)
+ return res
+
+ @classmethod
+ def validated_object_type(
+ cls: Type["NoRenameBaseSchema"],
+ tp: Type[Any],
+ obj: Any,
+ default: Any = ...,
+ use_default: bool = False,
+ object_path: str = "/",
+ ) -> Any:
+ """
+ Given an expected type `cls` and a value object `obj`, validate the type of `obj` and return it
+ """
+
+ # Disabling these checks, because I think it's much more readable as a single function
+ # and it's not that large at this point. If it got larger, then we should definitely split it
+ # pylint: disable=too-many-branches,too-many-locals,too-many-statements
+
+ # default values
+ if obj is None and use_default:
+ return default
+
+ # NoneType
+ elif is_none_type(tp):
+ if obj is None:
+ return None
+ else:
+ raise DataValidationError(f"expected None, found '{obj}'.", object_path)
+
+ # Optional[T] (could be technically handled by Union[*variants], but this way we have better error reporting)
+ elif is_optional(tp):
+ inner: Type[Any] = get_optional_inner_type(tp)
+ if obj is None:
+ return None
+ else:
+ return cls.validated_object_type(inner, obj, object_path=object_path)
+
+ # Union[*variants]
+ elif is_union(tp):
+ variants = get_generic_type_arguments(tp)
+ errs: List[DataValidationError] = []
+ for v in variants:
+ try:
+ return cls.validated_object_type(v, obj, object_path=object_path)
+ except DataValidationError as e:
+ errs.append(e)
+
+ raise DataValidationError(
+ "could not parse any of the possible variants", object_path, child_exceptions=errs
+ )
+
+ # after this, there is no place for a None object
+ elif obj is None:
+ raise DataValidationError(f"unexpected value 'None' for type {tp}", object_path)
+
+ # int
+ elif tp == int:
+ # we don't want to make an int out of anything else than other int
+ # except for BaseValueType class instances
+ if is_obj_type(obj, int) or isinstance(obj, BaseValueType):
+ return int(obj)
+ raise DataValidationError(f"expected int, found {type(obj)}", object_path)
+
+ # str
+ elif tp == str:
+ # we are willing to cast any primitive value to string, but no compound values are allowed
+ if is_obj_type(obj, (str, float, int)) or isinstance(obj, BaseValueType):
+ return str(obj)
+ elif is_obj_type(obj, bool):
+ raise DataValidationError(
+ "Expected str, found bool. Be careful, that YAML parsers consider even"
+ ' "no" and "yes" as a bool. Search for the Norway Problem for more'
+ " details. And please use quotes explicitly.",
+ object_path,
+ )
+ else:
+ raise DataValidationError(
+ f"expected str (or number that would be cast to string), but found type {type(obj)}", object_path
+ )
+
+ # bool
+ elif tp == bool:
+ if is_obj_type(obj, bool):
+ return obj
+ else:
+ raise DataValidationError(f"expected bool, found {type(obj)}", object_path)
+
+ # float
+ elif tp == float:
+ raise NotImplementedError(
+ "Floating point values are not supported in the parser."
+ " Please implement them and be careful with type coercions"
+ )
+
+ # Literal[T]
+ elif is_literal(tp):
+ expected = get_generic_type_arguments(tp)
+ if obj in expected:
+ return obj
+ else:
+ raise DataValidationError(f"'{obj}' does not match any of the expected values {expected}", object_path)
+
+ # Dict[K,V]
+ elif is_dict(tp):
+ return cls._validated_dict(tp, obj, object_path)
+
+ # any Enums (probably used only internally in DataValidator)
+ elif is_enum(tp):
+ if isinstance(obj, tp):
+ return obj
+ else:
+ raise DataValidationError(f"unexpected value '{obj}' for enum '{tp}'", object_path)
+
+ # List[T]
+ elif is_list(tp):
+ if isinstance(obj, str):
+ raise DataValidationError("expected list, got string", object_path)
+ return cls._validated_list(tp, obj, object_path)
+
+ # Tuple[A,B,C,D,...]
+ elif is_tuple(tp):
+ return cls._validated_tuple(tp, obj, object_path)
+
+ # type of obj and cls type match
+ elif is_obj_type(obj, tp):
+ return obj
+
+ # when the specified type is Any, just return the given value
+ # (pylint does something weird on the following line and it happens only on python 3.10)
+ elif tp == Any: # pylint: disable=comparison-with-callable
+ return obj
+
+ # BaseValueType subclasses
+ elif inspect.isclass(tp) and issubclass(tp, BaseValueType):
+ if isinstance(obj, tp):
+ # if we already have a custom value type, just pass it through
+ return obj
+ else:
+ # no validation performed, the implementation does it in the constructor
+ try:
+ return tp(obj, object_path=object_path)
+ except ValueError as e:
+ if len(e.args) > 0 and isinstance(e.args[0], str):
+ msg = e.args[0]
+ else:
+ msg = f"Failed to validate value against {tp} type"
+ raise DataValidationError(msg, object_path) from e
+
+ # nested BaseSchema subclasses
+ elif inspect.isclass(tp) and issubclass(tp, NoRenameBaseSchema):
+ # we should return DataParser, we expect to be given a dict,
+ # because we can construct a DataParser from it
+ if isinstance(obj, (dict, NoRenameBaseSchema)):
+ return tp(obj, object_path=object_path) # type: ignore
+ raise DataValidationError(
+ f"expected 'dict' or 'NoRenameBaseSchema' object, found '{type(obj)}'", object_path
+ )
+
+ # if the object matches, just pass it through
+ elif inspect.isclass(tp) and isinstance(obj, tp):
+ return obj
+
+ # default error handler
+ else:
+ raise DataValidationError(
+ f"Type {tp} cannot be parsed. This is a implementation error. "
+ "Please fix your types in the class or improve the parser/validator.",
+ object_path,
+ )
+
+
+def is_obj_type_valid(obj: Any, tp: Type[Any]) -> bool:
"""
Runtime type checking. Validate, that a given object is of a given type.
"""
try:
- _validated_object_type(tp, obj)
+ NoRenameBaseSchema.validated_object_type(tp, obj)
return True
except (DataValidationError, ValueError):
return False
+
+
+T = TypeVar("T")
+
+
+def load(cls: Type[T], obj: Any, default: Any = ..., use_default: bool = False) -> T:
+ return NoRenameBaseSchema.validated_object_type(cls, obj, default, use_default)
+
+
+class BaseSchema(NoRenameBaseSchema):
+ """
+ In Knot Resolver Manager, we need renamed keys most of the time, as we are using the modelling
+ tools mostly for configuration schema. That's why the normal looking name BaseSchema does renaming
+ and NoRenameBaseSchema is the opposite.
+ """
+
+ def __init__(self, source: TSource = None, object_path: str = ""):
+ if isinstance(source, dict):
+ source = renamed(source)
+ super().__init__(source, object_path)
+
+ @classmethod
+ def _validated_dict(
+ cls: Type["BaseSchema"], tp: Type[Any], obj: Dict[Any, Any], object_path: str
+ ) -> Dict[Any, Any]:
+ if isinstance(obj, Renamed):
+ obj = obj.original()
+ return super()._validated_dict(tp, obj, object_path)
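
To illustrate the new class split, a short hedged sketch follows; `ServerSchema` and its fields are invented for this example and do not appear in the commit:

```python
# BaseSchema (renaming) vs. the plain machinery: hyphenated source keys
# map onto snake_case attributes. ServerSchema is a made-up schema.
from typing import List

from knot_resolver_manager.utils.modeling import BaseSchema
from knot_resolver_manager.utils.modeling.base_schema import load


class ServerSchema(BaseSchema):
    listen_address: str
    max_workers: int


s = ServerSchema({"listen-address": "127.0.0.1", "max-workers": 8})
assert s.listen_address == "127.0.0.1" and s.max_workers == 8

# load() runs any plain type through the same validation machinery
ports: List[int] = load(List[int], [53, 853])
```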
diff --git a/manager/knot_resolver_manager/utils/modeling/json_pointer.py b/manager/knot_resolver_manager/utils/modeling/json_pointer.py
new file mode 100644
index 00000000..adbfa36d
--- /dev/null
+++ b/manager/knot_resolver_manager/utils/modeling/json_pointer.py
@@ -0,0 +1,89 @@
+"""
+Implements JSON pointer resolution based on RFC 6901:
+https://www.rfc-editor.org/rfc/rfc6901
+"""
+
+
+from typing import Any, Optional, Tuple, Union
+
+# JSONPtrAddressable = Optional[Union[Dict[str, "JSONPtrAddressable"], List["JSONPtrAddressable"], int, float, bool, str, None]]
+JSONPtrAddressable = Any # the recursive definition above is not valid :(
+
+
+class _JSONPtr:
+ @staticmethod
+ def _decode_token(token: str) -> str:
+ """
+ Resolves escaped characters ~ and /
+ """
+
+ # the order of the replace statements is important, do not change without
+ # consulting the RFC
+ return token.replace("~1", "/").replace("~0", "~")
+
+ @staticmethod
+ def _encode_token(token: str) -> str:
+ return token.replace("~", "~0").replace("/", "~1")
+
+ def __init__(self, ptr: str):
+ if ptr == "":
+ # pointer to the root
+ self.tokens = []
+
+ else:
+ if ptr[0] != "/":
+ raise SyntaxError(
+ f"JSON pointer '{ptr}' invalid: the first character MUST be '/' or the pointer must be empty"
+ )
+
+ ptr = ptr[1:]
+ self.tokens = [_JSONPtr._decode_token(tok) for tok in ptr.split("/")]
+
+ def resolve(
+ self, obj: JSONPtrAddressable
+ ) -> Tuple[Optional[JSONPtrAddressable], JSONPtrAddressable, Union[str, int, None]]:
+ """
+ Returns (Optional[parent], Optional[direct value], key of value in the parent object)
+ """
+
+ parent: Optional[JSONPtrAddressable] = None
+ current = obj
+ current_ptr = ""
+ token: Union[int, str, None] = None
+
+ for token in self.tokens:
+ if current is None:
+ raise ValueError(
+ f"JSON pointer cannot reference nested non-existent object: object at ptr '{current_ptr}' already points to None, cannot nest deeper with token '{token}'"
+ )
+
+ elif isinstance(current, (bool, int, float, str)):
+ raise ValueError(f"object at '{current_ptr}' is a scalar, JSON pointer cannot point into it")
+
+ else:
+ parent = current
+ if isinstance(current, list):
+ if token == "-":
+ current = None
+ else:
+ try:
+ token = int(token)
+ current = current[token]
+ except ValueError:
+ raise ValueError(
+ f"invalid JSON pointer: list '{current_ptr}' require numbers as keys, instead got '{token}'"
+ )
+
+ elif isinstance(current, dict):
+ current = current.get(token, None)
+
+ current_ptr += f"/{token}"
+
+ return parent, current, token
+
+
+def json_ptr_resolve(
+ obj: JSONPtrAddressable,
+ ptr: str,
+) -> Tuple[Optional[JSONPtrAddressable], Optional[JSONPtrAddressable], Union[str, int, None]]:
+ return _JSONPtr(ptr).resolve(obj)
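
A minimal illustration (assumed usage, mirroring the docstring) of the resolver's return convention of (parent, value, key-in-parent):

```python
# json_ptr_resolve() walks an RFC 6901 pointer and returns the parent
# container, the addressed value and the value's key within the parent.
from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve

doc = {"servers": [{"name": "a"}, {"name": "b"}], "a/b": 1}

parent, value, token = json_ptr_resolve(doc, "/servers/1/name")
assert parent == {"name": "b"} and value == "b" and token == "name"

# escaped tokens: "~1" decodes to "/" and "~0" to "~"
_, escaped, _ = json_ptr_resolve(doc, "/a~1b")
assert escaped == 1
```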
diff --git a/manager/knot_resolver_manager/utils/modeling/parsing.py b/manager/knot_resolver_manager/utils/modeling/parsing.py
index fad582c2..9d55989b 100644
--- a/manager/knot_resolver_manager/utils/modeling/parsing.py
+++ b/manager/knot_resolver_manager/utils/modeling/parsing.py
@@ -1,82 +1,13 @@
-import base64
import json
from enum import Enum, auto
-from hashlib import blake2b
-from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
import yaml
-from typing_extensions import Literal
from yaml.constructor import ConstructorError
from yaml.nodes import MappingNode
-from knot_resolver_manager.utils.modeling.query import QueryTree
-
from .exceptions import DataParsingError
-
-
-class ParsedTree:
- """
- Simple wrapper for parsed data.
- Changes external naming convention (hyphen separator) to internal (snake_case) on the fly.
-
- IMMUTABLE, DO NOT MODIFY
- """
-
- @staticmethod
- def _convert_internal_field_name_to_external(name: Any) -> Any:
- if isinstance(name, str):
- return name.replace("_", "-")
- return name
-
- @staticmethod
- def _convert_external_field_name_to_internal(name: Any) -> Any:
- if isinstance(name, str):
- return name.replace("-", "_")
- return name
-
- def __init__(self, data: Union[Dict[str, Any], str, int, bool, List[Any]]):
- self._data = data
-
- def to_raw(self) -> Union[Dict[str, Any], str, int, bool, List[Any]]:
- return self._data
-
- def __getitem__(self, key: str) -> Any:
- assert isinstance(self._data, dict)
- return self._data[ParsedTree._convert_internal_field_name_to_external(key)]
-
- def is_dict(self) -> bool:
- return isinstance(self._data, dict)
-
- def type(self) -> Type[Any]:
- return type(self._data)
-
- def __contains__(self, key: str) -> bool:
- assert isinstance(self._data, dict)
- return ParsedTree._convert_internal_field_name_to_external(key) in self._data
-
- def __str__(self) -> str:
- return json.dumps(self._data, sort_keys=False, indent=2)
-
- def keys(self) -> Set[Any]:
- assert isinstance(self._data, dict)
- return {ParsedTree._convert_external_field_name_to_internal(key) for key in self._data.keys()}
-
- def query(
- self,
- op: Literal["get", "post", "delete", "patch", "put"],
- path: str,
- update_with: Optional["ParsedTree"] = None,
- ) -> "Tuple[ParsedTree, Optional[ParsedTree]]":
- new_root, res = QueryTree(self._data).query(
- op, path, update_with=QueryTree(update_with.to_raw()) if update_with is not None else None
- )
- return ParsedTree(new_root.to_raw()), ParsedTree(res.to_raw()) if res is not None else None
-
- @property
- def etag(self) -> str:
- m = blake2b(digest_size=15)
- m.update(json.dumps(self._data, sort_keys=True).encode("utf8"))
- return base64.urlsafe_b64encode(m.digest()).decode("utf8")
+from .renaming import renamed
# custom hook for 'json.loads()' to detect duplicate keys in data
@@ -122,13 +53,13 @@ class _Format(Enum):
YAML = auto()
JSON = auto()
- def parse_to_dict(self, text: str) -> ParsedTree:
+ def parse_to_dict(self, text: str) -> Any:
if self is _Format.YAML:
# RaiseDuplicatesLoader extends yaml.SafeLoader, so this should be safe
# https://python.land/data-processing/python-yaml#PyYAML_safe_load_vs_load
- return ParsedTree(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
+ return renamed(yaml.load(text, Loader=_RaiseDuplicatesLoader)) # type: ignore
elif self is _Format.JSON:
- return ParsedTree(json.loads(text, object_pairs_hook=_json_raise_duplicates))
+ return renamed(json.loads(text, object_pairs_hook=_json_raise_duplicates))
else:
raise NotImplementedError(f"Parsing of format '{self}' is not implemented")
@@ -144,23 +75,24 @@ class _Format(Enum):
def from_mime_type(mime_type: str) -> "_Format":
formats = {
"application/json": _Format.JSON,
+ "application/yaml": _Format.YAML,
"application/octet-stream": _Format.JSON, # default in aiohttp
"text/vnd.yaml": _Format.YAML,
}
if mime_type not in formats:
raise DataParsingError(
- f"unsupported MIME type '{mime_type}', expected 'application/json' or 'text/vnd.yaml'"
+ f"unsupported MIME type '{mime_type}', expected 'application/json' or 'application/yaml'"
)
return formats[mime_type]
-def parse(data: str, mime_type: str) -> ParsedTree:
+def parse(data: str, mime_type: str) -> Any:
return _Format.from_mime_type(mime_type).parse_to_dict(data)
-def parse_yaml(data: str) -> ParsedTree:
+def parse_yaml(data: str) -> Any:
return _Format.YAML.parse_to_dict(data)
-def parse_json(data: str) -> ParsedTree:
+def parse_json(data: str) -> Any:
return _Format.JSON.parse_to_dict(data)
diff --git a/manager/knot_resolver_manager/utils/modeling/query.py b/manager/knot_resolver_manager/utils/modeling/query.py
index 459085a2..b63e4b96 100644
--- a/manager/knot_resolver_manager/utils/modeling/query.py
+++ b/manager/knot_resolver_manager/utils/modeling/query.py
@@ -1,269 +1,188 @@
import copy
-import json
-import re
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from abc import ABC, abstractmethod
+from typing import Any, List, Optional, Tuple, Union
from typing_extensions import Literal
-from knot_resolver_manager.utils.modeling.exceptions import DataParsingError
+from knot_resolver_manager.utils.modeling.base_schema import NoRenameBaseSchema, load
+from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve
-class QueryTree:
- """
- Simple wrapper for raw data which allows modification queries to be run on top.
+class PatchError(Exception):
+ pass
- IMMUTABLE, DO NOT MODIFY
- """
- def is_scalar(self) -> bool:
+class Op(NoRenameBaseSchema, ABC):
+ @abstractmethod
+ def eval(self, fakeroot: Any) -> Any:
"""
- true if the object represents a primitive type
+ modifies the given fakeroot, returns a new one
"""
- return isinstance(self._data, (str, int, bool))
- def is_object(self) -> bool:
- """
- true if the object represents a list or dict
- """
- return isinstance(self._data, (list, dict))
+ def _resolve_ptr(self, fakeroot: Any, ptr: str) -> Tuple[Any, Any, Union[str, int, None]]:
+ # Lookup tree part based on the given JSON pointer
+ parent, obj, token = json_ptr_resolve(fakeroot["root"], ptr)
- def _is_list(self) -> bool:
- return isinstance(self._data, list)
+ # the lookup was on pure data, wrap the results in QueryTree
+ if parent is None:
+ parent = fakeroot
+ token = "root"
- def _is_dict(self) -> bool:
- return isinstance(self._data, dict)
+ assert token is not None
- def _upsert(self, key: str, value: "QueryTree") -> None:
- """
- WARNING!!! MUTATES THE TREE
+ return parent, obj, token
- update or insert
- """
- if isinstance(self._data, dict):
- self._data[key] = value.to_raw()
- elif isinstance(self._data, list):
- if key in self:
- self._data[int(key)] = value.to_raw()
+
+class AddOp(Op):
+ op: Literal["add"]
+ path: str
+ value: Any
+
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+
+ if isinstance(parent, dict):
+ parent[token] = self.value
+ elif isinstance(parent, list):
+ if token == "-":
+ parent.append(self.value)
else:
- raise DataParsingError("query invalid: can't set a value of an item in a list at a non-existent index")
+ assert isinstance(token, int)
+ parent.insert(token, self.value)
else:
- assert False, "this should never happen"
+ assert False, "never happens"
- def _append(self, value: "QueryTree") -> None:
- """
- WARNING!!! MUTATES THE TREE
+ return fakeroot
- append to a list
- """
- assert isinstance(self._data, list)
- self._data.append(value.to_raw())
- def _delete(self, key: str) -> None:
- """
- WARNING!!! MUTATES THE TREE
+class RemoveOp(Op):
+ op: Literal["remove"]
+ path: str
- deletes a key
- """
- assert self.is_object()
- if isinstance(self._data, list):
- del self._data[int(key)]
- elif isinstance(self._data, dict):
- del self._data[key]
- else:
- assert False, "never happens"
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+ del parent[token]
+ return fakeroot
- def value(self) -> Union[str, int, bool]:
- if self.is_object():
- raise DataParsingError("attempted to access object as a scalar")
- assert isinstance(self._data, (str, int, bool)) # make type checker happy
- return self._data
+class ReplaceOp(Op):
+ op: Literal["replace"]
+ path: str
+ value: str
- def __init__(self, data: Union[Dict[str, Any], str, int, bool, List[Any]]):
- self._data = data
+ def eval(self, fakeroot: Any) -> Any:
+ parent, obj, token = self._resolve_ptr(fakeroot, self.path)
- def to_raw(self) -> Union[Dict[str, Any], str, int, bool, List[Any]]:
- return self._data
+ if obj is None:
+ raise PatchError("the value you are trying to replace is null")
+ parent[token] = self.value
+ return fakeroot
- def __getitem__(self, key: Union[str, int]) -> "QueryTree":
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
- if isinstance(self._data, list):
- return QueryTree(self._data[int(key)])
- elif isinstance(self._data, dict):
- return QueryTree(self._data[str(key)])
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
+class MoveOp(Op):
+ op: Literal["move"]
+ source: str
+ path: str
- def __contains__(self, key: Union[str, int]) -> bool:
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'move' JSON patch operation")
+ return str(source["from"])
- if isinstance(self._data, list):
- return int(key) < len(self._data)
- elif isinstance(self._data, dict):
- return key in self._data
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
+ def eval(self, fakeroot: Any) -> Any:
+ if self.path.startswith(self.source):
+ raise PatchError("can't move value into itself")
- def __str__(self) -> str:
- return json.dumps(self._data, sort_keys=False, indent=2)
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
- def keys(self) -> Set[Any]:
- if self.is_scalar():
- raise DataParsingError(f"attempted to access scalar value '{self._data}' as an object type")
+ fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
- if isinstance(self._data, dict):
- return set(self._data.keys())
- elif isinstance(self._data, list):
- return set(range(len(self._data)))
- else:
- raise RuntimeError("unexpected type in self._data, this should never happen")
- _SUBTREE_MUTATION_PATH_PATTERN = re.compile(r"^(/[^/]+)*/?$")
+class CopyOp(Op):
+ op: Literal["copy"]
+ source: str
+ path: str
- def _preprocess_query_path(self, path: str) -> str:
- # prepare and validate the path object
- path = path[:-1] if path.endswith("/") else path
- if re.match(QueryTree._SUBTREE_MUTATION_PATH_PATTERN, path) is None:
- raise DataParsingError("Provided object path for mutation is invalid.")
- if "_" in path:
- raise DataParsingError("Provided object path contains character '_', which is illegal")
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'copy' JSON patch operation")
+ return str(source["from"])
- # now, the path variable should contain '/' separated field names
- return path.strip("/")
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
- def _copy_and_find(self, path: str) -> Tuple["QueryTree", "QueryTree", Optional["QueryTree"], str]:
- """
- Returns (fakeroot, parent, Optional[queryTarget])
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
- - fakeroot has the real root in a field called 'root'
- - queryTarget is None, when it refers to non-existent object
- """
- path = self._preprocess_query_path(path)
-
- # `self` is considered immutable, do all operations on a copy
- rwcopy = copy.deepcopy(self)
- # make a fake root, so that we do not have to handle special cases for root node
- rwcopy._data = {"root": rwcopy._data} # pylint: disable=protected-access
- segments = f"root/{path}".strip("/").split("/")
-
- # walk the tree
- obj: Optional[QueryTree] = rwcopy
- parent: QueryTree = rwcopy
- segment = "" # just to make type checker happy
- for segment in segments:
- assert len(segment) > 0
- if obj is None:
- raise DataParsingError(
- f"query path does not point to any existing object in the configuration tree, first missing path segment is called '{segment}'"
- )
- elif segment in obj:
- parent = obj
- obj = obj[segment]
- else:
- parent = obj
- obj = None
-
- return rwcopy, parent, obj, segment
-
- @staticmethod
- def _post(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request a change via POST and not provide a value")
- if parent._is_dict():
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- elif parent._is_list():
- if obj is None:
- parent._append(update_with)
- return fakeroot["root"], None
- else:
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- else:
- assert False, "this should never happen"
-
- @staticmethod
- def _patch(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request a change via PATCH and not provide a value")
- if obj is None:
- raise DataParsingError("query error: can't update non-existent object")
- else:
- parent._upsert(name, update_with)
- return fakeroot["root"], None
-
- @staticmethod
- def _put(
- fakeroot: "QueryTree",
- parent: "QueryTree",
- obj: Optional["QueryTree"],
- name: str,
- update_with: Optional["QueryTree"] = None,
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- # pylint: disable=protected-access
- if update_with is None:
- raise DataParsingError("query invalid: can't request an insert via PUT and not provide a value")
- if obj is None:
- # FIXME probably a bug, this is weird
- if parent._is_list():
- parent._append(update_with)
- return fakeroot["root"], None
- elif parent._is_dict():
- parent._upsert(name, update_with)
- return fakeroot["root"], None
- else:
- assert False, "never happens"
- else:
- raise DataParsingError("query invalid: can't insert when there is already a value there")
+class TestOp(Op):
+ op: Literal["test"]
+ path: str
+ value: Any
- def query(
- self, op: Literal["get", "post", "delete", "patch", "put"], path: str, update_with: Optional["QueryTree"] = None
- ) -> "Tuple[QueryTree, Optional[QueryTree]]":
- """
- Implements a modification API in the style of Caddy:
- https://caddyserver.com/docs/api
- """
- # pylint: disable=protected-access
- fakeroot, parent, obj, name = self._copy_and_find(path)
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.path)
- # get = return what the path selector picks
- if op == "get":
- return fakeroot["root"], obj
+ if obj != self.value:
+ raise PatchError("test failed")
- # post = set value at a key, append to lists
- elif op == "post":
- return self._post(fakeroot, parent, obj, name, update_with)
+ return fakeroot
- # delete = remove the given key
- elif op == "delete":
- parent._delete(name)
- return fakeroot["root"], None
- # patch = update an existing object
- elif op == "patch":
- return self._patch(fakeroot, parent, obj, name, update_with)
+def query(
+ original: Any, method: Literal["get", "delete", "put", "patch"], ptr: str, payload: Any
+) -> Tuple[Any, Optional[Any]]:
+ """
+ Implements a modification API in the style of Caddy:
+ https://caddyserver.com/docs/api
+ """
+
+ ########################################
+ # Prepare data we will be working on
+
+ # First of all, we consider the original data to be immutable. So we need to make a copy
+ # in order to freely mutate them
+ dataroot = copy.deepcopy(original)
+
+ # To simplify referencing the root, create a fake root node
+ fakeroot = {"root": dataroot}
+
+ #########################################
+ # Handle the actual requested operation
- # put = insert and never replace
- elif op == "put":
- return self._put(fakeroot, parent, obj, name, update_with)
+ # get = return what the path selector picks
+ if method == "get":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ return fakeroot["root"], obj
+ elif method == "delete":
+ fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
+ return fakeroot["root"], None
+
+ elif method == "put":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ assert parent is not None # we know this due to the fakeroot
+ if isinstance(parent, list) and token == "-":
+ parent.append(payload)
else:
- assert False, "invalid operation"
+ parent[token] = payload
+ return fakeroot["root"], None
+
+ elif method == "patch":
+ tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
+ transaction: tp = load(tp, payload)
+
+ for i, op in enumerate(transaction):
+ try:
+ fakeroot = op.eval(fakeroot)
+ except PatchError as e:
+ raise ValueError(f"json patch transaction failed on step {i}") from e
+
+ return fakeroot["root"], None
+
+ else:
+ assert False, "invalid operation, never happens"
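
A hedged example of the rewritten `query()` on invented data (the config keys are illustrative): GET reads through a JSON pointer, PUT writes, PATCH evaluates an RFC 6902 transaction, and the original input stays untouched:

```python
# query() deep-copies its input, so mutations land on the returned tree.
from knot_resolver_manager.utils.modeling.query import query

config = {"logging": {"level": "info"}}

_, value = query(config, "get", "/logging/level", None)
assert value == "info"

new_config, _ = query(config, "put", "/logging/level", "debug")
assert new_config["logging"]["level"] == "debug"
assert config["logging"]["level"] == "info"  # input left unmodified

patched, _ = query(config, "patch", "", [
    {"op": "replace", "path": "/logging/level", "value": "crit"},
])
assert patched["logging"]["level"] == "crit"
```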
diff --git a/manager/knot_resolver_manager/utils/modeling/renaming.py b/manager/knot_resolver_manager/utils/modeling/renaming.py
new file mode 100644
index 00000000..05775677
--- /dev/null
+++ b/manager/knot_resolver_manager/utils/modeling/renaming.py
@@ -0,0 +1,70 @@
+from abc import ABC, abstractmethod
+from typing import Any, Dict, List, TypeVar
+
+
+class Renamed(ABC):
+ @abstractmethod
+ def original(self) -> Any:
+ """
+ Returns the source data structure without the dynamic renamings
+ """
+
+ @staticmethod
+ def map_public_to_private(name: Any) -> Any:
+ if isinstance(name, str):
+ return name.replace("_", "-")
+ return name
+
+ @staticmethod
+ def map_private_to_public(name: Any) -> Any:
+ if isinstance(name, str):
+ return name.replace("-", "_")
+ return name
+
+
+K = TypeVar("K")
+V = TypeVar("V")
+
+
+class RenamedDict(Dict[K, V], Renamed):
+ def keys(self) -> Any:
+ keys = super().keys()
+ return {Renamed.map_private_to_public(key) for key in keys}
+
+ def __getitem__(self, key: K) -> V:
+ key = Renamed.map_public_to_private(key)
+ res = super().__getitem__(key)
+ return renamed(res)
+
+ def __setitem__(self, key: K, value: V) -> None:
+ key = Renamed.map_public_to_private(key)
+ return super().__setitem__(key, value)
+
+ def __contains__(self, key: object) -> bool:
+ key = Renamed.map_public_to_private(key)
+ return super().__contains__(key)
+
+ def items(self) -> Any:
+ for k, v in super().items():
+ yield Renamed.map_private_to_public(k), renamed(v)
+
+ def original(self) -> Dict[K, V]:
+ return dict(super().items())
+
+
+class RenamedList(List[V], Renamed): # type: ignore
+ def __getitem__(self, key: Any) -> Any:
+ res = super().__getitem__(key)
+ return renamed(res)
+
+ def original(self) -> Any:
+ return list(super().__iter__())
+
+
+def renamed(obj: Any) -> Any:
+ if isinstance(obj, dict):
+ return RenamedDict(**obj)
+ elif isinstance(obj, list):
+ return RenamedList(obj)
+ else:
+ return obj
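
Finally, a small demonstration (assumed usage) of the new wrappers: lookups accept snake_case keys, wrapping is applied lazily on access, and `original()` recovers the raw data:

```python
# RenamedDict maps "_" to "-" on lookup and wraps nested containers lazily.
from knot_resolver_manager.utils.modeling.renaming import renamed

data = renamed({"listen-address": "::1", "workers": [{"max-procs": 2}]})

assert data["listen_address"] == "::1"
assert data["workers"][0]["max_procs"] == 2  # nested values wrapped on access
assert data.original() == {"listen-address": "::1", "workers": [{"max-procs": 2}]}
```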