summaryrefslogtreecommitdiffstats
path: root/manager/knot_resolver_manager/utils/modeling/query.py
diff options
context:
space:
mode:
Diffstat (limited to 'manager/knot_resolver_manager/utils/modeling/query.py')
-rw-r--r--manager/knot_resolver_manager/utils/modeling/query.py183
1 files changed, 183 insertions, 0 deletions
diff --git a/manager/knot_resolver_manager/utils/modeling/query.py b/manager/knot_resolver_manager/utils/modeling/query.py
new file mode 100644
index 00000000..cfea82f6
--- /dev/null
+++ b/manager/knot_resolver_manager/utils/modeling/query.py
@@ -0,0 +1,183 @@
+import copy
+from abc import ABC, abstractmethod # pylint: disable=[no-name-in-module]
+from typing import Any, List, Optional, Tuple, Union
+
+from typing_extensions import Literal
+
+from knot_resolver_manager.utils.modeling.base_schema import BaseSchema, map_object
+from knot_resolver_manager.utils.modeling.json_pointer import json_ptr_resolve
+
+
+class PatchError(Exception):
+ pass
+
+
+class Op(BaseSchema, ABC):
+ @abstractmethod
+ def eval(self, fakeroot: Any) -> Any:
+ """
+ modifies the given fakeroot, returns a new one
+ """
+
+ def _resolve_ptr(self, fakeroot: Any, ptr: str) -> Tuple[Any, Any, Union[str, int, None]]:
+ # Lookup tree part based on the given JSON pointer
+ parent, obj, token = json_ptr_resolve(fakeroot["root"], ptr)
+
+ # the lookup was on pure data, wrap the results in QueryTree
+ if parent is None:
+ parent = fakeroot
+ token = "root"
+
+ assert token is not None
+
+ return parent, obj, token
+
+
+class AddOp(Op):
+ op: Literal["add"]
+ path: str
+ value: Any
+
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+
+ if isinstance(parent, dict):
+ parent[token] = self.value
+ elif isinstance(parent, list):
+ if token == "-":
+ parent.append(self.value)
+ else:
+ assert isinstance(token, int)
+ parent.insert(token, self.value)
+ else:
+ assert False, "never happens"
+
+ return fakeroot
+
+
+class RemoveOp(Op):
+ op: Literal["remove"]
+ path: str
+
+ def eval(self, fakeroot: Any) -> Any:
+ parent, _obj, token = self._resolve_ptr(fakeroot, self.path)
+ del parent[token]
+ return fakeroot
+
+
+class ReplaceOp(Op):
+ op: Literal["replace"]
+ path: str
+ value: str
+
+ def eval(self, fakeroot: Any) -> Any:
+ parent, obj, token = self._resolve_ptr(fakeroot, self.path)
+
+ if obj is None:
+ raise PatchError("the value you are trying to replace is null")
+ parent[token] = self.value
+ return fakeroot
+
+
+class MoveOp(Op):
+ op: Literal["move"]
+ source: str
+ path: str
+
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'move' JSON patch operation")
+ return str(source["from"])
+
+ def eval(self, fakeroot: Any) -> Any:
+ if self.path.startswith(self.source):
+ raise PatchError("can't move value into itself")
+
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
+
+ fakeroot = RemoveOp({"op": "remove", "path": self.source}).eval(fakeroot)
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
+
+
+class CopyOp(Op):
+ op: Literal["copy"]
+ source: str
+ path: str
+
+ def _source(self, source):
+ if "from" not in source:
+ raise ValueError("missing property 'from' in 'copy' JSON patch operation")
+ return str(source["from"])
+
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.source)
+ newobj = copy.deepcopy(obj)
+
+ fakeroot = AddOp({"path": self.path, "value": newobj, "op": "add"}).eval(fakeroot)
+ return fakeroot
+
+
+class TestOp(Op):
+ op: Literal["test"]
+ path: str
+ value: Any
+
+ def eval(self, fakeroot: Any) -> Any:
+ _parent, obj, _token = self._resolve_ptr(fakeroot, self.path)
+
+ if obj != self.value:
+ raise PatchError("test failed")
+
+ return fakeroot
+
+
+def query(
+ original: Any, method: Literal["get", "delete", "put", "patch"], ptr: str, payload: Any
+) -> Tuple[Any, Optional[Any]]:
+ ########################################
+ # Prepare data we will be working on
+
+ # First of all, we consider the original data to be immutable. So we need to make a copy
+ # in order to freely mutate them
+ dataroot = copy.deepcopy(original)
+
+ # To simplify referencing the root, create a fake root node
+ fakeroot = {"root": dataroot}
+
+ #########################################
+ # Handle the actual requested operation
+
+ # get = return what the path selector picks
+ if method == "get":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ return fakeroot["root"], obj
+
+ elif method == "delete":
+ fakeroot = RemoveOp({"op": "remove", "path": ptr}).eval(fakeroot)
+ return fakeroot["root"], None
+
+ elif method == "put":
+ parent, obj, token = json_ptr_resolve(fakeroot, f"/root{ptr}")
+ assert parent is not None # we know this due to the fakeroot
+ if isinstance(parent, list) and token == "-":
+ parent.append(payload)
+ else:
+ parent[token] = payload
+ return fakeroot["root"], None
+
+ elif method == "patch":
+ tp = List[Union[AddOp, RemoveOp, MoveOp, CopyOp, TestOp, ReplaceOp]]
+ transaction: tp = map_object(tp, payload)
+
+ for i, op in enumerate(transaction):
+ try:
+ fakeroot = op.eval(fakeroot)
+ except PatchError as e:
+ raise ValueError(f"json patch transaction failed on step {i}") from e
+
+ return fakeroot["root"], None
+
+ else:
+ assert False, "invalid operation, never happens"