summaryrefslogtreecommitdiffstats
path: root/python/knot_resolver_manager/manager/datamodel/policy_schema.py
blob: fb215188f125db1cb02d4ec974b6c04384e31610 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from typing import List, Optional, Union

from knot_resolver_manager.manager.datamodel.forward_schema import ForwardServerSchema
from knot_resolver_manager.manager.datamodel.network_schema import AddressRenumberingSchema
from knot_resolver_manager.manager.datamodel.types import (
    DNSRecordTypeEnum,
    IPAddressOptionalPort,
    PolicyActionEnum,
    PolicyFlagEnum,
    TimeUnit,
)
from knot_resolver_manager.utils.modeling import ConfigSchema


class FilterSchema(ConfigSchema):
    """
    Query filtering configuration.

    ---
    suffix: Filter based on the suffix of the query name.
    pattern: Filter based on a pattern that matches the query name.
    qtype: Filter based on the DNS query type.
    """

    # Every criterion is optional and defaults to None.
    suffix: Optional[str] = None
    pattern: Optional[str] = None
    qtype: Optional[DNSRecordTypeEnum] = None


class AnswerSchema(ConfigSchema):
    """
    Configuration of a custom resource record for a DNS answer.

    ---
    rtype: Type of DNS resource record.
    rdata: Data of DNS resource record.
    ttl: Time-to-live value for the defined answer.
    nodata: Answer with NODATA if the requested type is not configured in the answer. Otherwise the policy rule is ignored.
    """

    # rtype and rdata declare no default value.
    rtype: DNSRecordTypeEnum
    rdata: str
    # NOTE(review): "1s" presumably denotes one second - confirm against TimeUnit.
    ttl: TimeUnit = TimeUnit("1s")
    nodata: bool = False


def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
    """
    Check that the action-specific fields of a policy configuration match its action.

    The following rules are enforced, in this order:

    1. 'deny' needs 'message', 'reroute' needs 'reroute', 'answer' needs 'answer' and
       'mirror', 'forward' and 'stub' need 'servers'. An unset or empty value counts as missing.
    2. None of 'message', 'reroute', 'answer' and 'servers' may be set unless the selected
       action is one that uses that field.
    3. 'ForwardServerSchema' entries in 'servers' are accepted only for the 'forward' action.

    :param policy_action: Object exposing 'action', 'message', 'reroute', 'answer' and 'servers'.
    :raises ValueError: When any of the rules above is violated.
    """
    # Each action-specific field paired with the actions that use (and require) it.
    # The order of the entries is the order in which stray fields are reported.
    field_actions = (
        ("message", ("deny",)),
        ("reroute", ("reroute",)),
        ("answer", ("answer",)),
        ("servers", ("mirror", "forward", "stub")),
    )
    action = policy_action.action

    # Field required by the selected action; None for actions that take no extra field.
    required = next((field for field, actions in field_actions if action in actions), None)

    # checking for missing mandatory fields for actions
    if required is not None and not getattr(policy_action, required):
        raise ValueError(f"missing mandatory field '{required}' for '{action}' action")

    # checking for unnecessary fields
    for field, actions in field_actions:
        if field == required or not getattr(policy_action, field):
            continue
        # Name every action the field is valid for, not just the first one.
        quoted = [f"'{ac}'" for ac in actions]
        if len(quoted) == 1:
            allowed = f"{quoted[0]} action"
        else:
            allowed = f"{', '.join(quoted[:-1])} or {quoted[-1]} actions"
        raise ValueError(f"'{field}' field can only be defined for {allowed}")

    # ForwardServerSchema is valid only for 'forward' action
    if action != "forward" and policy_action.servers:
        for server in policy_action.servers:  # pylint: disable=not-an-iterable
            if isinstance(server, ForwardServerSchema):
                raise ValueError(
                    f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{action}'"
                )


class ActionSchema(ConfigSchema):
    """
    Configuration of policy action.

    ---
    action: Policy action.
    message: Deny message for 'deny' action.
    reroute: Configuration for 'reroute' action.
    answer: Answer definition for 'answer' action.
    servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
    """

    action: PolicyActionEnum
    # The optional fields below belong to particular actions; `_validate` checks the combination.
    message: Optional[str] = None
    reroute: Optional[List[AddressRenumberingSchema]] = None
    answer: Optional[AnswerSchema] = None
    servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None

    def _validate(self) -> None:
        """Check the action-specific fields by delegating to `_validate_policy_action`."""
        _validate_policy_action(self)


class PolicySchema(ConfigSchema):
    """
    Configuration of policy rule.

    ---
    action: Policy rule action.
    priority: Policy rule priority.
    filter: Query filtering configuration.
    views: Use policy rule only for clients defined by views.
    options: Configuration flags for policy rule.
    message: Deny message for 'deny' action.
    reroute: Configuration for 'reroute' action.
    answer: Answer definition for 'answer' action.
    servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
    """

    action: PolicyActionEnum
    # Rule-level settings; all optional and defaulting to None.
    priority: Optional[int] = None
    filter: Optional[FilterSchema] = None
    views: Optional[List[str]] = None
    options: Optional[List[PolicyFlagEnum]] = None
    # The optional fields below belong to particular actions; `_validate` checks the combination.
    message: Optional[str] = None
    reroute: Optional[List[AddressRenumberingSchema]] = None
    answer: Optional[AnswerSchema] = None
    servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None

    def _validate(self) -> None:
        """Check the action-specific fields by delegating to `_validate_policy_action`."""
        _validate_policy_action(self)