1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
"""
This file contains autodetection logic for available subprocess controllers. Because we have to catch errors
from imports, the imports are located in functions which are invoked at the end of this file.
We supported multiple subprocess controllers during development, but the implementation has since converged onto
just supervisord. The interface remains, however, so that different controllers can be added in the future.
"""
# pylint: disable=import-outside-toplevel
import asyncio
import logging
from typing import List, Optional
from knot_resolver_manager.manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.controller.interface import SubprocessController
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
"""
List of all subprocess controllers that are available in order of priority.
It is filled dynamically based on available modules that do not fail to import.
"""
# Appended to by try_supervisord() when this module is imported; the lookup functions in this file only read it.
_registered_controllers: List[SubprocessController] = []
def try_supervisord():
    """
    Register the supervisord-based controller, if it can be imported.

    The import happens inside the function so that an ImportError can be caught here: the failure is logged
    together with its traceback and the controller is simply not added to the registry.
    """
    try:
        from knot_resolver_manager.controller.supervisord import SupervisordSubprocessController

        # instantiation stays inside the try block, so an ImportError raised there is handled as well
        supervisord_controller = SupervisordSubprocessController()
        _registered_controllers.append(supervisord_controller)
    except ImportError:
        logger.exception("Failed to import modules related to supervisord service manager")
async def get_best_controller_implementation(config: KresConfig) -> SubprocessController:
    """
    Pick the highest-priority registered controller that reports itself as available.

    All registered controllers are asked concurrently whether they are available for the given
    configuration; the first available one in registration order is returned.

    :param config: configuration passed to each controller's availability check
    :return: the first registered controller whose availability check returned a truthy value
    :raises LookupError: when no controller is registered at all, or when none of them is available
    """
    logger.info("Starting service manager auto-selection...")

    if not _registered_controllers:
        logger.error("No controllers are available! Did you install all dependencies?")
        raise LookupError("No service managers available!")

    # query every controller at the same time; results keep the order of the registry
    availability = await asyncio.gather(
        *[candidate.is_controller_available(config) for candidate in _registered_controllers]
    )
    usable = [candidate for is_usable, candidate in zip(availability, _registered_controllers) if is_usable]
    logger.info(
        "Available subprocess controllers are %s",
        str(tuple(str(candidate) for candidate in usable)),
    )

    # nothing usable -> fail
    if not usable:
        raise LookupError("Can't find any available service manager!")

    # the registry is ordered by priority, so the first usable entry wins
    chosen = usable[0]
    logger.info("Selected controller '%s'", str(chosen))
    return chosen
def list_controller_names() -> List[str]:
    """
    Return the string names of all registered controllers, sorted by that name.

    Being listed only means the controller was registered; it is not necessarily functional.
    """
    ordered = sorted(_registered_controllers, key=str)
    return list(map(str, ordered))
async def get_controller_by_name(config: KresConfig, name: str) -> SubprocessController:
    """
    Look up a registered controller by its string name and verify that it is available.

    The name does not have to be complete: the first controller (in name-sorted order) whose string
    representation starts with ``name`` is taken, and a debug message notes when a shortcut was used.

    :param config: configuration passed to the controller's availability check
    :param name: full name of the controller, or a prefix of it
    :return: the matching controller, once it has reported itself as available
    :raises LookupError: when no registered controller matches, or the match is not available
    """
    logger.debug("Subprocess controller selected manualy by the user, testing feasibility...")

    by_name = sorted(_registered_controllers, key=str)
    match: Optional[SubprocessController] = next(
        (candidate for candidate in by_name if str(candidate).startswith(name)),
        None,
    )

    if match is None:
        logger.error("Subprocess controller with name '%s' was not found", name)
        raise LookupError(f"No subprocess controller named '{name}' found")

    if str(match) != name:
        logger.debug("Assuming '%s' is a shortcut for '%s'", name, str(match))

    if not await match.is_controller_available(config):
        raise LookupError("The selected subprocess controller is not available for use on this system.")

    logger.info("Selected controller '%s'", str(match))
    return match
# Run the guarded imports on module load, so the controller registry is filled as soon as this module is imported.
try_supervisord()
|