import asyncio
import errno
import json
import logging
import os
import signal
import sys
from functools import partial
from http import HTTPStatus
from pathlib import Path
from pwd import getpwuid
from time import time
from typing import Any, Dict, List, Literal, Optional, Set, Union, cast
from aiohttp import web
from aiohttp.web import middleware
from aiohttp.web_app import Application
from aiohttp.web_response import json_response
from aiohttp.web_runner import AppRunner, TCPSite, UnixSite
from knot_resolver.constants import CONFIG_FILE, USER
from knot_resolver.controller import get_best_controller_implementation
from knot_resolver.controller.exceptions import SubprocessControllerExecError
from knot_resolver.controller.interface import SubprocessType
from knot_resolver.controller.registered_workers import command_single_registered_worker
from knot_resolver.datamodel import kres_config_json_schema
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.datamodel.config_schema import KresConfig, get_rundir_without_validation
from knot_resolver.datamodel.globals import Context, set_global_validation_context
from knot_resolver.datamodel.management_schema import ManagementSchema
from knot_resolver.manager import files, metrics
from knot_resolver.utils import custom_atexit as atexit
from knot_resolver.utils import ignore_exceptions_optional
from knot_resolver.utils.async_utils import readfile
from knot_resolver.utils.compat import asyncio as asyncio_compat
from knot_resolver.utils.etag import structural_etag
from knot_resolver.utils.functional import Result
from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataParsingError, DataValidationError
from knot_resolver.utils.modeling.parsing import DataFormat, try_to_parse
from knot_resolver.utils.modeling.query import query
from knot_resolver.utils.modeling.types import NoneType
from knot_resolver.utils.systemd_notify import systemd_notify
from .config_store import ConfigStore
from .constants import PID_FILE_NAME, init_user_constants
from .exceptions import KresManagerException
from .logging import logger_init
from .manager import KresManager
logger = logging.getLogger(__name__)
@middleware
async def error_handler(request: web.Request, handler: Any) -> web.Response:
"""
Generic error handler for route handlers.
If an exception is thrown during request processing, this middleware catches it
and responds accordingly.
"""
try:
return await handler(request)
except (AggregateDataValidationError, DataValidationError) as e:
return web.Response(text=str(e), status=HTTPStatus.BAD_REQUEST)
except DataParsingError as e:
return web.Response(text=f"request processing error:\n{e}", status=HTTPStatus.BAD_REQUEST)
except KresManagerException as e:
return web.Response(text=f"request processing failed:\n{e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
def from_mime_type(mime_type: str) -> DataFormat:
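    """
    Map a request's Content-Type (MIME type) to the corresponding configuration data format.
    """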
formats = {
"application/json": DataFormat.JSON,
"application/octet-stream": DataFormat.JSON, # default in aiohttp
}
if mime_type not in formats:
raise DataParsingError(f"unsupported MIME type '{mime_type}', expected: {str(formats)[1:-1]}")
return formats[mime_type]
def parse_from_mime_type(data: str, mime_type: str) -> Any:
return from_mime_type(mime_type).parse_to_dict(data)
class Server:
# pylint: disable=too-many-instance-attributes
    # This is the top-level class containing pretty much everything. Instead of global
    # variables, we use instance attributes, which is why there are so many of them and
    # that's OK.
def __init__(self, store: ConfigStore, config_path: Optional[Path], manager: KresManager):
# config store & server dynamic reconfiguration
self.config_store = store
# HTTP server
self.app = Application(middlewares=[error_handler])
self.runner = AppRunner(self.app)
self.listen: Optional[ManagementSchema] = None
self.site: Union[NoneType, TCPSite, UnixSite] = None
self.listen_lock = asyncio.Lock()
self._config_path: Optional[Path] = config_path
self._exit_code: int = 0
self._shutdown_event = asyncio.Event()
self._manager = manager
async def _reconfigure(self, config: KresConfig) -> None:
await self._reconfigure_listen_address(config)
async def _deny_management_changes(self, config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
if config_old.management != config_new.management:
return Result.err(
"/server/management: Changing management API address/uTruenix-socket dynamically is not allowed as it's really dangerous."
" If you really need this feature, please contact the developers and explain why. Technically,"
" there are no problems in supporting it. We are only blocking the dynamic changes because"
" we think the consequences of leaving this footgun unprotected are worse than its usefulness."
)
return Result.ok(None)
async def _reload_config(self) -> None:
if self._config_path is None:
logger.warning("The manager was started with inlined configuration - can't reload")
else:
try:
data = await readfile(self._config_path)
config = KresConfig(try_to_parse(data))
await self.config_store.update(config)
logger.info("Configuration file successfully reloaded")
except FileNotFoundError:
logger.error(
f"Configuration file was not found at '{self._config_path}'."
" Something must have happened to it while we were running."
)
logger.error("Configuration have NOT been changed.")
except (DataParsingError, DataValidationError) as e:
logger.error(f"Failed to parse the updated configuration file: {e}")
logger.error("Configuration have NOT been changed.")
except KresManagerException as e:
logger.error(f"Reloading of the configuration file failed: {e}")
logger.error("Configuration have NOT been changed.")
async def sigint_handler(self) -> None:
logger.info("Received SIGINT, triggering graceful shutdown")
self.trigger_shutdown(0)
async def sigterm_handler(self) -> None:
logger.info("Received SIGTERM, triggering graceful shutdown")
self.trigger_shutdown(0)
async def sighup_handler(self) -> None:
logger.info("Received SIGHUP, reloading configuration file")
systemd_notify(RELOADING="1")
await self._reload_config()
systemd_notify(READY="1")
@staticmethod
def all_handled_signals() -> Set[signal.Signals]:
return {signal.SIGHUP, signal.SIGINT, signal.SIGTERM}
def bind_signal_handlers(self):
asyncio_compat.add_async_signal_handler(signal.SIGTERM, self.sigterm_handler)
asyncio_compat.add_async_signal_handler(signal.SIGINT, self.sigint_handler)
asyncio_compat.add_async_signal_handler(signal.SIGHUP, self.sighup_handler)
def unbind_signal_handlers(self):
asyncio_compat.remove_signal_handler(signal.SIGTERM)
asyncio_compat.remove_signal_handler(signal.SIGINT)
asyncio_compat.remove_signal_handler(signal.SIGHUP)
async def start(self) -> None:
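        """
        Set up routes and the HTTP server runner, and register the config-store verifier and
        change callback. The listening address itself is set up via the registered change callback.
        """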
self._setup_routes()
await self.runner.setup()
await self.config_store.register_verifier(self._deny_management_changes)
await self.config_store.register_on_change_callback(self._reconfigure)
async def wait_for_shutdown(self) -> None:
await self._shutdown_event.wait()
def trigger_shutdown(self, exit_code: int) -> None:
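        """
        Record the exit code and set the shutdown event that wait_for_shutdown() is waiting on.
        """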
self._shutdown_event.set()
self._exit_code = exit_code
async def _handler_index(self, _request: web.Request) -> web.Response:
"""
Dummy index handler to indicate that the server is indeed running...
"""
return json_response(
{
"msg": "Knot Resolver Manager is running! The configuration endpoint is at /config",
"status": "RUNNING",
}
)
async def _handler_config_query(self, request: web.Request) -> web.Response:
"""
Route handler for changing resolver configuration
"""
# There are a lot of local variables in here, but they are usually immutable (almost SSA form :) )
# pylint: disable=too-many-locals
# parse the incoming data
if request.method == "GET":
update_with: Optional[Dict[str, Any]] = None
else:
update_with = parse_from_mime_type(await request.text(), request.content_type)
document_path = request.match_info["path"]
getheaders = ignore_exceptions_optional(List[str], None, KeyError)(request.headers.getall)
etags = getheaders("if-match")
not_etags = getheaders("if-none-match")
current_config: Dict[str, Any] = self.config_store.get().get_unparsed_data()
        # stop processing early if the ETag preconditions (If-Match / If-None-Match) are not met
def strip_quotes(s: str) -> str:
return s.strip('"')
# WARNING: this check is prone to race conditions. When changing, make sure that the current config
# is really the latest current config (i.e. no await in between obtaining the config and the checks)
status = HTTPStatus.NOT_MODIFIED if request.method in ("GET", "HEAD") else HTTPStatus.PRECONDITION_FAILED
if etags is not None and structural_etag(current_config) not in map(strip_quotes, etags):
return web.Response(status=status)
if not_etags is not None and structural_etag(current_config) in map(strip_quotes, not_etags):
return web.Response(status=status)
# run query
op = cast(Literal["get", "delete", "patch", "put"], request.method.lower())
new_config, to_return = query(current_config, op, document_path, update_with)
# update the config
if request.method != "GET":
# validate
config_validated = KresConfig(new_config)
# apply
await self.config_store.update(config_validated)
# serialize the response (the `to_return` object is a Dict/list/scalar, we want to return json)
resp_text: Optional[str] = json.dumps(to_return) if to_return is not None else None
# create the response and return it
res = web.Response(status=HTTPStatus.OK, text=resp_text, content_type="application/json")
res.headers.add("ETag", f'"{structural_etag(new_config)}"')
return res
async def _handler_metrics(self, request: web.Request) -> web.Response:
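        """
        Route handler redirecting /metrics to /metrics/json (permanent redirect)
        """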
raise web.HTTPMovedPermanently("/metrics/json")
async def _handler_metrics_json(self, _request: web.Request) -> web.Response:
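        """
        Route handler returning resolver metrics serialized as JSON
        """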
config = self.config_store.get()
return web.Response(
body=await metrics.report_json(config),
content_type="application/json",
charset="utf8",
)
async def _handler_metrics_prometheus(self, _request: web.Request) -> web.Response:
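        """
        Route handler returning metrics in the Prometheus text format,
        responds with 404 if no Prometheus report is available
        """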
metrics_report = await metrics.report_prometheus()
if not metrics_report:
raise web.HTTPNotFound()
return web.Response(
body=metrics_report,
content_type="text/plain",
charset="utf8",
)
async def _handler_cache_clear(self, request: web.Request) -> web.Response:
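        """
        Route handler for clearing the resolver cache,
        the parsed request is forwarded as a Lua command to a single registered worker
        """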
data = parse_from_mime_type(await request.text(), request.content_type)
config = CacheClearRPCSchema(data)
_, result = await command_single_registered_worker(config.render_lua())
return web.Response(
body=json.dumps(result),
content_type="application/json",
charset="utf8",
)
async def _handler_schema(self, _request: web.Request) -> web.Response:
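        """
        Route handler returning the JSON schema of the resolver configuration
        """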
return web.json_response(
kres_config_json_schema(), headers={"Access-Control-Allow-Origin": "*"}, dumps=partial(json.dumps, indent=4)
)
async def _handle_view_schema(self, _request: web.Request) -> web.Response:
"""
        Provides a UI for visualising and understanding the configuration's JSON schema.
        Rendering schemas directly in the Knot Resolver Manager is unwanted, as it's completely
        out of scope. However, it can be convenient. We therefore rely on a public web-based viewer
        and provide just a redirect. If this feature ever breaks due to the disappearance of the public
        service, we can fix it. But we do not guarantee that this will always work.
"""
return web.Response(
text="""
<html>
<head><title>Redirect to schema viewer</title></head>
<body>
<script>
// we are using JS in order to use proper host
let protocol = window.location.protocol;
let host = window.location.host;
let url = encodeURIComponent(`${protocol}//${host}/schema`);
window.location.replace(`https://json-schema.app/view/%23?url=${url}`);
</script>
<h1>JavaScript required for a dynamic redirect...</h1>
</body>
</html>
""",
content_type="text/html",
)
async def _handler_stop(self, _request: web.Request) -> web.Response:
"""
Route handler for shutting down the server (and whole manager)
"""
self._shutdown_event.set()
logger.info("Shutdown event triggered...")
return web.Response(text="Shutting down...")
async def _handler_reload(self, _request: web.Request) -> web.Response:
"""
Route handler for reloading the server
"""
logger.info("Reloading event triggered...")
await self._reload_config()
return web.Response(text="Reloading...")
async def _handler_processes(self, request: web.Request) -> web.Response:
"""
Route handler for listing PIDs of subprocesses
"""
proc_type: Optional[SubprocessType] = None
if "path" in request.match_info and len(request.match_info["path"]) > 0:
ptstr = request.match_info["path"]
if ptstr == "/kresd":
proc_type = SubprocessType.KRESD
elif ptstr == "/gc":
proc_type = SubprocessType.GC
elif ptstr == "/all":
proc_type = None
else:
return web.Response(text=f"Invalid process type '{ptstr}'", status=400)
return web.json_response(
await self._manager.get_processes(proc_type),
headers={"Access-Control-Allow-Origin": "*"},
dumps=partial(json.dumps, indent=4),
)
def _setup_routes(self) -> None:
self.app.add_routes(
[
web.get("/", self._handler_index),
web.get(r"/v1/config{path:.*}", self._handler_config_query),
web.put(r"/v1/config{path:.*}", self._handler_config_query),
web.delete(r"/v1/config{path:.*}", self._handler_config_query),
web.patch(r"/v1/config{path:.*}", self._handler_config_query),
web.post("/stop", self._handler_stop),
web.post("/reload", self._handler_reload),
web.get("/schema", self._handler_schema),
web.get("/schema/ui", self._handle_view_schema),
web.get("/metrics", self._handler_metrics),
web.get("/metrics/json", self._handler_metrics_json),
web.get("/metrics/prometheus", self._handler_metrics_prometheus),
web.post("/cache/clear", self._handler_cache_clear),
web.get("/processes{path:.*}", self._handler_processes),
]
)
async def _reconfigure_listen_address(self, config: KresConfig) -> None:
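        """
        Start listening on the management address from the given config and stop the previously
        used site (if any). Does nothing when the listen address has not changed.
        """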
async with self.listen_lock:
mgn = config.management
# if the listen address did not change, do nothing
if self.listen == mgn:
return
# start the new listen address
nsite: Union[web.TCPSite, web.UnixSite]
if mgn.unix_socket:
nsite = web.UnixSite(self.runner, str(mgn.unix_socket))
logger.info(f"Starting API HTTP server on http+unix://{mgn.unix_socket}")
elif mgn.interface:
nsite = web.TCPSite(self.runner, str(mgn.interface.addr), int(mgn.interface.port))
logger.info(f"Starting API HTTP server on http://{mgn.interface.addr}:{mgn.interface.port}")
else:
raise KresManagerException("Requested API on unsupported configuration format.")
await nsite.start()
# stop the old listen
assert (self.listen is None) == (self.site is None)
if self.listen is not None and self.site is not None:
if self.listen.unix_socket:
logger.info(f"Stopping API HTTP server on http+unix://{mgn.unix_socket}")
elif self.listen.interface:
logger.info(
f"Stopping API HTTP server on http://{self.listen.interface.addr}:{self.listen.interface.port}"
)
await self.site.stop()
# save new state
self.listen = mgn
self.site = nsite
async def shutdown(self) -> None:
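        """
        Stop the currently listening site (if any) and clean up the HTTP server runner.
        """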
if self.site is not None:
await self.site.stop()
await self.runner.cleanup()
def get_exit_code(self) -> int:
return self._exit_code
async def _load_raw_config(config: Union[Path, Dict[str, Any]]) -> Dict[str, Any]:
# Initial configuration of the manager
if isinstance(config, Path):
if not config.exists():
raise KresManagerException(
f"Manager is configured to load config file at {config} on startup, but the file does not exist."
)
logger.info(f"Loading configuration from '{config}' file.")
config = try_to_parse(await readfile(config))
# validate the initial configuration
assert isinstance(config, dict)
return config
async def _load_config(config: Dict[str, Any]) -> KresConfig:
return KresConfig(config)
async def _init_config_store(config: Dict[str, Any]) -> ConfigStore:
config_validated = await _load_config(config)
return ConfigStore(config_validated)
async def _init_manager(config_store: ConfigStore) -> KresManager:
"""
Called asynchronously when the application initializes.
"""
# Instantiate subprocess controller (if we wanted to, we could switch it at this point)
controller = await get_best_controller_implementation(config_store.get())
# Create KresManager. This will perform autodetection of available service managers and
# select the most appropriate to use (or use the one configured directly)
manager = await KresManager.create(controller, config_store)
logger.info("Initial configuration applied. Process manager initialized...")
return manager
async def _deny_working_directory_changes(config_old: KresConfig, config_new: KresConfig) -> Result[None, str]:
if config_old.rundir != config_new.rundir:
return Result.err("Changing manager's `rundir` during runtime is not allowed.")
return Result.ok(None)
def _set_working_directory(config_raw: Dict[str, Any]) -> None:
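    """
    Change the current working directory to the rundir read from the not-yet-validated configuration.
    """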
try:
rundir = get_rundir_without_validation(config_raw)
except ValueError as e:
raise DataValidationError(str(e), "/rundir") from e
logger.debug(f"Changing working directory to '{rundir.to_path().absolute()}'.")
os.chdir(rundir.to_path())
def _lock_working_directory(attempt: int = 0) -> None:
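    """
    Create a PID file in the working directory so that only one manager can run in it.
    A stale PID file (pointing to a non-existent process) is removed and locking is retried once.
    """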
# the following syscall is atomic, it's essentially the same as acquiring a lock
try:
pidfile_fd = os.open(PID_FILE_NAME, os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
except OSError as e:
if e.errno == errno.EEXIST and attempt == 0:
# the pid file exists, let's check PID
with open(PID_FILE_NAME, "r", encoding="utf-8") as f:
pid = int(f.read().strip())
try:
os.kill(pid, 0)
except OSError as e2:
if e2.errno == errno.ESRCH:
os.unlink(PID_FILE_NAME)
_lock_working_directory(attempt=attempt + 1)
return
raise KresManagerException(
"Another manager is running in the same working directory."
f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
) from e
raise KresManagerException(
"Another manager is running in the same working directory."
f" PID file is located at {os.getcwd()}/{PID_FILE_NAME}"
) from e
# now we know that we are the only manager running in this directory
# write PID to the pidfile and close it afterwards
pidfile = os.fdopen(pidfile_fd, "w")
pid = os.getpid()
pidfile.write(f"{pid}\n")
pidfile.close()
# make sure that the file is deleted on shutdown
atexit.register(lambda: os.unlink(PID_FILE_NAME))
async def _sigint_while_shutting_down():
logger.warning(
"Received SIGINT while already shutting down. Ignoring."
" If you want to forcefully stop the manager right now, use SIGTERM."
)
async def _sigterm_while_shutting_down():
logger.warning("Received SIGTERM. Invoking dirty shutdown!")
sys.exit(128 + signal.SIGTERM)
async def start_server(config: Path = CONFIG_FILE) -> int: # noqa: PLR0915
    # This function is quite long, but it describes how the manager runs. So let's silence pylint
# pylint: disable=too-many-statements
start_time = time()
working_directory_on_startup = os.getcwd()
manager: Optional[KresManager] = None
# Block signals during initialization to force their processing once everything is ready
signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
    # Check whether we are running as the intended user; if not, log a warning message
pw_username = getpwuid(os.getuid()).pw_name
if pw_username != USER:
logger.warning(
f"Knot Resolver does not run as the default '{USER}' user, but as '{pw_username}' instead."
" This may or may not affect the configuration validation and the proper functioning of the resolver."
)
    # Before starting the server, initialize the subprocess controller, config store, etc. Any errors during
    # initialization are fatal.
try:
# Make sure that the config path does not change meaning when we change working directory
config = config.absolute()
# Preprocess config - load from file or in general take it to the last step before validation.
config_raw = await _load_raw_config(config)
# before processing any configuration, set validation context
# - resolve_root: root against which all relative paths will be resolved
# - strict_validation: check for path existence during configuration validation
# - permissions_default: validate dirs/files rwx permissions against default user:group in constants
set_global_validation_context(Context(config.parent, True, False))
# We want to change cwd as soon as possible. Some parts of the codebase are using os.getcwd() to get the
# working directory.
#
# If we fail to read rundir from unparsed config, the first config validation error comes from here
_set_working_directory(config_raw)
# We don't want more than one manager in a single working directory. So we lock it with a PID file.
        # Warning: this does not prevent multiple managers that use the same naming scheme for kresd services.
_lock_working_directory()
# set_global_validation_context(Context(config.parent))
# After the working directory is set, we can initialize proper config store with a newly parsed configuration.
config_store = await _init_config_store(config_raw)
# Some "constants" need to be loaded from the initial config, some need to be stored from the initial run conditions
await init_user_constants(config_store, working_directory_on_startup)
        # The path behaviour described above means that we MUST NOT allow `rundir` changes after initialization.
# It would cause strange problems because every other path configuration depends on it. Therefore, we have to
# add a check to the config store, which disallows changes.
await config_store.register_verifier(_deny_working_directory_changes)
        # Up to this point, we have been logging to a memory buffer. But now that we have the configuration loaded,
        # we can flush the buffer into the proper place.
await logger_init(config_store)
# With configuration on hand, we can initialize monitoring. We want to do this before any subprocesses are
# started, therefore before initializing manager
await metrics.init_prometheus(config_store)
await files.init_files_watchdog(config_store)
# After we have loaded the configuration, we can start worrying about subprocess management.
manager = await _init_manager(config_store)
# prepare instance of the server (no side effects)
server = Server(config_store, config, manager)
# add Server's shutdown trigger to the manager
manager.add_shutdown_trigger(server.trigger_shutdown)
except SubprocessControllerExecError as e:
# if we caught this exception, some component wants to perform a reexec during startup. Most likely, it would
# be a subprocess manager like supervisord, which wants to make sure the manager runs under supervisord in
        # the process tree. So now we stop everything, and exec what we are told to. We are assuming that the thing
        # we'll exec will invoke us again.
logger.info("Exec requested with arguments: %s", str(e.exec_args))
# unblock signals, this could actually terminate us straight away
signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
# run exit functions
atexit.run_callbacks()
# and finally exec what we were told to exec
os.execl(*e.exec_args)
except KresManagerException as e:
# We caught an error with a pretty error message. Just print it and exit.
logger.error(e)
return 1
except BaseException:
logger.error("Uncaught generic exception during manager inicialization...", exc_info=True)
return 1
    # At this point, all backend functionality-providing components are initialized. It's therefore safe to start
# the API server.
try:
await server.start()
except OSError as e:
if e.errno in (errno.EADDRINUSE, errno.EADDRNOTAVAIL):
# fancy error reporting of network binding errors
logger.error(str(e))
await manager.stop()
return 1
raise
# At this point, pretty much everything is ready to go. We should just make sure the user can shut
# the manager down with signals.
server.bind_signal_handlers()
signal.pthread_sigmask(signal.SIG_UNBLOCK, Server.all_handled_signals())
logger.info(f"Manager fully initialized and running in {round(time() - start_time, 3)} seconds")
# notify systemd/anything compatible that we are ready
systemd_notify(READY="1")
await server.wait_for_shutdown()
# notify systemd that we are shutting down
systemd_notify(STOPPING="1")
# Ok, now we are tearing everything down.
# First of all, let's block all unwanted interruptions. We don't want to be reconfiguring kresd's while
# shutting down.
signal.pthread_sigmask(signal.SIG_BLOCK, Server.all_handled_signals())
server.unbind_signal_handlers()
    # on the other hand, we want to immediately stop when the user really wants us to stop
asyncio_compat.add_async_signal_handler(signal.SIGTERM, _sigterm_while_shutting_down)
asyncio_compat.add_async_signal_handler(signal.SIGINT, _sigint_while_shutting_down)
signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGTERM, signal.SIGINT})
    # After triggering shutdown, we need to clean everything up
logger.info("Stopping API service...")
await server.shutdown()
logger.info("Stopping kresd manager...")
await manager.stop()
logger.info(f"The manager run for {round(time() - start_time)} seconds...")
return server.get_exit_code()