1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from typing import TYPE_CHECKING, Tuple, Union, List
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
class CertMgr:
CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'
def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
self.ssl_certs: SSLCerts = SSLCerts()
old_cert = mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
old_key = mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
if old_key and old_cert:
try:
self.ssl_certs.load_root_credentials(old_cert, old_key)
except SSLConfigException:
raise Exception("Cannot load cephadm root CA certificates.")
else:
self.ssl_certs.generate_root_cert(addr=ip)
mgr.cert_key_store.save_cert(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
mgr.cert_key_store.save_key(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())
def get_root_ca(self) -> str:
return self.ssl_certs.get_root_cert()
def generate_cert(self, host_fqdn: Union[str, List[str]], node_ip: Union[str, List[str]]) -> Tuple[str, str]:
return self.ssl_certs.generate_cert(host_fqdn, node_ip)
|