diff options
Diffstat (limited to 'python/knot_resolver_manager/manager/datamodel/policy_schema.py')
-rw-r--r-- | python/knot_resolver_manager/manager/datamodel/policy_schema.py | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/python/knot_resolver_manager/manager/datamodel/policy_schema.py b/python/knot_resolver_manager/manager/datamodel/policy_schema.py new file mode 100644 index 00000000..fb215188 --- /dev/null +++ b/python/knot_resolver_manager/manager/datamodel/policy_schema.py @@ -0,0 +1,126 @@ +from typing import List, Optional, Union + +from knot_resolver_manager.manager.datamodel.forward_schema import ForwardServerSchema +from knot_resolver_manager.manager.datamodel.network_schema import AddressRenumberingSchema +from knot_resolver_manager.manager.datamodel.types import ( + DNSRecordTypeEnum, + IPAddressOptionalPort, + PolicyActionEnum, + PolicyFlagEnum, + TimeUnit, +) +from knot_resolver_manager.utils.modeling import ConfigSchema + + +class FilterSchema(ConfigSchema): + """ + Query filtering configuration. + + --- + suffix: Filter based on the suffix of the query name. + pattern: Filter based on the pattern that match query name. + qtype: Filter based on the DNS query type. + """ + + suffix: Optional[str] = None + pattern: Optional[str] = None + qtype: Optional[DNSRecordTypeEnum] = None + + +class AnswerSchema(ConfigSchema): + """ + Configuration of custom resource record for DNS answer. + + --- + rtype: Type of DNS resource record. + rdata: Data of DNS resource record. + ttl: Time-to-live value for defined answer. + nodata: Answer with NODATA If requested type is not configured in the answer. Otherwise policy rule is ignored. + """ + + rtype: DNSRecordTypeEnum + rdata: str + ttl: TimeUnit = TimeUnit("1s") + nodata: bool = False + + +def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None: + servers = ["mirror", "forward", "stub"] + + def _field(ac: str) -> str: + if ac in servers: + return "servers" + return "message" if ac == "deny" else ac + + configurable_actions = ["deny", "reroute", "answer"] + servers + + # checking for missing mandatory fields for actions + field = _field(policy_action.action) + if policy_action.action in configurable_actions and not getattr(policy_action, field): + raise ValueError(f"missing mandatory field '{field}' for '{policy_action.action}' action") + + # checking for unnecessary fields + for ac in configurable_actions + ["deny"]: + field = _field(ac) + if getattr(policy_action, field) and _field(policy_action.action) != field: + raise ValueError(f"'{field}' field can only be defined for '{ac}' action") + + # ForwardServerSchema is valid only for 'forward' action + if policy_action.servers: + for server in policy_action.servers: # pylint: disable=not-an-iterable + if policy_action.action != "forward" and isinstance(server, ForwardServerSchema): + raise ValueError( + f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{policy_action.action}'" + ) + + +class ActionSchema(ConfigSchema): + """ + Configuration of policy action. + + --- + action: Policy action. + message: Deny message for 'deny' action. + reroute: Configuration for 'reroute' action. + answer: Answer definition for 'answer' action. + servers: Servers configuration for 'mirror', 'forward' and 'stub' action. + """ + + action: PolicyActionEnum + message: Optional[str] = None + reroute: Optional[List[AddressRenumberingSchema]] = None + answer: Optional[AnswerSchema] = None + servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None + + def _validate(self) -> None: + _validate_policy_action(self) + + +class PolicySchema(ConfigSchema): + """ + Configuration of policy rule. + + --- + action: Policy rule action. + priority: Policy rule priority. + filter: Query filtering configuration. + views: Use policy rule only for clients defined by views. + options: Configuration flags for policy rule. + message: Deny message for 'deny' action. + reroute: Configuration for 'reroute' action. + answer: Answer definition for 'answer' action. + servers: Servers configuration for 'mirror', 'forward' and 'stub' action. + """ + + action: PolicyActionEnum + priority: Optional[int] = None + filter: Optional[FilterSchema] = None + views: Optional[List[str]] = None + options: Optional[List[PolicyFlagEnum]] = None + message: Optional[str] = None + reroute: Optional[List[AddressRenumberingSchema]] = None + answer: Optional[AnswerSchema] = None + servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None + + def _validate(self) -> None: + _validate_policy_action(self) |