author     | Adam King <47704447+adk3798@users.noreply.github.com> | 2023-12-04 13:32:52 +0100
committer  | GitHub <noreply@github.com>                           | 2023-12-04 13:32:52 +0100
commit     | 1680e466aab77cdf9ba07394bea664106580b32b
tree       | b0082058c812d3e66e0dd977dc89ef9fab05c8d9
parent     | Merge pull request #54679 from Suyashd999/add-rgw
parent     | cephadm: black format daemons/tracing.py
Merge pull request #54441 from phlogistonjohn/jjm-cephadm-breakup (tag: v19.0.0)
cephadm: break various daemon type classes out to smaller files in cephadmlib
Reviewed-by: Adam King <adking@redhat.com>
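This merge moves the daemon type classes out of the monolithic cephadm.py into a new cephadmlib.daemons subpackage. For callers the visible change is the import path; a minimal sketch of the new-style imports, taken directly from the import hunk added to cephadm.py in the diff below:

```python
# New-style imports after this refactor: the daemon form classes now live
# in the cephadmlib.daemons subpackage instead of cephadm.py itself.
from cephadmlib.daemons.ceph import get_ceph_mounts_for_type, ceph_daemons
from cephadmlib.daemons import (
    Ceph,
    CephIscsi,
    CephNvmeof,
    CustomContainer,
    HAproxy,
    Keepalived,
    Monitoring,
    NFSGanesha,
    SNMPGateway,
    Tracing,
)
```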
-rwxr-xr-x | src/cephadm/cephadm.py                       | 2126
-rw-r--r-- | src/cephadm/cephadmlib/daemons/__init__.py   |   24
-rw-r--r-- | src/cephadm/cephadmlib/daemons/ceph.py       |  462
-rw-r--r-- | src/cephadm/cephadmlib/daemons/custom.py     |  222
-rw-r--r-- | src/cephadm/cephadmlib/daemons/ingress.py    |  290
-rw-r--r-- | src/cephadm/cephadmlib/daemons/iscsi.py      |  286
-rw-r--r-- | src/cephadm/cephadmlib/daemons/monitoring.py |  377
-rw-r--r-- | src/cephadm/cephadmlib/daemons/nfs.py        |  225
-rw-r--r-- | src/cephadm/cephadmlib/daemons/nvmeof.py     |  193
-rw-r--r-- | src/cephadm/cephadmlib/daemons/snmp.py       |  226
-rw-r--r-- | src/cephadm/cephadmlib/daemons/tracing.py    |  116
-rw-r--r-- | src/cephadm/cephadmlib/deployment_utils.py   |   35
-rw-r--r-- | src/cephadm/cephadmlib/file_utils.py         |    4
-rw-r--r-- | src/cephadm/tests/fixtures.py                |   82
-rw-r--r-- | src/cephadm/tests/test_cephadm.py            |  126
-rw-r--r-- | src/cephadm/tests/test_daemon_form.py        |    3
-rw-r--r-- | src/cephadm/tests/test_deploy.py             |   96
-rw-r--r-- | src/cephadm/tests/test_nfs.py                |    6
18 files changed, 2684 insertions, 2215 deletions
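Every class removed in the diff below follows the same registration pattern: a @register_daemon_form decorator plus a for_daemon_type() classmethod that claims one or more daemon types. The registry itself is not shown in this diff; the sketch below is a hypothetical reconstruction of how such a decorator-plus-lookup pair typically works. The names _DAEMON_FORMS and find_daemon_form are illustrative, not the actual cephadmlib identifiers.

```python
from typing import List

_DAEMON_FORMS: List[type] = []  # illustrative module-level registry


def register_daemon_form(cls: type) -> type:
    """Decorator: record a daemon form class in the registry."""
    _DAEMON_FORMS.append(cls)
    return cls


def find_daemon_form(daemon_type: str) -> type:
    """Return the first registered class that claims this daemon type."""
    for cls in _DAEMON_FORMS:
        if cls.for_daemon_type(daemon_type):
            return cls
    raise KeyError(f'no daemon form registered for {daemon_type!r}')
```

With this shape, the @register_daemon_form decorators and for_daemon_type() checks visible in the removed code are enough for a dispatcher to map 'mon' to Ceph, 'nfs' to NFSGanesha, 'haproxy' to HAproxy, and so on, without the refactor changing any call sites.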
diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index d1a2671550b..98a2585c760 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -28,29 +28,14 @@ from functools import wraps from glob import glob from io import StringIO from threading import Thread, Event -from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request from pathlib import Path from cephadmlib.constants import ( # default images - DEFAULT_ALERT_MANAGER_IMAGE, - DEFAULT_ELASTICSEARCH_IMAGE, - DEFAULT_GRAFANA_IMAGE, - DEFAULT_HAPROXY_IMAGE, DEFAULT_IMAGE, DEFAULT_IMAGE_IS_MAIN, DEFAULT_IMAGE_RELEASE, - DEFAULT_JAEGER_AGENT_IMAGE, - DEFAULT_JAEGER_COLLECTOR_IMAGE, - DEFAULT_JAEGER_QUERY_IMAGE, - DEFAULT_KEEPALIVED_IMAGE, - DEFAULT_LOKI_IMAGE, - DEFAULT_NODE_EXPORTER_IMAGE, - DEFAULT_NVMEOF_IMAGE, - DEFAULT_PROMETHEUS_IMAGE, - DEFAULT_PROMTAIL_IMAGE, - DEFAULT_SNMP_GATEWAY_IMAGE, # other constant values CEPH_CONF, CEPH_CONF_DIR, @@ -82,7 +67,6 @@ from cephadmlib.context_getters import ( get_config_and_keyring, get_parm, read_configuration_source, - should_log_to_journald, ) from cephadmlib.exceptions import ( ClusterAlreadyExists, @@ -107,7 +91,6 @@ from cephadmlib.container_engines import ( registry_login, ) from cephadmlib.data_utils import ( - dict_get, dict_get_join, get_legacy_config_fsid, is_fsid, @@ -120,7 +103,6 @@ from cephadmlib.file_utils import ( get_file_timestamp, makedirs, pathify, - populate_files, read_file, recursive_chown, touch, @@ -134,7 +116,6 @@ from cephadmlib.net_utils import ( check_subnet, get_fqdn, get_hostname, - get_ip_addresses, get_short_hostname, ip_in_subnets, is_ipv6, @@ -145,7 +126,7 @@ from cephadmlib.net_utils import ( wrap_ipv6, ) from cephadmlib.locking import FileLock -from cephadmlib.daemon_identity import DaemonIdentity, DaemonSubIdentity +from cephadmlib.daemon_identity import DaemonIdentity from cephadmlib.packagers import create_packager, Packager from cephadmlib.logging import cephadm_init_logging, Highlight, LogDestination from cephadmlib.systemd import check_unit, check_units @@ -176,6 +157,19 @@ from cephadmlib.container_daemon_form import ( from cephadmlib.sysctl import install_sysctl, migrate_sysctl_dir from cephadmlib.firewalld import Firewalld, update_firewalld from cephadmlib import templating +from cephadmlib.daemons.ceph import get_ceph_mounts_for_type, ceph_daemons +from cephadmlib.daemons import ( + Ceph, + CephIscsi, + CephNvmeof, + CustomContainer, + HAproxy, + Keepalived, + Monitoring, + NFSGanesha, + SNMPGateway, + Tracing, +) FuncT = TypeVar('FuncT', bound=Callable) @@ -211,1992 +205,6 @@ class ContainerInfo: ################################## -@register_daemon_form -class Ceph(ContainerDaemonForm): - _daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror', - 'crash', 'cephfs-mirror') - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - # TODO: figure out a way to un-special-case osd - return daemon_type in cls._daemons and daemon_type != 'osd' - - def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: - self.ctx = ctx - self._identity = ident - self.user_supplied_config = False - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Ceph': - return cls(ctx, ident) - - @property - def identity(self) -> DaemonIdentity: - return self._identity - - def firewall_service_name(self) -> str: - if self.identity.daemon_type == 'mon': - return 'ceph-mon' - elif self.identity.daemon_type in ['mgr', 'mds']: - return 'ceph' - return '' - - def 
container(self, ctx: CephadmContext) -> CephContainer: - # previous to being a ContainerDaemonForm, this make_var_run - # call was hard coded in the deploy path. Eventually, it would be - # good to move this somwhere cleaner and avoid needing to know the - # uid/gid here. - uid, gid = self.uid_gid(ctx) - make_var_run(ctx, ctx.fsid, uid, gid) - - # mon and osd need privileged in order for libudev to query devices - privileged = self.identity.daemon_type in ['mon', 'osd'] - ctr = daemon_to_container(ctx, self, privileged=privileged) - ctr = to_deployment_container(ctx, ctr) - config_json = fetch_configs(ctx) - if self.identity.daemon_type == 'mon' and config_json is not None: - if 'crush_location' in config_json: - c_loc = config_json['crush_location'] - # was originally "c.args.extend(['--set-crush-location', c_loc])" - # but that doesn't seem to persist in the object after it's passed - # in further function calls - ctr.args = ctr.args + ['--set-crush-location', c_loc] - return ctr - - _uid_gid: Optional[Tuple[int, int]] = None - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - if self._uid_gid is None: - self._uid_gid = extract_uid_gid(ctx) - return self._uid_gid - - def config_and_keyring( - self, ctx: CephadmContext - ) -> Tuple[Optional[str], Optional[str]]: - return get_config_and_keyring(ctx) - - def get_daemon_args(self) -> List[str]: - if self.identity.daemon_type == 'crash': - return [] - r = [ - '--setuser', 'ceph', - '--setgroup', 'ceph', - '--default-log-to-file=false', - ] - log_to_journald = should_log_to_journald(self.ctx) - if log_to_journald: - r += [ - '--default-log-to-journald=true', - '--default-log-to-stderr=false', - ] - else: - r += [ - '--default-log-to-stderr=true', - '--default-log-stderr-prefix=debug ', - ] - if self.identity.daemon_type == 'mon': - r += [ - '--default-mon-cluster-log-to-file=false', - ] - if log_to_journald: - r += [ - '--default-mon-cluster-log-to-journald=true', - '--default-mon-cluster-log-to-stderr=false', - ] - else: - r += ['--default-mon-cluster-log-to-stderr=true'] - return r - - @staticmethod - def get_ceph_mounts( - ctx: CephadmContext, - ident: DaemonIdentity, - no_config: bool = False, - ) -> Dict[str, str]: - # Warning: This is a hack done for more expedient refactoring - mounts = _get_container_mounts_for_type( - ctx, ident.fsid, ident.daemon_type - ) - data_dir = ident.data_dir(ctx.data_dir) - if ident.daemon_type == 'rgw': - cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % ( - ident.daemon_id - ) - else: - cdata_dir = '/var/lib/ceph/%s/ceph-%s' % ( - ident.daemon_type, - ident.daemon_id, - ) - if ident.daemon_type != 'crash': - mounts[data_dir] = cdata_dir + ':z' - if not no_config: - mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z' - if ident.daemon_type in [ - 'rbd-mirror', - 'cephfs-mirror', - 'crash', - 'ceph-exporter', - ]: - # these do not search for their keyrings in a data directory - mounts[ - data_dir + '/keyring' - ] = '/etc/ceph/ceph.client.%s.%s.keyring' % ( - ident.daemon_type, - ident.daemon_id, - ) - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - no_config = bool( - getattr(ctx, 'config', None) and self.user_supplied_config - ) - cm = self.get_ceph_mounts( - ctx, - self.identity, - no_config=no_config, - ) - mounts.update(cm) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(ctx.container_engine.unlimited_pids_option) - - def customize_process_args( - self, ctx: 
CephadmContext, args: List[str] - ) -> None: - ident = self.identity - if ident.daemon_type == 'rgw': - name = 'client.rgw.%s' % ident.daemon_id - elif ident.daemon_type == 'rbd-mirror': - name = 'client.rbd-mirror.%s' % ident.daemon_id - elif ident.daemon_type == 'cephfs-mirror': - name = 'client.cephfs-mirror.%s' % ident.daemon_id - elif ident.daemon_type == 'crash': - name = 'client.crash.%s' % ident.daemon_id - elif ident.daemon_type in ['mon', 'mgr', 'mds', 'osd']: - name = ident.daemon_name - else: - raise ValueError(ident) - args.extend(['-n', name]) - if ident.daemon_type != 'crash': - args.append('-f') - args.extend(self.get_daemon_args()) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') - - def default_entrypoint(self) -> str: - ep = { - 'rgw': '/usr/bin/radosgw', - 'rbd-mirror': '/usr/bin/rbd-mirror', - 'cephfs-mirror': '/usr/bin/cephfs-mirror', - } - daemon_type = self.identity.daemon_type - return ep.get(daemon_type) or f'/usr/bin/ceph-{daemon_type}' - -################################## - - -@register_daemon_form -class OSD(Ceph): - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - # TODO: figure out a way to un-special-case osd - return daemon_type == 'osd' - - def __init__( - self, - ctx: CephadmContext, - ident: DaemonIdentity, - osd_fsid: Optional[str] = None, - ) -> None: - super().__init__(ctx, ident) - self._osd_fsid = osd_fsid - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'OSD': - osd_fsid = getattr(ctx, 'osd_fsid', None) - if osd_fsid is None: - logger.info( - 'Creating an OSD daemon form without an OSD FSID value' - ) - return cls(ctx, ident, osd_fsid) - - @staticmethod - def get_sysctl_settings() -> List[str]: - return [ - '# allow a large number of OSDs', - 'fs.aio-max-nr = 1048576', - 'kernel.pid_max = 4194304', - ] - - def firewall_service_name(self) -> str: - return 'ceph' - - @property - def osd_fsid(self) -> Optional[str]: - return self._osd_fsid - - -################################## - - -@register_daemon_form -class SNMPGateway(ContainerDaemonForm): - """Defines an SNMP gateway between Prometheus and SNMP monitoring Frameworks""" - daemon_type = 'snmp-gateway' - SUPPORTED_VERSIONS = ['V2c', 'V3'] - default_image = DEFAULT_SNMP_GATEWAY_IMAGE - DEFAULT_PORT = 9464 - env_filename = 'snmp-gateway.conf' - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx: CephadmContext, - fsid: str, - daemon_id: Union[int, str], - config_json: Dict[str, Any], - image: Optional[str] = None) -> None: - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image or SNMPGateway.default_image - - self.uid = config_json.get('uid', 0) - self.gid = config_json.get('gid', 0) - - self.destination = config_json.get('destination', '') - self.snmp_version = config_json.get('snmp_version', 'V2c') - self.snmp_community = config_json.get('snmp_community', 'public') - self.log_level = config_json.get('log_level', 'info') - self.snmp_v3_auth_username = config_json.get('snmp_v3_auth_username', '') - self.snmp_v3_auth_password = config_json.get('snmp_v3_auth_password', '') - self.snmp_v3_auth_protocol = config_json.get('snmp_v3_auth_protocol', '') - self.snmp_v3_priv_protocol = config_json.get('snmp_v3_priv_protocol', '') - self.snmp_v3_priv_password = config_json.get('snmp_v3_priv_password', '') - self.snmp_v3_engine_id = 
config_json.get('snmp_v3_engine_id', '') - - self.validate() - - @classmethod - def init(cls, ctx: CephadmContext, fsid: str, - daemon_id: Union[int, str]) -> 'SNMPGateway': - cfgs = fetch_configs(ctx) - assert cfgs # assert some config data was found - return cls(ctx, fsid, daemon_id, cfgs, ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'SNMPGateway': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - @staticmethod - def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]: - """Return the version of the notifier from it's http endpoint""" - path = os.path.join(ctx.data_dir, fsid, f'snmp-gateway.{daemon_id}', 'unit.meta') - try: - with open(path, 'r') as env: - metadata = json.loads(env.read()) - except (OSError, json.JSONDecodeError): - return None - - ports = metadata.get('ports', []) - if not ports: - return None - - try: - with urlopen(f'http://127.0.0.1:{ports[0]}/') as r: - html = r.read().decode('utf-8').split('\n') - except (HTTPError, URLError): - return None - - for h in html: - stripped = h.strip() - if stripped.startswith(('<pre>', '<PRE>')) and \ - stripped.endswith(('</pre>', '</PRE>')): - # <pre>(version=1.2.1, branch=HEAD, revision=7... - return stripped.split(',')[0].split('version=')[1] - - return None - - @property - def port(self) -> int: - endpoints = fetch_endpoints(self.ctx) - if not endpoints: - return self.DEFAULT_PORT - return endpoints[0].port - - def get_daemon_args(self) -> List[str]: - v3_args = [] - base_args = [ - f'--web.listen-address=:{self.port}', - f'--snmp.destination={self.destination}', - f'--snmp.version={self.snmp_version}', - f'--log.level={self.log_level}', - '--snmp.trap-description-template=/etc/snmp_notifier/description-template.tpl' - ] - - if self.snmp_version == 'V3': - # common auth settings - v3_args.extend([ - '--snmp.authentication-enabled', - f'--snmp.authentication-protocol={self.snmp_v3_auth_protocol}', - f'--snmp.security-engine-id={self.snmp_v3_engine_id}' - ]) - # authPriv setting is applied if we have a privacy protocol setting - if self.snmp_v3_priv_protocol: - v3_args.extend([ - '--snmp.private-enabled', - f'--snmp.private-protocol={self.snmp_v3_priv_protocol}' - ]) - - return base_args + v3_args - - @property - def data_dir(self) -> str: - return os.path.join(self.ctx.data_dir, self.ctx.fsid, f'{self.daemon_type}.{self.daemon_id}') - - @property - def conf_file_path(self) -> str: - return os.path.join(self.data_dir, self.env_filename) - - def create_daemon_conf(self) -> None: - """Creates the environment file holding 'secrets' passed to the snmp-notifier daemon""" - with write_new(self.conf_file_path) as f: - if self.snmp_version == 'V2c': - f.write(f'SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n') - else: - f.write(f'SNMP_NOTIFIER_AUTH_USERNAME={self.snmp_v3_auth_username}\n') - f.write(f'SNMP_NOTIFIER_AUTH_PASSWORD={self.snmp_v3_auth_password}\n') - if self.snmp_v3_priv_password: - f.write(f'SNMP_NOTIFIER_PRIV_PASSWORD={self.snmp_v3_priv_password}\n') - - def validate(self) -> None: - """Validate the settings - - Raises: - Error: if the fsid doesn't look like an fsid - Error: if the snmp version is not supported - Error: destination IP and port address missing - """ - if not is_fsid(self.fsid): - raise Error(f'not a valid fsid: {self.fsid}') - - if self.snmp_version not in SNMPGateway.SUPPORTED_VERSIONS: - raise Error(f'not a 
valid snmp version: {self.snmp_version}') - - if not self.destination: - raise Error('config is missing destination attribute(<ip>:<port>) of the target SNMP listener') - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return self.uid, self.gid - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(f'--env-file={self.conf_file_path}') - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_daemon_args()) - - -################################## -@register_daemon_form -class Monitoring(ContainerDaemonForm): - """Define the configs for the monitoring containers""" - - port_map = { - 'prometheus': [9095], # Avoid default 9090, due to conflict with cockpit UI - 'node-exporter': [9100], - 'grafana': [3000], - 'alertmanager': [9093, 9094], - 'loki': [3100], - 'promtail': [9080] - } - - components = { - 'prometheus': { - 'image': DEFAULT_PROMETHEUS_IMAGE, - 'cpus': '2', - 'memory': '4GB', - 'args': [ - '--config.file=/etc/prometheus/prometheus.yml', - '--storage.tsdb.path=/prometheus', - ], - 'config-json-files': [ - 'prometheus.yml', - ], - }, - 'loki': { - 'image': DEFAULT_LOKI_IMAGE, - 'cpus': '1', - 'memory': '1GB', - 'args': [ - '--config.file=/etc/loki/loki.yml', - ], - 'config-json-files': [ - 'loki.yml' - ], - }, - 'promtail': { - 'image': DEFAULT_PROMTAIL_IMAGE, - 'cpus': '1', - 'memory': '1GB', - 'args': [ - '--config.file=/etc/promtail/promtail.yml', - ], - 'config-json-files': [ - 'promtail.yml', - ], - }, - 'node-exporter': { - 'image': DEFAULT_NODE_EXPORTER_IMAGE, - 'cpus': '1', - 'memory': '1GB', - 'args': [ - '--no-collector.timex' - ], - }, - 'grafana': { - 'image': DEFAULT_GRAFANA_IMAGE, - 'cpus': '2', - 'memory': '4GB', - 'args': [], - 'config-json-files': [ - 'grafana.ini', - 'provisioning/datasources/ceph-dashboard.yml', - 'certs/cert_file', - 'certs/cert_key', - ], - }, - 'alertmanager': { - 'image': DEFAULT_ALERT_MANAGER_IMAGE, - 'cpus': '2', - 'memory': '2GB', - 'args': [ - '--cluster.listen-address=:{}'.format(port_map['alertmanager'][1]), - ], - 'config-json-files': [ - 'alertmanager.yml', - ], - 'config-json-args': [ - 'peers', - ], - }, - } # type: ignore - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return daemon_type in cls.components - - @staticmethod - def get_version(ctx, container_id, daemon_type): - # type: (CephadmContext, str, str) -> str - """ - :param: daemon_type Either "prometheus", "alertmanager", "loki", "promtail" or "node-exporter" - """ - assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter', 'loki', 'promtail') - cmd = daemon_type.replace('-', '_') - code = -1 - err = '' - out = '' - version = '' - if daemon_type == 'alertmanager': - for cmd in ['alertmanager', 'prometheus-alertmanager']: - out, err, code = call(ctx, [ - ctx.container_engine.path, 'exec', container_id, cmd, - '--version' - ], verbosity=CallVerbosity.QUIET) - if code == 0: - break - cmd = 'alertmanager' # reset cmd for version extraction - else: - out, err, code = call(ctx, [ - ctx.container_engine.path, 'exec', container_id, cmd, '--version' - ], verbosity=CallVerbosity.QUIET) - if code == 0: - if err.startswith('%s, version ' % cmd): - version = err.split(' ')[2] - elif out.startswith('%s, version ' % cmd): - version = out.split(' ')[2] - return version - - @staticmethod - def 
extract_uid_gid( - ctx: CephadmContext, daemon_type: str - ) -> Tuple[int, int]: - if daemon_type == 'prometheus': - uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus') - elif daemon_type == 'node-exporter': - uid, gid = 65534, 65534 - elif daemon_type == 'grafana': - uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana') - elif daemon_type == 'loki': - uid, gid = extract_uid_gid(ctx, file_path='/etc/loki') - elif daemon_type == 'promtail': - uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail') - elif daemon_type == 'alertmanager': - uid, gid = extract_uid_gid( - ctx, file_path=['/etc/alertmanager', '/etc/prometheus'] - ) - else: - raise Error('{} not implemented yet'.format(daemon_type)) - return uid, gid - - def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: - self.ctx = ctx - self._identity = ident - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Monitoring': - return cls(ctx, ident) - - @property - def identity(self) -> DaemonIdentity: - return self._identity - - def container(self, ctx: CephadmContext) -> CephContainer: - self._prevalidate(ctx) - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return self.extract_uid_gid(ctx, self.identity.daemon_type) - - def _prevalidate(self, ctx: CephadmContext) -> None: - # before being refactored into a ContainerDaemonForm these checks were - # done inside the deploy function. This was the only "family" of daemons - # that performed these checks in that location - daemon_type = self.identity.daemon_type - config = fetch_configs(ctx) # type: ignore - required_files = self.components[daemon_type].get( - 'config-json-files', list() - ) - required_args = self.components[daemon_type].get( - 'config-json-args', list() - ) - if required_files: - if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore - raise Error( - '{} deployment requires config-json which must ' - 'contain file content for {}'.format( - daemon_type.capitalize(), ', '.join(required_files) - ) - ) - if required_args: - if not config or not all(c in config.keys() for c in required_args): # type: ignore - raise Error( - '{} deployment requires config-json which must ' - 'contain arg for {}'.format( - daemon_type.capitalize(), ', '.join(required_args) - ) - ) - - def get_daemon_args(self) -> List[str]: - ctx = self.ctx - daemon_type = self.identity.daemon_type - metadata = self.components[daemon_type] - r = list(metadata.get('args', [])) - # set ip and port to bind to for nodeexporter,alertmanager,prometheus - if daemon_type not in ['grafana', 'loki', 'promtail']: - ip = '' - port = self.port_map[daemon_type][0] - meta = fetch_meta(ctx) - if meta: - if 'ip' in meta and meta['ip']: - ip = meta['ip'] - if 'ports' in meta and meta['ports']: - port = meta['ports'][0] - r += [f'--web.listen-address={ip}:{port}'] - if daemon_type == 'prometheus': - config = fetch_configs(ctx) - retention_time = config.get('retention_time', '15d') - retention_size = config.get('retention_size', '0') # default to disabled - r += [f'--storage.tsdb.retention.time={retention_time}'] - r += [f'--storage.tsdb.retention.size={retention_size}'] - scheme = 'http' - host = get_fqdn() - # in case host is not an fqdn then we use the IP to - # avoid producing a broken web.external-url link - if '.' 
not in host: - ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname()) - # use the first ipv4 (if any) otherwise use the first ipv6 - addr = next(iter(ipv4_addrs or ipv6_addrs), None) - host = wrap_ipv6(addr) if addr else host - r += [f'--web.external-url={scheme}://{host}:{port}'] - if daemon_type == 'alertmanager': - config = fetch_configs(ctx) - peers = config.get('peers', list()) # type: ignore - for peer in peers: - r += ['--cluster.peer={}'.format(peer)] - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - # some alertmanager, by default, look elsewhere for a config - r += ['--config.file=/etc/alertmanager/alertmanager.yml'] - if daemon_type == 'promtail': - r += ['--config.expand-env'] - if daemon_type == 'prometheus': - config = fetch_configs(ctx) - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - if daemon_type == 'node-exporter': - config = fetch_configs(ctx) - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - r += ['--path.procfs=/host/proc', - '--path.sysfs=/host/sys', - '--path.rootfs=/rootfs'] - return r - - def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: - ctx = self.ctx - daemon_type = self.identity.daemon_type - mounts: Dict[str, str] = {} - log_dir = get_log_dir(self.identity.fsid, ctx.log_dir) - if daemon_type == 'prometheus': - mounts[ - os.path.join(data_dir, 'etc/prometheus') - ] = '/etc/prometheus:Z' - mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z' - elif daemon_type == 'loki': - mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z' - mounts[os.path.join(data_dir, 'data')] = '/loki:Z' - elif daemon_type == 'promtail': - mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z' - mounts[log_dir] = '/var/log/ceph:z' - mounts[os.path.join(data_dir, 'data')] = '/promtail:Z' - elif daemon_type == 'node-exporter': - mounts[ - os.path.join(data_dir, 'etc/node-exporter') - ] = '/etc/node-exporter:Z' - mounts['/proc'] = '/host/proc:ro' - mounts['/sys'] = '/host/sys:ro' - mounts['/'] = '/rootfs:ro' - elif daemon_type == 'grafana': - mounts[ - os.path.join(data_dir, 'etc/grafana/grafana.ini') - ] = '/etc/grafana/grafana.ini:Z' - mounts[ - os.path.join(data_dir, 'etc/grafana/provisioning/datasources') - ] = '/etc/grafana/provisioning/datasources:Z' - mounts[ - os.path.join(data_dir, 'etc/grafana/certs') - ] = '/etc/grafana/certs:Z' - mounts[ - os.path.join(data_dir, 'data/grafana.db') - ] = '/var/lib/grafana/grafana.db:Z' - elif daemon_type == 'alertmanager': - mounts[ - os.path.join(data_dir, 'etc/alertmanager') - ] = '/etc/alertmanager:Z' - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - uid, _ = self.uid_gid(ctx) - monitoring_args = [ - '--user', - str(uid), - # FIXME: disable cpu/memory limits for the time being (not supported - # by ubuntu 18.04 kernel!) 
- ] - args.extend(monitoring_args) - if self.identity.daemon_type == 'node-exporter': - # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys', - # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation - # between the node-exporter container and the host to avoid selinux denials - args.extend(['--security-opt', 'label=disable']) - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_daemon_args()) - - def default_entrypoint(self) -> str: - return '' - -################################## - - -@register_daemon_form -class NFSGanesha(ContainerDaemonForm): - """Defines a NFS-Ganesha container""" - - daemon_type = 'nfs' - entrypoint = '/usr/bin/ganesha.nfsd' - daemon_args = ['-F', '-L', 'STDERR'] - - required_files = ['ganesha.conf'] - - port_map = { - 'nfs': 2049, - } - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx, - fsid, - daemon_id, - config_json, - image=DEFAULT_IMAGE): - # type: (CephadmContext, str, Union[int, str], Dict, str) -> None - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - # config-json options - self.pool = dict_get(config_json, 'pool', require=True) - self.namespace = dict_get(config_json, 'namespace') - self.userid = dict_get(config_json, 'userid') - self.extra_args = dict_get(config_json, 'extra_args', []) - self.files = dict_get(config_json, 'files', {}) - self.rgw = dict_get(config_json, 'rgw', {}) - - # validate the supplied args - self.validate() - - @classmethod - def init(cls, ctx, fsid, daemon_id): - # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha - return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'NFSGanesha': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - def _get_container_mounts(self, data_dir): - # type: (str) -> Dict[str, str] - mounts = dict() - mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' - mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' - mounts[os.path.join(data_dir, 'etc/ganesha')] = '/etc/ganesha:z' - if self.rgw: - cluster = self.rgw.get('cluster', 'ceph') - rgw_user = self.rgw.get('user', 'admin') - mounts[os.path.join(data_dir, 'keyring.rgw')] = \ - '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user) - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - @staticmethod - def get_container_envs(): - # type: () -> List[str] - envs = [ - 'CEPH_CONF=%s' % (CEPH_DEFAULT_CONF) - ] - return envs - - @staticmethod - def get_version(ctx, container_id): - # type: (CephadmContext, str) -> Optional[str] - version = None - out, err, code = call(ctx, - [ctx.container_engine.path, 'exec', container_id, - NFSGanesha.entrypoint, '-v'], - verbosity=CallVerbosity.QUIET) - if code == 0: - match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out) - if match: - version = match.group(1) - return version - - def validate(self): - # type: () -> None - if not is_fsid(self.fsid): - raise Error('not an fsid: %s' % self.fsid) - if not self.daemon_id: - raise Error('invalid 
daemon_id: %s' % self.daemon_id) - if not self.image: - raise Error('invalid image: %s' % self.image) - - # check for the required files - if self.required_files: - for fname in self.required_files: - if fname not in self.files: - raise Error('required file missing from config-json: %s' % fname) - - # check for an RGW config - if self.rgw: - if not self.rgw.get('keyring'): - raise Error('RGW keyring is missing') - if not self.rgw.get('user'): - raise Error('RGW user is missing') - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.daemon_type, self.daemon_id) - - def get_container_name(self, desc=None): - # type: (Optional[str]) -> str - cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) - if desc: - cname = '%s-%s' % (cname, desc) - return cname - - def get_daemon_args(self): - # type: () -> List[str] - return self.daemon_args + self.extra_args - - def create_daemon_dirs(self, data_dir, uid, gid): - # type: (str, int, int) -> None - """Create files under the container data dir""" - if not os.path.isdir(data_dir): - raise OSError('data_dir is not a directory: %s' % (data_dir)) - - logger.info('Creating ganesha config...') - - # create the ganesha conf dir - config_dir = os.path.join(data_dir, 'etc/ganesha') - makedirs(config_dir, uid, gid, 0o755) - - # populate files from the config-json - populate_files(config_dir, self.files, uid, gid) - - # write the RGW keyring - if self.rgw: - keyring_path = os.path.join(data_dir, 'keyring.rgw') - with write_new(keyring_path, owner=(uid, gid)) as f: - f.write(self.rgw.get('keyring', '')) - - def firewall_service_name(self) -> str: - return 'nfs' - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def customize_container_endpoints( - self, endpoints: List[EndPoint], deployment_type: DeploymentType - ) -> None: - if deployment_type == DeploymentType.DEFAULT and not endpoints: - nfs_ports = list(NFSGanesha.port_map.values()) - endpoints.extend([EndPoint('0.0.0.0', p) for p in nfs_ports]) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - # TODO: extract ganesha uid/gid (997, 994) ? 
- return extract_uid_gid(ctx) - - def config_and_keyring( - self, ctx: CephadmContext - ) -> Tuple[Optional[str], Optional[str]]: - return get_config_and_keyring(ctx) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - envs.extend(self.get_container_envs()) - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_daemon_args()) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(ctx.container_engine.unlimited_pids_option) - - def default_entrypoint(self) -> str: - return self.entrypoint - -################################## - - -@register_daemon_form -class CephIscsi(ContainerDaemonForm): - """Defines a Ceph-Iscsi container""" - - daemon_type = 'iscsi' - entrypoint = '/usr/bin/rbd-target-api' - - required_files = ['iscsi-gateway.cfg'] - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx: CephadmContext, - ident: DaemonIdentity, - config_json: Dict, - image: str = DEFAULT_IMAGE): - self.ctx = ctx - self._identity = ident - self.image = image - - # config-json options - self.files = dict_get(config_json, 'files', {}) - - # validate the supplied args - self.validate() - - @classmethod - def init(cls, ctx: CephadmContext, fsid: str, daemon_id: str) -> 'CephIscsi': - return cls.create(ctx, DaemonIdentity(fsid, cls.daemon_type, daemon_id)) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephIscsi': - return cls(ctx, ident, fetch_configs(ctx), ctx.image) - - @property - def identity(self) -> DaemonIdentity: - return self._identity - - @property - def fsid(self) -> str: - return self._identity.fsid - - @property - def daemon_id(self) -> str: - return self._identity.daemon_id - - @staticmethod - def _get_container_mounts(data_dir, log_dir): - # type: (str, str) -> Dict[str, str] - mounts = dict() - mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' - mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' - mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z' - mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' - mounts[os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')] = '/usr/local/scripts/tcmu-runner-entrypoint.sh' - mounts[log_dir] = '/var/log:z' - mounts['/dev'] = '/dev' - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - # Removes ending ".tcmu" from data_dir a tcmu-runner uses the same - # data_dir as rbd-runner-api - if data_dir.endswith('.tcmu'): - data_dir = re.sub(r'\.tcmu$', '', data_dir) - log_dir = get_log_dir(self.identity.fsid, ctx.log_dir) - mounts.update(CephIscsi._get_container_mounts(data_dir, log_dir)) - - def customize_container_binds( - self, ctx: CephadmContext, binds: List[List[str]] - ) -> None: - lib_modules = [ - 'type=bind', - 'source=/lib/modules', - 'destination=/lib/modules', - 'ro=true', - ] - binds.append(lib_modules) - - @staticmethod - def get_version(ctx, container_id): - # type: (CephadmContext, str) -> Optional[str] - version = None - out, err, code = call(ctx, - [ctx.container_engine.path, 'exec', container_id, - '/usr/bin/python3', '-c', - "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"], - verbosity=CallVerbosity.QUIET) - if code == 0: - version = out.strip() - return 
version - - def validate(self): - # type: () -> None - if not is_fsid(self.fsid): - raise Error('not an fsid: %s' % self.fsid) - if not self.daemon_id: - raise Error('invalid daemon_id: %s' % self.daemon_id) - if not self.image: - raise Error('invalid image: %s' % self.image) - - # check for the required files - if self.required_files: - for fname in self.required_files: - if fname not in self.files: - raise Error('required file missing from config-json: %s' % fname) - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.daemon_type, self.daemon_id) - - def get_container_name(self, desc=None): - # type: (Optional[str]) -> str - cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) - if desc: - cname = '%s-%s' % (cname, desc) - return cname - - def create_daemon_dirs(self, data_dir, uid, gid): - # type: (str, int, int) -> None - """Create files under the container data dir""" - if not os.path.isdir(data_dir): - raise OSError('data_dir is not a directory: %s' % (data_dir)) - - logger.info('Creating ceph-iscsi config...') - configfs_dir = os.path.join(data_dir, 'configfs') - makedirs(configfs_dir, uid, gid, 0o755) - - # set up the tcmu-runner entrypoint script - # to be mounted into the container. For more info - # on why we need this script, see the - # tcmu_runner_entrypoint_script function - self.files['tcmu-runner-entrypoint.sh'] = self.tcmu_runner_entrypoint_script() - - # populate files from the config-json - populate_files(data_dir, self.files, uid, gid) - - # we want the tcmu runner entrypoint script to be executable - # populate_files will give it 0o600 by default - os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700) - - @staticmethod - def configfs_mount_umount(data_dir, mount=True): - # type: (str, bool) -> List[str] - mount_path = os.path.join(data_dir, 'configfs') - if mount: - cmd = 'if ! grep -qs {0} /proc/mounts; then ' \ - 'mount -t configfs none {0}; fi'.format(mount_path) - else: - cmd = 'if grep -qs {0} /proc/mounts; then ' \ - 'umount {0}; fi'.format(mount_path) - return cmd.split() - - @staticmethod - def tcmu_runner_entrypoint_script() -> str: - # since we are having tcmu-runner be a background - # process in its systemd unit (rbd-target-api being - # the main process) systemd will not restart it when - # it fails. in order to try and get around that for now - # we can have a script mounted in the container that - # that attempts to do the restarting for us. This script - # can then become the entrypoint for the tcmu-runner - # container - - # This is intended to be dropped for a better solution - # for at least the squid release onward - return """#!/bin/bash -RUN_DIR=/var/run/tcmu-runner - -if [ ! -d "${RUN_DIR}" ] ; then - mkdir -p "${RUN_DIR}" -fi - -rm -rf "${RUN_DIR}"/* - -while true -do - touch "${RUN_DIR}"/start-up-$(date -Ins) - /usr/bin/tcmu-runner - - # If we got around 3 kills/segfaults in the last minute, - # don't start anymore - if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then - exit 0 - fi - - sleep 1 -done -""" - - def get_tcmu_runner_container(self): - # type: () -> CephContainer - # daemon_id, is used to generated the cid and pid files used by podman but as both tcmu-runner - # and rbd-target-api have the same daemon_id, it conflits and prevent the second container from - # starting. .tcmu runner is appended to the daemon_id to fix that. 
- subident = DaemonSubIdentity( - self.fsid, self.daemon_type, self.daemon_id, 'tcmu' - ) - tcmu_dmn = self.create(self.ctx, subident) - tcmu_container = to_deployment_container( - self.ctx, daemon_to_container(self.ctx, tcmu_dmn, privileged=True) - ) - # TODO: Eventually we don't want to run tcmu-runner through this script. - # This is intended to be a workaround backported to older releases - # and should eventually be removed in at least squid onward - tcmu_container.entrypoint = '/usr/local/scripts/tcmu-runner-entrypoint.sh' - tcmu_container.cname = self.get_container_name(desc='tcmu') - return tcmu_container - - def container(self, ctx: CephadmContext) -> CephContainer: - # So the container can modprobe iscsi_target_mod and have write perms - # to configfs we need to make this a privileged container. - ctr = daemon_to_container(ctx, self, privileged=True) - return to_deployment_container(ctx, ctr) - - def config_and_keyring( - self, ctx: CephadmContext - ) -> Tuple[Optional[str], Optional[str]]: - return get_config_and_keyring(ctx) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return extract_uid_gid(ctx) - - def default_entrypoint(self) -> str: - return self.entrypoint - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(ctx.container_engine.unlimited_pids_option) - -################################## - - -@register_daemon_form -class CephNvmeof(ContainerDaemonForm): - """Defines a Ceph-Nvmeof container""" - - daemon_type = 'nvmeof' - required_files = ['ceph-nvmeof.conf'] - default_image = DEFAULT_NVMEOF_IMAGE - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx, - fsid, - daemon_id, - config_json, - image=DEFAULT_NVMEOF_IMAGE): - # type: (CephadmContext, str, Union[int, str], Dict, str) -> None - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - # config-json options - self.files = dict_get(config_json, 'files', {}) - - # validate the supplied args - self.validate() - - @classmethod - def init(cls, ctx, fsid, daemon_id): - # type: (CephadmContext, str, Union[int, str]) -> CephNvmeof - return cls(ctx, fsid, daemon_id, - fetch_configs(ctx), ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephNvmeof': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - @staticmethod - def _get_container_mounts(data_dir: str) -> Dict[str, str]: - mounts = dict() - mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' - mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' - mounts[os.path.join(data_dir, 'ceph-nvmeof.conf')] = '/src/ceph-nvmeof.conf:z' - mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' - mounts['/dev/hugepages'] = '/dev/hugepages' - mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio' - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - def customize_container_binds( - self, ctx: CephadmContext, binds: List[List[str]] - ) -> None: - lib_modules = [ - 'type=bind', - 'source=/lib/modules', - 'destination=/lib/modules', - 'ro=true', - ] - binds.append(lib_modules) - - @staticmethod - def get_version(ctx: CephadmContext, 
container_id: str) -> Optional[str]: - out, err, ret = call(ctx, - [ctx.container_engine.path, 'inspect', - '--format', '{{index .Config.Labels "io.ceph.version"}}', - ctx.image]) - version = None - if ret == 0: - version = out.strip() - return version - - def validate(self): - # type: () -> None - if not is_fsid(self.fsid): - raise Error('not an fsid: %s' % self.fsid) - if not self.daemon_id: - raise Error('invalid daemon_id: %s' % self.daemon_id) - if not self.image: - raise Error('invalid image: %s' % self.image) - - # check for the required files - if self.required_files: - for fname in self.required_files: - if fname not in self.files: - raise Error('required file missing from config-json: %s' % fname) - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.daemon_type, self.daemon_id) - - def get_container_name(self, desc=None): - # type: (Optional[str]) -> str - cname = '%s-%s' % (self.fsid, self.get_daemon_name()) - if desc: - cname = '%s-%s' % (cname, desc) - return cname - - def create_daemon_dirs(self, data_dir, uid, gid): - # type: (str, int, int) -> None - """Create files under the container data dir""" - if not os.path.isdir(data_dir): - raise OSError('data_dir is not a directory: %s' % (data_dir)) - - logger.info('Creating ceph-nvmeof config...') - configfs_dir = os.path.join(data_dir, 'configfs') - makedirs(configfs_dir, uid, gid, 0o755) - - # populate files from the config-json - populate_files(data_dir, self.files, uid, gid) - - @staticmethod - def configfs_mount_umount(data_dir, mount=True): - # type: (str, bool) -> List[str] - mount_path = os.path.join(data_dir, 'configfs') - if mount: - cmd = 'if ! grep -qs {0} /proc/mounts; then ' \ - 'mount -t configfs none {0}; fi'.format(mount_path) - else: - cmd = 'if grep -qs {0} /proc/mounts; then ' \ - 'umount {0}; fi'.format(mount_path) - return cmd.split() - - @staticmethod - def get_sysctl_settings() -> List[str]: - return [ - 'vm.nr_hugepages = 4096', - ] - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return 167, 167 # TODO: need to get properly the uid/gid - - def config_and_keyring( - self, ctx: CephadmContext - ) -> Tuple[Optional[str], Optional[str]]: - return get_config_and_keyring(ctx) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(ctx.container_engine.unlimited_pids_option) - args.extend(['--ulimit', 'memlock=-1:-1']) - args.extend(['--ulimit', 'nofile=10240']) - args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE']) - - -################################## - - -@register_daemon_form -class CephExporter(ContainerDaemonForm): - """Defines a Ceph exporter container""" - - daemon_type = 'ceph-exporter' - entrypoint = '/usr/bin/ceph-exporter' - DEFAULT_PORT = 9926 - port_map = { - 'ceph-exporter': DEFAULT_PORT, - } - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx: CephadmContext, - fsid: str, daemon_id: Union[int, str], - config_json: Dict[str, Any], - image: str = DEFAULT_IMAGE) -> None: - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - self.sock_dir = config_json.get('sock-dir', '/var/run/ceph/') - ipv4_addrs, _ = get_ip_addresses(get_hostname()) - addrs = '0.0.0.0' if ipv4_addrs else '::' - self.addrs = config_json.get('addrs', addrs) - 
self.port = config_json.get('port', self.DEFAULT_PORT) - self.prio_limit = config_json.get('prio-limit', 5) - self.stats_period = config_json.get('stats-period', 5) - - self.validate() - - @classmethod - def init(cls, ctx: CephadmContext, fsid: str, - daemon_id: Union[int, str]) -> 'CephExporter': - return cls(ctx, fsid, daemon_id, - fetch_configs(ctx), ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephExporter': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - def get_daemon_args(self) -> List[str]: - args = [ - f'--sock-dir={self.sock_dir}', - f'--addrs={self.addrs}', - f'--port={self.port}', - f'--prio-limit={self.prio_limit}', - f'--stats-period={self.stats_period}', - ] - return args - - def validate(self) -> None: - if not os.path.isdir(self.sock_dir): - raise Error(f'Directory does not exist. Got: {self.sock_dir}') - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return extract_uid_gid(ctx) - - def config_and_keyring( - self, ctx: CephadmContext - ) -> Tuple[Optional[str], Optional[str]]: - return get_config_and_keyring(ctx) - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - cm = Ceph.get_ceph_mounts(ctx, self.identity) - mounts.update(cm) - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - name = 'client.ceph-exporter.%s' % self.identity.daemon_id - args.extend(['-n', name, '-f']) - args.extend(self.get_daemon_args()) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.append(ctx.container_engine.unlimited_pids_option) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') - - def default_entrypoint(self) -> str: - return self.entrypoint - - -################################## - - -@register_daemon_form -class HAproxy(ContainerDaemonForm): - """Defines an HAproxy container""" - daemon_type = 'haproxy' - required_files = ['haproxy.cfg'] - default_image = DEFAULT_HAPROXY_IMAGE - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx: CephadmContext, - fsid: str, daemon_id: Union[int, str], - config_json: Dict, image: str) -> None: - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - # config-json options - self.files = dict_get(config_json, 'files', {}) - - self.validate() - - @classmethod - def init(cls, ctx: CephadmContext, - fsid: str, daemon_id: Union[int, str]) -> 'HAproxy': - return cls(ctx, fsid, daemon_id, fetch_configs(ctx), - ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'HAproxy': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: - """Create files under the container data dir""" - if not os.path.isdir(data_dir): - raise OSError('data_dir is not a directory: %s' % (data_dir)) - - # create additional directories in data dir for HAproxy to use - 
if not os.path.isdir(os.path.join(data_dir, 'haproxy')): - makedirs(os.path.join(data_dir, 'haproxy'), uid, gid, DATA_DIR_MODE) - - data_dir = os.path.join(data_dir, 'haproxy') - populate_files(data_dir, self.files, uid, gid) - - def get_daemon_args(self) -> List[str]: - return ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg'] - - def validate(self): - # type: () -> None - if not is_fsid(self.fsid): - raise Error('not an fsid: %s' % self.fsid) - if not self.daemon_id: - raise Error('invalid daemon_id: %s' % self.daemon_id) - if not self.image: - raise Error('invalid image: %s' % self.image) - - # check for the required files - if self.required_files: - for fname in self.required_files: - if fname not in self.files: - raise Error('required file missing from config-json: %s' % fname) - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.daemon_type, self.daemon_id) - - def get_container_name(self, desc=None): - # type: (Optional[str]) -> str - cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) - if desc: - cname = '%s-%s' % (cname, desc) - return cname - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - # better directory for this? - return extract_uid_gid(self.ctx, file_path='/var/lib') - - @staticmethod - def _get_container_mounts(data_dir: str) -> Dict[str, str]: - mounts = dict() - mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy' - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - @staticmethod - def get_sysctl_settings() -> List[str]: - return [ - '# IP forwarding and non-local bind', - 'net.ipv4.ip_forward = 1', - 'net.ipv4.ip_nonlocal_bind = 1', - ] - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend( - ['--user=root'] - ) # haproxy 2.4 defaults to a different user - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_daemon_args()) - - -################################## - - -@register_daemon_form -class Keepalived(ContainerDaemonForm): - """Defines an Keepalived container""" - daemon_type = 'keepalived' - required_files = ['keepalived.conf'] - default_image = DEFAULT_KEEPALIVED_IMAGE - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - ctx: CephadmContext, - fsid: str, daemon_id: Union[int, str], - config_json: Dict, image: str) -> None: - self.ctx = ctx - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - # config-json options - self.files = dict_get(config_json, 'files', {}) - - self.validate() - - @classmethod - def init(cls, ctx: CephadmContext, fsid: str, - daemon_id: Union[int, str]) -> 'Keepalived': - return cls(ctx, fsid, daemon_id, - fetch_configs(ctx), ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Keepalived': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: - """Create files under the container data dir""" - if not os.path.isdir(data_dir): - 
raise OSError('data_dir is not a directory: %s' % (data_dir)) - - # create additional directories in data dir for keepalived to use - if not os.path.isdir(os.path.join(data_dir, 'keepalived')): - makedirs(os.path.join(data_dir, 'keepalived'), uid, gid, DATA_DIR_MODE) - - # populate files from the config-json - populate_files(data_dir, self.files, uid, gid) - - def validate(self): - # type: () -> None - if not is_fsid(self.fsid): - raise Error('not an fsid: %s' % self.fsid) - if not self.daemon_id: - raise Error('invalid daemon_id: %s' % self.daemon_id) - if not self.image: - raise Error('invalid image: %s' % self.image) - - # check for the required files - if self.required_files: - for fname in self.required_files: - if fname not in self.files: - raise Error('required file missing from config-json: %s' % fname) - - def get_daemon_name(self): - # type: () -> str - return '%s.%s' % (self.daemon_type, self.daemon_id) - - def get_container_name(self, desc=None): - # type: (Optional[str]) -> str - cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) - if desc: - cname = '%s-%s' % (cname, desc) - return cname - - @staticmethod - def get_container_envs(): - # type: () -> List[str] - envs = [ - 'KEEPALIVED_AUTOCONF=false', - 'KEEPALIVED_CONF=/etc/keepalived/keepalived.conf', - 'KEEPALIVED_CMD=/usr/sbin/keepalived -n -l -f /etc/keepalived/keepalived.conf', - 'KEEPALIVED_DEBUG=false' - ] - return envs - - @staticmethod - def get_sysctl_settings() -> List[str]: - return [ - '# IP forwarding and non-local bind', - 'net.ipv4.ip_forward = 1', - 'net.ipv4.ip_nonlocal_bind = 1', - ] - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - # better directory for this? - return extract_uid_gid(self.ctx, file_path='/var/lib') - - @staticmethod - def _get_container_mounts(data_dir: str) -> Dict[str, str]: - mounts = dict() - mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf' - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - envs.extend(self.get_container_envs()) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW']) - - -################################## - - -@register_daemon_form -class Tracing(ContainerDaemonForm): - """Define the configs for the jaeger tracing containers""" - - components: Dict[str, Dict[str, Any]] = { - 'elasticsearch': { - 'image': DEFAULT_ELASTICSEARCH_IMAGE, - 'envs': ['discovery.type=single-node'] - }, - 'jaeger-agent': { - 'image': DEFAULT_JAEGER_AGENT_IMAGE, - }, - 'jaeger-collector': { - 'image': DEFAULT_JAEGER_COLLECTOR_IMAGE, - }, - 'jaeger-query': { - 'image': DEFAULT_JAEGER_QUERY_IMAGE, - }, - } # type: ignore - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return daemon_type in cls.components - - @staticmethod - def set_configuration(config: Dict[str, str], daemon_type: str) -> None: - if daemon_type in ['jaeger-collector', 'jaeger-query']: - assert 'elasticsearch_nodes' in config - Tracing.components[daemon_type]['envs'] = [ - 'SPAN_STORAGE_TYPE=elasticsearch', - 
f'ES_SERVER_URLS={config["elasticsearch_nodes"]}'] - if daemon_type == 'jaeger-agent': - assert 'collector_nodes' in config - Tracing.components[daemon_type]['daemon_args'] = [ - f'--reporter.grpc.host-port={config["collector_nodes"]}', - '--processor.jaeger-compact.server-host-port=6799' - ] - - def __init__(self, ident: DaemonIdentity) -> None: - self._identity = ident - self._configured = False - - def _configure(self, ctx: CephadmContext) -> None: - if self._configured: - return - config = fetch_configs(ctx) - # Currently, this method side-effects the class attribute, and that - # is unpleasant. In the future it would be nice to move all of - # set_configuration into _confiure and only modify each classes data - # independently - self.set_configuration(config, self.identity.daemon_type) - self._configured = True - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Tracing': - return cls(ident) - - @property - def identity(self) -> DaemonIdentity: - return self._identity - - def container(self, ctx: CephadmContext) -> CephContainer: - ctr = daemon_to_container(ctx, self) - return to_deployment_container(ctx, ctr) - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return 65534, 65534 - - def get_daemon_args(self) -> List[str]: - return self.components[self.identity.daemon_type].get( - 'daemon_args', [] - ) - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - self._configure(ctx) - # earlier code did an explicit check if the daemon type was jaeger-agent - # and would only call get_daemon_args if that was true. However, since - # the function only returns a non-empty list in the case of jaeger-agent - # that check is unnecessary and is not brought over. - args.extend(self.get_daemon_args()) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - self._configure(ctx) - envs.extend( - self.components[self.identity.daemon_type].get('envs', []) - ) - - def default_entrypoint(self) -> str: - return '' - - -################################## - - -@register_daemon_form -class CustomContainer(ContainerDaemonForm): - """Defines a custom container""" - daemon_type = 'container' - - @classmethod - def for_daemon_type(cls, daemon_type: str) -> bool: - return cls.daemon_type == daemon_type - - def __init__(self, - fsid: str, daemon_id: Union[int, str], - config_json: Dict, image: str) -> None: - self.fsid = fsid - self.daemon_id = daemon_id - self.image = image - - # config-json options - self.entrypoint = dict_get(config_json, 'entrypoint') - self.uid = dict_get(config_json, 'uid', 65534) # nobody - self.gid = dict_get(config_json, 'gid', 65534) # nobody - self.volume_mounts = dict_get(config_json, 'volume_mounts', {}) - self.args = dict_get(config_json, 'args', []) - self.envs = dict_get(config_json, 'envs', []) - self.privileged = dict_get(config_json, 'privileged', False) - self.bind_mounts = dict_get(config_json, 'bind_mounts', []) - self.ports = dict_get(config_json, 'ports', []) - self.dirs = dict_get(config_json, 'dirs', []) - self.files = dict_get(config_json, 'files', {}) - - @classmethod - def init(cls, ctx: CephadmContext, - fsid: str, daemon_id: Union[int, str]) -> 'CustomContainer': - return cls(fsid, daemon_id, - fetch_configs(ctx), ctx.image) - - @classmethod - def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CustomContainer': - return cls.init(ctx, ident.fsid, ident.daemon_id) - - @property - def identity(self) -> DaemonIdentity: - return 
DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) - - def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: - """ - Create dirs/files below the container data directory. - """ - logger.info('Creating custom container configuration ' - 'dirs/files in {} ...'.format(data_dir)) - - if not os.path.isdir(data_dir): - raise OSError('data_dir is not a directory: %s' % data_dir) - - for dir_path in self.dirs: - logger.info('Creating directory: {}'.format(dir_path)) - dir_path = os.path.join(data_dir, dir_path.strip('/')) - makedirs(dir_path, uid, gid, 0o755) - - for file_path in self.files: - logger.info('Creating file: {}'.format(file_path)) - content = dict_get_join(self.files, file_path) - file_path = os.path.join(data_dir, file_path.strip('/')) - with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f: - f.write(content) - - def get_daemon_args(self) -> List[str]: - return [] - - def get_container_args(self) -> List[str]: - return self.args - - def get_container_envs(self) -> List[str]: - return self.envs - - def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: - """ - Get the volume mounts. Relative source paths will be located below - `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. - - Example: - { - /foo/conf: /conf - foo/conf: /conf - } - becomes - { - /foo/conf: /conf - /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf - } - """ - mounts = {} - for source, destination in self.volume_mounts.items(): - source = os.path.join(data_dir, source) - mounts[source] = destination - return mounts - - def customize_container_mounts( - self, ctx: CephadmContext, mounts: Dict[str, str] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - mounts.update(self._get_container_mounts(data_dir)) - - def _get_container_binds(self, data_dir: str) -> List[List[str]]: - """ - Get the bind mounts. Relative `source=...` paths will be located below - `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. - - Example: - [ - 'type=bind', - 'source=lib/modules', - 'destination=/lib/modules', - 'ro=true' - ] - becomes - [ - ... - 'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules', - ... 
- ] - """ - binds = self.bind_mounts.copy() - for bind in binds: - for index, value in enumerate(bind): - match = re.match(r'^source=(.+)$', value) - if match: - bind[index] = 'source={}'.format(os.path.join( - data_dir, match.group(1))) - return binds - - def customize_container_binds( - self, ctx: CephadmContext, binds: List[List[str]] - ) -> None: - data_dir = self.identity.data_dir(ctx.data_dir) - binds.extend(self._get_container_binds(data_dir)) - - # Cache the container so we don't need to rebuild it again when calling - # into init_containers - _container: Optional[CephContainer] = None - - def container(self, ctx: CephadmContext) -> CephContainer: - if self._container is None: - ctr = daemon_to_container( - ctx, - self, - host_network=False, - privileged=self.privileged, - ptrace=ctx.allow_ptrace, - ) - self._container = to_deployment_container(ctx, ctr) - return self._container - - def init_containers(self, ctx: CephadmContext) -> List[InitContainer]: - primary = self.container(ctx) - init_containers: List[Dict[str, Any]] = getattr( - ctx, 'init_containers', [] - ) - return [ - InitContainer.from_primary_and_opts(ctx, primary, ic_opts) - for ic_opts in init_containers - ] - - def customize_container_endpoints( - self, endpoints: List[EndPoint], deployment_type: DeploymentType - ) -> None: - if deployment_type == DeploymentType.DEFAULT: - endpoints.extend([EndPoint('0.0.0.0', p) for p in self.ports]) - - def customize_container_envs( - self, ctx: CephadmContext, envs: List[str] - ) -> None: - envs.extend(self.get_container_envs()) - - def customize_container_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_container_args()) - - def customize_process_args( - self, ctx: CephadmContext, args: List[str] - ) -> None: - args.extend(self.get_daemon_args()) - - def default_entrypoint(self) -> str: - return self.entrypoint or '' - - def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: - return self.uid, self.gid - - -################################## - - def get_supported_daemons(): # type: () -> List[str] supported_daemons = ceph_daemons() @@ -2213,15 +221,6 @@ def get_supported_daemons(): assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons - -def ceph_daemons() -> List[str]: - cds = list(Ceph._daemons) - cds.append(CephExporter.daemon_type) - return cds - -################################## - - ################################## @@ -2844,76 +843,11 @@ def get_container_mounts_for_type( """Return a dictionary mapping container-external paths to container-internal paths given an fsid and daemon_type. """ - mounts = _get_container_mounts_for_type(ctx, fsid, daemon_type) + mounts = get_ceph_mounts_for_type(ctx, fsid, daemon_type) _update_podman_mounts(ctx, mounts) return mounts -def _get_container_mounts_for_type( - ctx: CephadmContext, fsid: str, daemon_type: str -) -> Dict[str, str]: - """The main implementation of get_container_mounts_for_type minus the call - to _update_podman_mounts so that this can be called from - get_container_mounts. 
- """ - mounts = dict() - - if daemon_type in ceph_daemons(): - if fsid: - run_path = os.path.join('/var/run/ceph', fsid) - if os.path.exists(run_path): - mounts[run_path] = '/var/run/ceph:z' - log_dir = get_log_dir(fsid, ctx.log_dir) - mounts[log_dir] = '/var/log/ceph:z' - crash_dir = '/var/lib/ceph/%s/crash' % fsid - if os.path.exists(crash_dir): - mounts[crash_dir] = '/var/lib/ceph/crash:z' - if daemon_type != 'crash' and should_log_to_journald(ctx): - journald_sock_dir = '/run/systemd/journal' - mounts[journald_sock_dir] = journald_sock_dir - - if daemon_type in ['mon', 'osd', 'clusterless-ceph-volume']: - mounts['/dev'] = '/dev' # FIXME: narrow this down? - mounts['/run/udev'] = '/run/udev' - if daemon_type in ['osd', 'clusterless-ceph-volume']: - mounts['/sys'] = '/sys' # for numa.cc, pick_address, cgroups, ... - mounts['/run/lvm'] = '/run/lvm' - mounts['/run/lock/lvm'] = '/run/lock/lvm' - if daemon_type == 'osd': - # selinux-policy in the container may not match the host. - if HostFacts(ctx).selinux_enabled: - cluster_dir = f'{ctx.data_dir}/{fsid}' - selinux_folder = f'{cluster_dir}/selinux' - if os.path.exists(cluster_dir): - if not os.path.exists(selinux_folder): - os.makedirs(selinux_folder, mode=0o755) - mounts[selinux_folder] = '/sys/fs/selinux:ro' - else: - logger.error(f'Cluster direcotry {cluster_dir} does not exist.') - mounts['/'] = '/rootfs' - - try: - if ctx.shared_ceph_folder: # make easy manager modules/ceph-volume development - ceph_folder = pathify(ctx.shared_ceph_folder) - if os.path.exists(ceph_folder): - cephadm_binary = ceph_folder + '/src/cephadm/cephadm' - if not os.path.exists(pathify(cephadm_binary)): - raise Error("cephadm binary does not exist. Please run './build.sh cephadm' from ceph/src/cephadm/ directory.") - mounts[cephadm_binary] = '/usr/sbin/cephadm' - mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume' - mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr' - mounts[ceph_folder + '/src/python-common/ceph'] = '/usr/lib/python3.6/site-packages/ceph' - mounts[ceph_folder + '/monitoring/ceph-mixin/dashboards_out'] = '/etc/grafana/dashboards/ceph-dashboard' - mounts[ceph_folder + '/monitoring/ceph-mixin/prometheus_alerts.yml'] = '/etc/prometheus/ceph/ceph_default_alerts.yml' - else: - logger.error( - 'Ceph shared source folder does not exist.', - extra=Highlight.FAILURE.extra()) - except AttributeError: - pass - return mounts - - def get_container_mounts( ctx: CephadmContext, ident: 'DaemonIdentity', no_config: bool = False ) -> Dict[str, str]: @@ -5257,36 +3191,6 @@ def command_registry_login(ctx: CephadmContext) -> int: ################################## -def to_deployment_container( - ctx: CephadmContext, ctr: CephContainer -) -> CephContainer: - """Given a standard ceph container instance return a CephContainer - prepared for a deployment as a daemon, having the extra args and - custom configurations added. - NOTE: The `ctr` object is mutated before being returned. 
- """ - if 'extra_container_args' in ctx and ctx.extra_container_args: - ctr.container_args.extend(ctx.extra_container_args) - if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args: - ctr.args.extend(ctx.extra_entrypoint_args) - ccfiles = fetch_custom_config_files(ctx) - if ccfiles: - mandatory_keys = ['mount_path', 'content'] - for conf in ccfiles: - if all(k in conf for k in mandatory_keys): - mount_path = conf['mount_path'] - assert ctr.identity - file_path = os.path.join( - ctx.data_dir, - ctr.identity.fsid, - 'custom_config_files', - ctr.identity.daemon_name, - os.path.basename(mount_path) - ) - ctr.volume_mounts[file_path] = mount_path - return ctr - - def get_deployment_type( ctx: CephadmContext, ident: 'DaemonIdentity', ) -> DeploymentType: diff --git a/src/cephadm/cephadmlib/daemons/__init__.py b/src/cephadm/cephadmlib/daemons/__init__.py new file mode 100644 index 00000000000..cf572d487c9 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/__init__.py @@ -0,0 +1,24 @@ +from .ceph import Ceph, OSD, CephExporter +from .custom import CustomContainer +from .ingress import HAproxy, Keepalived +from .iscsi import CephIscsi +from .monitoring import Monitoring +from .nfs import NFSGanesha +from .nvmeof import CephNvmeof +from .snmp import SNMPGateway +from .tracing import Tracing + +__all__ = [ + 'Ceph', + 'CephExporter', + 'CephIscsi', + 'CephNvmeof', + 'CustomContainer', + 'HAproxy', + 'Keepalived', + 'Monitoring', + 'NFSGanesha', + 'OSD', + 'SNMPGateway', + 'Tracing', +] diff --git a/src/cephadm/cephadmlib/daemons/ceph.py b/src/cephadm/cephadmlib/daemons/ceph.py new file mode 100644 index 00000000000..0afb8f734af --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/ceph.py @@ -0,0 +1,462 @@ +import logging +import os + +from typing import Any, Dict, List, Optional, Tuple, Union + +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context_getters import ( + fetch_configs, + get_config_and_keyring, + should_log_to_journald, +) +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..constants import DEFAULT_IMAGE +from ..context import CephadmContext +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import make_run_dir, pathify +from ..host_facts import HostFacts +from ..logging import Highlight +from ..net_utils import get_hostname, get_ip_addresses + + +logger = logging.getLogger() + + +@register_daemon_form +class Ceph(ContainerDaemonForm): + _daemons = ( + 'mon', + 'mgr', + 'osd', + 'mds', + 'rgw', + 'rbd-mirror', + 'crash', + 'cephfs-mirror', + ) + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + # TODO: figure out a way to un-special-case osd + return daemon_type in cls._daemons and daemon_type != 'osd' + + def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: + self.ctx = ctx + self._identity = ident + self.user_supplied_config = False + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Ceph': + return cls(ctx, ident) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + def firewall_service_name(self) -> str: + if self.identity.daemon_type == 'mon': + return 'ceph-mon' + elif self.identity.daemon_type in ['mgr', 'mds']: + return 'ceph' + return '' + + def container(self, ctx: CephadmContext) -> CephContainer: + # previous to being a ContainerDaemonForm, this call to 
create the
+        # var-run directory was hard coded in the deploy path. Eventually, it
+        # would be good to move this somewhere cleaner and avoid needing to know
+        # the uid/gid here.
+        uid, gid = self.uid_gid(ctx)
+        make_run_dir(ctx.fsid, uid, gid)
+
+        # mon and osd need privileged in order for libudev to query devices
+        privileged = self.identity.daemon_type in ['mon', 'osd']
+        ctr = daemon_to_container(ctx, self, privileged=privileged)
+        ctr = to_deployment_container(ctx, ctr)
+        config_json = fetch_configs(ctx)
+        if self.identity.daemon_type == 'mon' and config_json is not None:
+            if 'crush_location' in config_json:
+                c_loc = config_json['crush_location']
+                # was originally "c.args.extend(['--set-crush-location', c_loc])"
+                # but that doesn't seem to persist in the object after it's passed
+                # in further function calls
+                ctr.args = ctr.args + ['--set-crush-location', c_loc]
+        return ctr
+
+    _uid_gid: Optional[Tuple[int, int]] = None
+
+    def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+        if self._uid_gid is None:
+            self._uid_gid = extract_uid_gid(ctx)
+        return self._uid_gid
+
+    def config_and_keyring(
+        self, ctx: CephadmContext
+    ) -> Tuple[Optional[str], Optional[str]]:
+        return get_config_and_keyring(ctx)
+
+    def get_daemon_args(self) -> List[str]:
+        if self.identity.daemon_type == 'crash':
+            return []
+        r = [
+            '--setuser',
+            'ceph',
+            '--setgroup',
+            'ceph',
+            '--default-log-to-file=false',
+        ]
+        log_to_journald = should_log_to_journald(self.ctx)
+        if log_to_journald:
+            r += [
+                '--default-log-to-journald=true',
+                '--default-log-to-stderr=false',
+            ]
+        else:
+            r += [
+                '--default-log-to-stderr=true',
+                '--default-log-stderr-prefix=debug ',
+            ]
+        if self.identity.daemon_type == 'mon':
+            r += [
+                '--default-mon-cluster-log-to-file=false',
+            ]
+            if log_to_journald:
+                r += [
+                    '--default-mon-cluster-log-to-journald=true',
+                    '--default-mon-cluster-log-to-stderr=false',
+                ]
+            else:
+                r += ['--default-mon-cluster-log-to-stderr=true']
+        return r
+
+    @staticmethod
+    def get_ceph_mounts(
+        ctx: CephadmContext,
+        ident: DaemonIdentity,
+        no_config: bool = False,
+    ) -> Dict[str, str]:
+        # Warning: This is a hack done for more expedient refactoring
+        mounts = get_ceph_mounts_for_type(ctx, ident.fsid, ident.daemon_type)
+        data_dir = ident.data_dir(ctx.data_dir)
+        if ident.daemon_type == 'rgw':
+            cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (
+                ident.daemon_id
+            )
+        else:
+            cdata_dir = '/var/lib/ceph/%s/ceph-%s' % (
+                ident.daemon_type,
+                ident.daemon_id,
+            )
+        if ident.daemon_type != 'crash':
+            mounts[data_dir] = cdata_dir + ':z'
+            if not no_config:
+                mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
+        if ident.daemon_type in [
+            'rbd-mirror',
+            'cephfs-mirror',
+            'crash',
+            'ceph-exporter',
+        ]:
+            # these do not search for their keyrings in a data directory
+            mounts[
+                data_dir + '/keyring'
+            ] = '/etc/ceph/ceph.client.%s.%s.keyring' % (
+                ident.daemon_type,
+                ident.daemon_id,
+            )
+        return mounts
+
+    def customize_container_mounts(
+        self, ctx: CephadmContext, mounts: Dict[str, str]
+    ) -> None:
+        no_config = bool(
+            getattr(ctx, 'config', None) and self.user_supplied_config
+        )
+        cm = self.get_ceph_mounts(
+            ctx,
+            self.identity,
+            no_config=no_config,
+        )
+        mounts.update(cm)
+
+    def customize_container_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        args.append(ctx.container_engine.unlimited_pids_option)
+
+    def customize_process_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        ident = self.identity
+        if ident.daemon_type == 'rgw':
+            name = 
'client.rgw.%s' % ident.daemon_id + elif ident.daemon_type == 'rbd-mirror': + name = 'client.rbd-mirror.%s' % ident.daemon_id + elif ident.daemon_type == 'cephfs-mirror': + name = 'client.cephfs-mirror.%s' % ident.daemon_id + elif ident.daemon_type == 'crash': + name = 'client.crash.%s' % ident.daemon_id + elif ident.daemon_type in ['mon', 'mgr', 'mds', 'osd']: + name = ident.daemon_name + else: + raise ValueError(ident) + args.extend(['-n', name]) + if ident.daemon_type != 'crash': + args.append('-f') + args.extend(self.get_daemon_args()) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') + + def default_entrypoint(self) -> str: + ep = { + 'rgw': '/usr/bin/radosgw', + 'rbd-mirror': '/usr/bin/rbd-mirror', + 'cephfs-mirror': '/usr/bin/cephfs-mirror', + } + daemon_type = self.identity.daemon_type + return ep.get(daemon_type) or f'/usr/bin/ceph-{daemon_type}' + + +@register_daemon_form +class OSD(Ceph): + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + # TODO: figure out a way to un-special-case osd + return daemon_type == 'osd' + + def __init__( + self, + ctx: CephadmContext, + ident: DaemonIdentity, + osd_fsid: Optional[str] = None, + ) -> None: + super().__init__(ctx, ident) + self._osd_fsid = osd_fsid + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'OSD': + osd_fsid = getattr(ctx, 'osd_fsid', None) + if osd_fsid is None: + logger.info( + 'Creating an OSD daemon form without an OSD FSID value' + ) + return cls(ctx, ident, osd_fsid) + + @staticmethod + def get_sysctl_settings() -> List[str]: + return [ + '# allow a large number of OSDs', + 'fs.aio-max-nr = 1048576', + 'kernel.pid_max = 4194304', + ] + + def firewall_service_name(self) -> str: + return 'ceph' + + @property + def osd_fsid(self) -> Optional[str]: + return self._osd_fsid + + +@register_daemon_form +class CephExporter(ContainerDaemonForm): + """Defines a Ceph exporter container""" + + daemon_type = 'ceph-exporter' + entrypoint = '/usr/bin/ceph-exporter' + DEFAULT_PORT = 9926 + port_map = { + 'ceph-exporter': DEFAULT_PORT, + } + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + ctx: CephadmContext, + fsid: str, + daemon_id: Union[int, str], + config_json: Dict[str, Any], + image: str = DEFAULT_IMAGE, + ) -> None: + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + self.sock_dir = config_json.get('sock-dir', '/var/run/ceph/') + ipv4_addrs, _ = get_ip_addresses(get_hostname()) + addrs = '0.0.0.0' if ipv4_addrs else '::' + self.addrs = config_json.get('addrs', addrs) + self.port = config_json.get('port', self.DEFAULT_PORT) + self.prio_limit = config_json.get('prio-limit', 5) + self.stats_period = config_json.get('stats-period', 5) + + self.validate() + + @classmethod + def init( + cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] + ) -> 'CephExporter': + return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'CephExporter': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + def get_daemon_args(self) -> List[str]: + args = [ + f'--sock-dir={self.sock_dir}', + f'--addrs={self.addrs}', + f'--port={self.port}', + 
f'--prio-limit={self.prio_limit}',
+            f'--stats-period={self.stats_period}',
+        ]
+        return args
+
+    def validate(self) -> None:
+        if not os.path.isdir(self.sock_dir):
+            raise Error(f'Directory does not exist. Got: {self.sock_dir}')
+
+    def container(self, ctx: CephadmContext) -> CephContainer:
+        ctr = daemon_to_container(ctx, self)
+        return to_deployment_container(ctx, ctr)
+
+    def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+        return extract_uid_gid(ctx)
+
+    def config_and_keyring(
+        self, ctx: CephadmContext
+    ) -> Tuple[Optional[str], Optional[str]]:
+        return get_config_and_keyring(ctx)
+
+    def customize_container_mounts(
+        self, ctx: CephadmContext, mounts: Dict[str, str]
+    ) -> None:
+        cm = Ceph.get_ceph_mounts(ctx, self.identity)
+        mounts.update(cm)
+
+    def customize_process_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        name = 'client.ceph-exporter.%s' % self.identity.daemon_id
+        args.extend(['-n', name, '-f'])
+        args.extend(self.get_daemon_args())
+
+    def customize_container_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        args.append(ctx.container_engine.unlimited_pids_option)
+
+    def customize_container_envs(
+        self, ctx: CephadmContext, envs: List[str]
+    ) -> None:
+        envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728')
+
+    def default_entrypoint(self) -> str:
+        return self.entrypoint
+
+
+def get_ceph_mounts_for_type(
+    ctx: CephadmContext, fsid: str, daemon_type: str
+) -> Dict[str, str]:
+    """The main implementation of get_container_mounts_for_type minus the call
+    to _update_podman_mounts so that this can be called from
+    get_container_mounts.
+    """
+    mounts = dict()
+
+    if daemon_type in ceph_daemons():
+        if fsid:
+            run_path = os.path.join('/var/run/ceph', fsid)
+            if os.path.exists(run_path):
+                mounts[run_path] = '/var/run/ceph:z'
+            log_dir = os.path.join(ctx.log_dir, fsid)
+            mounts[log_dir] = '/var/log/ceph:z'
+            crash_dir = '/var/lib/ceph/%s/crash' % fsid
+            if os.path.exists(crash_dir):
+                mounts[crash_dir] = '/var/lib/ceph/crash:z'
+            if daemon_type != 'crash' and should_log_to_journald(ctx):
+                journald_sock_dir = '/run/systemd/journal'
+                mounts[journald_sock_dir] = journald_sock_dir
+
+    if daemon_type in ['mon', 'osd', 'clusterless-ceph-volume']:
+        mounts['/dev'] = '/dev'  # FIXME: narrow this down?
+        mounts['/run/udev'] = '/run/udev'
+    if daemon_type in ['osd', 'clusterless-ceph-volume']:
+        mounts['/sys'] = '/sys'  # for numa.cc, pick_address, cgroups, ...
+        mounts['/run/lvm'] = '/run/lvm'
+        mounts['/run/lock/lvm'] = '/run/lock/lvm'
+    if daemon_type == 'osd':
+        # selinux-policy in the container may not match the host.
+        if HostFacts(ctx).selinux_enabled:
+            cluster_dir = f'{ctx.data_dir}/{fsid}'
+            selinux_folder = f'{cluster_dir}/selinux'
+            if os.path.exists(cluster_dir):
+                if not os.path.exists(selinux_folder):
+                    os.makedirs(selinux_folder, mode=0o755)
+                mounts[selinux_folder] = '/sys/fs/selinux:ro'
+            else:
+                logger.error(
+                    f'Cluster directory {cluster_dir} does not exist.'
+                )
+        mounts['/'] = '/rootfs'
+
+    try:
+        if (
+            ctx.shared_ceph_folder
+        ):  # make easy manager modules/ceph-volume development
+            ceph_folder = pathify(ctx.shared_ceph_folder)
+            if os.path.exists(ceph_folder):
+                cephadm_binary = ceph_folder + '/src/cephadm/cephadm'
+                if not os.path.exists(pathify(cephadm_binary)):
+                    raise Error(
+                        "cephadm binary does not exist. Please run './build.sh cephadm' from ceph/src/cephadm/ directory."
+ ) + mounts[cephadm_binary] = '/usr/sbin/cephadm' + mounts[ + ceph_folder + '/src/ceph-volume/ceph_volume' + ] = '/usr/lib/python3.6/site-packages/ceph_volume' + mounts[ + ceph_folder + '/src/pybind/mgr' + ] = '/usr/share/ceph/mgr' + mounts[ + ceph_folder + '/src/python-common/ceph' + ] = '/usr/lib/python3.6/site-packages/ceph' + mounts[ + ceph_folder + '/monitoring/ceph-mixin/dashboards_out' + ] = '/etc/grafana/dashboards/ceph-dashboard' + mounts[ + ceph_folder + + '/monitoring/ceph-mixin/prometheus_alerts.yml' + ] = '/etc/prometheus/ceph/ceph_default_alerts.yml' + else: + logger.error( + 'Ceph shared source folder does not exist.', + extra=Highlight.FAILURE.extra(), + ) + except AttributeError: + pass + return mounts + + +def ceph_daemons() -> List[str]: + """A legacy method that returns a list of all daemon types considered ceph + daemons. + """ + cds = list(Ceph._daemons) + cds.append(CephExporter.daemon_type) + return cds diff --git a/src/cephadm/cephadmlib/daemons/custom.py b/src/cephadm/cephadmlib/daemons/custom.py new file mode 100644 index 00000000000..e833c80c9a5 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/custom.py @@ -0,0 +1,222 @@ +import logging +import os +import re + +from typing import Any, Dict, List, Optional, Tuple, Union + +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, InitContainer +from ..context import CephadmContext +from ..context_getters import fetch_configs +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..data_utils import dict_get, dict_get_join +from ..deploy import DeploymentType +from ..deployment_utils import to_deployment_container +from ..file_utils import write_new, makedirs +from ..net_utils import EndPoint + + +logger = logging.getLogger() + + +@register_daemon_form +class CustomContainer(ContainerDaemonForm): + """Defines a custom container""" + + daemon_type = 'container' + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + fsid: str, + daemon_id: Union[int, str], + config_json: Dict, + image: str, + ) -> None: + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.entrypoint = dict_get(config_json, 'entrypoint') + self.uid = dict_get(config_json, 'uid', 65534) # nobody + self.gid = dict_get(config_json, 'gid', 65534) # nobody + self.volume_mounts = dict_get(config_json, 'volume_mounts', {}) + self.args = dict_get(config_json, 'args', []) + self.envs = dict_get(config_json, 'envs', []) + self.privileged = dict_get(config_json, 'privileged', False) + self.bind_mounts = dict_get(config_json, 'bind_mounts', []) + self.ports = dict_get(config_json, 'ports', []) + self.dirs = dict_get(config_json, 'dirs', []) + self.files = dict_get(config_json, 'files', {}) + + @classmethod + def init( + cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] + ) -> 'CustomContainer': + return cls(fsid, daemon_id, fetch_configs(ctx), ctx.image) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'CustomContainer': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + """ + Create dirs/files below the container data directory. 
+ """ + logger.info( + 'Creating custom container configuration ' + 'dirs/files in {} ...'.format(data_dir) + ) + + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % data_dir) + + for dir_path in self.dirs: + logger.info('Creating directory: {}'.format(dir_path)) + dir_path = os.path.join(data_dir, dir_path.strip('/')) + makedirs(dir_path, uid, gid, 0o755) + + for file_path in self.files: + logger.info('Creating file: {}'.format(file_path)) + content = dict_get_join(self.files, file_path) + file_path = os.path.join(data_dir, file_path.strip('/')) + with write_new( + file_path, owner=(uid, gid), encoding='utf-8' + ) as f: + f.write(content) + + def get_daemon_args(self) -> List[str]: + return [] + + def get_container_args(self) -> List[str]: + return self.args + + def get_container_envs(self) -> List[str]: + return self.envs + + def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: + """ + Get the volume mounts. Relative source paths will be located below + `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. + + Example: + { + /foo/conf: /conf + foo/conf: /conf + } + becomes + { + /foo/conf: /conf + /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf + } + """ + mounts = {} + for source, destination in self.volume_mounts.items(): + source = os.path.join(data_dir, source) + mounts[source] = destination + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def _get_container_binds(self, data_dir: str) -> List[List[str]]: + """ + Get the bind mounts. Relative `source=...` paths will be located below + `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. + + Example: + [ + 'type=bind', + 'source=lib/modules', + 'destination=/lib/modules', + 'ro=true' + ] + becomes + [ + ... + 'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules', + ... 
+ ] + """ + binds = self.bind_mounts.copy() + for bind in binds: + for index, value in enumerate(bind): + match = re.match(r'^source=(.+)$', value) + if match: + bind[index] = 'source={}'.format( + os.path.join(data_dir, match.group(1)) + ) + return binds + + def customize_container_binds( + self, ctx: CephadmContext, binds: List[List[str]] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + binds.extend(self._get_container_binds(data_dir)) + + # Cache the container so we don't need to rebuild it again when calling + # into init_containers + _container: Optional[CephContainer] = None + + def container(self, ctx: CephadmContext) -> CephContainer: + if self._container is None: + ctr = daemon_to_container( + ctx, + self, + host_network=False, + privileged=self.privileged, + ptrace=ctx.allow_ptrace, + ) + self._container = to_deployment_container(ctx, ctr) + return self._container + + def init_containers(self, ctx: CephadmContext) -> List[InitContainer]: + primary = self.container(ctx) + init_containers: List[Dict[str, Any]] = getattr( + ctx, 'init_containers', [] + ) + return [ + InitContainer.from_primary_and_opts(ctx, primary, ic_opts) + for ic_opts in init_containers + ] + + def customize_container_endpoints( + self, endpoints: List[EndPoint], deployment_type: DeploymentType + ) -> None: + if deployment_type == DeploymentType.DEFAULT: + endpoints.extend([EndPoint('0.0.0.0', p) for p in self.ports]) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_container_args()) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + def default_entrypoint(self) -> str: + return self.entrypoint or '' + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.uid, self.gid diff --git a/src/cephadm/cephadmlib/daemons/ingress.py b/src/cephadm/cephadmlib/daemons/ingress.py new file mode 100644 index 00000000000..6064cf538fb --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/ingress.py @@ -0,0 +1,290 @@ +import os + +from typing import Dict, List, Optional, Tuple, Union + +from ..constants import ( + DEFAULT_HAPROXY_IMAGE, + DEFAULT_KEEPALIVED_IMAGE, + DATA_DIR_MODE, +) +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context import CephadmContext +from ..context_getters import fetch_configs +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..data_utils import dict_get, is_fsid +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import makedirs, populate_files + + +@register_daemon_form +class HAproxy(ContainerDaemonForm): + """Defines an HAproxy container""" + + daemon_type = 'haproxy' + required_files = ['haproxy.cfg'] + default_image = DEFAULT_HAPROXY_IMAGE + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + ctx: CephadmContext, + fsid: str, + daemon_id: Union[int, str], + config_json: Dict, + image: str, + ) -> None: + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.files = dict_get(config_json, 'files', {}) + + self.validate() + + 
@classmethod + def init( + cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] + ) -> 'HAproxy': + return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'HAproxy': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + # create additional directories in data dir for HAproxy to use + if not os.path.isdir(os.path.join(data_dir, 'haproxy')): + makedirs( + os.path.join(data_dir, 'haproxy'), uid, gid, DATA_DIR_MODE + ) + + data_dir = os.path.join(data_dir, 'haproxy') + populate_files(data_dir, self.files, uid, gid) + + def get_daemon_args(self) -> List[str]: + return ['haproxy', '-f', '/var/lib/haproxy/haproxy.cfg'] + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + + # check for the required files + if self.required_files: + for fname in self.required_files: + if fname not in self.files: + raise Error( + 'required file missing from config-json: %s' % fname + ) + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + # better directory for this? 
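+        # (editor's note, assumption: extract_uid_gid is believed to stat the
+        #  given file_path inside the container image to discover the owning
+        #  uid/gid; /var/lib is presumably used here only because the haproxy
+        #  image has no ceph-specific directory to probe.)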
+        return extract_uid_gid(self.ctx, file_path='/var/lib')
+
+    @staticmethod
+    def _get_container_mounts(data_dir: str) -> Dict[str, str]:
+        mounts = dict()
+        mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy'
+        return mounts
+
+    def customize_container_mounts(
+        self, ctx: CephadmContext, mounts: Dict[str, str]
+    ) -> None:
+        data_dir = self.identity.data_dir(ctx.data_dir)
+        mounts.update(self._get_container_mounts(data_dir))
+
+    @staticmethod
+    def get_sysctl_settings() -> List[str]:
+        return [
+            '# IP forwarding and non-local bind',
+            'net.ipv4.ip_forward = 1',
+            'net.ipv4.ip_nonlocal_bind = 1',
+        ]
+
+    def container(self, ctx: CephadmContext) -> CephContainer:
+        ctr = daemon_to_container(ctx, self)
+        return to_deployment_container(ctx, ctr)
+
+    def customize_container_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        args.extend(
+            ['--user=root']
+        )  # haproxy 2.4 defaults to a different user
+
+    def customize_process_args(
+        self, ctx: CephadmContext, args: List[str]
+    ) -> None:
+        args.extend(self.get_daemon_args())
+
+
+@register_daemon_form
+class Keepalived(ContainerDaemonForm):
+    """Defines a Keepalived container"""
+
+    daemon_type = 'keepalived'
+    required_files = ['keepalived.conf']
+    default_image = DEFAULT_KEEPALIVED_IMAGE
+
+    @classmethod
+    def for_daemon_type(cls, daemon_type: str) -> bool:
+        return cls.daemon_type == daemon_type
+
+    def __init__(
+        self,
+        ctx: CephadmContext,
+        fsid: str,
+        daemon_id: Union[int, str],
+        config_json: Dict,
+        image: str,
+    ) -> None:
+        self.ctx = ctx
+        self.fsid = fsid
+        self.daemon_id = daemon_id
+        self.image = image
+
+        # config-json options
+        self.files = dict_get(config_json, 'files', {})
+
+        self.validate()
+
+    @classmethod
+    def init(
+        cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str]
+    ) -> 'Keepalived':
+        return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
+
+    @classmethod
+    def create(
+        cls, ctx: CephadmContext, ident: DaemonIdentity
+    ) -> 'Keepalived':
+        return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+    @property
+    def identity(self) -> DaemonIdentity:
+        return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
+    def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        # create additional directories in data dir for keepalived to use
+        if not os.path.isdir(os.path.join(data_dir, 'keepalived')):
+            makedirs(
+                os.path.join(data_dir, 'keepalived'), uid, gid, DATA_DIR_MODE
+            )
+
+        # populate files from the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+    def validate(self):
+        # type: () -> None
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error(
+                        'required file missing from config-json: %s' % fname
+                    )
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    @staticmethod
+    def 
get_container_envs(): + # type: () -> List[str] + envs = [ + 'KEEPALIVED_AUTOCONF=false', + 'KEEPALIVED_CONF=/etc/keepalived/keepalived.conf', + 'KEEPALIVED_CMD=/usr/sbin/keepalived -n -l -f /etc/keepalived/keepalived.conf', + 'KEEPALIVED_DEBUG=false', + ] + return envs + + @staticmethod + def get_sysctl_settings() -> List[str]: + return [ + '# IP forwarding and non-local bind', + 'net.ipv4.ip_forward = 1', + 'net.ipv4.ip_nonlocal_bind = 1', + ] + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + # better directory for this? + return extract_uid_gid(self.ctx, file_path='/var/lib') + + @staticmethod + def _get_container_mounts(data_dir: str) -> Dict[str, str]: + mounts = dict() + mounts[ + os.path.join(data_dir, 'keepalived.conf') + ] = '/etc/keepalived/keepalived.conf' + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW']) diff --git a/src/cephadm/cephadmlib/daemons/iscsi.py b/src/cephadm/cephadmlib/daemons/iscsi.py new file mode 100644 index 00000000000..1845a37bf4e --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/iscsi.py @@ -0,0 +1,286 @@ +import logging +import os +import re + +from typing import Dict, List, Optional, Tuple + +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context_getters import fetch_configs, get_config_and_keyring +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity, DaemonSubIdentity +from ..constants import DEFAULT_IMAGE +from ..context import CephadmContext +from ..data_utils import dict_get, is_fsid +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import makedirs, populate_files +from ..call_wrappers import call, CallVerbosity + + +logger = logging.getLogger() + + +@register_daemon_form +class CephIscsi(ContainerDaemonForm): + """Defines a Ceph-Iscsi container""" + + daemon_type = 'iscsi' + entrypoint = '/usr/bin/rbd-target-api' + + required_files = ['iscsi-gateway.cfg'] + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + ctx: CephadmContext, + ident: DaemonIdentity, + config_json: Dict, + image: str = DEFAULT_IMAGE, + ): + self.ctx = ctx + self._identity = ident + self.image = image + + # config-json options + self.files = dict_get(config_json, 'files', {}) + + # validate the supplied args + self.validate() + + @classmethod + def init( + cls, ctx: CephadmContext, fsid: str, daemon_id: str + ) -> 'CephIscsi': + return cls.create( + ctx, DaemonIdentity(fsid, cls.daemon_type, daemon_id) + ) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'CephIscsi': + return cls(ctx, ident, fetch_configs(ctx), ctx.image) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + @property + def fsid(self) -> str: + return 
self._identity.fsid
+
+    @property
+    def daemon_id(self) -> str:
+        return self._identity.daemon_id
+
+    @staticmethod
+    def _get_container_mounts(data_dir, log_dir):
+        # type: (str, str) -> Dict[str, str]
+        mounts = dict()
+        mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
+        mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
+        mounts[
+            os.path.join(data_dir, 'iscsi-gateway.cfg')
+        ] = '/etc/ceph/iscsi-gateway.cfg:z'
+        mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+        mounts[
+            os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')
+        ] = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
+        mounts[log_dir] = '/var/log:z'
+        mounts['/dev'] = '/dev'
+        return mounts
+
+    def customize_container_mounts(
+        self, ctx: CephadmContext, mounts: Dict[str, str]
+    ) -> None:
+        data_dir = self.identity.data_dir(ctx.data_dir)
+        # Remove the trailing '.tcmu' from data_dir, as tcmu-runner uses the
+        # same data_dir as rbd-target-api
+        if data_dir.endswith('.tcmu'):
+            data_dir = re.sub(r'\.tcmu$', '', data_dir)
+        log_dir = os.path.join(ctx.log_dir, self.identity.fsid)
+        mounts.update(CephIscsi._get_container_mounts(data_dir, log_dir))
+
+    def customize_container_binds(
+        self, ctx: CephadmContext, binds: List[List[str]]
+    ) -> None:
+        lib_modules = [
+            'type=bind',
+            'source=/lib/modules',
+            'destination=/lib/modules',
+            'ro=true',
+        ]
+        binds.append(lib_modules)
+
+    @staticmethod
+    def get_version(ctx, container_id):
+        # type: (CephadmContext, str) -> Optional[str]
+        version = None
+        out, err, code = call(
+            ctx,
+            [
+                ctx.container_engine.path,
+                'exec',
+                container_id,
+                '/usr/bin/python3',
+                '-c',
+                "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)",
+            ],
+            verbosity=CallVerbosity.QUIET,
+        )
+        if code == 0:
+            version = out.strip()
+        return version
+
+    def validate(self):
+        # type: () -> None
+        if not is_fsid(self.fsid):
+            raise Error('not an fsid: %s' % self.fsid)
+        if not self.daemon_id:
+            raise Error('invalid daemon_id: %s' % self.daemon_id)
+        if not self.image:
+            raise Error('invalid image: %s' % self.image)
+
+        # check for the required files
+        if self.required_files:
+            for fname in self.required_files:
+                if fname not in self.files:
+                    raise Error(
+                        'required file missing from config-json: %s' % fname
+                    )
+
+    def get_daemon_name(self):
+        # type: () -> str
+        return '%s.%s' % (self.daemon_type, self.daemon_id)
+
+    def get_container_name(self, desc=None):
+        # type: (Optional[str]) -> str
+        cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name())
+        if desc:
+            cname = '%s-%s' % (cname, desc)
+        return cname
+
+    def create_daemon_dirs(self, data_dir, uid, gid):
+        # type: (str, int, int) -> None
+        """Create files under the container data dir"""
+        if not os.path.isdir(data_dir):
+            raise OSError('data_dir is not a directory: %s' % (data_dir))
+
+        logger.info('Creating ceph-iscsi config...')
+        configfs_dir = os.path.join(data_dir, 'configfs')
+        makedirs(configfs_dir, uid, gid, 0o755)
+
+        # set up the tcmu-runner entrypoint script to be mounted into the
+        # container. For more info on why we need this script, see the
+        # tcmu_runner_entrypoint_script function.
+        self.files[
+            'tcmu-runner-entrypoint.sh'
+        ] = self.tcmu_runner_entrypoint_script()
+
+        # populate files from the config-json
+        populate_files(data_dir, self.files, uid, gid)
+
+        # we want the tcmu runner entrypoint script to be executable
+        # populate_files will give it 0o600 by default
+        os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700)
+
+    @staticmethod
+    def configfs_mount_umount(data_dir, mount=True):
+        # type: (str, bool) -> List[str]
+        mount_path = os.path.join(data_dir, 'configfs')
+        if mount:
+            cmd = (
+                'if ! grep -qs {0} /proc/mounts; then '
+                'mount -t configfs none {0}; fi'.format(mount_path)
+            )
+        else:
+            cmd = (
+                'if grep -qs {0} /proc/mounts; then '
+                'umount {0}; fi'.format(mount_path)
+            )
+        return cmd.split()
+
+    @staticmethod
+    def tcmu_runner_entrypoint_script() -> str:
+        # since tcmu-runner runs as a background process in its systemd
+        # unit (rbd-target-api being the main process), systemd will not
+        # restart it when it fails. In order to try and get around that
+        # for now we can have a script mounted in the container that
+        # attempts to do the restarting for us. This script can then
+        # become the entrypoint for the tcmu-runner container
+
+        # This is intended to be dropped for a better solution
+        # for at least the squid release onward
+        return """#!/bin/bash
+RUN_DIR=/var/run/tcmu-runner
+
+if [ ! -d "${RUN_DIR}" ] ; then
+    mkdir -p "${RUN_DIR}"
+fi
+
+rm -rf "${RUN_DIR}"/*
+
+while true
+do
+    touch "${RUN_DIR}"/start-up-$(date -Ins)
+    /usr/bin/tcmu-runner
+
+    # If we got around 3 kills/segfaults in the last minute,
+    # don't start anymore
+    if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then
+        exit 0
+    fi
+
+    sleep 1
+done
+"""
+
+    def get_tcmu_runner_container(self):
+        # type: () -> CephContainer
+        # daemon_id is used to generate the cid and pid files used by
+        # podman, but as both tcmu-runner and rbd-target-api have the same
+        # daemon_id, they conflict and prevent the second container from
+        # starting. '.tcmu' is appended to the daemon_id to fix that.
+        subident = DaemonSubIdentity(
+            self.fsid, self.daemon_type, self.daemon_id, 'tcmu'
+        )
+        tcmu_dmn = self.create(self.ctx, subident)
+        tcmu_container = to_deployment_container(
+            self.ctx, daemon_to_container(self.ctx, tcmu_dmn, privileged=True)
+        )
+        # TODO: Eventually we don't want to run tcmu-runner through this script.
+        # This is intended to be a workaround backported to older releases
+        # and should eventually be removed in at least squid onward
+        tcmu_container.entrypoint = (
+            '/usr/local/scripts/tcmu-runner-entrypoint.sh'
+        )
+        tcmu_container.cname = self.get_container_name(desc='tcmu')
+        return tcmu_container
+
+    def container(self, ctx: CephadmContext) -> CephContainer:
+        # So that the container can modprobe iscsi_target_mod and have write
+        # perms to configfs, we need to make this a privileged container.
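+        # (editor's note, assumption: with the podman and docker engines,
+        #  privileged=True is expected to surface as a '--privileged' flag
+        #  on the generated container 'run' command line.)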
+ ctr = daemon_to_container(ctx, self, privileged=True) + return to_deployment_container(ctx, ctr) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return extract_uid_gid(ctx) + + def default_entrypoint(self) -> str: + return self.entrypoint + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) diff --git a/src/cephadm/cephadmlib/daemons/monitoring.py b/src/cephadm/cephadmlib/daemons/monitoring.py new file mode 100644 index 00000000000..e0667853dd7 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/monitoring.py @@ -0,0 +1,377 @@ +import os + +from typing import Dict, List, Tuple + +from ..call_wrappers import call, CallVerbosity +from ..constants import ( + DEFAULT_ALERT_MANAGER_IMAGE, + DEFAULT_GRAFANA_IMAGE, + DEFAULT_LOKI_IMAGE, + DEFAULT_NODE_EXPORTER_IMAGE, + DEFAULT_PROMETHEUS_IMAGE, + DEFAULT_PROMTAIL_IMAGE, +) +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context import CephadmContext +from ..context_getters import fetch_configs, fetch_meta +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..net_utils import get_fqdn, get_hostname, get_ip_addresses, wrap_ipv6 + + +@register_daemon_form +class Monitoring(ContainerDaemonForm): + """Define the configs for the monitoring containers""" + + port_map = { + 'prometheus': [ + 9095 + ], # Avoid default 9090, due to conflict with cockpit UI + 'node-exporter': [9100], + 'grafana': [3000], + 'alertmanager': [9093, 9094], + 'loki': [3100], + 'promtail': [9080], + } + + components = { + 'prometheus': { + 'image': DEFAULT_PROMETHEUS_IMAGE, + 'cpus': '2', + 'memory': '4GB', + 'args': [ + '--config.file=/etc/prometheus/prometheus.yml', + '--storage.tsdb.path=/prometheus', + ], + 'config-json-files': [ + 'prometheus.yml', + ], + }, + 'loki': { + 'image': DEFAULT_LOKI_IMAGE, + 'cpus': '1', + 'memory': '1GB', + 'args': [ + '--config.file=/etc/loki/loki.yml', + ], + 'config-json-files': ['loki.yml'], + }, + 'promtail': { + 'image': DEFAULT_PROMTAIL_IMAGE, + 'cpus': '1', + 'memory': '1GB', + 'args': [ + '--config.file=/etc/promtail/promtail.yml', + ], + 'config-json-files': [ + 'promtail.yml', + ], + }, + 'node-exporter': { + 'image': DEFAULT_NODE_EXPORTER_IMAGE, + 'cpus': '1', + 'memory': '1GB', + 'args': ['--no-collector.timex'], + }, + 'grafana': { + 'image': DEFAULT_GRAFANA_IMAGE, + 'cpus': '2', + 'memory': '4GB', + 'args': [], + 'config-json-files': [ + 'grafana.ini', + 'provisioning/datasources/ceph-dashboard.yml', + 'certs/cert_file', + 'certs/cert_key', + ], + }, + 'alertmanager': { + 'image': DEFAULT_ALERT_MANAGER_IMAGE, + 'cpus': '2', + 'memory': '2GB', + 'args': [ + '--cluster.listen-address=:{}'.format( + port_map['alertmanager'][1] + ), + ], + 'config-json-files': [ + 'alertmanager.yml', + ], + 'config-json-args': [ + 'peers', + ], + }, + } # type: ignore + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return daemon_type in cls.components + + @staticmethod + def get_version(ctx, container_id, daemon_type): + # type: (CephadmContext, str, str) -> str + """ + :param: daemon_type Either "prometheus", "alertmanager", "loki", 
"promtail" or "node-exporter" + """ + assert daemon_type in ( + 'prometheus', + 'alertmanager', + 'node-exporter', + 'loki', + 'promtail', + ) + cmd = daemon_type.replace('-', '_') + code = -1 + err = '' + out = '' + version = '' + if daemon_type == 'alertmanager': + for cmd in ['alertmanager', 'prometheus-alertmanager']: + out, err, code = call( + ctx, + [ + ctx.container_engine.path, + 'exec', + container_id, + cmd, + '--version', + ], + verbosity=CallVerbosity.QUIET, + ) + if code == 0: + break + cmd = 'alertmanager' # reset cmd for version extraction + else: + out, err, code = call( + ctx, + [ + ctx.container_engine.path, + 'exec', + container_id, + cmd, + '--version', + ], + verbosity=CallVerbosity.QUIET, + ) + if code == 0: + if err.startswith('%s, version ' % cmd): + version = err.split(' ')[2] + elif out.startswith('%s, version ' % cmd): + version = out.split(' ')[2] + return version + + @staticmethod + def extract_uid_gid( + ctx: CephadmContext, daemon_type: str + ) -> Tuple[int, int]: + if daemon_type == 'prometheus': + uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus') + elif daemon_type == 'node-exporter': + uid, gid = 65534, 65534 + elif daemon_type == 'grafana': + uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana') + elif daemon_type == 'loki': + uid, gid = extract_uid_gid(ctx, file_path='/etc/loki') + elif daemon_type == 'promtail': + uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail') + elif daemon_type == 'alertmanager': + uid, gid = extract_uid_gid( + ctx, file_path=['/etc/alertmanager', '/etc/prometheus'] + ) + else: + raise Error('{} not implemented yet'.format(daemon_type)) + return uid, gid + + def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: + self.ctx = ctx + self._identity = ident + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'Monitoring': + return cls(ctx, ident) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + def container(self, ctx: CephadmContext) -> CephContainer: + self._prevalidate(ctx) + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.extract_uid_gid(ctx, self.identity.daemon_type) + + def _prevalidate(self, ctx: CephadmContext) -> None: + # before being refactored into a ContainerDaemonForm these checks were + # done inside the deploy function. 
This was the only "family" of daemons + # that performed these checks in that location + daemon_type = self.identity.daemon_type + config = fetch_configs(ctx) # type: ignore + required_files = self.components[daemon_type].get( + 'config-json-files', list() + ) + required_args = self.components[daemon_type].get( + 'config-json-args', list() + ) + if required_files: + if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore + raise Error( + '{} deployment requires config-json which must ' + 'contain file content for {}'.format( + daemon_type.capitalize(), ', '.join(required_files) + ) + ) + if required_args: + if not config or not all(c in config.keys() for c in required_args): # type: ignore + raise Error( + '{} deployment requires config-json which must ' + 'contain arg for {}'.format( + daemon_type.capitalize(), ', '.join(required_args) + ) + ) + + def get_daemon_args(self) -> List[str]: + ctx = self.ctx + daemon_type = self.identity.daemon_type + metadata = self.components[daemon_type] + r = list(metadata.get('args', [])) + # set ip and port to bind to for nodeexporter,alertmanager,prometheus + if daemon_type not in ['grafana', 'loki', 'promtail']: + ip = '' + port = self.port_map[daemon_type][0] + meta = fetch_meta(ctx) + if meta: + if 'ip' in meta and meta['ip']: + ip = meta['ip'] + if 'ports' in meta and meta['ports']: + port = meta['ports'][0] + r += [f'--web.listen-address={ip}:{port}'] + if daemon_type == 'prometheus': + config = fetch_configs(ctx) + retention_time = config.get('retention_time', '15d') + retention_size = config.get( + 'retention_size', '0' + ) # default to disabled + r += [f'--storage.tsdb.retention.time={retention_time}'] + r += [f'--storage.tsdb.retention.size={retention_size}'] + scheme = 'http' + host = get_fqdn() + # in case host is not an fqdn then we use the IP to + # avoid producing a broken web.external-url link + if '.' 
not in host: + ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname()) + # use the first ipv4 (if any) otherwise use the first ipv6 + addr = next(iter(ipv4_addrs or ipv6_addrs), None) + host = wrap_ipv6(addr) if addr else host + r += [f'--web.external-url={scheme}://{host}:{port}'] + if daemon_type == 'alertmanager': + config = fetch_configs(ctx) + peers = config.get('peers', list()) # type: ignore + for peer in peers: + r += ['--cluster.peer={}'.format(peer)] + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + # some alertmanager, by default, look elsewhere for a config + r += ['--config.file=/etc/alertmanager/alertmanager.yml'] + if daemon_type == 'promtail': + r += ['--config.expand-env'] + if daemon_type == 'prometheus': + config = fetch_configs(ctx) + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + if daemon_type == 'node-exporter': + config = fetch_configs(ctx) + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + r += [ + '--path.procfs=/host/proc', + '--path.sysfs=/host/sys', + '--path.rootfs=/rootfs', + ] + return r + + def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: + ctx = self.ctx + daemon_type = self.identity.daemon_type + mounts: Dict[str, str] = {} + log_dir = os.path.join(ctx.log_dir, self.identity.fsid) + if daemon_type == 'prometheus': + mounts[ + os.path.join(data_dir, 'etc/prometheus') + ] = '/etc/prometheus:Z' + mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z' + elif daemon_type == 'loki': + mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z' + mounts[os.path.join(data_dir, 'data')] = '/loki:Z' + elif daemon_type == 'promtail': + mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z' + mounts[log_dir] = '/var/log/ceph:z' + mounts[os.path.join(data_dir, 'data')] = '/promtail:Z' + elif daemon_type == 'node-exporter': + mounts[ + os.path.join(data_dir, 'etc/node-exporter') + ] = '/etc/node-exporter:Z' + mounts['/proc'] = '/host/proc:ro' + mounts['/sys'] = '/host/sys:ro' + mounts['/'] = '/rootfs:ro' + elif daemon_type == 'grafana': + mounts[ + os.path.join(data_dir, 'etc/grafana/grafana.ini') + ] = '/etc/grafana/grafana.ini:Z' + mounts[ + os.path.join(data_dir, 'etc/grafana/provisioning/datasources') + ] = '/etc/grafana/provisioning/datasources:Z' + mounts[ + os.path.join(data_dir, 'etc/grafana/certs') + ] = '/etc/grafana/certs:Z' + mounts[ + os.path.join(data_dir, 'data/grafana.db') + ] = '/var/lib/grafana/grafana.db:Z' + elif daemon_type == 'alertmanager': + mounts[ + os.path.join(data_dir, 'etc/alertmanager') + ] = '/etc/alertmanager:Z' + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + uid, _ = self.uid_gid(ctx) + monitoring_args = [ + '--user', + str(uid), + # FIXME: disable cpu/memory limits for the time being (not supported + # by ubuntu 18.04 kernel!) 
+ ] + args.extend(monitoring_args) + if self.identity.daemon_type == 'node-exporter': + # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys', + # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation + # between the node-exporter container and the host to avoid selinux denials + args.extend(['--security-opt', 'label=disable']) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + def default_entrypoint(self) -> str: + return '' diff --git a/src/cephadm/cephadmlib/daemons/nfs.py b/src/cephadm/cephadmlib/daemons/nfs.py new file mode 100644 index 00000000000..6e2f2a945ca --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/nfs.py @@ -0,0 +1,225 @@ +import logging +import os +import re + +from typing import Dict, List, Optional, Tuple, Union + +from ..call_wrappers import call, CallVerbosity +from ..constants import DEFAULT_IMAGE, CEPH_DEFAULT_CONF +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer, extract_uid_gid +from ..context import CephadmContext +from ..context_getters import fetch_configs, get_config_and_keyring +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..data_utils import dict_get, is_fsid +from ..deploy import DeploymentType +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import makedirs, populate_files, write_new +from ..net_utils import EndPoint + + +logger = logging.getLogger() + + +@register_daemon_form +class NFSGanesha(ContainerDaemonForm): + """Defines a NFS-Ganesha container""" + + daemon_type = 'nfs' + entrypoint = '/usr/bin/ganesha.nfsd' + daemon_args = ['-F', '-L', 'STDERR'] + + required_files = ['ganesha.conf'] + + port_map = { + 'nfs': 2049, + } + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, ctx, fsid, daemon_id, config_json, image=DEFAULT_IMAGE + ): + # type: (CephadmContext, str, Union[int, str], Dict, str) -> None + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.pool = dict_get(config_json, 'pool', require=True) + self.namespace = dict_get(config_json, 'namespace') + self.userid = dict_get(config_json, 'userid') + self.extra_args = dict_get(config_json, 'extra_args', []) + self.files = dict_get(config_json, 'files', {}) + self.rgw = dict_get(config_json, 'rgw', {}) + + # validate the supplied args + self.validate() + + @classmethod + def init(cls, ctx, fsid, daemon_id): + # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha + return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'NFSGanesha': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + def _get_container_mounts(self, data_dir): + # type: (str) -> Dict[str, str] + mounts = dict() + mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' + mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' + mounts[os.path.join(data_dir, 'etc/ganesha')] = '/etc/ganesha:z' + if self.rgw: + cluster = self.rgw.get('cluster', 'ceph') + rgw_user = self.rgw.get('user', 'admin') + mounts[ + 
os.path.join(data_dir, 'keyring.rgw') + ] = '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user) + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + @staticmethod + def get_container_envs(): + # type: () -> List[str] + envs = ['CEPH_CONF=%s' % (CEPH_DEFAULT_CONF)] + return envs + + @staticmethod + def get_version(ctx, container_id): + # type: (CephadmContext, str) -> Optional[str] + version = None + out, err, code = call( + ctx, + [ + ctx.container_engine.path, + 'exec', + container_id, + NFSGanesha.entrypoint, + '-v', + ], + verbosity=CallVerbosity.QUIET, + ) + if code == 0: + match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out) + if match: + version = match.group(1) + return version + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + + # check for the required files + if self.required_files: + for fname in self.required_files: + if fname not in self.files: + raise Error( + 'required file missing from config-json: %s' % fname + ) + + # check for an RGW config + if self.rgw: + if not self.rgw.get('keyring'): + raise Error('RGW keyring is missing') + if not self.rgw.get('user'): + raise Error('RGW user is missing') + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = 'ceph-%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + def get_daemon_args(self): + # type: () -> List[str] + return self.daemon_args + self.extra_args + + def create_daemon_dirs(self, data_dir, uid, gid): + # type: (str, int, int) -> None + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + logger.info('Creating ganesha config...') + + # create the ganesha conf dir + config_dir = os.path.join(data_dir, 'etc/ganesha') + makedirs(config_dir, uid, gid, 0o755) + + # populate files from the config-json + populate_files(config_dir, self.files, uid, gid) + + # write the RGW keyring + if self.rgw: + keyring_path = os.path.join(data_dir, 'keyring.rgw') + with write_new(keyring_path, owner=(uid, gid)) as f: + f.write(self.rgw.get('keyring', '')) + + def firewall_service_name(self) -> str: + return 'nfs' + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def customize_container_endpoints( + self, endpoints: List[EndPoint], deployment_type: DeploymentType + ) -> None: + if deployment_type == DeploymentType.DEFAULT and not endpoints: + nfs_ports = list(NFSGanesha.port_map.values()) + endpoints.extend([EndPoint('0.0.0.0', p) for p in nfs_ports]) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + # TODO: extract ganesha uid/gid (997, 994) ? 
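+ # for now we reuse the ceph uid/gid probed from the container image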
+ return extract_uid_gid(ctx) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + + def default_entrypoint(self) -> str: + return self.entrypoint diff --git a/src/cephadm/cephadmlib/daemons/nvmeof.py b/src/cephadm/cephadmlib/daemons/nvmeof.py new file mode 100644 index 00000000000..39488406bc8 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/nvmeof.py @@ -0,0 +1,193 @@ +import logging +import os + +from typing import Dict, List, Optional, Tuple, Union + +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer +from ..context_getters import fetch_configs, get_config_and_keyring +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..constants import DEFAULT_NVMEOF_IMAGE +from ..context import CephadmContext +from ..data_utils import dict_get, is_fsid +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import makedirs, populate_files +from ..call_wrappers import call + + +logger = logging.getLogger() + + +@register_daemon_form +class CephNvmeof(ContainerDaemonForm): + """Defines a Ceph-Nvmeof container""" + + daemon_type = 'nvmeof' + required_files = ['ceph-nvmeof.conf'] + default_image = DEFAULT_NVMEOF_IMAGE + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, ctx, fsid, daemon_id, config_json, image=DEFAULT_NVMEOF_IMAGE + ): + # type: (CephadmContext, str, Union[int, str], Dict, str) -> None + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image + + # config-json options + self.files = dict_get(config_json, 'files', {}) + + # validate the supplied args + self.validate() + + @classmethod + def init(cls, ctx, fsid, daemon_id): + # type: (CephadmContext, str, Union[int, str]) -> CephNvmeof + return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'CephNvmeof': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + @staticmethod + def _get_container_mounts(data_dir: str) -> Dict[str, str]: + mounts = dict() + mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' + mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' + mounts[ + os.path.join(data_dir, 'ceph-nvmeof.conf') + ] = '/src/ceph-nvmeof.conf:z' + mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' + mounts['/dev/hugepages'] = '/dev/hugepages' + mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio' + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def customize_container_binds( + self, ctx: CephadmContext, binds: List[List[str]] + ) -> None: 
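+ # expose the host's kernel modules read-only inside the container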
+ lib_modules = [ + 'type=bind', + 'source=/lib/modules', + 'destination=/lib/modules', + 'ro=true', + ] + binds.append(lib_modules) + + @staticmethod + def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]: + out, err, ret = call( + ctx, + [ + ctx.container_engine.path, + 'inspect', + '--format', + '{{index .Config.Labels "io.ceph.version"}}', + ctx.image, + ], + ) + version = None + if ret == 0: + version = out.strip() + return version + + def validate(self): + # type: () -> None + if not is_fsid(self.fsid): + raise Error('not an fsid: %s' % self.fsid) + if not self.daemon_id: + raise Error('invalid daemon_id: %s' % self.daemon_id) + if not self.image: + raise Error('invalid image: %s' % self.image) + + # check for the required files + if self.required_files: + for fname in self.required_files: + if fname not in self.files: + raise Error( + 'required file missing from config-json: %s' % fname + ) + + def get_daemon_name(self): + # type: () -> str + return '%s.%s' % (self.daemon_type, self.daemon_id) + + def get_container_name(self, desc=None): + # type: (Optional[str]) -> str + cname = '%s-%s' % (self.fsid, self.get_daemon_name()) + if desc: + cname = '%s-%s' % (cname, desc) + return cname + + def create_daemon_dirs(self, data_dir, uid, gid): + # type: (str, int, int) -> None + """Create files under the container data dir""" + if not os.path.isdir(data_dir): + raise OSError('data_dir is not a directory: %s' % (data_dir)) + + logger.info('Creating ceph-nvmeof config...') + configfs_dir = os.path.join(data_dir, 'configfs') + makedirs(configfs_dir, uid, gid, 0o755) + + # populate files from the config-json + populate_files(data_dir, self.files, uid, gid) + + @staticmethod + def configfs_mount_umount(data_dir, mount=True): + # type: (str, bool) -> List[str] + mount_path = os.path.join(data_dir, 'configfs') + if mount: + cmd = ( + 'if ! 
grep -qs {0} /proc/mounts; then ' + 'mount -t configfs none {0}; fi'.format(mount_path) + ) + else: + cmd = ( + 'if grep -qs {0} /proc/mounts; then ' + 'umount {0}; fi'.format(mount_path) + ) + return cmd.split() + + @staticmethod + def get_sysctl_settings() -> List[str]: + return [ + 'vm.nr_hugepages = 4096', + ] + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return 167, 167 # TODO: need to get properly the uid/gid + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + args.extend(['--ulimit', 'memlock=-1:-1']) + args.extend(['--ulimit', 'nofile=10240']) + args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE']) diff --git a/src/cephadm/cephadmlib/daemons/snmp.py b/src/cephadm/cephadmlib/daemons/snmp.py new file mode 100644 index 00000000000..f334e5f7652 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/snmp.py @@ -0,0 +1,226 @@ +import json +import os + +from typing import Any, Dict, List, Optional, Tuple, Union +from urllib.error import HTTPError, URLError +from urllib.request import urlopen + +from ..constants import DEFAULT_SNMP_GATEWAY_IMAGE +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer +from ..context import CephadmContext +from ..context_getters import fetch_configs, fetch_endpoints +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..data_utils import is_fsid +from ..deployment_utils import to_deployment_container +from ..exceptions import Error +from ..file_utils import write_new + + +@register_daemon_form +class SNMPGateway(ContainerDaemonForm): + """Defines an SNMP gateway between Prometheus and SNMP monitoring Frameworks""" + + daemon_type = 'snmp-gateway' + SUPPORTED_VERSIONS = ['V2c', 'V3'] + default_image = DEFAULT_SNMP_GATEWAY_IMAGE + DEFAULT_PORT = 9464 + env_filename = 'snmp-gateway.conf' + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + def __init__( + self, + ctx: CephadmContext, + fsid: str, + daemon_id: Union[int, str], + config_json: Dict[str, Any], + image: Optional[str] = None, + ) -> None: + self.ctx = ctx + self.fsid = fsid + self.daemon_id = daemon_id + self.image = image or SNMPGateway.default_image + + self.uid = config_json.get('uid', 0) + self.gid = config_json.get('gid', 0) + + self.destination = config_json.get('destination', '') + self.snmp_version = config_json.get('snmp_version', 'V2c') + self.snmp_community = config_json.get('snmp_community', 'public') + self.log_level = config_json.get('log_level', 'info') + self.snmp_v3_auth_username = config_json.get( + 'snmp_v3_auth_username', '' + ) + self.snmp_v3_auth_password = config_json.get( + 'snmp_v3_auth_password', '' + ) + self.snmp_v3_auth_protocol = config_json.get( + 'snmp_v3_auth_protocol', '' + ) + self.snmp_v3_priv_protocol = config_json.get( + 'snmp_v3_priv_protocol', '' + ) + self.snmp_v3_priv_password = config_json.get( + 'snmp_v3_priv_password', '' + ) + self.snmp_v3_engine_id = config_json.get('snmp_v3_engine_id', '') + + self.validate() + + @classmethod + def init( + cls, ctx: CephadmContext, 
fsid: str, daemon_id: Union[int, str] + ) -> 'SNMPGateway': + cfgs = fetch_configs(ctx) + assert cfgs # assert some config data was found + return cls(ctx, fsid, daemon_id, cfgs, ctx.image) + + @classmethod + def create( + cls, ctx: CephadmContext, ident: DaemonIdentity + ) -> 'SNMPGateway': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + @staticmethod + def get_version( + ctx: CephadmContext, fsid: str, daemon_id: str + ) -> Optional[str]: + """Return the version of the notifier from its http endpoint""" + path = os.path.join( + ctx.data_dir, fsid, f'snmp-gateway.{daemon_id}', 'unit.meta' + ) + try: + with open(path, 'r') as env: + metadata = json.loads(env.read()) + except (OSError, json.JSONDecodeError): + return None + + ports = metadata.get('ports', []) + if not ports: + return None + + try: + with urlopen(f'http://127.0.0.1:{ports[0]}/') as r: + html = r.read().decode('utf-8').split('\n') + except (HTTPError, URLError): + return None + + for h in html: + stripped = h.strip() + if stripped.startswith(('<pre>', '<PRE>')) and stripped.endswith( + ('</pre>', '</PRE>') + ): + # <pre>(version=1.2.1, branch=HEAD, revision=7... + return stripped.split(',')[0].split('version=')[1] + + return None + + @property + def port(self) -> int: + endpoints = fetch_endpoints(self.ctx) + if not endpoints: + return self.DEFAULT_PORT + return endpoints[0].port + + def get_daemon_args(self) -> List[str]: + v3_args = [] + base_args = [ + f'--web.listen-address=:{self.port}', + f'--snmp.destination={self.destination}', + f'--snmp.version={self.snmp_version}', + f'--log.level={self.log_level}', + '--snmp.trap-description-template=/etc/snmp_notifier/description-template.tpl', + ] + + if self.snmp_version == 'V3': + # common auth settings + v3_args.extend( + [ + '--snmp.authentication-enabled', + f'--snmp.authentication-protocol={self.snmp_v3_auth_protocol}', + f'--snmp.security-engine-id={self.snmp_v3_engine_id}', + ] + ) + # authPriv setting is applied if we have a privacy protocol setting + if self.snmp_v3_priv_protocol: + v3_args.extend( + [ + '--snmp.private-enabled', + f'--snmp.private-protocol={self.snmp_v3_priv_protocol}', + ] + ) + + return base_args + v3_args + + @property + def data_dir(self) -> str: + return os.path.join( + self.ctx.data_dir, + self.ctx.fsid, + f'{self.daemon_type}.{self.daemon_id}', + ) + + @property + def conf_file_path(self) -> str: + return os.path.join(self.data_dir, self.env_filename) + + def create_daemon_conf(self) -> None: + """Creates the environment file holding 'secrets' passed to the snmp-notifier daemon""" + with write_new(self.conf_file_path) as f: + if self.snmp_version == 'V2c': + f.write(f'SNMP_NOTIFIER_COMMUNITY={self.snmp_community}\n') + else: + f.write( + f'SNMP_NOTIFIER_AUTH_USERNAME={self.snmp_v3_auth_username}\n' + ) + f.write( + f'SNMP_NOTIFIER_AUTH_PASSWORD={self.snmp_v3_auth_password}\n' + ) + if self.snmp_v3_priv_password: + f.write( + f'SNMP_NOTIFIER_PRIV_PASSWORD={self.snmp_v3_priv_password}\n' + ) + + def validate(self) -> None: + """Validate the settings + + Raises: + Error: if the fsid doesn't look like an fsid + Error: if the snmp version is not supported + Error: if the destination IP and port are missing + """ + if not is_fsid(self.fsid): + raise Error(f'not a valid fsid: {self.fsid}') + + if self.snmp_version not in SNMPGateway.SUPPORTED_VERSIONS: + raise Error(f'not a valid snmp version: {self.snmp_version}') + + if
not self.destination: + raise Error( + 'config is missing destination attribute(<ip>:<port>) of the target SNMP listener' + ) + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.uid, self.gid + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(f'--env-file={self.conf_file_path}') + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) diff --git a/src/cephadm/cephadmlib/daemons/tracing.py b/src/cephadm/cephadmlib/daemons/tracing.py new file mode 100644 index 00000000000..4d4fecacbb0 --- /dev/null +++ b/src/cephadm/cephadmlib/daemons/tracing.py @@ -0,0 +1,116 @@ +import logging + +from typing import Any, Dict, List, Tuple + +from ..constants import ( + DEFAULT_ELASTICSEARCH_IMAGE, + DEFAULT_JAEGER_AGENT_IMAGE, + DEFAULT_JAEGER_COLLECTOR_IMAGE, + DEFAULT_JAEGER_QUERY_IMAGE, +) +from ..container_daemon_form import ContainerDaemonForm, daemon_to_container +from ..container_types import CephContainer +from ..context import CephadmContext +from ..context_getters import fetch_configs +from ..daemon_form import register as register_daemon_form +from ..daemon_identity import DaemonIdentity +from ..deployment_utils import to_deployment_container + + +logger = logging.getLogger() + + +@register_daemon_form +class Tracing(ContainerDaemonForm): + """Define the configs for the jaeger tracing containers""" + + components: Dict[str, Dict[str, Any]] = { + 'elasticsearch': { + 'image': DEFAULT_ELASTICSEARCH_IMAGE, + 'envs': ['discovery.type=single-node'], + }, + 'jaeger-agent': { + 'image': DEFAULT_JAEGER_AGENT_IMAGE, + }, + 'jaeger-collector': { + 'image': DEFAULT_JAEGER_COLLECTOR_IMAGE, + }, + 'jaeger-query': { + 'image': DEFAULT_JAEGER_QUERY_IMAGE, + }, + } # type: ignore + + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return daemon_type in cls.components + + @staticmethod + def set_configuration(config: Dict[str, str], daemon_type: str) -> None: + if daemon_type in ['jaeger-collector', 'jaeger-query']: + assert 'elasticsearch_nodes' in config + Tracing.components[daemon_type]['envs'] = [ + 'SPAN_STORAGE_TYPE=elasticsearch', + f'ES_SERVER_URLS={config["elasticsearch_nodes"]}', + ] + if daemon_type == 'jaeger-agent': + assert 'collector_nodes' in config + Tracing.components[daemon_type]['daemon_args'] = [ + f'--reporter.grpc.host-port={config["collector_nodes"]}', + '--processor.jaeger-compact.server-host-port=6799', + ] + + def __init__(self, ident: DaemonIdentity) -> None: + self._identity = ident + self._configured = False + + def _configure(self, ctx: CephadmContext) -> None: + if self._configured: + return + config = fetch_configs(ctx) + # Currently, this method side-effects the class attribute, and that + # is unpleasant. 
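(The components dict is class-level state shared by every Tracing instance.)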
In the future it would be nice to move all of + # set_configuration into _configure and only modify each class's data + # independently + self.set_configuration(config, self.identity.daemon_type) + self._configured = True + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Tracing': + return cls(ident) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = daemon_to_container(ctx, self) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return 65534, 65534 + + def get_daemon_args(self) -> List[str]: + return self.components[self.identity.daemon_type].get( + 'daemon_args', [] + ) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + self._configure(ctx) + # earlier code explicitly checked whether the daemon type was jaeger-agent + # and would only call get_daemon_args if that was true. However, since + # the function only returns a non-empty list in the case of jaeger-agent, + # that check is unnecessary and is not brought over. + args.extend(self.get_daemon_args()) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + self._configure(ctx) + envs.extend( + self.components[self.identity.daemon_type].get('envs', []) + ) + + def default_entrypoint(self) -> str: + return '' diff --git a/src/cephadm/cephadmlib/deployment_utils.py b/src/cephadm/cephadmlib/deployment_utils.py new file mode 100644 index 00000000000..908fa979f1a --- /dev/null +++ b/src/cephadm/cephadmlib/deployment_utils.py @@ -0,0 +1,35 @@ +import os + +from .container_types import CephContainer +from .context import CephadmContext +from cephadmlib.context_getters import fetch_custom_config_files + + +def to_deployment_container( + ctx: CephadmContext, ctr: CephContainer +) -> CephContainer: + """Given a standard ceph container instance, return a CephContainer + prepared for a deployment as a daemon, having the extra args and + custom configurations added. + NOTE: The `ctr` object is mutated before being returned.
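+ Custom config files, when present, are bind mounted from the + cluster's custom_config_files directory into the container.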
+ """ + if 'extra_container_args' in ctx and ctx.extra_container_args: + ctr.container_args.extend(ctx.extra_container_args) + if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args: + ctr.args.extend(ctx.extra_entrypoint_args) + ccfiles = fetch_custom_config_files(ctx) + if ccfiles: + mandatory_keys = ['mount_path', 'content'] + for conf in ccfiles: + if all(k in conf for k in mandatory_keys): + mount_path = conf['mount_path'] + assert ctr.identity + file_path = os.path.join( + ctx.data_dir, + ctr.identity.fsid, + 'custom_config_files', + ctr.identity.daemon_name, + os.path.basename(mount_path), + ) + ctr.volume_mounts[file_path] = mount_path + return ctr diff --git a/src/cephadm/cephadmlib/file_utils.py b/src/cephadm/cephadmlib/file_utils.py index 7c9e6f69e43..1b9f11499a4 100644 --- a/src/cephadm/cephadmlib/file_utils.py +++ b/src/cephadm/cephadmlib/file_utils.py @@ -139,3 +139,7 @@ def get_file_timestamp(fn): ).strftime(DATEFMT) except Exception: return None + + +def make_run_dir(fsid: str, uid: int, gid: int) -> None: + makedirs(f'/var/run/ceph/{fsid}', uid, gid, 0o770) diff --git a/src/cephadm/tests/fixtures.py b/src/cephadm/tests/fixtures.py index d25dffa9e3b..572c1f9969d 100644 --- a/src/cephadm/tests/fixtures.py +++ b/src/cephadm/tests/fixtures.py @@ -6,7 +6,7 @@ import time from contextlib import contextmanager from pyfakefs import fake_filesystem -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Any, Tuple def import_cephadm(): @@ -183,3 +183,83 @@ def with_cephadm_ctx( else: yield ctx + +@pytest.fixture() +def funkypatch(monkeypatch): + """Defines the funkypatch fixture, which acts like a mixture of + mock.patch and pytest's monkeypatch fixture. + """ + fp = FunkyPatcher(monkeypatch) + yield fp + + +class FunkyPatcher: + """FunkyPatcher monkeypatches all imported instances of an object. + + Use `patch` to patch the canonical location of an object and FunkyPatcher + will automatically replace other imports of that object. + """ + + def __init__(self, monkeypatcher): + self._mp = monkeypatcher + # keep track of objects we've already patched. this dictionary + # maps a (module-name, object-name) tuple to the original object + # before patching. This could be used to determine if a name has + # already been patched or compare a patched object to the original. + self._originals: Dict[Tuple[str, str], Any] = {} + + def patch( + self, + mod: str, + name: str = '', + *, + dest: Any = None, + force: bool = False, + ) -> Any: + """Patch an object and all existing imports of that object. + Specify mod as `my.mod.name.obj` where obj is the name of the object to be + patched or as `my.mod.name` and specify `name` as the name of the + object to be patched. + If the object to be patched is not imported as the same name in `mod` + it will *not* be automatically patched. In other words, `from + my.mod.name import foo` will work, but `from my.mod.name import foo as + _foo` will not. + Use the keyword-only argument `dest` to specify the new object to be + used. A MagicMock will be created and used if dest is None. + Use the keyword-only argument `force` to override checks that mocked + objects are the same across modules. This can be used in the case that + some other code already patched an object and you want funkypatch to + override that patch (use with caution). + Returns the patched object (the MagicMock or supplied dest).
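+ Example: funkypatch.patch('cephadm.logger') or + funkypatch.patch('cephadmlib.call_wrappers', 'call').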
+ """ + import sys + import importlib + + if not name: + mod, name = mod.rsplit('.', 1) + modname = (mod, name) + # We don't strictly need the check but patching already patched objs is + # confusing to think about. It's better to block it for now and perhaps + # later we can relax these restrictions or be clever in some way. + if modname in self._originals: + raise KeyError(f'{modname} already patched') + + if dest is None: + dest = mock.MagicMock() + + imod = importlib.import_module(mod) + self._originals[modname] = getattr(imod, name) + + for mname, imod in sys.modules.items(): + try: + obj = getattr(imod, name) + except AttributeError: + # no matching name in module + continue + # make sure that the module imported the same object as the + # one we want to patch out, and not just some naming collision. + # ensure the original object and the one in the module are the + # same object + if obj is self._originals[modname] or force: + self._mp.setattr(imod, name, dest) + return dest diff --git a/src/cephadm/tests/test_cephadm.py b/src/cephadm/tests/test_cephadm.py index 8db8edd0c1b..82850ab597d 100644 --- a/src/cephadm/tests/test_cephadm.py +++ b/src/cephadm/tests/test_cephadm.py @@ -16,6 +16,7 @@ from .fixtures import ( with_cephadm_ctx, mock_bad_firewalld, import_cephadm, + funkypatch, ) from pyfakefs import fake_filesystem @@ -317,13 +318,17 @@ class TestCephAdm(object): with pytest.raises(Exception): _cephadm.prepare_dashboard(ctx, 0, 0, lambda _, extra_mounts=None, ___=None : '5', lambda : None) - @mock.patch('cephadm.logger') - @mock.patch('cephadm.fetch_custom_config_files') - @mock.patch('cephadm.get_container') - def test_to_deployment_container(self, _get_container, _get_config, _logger): + def test_to_deployment_container(self, funkypatch): """ test to_deployment_container properly makes use of extra container args and custom conf files """ + from cephadmlib.deployment_utils import to_deployment_container + + funkypatch.patch('cephadm.logger') + _get_config = funkypatch.patch( + 'cephadmlib.deployment_utils.fetch_custom_config_files' + ) + _get_container = funkypatch.patch('cephadm.get_container') ctx = _cephadm.CephadmContext() ctx.config_json = '-' @@ -357,32 +362,45 @@ class TestCephAdm(object): host_network=True, ) c = _cephadm.get_container(ctx, ident) - c = _cephadm.to_deployment_container(ctx, c) + c = to_deployment_container(ctx, c) assert '--pids-limit=12345' in c.container_args assert '--something' in c.container_args assert os.path.join('data', '9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'custom_config_files', 'grafana.host1', 'testing.str') in c.volume_mounts assert c.volume_mounts[os.path.join('data', '9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'custom_config_files', 'grafana.host1', 'testing.str')] == '/etc/testing.str' - @mock.patch('cephadm.logger') - @mock.patch('cephadm.FileLock') - @mock.patch('cephadm.deploy_daemon') - @mock.patch('cephadm.make_var_run') - @mock.patch('cephadm.migrate_sysctl_dir') - @mock.patch('cephadm.check_unit', lambda *args, **kwargs: (None, 'running', None)) - @mock.patch('cephadm.get_unit_name', lambda *args, **kwargs: 'mon-unit-name') - @mock.patch('cephadm.extract_uid_gid', lambda *args, **kwargs: (0, 0)) - @mock.patch('cephadm.get_container') - @mock.patch('cephadm.apply_deploy_config_to_ctx', lambda d, c: None) - def test_mon_crush_location(self, _get_container, _migrate_sysctl, _make_var_run, _deploy_daemon, _file_lock, _logger, monkeypatch): + def test_mon_crush_location(self, funkypatch): """ test that crush location for mon is set if it 
is included in config_json """ - _fetch_configs = mock.MagicMock() - monkeypatch.setattr('cephadmlib.context_getters.fetch_configs', _fetch_configs) - monkeypatch.setattr('cephadm.fetch_configs', _fetch_configs) - monkeypatch.setattr('cephadm.read_configuration_source', lambda c: {}) - monkeypatch.setattr('cephadm.fetch_custom_config_files', mock.MagicMock()) + funkypatch.patch('cephadm.logger') + funkypatch.patch('cephadm.FileLock') + _deploy_daemon = funkypatch.patch('cephadm.deploy_daemon') + funkypatch.patch('cephadm.make_var_run') + funkypatch.patch('cephadmlib.file_utils.make_run_dir') + _migrate_sysctl = funkypatch.patch('cephadm.migrate_sysctl_dir') + funkypatch.patch( + 'cephadm.check_unit', + dest=lambda *args, **kwargs: (None, 'running', None), + ) + funkypatch.patch( + 'cephadm.get_unit_name', + dest=lambda *args, **kwargs: 'mon-unit-name', + ) + funkypatch.patch( + 'cephadm.extract_uid_gid', dest=lambda *args, **kwargs: (0, 0) + ) + _get_container = funkypatch.patch('cephadm.get_container') + funkypatch.patch( + 'cephadm.apply_deploy_config_to_ctx', dest=lambda d, c: None + ) + _fetch_configs = funkypatch.patch( + 'cephadmlib.context_getters.fetch_configs' + ) + funkypatch.patch( + 'cephadm.read_configuration_source', dest=lambda c: {} + ) + funkypatch.patch('cephadm.fetch_custom_config_files') ctx = _cephadm.CephadmContext() ctx.name = 'mon.test' @@ -541,25 +559,31 @@ class TestCephAdm(object): def test_dict_get(self): - result = _cephadm.dict_get({'a': 1}, 'a', require=True) + from cephadmlib.data_utils import dict_get + + result = dict_get({'a': 1}, 'a', require=True) assert result == 1 - result = _cephadm.dict_get({'a': 1}, 'b') + result = dict_get({'a': 1}, 'b') assert result is None - result = _cephadm.dict_get({'a': 1}, 'b', default=2) + result = dict_get({'a': 1}, 'b', default=2) assert result == 2 def test_dict_get_error(self): + from cephadmlib.data_utils import dict_get + with pytest.raises(_cephadm.Error): - _cephadm.dict_get({'a': 1}, 'b', require=True) + dict_get({'a': 1}, 'b', require=True) def test_dict_get_join(self): - result = _cephadm.dict_get_join({'foo': ['a', 'b']}, 'foo') + from cephadmlib.data_utils import dict_get_join + + result = dict_get_join({'foo': ['a', 'b']}, 'foo') assert result == 'a\nb' - result = _cephadm.dict_get_join({'foo': [1, 2]}, 'foo') + result = dict_get_join({'foo': [1, 2]}, 'foo') assert result == '1\n2' - result = _cephadm.dict_get_join({'bar': 'a'}, 'bar') + result = dict_get_join({'bar': 'a'}, 'bar') assert result == 'a' - result = _cephadm.dict_get_join({'a': 1}, 'a') + result = dict_get_join({'a': 1}, 'a') assert result == 1 @mock.patch('os.listdir', return_value=[]) @@ -761,24 +785,26 @@ class TestCephAdm(object): assert _cephadm.get_container_info(ctx, daemon_filter, by_name) == output def test_should_log_to_journald(self): + from cephadmlib import context_getters + ctx = _cephadm.CephadmContext() # explicit ctx.log_to_journald = True - assert _cephadm.should_log_to_journald(ctx) + assert context_getters.should_log_to_journald(ctx) ctx.log_to_journald = None # enable if podman support --cgroup=split ctx.container_engine = mock_podman() ctx.container_engine.version = (2, 1, 0) - assert _cephadm.should_log_to_journald(ctx) + assert context_getters.should_log_to_journald(ctx) # disable on old podman ctx.container_engine.version = (2, 0, 0) - assert not _cephadm.should_log_to_journald(ctx) + assert not context_getters.should_log_to_journald(ctx) # disable on docker ctx.container_engine = mock_docker() - assert not 
_cephadm.should_log_to_journald(ctx) + assert not context_getters.should_log_to_journald(ctx) def test_normalize_image_digest(self): s = 'myhostname:5000/ceph/ceph@sha256:753886ad9049004395ae990fbb9b096923b5a518b819283141ee8716ddf55ad1' @@ -1186,15 +1212,17 @@ class TestMaintenance: class TestMonitoring(object): - @mock.patch('cephadm.call') + @mock.patch('cephadmlib.daemons.monitoring.call') def test_get_version_alertmanager(self, _call): + from cephadmlib.daemons import monitoring + ctx = _cephadm.CephadmContext() ctx.container_engine = mock_podman() daemon_type = 'alertmanager' # binary `prometheus` _call.return_value = '', '{}, version 0.16.1'.format(daemon_type), 0 - version = _cephadm.Monitoring.get_version(ctx, 'container_id', daemon_type) + version = monitoring.Monitoring.get_version(ctx, 'container_id', daemon_type) assert version == '0.16.1' # binary `prometheus-alertmanager` @@ -1205,13 +1233,15 @@ class TestMonitoring(object): version = _cephadm.Monitoring.get_version(ctx, 'container_id', daemon_type) assert version == '0.16.1' - @mock.patch('cephadm.call') + @mock.patch('cephadmlib.daemons.monitoring.call') def test_get_version_prometheus(self, _call): + from cephadmlib.daemons import monitoring + ctx = _cephadm.CephadmContext() ctx.container_engine = mock_podman() daemon_type = 'prometheus' _call.return_value = '', '{}, version 0.16.1'.format(daemon_type), 0 - version = _cephadm.Monitoring.get_version(ctx, 'container_id', daemon_type) + version = monitoring.Monitoring.get_version(ctx, 'container_id', daemon_type) assert version == '0.16.1' def test_prometheus_external_url(self): @@ -1225,13 +1255,15 @@ class TestMonitoring(object): ).get_daemon_args() assert any([x.startswith('--web.external-url=http://') for x in args]) - @mock.patch('cephadm.call') + @mock.patch('cephadmlib.daemons.monitoring.call') def test_get_version_node_exporter(self, _call): + from cephadmlib.daemons import monitoring + ctx = _cephadm.CephadmContext() ctx.container_engine = mock_podman() daemon_type = 'node-exporter' _call.return_value = '', '{}, version 0.16.1'.format(daemon_type.replace('-', '_')), 0 - version = _cephadm.Monitoring.get_version(ctx, 'container_id', daemon_type) + version = monitoring.Monitoring.get_version(ctx, 'container_id', daemon_type) assert version == '0.16.1' def test_create_daemon_dirs_prometheus(self, cephadm_fs): @@ -2113,16 +2145,12 @@ class TestValidateRepo: class TestPull: - - @mock.patch('time.sleep') - @mock.patch('cephadm.get_image_info_from_inspect', return_value={}) - @mock.patch('cephadm.logger') - def test_error(self, _logger, _get_image_info_from_inspect, _sleep, monkeypatch): - # manually create a mock and use pytest's monkeypatch fixture to set - # multiple targets to the *same* mock - _call = mock.MagicMock() - monkeypatch.setattr('cephadm.call', _call) - monkeypatch.setattr('cephadmlib.call_wrappers.call', _call) + def test_error(self, funkypatch): + funkypatch.patch('time.sleep') + funkypatch.patch('cephadm.logger') + _giifi = funkypatch.patch('cephadm.get_image_info_from_inspect') + _giifi.return_value = {} + _call = funkypatch.patch('cephadmlib.call_wrappers.call') ctx = _cephadm.CephadmContext() ctx.container_engine = mock_podman() ctx.insecure = False diff --git a/src/cephadm/tests/test_daemon_form.py b/src/cephadm/tests/test_daemon_form.py index 07896cc5855..a2d1773f1c8 100644 --- a/src/cephadm/tests/test_daemon_form.py +++ b/src/cephadm/tests/test_daemon_form.py @@ -6,6 +6,7 @@ from .fixtures import import_cephadm from cephadmlib import daemon_form 
from cephadmlib import daemon_identity +from cephadmlib import daemons _cephadm = import_cephadm() @@ -22,7 +23,7 @@ _cephadm = import_cephadm() ('mon', _cephadm.Ceph), ('nfs', _cephadm.NFSGanesha), ('nvmeof', _cephadm.CephNvmeof), - ('osd', _cephadm.OSD), + ('osd', daemons.OSD), ('prometheus', _cephadm.Monitoring), ('snmp-gateway', _cephadm.SNMPGateway), ], diff --git a/src/cephadm/tests/test_deploy.py b/src/cephadm/tests/test_deploy.py index c77b243dfa7..dadf3456fd5 100644 --- a/src/cephadm/tests/test_deploy.py +++ b/src/cephadm/tests/test_deploy.py @@ -8,38 +8,35 @@ from .fixtures import ( import_cephadm, mock_podman, with_cephadm_ctx, + FunkyPatcher, + funkypatch, ) _cephadm = import_cephadm() -def _common_mp(monkeypatch): +def _common_patches(funkypatch): mocks = {} - _call = mock.MagicMock(return_value=('', '', 0)) - monkeypatch.setattr('cephadmlib.container_types.call', _call) + _call = funkypatch.patch('cephadmlib.container_types.call') + _call.return_value = ('', '', 0) mocks['call'] = _call - _call_throws = mock.MagicMock(return_value=0) - monkeypatch.setattr( - 'cephadmlib.container_types.call_throws', _call_throws - ) + _call_throws = funkypatch.patch('cephadmlib.container_types.call_throws') + _call_throws.return_value = ('', '', 0) mocks['call_throws'] = _call_throws - _firewalld = mock.MagicMock() + _firewalld = funkypatch.patch('cephadm.Firewalld') _firewalld().external_ports.get.return_value = [] - monkeypatch.setattr('cephadm.Firewalld', _firewalld) mocks['Firewalld'] = _firewalld - _extract_uid_gid = mock.MagicMock() + _extract_uid_gid = funkypatch.patch('cephadm.extract_uid_gid', force=True) _extract_uid_gid.return_value = (8765, 8765) - monkeypatch.setattr('cephadm.extract_uid_gid', _extract_uid_gid) mocks['extract_uid_gid'] = _extract_uid_gid - _install_sysctl = mock.MagicMock() - monkeypatch.setattr('cephadm.install_sysctl', _install_sysctl) + _install_sysctl = funkypatch.patch('cephadm.install_sysctl') mocks['install_sysctl'] = _install_sysctl return mocks -def test_deploy_nfs_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_nfs_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: @@ -75,8 +72,8 @@ def test_deploy_nfs_container(cephadm_fs, monkeypatch): assert f.read() == 'FAKE' -def test_deploy_snmp_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_snmp_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: @@ -107,8 +104,8 @@ def test_deploy_snmp_container(cephadm_fs, monkeypatch): assert not (basedir / 'keyring').exists() -def test_deploy_keepalived_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_keepalived_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] _install_sysctl = mocks['install_sysctl'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' @@ -155,8 +152,8 @@ def test_deploy_keepalived_container(cephadm_fs, monkeypatch): assert len(_install_sysctl.call_args[0][-1].get_sysctl_settings()) > 1 -def test_deploy_haproxy_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_haproxy_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] _install_sysctl = 
mocks['install_sysctl'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' @@ -200,8 +197,8 @@ def test_deploy_haproxy_container(cephadm_fs, monkeypatch): assert len(_install_sysctl.call_args[0][-1].get_sysctl_settings()) > 1 -def test_deploy_iscsi_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_iscsi_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: @@ -244,8 +241,8 @@ def test_deploy_iscsi_container(cephadm_fs, monkeypatch): assert (si.st_uid, si.st_gid) == (8765, 8765) -def test_deploy_nvmeof_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_nvmeof_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: @@ -290,11 +287,11 @@ def test_deploy_nvmeof_container(cephadm_fs, monkeypatch): assert (si.st_uid, si.st_gid) == (167, 167) -def test_deploy_a_monitoring_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_a_monitoring_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] - _get_ip_addresses = mock.MagicMock(return_value=(['10.10.10.10'], [])) - monkeypatch.setattr('cephadm.get_ip_addresses', _get_ip_addresses) + _get_ip_addresses = funkypatch.patch('cephadmlib.net_utils.get_ip_addresses') + _get_ip_addresses.return_value = (['10.10.10.10'], []) fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: ctx.container_engine = mock_podman() @@ -330,8 +327,8 @@ def test_deploy_a_monitoring_container(cephadm_fs, monkeypatch): assert (si.st_uid, si.st_gid) == (8765, 8765) -def test_deploy_a_tracing_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_a_tracing_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: @@ -361,11 +358,10 @@ def test_deploy_a_tracing_container(cephadm_fs, monkeypatch): assert not (basedir / 'keyring').exists() -def test_deploy_ceph_mgr_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_ceph_mgr_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] - _make_var_run = mock.MagicMock() - monkeypatch.setattr('cephadm.make_var_run', _make_var_run) + _make_run_dir = funkypatch.patch('cephadmlib.file_utils.make_run_dir') fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: ctx.container_engine = mock_podman() @@ -399,16 +395,15 @@ def test_deploy_ceph_mgr_container(cephadm_fs, monkeypatch): assert f.read() == 'XXXXXXX' with open(basedir / 'keyring') as f: assert f.read() == 'YYYYYY' - assert _make_var_run.call_count == 1 - assert _make_var_run.call_args[0][2] == 8765 - assert _make_var_run.call_args[0][3] == 8765 + assert _make_run_dir.call_count == 1 + assert _make_run_dir.call_args[0][1] == 8765 + assert _make_run_dir.call_args[0][2] == 8765 -def test_deploy_ceph_osd_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_ceph_osd_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] - _make_var_run = mock.MagicMock() - monkeypatch.setattr('cephadm.make_var_run', _make_var_run) + 
_make_run_dir = funkypatch.patch('cephadmlib.file_utils.make_run_dir') fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: ctx.container_engine = mock_podman() @@ -444,18 +439,17 @@ def test_deploy_ceph_osd_container(cephadm_fs, monkeypatch): assert f.read() == 'XXXXXXX' with open(basedir / 'keyring') as f: assert f.read() == 'YYYYYY' - assert _make_var_run.call_count == 1 - assert _make_var_run.call_args[0][2] == 8765 - assert _make_var_run.call_args[0][3] == 8765 + assert _make_run_dir.call_count == 1 + assert _make_run_dir.call_args[0][1] == 8765 + assert _make_run_dir.call_args[0][2] == 8765 -def test_deploy_ceph_exporter_container(cephadm_fs, monkeypatch): - mocks = _common_mp(monkeypatch) +def test_deploy_ceph_exporter_container(cephadm_fs, funkypatch): + mocks = _common_patches(funkypatch) _firewalld = mocks['Firewalld'] - _get_ip_addresses = mock.MagicMock(return_value=(['10.10.10.10'], [])) - monkeypatch.setattr('cephadm.get_ip_addresses', _get_ip_addresses) - _make_var_run = mock.MagicMock() - monkeypatch.setattr('cephadm.make_var_run', _make_var_run) + _get_ip_addresses = funkypatch.patch('cephadmlib.net_utils.get_ip_addresses') + _get_ip_addresses.return_value = (['10.10.10.10'], []) + _make_run_dir = funkypatch.patch('cephadmlib.file_utils.make_run_dir') fsid = 'b01dbeef-701d-9abe-0000-e1e5a47004a7' with with_cephadm_ctx([]) as ctx: ctx.container_engine = mock_podman() diff --git a/src/cephadm/tests/test_nfs.py b/src/cephadm/tests/test_nfs.py index 94ab6afcfdf..aae8113382d 100644 --- a/src/cephadm/tests/test_nfs.py +++ b/src/cephadm/tests/test_nfs.py @@ -155,15 +155,17 @@ def test_nfsganesha_container_envs(): def test_nfsganesha_get_version(): + from cephadmlib.daemons import nfs + with with_cephadm_ctx([]) as ctx: - nfsg = _cephadm.NFSGanesha( + nfsg = nfs.NFSGanesha( ctx, SAMPLE_UUID, "fred", good_nfs_json(), ) - with mock.patch("cephadm.call") as _call: + with mock.patch("cephadmlib.daemons.nfs.call") as _call: _call.return_value = ("NFS-Ganesha Release = V100", "", 0) ver = nfsg.get_version(ctx, "fake_version") _call.assert_called() |