diff options
Diffstat (limited to 'src/cephadm/cephadm.py')
-rwxr-xr-x | src/cephadm/cephadm.py | 6524 |
1 files changed, 1708 insertions, 4816 deletions
diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py index 5ad4712b3cb..348f581f9e6 100755 --- a/src/cephadm/cephadm.py +++ b/src/cephadm/cephadm.py @@ -1,18 +1,12 @@ #!/usr/bin/python3 -import asyncio -import asyncio.subprocess import argparse import datetime -import fcntl import ipaddress import io import json import logging -from logging.config import dictConfig import os -import platform -import pwd import random import shlex import shutil @@ -23,16 +17,13 @@ import sys import tempfile import time import errno -import struct import ssl -from enum import Enum -from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator +from typing import Dict, List, Tuple, Optional, Union, Any, Callable, IO, Sequence, TypeVar, cast, Iterable, TextIO import re import uuid -from configparser import ConfigParser -from contextlib import redirect_stdout, contextmanager +from contextlib import redirect_stdout from functools import wraps from glob import glob from io import StringIO @@ -41,115 +32,158 @@ from urllib.error import HTTPError, URLError from urllib.request import urlopen, Request from pathlib import Path -FuncT = TypeVar('FuncT', bound=Callable) - -# Default container images ----------------------------------------------------- -DEFAULT_IMAGE = 'quay.ceph.io/ceph-ci/ceph:main' -DEFAULT_IMAGE_IS_MAIN = True -DEFAULT_IMAGE_RELEASE = 'reef' -DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0' -DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0' -DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0' -DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0' -DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0' -DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7' -DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3' -DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4' -DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1' -DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1' -DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23' -DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29' -DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29' -DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29' -DEFAULT_REGISTRY = 'docker.io' # normalize unqualified digests to this -# ------------------------------------------------------------------------------ - -LATEST_STABLE_RELEASE = 'reef' -DATA_DIR = '/var/lib/ceph' -LOG_DIR = '/var/log/ceph' -LOCK_DIR = '/run/cephadm' -LOGROTATE_DIR = '/etc/logrotate.d' -SYSCTL_DIR = '/etc/sysctl.d' -UNIT_DIR = '/etc/systemd/system' -CEPH_CONF_DIR = 'config' -CEPH_CONF = 'ceph.conf' -CEPH_PUBKEY = 'ceph.pub' -CEPH_KEYRING = 'ceph.client.admin.keyring' -CEPH_DEFAULT_CONF = f'/etc/ceph/{CEPH_CONF}' -CEPH_DEFAULT_KEYRING = f'/etc/ceph/{CEPH_KEYRING}' -CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}' -LOG_DIR_MODE = 0o770 -DATA_DIR_MODE = 0o700 -DEFAULT_MODE = 0o600 -CONTAINER_INIT = True -MIN_PODMAN_VERSION = (2, 0, 2) -CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0) -PIDS_LIMIT_UNLIMITED_PODMAN_VERSION = (3, 4, 1) -CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ ' -DEFAULT_TIMEOUT = None # in seconds -DEFAULT_RETRY = 15 -DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ' -QUIET_LOG_LEVEL = 9 # DEBUG is 10, so using 9 to be lower level than DEBUG -NO_DEPRECATED = False - -logger: logging.Logger = None # type: ignore - -""" -You can invoke cephadm in two ways: - -1. The normal way, at the command line. - -2. By piping the script to the python3 binary. In this latter case, you should - prepend one or more lines to the beginning of the script. - - For arguments, +from cephadmlib.constants import ( + # default images + DEFAULT_ALERT_MANAGER_IMAGE, + DEFAULT_ELASTICSEARCH_IMAGE, + DEFAULT_GRAFANA_IMAGE, + DEFAULT_HAPROXY_IMAGE, + DEFAULT_IMAGE, + DEFAULT_IMAGE_IS_MAIN, + DEFAULT_IMAGE_RELEASE, + DEFAULT_JAEGER_AGENT_IMAGE, + DEFAULT_JAEGER_COLLECTOR_IMAGE, + DEFAULT_JAEGER_QUERY_IMAGE, + DEFAULT_KEEPALIVED_IMAGE, + DEFAULT_LOKI_IMAGE, + DEFAULT_NODE_EXPORTER_IMAGE, + DEFAULT_NVMEOF_IMAGE, + DEFAULT_PROMETHEUS_IMAGE, + DEFAULT_PROMTAIL_IMAGE, + DEFAULT_SNMP_GATEWAY_IMAGE, + # other constant values + CEPH_CONF, + CEPH_CONF_DIR, + CEPH_DEFAULT_CONF, + CEPH_DEFAULT_KEYRING, + CEPH_DEFAULT_PUBKEY, + CEPH_KEYRING, + CEPH_PUBKEY, + CONTAINER_INIT, + CUSTOM_PS1, + DATA_DIR, + DATA_DIR_MODE, + DATEFMT, + DEFAULT_RETRY, + DEFAULT_TIMEOUT, + LATEST_STABLE_RELEASE, + LOGROTATE_DIR, + LOG_DIR, + LOG_DIR_MODE, + SYSCTL_DIR, + UNIT_DIR, +) +from cephadmlib.context import CephadmContext +from cephadmlib.context_getters import ( + fetch_configs, + fetch_custom_config_files, + fetch_endpoints, + fetch_meta, + get_config_and_keyring, + get_parm, + read_configuration_source, + should_log_to_journald, +) +from cephadmlib.exceptions import ( + ClusterAlreadyExists, + Error, + UnauthorizedRegistryError, +) +from cephadmlib.exe_utils import find_executable, find_program +from cephadmlib.call_wrappers import ( + CallVerbosity, + async_run, + call, + call_throws, + call_timeout, + concurrent_tasks, +) +from cephadmlib.container_engines import ( + Docker, + Podman, + check_container_engine, + find_container_engine, + pull_command, + registry_login, +) +from cephadmlib.data_utils import ( + dict_get, + dict_get_join, + get_legacy_config_fsid, + is_fsid, + normalize_image_digest, + try_convert_datetime, + read_config, + with_units_to_int, +) +from cephadmlib.file_utils import ( + get_file_timestamp, + makedirs, + pathify, + populate_files, + read_file, + recursive_chown, + touch, + write_new, + write_tmp, +) +from cephadmlib.net_utils import ( + build_addrv_params, + EndPoint, + check_ip_port, + check_subnet, + get_fqdn, + get_hostname, + get_ip_addresses, + get_short_hostname, + ip_in_subnets, + is_ipv6, + parse_mon_addrv, + parse_mon_ip, + port_in_use, + unwrap_ipv6, + wrap_ipv6, +) +from cephadmlib.locking import FileLock +from cephadmlib.daemon_identity import DaemonIdentity, DaemonSubIdentity +from cephadmlib.packagers import create_packager, Packager +from cephadmlib.logging import cephadm_init_logging, Highlight, LogDestination +from cephadmlib.systemd import check_unit, check_units +from cephadmlib.container_types import ( + CephContainer, + InitContainer, + is_container_running, + extract_uid_gid, +) +from cephadmlib.decorators import ( + deprecated_command, + executes_early, + require_image +) +from cephadmlib.host_facts import HostFacts, list_networks +from cephadmlib.ssh import authorize_ssh_key, check_ssh_connectivity +from cephadmlib.daemon_form import ( + DaemonForm, + UnexpectedDaemonTypeError, + create as daemon_form_create, + register as register_daemon_form, +) +from cephadmlib.deploy import DeploymentType +from cephadmlib.container_daemon_form import ContainerDaemonForm +from cephadmlib.sysctl import install_sysctl, migrate_sysctl_dir +from cephadmlib.firewalld import Firewalld, update_firewalld +from cephadmlib import templating - injected_argv = [...] - e.g., - - injected_argv = ['ls'] +FuncT = TypeVar('FuncT', bound=Callable) - For reading stdin from the '--config-json -' argument, - injected_stdin = '...' -""" -cached_stdin = None +logger = logging.getLogger() ################################## -async def run_func(func: Callable, cmd: str) -> subprocess.CompletedProcess: - logger.debug(f'running function {func.__name__}, with parms: {cmd}') - response = func(cmd) - return response - - -async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]: - tasks = [] - for cmd in cmd_list: - tasks.append(run_func(func, cmd)) - - data = await asyncio.gather(*tasks) - - return data - - -class EndPoint: - """EndPoint representing an ip:port format""" - - def __init__(self, ip: str, port: int) -> None: - self.ip = ip - self.port = port - - def __str__(self) -> str: - return f'{self.ip}:{self.port}' - - def __repr__(self) -> str: - return f'{self.ip}:{self.port}' - - class ContainerInfo: def __init__(self, container_id: str, image_name: str, @@ -171,235 +205,220 @@ class ContainerInfo: and self.start == other.start and self.version == other.version) +################################## -class DeploymentType(Enum): - # Fresh deployment of a daemon. - DEFAULT = 'Deploy' - # Redeploying a daemon. Works the same as fresh - # deployment minus port checking. - REDEPLOY = 'Redeploy' - # Reconfiguring a daemon. Rewrites config - # files and potentially restarts daemon. - RECONFIG = 'Reconfig' - - -class BaseConfig: - - def __init__(self) -> None: - self.image: str = '' - self.docker: bool = False - self.data_dir: str = DATA_DIR - self.log_dir: str = LOG_DIR - self.logrotate_dir: str = LOGROTATE_DIR - self.sysctl_dir: str = SYSCTL_DIR - self.unit_dir: str = UNIT_DIR - self.verbose: bool = False - self.timeout: Optional[int] = DEFAULT_TIMEOUT - self.retry: int = DEFAULT_RETRY - self.env: List[str] = [] - self.memory_request: Optional[int] = None - self.memory_limit: Optional[int] = None - self.log_to_journald: Optional[bool] = None - - self.container_init: bool = CONTAINER_INIT - self.container_engine: Optional[ContainerEngine] = None - - def set_from_args(self, args: argparse.Namespace) -> None: - argdict: Dict[str, Any] = vars(args) - for k, v in argdict.items(): - if hasattr(self, k): - setattr(self, k, v) - - -class CephadmContext: - - def __init__(self) -> None: - self.__dict__['_args'] = None - self.__dict__['_conf'] = BaseConfig() - - def set_args(self, args: argparse.Namespace) -> None: - self._conf.set_from_args(args) - self._args = args - - def has_function(self) -> bool: - return 'func' in self._args - - def __contains__(self, name: str) -> bool: - return hasattr(self, name) - - def __getattr__(self, name: str) -> Any: - if '_conf' in self.__dict__ and hasattr(self._conf, name): - return getattr(self._conf, name) - elif '_args' in self.__dict__ and hasattr(self._args, name): - return getattr(self._args, name) - else: - return super().__getattribute__(name) - - def __setattr__(self, name: str, value: Any) -> None: - if hasattr(self._conf, name): - setattr(self._conf, name, value) - elif hasattr(self._args, name): - setattr(self._args, name, value) - else: - super().__setattr__(name, value) - - -class ContainerEngine: - def __init__(self) -> None: - self.path = find_program(self.EXE) - - @property - def EXE(self) -> str: - raise NotImplementedError() - def __str__(self) -> str: - return f'{self.EXE} ({self.path})' +@register_daemon_form +class Ceph(ContainerDaemonForm): + _daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror', + 'crash', 'cephfs-mirror') + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + # TODO: figure out a way to un-special-case osd + return daemon_type in cls._daemons and daemon_type != 'osd' -class Podman(ContainerEngine): - EXE = 'podman' + def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: + self.ctx = ctx + self._identity = ident + self.user_supplied_config = False - def __init__(self) -> None: - super().__init__() - self._version: Optional[Tuple[int, ...]] = None + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Ceph': + return cls(ctx, ident) @property - def version(self) -> Tuple[int, ...]: - if self._version is None: - raise RuntimeError('Please call `get_version` first') - return self._version - - def get_version(self, ctx: CephadmContext) -> None: - out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'], verbosity=CallVerbosity.QUIET) - self._version = _parse_podman_version(out) + def identity(self) -> DaemonIdentity: + return self._identity + + def firewall_service_name(self) -> str: + if self.identity.daemon_type == 'mon': + return 'ceph-mon' + elif self.identity.daemon_type in ['mgr', 'mds']: + return 'ceph' + return '' - def __str__(self) -> str: - version = '.'.join(map(str, self.version)) - return f'{self.EXE} ({self.path}) version {version}' + def container(self, ctx: CephadmContext) -> CephContainer: + # previous to being a ContainerDaemonForm, this make_var_run + # call was hard coded in the deploy path. Eventually, it would be + # good to move this somwhere cleaner and avoid needing to know the + # uid/gid here. + uid, gid = self.uid_gid(ctx) + make_var_run(ctx, ctx.fsid, uid, gid) + ctr = get_container(ctx, self.identity) + ctr = to_deployment_container(ctx, ctr) + config_json = fetch_configs(ctx) + if self.identity.daemon_type == 'mon' and config_json is not None: + if 'crush_location' in config_json: + c_loc = config_json['crush_location'] + # was originally "c.args.extend(['--set-crush-location', c_loc])" + # but that doesn't seem to persist in the object after it's passed + # in further function calls + ctr.args = ctr.args + ['--set-crush-location', c_loc] + return ctr -class Docker(ContainerEngine): - EXE = 'docker' + _uid_gid: Optional[Tuple[int, int]] = None + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + if self._uid_gid is None: + self._uid_gid = extract_uid_gid(ctx) + return self._uid_gid -CONTAINER_PREFERENCE = (Podman, Docker) # prefer podman to docker + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + def get_daemon_args(self) -> List[str]: + if self.identity.daemon_type == 'crash': + return [] + r = [ + '--setuser', 'ceph', + '--setgroup', 'ceph', + '--default-log-to-file=false', + ] + log_to_journald = should_log_to_journald(self.ctx) + if log_to_journald: + r += [ + '--default-log-to-journald=true', + '--default-log-to-stderr=false', + ] + else: + r += [ + '--default-log-to-stderr=true', + '--default-log-stderr-prefix=debug ', + ] + if self.identity.daemon_type == 'mon': + r += [ + '--default-mon-cluster-log-to-file=false', + ] + if log_to_journald: + r += [ + '--default-mon-cluster-log-to-journald=true', + '--default-mon-cluster-log-to-stderr=false', + ] + else: + r += ['--default-mon-cluster-log-to-stderr=true'] + return r -# During normal cephadm operations (cephadm ls, gather-facts, etc ) we use: -# stdout: for JSON output only -# stderr: for error, debug, info, etc -logging_config = { - 'version': 1, - 'disable_existing_loggers': True, - 'formatters': { - 'cephadm': { - 'format': '%(asctime)s %(thread)x %(levelname)s %(message)s' - }, - }, - 'handlers': { - 'console': { - 'level': 'INFO', - 'class': 'logging.StreamHandler', - }, - 'log_file': { - 'level': 'DEBUG', - 'class': 'logging.handlers.WatchedFileHandler', - 'formatter': 'cephadm', - 'filename': '%s/cephadm.log' % LOG_DIR, - } - }, - 'loggers': { - '': { - 'level': 'DEBUG', - 'handlers': ['console', 'log_file'], - } - } -} + @staticmethod + def get_ceph_mounts( + ctx: CephadmContext, + ident: DaemonIdentity, + no_config: bool = False, + ) -> Dict[str, str]: + # Warning: This is a hack done for more expedient refactoring + mounts = _get_container_mounts_for_type( + ctx, ident.fsid, ident.daemon_type + ) + data_dir = ident.data_dir(ctx.data_dir) + if ident.daemon_type == 'rgw': + cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % ( + ident.daemon_id + ) + else: + cdata_dir = '/var/lib/ceph/%s/ceph-%s' % ( + ident.daemon_type, + ident.daemon_id, + ) + if ident.daemon_type != 'crash': + mounts[data_dir] = cdata_dir + ':z' + if not no_config: + mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z' + if ident.daemon_type in [ + 'rbd-mirror', + 'cephfs-mirror', + 'crash', + 'ceph-exporter', + ]: + # these do not search for their keyrings in a data directory + mounts[ + data_dir + '/keyring' + ] = '/etc/ceph/ceph.client.%s.%s.keyring' % ( + ident.daemon_type, + ident.daemon_id, + ) + return mounts + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + cm = self.get_ceph_mounts( + ctx, + self.identity, + no_config=self.ctx.config and self.user_supplied_config, + ) + mounts.update(cm) -class ExcludeErrorsFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: - """Only lets through log messages with log level below WARNING .""" - return record.levelno < logging.WARNING + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + ident = self.identity + if ident.daemon_type == 'rgw': + name = 'client.rgw.%s' % ident.daemon_id + elif ident.daemon_type == 'rbd-mirror': + name = 'client.rbd-mirror.%s' % ident.daemon_id + elif ident.daemon_type == 'cephfs-mirror': + name = 'client.cephfs-mirror.%s' % ident.daemon_id + elif ident.daemon_type == 'crash': + name = 'client.crash.%s' % ident.daemon_id + elif ident.daemon_type in ['mon', 'mgr', 'mds', 'osd']: + name = ident.daemon_name + else: + raise ValueError(ident) + args.extend(['-n', name]) + if ident.daemon_type != 'crash': + args.append('-f') + args.extend(self.get_daemon_args()) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') -# When cephadm is used as standard binary (bootstrap, rm-cluster, etc) we use: -# stdout: for debug and info -# stderr: for errors and warnings -interactive_logging_config = { - 'version': 1, - 'filters': { - 'exclude_errors': { - '()': ExcludeErrorsFilter + def default_entrypoint(self) -> str: + ep = { + 'rgw': '/usr/bin/radosgw', + 'rbd-mirror': '/usr/bin/rbd-mirror', + 'cephfs-mirror': '/usr/bin/cephfs-mirror', } - }, - 'disable_existing_loggers': True, - 'formatters': { - 'cephadm': { - 'format': '%(asctime)s %(thread)x %(levelname)s %(message)s' - }, - }, - 'handlers': { - 'console_stdout': { - 'level': 'INFO', - 'class': 'logging.StreamHandler', - 'filters': ['exclude_errors'], - 'stream': sys.stdout - }, - 'console_stderr': { - 'level': 'WARNING', - 'class': 'logging.StreamHandler', - 'stream': sys.stderr - }, - 'log_file': { - 'level': 'DEBUG', - 'class': 'logging.handlers.WatchedFileHandler', - 'formatter': 'cephadm', - 'filename': '%s/cephadm.log' % LOG_DIR, - } - }, - 'loggers': { - '': { - 'level': 'DEBUG', - 'handlers': ['console_stdout', 'console_stderr', 'log_file'], - } - } -} - - -class termcolor: - yellow = '\033[93m' - red = '\033[31m' - end = '\033[0m' - - -class Error(Exception): - pass - - -class ClusterAlreadyExists(Exception): - pass - - -class TimeoutExpired(Error): - pass - - -class UnauthorizedRegistryError(Error): - pass + daemon_type = self.identity.daemon_type + return ep.get(daemon_type) or f'/usr/bin/ceph-{daemon_type}' ################################## -class Ceph(object): - daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror', - 'crash', 'cephfs-mirror', 'ceph-exporter') - gateways = ('iscsi', 'nfs', 'nvmeof') +@register_daemon_form +class OSD(Ceph): + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + # TODO: figure out a way to un-special-case osd + return daemon_type == 'osd' -################################## + def __init__( + self, + ctx: CephadmContext, + ident: DaemonIdentity, + osd_fsid: Optional[str] = None, + ) -> None: + super().__init__(ctx, ident) + self._osd_fsid = osd_fsid + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'OSD': + osd_fsid = getattr(ctx, 'osd_fsid', None) + if osd_fsid is None: + logger.info( + 'Creating an OSD daemon form without an OSD FSID value' + ) + return cls(ctx, ident, osd_fsid) -class OSD(object): @staticmethod def get_sysctl_settings() -> List[str]: return [ @@ -408,11 +427,19 @@ class OSD(object): 'kernel.pid_max = 4194304', ] + def firewall_service_name(self) -> str: + return 'ceph' + + @property + def osd_fsid(self) -> Optional[str]: + return self._osd_fsid + ################################## -class SNMPGateway: +@register_daemon_form +class SNMPGateway(ContainerDaemonForm): """Defines an SNMP gateway between Prometheus and SNMP monitoring Frameworks""" daemon_type = 'snmp-gateway' SUPPORTED_VERSIONS = ['V2c', 'V3'] @@ -420,6 +447,10 @@ class SNMPGateway: DEFAULT_PORT = 9464 env_filename = 'snmp-gateway.conf' + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx: CephadmContext, fsid: str, @@ -454,6 +485,14 @@ class SNMPGateway: assert cfgs # assert some config data was found return cls(ctx, fsid, daemon_id, cfgs, ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'SNMPGateway': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + @staticmethod def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]: """Return the version of the notifier from it's http endpoint""" @@ -485,7 +524,7 @@ class SNMPGateway: @property def port(self) -> int: - endpoints = fetch_tcp_ports(self.ctx) + endpoints = fetch_endpoints(self.ctx) if not endpoints: return self.DEFAULT_PORT return endpoints[0].port @@ -552,9 +591,27 @@ class SNMPGateway: if not self.destination: raise Error('config is missing destination attribute(<ip>:<port>) of the target SNMP listener') + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.uid, self.gid + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(f'--env-file={self.conf_file_path}') + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + ################################## -class Monitoring(object): +@register_daemon_form +class Monitoring(ContainerDaemonForm): """Define the configs for the monitoring containers""" port_map = { @@ -637,6 +694,10 @@ class Monitoring(object): }, } # type: ignore + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return daemon_type in cls.components + @staticmethod def get_version(ctx, container_id, daemon_type): # type: (CephadmContext, str, str) -> str @@ -669,57 +730,218 @@ class Monitoring(object): version = out.split(' ')[2] return version + @staticmethod + def extract_uid_gid( + ctx: CephadmContext, daemon_type: str + ) -> Tuple[int, int]: + if daemon_type == 'prometheus': + uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus') + elif daemon_type == 'node-exporter': + uid, gid = 65534, 65534 + elif daemon_type == 'grafana': + uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana') + elif daemon_type == 'loki': + uid, gid = extract_uid_gid(ctx, file_path='/etc/loki') + elif daemon_type == 'promtail': + uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail') + elif daemon_type == 'alertmanager': + uid, gid = extract_uid_gid( + ctx, file_path=['/etc/alertmanager', '/etc/prometheus'] + ) + else: + raise Error('{} not implemented yet'.format(daemon_type)) + return uid, gid + + def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None: + self.ctx = ctx + self._identity = ident + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Monitoring': + return cls(ctx, ident) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + def container(self, ctx: CephadmContext) -> CephContainer: + self._prevalidate(ctx) + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.extract_uid_gid(ctx, self.identity.daemon_type) + + def _prevalidate(self, ctx: CephadmContext) -> None: + # before being refactored into a ContainerDaemonForm these checks were + # done inside the deploy function. This was the only "family" of daemons + # that performed these checks in that location + daemon_type = self.identity.daemon_type + config = fetch_configs(ctx) # type: ignore + required_files = self.components[daemon_type].get( + 'config-json-files', list() + ) + required_args = self.components[daemon_type].get( + 'config-json-args', list() + ) + if required_files: + if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore + raise Error( + '{} deployment requires config-json which must ' + 'contain file content for {}'.format( + daemon_type.capitalize(), ', '.join(required_files) + ) + ) + if required_args: + if not config or not all(c in config.keys() for c in required_args): # type: ignore + raise Error( + '{} deployment requires config-json which must ' + 'contain arg for {}'.format( + daemon_type.capitalize(), ', '.join(required_args) + ) + ) + + def get_daemon_args(self) -> List[str]: + ctx = self.ctx + daemon_type = self.identity.daemon_type + metadata = self.components[daemon_type] + r = list(metadata.get('args', [])) + # set ip and port to bind to for nodeexporter,alertmanager,prometheus + if daemon_type not in ['grafana', 'loki', 'promtail']: + ip = '' + port = self.port_map[daemon_type][0] + meta = fetch_meta(ctx) + if meta: + if 'ip' in meta and meta['ip']: + ip = meta['ip'] + if 'ports' in meta and meta['ports']: + port = meta['ports'][0] + r += [f'--web.listen-address={ip}:{port}'] + if daemon_type == 'prometheus': + config = fetch_configs(ctx) + retention_time = config.get('retention_time', '15d') + retention_size = config.get('retention_size', '0') # default to disabled + r += [f'--storage.tsdb.retention.time={retention_time}'] + r += [f'--storage.tsdb.retention.size={retention_size}'] + scheme = 'http' + host = get_fqdn() + # in case host is not an fqdn then we use the IP to + # avoid producing a broken web.external-url link + if '.' not in host: + ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname()) + # use the first ipv4 (if any) otherwise use the first ipv6 + addr = next(iter(ipv4_addrs or ipv6_addrs), None) + host = wrap_ipv6(addr) if addr else host + r += [f'--web.external-url={scheme}://{host}:{port}'] + if daemon_type == 'alertmanager': + config = fetch_configs(ctx) + peers = config.get('peers', list()) # type: ignore + for peer in peers: + r += ['--cluster.peer={}'.format(peer)] + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + # some alertmanager, by default, look elsewhere for a config + r += ['--config.file=/etc/alertmanager/alertmanager.yml'] + if daemon_type == 'promtail': + r += ['--config.expand-env'] + if daemon_type == 'prometheus': + config = fetch_configs(ctx) + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + if daemon_type == 'node-exporter': + config = fetch_configs(ctx) + try: + r += [f'--web.config.file={config["web_config"]}'] + except KeyError: + pass + r += ['--path.procfs=/host/proc', + '--path.sysfs=/host/sys', + '--path.rootfs=/rootfs'] + return r + + def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: + ctx = self.ctx + daemon_type = self.identity.daemon_type + mounts: Dict[str, str] = {} + log_dir = get_log_dir(self.identity.fsid, ctx.log_dir) + if daemon_type == 'prometheus': + mounts[ + os.path.join(data_dir, 'etc/prometheus') + ] = '/etc/prometheus:Z' + mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z' + elif daemon_type == 'loki': + mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z' + mounts[os.path.join(data_dir, 'data')] = '/loki:Z' + elif daemon_type == 'promtail': + mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z' + mounts[log_dir] = '/var/log/ceph:z' + mounts[os.path.join(data_dir, 'data')] = '/promtail:Z' + elif daemon_type == 'node-exporter': + mounts[ + os.path.join(data_dir, 'etc/node-exporter') + ] = '/etc/node-exporter:Z' + mounts['/proc'] = '/host/proc:ro' + mounts['/sys'] = '/host/sys:ro' + mounts['/'] = '/rootfs:ro' + elif daemon_type == 'grafana': + mounts[ + os.path.join(data_dir, 'etc/grafana/grafana.ini') + ] = '/etc/grafana/grafana.ini:Z' + mounts[ + os.path.join(data_dir, 'etc/grafana/provisioning/datasources') + ] = '/etc/grafana/provisioning/datasources:Z' + mounts[ + os.path.join(data_dir, 'etc/grafana/certs') + ] = '/etc/grafana/certs:Z' + mounts[ + os.path.join(data_dir, 'data/grafana.db') + ] = '/var/lib/grafana/grafana.db:Z' + elif daemon_type == 'alertmanager': + mounts[ + os.path.join(data_dir, 'etc/alertmanager') + ] = '/etc/alertmanager:Z' + return mounts + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + uid, _ = self.uid_gid(ctx) + monitoring_args = [ + '--user', + str(uid), + # FIXME: disable cpu/memory limits for the time being (not supported + # by ubuntu 18.04 kernel!) + ] + args.extend(monitoring_args) + if self.identity.daemon_type == 'node-exporter': + # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys', + # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation + # between the node-exporter container and the host to avoid selinux denials + args.extend(['--security-opt', 'label=disable']) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + def default_entrypoint(self) -> str: + return '' + ################################## -@contextmanager -def write_new( - destination: Union[str, Path], - *, - owner: Optional[Tuple[int, int]] = None, - perms: Optional[int] = DEFAULT_MODE, - encoding: Optional[str] = None, -) -> Generator[IO, None, None]: - """Write a new file in a robust manner, optionally specifying the owner, - permissions, or encoding. This function takes care to never leave a file in - a partially-written state due to a crash or power outage by writing to - temporary file and then renaming that temp file over to the final - destination once all data is written. Note that the temporary files can be - leaked but only for a "crash" or power outage - regular exceptions will - clean up the temporary file. - """ - destination = os.path.abspath(destination) - tempname = f'{destination}.new' - open_kwargs: Dict[str, Any] = {} - if encoding: - open_kwargs['encoding'] = encoding - try: - with open(tempname, 'w', **open_kwargs) as fh: - yield fh - fh.flush() - os.fsync(fh.fileno()) - if owner is not None: - os.fchown(fh.fileno(), *owner) - if perms is not None: - os.fchmod(fh.fileno(), perms) - except Exception: - os.unlink(tempname) - raise - os.rename(tempname, destination) - - -def populate_files(config_dir, config_files, uid, gid): - # type: (str, Dict, int, int) -> None - """create config files for different services""" - for fname in config_files: - config_file = os.path.join(config_dir, fname) - config_content = dict_get_join(config_files, fname) - logger.info('Write file: %s' % (config_file)) - with write_new(config_file, owner=(uid, gid), encoding='utf-8') as f: - f.write(config_content) - - -class NFSGanesha(object): +@register_daemon_form +class NFSGanesha(ContainerDaemonForm): """Defines a NFS-Ganesha container""" daemon_type = 'nfs' @@ -732,6 +954,10 @@ class NFSGanesha(object): 'nfs': 2049, } + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx, fsid, @@ -760,7 +986,15 @@ class NFSGanesha(object): # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) - def get_container_mounts(self, data_dir): + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'NFSGanesha': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + + def _get_container_mounts(self, data_dir): # type: (str) -> Dict[str, str] mounts = dict() mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' @@ -773,6 +1007,12 @@ class NFSGanesha(object): '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user) return mounts + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + @staticmethod def get_container_envs(): # type: () -> List[str] @@ -853,10 +1093,52 @@ class NFSGanesha(object): with write_new(keyring_path, owner=(uid, gid)) as f: f.write(self.rgw.get('keyring', '')) + def firewall_service_name(self) -> str: + return 'nfs' + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def customize_container_endpoints( + self, endpoints: List[EndPoint], deployment_type: DeploymentType + ) -> None: + if deployment_type == DeploymentType.DEFAULT and not endpoints: + nfs_ports = list(NFSGanesha.port_map.values()) + endpoints.extend([EndPoint('0.0.0.0', p) for p in nfs_ports]) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + # TODO: extract ganesha uid/gid (997, 994) ? + return extract_uid_gid(ctx) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + + def default_entrypoint(self) -> str: + return self.entrypoint + ################################## -class CephIscsi(object): +@register_daemon_form +class CephIscsi(ContainerDaemonForm): """Defines a Ceph-Iscsi container""" daemon_type = 'iscsi' @@ -864,6 +1146,10 @@ class CephIscsi(object): required_files = ['iscsi-gateway.cfg'] + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx, fsid, @@ -888,28 +1174,48 @@ class CephIscsi(object): return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephIscsi': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + @staticmethod - def get_container_mounts(data_dir, log_dir): + def _get_container_mounts(data_dir, log_dir): # type: (str, str) -> Dict[str, str] mounts = dict() mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z' mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config' + mounts[os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')] = '/usr/local/scripts/tcmu-runner-entrypoint.sh' mounts[log_dir] = '/var/log:z' mounts['/dev'] = '/dev' return mounts - @staticmethod - def get_container_binds(): - # type: () -> List[List[str]] - binds = [] - lib_modules = ['type=bind', - 'source=/lib/modules', - 'destination=/lib/modules', - 'ro=true'] + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + # Removes ending ".tcmu" from data_dir a tcmu-runner uses the same + # data_dir as rbd-runner-api + if data_dir.endswith('.tcmu'): + data_dir = re.sub(r'\.tcmu$', '', data_dir) + log_dir = get_log_dir(self.identity.fsid, ctx.log_dir) + mounts.update(CephIscsi._get_container_mounts(data_dir, log_dir)) + + def customize_container_binds( + self, ctx: CephadmContext, binds: List[List[str]] + ) -> None: + lib_modules = [ + 'type=bind', + 'source=/lib/modules', + 'destination=/lib/modules', + 'ro=true', + ] binds.append(lib_modules) - return binds @staticmethod def get_version(ctx, container_id): @@ -960,9 +1266,19 @@ class CephIscsi(object): configfs_dir = os.path.join(data_dir, 'configfs') makedirs(configfs_dir, uid, gid, 0o755) + # set up the tcmu-runner entrypoint script + # to be mounted into the container. For more info + # on why we need this script, see the + # tcmu_runner_entrypoint_script function + self.files['tcmu-runner-entrypoint.sh'] = self.tcmu_runner_entrypoint_script() + # populate files from the config-json populate_files(data_dir, self.files, uid, gid) + # we want the tcmu runner entrypoint script to be executable + # populate_files will give it 0o600 by default + os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700) + @staticmethod def configfs_mount_umount(data_dir, mount=True): # type: (str, bool) -> List[str] @@ -975,27 +1291,96 @@ class CephIscsi(object): 'umount {0}; fi'.format(mount_path) return cmd.split() + @staticmethod + def tcmu_runner_entrypoint_script() -> str: + # since we are having tcmu-runner be a background + # process in its systemd unit (rbd-target-api being + # the main process) systemd will not restart it when + # it fails. in order to try and get around that for now + # we can have a script mounted in the container that + # that attempts to do the restarting for us. This script + # can then become the entrypoint for the tcmu-runner + # container + + # This is intended to be dropped for a better solution + # for at least the squid release onward + return """#!/bin/bash +RUN_DIR=/var/run/tcmu-runner + +if [ ! -d "${RUN_DIR}" ] ; then + mkdir -p "${RUN_DIR}" +fi + +rm -rf "${RUN_DIR}"/* + +while true +do + touch "${RUN_DIR}"/start-up-$(date -Ins) + /usr/bin/tcmu-runner + + # If we got around 3 kills/segfaults in the last minute, + # don't start anymore + if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then + exit 0 + fi + + sleep 1 +done +""" + def get_tcmu_runner_container(self): # type: () -> CephContainer # daemon_id, is used to generated the cid and pid files used by podman but as both tcmu-runner # and rbd-target-api have the same daemon_id, it conflits and prevent the second container from # starting. .tcmu runner is appended to the daemon_id to fix that. - tcmu_container = get_deployment_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu') - tcmu_container.entrypoint = '/usr/bin/tcmu-runner' + subident = DaemonSubIdentity( + self.fsid, self.daemon_type, self.daemon_id, 'tcmu' + ) + tcmu_container = to_deployment_container( + self.ctx, get_container(self.ctx, subident) + ) + # TODO: Eventually we don't want to run tcmu-runner through this script. + # This is intended to be a workaround backported to older releases + # and should eventually be removed in at least squid onward + tcmu_container.entrypoint = '/usr/local/scripts/tcmu-runner-entrypoint.sh' tcmu_container.cname = self.get_container_name(desc='tcmu') return tcmu_container + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return extract_uid_gid(ctx) + + def default_entrypoint(self) -> str: + return self.entrypoint + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) ################################## -class CephNvmeof(object): +@register_daemon_form +class CephNvmeof(ContainerDaemonForm): """Defines a Ceph-Nvmeof container""" daemon_type = 'nvmeof' required_files = ['ceph-nvmeof.conf'] default_image = DEFAULT_NVMEOF_IMAGE + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx, fsid, @@ -1020,8 +1405,16 @@ class CephNvmeof(object): return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephNvmeof': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + @staticmethod - def get_container_mounts(data_dir: str) -> Dict[str, str]: + def _get_container_mounts(data_dir: str) -> Dict[str, str]: mounts = dict() mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z' mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z' @@ -1031,16 +1424,22 @@ class CephNvmeof(object): mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio' return mounts - @staticmethod - def get_container_binds(): - # type: () -> List[List[str]] - binds = [] - lib_modules = ['type=bind', - 'source=/lib/modules', - 'destination=/lib/modules', - 'ro=true'] + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def customize_container_binds( + self, ctx: CephadmContext, binds: List[List[str]] + ) -> None: + lib_modules = [ + 'type=bind', + 'source=/lib/modules', + 'destination=/lib/modules', + 'ro=true', + ] binds.append(lib_modules) - return binds @staticmethod def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]: @@ -1110,11 +1509,32 @@ class CephNvmeof(object): 'vm.nr_hugepages = 4096', ] + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return 167, 167 # TODO: need to get properly the uid/gid + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + args.extend(['--ulimit', 'memlock=-1:-1']) + args.extend(['--ulimit', 'nofile=10240']) + args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE']) + ################################## -class CephExporter(object): +@register_daemon_form +class CephExporter(ContainerDaemonForm): """Defines a Ceph exporter container""" daemon_type = 'ceph-exporter' @@ -1124,6 +1544,10 @@ class CephExporter(object): 'ceph-exporter': DEFAULT_PORT, } + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str], @@ -1150,11 +1574,13 @@ class CephExporter(object): return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) - @staticmethod - def get_container_mounts() -> Dict[str, str]: - mounts = dict() - mounts['/var/run/ceph'] = '/var/run/ceph:z' - return mounts + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephExporter': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) def get_daemon_args(self) -> List[str]: args = [ @@ -1170,16 +1596,59 @@ class CephExporter(object): if not os.path.isdir(self.sock_dir): raise Error(f'Directory does not exist. Got: {self.sock_dir}') + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return extract_uid_gid(ctx) + + def config_and_keyring( + self, ctx: CephadmContext + ) -> Tuple[Optional[str], Optional[str]]: + return get_config_and_keyring(ctx) + + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + cm = Ceph.get_ceph_mounts(ctx, self.identity) + mounts.update(cm) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + name = 'client.ceph-exporter.%s' % self.identity.daemon_id + args.extend(['-n', name, '-f']) + args.extend(self.get_daemon_args()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.append(ctx.container_engine.unlimited_pids_option) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') + + def default_entrypoint(self) -> str: + return self.entrypoint + ################################## -class HAproxy(object): +@register_daemon_form +class HAproxy(ContainerDaemonForm): """Defines an HAproxy container""" daemon_type = 'haproxy' required_files = ['haproxy.cfg'] default_image = DEFAULT_HAPROXY_IMAGE + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str], @@ -1200,6 +1669,14 @@ class HAproxy(object): return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'HAproxy': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """Create files under the container data dir""" if not os.path.isdir(data_dir): @@ -1241,16 +1718,22 @@ class HAproxy(object): cname = '%s-%s' % (cname, desc) return cname - def extract_uid_gid_haproxy(self) -> Tuple[int, int]: + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: # better directory for this? return extract_uid_gid(self.ctx, file_path='/var/lib') @staticmethod - def get_container_mounts(data_dir: str) -> Dict[str, str]: + def _get_container_mounts(data_dir: str) -> Dict[str, str]: mounts = dict() mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy' return mounts + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + @staticmethod def get_sysctl_settings() -> List[str]: return [ @@ -1259,15 +1742,37 @@ class HAproxy(object): 'net.ipv4.ip_nonlocal_bind = 1', ] + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend( + ['--user=root'] + ) # haproxy 2.4 defaults to a different user + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + + ################################## -class Keepalived(object): +@register_daemon_form +class Keepalived(ContainerDaemonForm): """Defines an Keepalived container""" daemon_type = 'keepalived' required_files = ['keepalived.conf'] default_image = DEFAULT_KEEPALIVED_IMAGE + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str], @@ -1288,6 +1793,14 @@ class Keepalived(object): return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Keepalived': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """Create files under the container data dir""" if not os.path.isdir(data_dir): @@ -1345,20 +1858,42 @@ class Keepalived(object): 'net.ipv4.ip_nonlocal_bind = 1', ] - def extract_uid_gid_keepalived(self) -> Tuple[int, int]: + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: # better directory for this? return extract_uid_gid(self.ctx, file_path='/var/lib') @staticmethod - def get_container_mounts(data_dir: str) -> Dict[str, str]: + def _get_container_mounts(data_dir: str) -> Dict[str, str]: mounts = dict() mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf' return mounts + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) + + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW']) + + ################################## -class Tracing(object): +@register_daemon_form +class Tracing(ContainerDaemonForm): """Define the configs for the jaeger tracing containers""" components: Dict[str, Dict[str, Any]] = { @@ -1377,6 +1912,10 @@ class Tracing(object): }, } # type: ignore + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return daemon_type in cls.components + @staticmethod def set_configuration(config: Dict[str, str], daemon_type: str) -> None: if daemon_type in ['jaeger-collector', 'jaeger-query']: @@ -1391,13 +1930,75 @@ class Tracing(object): '--processor.jaeger-compact.server-host-port=6799' ] + def __init__(self, ident: DaemonIdentity) -> None: + self._identity = ident + self._configured = False + + def _configure(self, ctx: CephadmContext) -> None: + if self._configured: + return + config = fetch_configs(ctx) + # Currently, this method side-effects the class attribute, and that + # is unpleasant. In the future it would be nice to move all of + # set_configuration into _confiure and only modify each classes data + # independently + self.set_configuration(config, self.identity.daemon_type) + self._configured = True + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Tracing': + return cls(ident) + + @property + def identity(self) -> DaemonIdentity: + return self._identity + + def container(self, ctx: CephadmContext) -> CephContainer: + ctr = get_container(ctx, self.identity) + return to_deployment_container(ctx, ctr) + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return 65534, 65534 + + def get_daemon_args(self) -> List[str]: + return self.components[self.identity.daemon_type].get( + 'daemon_args', [] + ) + + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + self._configure(ctx) + # earlier code did an explicit check if the daemon type was jaeger-agent + # and would only call get_daemon_args if that was true. However, since + # the function only returns a non-empty list in the case of jaeger-agent + # that check is unnecessary and is not brought over. + args.extend(self.get_daemon_args()) + + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + self._configure(ctx) + envs.extend( + self.components[self.identity.daemon_type].get('envs', []) + ) + + def default_entrypoint(self) -> str: + return '' + + ################################## -class CustomContainer(object): +@register_daemon_form +class CustomContainer(ContainerDaemonForm): """Defines a custom container""" daemon_type = 'container' + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + def __init__(self, fsid: str, daemon_id: Union[int, str], config_json: Dict, image: str) -> None: @@ -1424,6 +2025,14 @@ class CustomContainer(object): return cls(fsid, daemon_id, fetch_configs(ctx), ctx.image) + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CustomContainer': + return cls.init(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None: """ Create dirs/files below the container data directory. @@ -1455,7 +2064,7 @@ class CustomContainer(object): def get_container_envs(self) -> List[str]: return self.envs - def get_container_mounts(self, data_dir: str) -> Dict[str, str]: + def _get_container_mounts(self, data_dir: str) -> Dict[str, str]: """ Get the volume mounts. Relative source paths will be located below `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. @@ -1477,7 +2086,13 @@ class CustomContainer(object): mounts[source] = destination return mounts - def get_container_binds(self, data_dir: str) -> List[List[str]]: + def customize_container_mounts( + self, ctx: CephadmContext, mounts: Dict[str, str] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + mounts.update(self._get_container_mounts(data_dir)) + + def _get_container_binds(self, data_dir: str) -> List[List[str]]: """ Get the bind mounts. Relative `source=...` paths will be located below `/var/lib/ceph/<cluster-fsid>/<daemon-name>`. @@ -1505,60 +2120,71 @@ class CustomContainer(object): data_dir, match.group(1))) return binds -################################## + def customize_container_binds( + self, ctx: CephadmContext, binds: List[List[str]] + ) -> None: + data_dir = self.identity.data_dir(ctx.data_dir) + binds.extend(self._get_container_binds(data_dir)) + # Cache the container so we don't need to rebuild it again when calling + # into init_containers + _container: Optional[CephContainer] = None -def touch(file_path: str, uid: Optional[int] = None, gid: Optional[int] = None) -> None: - Path(file_path).touch() - if uid and gid: - os.chown(file_path, uid, gid) + def container(self, ctx: CephadmContext) -> CephContainer: + if self._container is None: + ctr = get_container( + ctx, + self.identity, + privileged=self.privileged, + ptrace=ctx.allow_ptrace, + ) + self._container = to_deployment_container(ctx, ctr) + return self._container + def init_containers(self, ctx: CephadmContext) -> List[InitContainer]: + primary = self.container(ctx) + init_containers: List[Dict[str, Any]] = getattr( + ctx, 'init_containers', [] + ) + return [ + InitContainer.from_primary_and_opts(ctx, primary, ic_opts) + for ic_opts in init_containers + ] -################################## + def customize_container_endpoints( + self, endpoints: List[EndPoint], deployment_type: DeploymentType + ) -> None: + if deployment_type == DeploymentType.DEFAULT: + endpoints.extend([EndPoint('0.0.0.0', p) for p in self.ports]) + def customize_container_envs( + self, ctx: CephadmContext, envs: List[str] + ) -> None: + envs.extend(self.get_container_envs()) -def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any: - """ - Helper function to get a key from a dictionary. - :param d: The dictionary to process. - :param key: The name of the key to get. - :param default: The default value in case the key does not - exist. Default is `None`. - :param require: Set to `True` if the key is required. An - exception will be raised if the key does not exist in - the given dictionary. - :return: Returns the value of the given key. - :raises: :exc:`self.Error` if the given key does not exist - and `require` is set to `True`. - """ - if require and key not in d.keys(): - raise Error('{} missing from dict'.format(key)) - return d.get(key, default) # type: ignore + def customize_container_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_container_args()) -################################## + def customize_process_args( + self, ctx: CephadmContext, args: List[str] + ) -> None: + args.extend(self.get_daemon_args()) + def default_entrypoint(self) -> str: + return self.entrypoint or '' + + def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]: + return self.uid, self.gid -def dict_get_join(d: Dict[str, Any], key: str) -> Any: - """ - Helper function to get the value of a given key from a dictionary. - `List` values will be converted to a string by joining them with a - line break. - :param d: The dictionary to process. - :param key: The name of the key to get. - :return: Returns the value of the given key. If it was a `list`, it - will be joining with a line break. - """ - value = d.get(key) - if isinstance(value, list): - value = '\n'.join(map(str, value)) - return value ################################## def get_supported_daemons(): # type: () -> List[str] - supported_daemons = list(Ceph.daemons) + supported_daemons = ceph_daemons() supported_daemons.extend(Monitoring.components) supported_daemons.append(NFSGanesha.daemon_type) supported_daemons.append(CephIscsi.daemon_type) @@ -1572,536 +2198,15 @@ def get_supported_daemons(): assert len(supported_daemons) == len(set(supported_daemons)) return supported_daemons -################################## - - -class PortOccupiedError(Error): - pass - - -def attempt_bind(ctx, s, address, port): - # type: (CephadmContext, socket.socket, str, int) -> None - try: - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind((address, port)) - except OSError as e: - if e.errno == errno.EADDRINUSE: - msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e) - logger.warning(msg) - raise PortOccupiedError(msg) - else: - raise e - except Exception as e: - raise Error(e) - finally: - s.close() - -def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool: - """Detect whether a port is in use on the local machine - IPv4 and IPv6""" - logger.info('Verifying port %s ...' % str(endpoint)) - - def _port_in_use(af: socket.AddressFamily, address: str) -> bool: - try: - s = socket.socket(af, socket.SOCK_STREAM) - attempt_bind(ctx, s, address, endpoint.port) - except PortOccupiedError: - return True - except OSError as e: - if e.errno in (errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL): - # Ignore EAFNOSUPPORT and EADDRNOTAVAIL as two interfaces are - # being tested here and one might be intentionally be disabled. - # In that case no error should be raised. - return False - else: - raise e - return False - - if endpoint.ip != '0.0.0.0' and endpoint.ip != '::': - if is_ipv6(endpoint.ip): - return _port_in_use(socket.AF_INET6, endpoint.ip) - else: - return _port_in_use(socket.AF_INET, endpoint.ip) - - return any(_port_in_use(af, address) for af, address in ( - (socket.AF_INET, '0.0.0.0'), - (socket.AF_INET6, '::') - )) - - -def check_ip_port(ctx, ep): - # type: (CephadmContext, EndPoint) -> None - if not ctx.skip_ping_check: - logger.info(f'Verifying IP {ep.ip} port {ep.port} ...') - if is_ipv6(ep.ip): - s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) - ip = unwrap_ipv6(ep.ip) - else: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ip = ep.ip - attempt_bind(ctx, s, ip, ep.port) +def ceph_daemons() -> List[str]: + cds = list(Ceph._daemons) + cds.append(CephExporter.daemon_type) + return cds ################################## -# this is an abbreviated version of -# https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py -# that drops all of the compatibility (this is Unix/Linux only). - -class Timeout(TimeoutError): - """ - Raised when the lock could not be acquired in *timeout* - seconds. - """ - - def __init__(self, lock_file: str) -> None: - """ - """ - #: The path of the file lock. - self.lock_file = lock_file - return None - - def __str__(self) -> str: - temp = "The file lock '{}' could not be acquired."\ - .format(self.lock_file) - return temp - - -class _Acquire_ReturnProxy(object): - def __init__(self, lock: 'FileLock') -> None: - self.lock = lock - return None - - def __enter__(self) -> 'FileLock': - return self.lock - - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - self.lock.release() - return None - - -class FileLock(object): - def __init__(self, ctx: CephadmContext, name: str, timeout: int = -1) -> None: - if not os.path.exists(LOCK_DIR): - os.mkdir(LOCK_DIR, 0o700) - self._lock_file = os.path.join(LOCK_DIR, name + '.lock') - self.ctx = ctx - - # The file descriptor for the *_lock_file* as it is returned by the - # os.open() function. - # This file lock is only NOT None, if the object currently holds the - # lock. - self._lock_file_fd: Optional[int] = None - self.timeout = timeout - # The lock counter is used for implementing the nested locking - # mechanism. Whenever the lock is acquired, the counter is increased and - # the lock is only released, when this value is 0 again. - self._lock_counter = 0 - return None - - @property - def is_locked(self) -> bool: - return self._lock_file_fd is not None - - def acquire(self, timeout: Optional[int] = None, poll_intervall: float = 0.05) -> _Acquire_ReturnProxy: - """ - Acquires the file lock or fails with a :exc:`Timeout` error. - .. code-block:: python - # You can use this method in the context manager (recommended) - with lock.acquire(): - pass - # Or use an equivalent try-finally construct: - lock.acquire() - try: - pass - finally: - lock.release() - :arg float timeout: - The maximum time waited for the file lock. - If ``timeout < 0``, there is no timeout and this method will - block until the lock could be acquired. - If ``timeout`` is None, the default :attr:`~timeout` is used. - :arg float poll_intervall: - We check once in *poll_intervall* seconds if we can acquire the - file lock. - :raises Timeout: - if the lock could not be acquired in *timeout* seconds. - .. versionchanged:: 2.0.0 - This method returns now a *proxy* object instead of *self*, - so that it can be used in a with statement without side effects. - """ - - # Use the default timeout, if no timeout is provided. - if timeout is None: - timeout = self.timeout - - # Increment the number right at the beginning. - # We can still undo it, if something fails. - self._lock_counter += 1 - - lock_id = id(self) - lock_filename = self._lock_file - start_time = time.time() - try: - while True: - if not self.is_locked: - logger.log(QUIET_LOG_LEVEL, 'Acquiring lock %s on %s', lock_id, - lock_filename) - self._acquire() - - if self.is_locked: - logger.log(QUIET_LOG_LEVEL, 'Lock %s acquired on %s', lock_id, - lock_filename) - break - elif timeout >= 0 and time.time() - start_time > timeout: - logger.warning('Timeout acquiring lock %s on %s', lock_id, - lock_filename) - raise Timeout(self._lock_file) - else: - logger.log( - QUIET_LOG_LEVEL, - 'Lock %s not acquired on %s, waiting %s seconds ...', - lock_id, lock_filename, poll_intervall - ) - time.sleep(poll_intervall) - except Exception: - # Something did go wrong, so decrement the counter. - self._lock_counter = max(0, self._lock_counter - 1) - - raise - return _Acquire_ReturnProxy(lock=self) - - def release(self, force: bool = False) -> None: - """ - Releases the file lock. - Please note, that the lock is only completely released, if the lock - counter is 0. - Also note, that the lock file itself is not automatically deleted. - :arg bool force: - If true, the lock counter is ignored and the lock is released in - every case. - """ - if self.is_locked: - self._lock_counter -= 1 - - if self._lock_counter == 0 or force: - # lock_id = id(self) - # lock_filename = self._lock_file - - # Can't log in shutdown: - # File "/usr/lib64/python3.9/logging/__init__.py", line 1175, in _open - # NameError: name 'open' is not defined - # logger.debug('Releasing lock %s on %s', lock_id, lock_filename) - self._release() - self._lock_counter = 0 - # logger.debug('Lock %s released on %s', lock_id, lock_filename) - - return None - - def __enter__(self) -> 'FileLock': - self.acquire() - return self - - def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: - self.release() - return None - - def __del__(self) -> None: - self.release(force=True) - return None - - def _acquire(self) -> None: - open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC - fd = os.open(self._lock_file, open_mode) - - try: - fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) - except (IOError, OSError): - os.close(fd) - else: - self._lock_file_fd = fd - return None - - def _release(self) -> None: - # Do not remove the lockfile: - # - # https://github.com/benediktschmitt/py-filelock/issues/31 - # https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition - fd = self._lock_file_fd - self._lock_file_fd = None - fcntl.flock(fd, fcntl.LOCK_UN) # type: ignore - os.close(fd) # type: ignore - return None - - -################################## -# Popen wrappers, lifted from ceph-volume - -class CallVerbosity(Enum): - ##### - # Format: - # Normal Operation: <log-level-when-no-errors>, Errors: <log-level-when-error> - # - # NOTE: QUIET log level is custom level only used when --verbose is passed - ##### - - # Normal Operation: None, Errors: None - SILENT = 0 - # Normal Operation: QUIET, Error: QUIET - QUIET = 1 - # Normal Operation: DEBUG, Error: DEBUG - DEBUG = 2 - # Normal Operation: QUIET, Error: INFO - QUIET_UNLESS_ERROR = 3 - # Normal Operation: DEBUG, Error: INFO - VERBOSE_ON_FAILURE = 4 - # Normal Operation: INFO, Error: INFO - VERBOSE = 5 - - def success_log_level(self) -> int: - _verbosity_level_to_log_level = { - self.SILENT: 0, - self.QUIET: QUIET_LOG_LEVEL, - self.DEBUG: logging.DEBUG, - self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL, - self.VERBOSE_ON_FAILURE: logging.DEBUG, - self.VERBOSE: logging.INFO - } - return _verbosity_level_to_log_level[self] # type: ignore - - def error_log_level(self) -> int: - _verbosity_level_to_log_level = { - self.SILENT: 0, - self.QUIET: QUIET_LOG_LEVEL, - self.DEBUG: logging.DEBUG, - self.QUIET_UNLESS_ERROR: logging.INFO, - self.VERBOSE_ON_FAILURE: logging.INFO, - self.VERBOSE: logging.INFO - } - return _verbosity_level_to_log_level[self] # type: ignore - - -# disable coverage for the next block. this is copy-n-paste -# from other code for compatibilty on older python versions -if sys.version_info < (3, 8): # pragma: no cover - import itertools - import threading - import warnings - from asyncio import events - - class ThreadedChildWatcher(asyncio.AbstractChildWatcher): - """Threaded child watcher implementation. - The watcher uses a thread per process - for waiting for the process finish. - It doesn't require subscription on POSIX signal - but a thread creation is not free. - The watcher has O(1) complexity, its performance doesn't depend - on amount of spawn processes. - """ - - def __init__(self) -> None: - self._pid_counter = itertools.count(0) - self._threads: Dict[Any, Any] = {} - - def is_active(self) -> bool: - return True - - def close(self) -> None: - self._join_threads() - - def _join_threads(self) -> None: - """Internal: Join all non-daemon threads""" - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] - for thread in threads: - thread.join() - - def __enter__(self) -> Any: - return self - - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: - pass - - def __del__(self, _warn: Any = warnings.warn) -> None: - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive()] - if threads: - _warn(f'{self.__class__} has registered but not finished child processes', - ResourceWarning, - source=self) - - def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None: - loop = events.get_event_loop() - thread = threading.Thread(target=self._do_waitpid, - name=f'waitpid-{next(self._pid_counter)}', - args=(loop, pid, callback, args), - daemon=True) - self._threads[pid] = thread - thread.start() - - def remove_child_handler(self, pid: Any) -> bool: - # asyncio never calls remove_child_handler() !!! - # The method is no-op but is implemented because - # abstract base classe requires it - return True - - def attach_loop(self, loop: Any) -> None: - pass - - def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None: - assert expected_pid > 0 - - try: - pid, status = os.waitpid(expected_pid, 0) - except ChildProcessError: - # The child process is already reaped - # (may happen if waitpid() is called elsewhere). - pid = expected_pid - returncode = 255 - logger.warning( - 'Unknown child process pid %d, will report returncode 255', - pid) - else: - if os.WIFEXITED(status): - returncode = os.WEXITSTATUS(status) - elif os.WIFSIGNALED(status): - returncode = -os.WTERMSIG(status) - else: - raise ValueError(f'unknown wait status {status}') - if loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - - if loop.is_closed(): - logger.warning('Loop %r that handles pid %r is closed', loop, pid) - else: - loop.call_soon_threadsafe(callback, pid, returncode, *args) - - self._threads.pop(expected_pid) - - # unlike SafeChildWatcher which handles SIGCHLD in the main thread, - # ThreadedChildWatcher runs in a separated thread, hence allows us to - # run create_subprocess_exec() in non-main thread, see - # https://bugs.python.org/issue35621 - asyncio.set_child_watcher(ThreadedChildWatcher()) - - -try: - from asyncio import run as async_run # type: ignore[attr-defined] -except ImportError: # pragma: no cover - # disable coverage for this block. it should be a copy-n-paste from - # from newer libs for compatibilty on older python versions - def async_run(coro): # type: ignore - loop = asyncio.new_event_loop() - try: - asyncio.set_event_loop(loop) - return loop.run_until_complete(coro) - finally: - try: - loop.run_until_complete(loop.shutdown_asyncgens()) - finally: - asyncio.set_event_loop(None) - loop.close() - - -def call(ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: - """ - Wrap subprocess.Popen to - - - log stdout/stderr to a logger, - - decode utf-8 - - cleanly return out, err, returncode - - :param timeout: timeout in seconds - """ - - prefix = command[0] if desc is None else desc - if prefix: - prefix += ': ' - timeout = timeout or ctx.timeout - - async def run_with_timeout() -> Tuple[str, str, int]: - process = await asyncio.create_subprocess_exec( - *command, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - env=os.environ.copy()) - assert process.stdout - assert process.stderr - try: - stdout, stderr = await asyncio.wait_for( - process.communicate(), - timeout, - ) - except asyncio.TimeoutError: - # try to terminate the process assuming it is still running. It's - # possible that even after killing the process it will not - # complete, particularly if it is D-state. If that happens the - # process.wait call will block, but we're no worse off than before - # when the timeout did not work. Additionally, there are other - # corner-cases we could try and handle here but we decided to start - # simple. - process.kill() - await process.wait() - logger.info(prefix + f'timeout after {timeout} seconds') - return '', '', 124 - else: - assert process.returncode is not None - return ( - stdout.decode('utf-8'), - stderr.decode('utf-8'), - process.returncode, - ) - - stdout, stderr, returncode = async_run(run_with_timeout()) - log_level = verbosity.success_log_level() - if returncode != 0: - log_level = verbosity.error_log_level() - logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}') - for line in stdout.splitlines(): - logger.log(log_level, prefix + 'stdout ' + line) - for line in stderr.splitlines(): - logger.log(log_level, prefix + 'stderr ' + line) - return stdout, stderr, returncode - - -def call_throws( - ctx: CephadmContext, - command: List[str], - desc: Optional[str] = None, - verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE, - timeout: Optional[int] = DEFAULT_TIMEOUT, - **kwargs: Any) -> Tuple[str, str, int]: - out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs) - if ret: - for s in (out, err): - if s.strip() and len(s.splitlines()) <= 2: # readable message? - raise RuntimeError(f'Failed command: {" ".join(command)}: {s}') - raise RuntimeError('Failed command: %s' % ' '.join(command)) - return out, err, ret - - -def call_timeout(ctx, command, timeout): - # type: (CephadmContext, List[str], int) -> int - logger.debug('Running command (timeout=%s): %s' - % (timeout, ' '.join(command))) - - def raise_timeout(command, timeout): - # type: (List[str], int) -> NoReturn - msg = 'Command `%s` timed out after %s seconds' % (command, timeout) - logger.debug(msg) - raise TimeoutExpired(msg) - - try: - return subprocess.call(command, timeout=timeout, env=os.environ.copy()) - except subprocess.TimeoutExpired: - raise_timeout(command, timeout) - ################################## @@ -2142,114 +2247,6 @@ def is_available(ctx, what, func): time.sleep(2) -def read_config(fn): - # type: (Optional[str]) -> ConfigParser - cp = ConfigParser() - if fn: - cp.read(fn) - return cp - - -def pathify(p): - # type: (str) -> str - p = os.path.expanduser(p) - return os.path.abspath(p) - - -def get_file_timestamp(fn): - # type: (str) -> Optional[str] - try: - mt = os.path.getmtime(fn) - return datetime.datetime.fromtimestamp( - mt, tz=datetime.timezone.utc - ).strftime(DATEFMT) - except Exception: - return None - - -def try_convert_datetime(s): - # type: (str) -> Optional[str] - # This is super irritating because - # 1) podman and docker use different formats - # 2) python's strptime can't parse either one - # - # I've seen: - # docker 18.09.7: 2020-03-03T09:21:43.636153304Z - # podman 1.7.0: 2020-03-03T15:52:30.136257504-06:00 - # 2020-03-03 15:52:30.136257504 -0600 CST - # (In the podman case, there is a different string format for - # 'inspect' and 'inspect --format {{.Created}}'!!) - - # In *all* cases, the 9 digit second precision is too much for - # python's strptime. Shorten it to 6 digits. - p = re.compile(r'(\.[\d]{6})[\d]*') - s = p.sub(r'\1', s) - - # replace trailing Z with -0000, since (on python 3.6.8) it won't parse - if s and s[-1] == 'Z': - s = s[:-1] + '-0000' - - # cut off the redundant 'CST' part that strptime can't parse, if - # present. - v = s.split(' ') - s = ' '.join(v[0:3]) - - # try parsing with several format strings - fmts = [ - '%Y-%m-%dT%H:%M:%S.%f%z', - '%Y-%m-%d %H:%M:%S.%f %z', - ] - for f in fmts: - try: - # return timestamp normalized to UTC, rendered as DATEFMT. - return datetime.datetime.strptime(s, f).astimezone(tz=datetime.timezone.utc).strftime(DATEFMT) - except ValueError: - pass - return None - - -def _parse_podman_version(version_str): - # type: (str) -> Tuple[int, ...] - def to_int(val: str, org_e: Optional[Exception] = None) -> int: - if not val and org_e: - raise org_e - try: - return int(val) - except ValueError as e: - return to_int(val[0:-1], org_e or e) - - return tuple(map(to_int, version_str.split('.'))) - - -def get_hostname(): - # type: () -> str - return socket.gethostname() - - -def get_short_hostname(): - # type: () -> str - return get_hostname().split('.', 1)[0] - - -def get_fqdn(): - # type: () -> str - return socket.getfqdn() or socket.gethostname() - - -def get_ip_addresses(hostname: str) -> Tuple[List[str], List[str]]: - items = socket.getaddrinfo(hostname, None, - flags=socket.AI_CANONNAME, - type=socket.SOCK_STREAM) - ipv4_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET] - ipv6_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET6] - return ipv4_addresses, ipv6_addresses - - -def get_arch(): - # type: () -> str - return platform.uname().machine - - def generate_service_id(): # type: () -> str return get_short_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase) @@ -2279,15 +2276,6 @@ def make_fsid(): return str(uuid.uuid1()) -def is_fsid(s): - # type: (str) -> bool - try: - uuid.UUID(s) - except ValueError: - return False - return True - - def validate_fsid(func: FuncT) -> FuncT: @wraps(func) def _validate_fsid(ctx: CephadmContext) -> Any: @@ -2354,7 +2342,8 @@ def infer_config(func: FuncT) -> FuncT: def _infer_config(ctx: CephadmContext) -> Any: def config_path(daemon_type: str, daemon_name: str) -> str: - data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_name) + ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_name) + data_dir = ident.data_dir(ctx.data_dir) return os.path.join(data_dir, 'config') def get_mon_daemon_name(fsid: str) -> Optional[str]: @@ -2403,7 +2392,7 @@ For information regarding the latest stable release: https://docs.ceph.com/docs/{}/cephadm/install """.format(LATEST_STABLE_RELEASE) for line in warn.splitlines(): - logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end)) + logger.warning(line, extra=Highlight.WARNING.extra()) return DEFAULT_IMAGE @@ -2424,19 +2413,6 @@ def infer_image(func: FuncT) -> FuncT: return cast(FuncT, _infer_image) -def require_image(func: FuncT) -> FuncT: - """ - Require the global --image flag to be set - """ - @wraps(func) - def _require_image(ctx: CephadmContext) -> Any: - if not ctx.image: - raise Error('This command requires the global --image option to be set') - return func(ctx) - - return cast(FuncT, _require_image) - - def default_image(func: FuncT) -> FuncT: @wraps(func) def _default_image(ctx: CephadmContext) -> Any: @@ -2471,27 +2447,6 @@ def update_default_image(ctx: CephadmContext) -> None: ctx.image = _get_default_image(ctx) -def executes_early(func: FuncT) -> FuncT: - """Decorator that indicates the command function is meant to have no - dependencies and no environmental requirements and can therefore be - executed as non-root and with no logging, etc. Commands that have this - decorator applied must be simple and self-contained. - """ - cast(Any, func)._execute_early = True - return func - - -def deprecated_command(func: FuncT) -> FuncT: - @wraps(func) - def _deprecated_command(ctx: CephadmContext) -> Any: - logger.warning(f'Deprecated command used: {func}') - if NO_DEPRECATED: - raise Error('running deprecated commands disabled') - return func(ctx) - - return cast(FuncT, _deprecated_command) - - def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]: """ :param ctx: Cephadm context @@ -2541,7 +2496,7 @@ def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional container_info = None daemon_name = ctx.name if ('name' in ctx and ctx.name and '.' in ctx.name) else None - daemons_ls = [daemon_name] if daemon_name is not None else Ceph.daemons # daemon types: 'mon', 'mgr', etc + daemons_ls = [daemon_name] if daemon_name is not None else ceph_daemons() # daemon types: 'mon', 'mgr', etc for daemon in daemons_ls: container_info = get_container_info(ctx, daemon, daemon_name is not None) if container_info is not None: @@ -2559,32 +2514,6 @@ def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional return None -def write_tmp(s, uid, gid): - # type: (str, int, int) -> IO[str] - tmp_f = tempfile.NamedTemporaryFile(mode='w', - prefix='ceph-tmp') - os.fchown(tmp_f.fileno(), uid, gid) - tmp_f.write(s) - tmp_f.flush() - - return tmp_f - - -def makedirs(dir, uid, gid, mode): - # type: (str, int, int, int) -> None - if not os.path.exists(dir): - os.makedirs(dir, mode=mode) - else: - os.chmod(dir, mode) - os.chown(dir, uid, gid) - os.chmod(dir, mode) # the above is masked by umask... - - -def get_data_dir(fsid, data_dir, t, n): - # type: (str, str, str, Union[int, str]) -> str - return os.path.join(data_dir, fsid, '%s.%s' % (t, n)) - - def get_log_dir(fsid, log_dir): # type: (str, str) -> str return os.path.join(log_dir, fsid) @@ -2600,12 +2529,16 @@ def make_data_dir_base(fsid, data_dir, uid, gid): return data_dir_base -def make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=None, gid=None): - # type: (CephadmContext, str, str, Union[int, str], Optional[int], Optional[int]) -> str +def make_data_dir( + ctx: CephadmContext, + ident: 'DaemonIdentity', + uid: Optional[int] = None, + gid: Optional[int] = None, +) -> str: if uid is None or gid is None: uid, gid = extract_uid_gid(ctx) - make_data_dir_base(fsid, ctx.data_dir, uid, gid) - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) + make_data_dir_base(ident.fsid, ctx.data_dir, uid, gid) + data_dir = ident.data_dir(ctx.data_dir) makedirs(data_dir, uid, gid, DATA_DIR_MODE) return data_dir @@ -2696,90 +2629,14 @@ def move_files(ctx, src, dst, uid=None, gid=None): os.chown(dst_file, uid, gid) -def recursive_chown(path: str, uid: int, gid: int) -> None: - for dirpath, dirnames, filenames in os.walk(path): - os.chown(dirpath, uid, gid) - for filename in filenames: - os.chown(os.path.join(dirpath, filename), uid, gid) - - -# copied from distutils -def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]: - """Tries to find 'executable' in the directories listed in 'path'. - A string listing directories separated by 'os.pathsep'; defaults to - os.environ['PATH']. Returns the complete filename or None if not found. +def get_unit_name( + fsid: str, daemon_type: str, daemon_id: Union[str, int] +) -> str: + """Return the name of the systemd unit given an fsid, a daemon_type, + and the daemon_id. """ - _, ext = os.path.splitext(executable) - if (sys.platform == 'win32') and (ext != '.exe'): - executable = executable + '.exe' # pragma: no cover - - if os.path.isfile(executable): - return executable - - if path is None: - path = os.environ.get('PATH', None) - if path is None: - try: - path = os.confstr('CS_PATH') - except (AttributeError, ValueError): - # os.confstr() or CS_PATH is not available - path = os.defpath - # bpo-35755: Don't use os.defpath if the PATH environment variable is - # set to an empty string - - # PATH='' doesn't match, whereas PATH=':' looks in the current directory - if not path: - return None - - paths = path.split(os.pathsep) - for p in paths: - f = os.path.join(p, executable) - if os.path.isfile(f): - # the file exists, we have a shot at spawn working - return f - return None - - -def find_program(filename): - # type: (str) -> str - name = find_executable(filename) - if name is None: - raise ValueError('%s not found' % filename) - return name - - -def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]: - if ctx.docker: - return Docker() - else: - for i in CONTAINER_PREFERENCE: - try: - return i() - except Exception: - pass - return None - - -def check_container_engine(ctx: CephadmContext) -> ContainerEngine: - engine = ctx.container_engine - if not isinstance(engine, CONTAINER_PREFERENCE): - # See https://github.com/python/mypy/issues/8993 - exes: List[str] = [i.EXE for i in CONTAINER_PREFERENCE] # type: ignore - raise Error('No container engine binary found ({}). Try run `apt/dnf/yum/zypper install <container engine>`'.format(' or '.join(exes))) - elif isinstance(engine, Podman): - engine.get_version(ctx) - if engine.version < MIN_PODMAN_VERSION: - raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION) - return engine - - -def get_unit_name(fsid, daemon_type, daemon_id=None): - # type: (str, str, Optional[Union[int, str]]) -> str - # accept either name or type + id - if daemon_id is not None: - return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id) - else: - return 'ceph-%s@%s' % (fsid, daemon_type) + # TODO: fully replace get_unit_name with DaemonIdentity instances + return DaemonIdentity(fsid, daemon_type, daemon_id).unit_name def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> str: @@ -2790,90 +2647,6 @@ def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> s raise Error('Failed to get unit name for {}'.format(daemon)) -def check_unit(ctx, unit_name): - # type: (CephadmContext, str) -> Tuple[bool, str, bool] - # NOTE: we ignore the exit code here because systemctl outputs - # various exit codes based on the state of the service, but the - # string result is more explicit (and sufficient). - enabled = False - installed = False - try: - out, err, code = call(ctx, ['systemctl', 'is-enabled', unit_name], - verbosity=CallVerbosity.QUIET) - if code == 0: - enabled = True - installed = True - elif 'disabled' in out: - installed = True - except Exception as e: - logger.warning('unable to run systemctl: %s' % e) - enabled = False - installed = False - - state = 'unknown' - try: - out, err, code = call(ctx, ['systemctl', 'is-active', unit_name], - verbosity=CallVerbosity.QUIET) - out = out.strip() - if out in ['active']: - state = 'running' - elif out in ['inactive']: - state = 'stopped' - elif out in ['failed', 'auto-restart']: - state = 'error' - else: - state = 'unknown' - except Exception as e: - logger.warning('unable to run systemctl: %s' % e) - state = 'unknown' - return (enabled, state, installed) - - -def check_units(ctx, units, enabler=None): - # type: (CephadmContext, List[str], Optional[Packager]) -> bool - for u in units: - (enabled, state, installed) = check_unit(ctx, u) - if enabled and state == 'running': - logger.info('Unit %s is enabled and running' % u) - return True - if enabler is not None: - if installed: - logger.info('Enabling unit %s' % u) - enabler.enable_service(u) - return False - - -def is_container_running(ctx: CephadmContext, c: 'CephContainer') -> bool: - if ctx.name.split('.', 1)[0] in ['agent', 'cephadm-exporter']: - # these are non-containerized daemon types - return False - return bool(get_running_container_name(ctx, c)) - - -def get_running_container_name(ctx: CephadmContext, c: 'CephContainer') -> Optional[str]: - for name in [c.cname, c.old_cname]: - out, err, ret = call(ctx, [ - ctx.container_engine.path, 'container', 'inspect', - '--format', '{{.State.Status}}', name - ]) - if out.strip() == 'running': - return name - return None - - -def get_legacy_config_fsid(cluster, legacy_dir=None): - # type: (str, Optional[str]) -> Optional[str] - config_file = '/etc/ceph/%s.conf' % cluster - if legacy_dir is not None: - config_file = os.path.abspath(legacy_dir + config_file) - - if os.path.exists(config_file): - config = read_config(config_file) - if config.has_section('global') and config.has_option('global', 'fsid'): - return config.get('global', 'fsid') - return None - - def get_legacy_daemon_fsid(ctx, cluster, daemon_type, daemon_id, legacy_dir=None): # type: (CephadmContext, str, str, Union[int, str], Optional[str]) -> Optional[str] @@ -2895,130 +2668,19 @@ def get_legacy_daemon_fsid(ctx, cluster, return fsid -def should_log_to_journald(ctx: CephadmContext) -> bool: - if ctx.log_to_journald is not None: - return ctx.log_to_journald - return isinstance(ctx.container_engine, Podman) and \ - ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION - - -def get_daemon_args(ctx, fsid, daemon_type, daemon_id): - # type: (CephadmContext, str, str, Union[int, str]) -> List[str] - r = list() # type: List[str] - - if daemon_type in Ceph.daemons and daemon_type not in ['crash', 'ceph-exporter']: - r += [ - '--setuser', 'ceph', - '--setgroup', 'ceph', - '--default-log-to-file=false', - ] - log_to_journald = should_log_to_journald(ctx) - if log_to_journald: - r += [ - '--default-log-to-journald=true', - '--default-log-to-stderr=false', - ] - else: - r += [ - '--default-log-to-stderr=true', - '--default-log-stderr-prefix=debug ', - ] - if daemon_type == 'mon': - r += [ - '--default-mon-cluster-log-to-file=false', - ] - if log_to_journald: - r += [ - '--default-mon-cluster-log-to-journald=true', - '--default-mon-cluster-log-to-stderr=false', - ] - else: - r += ['--default-mon-cluster-log-to-stderr=true'] - elif daemon_type in Monitoring.components: - metadata = Monitoring.components[daemon_type] - r += metadata.get('args', list()) - # set ip and port to bind to for nodeexporter,alertmanager,prometheus - if daemon_type not in ['grafana', 'loki', 'promtail']: - ip = '' - port = Monitoring.port_map[daemon_type][0] - meta = fetch_meta(ctx) - if meta: - if 'ip' in meta and meta['ip']: - ip = meta['ip'] - if 'ports' in meta and meta['ports']: - port = meta['ports'][0] - r += [f'--web.listen-address={ip}:{port}'] - if daemon_type == 'prometheus': - config = fetch_configs(ctx) - retention_time = config.get('retention_time', '15d') - retention_size = config.get('retention_size', '0') # default to disabled - r += [f'--storage.tsdb.retention.time={retention_time}'] - r += [f'--storage.tsdb.retention.size={retention_size}'] - scheme = 'http' - host = get_fqdn() - # in case host is not an fqdn then we use the IP to - # avoid producing a broken web.external-url link - if '.' not in host: - ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname()) - # use the first ipv4 (if any) otherwise use the first ipv6 - addr = next(iter(ipv4_addrs or ipv6_addrs), None) - host = wrap_ipv6(addr) if addr else host - r += [f'--web.external-url={scheme}://{host}:{port}'] - if daemon_type == 'alertmanager': - config = fetch_configs(ctx) - peers = config.get('peers', list()) # type: ignore - for peer in peers: - r += ['--cluster.peer={}'.format(peer)] - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - # some alertmanager, by default, look elsewhere for a config - r += ['--config.file=/etc/alertmanager/alertmanager.yml'] - if daemon_type == 'promtail': - r += ['--config.expand-env'] - if daemon_type == 'prometheus': - config = fetch_configs(ctx) - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - if daemon_type == 'node-exporter': - config = fetch_configs(ctx) - try: - r += [f'--web.config.file={config["web_config"]}'] - except KeyError: - pass - r += ['--path.procfs=/host/proc', - '--path.sysfs=/host/sys', - '--path.rootfs=/rootfs'] - elif daemon_type == 'jaeger-agent': - r.extend(Tracing.components[daemon_type]['daemon_args']) - elif daemon_type == NFSGanesha.daemon_type: - nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) - r += nfs_ganesha.get_daemon_args() - elif daemon_type == CephExporter.daemon_type: - ceph_exporter = CephExporter.init(ctx, fsid, daemon_id) - r.extend(ceph_exporter.get_daemon_args()) - elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(ctx, fsid, daemon_id) - r += haproxy.get_daemon_args() - elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(ctx, fsid, daemon_id) - r.extend(cc.get_daemon_args()) - elif daemon_type == SNMPGateway.daemon_type: - sc = SNMPGateway.init(ctx, fsid, daemon_id) - r.extend(sc.get_daemon_args()) - - return r - - -def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, - config=None, keyring=None): - # type: (CephadmContext, str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None - data_dir = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid) +def create_daemon_dirs( + ctx: CephadmContext, + ident: 'DaemonIdentity', + uid: int, + gid: int, + config: Optional[str] = None, + keyring: Optional[str] = None, +) -> None: + # unpack fsid and daemon_type from ident because they're used very frequently + fsid, daemon_type = ident.fsid, ident.daemon_type + data_dir = make_data_dir(ctx, ident, uid=uid, gid=gid) - if daemon_type in Ceph.daemons: + if daemon_type in ceph_daemons(): make_log_dir(ctx, fsid, uid=uid, gid=gid) if config: @@ -3038,8 +2700,7 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, config_dir = '' data_dir_root = '' if daemon_type == 'prometheus': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/prometheus' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755) @@ -3047,8 +2708,7 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid) recursive_chown(os.path.join(data_dir_root, 'data'), uid, gid) elif daemon_type == 'grafana': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/grafana' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'certs'), uid, gid, 0o755) @@ -3056,26 +2716,22 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755) touch(os.path.join(data_dir_root, 'data', 'grafana.db'), uid, gid) elif daemon_type == 'alertmanager': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/alertmanager' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755) elif daemon_type == 'promtail': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/promtail' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755) elif daemon_type == 'loki': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/loki' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755) elif daemon_type == 'node-exporter': - data_dir_root = get_data_dir(fsid, ctx.data_dir, - daemon_type, daemon_id) + data_dir_root = ident.data_dir(ctx.data_dir) config_dir = 'etc/node-exporter' makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755) recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid) @@ -3095,42 +2751,49 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid, f.write(content) elif daemon_type == NFSGanesha.daemon_type: - nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) + nfs_ganesha = NFSGanesha.init(ctx, fsid, ident.daemon_id) nfs_ganesha.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == CephIscsi.daemon_type: - ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, fsid, ident.daemon_id) ceph_iscsi.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == CephNvmeof.daemon_type: - ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id) + ceph_nvmeof = CephNvmeof.init(ctx, fsid, ident.daemon_id) ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(ctx, fsid, daemon_id) + haproxy = HAproxy.init(ctx, fsid, ident.daemon_id) haproxy.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == Keepalived.daemon_type: - keepalived = Keepalived.init(ctx, fsid, daemon_id) + keepalived = Keepalived.init(ctx, fsid, ident.daemon_id) keepalived.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(ctx, fsid, daemon_id) + cc = CustomContainer.init(ctx, fsid, ident.daemon_id) cc.create_daemon_dirs(data_dir, uid, gid) elif daemon_type == SNMPGateway.daemon_type: - sg = SNMPGateway.init(ctx, fsid, daemon_id) + sg = SNMPGateway.init(ctx, fsid, ident.daemon_id) sg.create_daemon_conf() - _write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid) + _write_custom_conf_files(ctx, ident, uid, gid) -def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: str, fsid: str, uid: int, gid: int) -> None: +def _write_custom_conf_files( + ctx: CephadmContext, ident: 'DaemonIdentity', uid: int, gid: int +) -> None: # mostly making this its own function to make unit testing easier ccfiles = fetch_custom_config_files(ctx) if not ccfiles: return - custom_config_dir = os.path.join(ctx.data_dir, fsid, 'custom_config_files', f'{daemon_type}.{daemon_id}') + custom_config_dir = os.path.join( + ctx.data_dir, + ident.fsid, + 'custom_config_files', + f'{ident.daemon_type}.{ident.daemon_id}', + ) if not os.path.exists(custom_config_dir): makedirs(custom_config_dir, uid, gid, 0o755) mandatory_keys = ['mount_path', 'content'] @@ -3139,183 +2802,48 @@ def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: s file_path = os.path.join(custom_config_dir, os.path.basename(ccf['mount_path'])) with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f: f.write(ccf['content']) + # temporary workaround to make custom config files work for tcmu-runner + # container we deploy with iscsi until iscsi is refactored + if ident.daemon_type == 'iscsi': + tcmu_config_dir = custom_config_dir + '.tcmu' + if not os.path.exists(tcmu_config_dir): + makedirs(tcmu_config_dir, uid, gid, 0o755) + tcmu_file_path = os.path.join(tcmu_config_dir, os.path.basename(ccf['mount_path'])) + with write_new(tcmu_file_path, owner=(uid, gid), encoding='utf-8') as f: + f.write(ccf['content']) + + +def get_container_binds( + ctx: CephadmContext, ident: 'DaemonIdentity' +) -> List[List[str]]: + binds: List[List[str]] = list() + daemon = daemon_form_create(ctx, ident) + assert isinstance(daemon, ContainerDaemonForm) + daemon.customize_container_binds(ctx, binds) + return binds -def get_parm(option: str) -> Dict[str, str]: - js = _get_config_json(option) - # custom_config_files is a special field that may be in the config - # dict. It is used for mounting custom config files into daemon's containers - # and should be accessed through the "fetch_custom_config_files" function. - # For get_parm we need to discard it. - js.pop('custom_config_files', None) - return js - - -def _get_config_json(option: str) -> Dict[str, Any]: - if not option: - return dict() - - global cached_stdin - if option == '-': - if cached_stdin is not None: - j = cached_stdin - else: - j = sys.stdin.read() - cached_stdin = j - else: - # inline json string - if option[0] == '{' and option[-1] == '}': - j = option - # json file - elif os.path.exists(option): - with open(option, 'r') as f: - j = f.read() - else: - raise Error('Config file {} not found'.format(option)) - - try: - js = json.loads(j) - except ValueError as e: - raise Error('Invalid JSON in {}: {}'.format(option, e)) - else: - return js - - -def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]: - """Return a dict containing metadata about a deployment. - """ - meta = getattr(ctx, 'meta_properties', None) - if meta is not None: - return meta - mjson = getattr(ctx, 'meta_json', None) - if mjson is not None: - meta = json.loads(mjson) or {} - ctx.meta_properties = meta - return meta - return {} - - -def fetch_configs(ctx: CephadmContext) -> Dict[str, str]: - """Return a dict containing arbitrary configuration parameters. - This function filters out the key 'custom_config_files' which - must not be part of a deployment's configuration key-value pairs. - To access custom configuration file data, use `fetch_custom_config_files`. - """ - # ctx.config_blobs is *always* a dict. it is created once when - # a command is parsed/processed and stored "forever" - cfg_blobs = getattr(ctx, 'config_blobs', None) - if cfg_blobs: - cfg_blobs = dict(cfg_blobs) - cfg_blobs.pop('custom_config_files', None) - return cfg_blobs - # ctx.config_json is the legacy equivalent of config_blobs. it is a - # string that either contains json or refers to a file name where - # the file contains json. - cfg_json = getattr(ctx, 'config_json', None) - if cfg_json: - jdata = _get_config_json(cfg_json) or {} - jdata.pop('custom_config_files', None) - return jdata - return {} - - -def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]: - """Return a list containing dicts that can be used to populate - custom configuration files for containers. - """ - # NOTE: this function works like the opposite of fetch_configs. - # instead of filtering out custom_config_files, it returns only - # the content in that key. - cfg_blobs = getattr(ctx, 'config_blobs', None) - if cfg_blobs: - return cfg_blobs.get('custom_config_files', []) - cfg_json = getattr(ctx, 'config_json', None) - if cfg_json: - jdata = _get_config_json(cfg_json) - return jdata.get('custom_config_files', []) - return [] - - -def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]: - """Return a list of Endpoints, which have a port and ip attribute +def get_container_mounts_for_type( + ctx: CephadmContext, fsid: str, daemon_type: str +) -> Dict[str, str]: + """Return a dictionary mapping container-external paths to container-internal + paths given an fsid and daemon_type. """ - ports = getattr(ctx, 'tcp_ports', None) - if ports is None: - ports = [] - if isinstance(ports, str): - ports = list(map(int, ports.split())) - port_ips: Dict[str, str] = {} - port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None) - if isinstance(port_ips_attr, str): - port_ips = json.loads(port_ips_attr) - elif port_ips_attr is not None: - # if it's not None or a str, assume it's already the dict we want - port_ips = port_ips_attr - - endpoints: List[EndPoint] = [] - for port in ports: - if str(port) in port_ips: - endpoints.append(EndPoint(port_ips[str(port)], port)) - else: - endpoints.append(EndPoint('0.0.0.0', port)) - - return endpoints - - -def get_config_and_keyring(ctx): - # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]] - config = None - keyring = None - - d = fetch_configs(ctx) - if d: - config = d.get('config') - keyring = d.get('keyring') - if config and keyring: - return config, keyring - - if 'config' in ctx and ctx.config: - try: - with open(ctx.config, 'r') as f: - config = f.read() - except FileNotFoundError as e: - raise Error(e) - - if 'key' in ctx and ctx.key: - keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key) - elif 'keyring' in ctx and ctx.keyring: - try: - with open(ctx.keyring, 'r') as f: - keyring = f.read() - except FileNotFoundError as e: - raise Error(e) - - return config, keyring - - -def get_container_binds(ctx, fsid, daemon_type, daemon_id): - # type: (CephadmContext, str, str, Union[int, str, None]) -> List[List[str]] - binds = list() - - if daemon_type == CephIscsi.daemon_type: - binds.extend(CephIscsi.get_container_binds()) - if daemon_type == CephNvmeof.daemon_type: - binds.extend(CephNvmeof.get_container_binds()) - elif daemon_type == CustomContainer.daemon_type: - assert daemon_id - cc = CustomContainer.init(ctx, fsid, daemon_id) - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - binds.extend(cc.get_container_binds(data_dir)) - - return binds + mounts = _get_container_mounts_for_type(ctx, fsid, daemon_type) + _update_podman_mounts(ctx, mounts) + return mounts -def get_container_mounts(ctx, fsid, daemon_type, daemon_id, - no_config=False): - # type: (CephadmContext, str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str] +def _get_container_mounts_for_type( + ctx: CephadmContext, fsid: str, daemon_type: str +) -> Dict[str, str]: + """The main implementation of get_container_mounts_for_type minus the call + to _update_podman_mounts so that this can be called from + get_container_mounts. + """ mounts = dict() - if daemon_type in Ceph.daemons: + if daemon_type in ceph_daemons(): if fsid: run_path = os.path.join('/var/run/ceph', fsid) if os.path.exists(run_path): @@ -3329,20 +2857,6 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, journald_sock_dir = '/run/systemd/journal' mounts[journald_sock_dir] = journald_sock_dir - if daemon_type in Ceph.daemons and daemon_id: - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - if daemon_type == 'rgw': - cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id) - else: - cdata_dir = '/var/lib/ceph/%s/ceph-%s' % (daemon_type, daemon_id) - if daemon_type != 'crash': - mounts[data_dir] = cdata_dir + ':z' - if not no_config: - mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z' - if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash', 'ceph-exporter']: - # these do not search for their keyrings in a data directory - mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type, daemon_id) - if daemon_type in ['mon', 'osd', 'clusterless-ceph-volume']: mounts['/dev'] = '/dev' # FIXME: narrow this down? mounts['/run/udev'] = '/run/udev' @@ -3367,97 +2881,56 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, if ctx.shared_ceph_folder: # make easy manager modules/ceph-volume development ceph_folder = pathify(ctx.shared_ceph_folder) if os.path.exists(ceph_folder): + cephadm_binary = ceph_folder + '/src/cephadm/cephadm' + if not os.path.exists(pathify(cephadm_binary)): + raise Error("cephadm binary does not exist. Please run './build.sh cephadm' from ceph/src/cephadm/ directory.") + mounts[cephadm_binary] = '/usr/sbin/cephadm' mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume' - mounts[ceph_folder + '/src/cephadm/cephadm.py'] = '/usr/sbin/cephadm' mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr' mounts[ceph_folder + '/src/python-common/ceph'] = '/usr/lib/python3.6/site-packages/ceph' mounts[ceph_folder + '/monitoring/ceph-mixin/dashboards_out'] = '/etc/grafana/dashboards/ceph-dashboard' mounts[ceph_folder + '/monitoring/ceph-mixin/prometheus_alerts.yml'] = '/etc/prometheus/ceph/ceph_default_alerts.yml' else: - logger.error('{}{}{}'.format(termcolor.red, - 'Ceph shared source folder does not exist.', - termcolor.end)) + logger.error( + 'Ceph shared source folder does not exist.', + extra=Highlight.FAILURE.extra()) except AttributeError: pass + return mounts - if daemon_type in Monitoring.components and daemon_id: - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - log_dir = get_log_dir(fsid, ctx.log_dir) - if daemon_type == 'prometheus': - mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z' - mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z' - elif daemon_type == 'loki': - mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z' - mounts[os.path.join(data_dir, 'data')] = '/loki:Z' - elif daemon_type == 'promtail': - mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z' - mounts[log_dir] = '/var/log/ceph:z' - mounts[os.path.join(data_dir, 'data')] = '/promtail:Z' - elif daemon_type == 'node-exporter': - mounts[os.path.join(data_dir, 'etc/node-exporter')] = '/etc/node-exporter:Z' - mounts['/proc'] = '/host/proc:ro' - mounts['/sys'] = '/host/sys:ro' - mounts['/'] = '/rootfs:ro' - elif daemon_type == 'grafana': - mounts[os.path.join(data_dir, 'etc/grafana/grafana.ini')] = '/etc/grafana/grafana.ini:Z' - mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z' - mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z' - mounts[os.path.join(data_dir, 'data/grafana.db')] = '/var/lib/grafana/grafana.db:Z' - elif daemon_type == 'alertmanager': - mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/etc/alertmanager:Z' - - if daemon_type == NFSGanesha.daemon_type: - assert daemon_id - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id) - mounts.update(nfs_ganesha.get_container_mounts(data_dir)) - - if daemon_type == HAproxy.daemon_type: - assert daemon_id - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - mounts.update(HAproxy.get_container_mounts(data_dir)) - - if daemon_type == CephNvmeof.daemon_type: - assert daemon_id - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - mounts.update(CephNvmeof.get_container_mounts(data_dir)) - - if daemon_type == CephIscsi.daemon_type: - assert daemon_id - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - # Removes ending ".tcmu" from data_dir a tcmu-runner uses the same data_dir - # as rbd-runner-api - if data_dir.endswith('.tcmu'): - data_dir = re.sub(r'\.tcmu$', '', data_dir) - log_dir = get_log_dir(fsid, ctx.log_dir) - mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir)) - - if daemon_type == Keepalived.daemon_type: - assert daemon_id - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - mounts.update(Keepalived.get_container_mounts(data_dir)) - - if daemon_type == CustomContainer.daemon_type: - assert daemon_id - cc = CustomContainer.init(ctx, fsid, daemon_id) - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) - mounts.update(cc.get_container_mounts(data_dir)) - - # Modifications podman makes to /etc/hosts causes issues with - # certain daemons (specifically referencing "host.containers.internal" entry - # being added to /etc/hosts in this case). To avoid that, but still - # allow users to use /etc/hosts for hostname resolution, we can - # mount the host's /etc/hosts file. - # https://tracker.ceph.com/issues/58532 - # https://tracker.ceph.com/issues/57018 - if isinstance(ctx.container_engine, Podman): - if os.path.exists('/etc/hosts'): - if '/etc/hosts' not in mounts: - mounts['/etc/hosts'] = '/etc/hosts:ro' +def get_container_mounts( + ctx: CephadmContext, ident: 'DaemonIdentity', no_config: bool = False +) -> Dict[str, str]: + """Return a dictionary mapping container-external paths to container-internal + paths given a daemon identity. + Setting `no_config` will skip mapping a daemon specific ceph.conf file. + """ + # unpack daemon_type from ident because they're used very frequently + daemon_type = ident.daemon_type + mounts: Dict[str, str] = {} + + assert ident.fsid + assert ident.daemon_id + # Ceph daemon types are special cased here beacause of the no_config + # option which JJM thinks is *only* used by cephadm shell + if daemon_type in ceph_daemons(): + mounts = Ceph.get_ceph_mounts(ctx, ident, no_config=no_config) + else: + daemon = daemon_form_create(ctx, ident) + assert isinstance(daemon, ContainerDaemonForm) + daemon.customize_container_mounts(ctx, mounts) + + _update_podman_mounts(ctx, mounts) return mounts +def _update_podman_mounts(ctx: CephadmContext, mounts: Dict[str, str]) -> None: + """Update the given mounts dict with mounts specific to podman.""" + if isinstance(ctx.container_engine, Podman): + ctx.container_engine.update_mounts(ctx, mounts) + + def get_ceph_volume_container(ctx: CephadmContext, privileged: bool = True, cname: str = '', @@ -3485,151 +2958,106 @@ def get_ceph_volume_container(ctx: CephadmContext, ) -def set_pids_limit_unlimited(ctx: CephadmContext, container_args: List[str]) -> None: - # set container's pids-limit to unlimited rather than default (Docker 4096 / Podman 2048) - # Useful for daemons like iscsi where the default pids-limit limits the number of luns - # per iscsi target or rgw where increasing the rgw_thread_pool_size to a value near - # the default pids-limit may cause the container to crash. - if ( - isinstance(ctx.container_engine, Podman) - and ctx.container_engine.version >= PIDS_LIMIT_UNLIMITED_PODMAN_VERSION - ): - container_args.append('--pids-limit=-1') - else: - container_args.append('--pids-limit=0') - - -def get_container(ctx: CephadmContext, - fsid: str, daemon_type: str, daemon_id: Union[int, str], - privileged: bool = False, - ptrace: bool = False, - container_args: Optional[List[str]] = None) -> 'CephContainer': +def get_container( + ctx: CephadmContext, + ident: 'DaemonIdentity', + privileged: bool = False, + ptrace: bool = False, + container_args: Optional[List[str]] = None, +) -> 'CephContainer': entrypoint: str = '' - name: str = '' - ceph_args: List[str] = [] + d_args: List[str] = [] envs: List[str] = [] host_network: bool = True + binds: List[List[str]] = [] + mounts: Dict[str, str] = {} - if daemon_type in Ceph.daemons: - envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728') + daemon_type = ident.daemon_type if container_args is None: container_args = [] - if daemon_type in Ceph.daemons or daemon_type in Ceph.gateways: - set_pids_limit_unlimited(ctx, container_args) + if Ceph.for_daemon_type(daemon_type) or OSD.for_daemon_type(daemon_type): + ceph_daemon = daemon_form_create(ctx, ident) + assert isinstance(ceph_daemon, ContainerDaemonForm) + entrypoint = ceph_daemon.default_entrypoint() + ceph_daemon.customize_container_envs(ctx, envs) + ceph_daemon.customize_container_args(ctx, container_args) + ceph_daemon.customize_process_args(ctx, d_args) + mounts = get_container_mounts(ctx, ident) if daemon_type in ['mon', 'osd']: # mon and osd need privileged in order for libudev to query devices privileged = True - if daemon_type == 'rgw': - entrypoint = '/usr/bin/radosgw' - name = 'client.rgw.%s' % daemon_id - elif daemon_type == 'rbd-mirror': - entrypoint = '/usr/bin/rbd-mirror' - name = 'client.rbd-mirror.%s' % daemon_id - elif daemon_type == 'cephfs-mirror': - entrypoint = '/usr/bin/cephfs-mirror' - name = 'client.cephfs-mirror.%s' % daemon_id - elif daemon_type == 'crash': - entrypoint = '/usr/bin/ceph-crash' - name = 'client.crash.%s' % daemon_id - elif daemon_type in ['mon', 'mgr', 'mds', 'osd']: - entrypoint = '/usr/bin/ceph-' + daemon_type - name = '%s.%s' % (daemon_type, daemon_id) - elif daemon_type in Monitoring.components: - entrypoint = '' + if daemon_type in Monitoring.components: + monitoring = Monitoring.create(ctx, ident) + entrypoint = monitoring.default_entrypoint() + monitoring.customize_container_args(ctx, container_args) + monitoring.customize_process_args(ctx, d_args) + mounts = get_container_mounts(ctx, ident) elif daemon_type in Tracing.components: - entrypoint = '' - name = '%s.%s' % (daemon_type, daemon_id) - config = fetch_configs(ctx) - Tracing.set_configuration(config, daemon_type) - envs.extend(Tracing.components[daemon_type].get('envs', [])) + tracing = Tracing.create(ctx, ident) + entrypoint = tracing.default_entrypoint() + tracing.customize_container_envs(ctx, envs) + tracing.customize_process_args(ctx, d_args) elif daemon_type == NFSGanesha.daemon_type: - entrypoint = NFSGanesha.entrypoint - name = '%s.%s' % (daemon_type, daemon_id) - envs.extend(NFSGanesha.get_container_envs()) + nfs_ganesha = NFSGanesha.create(ctx, ident) + entrypoint = nfs_ganesha.default_entrypoint() + nfs_ganesha.customize_container_envs(ctx, envs) + nfs_ganesha.customize_container_args(ctx, container_args) + nfs_ganesha.customize_process_args(ctx, d_args) + mounts = get_container_mounts(ctx, ident) elif daemon_type == CephExporter.daemon_type: - entrypoint = CephExporter.entrypoint - name = 'client.ceph-exporter.%s' % daemon_id + ceph_exporter = CephExporter.create(ctx, ident) + entrypoint = ceph_exporter.default_entrypoint() + ceph_exporter.customize_container_envs(ctx, envs) + ceph_exporter.customize_container_args(ctx, container_args) + ceph_exporter.customize_process_args(ctx, d_args) + mounts = get_container_mounts(ctx, ident) elif daemon_type == HAproxy.daemon_type: - name = '%s.%s' % (daemon_type, daemon_id) - container_args.extend(['--user=root']) # haproxy 2.4 defaults to a different user + haproxy = HAproxy.create(ctx, ident) + haproxy.customize_container_args(ctx, container_args) + haproxy.customize_process_args(ctx, d_args) + mounts = get_container_mounts(ctx, ident) elif daemon_type == Keepalived.daemon_type: - name = '%s.%s' % (daemon_type, daemon_id) - envs.extend(Keepalived.get_container_envs()) - container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW']) + keepalived = Keepalived.create(ctx, ident) + keepalived.customize_container_envs(ctx, envs) + keepalived.customize_container_args(ctx, container_args) + mounts = get_container_mounts(ctx, ident) elif daemon_type == CephNvmeof.daemon_type: - name = '%s.%s' % (daemon_type, daemon_id) - container_args.extend(['--ulimit', 'memlock=-1:-1']) - container_args.extend(['--ulimit', 'nofile=10240']) - container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE']) + nvmeof = CephNvmeof.create(ctx, ident) + nvmeof.customize_container_args(ctx, container_args) + binds = get_container_binds(ctx, ident) + mounts = get_container_mounts(ctx, ident) elif daemon_type == CephIscsi.daemon_type: - entrypoint = CephIscsi.entrypoint - name = '%s.%s' % (daemon_type, daemon_id) + iscsi = CephIscsi.create(ctx, ident) + entrypoint = iscsi.default_entrypoint() + iscsi.customize_container_args(ctx, container_args) # So the container can modprobe iscsi_target_mod and have write perms # to configfs we need to make this a privileged container. privileged = True + binds = get_container_binds(ctx, ident) + mounts = get_container_mounts(ctx, ident) elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(ctx, fsid, daemon_id) - entrypoint = cc.entrypoint + cc = CustomContainer.init(ctx, ident.fsid, ident.daemon_id) + entrypoint = cc.default_entrypoint() host_network = False - envs.extend(cc.get_container_envs()) - container_args.extend(cc.get_container_args()) - - if daemon_type in Monitoring.components: - uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) - monitoring_args = [ - '--user', - str(uid), - # FIXME: disable cpu/memory limits for the time being (not supported - # by ubuntu 18.04 kernel!) - ] - container_args.extend(monitoring_args) - if daemon_type == 'node-exporter': - # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys', - # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation - # between the node-exporter container and the host to avoid selinux denials - container_args.extend(['--security-opt', 'label=disable']) - elif daemon_type == 'crash': - ceph_args = ['-n', name] - elif daemon_type in Ceph.daemons: - ceph_args = ['-n', name, '-f'] + cc.customize_container_envs(ctx, envs) + cc.customize_container_args(ctx, container_args) + cc.customize_process_args(ctx, d_args) + binds = get_container_binds(ctx, ident) + mounts = get_container_mounts(ctx, ident) elif daemon_type == SNMPGateway.daemon_type: - sg = SNMPGateway.init(ctx, fsid, daemon_id) - container_args.append( - f'--env-file={sg.conf_file_path}' - ) - - # if using podman, set -d, --conmon-pidfile & --cidfile flags - # so service can have Type=Forking - if isinstance(ctx.container_engine, Podman): - runtime_dir = '/run' - container_args.extend([ - '-d', '--log-driver', 'journald', - '--conmon-pidfile', - runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, daemon_id), - '--cidfile', - runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id), - ]) - if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION and not ctx.no_cgroups_split: - container_args.append('--cgroups=split') - # if /etc/hosts doesn't exist, we can be confident - # users aren't using it for host name resolution - # and adding --no-hosts avoids bugs created in certain daemons - # by modifications podman makes to /etc/hosts - # https://tracker.ceph.com/issues/58532 - # https://tracker.ceph.com/issues/57018 - if not os.path.exists('/etc/hosts'): - container_args.extend(['--no-hosts']) + sg = SNMPGateway.create(ctx, ident) + sg.customize_container_args(ctx, container_args) + sg.customize_process_args(ctx, d_args) + _update_container_args_for_podman(ctx, ident, container_args) return CephContainer.for_daemon( ctx, - fsid=fsid, - daemon_type=daemon_type, - daemon_id=str(daemon_id), + ident=ident, entrypoint=entrypoint, - args=ceph_args + get_daemon_args(ctx, fsid, daemon_type, daemon_id), + args=d_args, container_args=container_args, - volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), + volume_mounts=mounts, + bind_mounts=binds, envs=envs, privileged=privileged, ptrace=ptrace, @@ -3637,42 +3065,20 @@ def get_container(ctx: CephadmContext, ) -def extract_uid_gid(ctx, img='', file_path='/var/lib/ceph'): - # type: (CephadmContext, str, Union[str, List[str]]) -> Tuple[int, int] - - if not img: - img = ctx.image - - if isinstance(file_path, str): - paths = [file_path] - else: - paths = file_path - - ex: Optional[Tuple[str, RuntimeError]] = None - - for fp in paths: - try: - out = CephContainer( - ctx, - image=img, - entrypoint='stat', - args=['-c', '%u %g', fp] - ).run(verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - uid, gid = out.split(' ') - return int(uid), int(gid) - except RuntimeError as e: - ex = (fp, e) - if ex: - raise Error(f'Failed to extract uid/gid for path {ex[0]}: {ex[1]}') - - raise RuntimeError('uid/gid not found') +def _update_container_args_for_podman( + ctx: CephadmContext, ident: DaemonIdentity, container_args: List[str] +) -> None: + if not isinstance(ctx.container_engine, Podman): + return + service_name = f'{ident.unit_name}.service' + container_args.extend( + ctx.container_engine.service_args(ctx, service_name) + ) def deploy_daemon( ctx: CephadmContext, - fsid: str, - daemon_type: str, - daemon_id: Union[int, str], + ident: 'DaemonIdentity', c: Optional['CephContainer'], uid: int, gid: int, @@ -3684,6 +3090,7 @@ def deploy_daemon( init_containers: Optional[List['InitContainer']] = None, ) -> None: endpoints = endpoints or [] + daemon_type = ident.daemon_type # only check port in use if fresh deployment since service # we are redeploying/reconfiguring will already be using the port if deployment_type == DeploymentType.DEFAULT: @@ -3697,7 +3104,7 @@ def deploy_daemon( else: raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type)) - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) + data_dir = ident.data_dir(ctx.data_dir) if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir): raise Error('cannot reconfig, data path %s does not exist' % data_dir) if daemon_type == 'mon' and not os.path.exists(data_dir): @@ -3710,23 +3117,24 @@ def deploy_daemon( tmp_config = write_tmp(config, uid, gid) # --mkfs - create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid) - mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', daemon_id) - log_dir = get_log_dir(fsid, ctx.log_dir) + create_daemon_dirs(ctx, ident, uid, gid) + assert ident.daemon_type == 'mon' + mon_dir = ident.data_dir(ctx.data_dir) + log_dir = get_log_dir(ident.fsid, ctx.log_dir) CephContainer( ctx, image=ctx.image, entrypoint='/usr/bin/ceph-mon', args=[ '--mkfs', - '-i', str(daemon_id), - '--fsid', fsid, + '-i', ident.daemon_id, + '--fsid', ident.fsid, '-c', '/tmp/config', '--keyring', '/tmp/keyring', - ] + get_daemon_args(ctx, fsid, 'mon', daemon_id), + ] + Ceph.create(ctx, ident).get_daemon_args(), volume_mounts={ log_dir: '/var/log/ceph:z', - mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id), + mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (ident.daemon_id), tmp_keyring.name: '/tmp/keyring:z', tmp_config.name: '/tmp/config:z', }, @@ -3737,11 +3145,7 @@ def deploy_daemon( f.write(config) else: # dirs, conf, keyring - create_daemon_dirs( - ctx, - fsid, daemon_type, daemon_id, - uid, gid, - config, keyring) + create_daemon_dirs(ctx, ident, uid, gid, config, keyring) # only write out unit files and start daemon # with systemd if this is not a reconfig @@ -3750,13 +3154,20 @@ def deploy_daemon( config_js = fetch_configs(ctx) assert isinstance(config_js, dict) - cephadm_agent = CephadmAgent(ctx, fsid, daemon_id) + cephadm_agent = CephadmAgent(ctx, ident.fsid, ident.daemon_id) cephadm_agent.deploy_daemon_unit(config_js) else: if c: - deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, - c, osd_fsid=osd_fsid, endpoints=endpoints, - init_containers=init_containers) + deploy_daemon_units( + ctx, + ident, + uid, + gid, + c, + osd_fsid=osd_fsid, + endpoints=endpoints, + init_containers=init_containers, + ) else: raise RuntimeError('attempting to deploy a daemon without a container image') @@ -3767,23 +3178,22 @@ def deploy_daemon( with write_new(data_dir + '/unit.configured', owner=(uid, gid)) as f: f.write('mtime is time we were last configured\n') - update_firewalld(ctx, daemon_type) + update_firewalld(ctx, daemon_form_create(ctx, ident)) # Open ports explicitly required for the daemon - if endpoints: - fw = Firewalld(ctx) - fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, [])) - fw.apply_rules() + if not ('skip_firewalld' in ctx and ctx.skip_firewalld): + if endpoints: + fw = Firewalld(ctx) + fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, [])) + fw.apply_rules() # If this was a reconfig and the daemon is not a Ceph daemon, restart it # so it can pick up potential changes to its configuration files - if deployment_type == DeploymentType.RECONFIG and daemon_type not in Ceph.daemons: + if deployment_type == DeploymentType.RECONFIG and daemon_type not in ceph_daemons(): # ceph daemons do not need a restart; others (presumably) do to pick # up the new config - call_throws(ctx, ['systemctl', 'reset-failed', - get_unit_name(fsid, daemon_type, daemon_id)]) - call_throws(ctx, ['systemctl', 'restart', - get_unit_name(fsid, daemon_type, daemon_id)]) + call_throws(ctx, ['systemctl', 'reset-failed', ident.unit_name]) + call_throws(ctx, ['systemctl', 'restart', ident.unit_name]) def _bash_cmd( @@ -3904,11 +3314,9 @@ def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None: def deploy_daemon_units( ctx: CephadmContext, - fsid: str, + ident: 'DaemonIdentity', uid: int, gid: int, - daemon_type: str, - daemon_id: Union[int, str], container: 'CephContainer', enable: bool = True, start: bool = True, @@ -3918,23 +3326,30 @@ def deploy_daemon_units( ) -> None: # cmd - data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) + # unpack values from ident because they're used very frequently + fsid = ident.fsid + daemon_type = ident.daemon_type + daemon_id = ident.daemon_id + + data_dir = ident.data_dir(ctx.data_dir) run_file_path = data_dir + '/unit.run' meta_file_path = data_dir + '/unit.meta' with write_new(run_file_path) as f, write_new(meta_file_path) as metaf: f.write('set -e\n') - if daemon_type in Ceph.daemons: + if daemon_type in ceph_daemons(): install_path = find_program('install') f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid)) # pre-start cmd(s) if daemon_type == 'osd': assert osd_fsid - _write_osd_unit_run_commands(ctx, f, daemon_type, str(daemon_id), fsid, osd_fsid, data_dir, uid, gid) + _write_osd_unit_run_commands( + ctx, f, ident, osd_fsid, data_dir, uid, gid + ) elif daemon_type == CephIscsi.daemon_type: - _write_iscsi_unit_run_commands(ctx, f, daemon_type, str(daemon_id), fsid, data_dir) + _write_iscsi_unit_run_commands(ctx, f, ident, data_dir) init_containers = init_containers or [] if init_containers: _write_init_container_cmds_clean(ctx, f, init_containers[0]) @@ -3964,9 +3379,9 @@ def deploy_daemon_units( _write_stop_actions(ctx, cast(TextIO, f), container, timeout) if daemon_type == 'osd': assert osd_fsid - _write_osd_unit_poststop_commands(ctx, f, daemon_type, str(daemon_id), fsid, osd_fsid) + _write_osd_unit_poststop_commands(ctx, f, ident, osd_fsid) elif daemon_type == CephIscsi.daemon_type: - _write_iscsi_unit_poststop_commands(ctx, f, daemon_type, str(daemon_id), fsid, data_dir) + _write_iscsi_unit_poststop_commands(ctx, f, ident, data_dir) # post-stop command(s) with write_new(data_dir + '/unit.stop') as f: @@ -3977,7 +3392,7 @@ def deploy_daemon_units( f.write(container.image + '\n') # sysctl - install_sysctl(ctx, fsid, daemon_type) + install_sysctl(ctx, fsid, daemon_form_create(ctx, ident)) # systemd install_base_units(ctx, fsid) @@ -4013,9 +3428,7 @@ def _write_stop_actions( def _write_osd_unit_run_commands( ctx: CephadmContext, f: IO, - daemon_type: str, - daemon_id: str, - fsid: str, + ident: 'DaemonIdentity', osd_fsid: str, data_dir: str, uid: int, @@ -4023,7 +3436,7 @@ def _write_osd_unit_run_commands( ) -> None: # osds have a pre-start step simple_fn = os.path.join('/etc/ceph/osd', - '%s-%s.json.adopted-by-cephadm' % (daemon_id, osd_fsid)) + '%s-%s.json.adopted-by-cephadm' % (ident.daemon_id, osd_fsid)) if os.path.exists(simple_fn): f.write('# Simple OSDs need chown on startup:\n') for n in ['block', 'block.db', 'block.wal']: @@ -4032,11 +3445,14 @@ def _write_osd_unit_run_commands( else: # if ceph-volume does not support 'ceph-volume activate', we must # do 'ceph-volume lvm activate'. + fsid = ident.fsid + daemon_type = ident.daemon_type + daemon_id = ident.daemon_id test_cv = get_ceph_volume_container( ctx, args=['activate', '--bad-option'], - volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), + volume_mounts=get_container_mounts(ctx, ident), + bind_mounts=get_container_binds(ctx, ident), cname='ceph-%s-%s.%s-activate-test' % (fsid, daemon_type, daemon_id), ) out, err, ret = call(ctx, test_cv.run_cmd(), verbosity=CallVerbosity.SILENT) @@ -4061,282 +3477,51 @@ def _write_osd_unit_run_commands( prestart = get_ceph_volume_container( ctx, args=cmd, - volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), + volume_mounts=get_container_mounts(ctx, ident), + bind_mounts=get_container_binds(ctx, ident), cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id), ) _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate') def _write_iscsi_unit_run_commands( - ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, data_dir: str + ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str ) -> None: f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n') - ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id) tcmu_container = ceph_iscsi.get_tcmu_runner_container() _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True) def _write_osd_unit_poststop_commands( - ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, osd_fsid: str + ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', osd_fsid: str ) -> None: poststop = get_ceph_volume_container( ctx, args=[ 'lvm', 'deactivate', - str(daemon_id), osd_fsid, + ident.daemon_id, osd_fsid, ], - volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id), - bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id), - cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type, - daemon_id), + volume_mounts=get_container_mounts(ctx, ident), + bind_mounts=get_container_binds(ctx, ident), + cname='ceph-%s-%s.%s-deactivate' % (ident.fsid, ident.daemon_type, ident.daemon_id), ) _write_container_cmd_to_bash(ctx, f, poststop, 'deactivate osd') def _write_iscsi_unit_poststop_commands( - ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, data_dir: str + ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str ) -> None: # make sure we also stop the tcmu container runtime_dir = '/run' - ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id) + ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id) tcmu_container = ceph_iscsi.get_tcmu_runner_container() f.write('! ' + ' '.join(tcmu_container.stop_cmd()) + '\n') - f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n') - f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n') + f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n') + f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n') f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n') -class Firewalld(object): - - # for specifying ports we should always open when opening - # ports for a daemon of that type. Main use case is for ports - # that we should open when deploying the daemon type but that - # the daemon itself may not necessarily need to bind to the port. - # This needs to be handed differently as we don't want to fail - # deployment if the port cannot be bound to but we still want to - # open the port in the firewall. - external_ports: Dict[str, List[int]] = { - 'iscsi': [3260] # 3260 is the well known iSCSI port - } - - def __init__(self, ctx): - # type: (CephadmContext) -> None - self.ctx = ctx - self.available = self.check() - - def check(self): - # type: () -> bool - self.cmd = find_executable('firewall-cmd') - if not self.cmd: - logger.debug('firewalld does not appear to be present') - return False - (enabled, state, _) = check_unit(self.ctx, 'firewalld.service') - if not enabled: - logger.debug('firewalld.service is not enabled') - return False - if state != 'running': - logger.debug('firewalld.service is not running') - return False - - logger.info('firewalld ready') - return True - - def enable_service_for(self, daemon_type): - # type: (str) -> None - if not self.available: - logger.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type) - return - - if daemon_type == 'mon': - svc = 'ceph-mon' - elif daemon_type in ['mgr', 'mds', 'osd']: - svc = 'ceph' - elif daemon_type == NFSGanesha.daemon_type: - svc = 'nfs' - else: - return - - if not self.cmd: - raise RuntimeError('command not defined') - - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG) - if ret: - logger.info('Enabling firewalld service %s in current zone...' % svc) - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-service', svc]) - if ret: - raise RuntimeError( - 'unable to add service %s to current zone: %s' % (svc, err)) - else: - logger.debug('firewalld service %s is enabled in current zone' % svc) - - def open_ports(self, fw_ports): - # type: (List[int]) -> None - if not self.available: - logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports) - return - - if not self.cmd: - raise RuntimeError('command not defined') - - for port in fw_ports: - tcp_port = str(port) + '/tcp' - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) - if ret: - logger.info('Enabling firewalld port %s in current zone...' % tcp_port) - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-port', tcp_port]) - if ret: - raise RuntimeError('unable to add port %s to current zone: %s' % - (tcp_port, err)) - else: - logger.debug('firewalld port %s is enabled in current zone' % tcp_port) - - def close_ports(self, fw_ports): - # type: (List[int]) -> None - if not self.available: - logger.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports) - return - - if not self.cmd: - raise RuntimeError('command not defined') - - for port in fw_ports: - tcp_port = str(port) + '/tcp' - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG) - if not ret: - logger.info('Disabling port %s in current zone...' % tcp_port) - out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--remove-port', tcp_port]) - if ret: - raise RuntimeError('unable to remove port %s from current zone: %s' % - (tcp_port, err)) - else: - logger.info(f'Port {tcp_port} disabled') - else: - logger.info(f'firewalld port {tcp_port} already closed') - - def apply_rules(self): - # type: () -> None - if not self.available: - return - - if not self.cmd: - raise RuntimeError('command not defined') - - call_throws(self.ctx, [self.cmd, '--reload']) - - -def update_firewalld(ctx, daemon_type): - # type: (CephadmContext, str) -> None - if not ('skip_firewalld' in ctx and ctx.skip_firewalld): - firewall = Firewalld(ctx) - firewall.enable_service_for(daemon_type) - firewall.apply_rules() - - -def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None: - """ - Set up sysctl settings - """ - def _write(conf: Path, lines: List[str]) -> None: - lines = [ - '# created by cephadm', - '', - *lines, - '', - ] - with write_new(conf, owner=None, perms=None) as f: - f.write('\n'.join(lines)) - - conf = Path(ctx.sysctl_dir).joinpath(f'90-ceph-{fsid}-{daemon_type}.conf') - lines: List = [] - - if daemon_type == 'osd': - lines = OSD.get_sysctl_settings() - elif daemon_type == 'haproxy': - lines = HAproxy.get_sysctl_settings() - elif daemon_type == 'keepalived': - lines = Keepalived.get_sysctl_settings() - elif daemon_type == CephNvmeof.daemon_type: - lines = CephNvmeof.get_sysctl_settings() - lines = filter_sysctl_settings(ctx, lines) - - # apply the sysctl settings - if lines: - Path(ctx.sysctl_dir).mkdir(mode=0o755, exist_ok=True) - _write(conf, lines) - call_throws(ctx, ['sysctl', '--system']) - - -def sysctl_get(ctx: CephadmContext, variable: str) -> Union[str, None]: - """ - Read a sysctl setting by executing 'sysctl -b {variable}' - """ - out, err, code = call(ctx, ['sysctl', '-b', variable]) - return out or None - - -def filter_sysctl_settings(ctx: CephadmContext, lines: List[str]) -> List[str]: - """ - Given a list of sysctl settings, examine the system's current configuration - and return those which are not currently set as described. - """ - def test_setting(desired_line: str) -> bool: - # Remove any comments - comment_start = desired_line.find('#') - if comment_start != -1: - desired_line = desired_line[:comment_start] - desired_line = desired_line.strip() - if not desired_line or desired_line.isspace(): - return False - setting, desired_value = map(lambda s: s.strip(), desired_line.split('=')) - if not setting or not desired_value: - return False - actual_value = sysctl_get(ctx, setting) - return desired_value != actual_value - return list(filter(test_setting, lines)) - - -def migrate_sysctl_dir(ctx: CephadmContext, fsid: str) -> None: - """ - Cephadm once used '/usr/lib/sysctl.d' for storing sysctl configuration. - This moves it to '/etc/sysctl.d'. - """ - deprecated_location: str = '/usr/lib/sysctl.d' - deprecated_confs: List[str] = glob(f'{deprecated_location}/90-ceph-{fsid}-*.conf') - if not deprecated_confs: - return - - file_count: int = len(deprecated_confs) - logger.info(f'Found sysctl {file_count} files in deprecated location {deprecated_location}. Starting Migration.') - for conf in deprecated_confs: - try: - shutil.move(conf, ctx.sysctl_dir) - file_count -= 1 - except shutil.Error as err: - if str(err).endswith('already exists'): - logger.warning(f'Destination file already exists. Deleting {conf}.') - try: - os.unlink(conf) - file_count -= 1 - except OSError as del_err: - logger.warning(f'Could not remove {conf}: {del_err}.') - else: - logger.warning(f'Could not move {conf} from {deprecated_location} to {ctx.sysctl_dir}: {err}') - - # Log successful migration - if file_count == 0: - logger.info(f'Successfully migrated sysctl config to {ctx.sysctl_dir}.') - return - - # Log partially successful / unsuccessful migration - files_processed: int = len(deprecated_confs) - if file_count < files_processed: - status: str = f'partially successful (failed {file_count}/{files_processed})' - elif file_count == files_processed: - status = 'unsuccessful' - logger.warning(f'Migration of sysctl configuration {status}. You may want to perform a migration manually.') - - def install_base_units(ctx, fsid): # type: (CephadmContext, str) -> None """ @@ -4419,534 +3604,24 @@ def install_base_units(ctx, fsid): """ % (fsid, ' '.join(targets), '|'.join(targets))) -def get_unit_file(ctx, fsid): - # type: (CephadmContext, str) -> str - extra_args = '' - if isinstance(ctx.container_engine, Podman): - extra_args = ('ExecStartPre=-/bin/rm -f %t/%n-pid %t/%n-cid\n' - 'ExecStopPost=-/bin/rm -f %t/%n-pid %t/%n-cid\n' - 'Type=forking\n' - 'PIDFile=%t/%n-pid\n') - if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION: - extra_args += 'Delegate=yes\n' - - docker = isinstance(ctx.container_engine, Docker) - u = """# generated by cephadm -[Unit] -Description=Ceph %i for {fsid} - -# According to: -# http://www.freedesktop.org/wiki/Software/systemd/NetworkTarget -# these can be removed once ceph-mon will dynamically change network -# configuration. -After=network-online.target local-fs.target time-sync.target{docker_after} -Wants=network-online.target local-fs.target time-sync.target -{docker_requires} - -PartOf=ceph-{fsid}.target -Before=ceph-{fsid}.target - -[Service] -LimitNOFILE=1048576 -LimitNPROC=1048576 -EnvironmentFile=-/etc/environment -ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run -ExecStop=-/bin/bash -c 'bash {data_dir}/{fsid}/%i/unit.stop' -ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop -KillMode=none -Restart=on-failure -RestartSec=10s -TimeoutStartSec=200 -TimeoutStopSec=120 -StartLimitInterval=30min -StartLimitBurst=5 -{extra_args} -[Install] -WantedBy=ceph-{fsid}.target -""".format(fsid=fsid, - data_dir=ctx.data_dir, - extra_args=extra_args, - # if docker, we depend on docker.service - docker_after=' docker.service' if docker else '', - docker_requires='Requires=docker.service\n' if docker else '') - - return u +def get_unit_file(ctx: CephadmContext, fsid: str) -> str: + has_docker_engine = isinstance(ctx.container_engine, Docker) + has_podman_engine = isinstance(ctx.container_engine, Podman) + has_podman_split_version = ( + has_podman_engine and ctx.container_engine.supports_split_cgroups + ) + return templating.render( + ctx, + templating.Templates.ceph_service, + fsid=fsid, + has_docker_engine=has_docker_engine, + has_podman_engine=has_podman_engine, + has_podman_split_version=has_podman_split_version, + ) ################################## -class DaemonIdentity: - def __init__( - self, - fsid: str, - daemon_type: str, - daemon_id: Union[int, str], - subcomponent: str = '', - ) -> None: - self._fsid = fsid - self._daemon_type = daemon_type - self._daemon_id = str(daemon_id) - self._subcomponent = subcomponent - - @property - def fsid(self) -> str: - return self._fsid - - @property - def daemon_type(self) -> str: - return self._daemon_type - - @property - def daemon_id(self) -> str: - return self._daemon_id - - @property - def subcomponent(self) -> str: - return self._subcomponent - - @property - def legacy_container_name(self) -> str: - return 'ceph-%s-%s.%s' % (self.fsid, self.daemon_type, self.daemon_id) - - @property - def container_name(self) -> str: - name = f'ceph-{self.fsid}-{self.daemon_type}-{self.daemon_id}' - if self.subcomponent: - name = f'{name}-{self.subcomponent}' - return name.replace('.', '-') - - def _replace( - self, - *, - fsid: Optional[str] = None, - daemon_type: Optional[str] = None, - daemon_id: Union[None, int, str] = None, - subcomponent: Optional[str] = None, - ) -> 'DaemonIdentity': - return self.__class__( - fsid=self.fsid if fsid is None else fsid, - daemon_type=( - self.daemon_type if daemon_type is None else daemon_type - ), - daemon_id=self.daemon_id if daemon_id is None else daemon_id, - subcomponent=( - self.subcomponent if subcomponent is None else subcomponent - ), - ) - - @classmethod - def from_name(cls, fsid: str, name: str) -> 'DaemonIdentity': - daemon_type, daemon_id = name.split('.', 1) - return cls(fsid, daemon_type, daemon_id) - - @classmethod - def from_context(cls, ctx: 'CephadmContext') -> 'DaemonIdentity': - return cls.from_name(ctx.fsid, ctx.name) - - -class BasicContainer: - def __init__( - self, - ctx: CephadmContext, - *, - image: str, - entrypoint: str, - identity: Optional['DaemonIdentity'], - args: Optional[List[str]] = None, - container_args: Optional[List[str]] = None, - envs: Optional[List[str]] = None, - volume_mounts: Optional[Dict[str, str]] = None, - bind_mounts: Optional[List[List[str]]] = None, - network: str = '', - ipc: str = '', - init: bool = False, - ptrace: bool = False, - privileged: bool = False, - remove: bool = False, - memory_request: Optional[str] = None, - memory_limit: Optional[str] = None, - ) -> None: - self.ctx = ctx - self.image = image - self.entrypoint = entrypoint - self.identity = identity - self.args = args or [] - self.container_args = container_args or [] - self.envs = envs or [] - self.volume_mounts = volume_mounts or {} - self.bind_mounts = bind_mounts or [] - self.network = network - self.ipc = ipc - self.init = init - self.ptrace = ptrace - self.privileged = privileged - self.remove = remove - self.memory_request = memory_request - self.memory_limit = memory_limit - - @property - def _container_engine(self) -> str: - return self.ctx.container_engine.path - - @property - def _using_podman(self) -> bool: - return isinstance(self.ctx.container_engine, Podman) - - @property - def _using_docker(self) -> bool: - return isinstance(self.ctx.container_engine, Docker) - - @property - def cname(self) -> str: - assert self.identity - return self.identity.container_name - - def build_run_cmd(self) -> List[str]: - cmd_args: List[str] = [self._container_engine] - cmd_args.append('run') - if self.remove: - cmd_args.append('--rm') - if self.ipc: - cmd_args.append(f'--ipc={self.ipc}') - # some containers (ahem, haproxy) override this, but we want a fast - # shutdown always (and, more importantly, a successful exit even if we - # fall back to SIGKILL). - cmd_args.append('--stop-signal=SIGTERM') - - if isinstance(self.ctx.container_engine, Podman): - if os.path.exists('/etc/ceph/podman-auth.json'): - cmd_args.append('--authfile=/etc/ceph/podman-auth.json') - - if isinstance(self.ctx.container_engine, Docker): - cmd_args.extend(['--ulimit', 'nofile=1048576']) - - if self.memory_request: - cmd_args.extend(['-e', 'POD_MEMORY_REQUEST', str(self.memory_request)]) - if self.memory_limit: - cmd_args.extend(['-e', 'POD_MEMORY_LIMIT', str(self.memory_limit)]) - cmd_args.extend(['--memory', str(self.memory_limit)]) - - if self.network: - cmd_args.append(f'--net={self.network}') - if self.entrypoint: - cmd_args.extend(['--entrypoint', self.entrypoint]) - if self.privileged: - cmd_args.extend([ - '--privileged', - # let OSD etc read block devs that haven't been chowned - '--group-add=disk']) - if self.ptrace and not self.privileged: - # if privileged, the SYS_PTRACE cap is already added - # in addition, --cap-add and --privileged are mutually - # exclusive since podman >= 2.0 - cmd_args.append('--cap-add=SYS_PTRACE') - if self.init: - cmd_args.append('--init') - if self.cname: - cmd_args.extend(['--name', self.cname]) - - envs: List[str] = [ - '-e', 'CONTAINER_IMAGE=%s' % self.image, - ] - if self.envs: - for env in self.envs: - envs.extend(['-e', env]) - - vols: List[str] = [] - vols = sum( - [ - ['-v', '%s:%s' % (host_dir, container_dir)] - for host_dir, container_dir in self.volume_mounts.items() - ], - [], - ) - - binds: List[str] = [] - binds = sum( - [ - ['--mount', '{}'.format(','.join(bind))] - for bind in self.bind_mounts - ], - [], - ) - - return ( - cmd_args - + self.container_args - + envs - + vols - + binds - + [self.image] - + self.args - ) - - def build_rm_cmd(self, cname: str = '', storage: bool = False) -> List[str]: - cmd = [ - self._container_engine, - 'rm', - '-f', - ] - if storage: - cmd.append('--storage') - cmd.append(cname or self.cname) - return cmd - - def build_stop_cmd(self, cname: str = '', timeout: Optional[int] = None) -> List[str]: - cmd = [self._container_engine, 'stop'] - if timeout is not None: - cmd.extend(('-t', str(timeout))) - cmd.append(cname or self.cname) - return cmd - - -class CephContainer(BasicContainer): - def __init__(self, - ctx: CephadmContext, - image: str, - entrypoint: str, - args: List[str] = [], - volume_mounts: Dict[str, str] = {}, - identity: Optional['DaemonIdentity'] = None, - cname: str = '', - container_args: List[str] = [], - envs: Optional[List[str]] = None, - privileged: bool = False, - ptrace: bool = False, - bind_mounts: Optional[List[List[str]]] = None, - init: Optional[bool] = None, - host_network: bool = True, - memory_request: Optional[str] = None, - memory_limit: Optional[str] = None, - ) -> None: - self.ctx = ctx - self.image = image - self.entrypoint = entrypoint - self.args = args - self.volume_mounts = volume_mounts - self.identity = identity - self._cname = cname - self.container_args = container_args - self.envs = envs or [] - self.privileged = privileged - self.ptrace = ptrace - self.bind_mounts = bind_mounts if bind_mounts else [] - self.init = init if init else ctx.container_init - self.host_network = host_network - self.memory_request = memory_request - self.memory_limit = memory_limit - self.remove = True - self.ipc = 'host' - self.network = 'host' if self.host_network else '' - - @classmethod - def for_daemon(cls, - ctx: CephadmContext, - fsid: str, - daemon_type: str, - daemon_id: str, - entrypoint: str, - args: List[str] = [], - volume_mounts: Dict[str, str] = {}, - container_args: List[str] = [], - envs: Optional[List[str]] = None, - privileged: bool = False, - ptrace: bool = False, - bind_mounts: Optional[List[List[str]]] = None, - init: Optional[bool] = None, - host_network: bool = True, - memory_request: Optional[str] = None, - memory_limit: Optional[str] = None, - ) -> 'CephContainer': - ident = DaemonIdentity(fsid, daemon_type, daemon_id) - return cls( - ctx, - image=ctx.image, - entrypoint=entrypoint, - args=args, - volume_mounts=volume_mounts, - identity=ident, - container_args=container_args, - envs=envs, - privileged=privileged, - ptrace=ptrace, - bind_mounts=bind_mounts, - init=init, - host_network=host_network, - memory_request=memory_request, - memory_limit=memory_limit, - ) - - @property - def cname(self) -> str: - """ - podman adds the current container name to the /etc/hosts - file. Turns out, python's `socket.getfqdn()` differs from - `hostname -f`, when we have the container names containing - dots in it.: - - # podman run --name foo.bar.baz.com ceph/ceph /bin/bash - [root@sebastians-laptop /]# cat /etc/hosts - 127.0.0.1 localhost - ::1 localhost - 127.0.1.1 sebastians-laptop foo.bar.baz.com - [root@sebastians-laptop /]# hostname -f - sebastians-laptop - [root@sebastians-laptop /]# python3 -c 'import socket; print(socket.getfqdn())' - foo.bar.baz.com - - Fascinatingly, this doesn't happen when using dashes. - """ - if not self._cname and self.identity: - return self.identity.container_name - return self._cname.replace('.', '-') - - @cname.setter - def cname(self, val: str) -> None: - self._cname = val - - @property - def old_cname(self) -> str: - if not self._cname and self.identity: - return self.identity.legacy_container_name - return self._cname - - def run_cmd(self) -> List[str]: - if not (self.envs and self.envs[0].startswith('NODE_NAME=')): - self.envs.insert(0, 'NODE_NAME=%s' % get_hostname()) - return self.build_run_cmd() - - def rm_cmd(self, old_cname: bool = False, storage: bool = False) -> List[str]: - return self.build_rm_cmd( - cname=self.old_cname if old_cname else self.cname, - storage=storage, - ) - - def stop_cmd(self, old_cname: bool = False, timeout: Optional[int] = None) -> List[str]: - return self.build_stop_cmd( - cname=self.old_cname if old_cname else self.cname, - timeout=timeout, - ) - - def shell_cmd(self, cmd: List[str]) -> List[str]: - cmd_args: List[str] = [ - str(self.ctx.container_engine.path), - 'run', - '--rm', - '--ipc=host', - ] - envs: List[str] = [ - '-e', 'CONTAINER_IMAGE=%s' % self.image, - '-e', 'NODE_NAME=%s' % get_hostname(), - ] - vols: List[str] = [] - binds: List[str] = [] - - if self.host_network: - cmd_args.append('--net=host') - if self.ctx.no_hosts: - cmd_args.append('--no-hosts') - if self.privileged: - cmd_args.extend([ - '--privileged', - # let OSD etc read block devs that haven't been chowned - '--group-add=disk', - ]) - if self.init: - cmd_args.append('--init') - if self.envs: - for env in self.envs: - envs.extend(['-e', env]) - - vols = sum( - [['-v', '%s:%s' % (host_dir, container_dir)] - for host_dir, container_dir in self.volume_mounts.items()], []) - binds = sum([['--mount', '{}'.format(','.join(bind))] - for bind in self.bind_mounts], []) - - return cmd_args + self.container_args + envs + vols + binds + [ - '--entrypoint', cmd[0], - self.image, - ] + cmd[1:] - - def exec_cmd(self, cmd): - # type: (List[str]) -> List[str] - cname = get_running_container_name(self.ctx, self) - if not cname: - raise Error('unable to find container "{}"'.format(self.cname)) - return [ - str(self.ctx.container_engine.path), - 'exec', - ] + self.container_args + [ - self.cname, - ] + cmd - - def run(self, timeout=DEFAULT_TIMEOUT, verbosity=CallVerbosity.VERBOSE_ON_FAILURE): - # type: (Optional[int], CallVerbosity) -> str - out, _, _ = call_throws(self.ctx, self.run_cmd(), - desc=self.entrypoint, timeout=timeout, verbosity=verbosity) - return out - - -class InitContainer(BasicContainer): - @classmethod - def from_primary_and_opts( - cls, - ctx: CephadmContext, - primary: 'CephContainer', - opts: Dict[str, Any], - data_dir: str = '', - ) -> 'InitContainer': - if not opts: - raise Error('no init container values provided') - # volume mounts are specified relative to a dir in custom container - # if we are going to inherit the dirs from the primary then we - # just copy it. If not, we have to convert the relative paths - # into absolute paths. - assert primary.identity - vmounts = opts.get('volume_mounts') - if not vmounts: - vmounts = primary.volume_mounts - else: - data_dir = data_dir or get_data_dir( - primary.identity.fsid, - ctx.data_dir, - primary.identity.daemon_type, - primary.identity.daemon_id, - ) - vmounts = { - os.path.join(data_dir, src): dst - for src, dst in vmounts.items() - } - return cls( - ctx, - identity=primary.identity._replace(subcomponent='init'), - image=opts.get('image', primary.image), - entrypoint=opts.get('entrypoint', primary.entrypoint), - # note: args is not inherited from primary container - args=opts.get('entrypoint_args', []), - volume_mounts=vmounts, - envs=opts.get('envs', primary.envs), - # note: privileged is not inherited from primary container - # we really ought to minimize running stuff as privileged - privileged=opts.get('privileged', False), - init=False, - ptrace=primary.ptrace, - remove=False, - memory_request=primary.memory_request, - memory_limit=primary.memory_limit, - ) - # Things we are currently not handling: - # container_args, bind_mounts, network, ipc - - def run_cmd(self) -> List[str]: - return self.build_run_cmd() - - def rm_cmd(self, storage: bool = False) -> List[str]: - return self.build_rm_cmd(storage=storage) - - -##################################### - class MgrListener(Thread): def __init__(self, agent: 'CephadmAgent') -> None: self.agent = agent @@ -5014,7 +3689,8 @@ class MgrListener(Thread): self.agent.wakeup() -class CephadmAgent(): +@register_daemon_form +class CephadmAgent(DaemonForm): daemon_type = 'agent' default_port = 8498 @@ -5029,6 +3705,18 @@ class CephadmAgent(): 'listener.key', ] + @classmethod + def for_daemon_type(cls, daemon_type: str) -> bool: + return cls.daemon_type == daemon_type + + @classmethod + def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephadmAgent': + return cls(ctx, ident.fsid, ident.daemon_id) + + @property + def identity(self) -> DaemonIdentity: + return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id) + def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''): self.ctx = ctx self.fsid = fsid @@ -5102,24 +3790,8 @@ class CephadmAgent(): return ('set -e\n' + f'{py3} {binary_path} agent --fsid {self.fsid} --daemon-id {self.daemon_id} &\n') def unit_file(self) -> str: - return """#generated by cephadm -[Unit] -Description=cephadm agent for cluster {fsid} - -PartOf=ceph-{fsid}.target -Before=ceph-{fsid}.target - -[Service] -Type=forking -ExecStart=/bin/bash {data_dir}/unit.run -Restart=on-failure -RestartSec=10s - -[Install] -WantedBy=ceph-{fsid}.target -""".format( - fsid=self.fsid, - data_dir=self.daemon_dir + return templating.render( + self.ctx, templating.Templates.agent_service, agent=self ) def shutdown(self) -> None: @@ -5306,7 +3978,11 @@ WantedBy=ceph-{fsid}.target 'enabled': 'true' if enabled else 'false', 'state': state, } - c = CephContainer.for_daemon(self.ctx, self.ctx.fsid, daemon_type, daemon_id, 'bash') + c = CephContainer.for_daemon( + self.ctx, + DaemonIdentity(self.ctx.fsid, daemon_type, daemon_id), + 'bash', + ) container_id: Optional[str] = None for name in (c.cname, c.old_cname): if name in name_id_mapping: @@ -5447,21 +4123,73 @@ def command_agent(ctx: CephadmContext) -> None: def command_version(ctx): # type: (CephadmContext) -> int import importlib + import zipimport + import types + vmod: Optional[types.ModuleType] + zmod: Optional[types.ModuleType] try: - vmod = importlib.import_module('_version') + vmod = importlib.import_module('_cephadmmeta.version') + zmod = vmod except ImportError: - print('cephadm version UNKNOWN') - return 1 - _unset = '<UNSET>' - print('cephadm version {0} ({1}) {2} ({3})'.format( - getattr(vmod, 'CEPH_GIT_NICE_VER', _unset), - getattr(vmod, 'CEPH_GIT_VER', _unset), - getattr(vmod, 'CEPH_RELEASE_NAME', _unset), - getattr(vmod, 'CEPH_RELEASE_TYPE', _unset), - )) + vmod = zmod = None + if vmod is None: + # fallback to earlier location + try: + vmod = importlib.import_module('_version') + except ImportError: + pass + if zmod is None: + # fallback to outer package, for zip import module + try: + zmod = importlib.import_module('_cephadmmeta') + except ImportError: + zmod = None + + if not ctx.verbose: + if vmod is None: + print('cephadm version UNKNOWN') + return 1 + _unset = '<UNSET>' + print( + 'cephadm version {0} ({1}) {2} ({3})'.format( + getattr(vmod, 'CEPH_GIT_NICE_VER', _unset), + getattr(vmod, 'CEPH_GIT_VER', _unset), + getattr(vmod, 'CEPH_RELEASE_NAME', _unset), + getattr(vmod, 'CEPH_RELEASE_TYPE', _unset), + ) + ) + return 0 + + out: Dict[str, Any] = {'name': 'cephadm'} + ceph_vars = [ + 'CEPH_GIT_NICE_VER', + 'CEPH_GIT_VER', + 'CEPH_RELEASE_NAME', + 'CEPH_RELEASE_TYPE', + ] + for var in ceph_vars: + value = getattr(vmod, var, None) + if value is not None: + out[var.lower()] = value + + loader = getattr(zmod, '__loader__', None) + if loader and isinstance(loader, zipimport.zipimporter): + try: + deps_info = json.loads(loader.get_data('_cephadmmeta/deps.json')) + out['bundled_packages'] = deps_info + except OSError: + pass + files = getattr(loader, '_files', {}) + out['zip_root_entries'] = sorted( + {p.split('/')[0] for p in files.keys()} + ) + + json.dump(out, sys.stdout, indent=2) + print() return 0 + ################################## @@ -5488,14 +4216,7 @@ def _pull_image(ctx, image, insecure=False): 'Digest did not match, expected', ] - cmd = [ctx.container_engine.path, 'pull', image] - if isinstance(ctx.container_engine, Podman): - if insecure: - cmd.append('--tls-verify=false') - - if os.path.exists('/etc/ceph/podman-auth.json'): - cmd.append('--authfile=/etc/ceph/podman-auth.json') - cmd_str = ' '.join(cmd) + cmd = pull_command(ctx, image, insecure=insecure) for sleep_secs in [1, 4, 25]: out, err, ret = call(ctx, cmd, verbosity=CallVerbosity.QUIET_UNLESS_ERROR) @@ -5505,6 +4226,7 @@ def _pull_image(ctx, image, insecure=False): if 'unauthorized' in err: raise UnauthorizedRegistryError() + cmd_str = ' '.join(cmd) if not any(pattern in err for pattern in ignorelist): raise Error('Failed command: %s' % cmd_str) @@ -5535,33 +4257,6 @@ def command_inspect_image(ctx): return 0 -def normalize_image_digest(digest: str) -> str: - """ - Normal case: - >>> normalize_image_digest('ceph/ceph', 'docker.io') - 'docker.io/ceph/ceph' - - No change: - >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io') - 'quay.ceph.io/ceph/ceph' - - >>> normalize_image_digest('docker.io/ubuntu', 'docker.io') - 'docker.io/ubuntu' - - >>> normalize_image_digest('localhost/ceph', 'docker.io') - 'localhost/ceph' - """ - known_shortnames = [ - 'ceph/ceph', - 'ceph/daemon', - 'ceph/daemon-base', - ] - for image in known_shortnames: - if digest.startswith(image): - return f'{DEFAULT_REGISTRY}/{digest}' - return digest - - def get_image_info_from_inspect(out, image): # type: (str, str) -> Dict[str, Union[str,List[str]]] image_id, digests = out.split(',', 1) @@ -5577,131 +4272,6 @@ def get_image_info_from_inspect(out, image): ################################## -def check_subnet(subnets: str) -> Tuple[int, List[int], str]: - """Determine whether the given string is a valid subnet - - :param subnets: subnet string, a single definition or comma separated list of CIDR subnets - :returns: return code, IP version list of the subnets and msg describing any errors validation errors - """ - - rc = 0 - versions = set() - errors = [] - subnet_list = subnets.split(',') - for subnet in subnet_list: - # ensure the format of the string is as expected address/netmask - subnet = subnet.strip() - if not re.search(r'\/\d+$', subnet): - rc = 1 - errors.append(f'{subnet} is not in CIDR format (address/netmask)') - continue - try: - v = ipaddress.ip_network(subnet).version - versions.add(v) - except ValueError as e: - rc = 1 - errors.append(f'{subnet} invalid: {str(e)}') - - return rc, list(versions), ', '.join(errors) - - -def unwrap_ipv6(address): - # type: (str) -> str - if address.startswith('[') and address.endswith(']'): - return address[1: -1] - return address - - -def wrap_ipv6(address): - # type: (str) -> str - - # We cannot assume it's already wrapped or even an IPv6 address if - # it's already wrapped it'll not pass (like if it's a hostname) and trigger - # the ValueError - try: - if ipaddress.ip_address(address).version == 6: - return f'[{address}]' - except ValueError: - pass - - return address - - -def is_ipv6(address): - # type: (str) -> bool - address = unwrap_ipv6(address) - try: - return ipaddress.ip_address(address).version == 6 - except ValueError: - logger.warning('Address: {} is not a valid IP address'.format(address)) - return False - - -def ip_in_subnets(ip_addr: str, subnets: str) -> bool: - """Determine if the ip_addr belongs to any of the subnets list.""" - subnet_list = [x.strip() for x in subnets.split(',')] - for subnet in subnet_list: - ip_address = unwrap_ipv6(ip_addr) if is_ipv6(ip_addr) else ip_addr - if ipaddress.ip_address(ip_address) in ipaddress.ip_network(subnet): - return True - return False - - -def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]: - """Parse mon-addrv param into a list of mon end points.""" - r = re.compile(r':(\d+)$') - addrv_args = [] - addr_arg = addrv_arg - if addr_arg[0] != '[' or addr_arg[-1] != ']': - raise Error(f'--mon-addrv value {addr_arg} must use square brackets') - - for addr in addr_arg[1: -1].split(','): - hasport = r.findall(addr) - if not hasport: - raise Error(f'--mon-addrv value {addr_arg} must include port number') - port_str = hasport[0] - addr = re.sub(r'^v\d+:', '', addr) # strip off v1: or v2: prefix - base_ip = addr[0:-(len(port_str)) - 1] - addrv_args.append(EndPoint(base_ip, int(port_str))) - - return addrv_args - - -def parse_mon_ip(mon_ip: str) -> List[EndPoint]: - """Parse mon-ip param into a list of mon end points.""" - r = re.compile(r':(\d+)$') - addrv_args = [] - hasport = r.findall(mon_ip) - if hasport: - port_str = hasport[0] - base_ip = mon_ip[0:-(len(port_str)) - 1] - addrv_args.append(EndPoint(base_ip, int(port_str))) - else: - # No port provided: use fixed ports for ceph monitor - addrv_args.append(EndPoint(mon_ip, 3300)) - addrv_args.append(EndPoint(mon_ip, 6789)) - - return addrv_args - - -def build_addrv_params(addrv: List[EndPoint]) -> str: - """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]""" - if len(addrv) > 2: - raise Error('Detected a local mon-addrv list with more than 2 entries.') - port_to_ver: Dict[int, str] = {6789: 'v1', 3300: 'v2'} - addr_arg_list: List[str] = [] - for ep in addrv: - if ep.port in port_to_ver: - ver = port_to_ver[ep.port] - else: - ver = 'v2' # default mon protocol version if port is not provided - logger.warning(f'Using msgr2 protocol for unrecognized port {ep}') - addr_arg_list.append(f'{ver}:{ep.ip}:{ep.port}') - - addr_arg = '[{0}]'.format(','.join(addr_arg_list)) - return addr_arg - - def get_public_net_from_cfg(ctx: CephadmContext) -> Optional[str]: """Get mon public network from configuration file.""" cp = read_config(ctx.config) @@ -5920,8 +4490,9 @@ def prepare_create_mon( monmap_path: str ) -> Tuple[str, str]: logger.info('Creating mon...') - create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid) - mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', mon_id) + ident = DaemonIdentity(fsid, 'mon', mon_id) + create_daemon_dirs(ctx, ident, uid, gid) + mon_dir = ident.data_dir(ctx.data_dir) log_dir = get_log_dir(fsid, ctx.log_dir) out = CephContainer( ctx, @@ -5934,7 +4505,7 @@ def prepare_create_mon( '-c', '/dev/null', '--monmap', '/tmp/monmap', '--keyring', '/tmp/keyring', - ] + get_daemon_args(ctx, fsid, 'mon', mon_id), + ] + Ceph.create(ctx, ident).get_daemon_args(), volume_mounts={ log_dir: '/var/log/ceph:z', mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id), @@ -5951,10 +4522,10 @@ def create_mon( uid: int, gid: int, fsid: str, mon_id: str ) -> None: - mon_c = get_container(ctx, fsid, 'mon', mon_id) + ident = DaemonIdentity(fsid, 'mon', mon_id) + mon_c = get_container(ctx, ident) ctx.meta_properties = {'service_name': 'mon'} - deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid, - config=None, keyring=None) + deploy_daemon(ctx, ident, mon_c, uid, gid) def wait_for_mon( @@ -5997,14 +4568,23 @@ def create_mgr( ) -> None: logger.info('Creating mgr...') mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key) - mgr_c = get_container(ctx, fsid, 'mgr', mgr_id) + ident = DaemonIdentity(fsid, 'mgr', mgr_id) + mgr_c = get_container(ctx, ident) # Note:the default port used by the Prometheus node exporter is opened in fw ctx.meta_properties = {'service_name': 'mgr'} endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)] if not ctx.skip_monitoring_stack: endpoints.append(EndPoint('0.0.0.0', 8443)) - deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid, - config=config, keyring=mgr_keyring, endpoints=endpoints) + deploy_daemon( + ctx, + ident, + mgr_c, + uid, + gid, + config=config, + keyring=mgr_keyring, + endpoints=endpoints, + ) # wait for the service to become available logger.info('Waiting for mgr to start...') @@ -6429,26 +5009,26 @@ def rollback(func: FuncT) -> FuncT: raise except (KeyboardInterrupt, Exception) as e: logger.error(f'{type(e).__name__}: {e}') - if ctx.cleanup_on_failure: + if ctx.no_cleanup_on_failure: logger.info('\n\n' '\t***************\n' - '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically,\n' - '\tto disable this behaviour you can pass the --no-cleanup-on-failure flag. In case of any previous\n' - '\tbroken installation user must use the following command to completely delete the broken cluster:\n\n' - '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n' + '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically. To change\n' + '\tthis behaviour do not pass the --no-cleanup-on-failure flag. To remove this broken cluster manually please run:\n\n' + f'\t > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n' + '\tin case of any previous broken installation, users must use the rm-cluster command to delete the broken cluster:\n\n' + '\t > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n' '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n' '\t***************\n\n') - _rm_cluster(ctx, keep_logs=False, zap_osds=False) else: logger.info('\n\n' '\t***************\n' - '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically to change\n' - '\tthis behaviour you can pass the --cleanup-on-failure. To remove this broken cluster manually please run:\n\n' - f'\t > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n' - '\tin case of any previous broken installation user must use the rm-cluster command to delete the broken cluster:\n\n' - '\t > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n' + '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically.\n' + '\tTo disable this behaviour you can pass the --no-cleanup-on-failure flag. In case of any previous\n' + '\tbroken installation, users must use the following command to completely delete the broken cluster:\n\n' + '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n' '\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n' '\t***************\n\n') + _rm_cluster(ctx, keep_logs=False, zap_osds=False) raise return cast(FuncT, _rollback) @@ -6620,6 +5200,16 @@ def command_bootstrap(ctx): '-i', '/var/lib/ceph/user.conf'], {tmp.name: '/var/lib/ceph/user.conf:z'}) + if getattr(ctx, 'log_dest', None): + ldkey = 'mgr/cephadm/cephadm_log_destination' + cp = read_config(ctx.config) + if cp.has_option('mgr', ldkey): + logger.info('The cephadm log destination is set by the config file, ignoring cli option') + else: + value = ','.join(sorted(getattr(ctx, 'log_dest'))) + logger.info('Setting cephadm log destination to match logging for cephadm boostrap: %s', value) + cli(['config', 'set', 'mgr', ldkey, value, '--force']) + # wait for mgr to restart (after enabling a module) def wait_for_mgr_restart() -> None: # first get latest mgrmap epoch from the mon. try newer 'mgr @@ -6717,6 +5307,7 @@ def command_bootstrap(ctx): def command_registry_login(ctx: CephadmContext) -> int: + logger.info('Logging into custom registry.') if ctx.registry_json: logger.info('Pulling custom registry login info from %s.' % ctx.registry_json) d = get_parm(ctx.registry_json) @@ -6742,91 +5333,47 @@ def command_registry_login(ctx: CephadmContext) -> int: return 0 -def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None: - logger.info('Logging into custom registry.') - try: - engine = ctx.container_engine - cmd = [engine.path, 'login', - '-u', username, '-p', password, - url] - if isinstance(engine, Podman): - cmd.append('--authfile=/etc/ceph/podman-auth.json') - out, _, _ = call_throws(ctx, cmd) - if isinstance(engine, Podman): - os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE) - except Exception: - raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username)) - ################################## -def extract_uid_gid_monitoring(ctx, daemon_type): - # type: (CephadmContext, str) -> Tuple[int, int] - - if daemon_type == 'prometheus': - uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus') - elif daemon_type == 'node-exporter': - uid, gid = 65534, 65534 - elif daemon_type == 'grafana': - uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana') - elif daemon_type == 'loki': - uid, gid = extract_uid_gid(ctx, file_path='/etc/loki') - elif daemon_type == 'promtail': - uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail') - elif daemon_type == 'alertmanager': - uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus']) - else: - raise Error('{} not implemented yet'.format(daemon_type)) - return uid, gid - - -def get_deployment_container(ctx: CephadmContext, - fsid: str, daemon_type: str, daemon_id: Union[int, str], - privileged: bool = False, - ptrace: bool = False, - container_args: Optional[List[str]] = None) -> 'CephContainer': - # wrapper for get_container specifically for containers made during the `cephadm deploy` - # command. Adds some extra things such as extra container args and custom config files - c = get_container(ctx, fsid, daemon_type, daemon_id, privileged, ptrace, container_args) +def to_deployment_container( + ctx: CephadmContext, ctr: CephContainer +) -> CephContainer: + """Given a standard ceph container instance return a CephContainer + prepared for a deployment as a daemon, having the extra args and + custom configurations added. + NOTE: The `ctr` object is mutated before being returned. + """ if 'extra_container_args' in ctx and ctx.extra_container_args: - c.container_args.extend(ctx.extra_container_args) + ctr.container_args.extend(ctx.extra_container_args) if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args: - c.args.extend(ctx.extra_entrypoint_args) + ctr.args.extend(ctx.extra_entrypoint_args) ccfiles = fetch_custom_config_files(ctx) if ccfiles: mandatory_keys = ['mount_path', 'content'] for conf in ccfiles: if all(k in conf for k in mandatory_keys): mount_path = conf['mount_path'] + assert ctr.identity file_path = os.path.join( ctx.data_dir, - fsid, + ctr.identity.fsid, 'custom_config_files', - f'{daemon_type}.{daemon_id}', + ctr.identity.daemon_name, os.path.basename(mount_path) ) - c.volume_mounts[file_path] = mount_path - return c + ctr.volume_mounts[file_path] = mount_path + return ctr -def get_deployment_init_containers( - ctx: CephadmContext, - primary_container: 'CephContainer', -) -> List['InitContainer']: - init_containers: List[Dict[str, Any]] = getattr(ctx, 'init_containers', []) - return [ - InitContainer.from_primary_and_opts(ctx, primary_container, ic_opts) - for ic_opts in init_containers - ] - - -def get_deployment_type(ctx: CephadmContext, daemon_type: str, daemon_id: str) -> DeploymentType: +def get_deployment_type( + ctx: CephadmContext, ident: 'DaemonIdentity', +) -> DeploymentType: deployment_type: DeploymentType = DeploymentType.DEFAULT if ctx.reconfig: deployment_type = DeploymentType.RECONFIG - unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id) - (_, state, _) = check_unit(ctx, unit_name) - if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash')): + (_, state, _) = check_unit(ctx, ident.unit_name) + if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ident, 'bash')): # if reconfig was set, that takes priority over redeploy. If # this is considered a fresh deployment at this stage, # mark it as a redeploy to avoid port checking @@ -6845,20 +5392,6 @@ def command_deploy(ctx): _common_deploy(ctx) -def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]: - """Read a JSON configuration based on the `ctx.source` value.""" - source = '-' - if 'source' in ctx and ctx.source: - source = ctx.source - if source == '-': - config_data = json.load(sys.stdin) - else: - with open(source, 'rb') as fh: - config_data = json.load(fh) - logger.debug('Loaded deploy configuration: %r', config_data) - return config_data - - def apply_deploy_config_to_ctx( config_data: Dict[str, Any], ctx: CephadmContext, @@ -6897,173 +5430,75 @@ def command_deploy_from(ctx: CephadmContext) -> None: configuration parameters from an input JSON configuration file. """ config_data = read_configuration_source(ctx) + logger.debug('Loaded deploy configuration: %r', config_data) apply_deploy_config_to_ctx(config_data, ctx) _common_deploy(ctx) def _common_deploy(ctx: CephadmContext) -> None: - daemon_type, daemon_id = ctx.name.split('.', 1) - if daemon_type not in get_supported_daemons(): - raise Error('daemon type %s not recognized' % daemon_type) + ident = DaemonIdentity.from_context(ctx) + if ident.daemon_type not in get_supported_daemons(): + raise Error('daemon type %s not recognized' % ident.daemon_type) lock = FileLock(ctx, ctx.fsid) lock.acquire() - deployment_type = get_deployment_type(ctx, daemon_type, daemon_id) + deployment_type = get_deployment_type(ctx, ident) # Migrate sysctl conf files from /usr/lib to /etc migrate_sysctl_dir(ctx, ctx.fsid) # Get and check ports explicitly required to be opened - endpoints = fetch_tcp_ports(ctx) - _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type) - + endpoints = fetch_endpoints(ctx) -def _dispatch_deploy( - ctx: CephadmContext, - daemon_type: str, - daemon_id: str, - daemon_endpoints: List[EndPoint], - deployment_type: DeploymentType, -) -> None: - if daemon_type in Ceph.daemons: - config, keyring = get_config_and_keyring(ctx) - uid, gid = extract_uid_gid(ctx) - make_var_run(ctx, ctx.fsid, uid, gid) - - config_json = fetch_configs(ctx) - - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id, - ptrace=ctx.allow_ptrace) - - if daemon_type == 'mon' and config_json is not None: - if 'crush_location' in config_json: - c_loc = config_json['crush_location'] - # was originally "c.args.extend(['--set-crush-location', c_loc])" - # but that doesn't seem to persist in the object after it's passed - # in further function calls - c.args = c.args + ['--set-crush-location', c_loc] - - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - config=config, keyring=keyring, - osd_fsid=ctx.osd_fsid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type in Monitoring.components: - # monitoring daemon - prometheus, grafana, alertmanager, node-exporter - # Default Checks - # make sure provided config-json is sufficient - config = fetch_configs(ctx) # type: ignore - required_files = Monitoring.components[daemon_type].get('config-json-files', list()) - required_args = Monitoring.components[daemon_type].get('config-json-args', list()) - if required_files: - if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore - raise Error('{} deployment requires config-json which must ' - 'contain file content for {}'.format(daemon_type.capitalize(), ', '.join(required_files))) - if required_args: - if not config or not all(c in config.keys() for c in required_args): # type: ignore - raise Error('{} deployment requires config-json which must ' - 'contain arg for {}'.format(daemon_type.capitalize(), ', '.join(required_args))) - - uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type == NFSGanesha.daemon_type: - # only check ports if this is a fresh deployment - if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints: - nfs_ports = list(NFSGanesha.port_map.values()) - daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports] - - config, keyring = get_config_and_keyring(ctx) - # TODO: extract ganesha uid/gid (997, 994) ? - uid, gid = extract_uid_gid(ctx) - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - config=config, keyring=keyring, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type == CephIscsi.daemon_type: - config, keyring = get_config_and_keyring(ctx) - uid, gid = extract_uid_gid(ctx) - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - config=config, keyring=keyring, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - elif daemon_type == CephNvmeof.daemon_type: - config, keyring = get_config_and_keyring(ctx) - uid, gid = 167, 167 # TODO: need to get properly the uid/gid - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - config=config, keyring=keyring, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - elif daemon_type in Tracing.components: - uid, gid = 65534, 65534 - c = get_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - elif daemon_type == HAproxy.daemon_type: - haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id) - uid, gid = haproxy.extract_uid_gid_haproxy() - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type == Keepalived.daemon_type: - keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id) - uid, gid = keepalived.extract_uid_gid_keepalived() - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type == CustomContainer.daemon_type: - cc = CustomContainer.init(ctx, ctx.fsid, daemon_id) - # only check ports if this is a fresh deployment - if deployment_type == DeploymentType.DEFAULT: - daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports]) - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id, - privileged=cc.privileged, - ptrace=ctx.allow_ptrace) - ics = get_deployment_init_containers( - ctx, - c, - ) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, - uid=cc.uid, gid=cc.gid, config=None, - keyring=None, - deployment_type=deployment_type, - endpoints=daemon_endpoints, - init_containers=ics) - - elif daemon_type == CephadmAgent.daemon_type: + if ident.daemon_type == CephadmAgent.daemon_type: # get current user gid and uid uid = os.getuid() gid = os.getgid() - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None, - uid, gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) - - elif daemon_type == SNMPGateway.daemon_type: - sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id) - c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id) - deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, - sc.uid, sc.gid, - deployment_type=deployment_type, - endpoints=daemon_endpoints) + deploy_daemon( + ctx, + ident, + None, + uid, + gid, + deployment_type=deployment_type, + endpoints=endpoints, + ) else: - raise Error('daemon type {} not implemented in command_deploy function' - .format(daemon_type)) + try: + _deploy_daemon_container(ctx, ident, endpoints, deployment_type) + except UnexpectedDaemonTypeError: + raise Error('daemon type {} not implemented in command_deploy function' + .format(ident.daemon_type)) + + +def _deploy_daemon_container( + ctx: CephadmContext, + ident: 'DaemonIdentity', + daemon_endpoints: List[EndPoint], + deployment_type: DeploymentType, +) -> None: + daemon = daemon_form_create(ctx, ident) + assert isinstance(daemon, ContainerDaemonForm) + daemon.customize_container_endpoints(daemon_endpoints, deployment_type) + ctr = daemon.container(ctx) + ics = daemon.init_containers(ctx) + config, keyring = daemon.config_and_keyring(ctx) + uid, gid = daemon.uid_gid(ctx) + deploy_daemon( + ctx, + ident, + ctr, + uid, + gid, + config=config, + keyring=keyring, + deployment_type=deployment_type, + endpoints=daemon_endpoints, + osd_fsid=daemon.osd_fsid, + init_containers=ics, + ) ################################## @@ -7071,8 +5506,7 @@ def _dispatch_deploy( @infer_image def command_run(ctx): # type: (CephadmContext) -> int - (daemon_type, daemon_id) = ctx.name.split('.', 1) - c = get_container(ctx, ctx.fsid, daemon_type, daemon_id) + c = get_container(ctx, DaemonIdentity.from_context(ctx)) command = c.run_cmd() return call_timeout(ctx, command, ctx.timeout) @@ -7100,7 +5534,7 @@ def command_shell(ctx): daemon_type = 'osd' # get the most mounts daemon_id = None - if ctx.fsid and daemon_type in Ceph.daemons: + if ctx.fsid and daemon_type in ceph_daemons(): make_log_dir(ctx, ctx.fsid) if daemon_id and not ctx.fsid: @@ -7118,9 +5552,15 @@ def command_shell(ctx): ctx.keyring = CEPH_DEFAULT_KEYRING container_args: List[str] = ['-i'] - mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id, - no_config=True if ctx.config else False) - binds = get_container_binds(ctx, ctx.fsid, daemon_type, daemon_id) + if ctx.fsid and daemon_id: + ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_id) + mounts = get_container_mounts( + ctx, ident, no_config=bool(ctx.config), + ) + binds = get_container_binds(ctx, ident) + else: + mounts = get_container_mounts_for_type(ctx, ctx.fsid, daemon_type) + binds = [] if ctx.config: mounts[pathify(ctx.config)] = '/etc/ceph/ceph.conf:z' if ctx.keyring: @@ -7174,6 +5614,10 @@ def command_shell(ctx): privileged=True) command = c.shell_cmd(command) + if ctx.dry_run: + print(' '.join(shlex.quote(arg) for arg in command)) + return 0 + return call_timeout(ctx, command, ctx.timeout) ################################## @@ -7225,7 +5669,7 @@ def command_ceph_volume(ctx): lock.acquire() (uid, gid) = (0, 0) # ceph-volume runs as root - mounts = get_container_mounts(ctx, ctx.fsid, 'osd', None) + mounts = get_container_mounts_for_type(ctx, ctx.fsid, 'osd') tmp_config = None tmp_keyring = None @@ -7256,6 +5700,24 @@ def command_ceph_volume(ctx): ################################## +def command_unit_install(ctx): + # type: (CephadmContext) -> int + if not ctx.fsid: + raise Error('must pass --fsid to specify cluster') + + fsid = ctx.fsid + install_base_units(ctx, fsid) + unit = get_unit_file(ctx, fsid) + unit_file = 'ceph-%s@.service' % (fsid) + with open(ctx.unit_dir + '/' + unit_file + '.new', 'w') as f: + f.write(unit) + os.rename(ctx.unit_dir + '/' + unit_file + '.new', + ctx.unit_dir + '/' + unit_file) + call_throws(ctx, ['systemctl', 'daemon-reload']) + + return 0 + + @infer_fsid def command_unit(ctx): # type: (CephadmContext) -> int @@ -7296,97 +5758,6 @@ def command_logs(ctx): ################################## -def list_networks(ctx): - # type: (CephadmContext) -> Dict[str,Dict[str, Set[str]]] - - # sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag, - # so we'll need to use a regex to parse 'ip' command output. - # - # out, _, _ = call_throws(['ip', '-j', 'route', 'ls']) - # j = json.loads(out) - # for x in j: - res = _list_ipv4_networks(ctx) - res.update(_list_ipv6_networks(ctx)) - return res - - -def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]: - execstr: Optional[str] = find_executable('ip') - if not execstr: - raise FileNotFoundError("unable to find 'ip' command") - out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - return _parse_ipv4_route(out) - - -def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]: - r = {} # type: Dict[str, Dict[str, Set[str]]] - p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)') - for line in out.splitlines(): - m = p.findall(line) - if not m: - continue - net = m[0][0] - if '/' not in net: # aggregate /32 mask for single host sub-networks - net += '/32' - iface = m[0][1] - ip = m[0][4] - if net not in r: - r[net] = {} - if iface not in r[net]: - r[net][iface] = set() - r[net][iface].add(ip) - return r - - -def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]: - execstr: Optional[str] = find_executable('ip') - if not execstr: - raise FileNotFoundError("unable to find 'ip' command") - routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - return _parse_ipv6_route(routes, ips) - - -def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]: - r = {} # type: Dict[str, Dict[str, Set[str]]] - route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$') - ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$') - iface_p = re.compile(r'^(\d+): (\S+): (.*)$') - for line in routes.splitlines(): - m = route_p.findall(line) - if not m or m[0][0].lower() == 'default': - continue - net = m[0][0] - if '/' not in net: # aggregate /128 mask for single host sub-networks - net += '/128' - iface = m[0][1] - if iface == 'lo': # skip loopback devices - continue - if net not in r: - r[net] = {} - if iface not in r[net]: - r[net][iface] = set() - - iface = None - for line in ips.splitlines(): - m = ip_p.findall(line) - if not m: - m = iface_p.findall(line) - if m: - # drop @... suffix, if present - iface = m[0][1].split('@')[0] - continue - ip = m[0][0] - # find the network it belongs to - net = [n for n in r.keys() - if ipaddress.ip_address(ip) in ipaddress.ip_network(n)] - if net and iface in r[net[0]]: - assert iface - r[net[0]][iface].add(ip) - - return r - - def command_list_networks(ctx): # type: (CephadmContext) -> None r = list_networks(ctx) @@ -7406,27 +5777,6 @@ def command_ls(ctx): print(json.dumps(ls, indent=4)) -def with_units_to_int(v: str) -> int: - if v.endswith('iB'): - v = v[:-2] - elif v.endswith('B'): - v = v[:-1] - mult = 1 - if v[-1].upper() == 'K': - mult = 1024 - v = v[:-1] - elif v[-1].upper() == 'M': - mult = 1024 * 1024 - v = v[:-1] - elif v[-1].upper() == 'G': - mult = 1024 * 1024 * 1024 - v = v[:-1] - elif v[-1].upper() == 'T': - mult = 1024 * 1024 * 1024 * 1024 - v = v[:-1] - return int(float(v) * mult) - - def list_daemons(ctx, detail=True, legacy_dir=None): # type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]] host_version: Optional[str] = None @@ -7553,7 +5903,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None): if daemon_type == CephNvmeof.daemon_type: version = CephNvmeof.get_version(ctx, container_id) elif not version: - if daemon_type in Ceph.daemons: + if daemon_type in ceph_daemons(): out, err, code = call(ctx, [container_path, 'exec', container_id, 'ceph', '-v'], @@ -7584,7 +5934,8 @@ def list_daemons(ctx, detail=True, legacy_dir=None): 'haproxy', '-v'], verbosity=CallVerbosity.QUIET) if not code and \ - out.startswith('HA-Proxy version '): + out.startswith('HA-Proxy version ') or \ + out.startswith('HAProxy version '): version = out.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'keepalived': @@ -7693,7 +6044,9 @@ def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None): def get_container_stats(ctx: CephadmContext, container_path: str, fsid: str, daemon_type: str, daemon_id: str) -> Tuple[str, str, int]: - c = CephContainer.for_daemon(ctx, fsid, daemon_type, daemon_id, 'bash') + c = CephContainer.for_daemon( + ctx, DaemonIdentity(fsid, daemon_type, daemon_id), 'bash' + ) out, err, code = '', '', -1 for name in (c.cname, c.old_cname): cmd = [ @@ -7739,7 +6092,7 @@ def command_adopt(ctx): lock.acquire() # call correct adoption - if daemon_type in Ceph.daemons: + if daemon_type in ceph_daemons(): command_adopt_ceph(ctx, daemon_type, daemon_id, fsid) elif daemon_type == 'prometheus': command_adopt_prometheus(ctx, daemon_id, fsid) @@ -7905,8 +6258,12 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): # data logger.info('Moving data...') - data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + data_dir_dst = make_data_dir( + ctx, + DaemonIdentity(fsid, daemon_type, daemon_id), + uid=uid, + gid=gid, + ) move_files(ctx, glob(os.path.join(data_dir_src, '*')), data_dir_dst, uid=uid, gid=gid) @@ -7974,18 +6331,25 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): logger.info('Creating new units...') make_var_run(ctx, fsid, uid, gid) - c = get_container(ctx, fsid, daemon_type, daemon_id) - deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c, - enable=True, # unconditionally enable the new unit - start=(state == 'running' or ctx.force_start), - osd_fsid=osd_fsid) - update_firewalld(ctx, daemon_type) + ident = DaemonIdentity(fsid, daemon_type, daemon_id) + c = get_container(ctx, ident) + deploy_daemon_units( + ctx, + ident, + uid, + gid, + c, + enable=True, # unconditionally enable the new unit + start=(state == 'running' or ctx.force_start), + osd_fsid=osd_fsid, + ) + update_firewalld(ctx, daemon_form_create(ctx, ident)) def command_adopt_prometheus(ctx, daemon_id, fsid): # type: (CephadmContext, str, str) -> None daemon_type = 'prometheus' - (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) + (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type) # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['prometheus'] @@ -7993,8 +6357,12 @@ def command_adopt_prometheus(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'prometheus') - data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + data_dir_dst = make_data_dir( + ctx, + DaemonIdentity(fsid, daemon_type, daemon_id), + uid=uid, + gid=gid, + ) # config config_src = '/etc/prometheus/prometheus.yml' @@ -8010,17 +6378,25 @@ def command_adopt_prometheus(ctx, daemon_id, fsid): copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) make_var_run(ctx, fsid, uid, gid) - c = get_container(ctx, fsid, daemon_type, daemon_id) - deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) - update_firewalld(ctx, daemon_type) + ident = DaemonIdentity(fsid, daemon_type, daemon_id) + c = get_container(ctx, ident) + deploy_daemon( + ctx, + ident, + c, + uid, + gid, + deployment_type=DeploymentType.REDEPLOY, + endpoints=endpoints, + ) + update_firewalld(ctx, daemon_form_create(ctx, ident)) def command_adopt_grafana(ctx, daemon_id, fsid): # type: (CephadmContext, str, str) -> None daemon_type = 'grafana' - (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) + (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type) # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['grafana'] @@ -8028,8 +6404,13 @@ def command_adopt_grafana(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'grafana-server') - data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + ident = DaemonIdentity(fsid, daemon_type, daemon_id) + data_dir_dst = make_data_dir( + ctx, + ident, + uid=uid, + gid=gid, + ) # config config_src = '/etc/grafana/grafana.ini' @@ -8069,17 +6450,24 @@ def command_adopt_grafana(ctx, daemon_id, fsid): copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) make_var_run(ctx, fsid, uid, gid) - c = get_container(ctx, fsid, daemon_type, daemon_id) - deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) - update_firewalld(ctx, daemon_type) + c = get_container(ctx, ident) + deploy_daemon( + ctx, + ident, + c, + uid, + gid, + deployment_type=DeploymentType.REDEPLOY, + endpoints=endpoints, + ) + update_firewalld(ctx, daemon_form_create(ctx, ident)) def command_adopt_alertmanager(ctx, daemon_id, fsid): # type: (CephadmContext, str, str) -> None daemon_type = 'alertmanager' - (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type) + (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type) # should try to set the ports we know cephadm defaults # to for these services in the firewall. ports = Monitoring.port_map['alertmanager'] @@ -8087,8 +6475,13 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'prometheus-alertmanager') - data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + ident = DaemonIdentity(fsid, daemon_type, daemon_id) + data_dir_dst = make_data_dir( + ctx, + ident, + uid=uid, + gid=gid, + ) # config config_src = '/etc/prometheus/alertmanager.yml' @@ -8104,10 +6497,17 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid): copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid) make_var_run(ctx, fsid, uid, gid) - c = get_container(ctx, fsid, daemon_type, daemon_id) - deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid, - deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints) - update_firewalld(ctx, daemon_type) + c = get_container(ctx, ident) + deploy_daemon( + ctx, + ident, + c, + uid, + gid, + deployment_type=DeploymentType.REDEPLOY, + endpoints=endpoints, + ) + update_firewalld(ctx, daemon_form_create(ctx, ident)) def _adjust_grafana_ini(filename): @@ -8174,7 +6574,8 @@ def command_rm_daemon(ctx): call(ctx, ['rm', '-rf', rgw_asok_path], verbosity=CallVerbosity.DEBUG) - data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_id) + ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_id) + data_dir = ident.data_dir(ctx.data_dir) if daemon_type in ['mon', 'osd', 'prometheus'] and \ not ctx.force_delete_data: # rename it out of the way -- do not delete @@ -8188,7 +6589,7 @@ def command_rm_daemon(ctx): else: call_throws(ctx, ['rm', '-rf', data_dir]) - endpoints = fetch_tcp_ports(ctx) + endpoints = fetch_endpoints(ctx) ports: List[int] = [e.port for e in endpoints] if ports: try: @@ -8205,7 +6606,9 @@ def command_rm_daemon(ctx): def _zap(ctx: CephadmContext, what: str) -> None: - mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None) + mounts = get_container_mounts_for_type( + ctx, ctx.fsid, 'clusterless-ceph-volume' + ) c = get_ceph_volume_container(ctx, args=['lvm', 'zap', '--destroy', what], volume_mounts=mounts, @@ -8219,7 +6622,9 @@ def _zap_osds(ctx: CephadmContext) -> None: # assume fsid lock already held # list - mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None) + mounts = get_container_mounts_for_type( + ctx, ctx.fsid, 'clusterless-ceph-volume' + ) c = get_ceph_volume_container(ctx, args=['inventory', '--format', 'json'], volume_mounts=mounts, @@ -8291,7 +6696,7 @@ def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None: continue if d['style'] != 'cephadm:v1': continue - disable_systemd_service(get_unit_name(ctx.fsid, d['name'])) + disable_systemd_service('ceph-%s@%s' % (ctx.fsid, d['name'])) # cluster units for unit_name in ['ceph-%s.target' % ctx.fsid]: @@ -8422,152 +6827,6 @@ def command_check_host(ctx: CephadmContext) -> None: ################################## -def get_ssh_vars(ssh_user: str) -> Tuple[int, int, str]: - try: - s_pwd = pwd.getpwnam(ssh_user) - except KeyError: - raise Error('Cannot find uid/gid for ssh-user: %s' % (ssh_user)) - - ssh_uid = s_pwd.pw_uid - ssh_gid = s_pwd.pw_gid - ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh') - return ssh_uid, ssh_gid, ssh_dir - - -def authorize_ssh_key(ssh_pub_key: str, ssh_user: str) -> bool: - """Authorize the public key for the provided ssh user""" - - def key_in_file(path: str, key: str) -> bool: - if not os.path.exists(path): - return False - with open(path) as f: - lines = f.readlines() - for line in lines: - if line.strip() == key.strip(): - return True - return False - - logger.info(f'Adding key to {ssh_user}@localhost authorized_keys...') - if ssh_pub_key is None or ssh_pub_key.isspace(): - raise Error('Trying to authorize an empty ssh key') - - ssh_pub_key = ssh_pub_key.strip() - ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user) - if not os.path.exists(ssh_dir): - makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700) - - auth_keys_file = '%s/authorized_keys' % ssh_dir - if key_in_file(auth_keys_file, ssh_pub_key): - logger.info(f'key already in {ssh_user}@localhost authorized_keys...') - return False - - add_newline = False - if os.path.exists(auth_keys_file): - with open(auth_keys_file, 'r') as f: - f.seek(0, os.SEEK_END) - if f.tell() > 0: - f.seek(f.tell() - 1, os.SEEK_SET) # go to last char - if f.read() != '\n': - add_newline = True - - with open(auth_keys_file, 'a') as f: - os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it - os.fchmod(f.fileno(), DEFAULT_MODE) # just in case we created it - if add_newline: - f.write('\n') - f.write(ssh_pub_key + '\n') - - return True - - -def revoke_ssh_key(key: str, ssh_user: str) -> None: - """Revoke the public key authorization for the ssh user""" - ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user) - auth_keys_file = '%s/authorized_keys' % ssh_dir - deleted = False - if os.path.exists(auth_keys_file): - with open(auth_keys_file, 'r') as f: - lines = f.readlines() - _, filename = tempfile.mkstemp() - with open(filename, 'w') as f: - os.fchown(f.fileno(), ssh_uid, ssh_gid) - os.fchmod(f.fileno(), DEFAULT_MODE) # secure access to the keys file - for line in lines: - if line.strip() == key.strip(): - deleted = True - else: - f.write(line) - - if deleted: - shutil.move(filename, auth_keys_file) - else: - logger.warning('Cannot find the ssh key to be deleted') - - -def check_ssh_connectivity(ctx: CephadmContext) -> None: - - def cmd_is_available(cmd: str) -> bool: - if shutil.which(cmd) is None: - logger.warning(f'Command not found: {cmd}') - return False - return True - - if not cmd_is_available('ssh') or not cmd_is_available('ssh-keygen'): - logger.warning('Cannot check ssh connectivity. Skipping...') - return - - ssh_priv_key_path = '' - ssh_pub_key_path = '' - ssh_signed_cert_path = '' - if ctx.ssh_private_key and ctx.ssh_public_key: - # let's use the keys provided by the user - ssh_priv_key_path = pathify(ctx.ssh_private_key.name) - ssh_pub_key_path = pathify(ctx.ssh_public_key.name) - elif ctx.ssh_private_key and ctx.ssh_signed_cert: - # CA signed keys use case - ssh_priv_key_path = pathify(ctx.ssh_private_key.name) - ssh_signed_cert_path = pathify(ctx.ssh_signed_cert.name) - else: - # no custom keys, let's generate some random keys just for this check - ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}' - ssh_pub_key_path = f'{ssh_priv_key_path}.pub' - ssh_key_gen_cmd = ['ssh-keygen', '-q', '-t', 'rsa', '-N', '', '-C', '', '-f', ssh_priv_key_path] - _, _, code = call(ctx, ssh_key_gen_cmd) - if code != 0: - logger.warning('Cannot generate keys to check ssh connectivity.') - return - - if ssh_signed_cert_path: - logger.info('Verification for CA signed keys authentication not implemented. Skipping ...') - elif ssh_pub_key_path: - logger.info('Verifying ssh connectivity using standard pubkey authentication ...') - with open(ssh_pub_key_path, 'r') as f: - key = f.read().strip() - new_key = authorize_ssh_key(key, ctx.ssh_user) - ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else [] - _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no', - *ssh_cfg_file_arg, '-i', ssh_priv_key_path, - '-o PasswordAuthentication=no', - f'{ctx.ssh_user}@{get_hostname()}', - 'sudo echo']) - - # we only remove the key if it's a new one. In case the user has provided - # some already existing key then we don't alter authorized_keys file - if new_key: - revoke_ssh_key(key, ctx.ssh_user) - - pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else '' - prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else '' - ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else '' - err_msg = f""" -** Please verify your user's ssh configuration and make sure: -- User {ctx.ssh_user} must have passwordless sudo access -{pub_key_msg}{prv_key_msg}{ssh_cfg_msg} -""" - if code != 0: - raise Error(err_msg) - - def command_prepare_host(ctx: CephadmContext) -> None: logger.info('Verifying podman|docker is present...') pkg = None @@ -8631,505 +6890,6 @@ class CustomValidation(argparse.Action): ################################## -def get_distro(): - # type: () -> Tuple[Optional[str], Optional[str], Optional[str]] - distro = None - distro_version = None - distro_codename = None - with open('/etc/os-release', 'r') as f: - for line in f.readlines(): - line = line.strip() - if '=' not in line or line.startswith('#'): - continue - (var, val) = line.split('=', 1) - if val[0] == '"' and val[-1] == '"': - val = val[1:-1] - if var == 'ID': - distro = val.lower() - elif var == 'VERSION_ID': - distro_version = val.lower() - elif var == 'VERSION_CODENAME': - distro_codename = val.lower() - return distro, distro_version, distro_codename - - -class Packager(object): - def __init__(self, ctx: CephadmContext, - stable: Optional[str] = None, version: Optional[str] = None, - branch: Optional[str] = None, commit: Optional[str] = None): - assert \ - (stable and not version and not branch and not commit) or \ - (not stable and version and not branch and not commit) or \ - (not stable and not version and branch) or \ - (not stable and not version and not branch and not commit) - self.ctx = ctx - self.stable = stable - self.version = version - self.branch = branch - self.commit = commit - - def validate(self) -> None: - """Validate parameters before writing any state to disk.""" - pass - - def add_repo(self) -> None: - raise NotImplementedError - - def rm_repo(self) -> None: - raise NotImplementedError - - def install(self, ls: List[str]) -> None: - raise NotImplementedError - - def install_podman(self) -> None: - raise NotImplementedError - - def query_shaman(self, distro: str, distro_version: Any, branch: Optional[str], commit: Optional[str]) -> str: - # query shaman - logger.info('Fetching repo metadata from shaman and chacra...') - shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format( - distro=distro, - distro_version=distro_version, - branch=branch, - sha1=commit or 'latest', - arch=get_arch() - ) - try: - shaman_response = urlopen(shaman_url) - except HTTPError as err: - logger.error('repository not found in shaman (might not be available yet)') - raise Error('%s, failed to fetch %s' % (err, shaman_url)) - chacra_url = '' - try: - chacra_url = shaman_response.geturl() - chacra_response = urlopen(chacra_url) - except HTTPError as err: - logger.error('repository not found in chacra (might not be available yet)') - raise Error('%s, failed to fetch %s' % (err, chacra_url)) - return chacra_response.read().decode('utf-8') - - def repo_gpgkey(self) -> Tuple[str, str]: - if self.ctx.gpg_url: - return self.ctx.gpg_url, 'manual' - if self.stable or self.version: - return 'https://download.ceph.com/keys/release.gpg', 'release' - else: - return 'https://download.ceph.com/keys/autobuild.gpg', 'autobuild' - - def enable_service(self, service: str) -> None: - """ - Start and enable the service (typically using systemd). - """ - call_throws(self.ctx, ['systemctl', 'enable', '--now', service]) - - -class Apt(Packager): - DISTRO_NAMES = { - 'ubuntu': 'ubuntu', - 'debian': 'debian', - } - - def __init__(self, ctx: CephadmContext, - stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], - distro: Optional[str], distro_version: Optional[str], distro_codename: Optional[str]) -> None: - super(Apt, self).__init__(ctx, stable=stable, version=version, - branch=branch, commit=commit) - assert distro - self.ctx = ctx - self.distro = self.DISTRO_NAMES[distro] - self.distro_codename = distro_codename - self.distro_version = distro_version - - def repo_path(self) -> str: - return '/etc/apt/sources.list.d/ceph.list' - - def add_repo(self) -> None: - - url, name = self.repo_gpgkey() - logger.info('Installing repo GPG key from %s...' % url) - try: - response = urlopen(url) - except HTTPError as err: - logger.error('failed to fetch GPG repo key from %s: %s' % ( - url, err)) - raise Error('failed to fetch GPG key') - key = response.read() - with open('/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name, 'wb') as f: - f.write(key) - - if self.version: - content = 'deb %s/debian-%s/ %s main\n' % ( - self.ctx.repo_url, self.version, self.distro_codename) - elif self.stable: - content = 'deb %s/debian-%s/ %s main\n' % ( - self.ctx.repo_url, self.stable, self.distro_codename) - else: - content = self.query_shaman(self.distro, self.distro_codename, self.branch, - self.commit) - - logger.info('Installing repo file at %s...' % self.repo_path()) - with open(self.repo_path(), 'w') as f: - f.write(content) - - self.update() - - def rm_repo(self) -> None: - for name in ['autobuild', 'release', 'manual']: - p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name - if os.path.exists(p): - logger.info('Removing repo GPG key %s...' % p) - os.unlink(p) - if os.path.exists(self.repo_path()): - logger.info('Removing repo at %s...' % self.repo_path()) - os.unlink(self.repo_path()) - - if self.distro == 'ubuntu': - self.rm_kubic_repo() - - def install(self, ls: List[str]) -> None: - logger.info('Installing packages %s...' % ls) - call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls) - - def update(self) -> None: - logger.info('Updating package list...') - call_throws(self.ctx, ['apt-get', 'update']) - - def install_podman(self) -> None: - if self.distro == 'ubuntu': - logger.info('Setting up repo for podman...') - self.add_kubic_repo() - self.update() - - logger.info('Attempting podman install...') - try: - self.install(['podman']) - except Error: - logger.info('Podman did not work. Falling back to docker...') - self.install(['docker.io']) - - def kubic_repo_url(self) -> str: - return 'https://download.opensuse.org/repositories/devel:/kubic:/' \ - 'libcontainers:/stable/xUbuntu_%s/' % self.distro_version - - def kubic_repo_path(self) -> str: - return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list' - - def kubic_repo_gpgkey_url(self) -> str: - return '%s/Release.key' % self.kubic_repo_url() - - def kubic_repo_gpgkey_path(self) -> str: - return '/etc/apt/trusted.gpg.d/kubic.release.gpg' - - def add_kubic_repo(self) -> None: - url = self.kubic_repo_gpgkey_url() - logger.info('Installing repo GPG key from %s...' % url) - try: - response = urlopen(url) - except HTTPError as err: - logger.error('failed to fetch GPG repo key from %s: %s' % ( - url, err)) - raise Error('failed to fetch GPG key') - key = response.read().decode('utf-8') - tmp_key = write_tmp(key, 0, 0) - keyring = self.kubic_repo_gpgkey_path() - call_throws(self.ctx, ['apt-key', '--keyring', keyring, 'add', tmp_key.name]) - - logger.info('Installing repo file at %s...' % self.kubic_repo_path()) - content = 'deb %s /\n' % self.kubic_repo_url() - with open(self.kubic_repo_path(), 'w') as f: - f.write(content) - - def rm_kubic_repo(self) -> None: - keyring = self.kubic_repo_gpgkey_path() - if os.path.exists(keyring): - logger.info('Removing repo GPG key %s...' % keyring) - os.unlink(keyring) - - p = self.kubic_repo_path() - if os.path.exists(p): - logger.info('Removing repo at %s...' % p) - os.unlink(p) - - -class YumDnf(Packager): - DISTRO_NAMES = { - 'centos': ('centos', 'el'), - 'rhel': ('centos', 'el'), - 'scientific': ('centos', 'el'), - 'rocky': ('centos', 'el'), - 'almalinux': ('centos', 'el'), - 'ol': ('centos', 'el'), - 'fedora': ('fedora', 'fc'), - 'mariner': ('mariner', 'cm'), - } - - def __init__(self, ctx: CephadmContext, - stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], - distro: Optional[str], distro_version: Optional[str]) -> None: - super(YumDnf, self).__init__(ctx, stable=stable, version=version, - branch=branch, commit=commit) - assert distro - assert distro_version - self.ctx = ctx - self.major = int(distro_version.split('.')[0]) - self.distro_normalized = self.DISTRO_NAMES[distro][0] - self.distro_code = self.DISTRO_NAMES[distro][1] + str(self.major) - if (self.distro_code == 'fc' and self.major >= 30) or \ - (self.distro_code == 'el' and self.major >= 8): - self.tool = 'dnf' - elif (self.distro_code == 'cm'): - self.tool = 'tdnf' - else: - self.tool = 'yum' - - def custom_repo(self, **kw: Any) -> str: - """ - Repo files need special care in that a whole line should not be present - if there is no value for it. Because we were using `format()` we could - not conditionally add a line for a repo file. So the end result would - contain a key with a missing value (say if we were passing `None`). - - For example, it could look like:: - - [ceph repo] - name= ceph repo - proxy= - gpgcheck= - - Which breaks. This function allows us to conditionally add lines, - preserving an order and be more careful. - - Previously, and for historical purposes, this is how the template used - to look:: - - custom_repo = - [{repo_name}] - name={name} - baseurl={baseurl} - enabled={enabled} - gpgcheck={gpgcheck} - type={_type} - gpgkey={gpgkey} - proxy={proxy} - - """ - lines = [] - - # by using tuples (vs a dict) we preserve the order of what we want to - # return, like starting with a [repo name] - tmpl = ( - ('reponame', '[%s]'), - ('name', 'name=%s'), - ('baseurl', 'baseurl=%s'), - ('enabled', 'enabled=%s'), - ('gpgcheck', 'gpgcheck=%s'), - ('_type', 'type=%s'), - ('gpgkey', 'gpgkey=%s'), - ('proxy', 'proxy=%s'), - ('priority', 'priority=%s'), - ) - - for line in tmpl: - tmpl_key, tmpl_value = line # key values from tmpl - - # ensure that there is an actual value (not None nor empty string) - if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''): - lines.append(tmpl_value % kw.get(tmpl_key)) - - return '\n'.join(lines) - - def repo_path(self) -> str: - return '/etc/yum.repos.d/ceph.repo' - - def repo_baseurl(self) -> str: - assert self.stable or self.version - if self.version: - return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version, - self.distro_code) - else: - return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable, - self.distro_code) - - def validate(self) -> None: - if self.distro_code.startswith('fc'): - raise Error('Ceph team does not build Fedora specific packages and therefore cannot add repos for this distro') - if self.distro_code == 'el7': - if self.stable and self.stable >= 'pacific': - raise Error('Ceph does not support pacific or later for this version of this linux distro and therefore cannot add a repo for it') - if self.version and self.version.split('.')[0] >= '16': - raise Error('Ceph does not support 16.y.z or later for this version of this linux distro and therefore cannot add a repo for it') - - if self.stable or self.version: - # we know that yum & dnf require there to be a - # $base_url/$arch/repodata/repomd.xml so we can test if this URL - # is gettable in order to validate the inputs - test_url = self.repo_baseurl() + '/noarch/repodata/repomd.xml' - try: - urlopen(test_url) - except HTTPError as err: - logger.error('unable to fetch repo metadata: %r', err) - raise Error('failed to fetch repository metadata. please check' - ' the provided parameters are correct and try again') - - def add_repo(self) -> None: - if self.stable or self.version: - content = '' - for n, t in { - 'Ceph': '$basearch', - 'Ceph-noarch': 'noarch', - 'Ceph-source': 'SRPMS'}.items(): - content += '[%s]\n' % (n) - content += self.custom_repo( - name='Ceph %s' % t, - baseurl=self.repo_baseurl() + '/' + t, - enabled=1, - gpgcheck=1, - gpgkey=self.repo_gpgkey()[0], - ) - content += '\n\n' - else: - content = self.query_shaman(self.distro_normalized, self.major, - self.branch, - self.commit) - - logger.info('Writing repo to %s...' % self.repo_path()) - with open(self.repo_path(), 'w') as f: - f.write(content) - - if self.distro_code.startswith('el'): - logger.info('Enabling EPEL...') - call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release']) - - def rm_repo(self) -> None: - if os.path.exists(self.repo_path()): - os.unlink(self.repo_path()) - - def install(self, ls: List[str]) -> None: - logger.info('Installing packages %s...' % ls) - call_throws(self.ctx, [self.tool, 'install', '-y'] + ls) - - def install_podman(self) -> None: - self.install(['podman']) - - -class Zypper(Packager): - DISTRO_NAMES = [ - 'sles', - 'opensuse-tumbleweed', - 'opensuse-leap' - ] - - def __init__(self, ctx: CephadmContext, - stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str], - distro: Optional[str], distro_version: Optional[str]) -> None: - super(Zypper, self).__init__(ctx, stable=stable, version=version, - branch=branch, commit=commit) - assert distro is not None - self.ctx = ctx - self.tool = 'zypper' - self.distro = 'opensuse' - self.distro_version = '15.1' - if 'tumbleweed' not in distro and distro_version is not None: - self.distro_version = distro_version - - def custom_repo(self, **kw: Any) -> str: - """ - See YumDnf for format explanation. - """ - lines = [] - - # by using tuples (vs a dict) we preserve the order of what we want to - # return, like starting with a [repo name] - tmpl = ( - ('reponame', '[%s]'), - ('name', 'name=%s'), - ('baseurl', 'baseurl=%s'), - ('enabled', 'enabled=%s'), - ('gpgcheck', 'gpgcheck=%s'), - ('_type', 'type=%s'), - ('gpgkey', 'gpgkey=%s'), - ('proxy', 'proxy=%s'), - ('priority', 'priority=%s'), - ) - - for line in tmpl: - tmpl_key, tmpl_value = line # key values from tmpl - - # ensure that there is an actual value (not None nor empty string) - if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''): - lines.append(tmpl_value % kw.get(tmpl_key)) - - return '\n'.join(lines) - - def repo_path(self) -> str: - return '/etc/zypp/repos.d/ceph.repo' - - def repo_baseurl(self) -> str: - assert self.stable or self.version - if self.version: - return '%s/rpm-%s/%s' % (self.ctx.repo_url, - self.stable, self.distro) - else: - return '%s/rpm-%s/%s' % (self.ctx.repo_url, - self.stable, self.distro) - - def add_repo(self) -> None: - if self.stable or self.version: - content = '' - for n, t in { - 'Ceph': '$basearch', - 'Ceph-noarch': 'noarch', - 'Ceph-source': 'SRPMS'}.items(): - content += '[%s]\n' % (n) - content += self.custom_repo( - name='Ceph %s' % t, - baseurl=self.repo_baseurl() + '/' + t, - enabled=1, - gpgcheck=1, - gpgkey=self.repo_gpgkey()[0], - ) - content += '\n\n' - else: - content = self.query_shaman(self.distro, self.distro_version, - self.branch, - self.commit) - - logger.info('Writing repo to %s...' % self.repo_path()) - with open(self.repo_path(), 'w') as f: - f.write(content) - - def rm_repo(self) -> None: - if os.path.exists(self.repo_path()): - os.unlink(self.repo_path()) - - def install(self, ls: List[str]) -> None: - logger.info('Installing packages %s...' % ls) - call_throws(self.ctx, [self.tool, 'in', '-y'] + ls) - - def install_podman(self) -> None: - self.install(['podman']) - - -def create_packager(ctx: CephadmContext, - stable: Optional[str] = None, version: Optional[str] = None, - branch: Optional[str] = None, commit: Optional[str] = None) -> Packager: - distro, distro_version, distro_codename = get_distro() - if distro in YumDnf.DISTRO_NAMES: - return YumDnf(ctx, stable=stable, version=version, - branch=branch, commit=commit, - distro=distro, distro_version=distro_version) - elif distro in Apt.DISTRO_NAMES: - return Apt(ctx, stable=stable, version=version, - branch=branch, commit=commit, - distro=distro, distro_version=distro_version, - distro_codename=distro_codename) - elif distro in Zypper.DISTRO_NAMES: - return Zypper(ctx, stable=stable, version=version, - branch=branch, commit=commit, - distro=distro, distro_version=distro_version) - raise Error('Distro %s version %s not supported' % (distro, distro_version)) - - def command_add_repo(ctx: CephadmContext) -> None: if ctx.version and ctx.release: raise Error('you can specify either --release or --version but not both') @@ -9206,843 +6966,6 @@ def command_rescan_disks(ctx: CephadmContext) -> str: return f'Ok. {len(all_scan_files)} adapters detected: {len(scan_files)} rescanned, {len(skipped)} skipped, {len(failures)} failed ({elapsed:.2f}s)' -################################## - - -def get_ipv4_address(ifname): - # type: (str) -> str - def _extract(sock: socket.socket, offset: int) -> str: - return socket.inet_ntop( - socket.AF_INET, - fcntl.ioctl( - sock.fileno(), - offset, - struct.pack('256s', bytes(ifname[:15], 'utf-8')) - )[20:24]) - - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - addr = _extract(s, 35093) # '0x8915' = SIOCGIFADDR - dq_mask = _extract(s, 35099) # 0x891b = SIOCGIFNETMASK - except OSError: - # interface does not have an ipv4 address - return '' - - dec_mask = sum([bin(int(i)).count('1') - for i in dq_mask.split('.')]) - return '{}/{}'.format(addr, dec_mask) - - -def get_ipv6_address(ifname): - # type: (str) -> str - if not os.path.exists('/proc/net/if_inet6'): - return '' - - raw = read_file(['/proc/net/if_inet6']) - data = raw.splitlines() - # based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html - # field 0 is ipv6, field 2 is scope - for iface_setting in data: - field = iface_setting.split() - if field[-1] == ifname: - ipv6_raw = field[0] - ipv6_fmtd = ':'.join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)]) - # apply naming rules using ipaddress module - ipv6 = ipaddress.ip_address(ipv6_fmtd) - return '{}/{}'.format(str(ipv6), int('0x{}'.format(field[2]), 16)) - return '' - - -def bytes_to_human(num, mode='decimal'): - # type: (float, str) -> str - """Convert a bytes value into it's human-readable form. - - :param num: number, in bytes, to convert - :param mode: Either decimal (default) or binary to determine divisor - :returns: string representing the bytes value in a more readable format - """ - unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB'] - divisor = 1000.0 - yotta = 'YB' - - if mode == 'binary': - unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB'] - divisor = 1024.0 - yotta = 'YiB' - - for unit in unit_list: - if abs(num) < divisor: - return '%3.1f%s' % (num, unit) - num /= divisor - return '%.1f%s' % (num, yotta) - - -def read_file(path_list, file_name=''): - # type: (List[str], str) -> str - """Returns the content of the first file found within the `path_list` - - :param path_list: list of file paths to search - :param file_name: optional file_name to be applied to a file path - :returns: content of the file or 'Unknown' - """ - for path in path_list: - if file_name: - file_path = os.path.join(path, file_name) - else: - file_path = path - if os.path.exists(file_path): - with open(file_path, 'rb') as f: - try: - content = f.read().decode('utf-8', 'ignore').strip() - except OSError: - # sysfs may populate the file, but for devices like - # virtio reads can fail - return 'Unknown' - else: - return content - return 'Unknown' - -################################## - - -class Enclosure: - def __init__(self, enc_id: str, enc_path: str, dev_path: str): - """External disk enclosure metadata - - Args: - :param enc_id: enclosure id (normally a WWN) - :param enc_path: sysfs path to HBA attached to the enclosure - e.g. /sys/class/scsi_generic/sg11/device/enclosure/0:0:9:0 - :param dev_path: sysfs path to the generic scsi device for the enclosure HBA - e.g. /sys/class/scsi_generic/sg2 - """ - self._path: str = dev_path - self._dev_path: str = os.path.join(dev_path, 'device') - self._enc_path: str = enc_path - self.ses_paths: List[str] = [] - self.path_count: int = 0 - self.vendor: str = '' - self.model: str = '' - self.enc_id: str = enc_id - self.components: Union[int, str] = 0 - self.device_lookup: Dict[str, str] = {} - self.device_count: int = 0 - self.slot_map: Dict[str, Dict[str, str]] = {} - - self._probe() - - def _probe(self) -> None: - """Analyse the dev paths to identify enclosure related information""" - - self.vendor = read_file([os.path.join(self._dev_path, 'vendor')]) - self.model = read_file([os.path.join(self._dev_path, 'model')]) - self.components = read_file([os.path.join(self._enc_path, 'components')]) - slot_paths = glob(os.path.join(self._enc_path, '*', 'slot')) - for slot_path in slot_paths: - slot = read_file([slot_path]) - serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80') - serial = '' - if os.path.exists(serial_path): - serial_raw = read_file([serial_path]) - serial = (''.join(char for char in serial_raw if char in string.printable)).strip() - self.device_lookup[serial] = slot - slot_dir = os.path.dirname(slot_path) - self.slot_map[slot] = { - 'status': read_file([os.path.join(slot_dir, 'status')]), - 'fault': read_file([os.path.join(slot_dir, 'fault')]), - 'locate': read_file([os.path.join(slot_dir, 'locate')]), - 'serial': serial, - } - - self.device_count = len(self.device_lookup) - self.update(os.path.basename(self._path)) - - def update(self, dev_id: str) -> None: - """Update an enclosure object with a related sg device name - - :param dev_id (str): device name e.g. sg2 - """ - self.ses_paths.append(dev_id) - self.path_count = len(self.ses_paths) - - def _dump(self) -> Dict[str, Any]: - """Return a dict representation of the object""" - return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} - - def __str__(self) -> str: - """Return a formatted json representation of the object as a string""" - return json.dumps(self._dump(), indent=2) - - def __repr__(self) -> str: - """Return a json representation of the object as a string""" - return json.dumps(self._dump()) - - def as_json(self) -> Dict[str, Any]: - """Return a dict representing the object""" - return self._dump() - - -class HostFacts(): - _dmi_path_list = ['/sys/class/dmi/id'] - _nic_path_list = ['/sys/class/net'] - _apparmor_path_list = ['/etc/apparmor'] - _disk_vendor_workarounds = { - '0x1af4': 'Virtio Block Device' - } - _excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md') - _sg_generic_glob = '/sys/class/scsi_generic/*' - - def __init__(self, ctx: CephadmContext): - self.ctx: CephadmContext = ctx - self.cpu_model: str = 'Unknown' - self.sysctl_options: Dict[str, str] = self._populate_sysctl_options() - self.cpu_count: int = 0 - self.cpu_cores: int = 0 - self.cpu_threads: int = 0 - self.interfaces: Dict[str, Any] = {} - - self._meminfo: List[str] = read_file(['/proc/meminfo']).splitlines() - self._get_cpuinfo() - self._process_nics() - self.arch: str = platform.processor() - self.kernel: str = platform.release() - self._enclosures = self._discover_enclosures() - self._block_devices = self._get_block_devs() - self._device_list = self._get_device_info() - - def _populate_sysctl_options(self) -> Dict[str, str]: - sysctl_options = {} - out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR) - if out: - for line in out.splitlines(): - option, value = line.split('=') - sysctl_options[option.strip()] = value.strip() - return sysctl_options - - def _discover_enclosures(self) -> Dict[str, Enclosure]: - """Build a dictionary of discovered scsi enclosures - - Enclosures are detected by walking the scsi generic sysfs hierarchy. - Any device tree that holds an 'enclosure' subdirectory is interpreted as - an enclosure. Once identified the enclosure directory is analysis to - identify key descriptors that will help relate disks to enclosures and - disks to enclosure slots. - - :return: Dict[str, Enclosure]: a map of enclosure id (hex) to enclosure object - """ - sg_paths: List[str] = glob(HostFacts._sg_generic_glob) - enclosures: Dict[str, Enclosure] = {} - - for sg_path in sg_paths: - enc_path = os.path.join(sg_path, 'device', 'enclosure') - if os.path.exists(enc_path): - enc_dirs = glob(os.path.join(enc_path, '*')) - if len(enc_dirs) != 1: - # incomplete enclosure spec - expecting ONE dir in the format - # host(adapter):bus:target:lun e.g. 16:0:0:0 - continue - enc_path = enc_dirs[0] - enc_id = read_file([os.path.join(enc_path, 'id')]) - if enc_id in enclosures: - enclosures[enc_id].update(os.path.basename(sg_path)) - continue - - enclosure = Enclosure(enc_id, enc_path, sg_path) - enclosures[enc_id] = enclosure - - return enclosures - - @property - def enclosures(self) -> Dict[str, Dict[str, Any]]: - """Dump the enclosure objects as dicts""" - return {k: v._dump() for k, v in self._enclosures.items()} - - @property - def enclosure_count(self) -> int: - """Return the number of enclosures detected""" - return len(self._enclosures.keys()) - - def _get_cpuinfo(self): - # type: () -> None - """Determine cpu information via /proc/cpuinfo""" - raw = read_file(['/proc/cpuinfo']) - output = raw.splitlines() - cpu_set = set() - - for line in output: - field = [f.strip() for f in line.split(':')] - if 'model name' in line: - self.cpu_model = field[1] - if 'physical id' in line: - cpu_set.add(field[1]) - if 'siblings' in line: - self.cpu_threads = int(field[1].strip()) - if 'cpu cores' in line: - self.cpu_cores = int(field[1].strip()) - pass - self.cpu_count = len(cpu_set) - - def _get_block_devs(self): - # type: () -> List[str] - """Determine the list of block devices by looking at /sys/block""" - return [dev for dev in os.listdir('/sys/block') - if not dev.startswith(HostFacts._excluded_block_devices)] - - @property - def operating_system(self): - # type: () -> str - """Determine OS version""" - raw_info = read_file(['/etc/os-release']) - os_release = raw_info.splitlines() - rel_str = 'Unknown' - rel_dict = dict() - - for line in os_release: - if '=' in line: - var_name, var_value = line.split('=') - rel_dict[var_name] = var_value.strip('"') - - # Would normally use PRETTY_NAME, but NAME and VERSION are more - # consistent - if all(_v in rel_dict for _v in ['NAME', 'VERSION']): - rel_str = '{} {}'.format(rel_dict['NAME'], rel_dict['VERSION']) - return rel_str - - @property - def hostname(self): - # type: () -> str - """Return the hostname""" - return platform.node() - - @property - def shortname(self) -> str: - return platform.node().split('.', 1)[0] - - @property - def fqdn(self) -> str: - return get_fqdn() - - @property - def subscribed(self): - # type: () -> str - """Highlevel check to see if the host is subscribed to receive updates/support""" - def _red_hat(): - # type: () -> str - # RHEL 7 and RHEL 8 - entitlements_dir = '/etc/pki/entitlement' - if os.path.exists(entitlements_dir): - pems = glob('{}/*.pem'.format(entitlements_dir)) - if len(pems) >= 2: - return 'Yes' - - return 'No' - - os_name = self.operating_system - if os_name.upper().startswith('RED HAT'): - return _red_hat() - - return 'Unknown' - - @property - def hdd_count(self): - # type: () -> int - """Return a count of HDDs (spinners)""" - return len(self.hdd_list) - - def _get_capacity(self, dev): - # type: (str) -> int - """Determine the size of a given device - - The kernel always bases device size calculations based on a 512 byte - sector. For more information see - https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/linux/types.h?h=v5.15.63#n120 - """ - size_path = os.path.join('/sys/block', dev, 'size') - size_blocks = int(read_file([size_path])) - return size_blocks * 512 - - def _get_capacity_by_type(self, disk_type='hdd'): - # type: (str) -> int - """Return the total capacity of a category of device (flash or hdd)""" - capacity: int = 0 - for dev in self._device_list: - if dev['disk_type'] == disk_type: - disk_capacity = cast(int, dev.get('disk_size_bytes', 0)) - capacity += disk_capacity - return capacity - - def _get_device_info(self): - # type: () -> List[Dict[str, object]] - """Return a 'pretty' name list for each unique device in the `dev_list`""" - disk_list = list() - - # serial_num_lookup is a dict of serial number -> List of devices with that serial number - serial_num_lookup: Dict[str, List[str]] = {} - - # make a map of devname -> disk path. this path name may indicate the physical slot - # of a drive (phyXX) - disk_path_map: Dict[str, str] = {} - for path in glob('/dev/disk/by-path/*'): - tgt_raw = Path(path).resolve() - tgt = os.path.basename(str(tgt_raw)) - disk_path_map[tgt] = path - - # make a map of holder (dm-XX) -> full mpath name - dm_device_map: Dict[str, str] = {} - for mpath in glob('/dev/mapper/mpath*'): - tgt_raw = Path(mpath).resolve() - tgt = os.path.basename(str(tgt_raw)) - dm_device_map[tgt] = mpath - - # main loop to process all eligible block devices - for dev in self._block_devices: - enclosure_id = '' - enclosure_slot = '' - scsi_addr = '' - mpath = '' - - disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip() - disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip() - disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip() - vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip() - rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)]) - holders_raw = glob('/sys/block/{}/holders/*'.format(dev)) - if len(holders_raw) == 1: - # mpath will have 1 holder entry - holder = os.path.basename(holders_raw[0]) - mpath = dm_device_map.get(holder, '') - - disk_type = 'hdd' if rotational == '1' else 'flash' - scsi_addr_path = glob('/sys/block/{}/device/bsg/*'.format(dev)) - if len(scsi_addr_path) == 1: - scsi_addr = os.path.basename(scsi_addr_path[0]) - - # vpd_pg80 isn't guaranteed (libvirt, vmware for example) - serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)]) - serial = (''.join(i for i in serial_raw if i in string.printable)).strip() - if serial.lower() == 'unknown': - serial = '' - else: - if serial in serial_num_lookup: - serial_num_lookup[serial].append(dev) - else: - serial_num_lookup[serial] = [dev] - for enc_id, enclosure in self._enclosures.items(): - if serial in enclosure.device_lookup.keys(): - enclosure_id = enc_id - enclosure_slot = enclosure.device_lookup[serial] - - disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor) - disk_size_bytes = self._get_capacity(dev) - disk_list.append({ - 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)), - 'vendor': disk_vendor, - 'model': disk_model, - 'rev': disk_rev, - 'wwid': disk_wwid, - 'dev_name': dev, - 'disk_size_bytes': disk_size_bytes, - 'disk_type': disk_type, - 'serial': serial, - 'alt_dev_name': '', - 'scsi_addr': scsi_addr, - 'enclosure_id': enclosure_id, - 'enclosure_slot': enclosure_slot, - 'path_id': disk_path_map.get(dev, ''), - 'mpath': mpath, - }) - - # process the devices to drop duplicate physical devs based on matching - # the unique serial number - disk_list_unique: List[Dict[str, Any]] = [] - serials_seen: List[str] = [] - for dev in disk_list: - serial = str(dev['serial']) - if serial: - if serial in serials_seen: - continue - else: - serials_seen.append(serial) - devs = serial_num_lookup[serial].copy() - devs.remove(str(dev['dev_name'])) - dev['alt_dev_name'] = ','.join(devs) - disk_list_unique.append(dev) - - return disk_list_unique - - @property - def hdd_list(self): - # type: () -> List[Dict[str, object]] - """Return a list of devices that are HDDs (spinners)""" - return [dev for dev in self._device_list if dev['disk_type'] == 'hdd'] - - @property - def flash_list(self): - # type: () -> List[Dict[str, object]] - """Return a list of devices that are flash based (SSD, NVMe)""" - return [dev for dev in self._device_list if dev['disk_type'] == 'flash'] - - @property - def hdd_capacity_bytes(self): - # type: () -> int - """Return the total capacity for all HDD devices (bytes)""" - return self._get_capacity_by_type(disk_type='hdd') - - @property - def hdd_capacity(self): - # type: () -> str - """Return the total capacity for all HDD devices (human readable format)""" - return bytes_to_human(self.hdd_capacity_bytes) - - @property - def cpu_load(self): - # type: () -> Dict[str, float] - """Return the cpu load average data for the host""" - raw = read_file(['/proc/loadavg']).strip() - data = raw.split() - return { - '1min': float(data[0]), - '5min': float(data[1]), - '15min': float(data[2]), - } - - @property - def flash_count(self): - # type: () -> int - """Return the number of flash devices in the system (SSD, NVMe)""" - return len(self.flash_list) - - @property - def flash_capacity_bytes(self): - # type: () -> int - """Return the total capacity for all flash devices (bytes)""" - return self._get_capacity_by_type(disk_type='flash') - - @property - def flash_capacity(self): - # type: () -> str - """Return the total capacity for all Flash devices (human readable format)""" - return bytes_to_human(self.flash_capacity_bytes) - - def _process_nics(self): - # type: () -> None - """Look at the NIC devices and extract network related metadata""" - # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h - hw_lookup = { - '1': 'ethernet', - '32': 'infiniband', - '772': 'loopback', - } - - for nic_path in HostFacts._nic_path_list: - if not os.path.exists(nic_path): - continue - for iface in os.listdir(nic_path): - - if os.path.exists(os.path.join(nic_path, iface, 'bridge')): - nic_type = 'bridge' - elif os.path.exists(os.path.join(nic_path, iface, 'bonding')): - nic_type = 'bonding' - else: - nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown') - - if nic_type == 'loopback': # skip loopback devices - continue - - lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))] - upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))] - - try: - mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')])) - except ValueError: - mtu = 0 - - operstate = read_file([os.path.join(nic_path, iface, 'operstate')]) - try: - speed = int(read_file([os.path.join(nic_path, iface, 'speed')])) - except (OSError, ValueError): - # OSError : device doesn't support the ethtool get_link_ksettings - # ValueError : raised when the read fails, and returns Unknown - # - # Either way, we show a -1 when speed isn't available - speed = -1 - - dev_link = os.path.join(nic_path, iface, 'device') - if os.path.exists(dev_link): - iftype = 'physical' - driver_path = os.path.join(dev_link, 'driver') - if os.path.exists(driver_path): - driver = os.path.basename(os.path.realpath(driver_path)) - else: - driver = 'Unknown' - - else: - iftype = 'logical' - driver = '' - - self.interfaces[iface] = { - 'mtu': mtu, - 'upper_devs_list': upper_devs_list, - 'lower_devs_list': lower_devs_list, - 'operstate': operstate, - 'iftype': iftype, - 'nic_type': nic_type, - 'driver': driver, - 'speed': speed, - 'ipv4_address': get_ipv4_address(iface), - 'ipv6_address': get_ipv6_address(iface), - } - - @property - def nic_count(self): - # type: () -> int - """Return a total count of all physical NICs detected in the host""" - phys_devs = [] - for iface in self.interfaces: - if self.interfaces[iface]['iftype'] == 'physical': - phys_devs.append(iface) - return len(phys_devs) - - def _get_mem_data(self, field_name): - # type: (str) -> int - for line in self._meminfo: - if line.startswith(field_name): - _d = line.split() - return int(_d[1]) - return 0 - - @property - def memory_total_kb(self): - # type: () -> int - """Determine the memory installed (kb)""" - return self._get_mem_data('MemTotal') - - @property - def memory_free_kb(self): - # type: () -> int - """Determine the memory free (not cache, immediately usable)""" - return self._get_mem_data('MemFree') - - @property - def memory_available_kb(self): - # type: () -> int - """Determine the memory available to new applications without swapping""" - return self._get_mem_data('MemAvailable') - - @property - def vendor(self): - # type: () -> str - """Determine server vendor from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'sys_vendor') - - @property - def model(self): - # type: () -> str - """Determine server model information from DMI data in sysfs""" - family = read_file(HostFacts._dmi_path_list, 'product_family') - product = read_file(HostFacts._dmi_path_list, 'product_name') - if family == 'Unknown' and product: - return '{}'.format(product) - - return '{} ({})'.format(family, product) - - @property - def bios_version(self): - # type: () -> str - """Determine server BIOS version from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'bios_version') - - @property - def bios_date(self): - # type: () -> str - """Determine server BIOS date from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'bios_date') - - @property - def chassis_serial(self): - # type: () -> str - """Determine chassis serial number from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'chassis_serial') - - @property - def board_serial(self): - # type: () -> str - """Determine mainboard serial number from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'board_serial') - - @property - def product_serial(self): - # type: () -> str - """Determine server's serial number from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, 'product_serial') - - @property - def timestamp(self): - # type: () -> float - """Return the current time as Epoch seconds""" - return time.time() - - @property - def system_uptime(self): - # type: () -> float - """Return the system uptime (in secs)""" - raw_time = read_file(['/proc/uptime']) - up_secs, _ = raw_time.split() - return float(up_secs) - - @property - def kernel_security(self): - # type: () -> Dict[str, str] - """Determine the security features enabled in the kernel - SELinux, AppArmor""" - def _fetch_selinux() -> Dict[str, str]: - """Get the selinux status""" - security = {} - try: - out, err, code = call(self.ctx, ['sestatus'], - verbosity=CallVerbosity.QUIET) - security['type'] = 'SELinux' - status, mode, policy = '', '', '' - for line in out.split('\n'): - if line.startswith('SELinux status:'): - k, v = line.split(':') - status = v.strip() - elif line.startswith('Current mode:'): - k, v = line.split(':') - mode = v.strip() - elif line.startswith('Loaded policy name:'): - k, v = line.split(':') - policy = v.strip() - if status == 'disabled': - security['description'] = 'SELinux: Disabled' - else: - security['description'] = 'SELinux: Enabled({}, {})'.format(mode, policy) - except Exception as e: - logger.info('unable to get selinux status: %s' % e) - return security - - def _fetch_apparmor() -> Dict[str, str]: - """Read the apparmor profiles directly, returning an overview of AppArmor status""" - security = {} - for apparmor_path in HostFacts._apparmor_path_list: - if os.path.exists(apparmor_path): - security['type'] = 'AppArmor' - security['description'] = 'AppArmor: Enabled' - try: - profiles = read_file(['/sys/kernel/security/apparmor/profiles']) - if len(profiles) == 0: - return {} - except OSError: - pass - else: - summary = {} # type: Dict[str, int] - for line in profiles.split('\n'): - item, mode = line.split(' ') - mode = mode.strip('()') - if mode in summary: - summary[mode] += 1 - else: - summary[mode] = 0 - summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()]) - security = {**security, **summary} # type: ignore - security['description'] += '({})'.format(summary_str) - - return security - return {} - - ret = {} - if os.path.exists('/sys/kernel/security/lsm'): - lsm = read_file(['/sys/kernel/security/lsm']).strip() - if 'selinux' in lsm: - ret = _fetch_selinux() - elif 'apparmor' in lsm: - ret = _fetch_apparmor() - else: - return { - 'type': 'Unknown', - 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor' - } - - if ret: - return ret - - return { - 'type': 'None', - 'description': 'Linux Security Module framework is not available' - } - - @property - def selinux_enabled(self) -> bool: - return (self.kernel_security['type'] == 'SELinux') and \ - (self.kernel_security['description'] != 'SELinux: Disabled') - - @property - def kernel_parameters(self): - # type: () -> Dict[str, str] - """Get kernel parameters required/used in Ceph clusters""" - - k_param = {} - out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT) - if out: - param_list = out.split('\n') - param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list} - - # return only desired parameters - if 'net.ipv4.ip_nonlocal_bind' in param_dict: - k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind'] - - return k_param - - @staticmethod - def _process_net_data(tcp_file: str, protocol: str = 'tcp') -> List[int]: - listening_ports = [] - # Connections state documentation - # tcp - https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h - # udp - uses 07 (TCP_CLOSE or UNCONN, since udp is stateless. test with netcat -ul <port>) - listening_state = { - 'tcp': '0A', - 'udp': '07' - } - - if protocol not in listening_state.keys(): - return [] - - if os.path.exists(tcp_file): - with open(tcp_file) as f: - tcp_data = f.readlines()[1:] - - for con in tcp_data: - con_info = con.strip().split() - if con_info[3] == listening_state[protocol]: - local_port = int(con_info[1].split(':')[1], 16) - listening_ports.append(local_port) - - return listening_ports - - @property - def tcp_ports_used(self) -> List[int]: - return HostFacts._process_net_data('/proc/net/tcp') - - @property - def tcp6_ports_used(self) -> List[int]: - return HostFacts._process_net_data('/proc/net/tcp6') - - @property - def udp_ports_used(self) -> List[int]: - return HostFacts._process_net_data('/proc/net/udp', 'udp') - - @property - def udp6_ports_used(self) -> List[int]: - return HostFacts._process_net_data('/proc/net/udp6', 'udp') - - def dump(self): - # type: () -> str - """Return the attributes of this HostFacts object as json""" - data = { - k: getattr(self, k) for k in dir(self) - if not k.startswith('_') - and isinstance(getattr(self, k), (float, int, str, list, dict, tuple)) - } - return json.dumps(data, indent=2, sort_keys=True) ################################## @@ -10257,6 +7180,11 @@ def _get_parser(): action='store_true', help='Show debug-level log messages') parser.add_argument( + '--log-dest', + action='append', + choices=[v.name for v in LogDestination], + help='select one or more destination for persistent logging') + parser.add_argument( '--timeout', type=int, default=DEFAULT_TIMEOUT, @@ -10287,6 +7215,11 @@ def _get_parser(): parser_version = subparsers.add_parser( 'version', help='get cephadm version') parser_version.set_defaults(func=command_version) + parser_version.add_argument( + '--verbose', + action='store_true', + help='Detailed version information', + ) parser_pull = subparsers.add_parser( 'pull', help='pull the default container image') @@ -10456,6 +7389,10 @@ def _get_parser(): '--no-hosts', action='store_true', help='dont pass /etc/hosts through to the container') + parser_shell.add_argument( + '--dry-run', + action='store_true', + help='print, but do not execute, the container command to start the shell') parser_enter = subparsers.add_parser( 'enter', help='run an interactive shell inside a running daemon container') @@ -10520,6 +7457,10 @@ def _get_parser(): required=True, help='daemon name (type.id)') + parser_unit_install = subparsers.add_parser( + 'unit-install', help="Install the daemon's systemd unit") + parser_unit_install.set_defaults(func=command_unit_install) + parser_logs = subparsers.add_parser( 'logs', help='print journald logs for a daemon container') parser_logs.set_defaults(func=command_logs) @@ -10652,22 +7593,10 @@ def _get_parser(): '--allow-overwrite', action='store_true', help='allow overwrite of existing --output-* config/keyring/ssh files') - # following logic to have both '--cleanup-on-failure' and '--no-cleanup-on-failure' - # has been included in argparse of python v3.9, however since we have to support - # older python versions the following is more generic. Once python v3.9 becomes - # the minium supported version we can implement the same by using the new option - # argparse.BooleanOptionalAction - group = parser_bootstrap.add_mutually_exclusive_group() - group.add_argument( - '--cleanup-on-failure', - action='store_true', - default=True, - help='Delete cluster files in case of a failed installation') - group.add_argument( + parser_bootstrap.add_argument( '--no-cleanup-on-failure', - action='store_const', - const=False, - dest='cleanup_on_failure', + action='store_true', + default=False, help='Do not delete cluster files in case of a failed installation') parser_bootstrap.add_argument( '--allow-fqdn-hostname', @@ -10897,43 +7826,6 @@ def cephadm_init_ctx(args: List[str]) -> CephadmContext: return ctx -def cephadm_init_logging(ctx: CephadmContext, args: List[str]) -> None: - """Configure the logging for cephadm as well as updating the system - to have the expected log dir and logrotate configuration. - """ - logging.addLevelName(QUIET_LOG_LEVEL, 'QUIET') - global logger - if not os.path.exists(LOG_DIR): - os.makedirs(LOG_DIR) - operations = ['bootstrap', 'rm-cluster'] - if any(op in args for op in operations): - dictConfig(interactive_logging_config) - else: - dictConfig(logging_config) - - logger = logging.getLogger() - logger.setLevel(QUIET_LOG_LEVEL) - - if not os.path.exists(ctx.logrotate_dir + '/cephadm'): - with open(ctx.logrotate_dir + '/cephadm', 'w') as f: - f.write("""# created by cephadm -/var/log/ceph/cephadm.log { - rotate 7 - daily - compress - missingok - notifempty - su root root -} -""") - - if ctx.verbose: - for handler in logger.handlers: - if handler.name in ['console', 'log_file', 'console_stdout']: - handler.setLevel(QUIET_LOG_LEVEL) - logger.debug('%s\ncephadm %s' % ('-' * 80, args)) - - def cephadm_require_root() -> None: """Exit if the process is not running as root.""" if os.geteuid() != 0: @@ -10960,7 +7852,7 @@ def main() -> None: sys.exit(1) cephadm_require_root() - cephadm_init_logging(ctx, av) + cephadm_init_logging(ctx, logger, av) try: # podman or docker? ctx.container_engine = find_container_engine(ctx) |