1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
from typing import List, Optional, Union
from knot_resolver_manager.manager.datamodel.forward_schema import ForwardServerSchema
from knot_resolver_manager.manager.datamodel.network_schema import AddressRenumberingSchema
from knot_resolver_manager.manager.datamodel.types import (
DNSRecordTypeEnum,
IPAddressOptionalPort,
PolicyActionEnum,
PolicyFlagEnum,
TimeUnit,
)
from knot_resolver_manager.utils.modeling import ConfigSchema
class FilterSchema(ConfigSchema):
"""
Query filtering configuration.
---
suffix: Filter based on the suffix of the query name.
pattern: Filter based on the pattern that match query name.
qtype: Filter based on the DNS query type.
"""
suffix: Optional[str] = None
pattern: Optional[str] = None
qtype: Optional[DNSRecordTypeEnum] = None
class AnswerSchema(ConfigSchema):
"""
Configuration of custom resource record for DNS answer.
---
rtype: Type of DNS resource record.
rdata: Data of DNS resource record.
ttl: Time-to-live value for defined answer.
nodata: Answer with NODATA If requested type is not configured in the answer. Otherwise policy rule is ignored.
"""
rtype: DNSRecordTypeEnum
rdata: str
ttl: TimeUnit = TimeUnit("1s")
nodata: bool = False
def _validate_policy_action(policy_action: Union["ActionSchema", "PolicySchema"]) -> None:
servers = ["mirror", "forward", "stub"]
def _field(ac: str) -> str:
if ac in servers:
return "servers"
return "message" if ac == "deny" else ac
configurable_actions = ["deny", "reroute", "answer"] + servers
# checking for missing mandatory fields for actions
field = _field(policy_action.action)
if policy_action.action in configurable_actions and not getattr(policy_action, field):
raise ValueError(f"missing mandatory field '{field}' for '{policy_action.action}' action")
# checking for unnecessary fields
for ac in configurable_actions + ["deny"]:
field = _field(ac)
if getattr(policy_action, field) and _field(policy_action.action) != field:
raise ValueError(f"'{field}' field can only be defined for '{ac}' action")
# ForwardServerSchema is valid only for 'forward' action
if policy_action.servers:
for server in policy_action.servers: # pylint: disable=not-an-iterable
if policy_action.action != "forward" and isinstance(server, ForwardServerSchema):
raise ValueError(
f"'ForwardServerSchema' in 'servers' is valid only for 'forward' action, got '{policy_action.action}'"
)
class ActionSchema(ConfigSchema):
"""
Configuration of policy action.
---
action: Policy action.
message: Deny message for 'deny' action.
reroute: Configuration for 'reroute' action.
answer: Answer definition for 'answer' action.
servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
"""
action: PolicyActionEnum
message: Optional[str] = None
reroute: Optional[List[AddressRenumberingSchema]] = None
answer: Optional[AnswerSchema] = None
servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
def _validate(self) -> None:
_validate_policy_action(self)
class PolicySchema(ConfigSchema):
"""
Configuration of policy rule.
---
action: Policy rule action.
priority: Policy rule priority.
filter: Query filtering configuration.
views: Use policy rule only for clients defined by views.
options: Configuration flags for policy rule.
message: Deny message for 'deny' action.
reroute: Configuration for 'reroute' action.
answer: Answer definition for 'answer' action.
servers: Servers configuration for 'mirror', 'forward' and 'stub' action.
"""
action: PolicyActionEnum
priority: Optional[int] = None
filter: Optional[FilterSchema] = None
views: Optional[List[str]] = None
options: Optional[List[PolicyFlagEnum]] = None
message: Optional[str] = None
reroute: Optional[List[AddressRenumberingSchema]] = None
answer: Optional[AnswerSchema] = None
servers: Optional[Union[List[IPAddressOptionalPort], List[ForwardServerSchema]]] = None
def _validate(self) -> None:
_validate_policy_action(self)
|