summaryrefslogtreecommitdiffstats
path: root/python/knot_resolver/controller/interface.py
blob: 0544dac23c79300fe10f332aa9c2c185308589d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import asyncio
import itertools
import json
import logging
import struct
import sys
from abc import ABC, abstractmethod  # pylint: disable=no-name-in-module
from enum import Enum, auto
from pathlib import Path
from typing import Dict, Iterable, Optional, Type, TypeVar
from weakref import WeakValueDictionary

from knot_resolver.controller.exceptions import SubprocessControllerError
from knot_resolver.controller.registered_workers import register_worker, unregister_worker
from knot_resolver.datamodel.config_schema import KresConfig
from knot_resolver.manager.constants import kresd_config_file, policy_loader_config_file
from knot_resolver.utils.async_utils import writefile

logger = logging.getLogger(__name__)


class SubprocessType(Enum):
    KRESD = auto()
    POLICY_LOADER = auto()
    GC = auto()


class SubprocessStatus(Enum):
    RUNNING = auto()
    FATAL = auto()
    EXITED = auto()
    UNKNOWN = auto()


T = TypeVar("T", bound="KresID")


class KresID:
    """
    ID object used for identifying subprocesses.
    """

    _used: "Dict[SubprocessType, WeakValueDictionary[int, KresID]]" = {k: WeakValueDictionary() for k in SubprocessType}

    @classmethod
    def alloc(cls: Type[T], typ: SubprocessType) -> T:
        # find free ID closest to zero
        for i in itertools.count(start=0, step=1):
            if i not in cls._used[typ]:
                return cls.new(typ, i)

        raise RuntimeError("Reached an end of an infinite loop. How?")

    @classmethod
    def new(cls: "Type[T]", typ: SubprocessType, n: int) -> "T":
        if n in cls._used[typ]:
            # Ignoring typing here, because I can't find a way how to make the _used dict
            # typed based on subclass. I am not even sure that it's different between subclasses,
            # it's probably still the same dict. But we don't really care about it
            return cls._used[typ][n]  # type: ignore
        val = cls(typ, n, _i_know_what_i_am_doing=True)
        cls._used[typ][n] = val
        return val

    def __init__(self, typ: SubprocessType, n: int, _i_know_what_i_am_doing: bool = False):
        if not _i_know_what_i_am_doing:
            raise RuntimeError("Don't do this. You seem to have no idea what it does")

        self._id = n
        self._type = typ

    @property
    def subprocess_type(self) -> SubprocessType:
        return self._type

    def __repr__(self) -> str:
        return f"KresID({self})"

    def __hash__(self) -> int:
        return self._id

    def __eq__(self, o: object) -> bool:
        if isinstance(o, KresID):
            return self._type == o._type and self._id == o._id
        return False

    def __str__(self) -> str:
        """
        Returns string representation of the ID usable directly in the underlying service manager
        """
        raise NotImplementedError()

    @staticmethod
    def from_string(val: str) -> "KresID":
        """
        Inverse of __str__
        """
        raise NotImplementedError()

    def __int__(self) -> int:
        return self._id


class Subprocess(ABC):
    """
    One SubprocessInstance corresponds to one manager's subprocess
    """

    def __init__(self, config: KresConfig, kresid: KresID) -> None:
        self._id = kresid
        self._config = config
        self._registered_worker: bool = False

    async def start(self, new_config: Optional[KresConfig] = None) -> None:
        if new_config:
            self._config = new_config

        config_file: Optional[Path] = None
        if self.type is SubprocessType.KRESD:
            config_lua = self._config.render_lua()
            config_file = kresd_config_file(self._config, self.id)
            await writefile(config_file, config_lua)
        elif self.type is SubprocessType.POLICY_LOADER:
            config_lua = self._config.render_lua_policy()
            config_file = policy_loader_config_file(self._config)
            await writefile(config_file, config_lua)

        try:
            await self._start()
            if self.type is SubprocessType.KRESD:
                register_worker(self)
                self._registered_worker = True
        except SubprocessControllerError as e:
            if config_file:
                config_file.unlink()
            raise e

    async def apply_new_config(self, new_config: KresConfig) -> None:
        self._config = new_config

        # update config file
        logger.debug(f"Writing config file for {self.id}")

        config_file: Optional[Path] = None
        if self.type is SubprocessType.KRESD:
            config_lua = self._config.render_lua()
            config_file = kresd_config_file(self._config, self.id)
            await writefile(config_file, config_lua)
        elif self.type is SubprocessType.POLICY_LOADER:
            config_lua = self._config.render_lua_policy()
            config_file = policy_loader_config_file(self._config)
            await writefile(config_file, config_lua)

        # update runtime status
        logger.debug(f"Restarting {self.id}")
        await self._restart()

    async def stop(self) -> None:
        if self._registered_worker:
            unregister_worker(self)
        await self._stop()
        await self.cleanup()

    async def cleanup(self) -> None:
        """
        Remove temporary files and all traces of this instance running. It is NOT SAFE to call this while
        the kresd is running, because it will break automatic restarts (at the very least).
        """

        if self.type is SubprocessType.KRESD:
            config_file = kresd_config_file(self._config, self.id)
            config_file.unlink()
        elif self.type is SubprocessType.POLICY_LOADER:
            config_file = policy_loader_config_file(self._config)
            config_file.unlink()

    def __eq__(self, o: object) -> bool:
        return isinstance(o, type(self)) and o.type == self.type and o.id == self.id

    def __hash__(self) -> int:
        return hash(type(self)) ^ hash(self.type) ^ hash(self.id)

    @abstractmethod
    async def _start(self) -> None:
        pass

    @abstractmethod
    async def _stop(self) -> None:
        pass

    @abstractmethod
    async def _restart(self) -> None:
        pass

    @abstractmethod
    def status(self) -> SubprocessStatus:
        pass

    @property
    def type(self) -> SubprocessType:
        return self.id.subprocess_type

    @property
    def id(self) -> KresID:
        return self._id

    async def command(self, cmd: str) -> object:
        if not self._registered_worker:
            raise RuntimeError("the command cannot be sent to a process other than the kresd worker")

        reader: asyncio.StreamReader
        writer: Optional[asyncio.StreamWriter] = None

        try:
            reader, writer = await asyncio.open_unix_connection(f"./control/{int(self.id)}")

            # drop prompt
            _ = await reader.read(2)

            # switch to JSON mode
            writer.write("__json\n".encode("utf8"))

            # write command
            writer.write(cmd.encode("utf8"))
            writer.write(b"\n")
            await writer.drain()

            # read result
            (msg_len,) = struct.unpack(">I", await reader.read(4))
            result_bytes = await reader.readexactly(msg_len)
            return json.loads(result_bytes.decode("utf8"))

        finally:
            if writer is not None:
                writer.close()

                # proper closing of the socket is only implemented in later versions of python
                if sys.version_info.minor >= 7:
                    await writer.wait_closed()  # type: ignore


class SubprocessController(ABC):
    """
    The common Subprocess Controller interface. This is what KresManager requires and what has to be implemented by all
    controllers.
    """

    @abstractmethod
    async def is_controller_available(self, config: KresConfig) -> bool:
        """
        Returns bool, whether the controller is available with the given config
        """

    @abstractmethod
    async def initialize_controller(self, config: KresConfig) -> None:
        """
        Should be called when we want to really start using the controller with a specific configuration
        """

    @abstractmethod
    async def get_all_running_instances(self) -> Iterable[Subprocess]:
        """

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def shutdown_controller(self) -> None:
        """
        Called when the manager is gracefully shutting down. Allows us to stop
        the service manager process or simply cleanup, so that we don't reuse
        the same resources in a new run.

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def create_subprocess(self, subprocess_config: KresConfig, subprocess_type: SubprocessType) -> Subprocess:
        """
        Return a Subprocess object which can be operated on. The subprocess is not
        started or in any way active after this call. That has to be performaed manually
        using the returned object itself.

        Must NOT be called before initialize_controller()
        """

    @abstractmethod
    async def get_subprocess_status(self) -> Dict[KresID, SubprocessStatus]:
        """
        Get a status of running subprocesses as seen by the controller. This method  actively polls
        for information.

        Must NOT be called before initialize_controller()
        """