diff options
Diffstat (limited to 'manager/knot_resolver_manager/utils/modeling/query.py')
-rw-r--r-- | manager/knot_resolver_manager/utils/modeling/query.py | 183 |
1 files changed, 183 insertions, 0 deletions
diff --git a/manager/knot_resolver_manager/utils/modeling/query.py b/manager/knot_resolver_manager/utils/modeling/query.py new file mode 100644 index 00000000..cfea82f6 --- /dev/null +++ b/manager/knot_resolver_manager/utils/modeling/query.py @@ -0,0 +1,183 @@ +import copy +from abc import ABC, abstractmethod # pylint: disable=[no-name-in-module] +from typing import Any, List, Optional, Tuple, Union + +from typing_extensions import Literal + +from knot_resolver_manager.utils.modeling.base_schema import BaseSchema, map_object +from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve + + +class PatchError(Exception): + pass + + +class Op(BaseSchema, ABC): + @abstractmethod + def eval(self, fakeroot: Any) -> Any: + """ + modifies the given fakeroot, returns a new one + """ + + def _resolve_ptr(self, fakeroot: Any, ptr: str) -> Tuple[Any, Any, Union[str, int, None]]: + # Lookup tree part based on the given JSON pointer + parent, obj, token = json_ptr_resolve(fakeroot["root"], ptr) + + # the lookup was on pure data, wrap the results in QueryTree + if parent is None: + parent = fakeroot + token = "root" + + assert token is not None + + return parent, obj, token + + +class AddOp(Op): + op: Literal["add"] + path: str + value: Any + + def eval(self, fakeroot: Any) -> Any: + parent, _obj, token = self._resolve_ptr(fakeroot, self.path) + + if isinstance(parent, dict): + parent[token] = self.value + elif isinstance(parent, list): + if token == "-": + parent.append(self.value) + else: + assert isinstance(token, int) + parent.insert(token, self.value) + else: + assert False, "never happens" + + return fakeroot + + +class RemoveOp(Op): + op: Literal["remove"] + path: str + + def eval(self, fakeroot: Any) -> Any: + parent, _obj, token = self._resolve_ptr(fakeroot, self.path) + del parent[token] + return fakeroot + + +class ReplaceOp(Op): + op: Literal["replace"] + path: str + value: str + + def eval(self, fakeroot: Any) -> Any: + parent, obj, token = self._resolve_ptr(fakeroot, self.path) + + if obj is None: + raise PatchError("the value you are trying to replace is null") + parent[token] = self.value + return fakeroot + + +class MoveOp(Op): + op: Literal["move"] + source: str + path: str + + def _source(self, source): + if "from" not in source: + raise ValueError("missing property 'from' in 'move' JSON patch operation") + return str(source["from"]) + + def eval(self, fakeroot: Any) -> Any: + if self.path.startswith(self.source): + raise PatchError("can't move value into itself") + + _parent, obj, _token = self._resolve_ptr(fakeroot, self.source) + newobj = copy.deepcopy(obj) + + fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot) + fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot) + return fakeroot + + +class CopyOp(Op): + op: Literal["copy"] + source: str + path: str + + def _source(self, source): + if "from" not in source: + raise ValueError("missing property 'from' in 'copy' JSON patch operation") + return str(source["from"]) + + def eval(self, fakeroot: Any) -> Any: + _parent, obj, _token = self._resolve_ptr(fakeroot, self.source) + newobj = copy.deepcopy(obj) + + fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot) + return fakeroot + + +class TestOp(Op): + op: Literal["test"] + path: str + value: Any + + def eval(self, fakeroot: Any) -> Any: + _parent, obj, _token = self._resolve_ptr(fakeroot, self.path) + + if obj != self.value: + raise PatchError("test failed") + + return fakeroot + + +def query( + original: Any, method: Literal["get", "delete", "put", "patch"], ptr: str, payload: Any +) -> Tuple[Any, Optional[Any]]: + ######################################## + # Prepare data we will be working on + + # First of all, we consider the original data to be immutable. So we need to make a copy + # in order to freely mutate them + dataroot = copy.deepcopy(original) + + # To simplify referencing the root, create a fake root node + fakeroot = {"root": dataroot} + + ######################################### + # Handle the actual requested operation + + # get = return what the path selector picks + if method == "get": + parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}") + return fakeroot["root"], obj + + elif method == "delete": + fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot) + return fakeroot["root"], None + + elif method == "put": + parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}") + assert parent is not None # we know this due to the fakeroot + if isinstance(parent, list) and token == "-": + parent.append(payload) + else: + parent[token] = payload + return fakeroot["root"], None + + elif method == "patch": + tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]] + transaction: tp = map_object(tp, payload) + + for i, op in enumerate(transaction): + try: + fakeroot = op.eval(fakeroot) + except PatchError as e: + raise ValueError(f"json patch transaction failed on step {i}") from e + + return fakeroot["root"], None + + else: + assert False, "invalid operation, never happens" |