1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
|
import asyncio
import itertools
import json
import logging
import struct
import sys
from abc import ABC, abstractmethod # pylint: disable=no-name-in-module
from enum import Enum, auto
from pathlib import Path
from typing import Dict, Iterable, Optional, Type, TypeVar
from weakref import WeakValueDictionary
from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
from knot_resolver.utils.async_utils import writefile
# Module-level logger named after this module; Subprocess.apply_new_config() emits its debug messages through it.
logger = logging.getLogger(__name__)
class SubprocessType(Enum):
    """
    Kinds of subprocesses this module knows how to tell apart.

    The type is carried by every KresID and decides which config file (if any) is rendered
    for a subprocess and whether it gets registered as a worker.
    """

    # resolver worker: gets a config rendered by render_lua() and is registered as a worker on start
    KRESD = auto()
    # gets a config rendered by render_lua_policy()
    POLICY_LOADER = auto()
    # no config file is written for this type in this module (presumably the cache garbage collector)
    GC = auto()
class SubprocessStatus(Enum):
    """
    States a subprocess can be reported in.

    These are the values returned by Subprocess.status() and used as dictionary values in the
    result of SubprocessController.get_subprocess_status().
    """

    RUNNING = auto()
    FATAL = auto()
    EXITED = auto()
    UNKNOWN = auto()
T = TypeVar("T", bound="KresID")


class KresID:
    """
    Identifier of one subprocess: a (subprocess type, number) pair.

    Instances are supposed to be created only through alloc() or new(). Both go through a registry
    of live IDs, so asking twice for the same (type, number) yields the very same object for as
    long as something keeps a reference to it.
    """

    # Registry of live IDs: subprocess type -> (number -> ID object). The inner dictionaries hold
    # their values weakly, so an entry disappears once nothing else references the ID object.
    _used: "Dict[SubprocessType, WeakValueDictionary[int, KresID]]" = {k: WeakValueDictionary() for k in SubprocessType}

    @classmethod
    def alloc(cls: Type[T], typ: SubprocessType) -> T:
        """
        Return an ID of the given type carrying the lowest non-negative number not currently in use.
        """
        taken = cls._used[typ]
        # itertools.count() never ends, so next() always finds a free number
        lowest_free = next(i for i in itertools.count(start=0, step=1) if i not in taken)
        return cls.new(typ, lowest_free)

    @classmethod
    def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T":
        """
        Return the ID object for (typ, n): the registered one when it exists, a freshly registered one otherwise.
        """
        registry = cls._used[typ]
        if n not in registry:
            # keep a strong reference in a local; the registry alone would not keep the object alive
            fresh = cls(typ, n, _i_know_what_i_am_doing=True)
            registry[n] = fresh
            return fresh

        # The registry is annotated with the base class only, because there seems to be no way to type
        # it per subclass (and it is most likely one shared dictionary anyway), hence the ignore.
        return registry[n]  # type: ignore

    def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
        """
        Do not call directly, use alloc() or new().

        Raises RuntimeError unless the caller explicitly passes `_i_know_what_i_am_doing=True`.
        """
        if not _i_know_what_i_am_doing:
            raise RuntimeError("Don't do this. You seem to have no idea what it does")

        self._type = typ
        self._id = n

    @property
    def subprocess_type(self) -> SubprocessType:
        """Type of the subprocess this ID belongs to."""
        return self._type

    def __repr__(self) -> str:
        # relies on __str__, which only subclasses implement
        return f"KresID({self!s})"

    def __hash__(self) -> int:
        # only the number is hashed; IDs of different types with the same number collide but still compare unequal
        return self._id

    def __eq__(self, o: object) -> bool:
        if not isinstance(o, KresID):
            return False
        return (self._type, self._id) == (o._type, o._id)

    def __str__(self) -> str:
        """
        Returns string representation of the ID usable directly in the underlying service manager

        Not implemented here; raises NotImplementedError.
        """
        raise NotImplementedError()

    @staticmethod
    def from_string(val: str) -> "KresID":
        """
        Inverse of __str__

        Not implemented here; raises NotImplementedError.
        """
        raise NotImplementedError()

    def __int__(self) -> int:
        """Return the numeric part of the ID."""
        return self._id
class Subprocess(ABC):
    """
    One Subprocess instance corresponds to one manager's subprocess.

    The class implements what is common to all controllers: writing and removing the subprocess'
    config file, worker registration and talking to a worker over its control socket. How a process
    is actually started, stopped, restarted and queried is left to subclasses through `_start()`,
    `_stop()`, `_restart()` and `status()`.
    """

    def __init__(self, config: KresConfig, kresid: KresID) -> None:
        """
        Remember the configuration and the ID; nothing is started or written here.
        """
        self._id = kresid
        self._config = config
        # True only while this instance is registered as a worker (set in start(), cleared in stop())
        self._registered_worker: bool = False

    async def _write_config_file(self) -> Optional[Path]:
        """
        Render the current configuration into this subprocess' config file.

        Returns the path of the written file, or None when this subprocess type has no config file.
        """
        if self.type is SubprocessType.KRESD:
            config_lua = self._config.render_lua()
            config_file = kresd_config_file(self._config, self.id)
        elif self.type is SubprocessType.POLICY_LOADER:
            config_lua = self._config.render_lua_policy()
            config_file = policy_loader_config_file(self._config)
        else:
            return None

        await writefile(config_file, config_lua)
        return config_file

    async def start(self, new_config: Optional[KresConfig] = None) -> None:
        """
        Write the config file (if the type has one) and start the subprocess.

        When `new_config` is given, it replaces the stored configuration first. A kresd subprocess is
        registered as a worker after a successful start.

        Raises SubprocessControllerError when starting fails; the config file written by this call is
        removed before the error is re-raised.
        """
        if new_config:
            self._config = new_config

        config_file = await self._write_config_file()

        try:
            await self._start()
            if self.type is SubprocessType.KRESD:
                register_worker(self)
                self._registered_worker = True
        except SubprocessControllerError:
            # do not leave a config file behind for a subprocess that is not running
            if config_file is not None:
                config_file.unlink()
            raise

    async def apply_new_config(self, new_config: KresConfig) -> None:
        """
        Store `new_config`, rewrite the config file (if the type has one) and restart the subprocess.
        """
        self._config = new_config

        # update config file
        logger.debug(f"Writing config file for {self.id}")
        await self._write_config_file()

        # update runtime status
        logger.debug(f"Restarting {self.id}")
        await self._restart()

    async def stop(self) -> None:
        """
        Unregister the worker (when registered), stop the subprocess and clean up after it.
        """
        if self._registered_worker:
            unregister_worker(self)
            # keep the flag in sync, so that a repeated stop() does not unregister twice
            # and command() refuses to talk to a worker that is no longer registered
            self._registered_worker = False
        await self._stop()
        await self.cleanup()

    async def cleanup(self) -> None:
        """
        Remove temporary files and all traces of this instance running. It is NOT SAFE to call this while
        the kresd is running, because it will break automatic restarts (at the very least).

        The config file is expected to exist; `Path.unlink()` raises FileNotFoundError otherwise.
        """
        if self.type is SubprocessType.KRESD:
            config_file = kresd_config_file(self._config, self.id)
            config_file.unlink()
        elif self.type is SubprocessType.POLICY_LOADER:
            config_file = policy_loader_config_file(self._config)
            config_file.unlink()

    def __eq__(self, o: object) -> bool:
        # equal means: same concrete class, same subprocess type and same ID
        return isinstance(o, type(self)) and o.type == self.type and o.id == self.id

    def __hash__(self) -> int:
        return hash(type(self)) ^ hash(self.type) ^ hash(self.id)

    @abstractmethod
    async def _start(self) -> None:
        """Controller-specific start of the subprocess; called by start() after the config file is written."""

    @abstractmethod
    async def _stop(self) -> None:
        """Controller-specific stop of the subprocess; called by stop() before cleanup()."""

    @abstractmethod
    async def _restart(self) -> None:
        """Controller-specific restart of the subprocess; called by apply_new_config() after the config file is rewritten."""

    @abstractmethod
    def status(self) -> SubprocessStatus:
        """Return the current status of the subprocess."""

    @property
    def type(self) -> SubprocessType:
        """Type of this subprocess, taken from its ID."""
        return self.id.subprocess_type

    @property
    def id(self) -> KresID:
        """ID of this subprocess."""
        return self._id

    async def command(self, cmd: str) -> object:
        """
        Send `cmd` to the worker over its control socket and return the decoded JSON reply.

        The socket is the unix socket `./control/<numeric id>` (relative to the current working
        directory). The connection is switched to JSON mode, in which the reply is framed as
        a 4-byte big-endian length followed by that many bytes of UTF-8 encoded JSON.

        Raises:
            RuntimeError: this instance is not a registered worker.
            asyncio.IncompleteReadError: the peer closed the connection before sending a complete reply.
        """
        if not self._registered_worker:
            raise RuntimeError("the command cannot be sent to a process other than the kresd worker")

        reader: asyncio.StreamReader
        writer: Optional[asyncio.StreamWriter] = None
        try:
            reader, writer = await asyncio.open_unix_connection(f"./control/{int(self.id)}")

            # drop prompt; readexactly() because read(n) may return fewer than n bytes,
            # which would leave a part of the prompt in front of the reply
            _ = await reader.readexactly(2)

            # switch to JSON mode
            writer.write("__json\n".encode("utf8"))

            # write command
            writer.write(cmd.encode("utf8"))
            writer.write(b"\n")
            await writer.drain()

            # read result; the length prefix has to be complete before it can be unpacked
            (msg_len,) = struct.unpack(">I", await reader.readexactly(4))
            result_bytes = await reader.readexactly(msg_len)
            return json.loads(result_bytes.decode("utf8"))

        finally:
            if writer is not None:
                writer.close()

                # proper closing of the socket is only implemented in later versions of python
                if sys.version_info >= (3, 7):
                    await writer.wait_closed()  # type: ignore
class SubprocessController(ABC):
    """
    Interface shared by all subprocess controllers.

    KresManager works with controllers exclusively through these methods, so every controller
    implementation has to provide all of them.
    """

    @abstractmethod
    async def is_controller_available(self, config: KresConfig) -> bool:
        """
        Tell whether this controller can be used with the given config.
        """

    @abstractmethod
    async def initialize_controller(self, config: KresConfig) -> None:
        """
        Prepare the controller for real use with the given configuration.

        To be called once we have decided to actually use this controller.
        """

    @abstractmethod
    async def get_all_running_instances(self) -> Iterable[Subprocess]:
        """
        Return the subprocesses that are currently running.

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def shutdown_controller(self) -> None:
        """
        Hook for a graceful shutdown of the manager.

        Gives the controller a chance to stop the service manager process, or just to clean up,
        so that a later run does not reuse the same resources.

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
        """
        Create a Subprocess object of the given type that can be operated on.

        The subprocess is neither started nor active in any other way after this call; starting it
        has to be performed manually through the returned object.

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
        """
        Return the status of running subprocesses as the controller sees them, keyed by their ID.

        The information is actively polled on every call.

        Must NOT be called before initialize_controller()
        """
|