summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/cert_mgr.py
blob: 9b68e85ca44e1f2fc627196ea396da1f1feda203 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from cephadm.ssl_cert_utils import SSLCerts, SSLConfigException
from typing import TYPE_CHECKING, Tuple, Union, List

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator


class CertMgr:

    CEPHADM_ROOT_CA_CERT = 'cephadm_root_ca_cert'
    CEPHADM_ROOT_CA_KEY = 'cephadm_root_ca_key'

    def __init__(self, mgr: "CephadmOrchestrator", ip: str) -> None:
        self.ssl_certs: SSLCerts = SSLCerts()
        old_cert = mgr.cert_key_store.get_cert(self.CEPHADM_ROOT_CA_CERT)
        old_key = mgr.cert_key_store.get_key(self.CEPHADM_ROOT_CA_KEY)
        if old_key and old_cert:
            try:
                self.ssl_certs.load_root_credentials(old_cert, old_key)
            except SSLConfigException:
                raise Exception("Cannot load cephadm root CA certificates.")
        else:
            self.ssl_certs.generate_root_cert(addr=ip)
            mgr.cert_key_store.save_cert(self.CEPHADM_ROOT_CA_CERT, self.ssl_certs.get_root_cert())
            mgr.cert_key_store.save_key(self.CEPHADM_ROOT_CA_KEY, self.ssl_certs.get_root_key())

    def get_root_ca(self) -> str:
        return self.ssl_certs.get_root_cert()

    def generate_cert(self, host_fqdn: Union[str, List[str]], node_ip: Union[str, List[str]]) -> Tuple[str, str]:
        return self.ssl_certs.generate_cert(host_fqdn, node_ip)