Diffstat (limited to 'src/cephadm/cephadm.py')
-rwxr-xr-x  src/cephadm/cephadm.py  6524
1 file changed, 1708 insertions, 4816 deletions
diff --git a/src/cephadm/cephadm.py b/src/cephadm/cephadm.py
index 5ad4712b3cb..348f581f9e6 100755
--- a/src/cephadm/cephadm.py
+++ b/src/cephadm/cephadm.py
@@ -1,18 +1,12 @@
#!/usr/bin/python3
-import asyncio
-import asyncio.subprocess
import argparse
import datetime
-import fcntl
import ipaddress
import io
import json
import logging
-from logging.config import dictConfig
import os
-import platform
-import pwd
import random
import shlex
import shutil
@@ -23,16 +17,13 @@ import sys
import tempfile
import time
import errno
-import struct
import ssl
-from enum import Enum
-from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO, Sequence, TypeVar, cast, Set, Iterable, TextIO, Generator
+from typing import Dict, List, Tuple, Optional, Union, Any, Callable, IO, Sequence, TypeVar, cast, Iterable, TextIO
import re
import uuid
-from configparser import ConfigParser
-from contextlib import redirect_stdout, contextmanager
+from contextlib import redirect_stdout
from functools import wraps
from glob import glob
from io import StringIO
@@ -41,115 +32,158 @@ from urllib.error import HTTPError, URLError
from urllib.request import urlopen, Request
from pathlib import Path
-FuncT = TypeVar('FuncT', bound=Callable)
-
-# Default container images -----------------------------------------------------
-DEFAULT_IMAGE = 'quay.ceph.io/ceph-ci/ceph:main'
-DEFAULT_IMAGE_IS_MAIN = True
-DEFAULT_IMAGE_RELEASE = 'reef'
-DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.43.0'
-DEFAULT_LOKI_IMAGE = 'docker.io/grafana/loki:2.4.0'
-DEFAULT_PROMTAIL_IMAGE = 'docker.io/grafana/promtail:2.4.0'
-DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.5.0'
-DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.25.0'
-DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/ceph-grafana:9.4.7'
-DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
-DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
-DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:0.0.1'
-DEFAULT_SNMP_GATEWAY_IMAGE = 'docker.io/maxwo/snmp-notifier:v1.2.1'
-DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
-DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
-DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
-DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
-DEFAULT_REGISTRY = 'docker.io' # normalize unqualified digests to this
-# ------------------------------------------------------------------------------
-
-LATEST_STABLE_RELEASE = 'reef'
-DATA_DIR = '/var/lib/ceph'
-LOG_DIR = '/var/log/ceph'
-LOCK_DIR = '/run/cephadm'
-LOGROTATE_DIR = '/etc/logrotate.d'
-SYSCTL_DIR = '/etc/sysctl.d'
-UNIT_DIR = '/etc/systemd/system'
-CEPH_CONF_DIR = 'config'
-CEPH_CONF = 'ceph.conf'
-CEPH_PUBKEY = 'ceph.pub'
-CEPH_KEYRING = 'ceph.client.admin.keyring'
-CEPH_DEFAULT_CONF = f'/etc/ceph/{CEPH_CONF}'
-CEPH_DEFAULT_KEYRING = f'/etc/ceph/{CEPH_KEYRING}'
-CEPH_DEFAULT_PUBKEY = f'/etc/ceph/{CEPH_PUBKEY}'
-LOG_DIR_MODE = 0o770
-DATA_DIR_MODE = 0o700
-DEFAULT_MODE = 0o600
-CONTAINER_INIT = True
-MIN_PODMAN_VERSION = (2, 0, 2)
-CGROUPS_SPLIT_PODMAN_VERSION = (2, 1, 0)
-PIDS_LIMIT_UNLIMITED_PODMAN_VERSION = (3, 4, 1)
-CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT = None # in seconds
-DEFAULT_RETRY = 15
-DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
-QUIET_LOG_LEVEL = 9 # DEBUG is 10, so using 9 to be lower level than DEBUG
-NO_DEPRECATED = False
-
-logger: logging.Logger = None # type: ignore
-
-"""
-You can invoke cephadm in two ways:
-
-1. The normal way, at the command line.
-
-2. By piping the script to the python3 binary. In this latter case, you should
- prepend one or more lines to the beginning of the script.
-
- For arguments,
+from cephadmlib.constants import (
+ # default images
+ DEFAULT_ALERT_MANAGER_IMAGE,
+ DEFAULT_ELASTICSEARCH_IMAGE,
+ DEFAULT_GRAFANA_IMAGE,
+ DEFAULT_HAPROXY_IMAGE,
+ DEFAULT_IMAGE,
+ DEFAULT_IMAGE_IS_MAIN,
+ DEFAULT_IMAGE_RELEASE,
+ DEFAULT_JAEGER_AGENT_IMAGE,
+ DEFAULT_JAEGER_COLLECTOR_IMAGE,
+ DEFAULT_JAEGER_QUERY_IMAGE,
+ DEFAULT_KEEPALIVED_IMAGE,
+ DEFAULT_LOKI_IMAGE,
+ DEFAULT_NODE_EXPORTER_IMAGE,
+ DEFAULT_NVMEOF_IMAGE,
+ DEFAULT_PROMETHEUS_IMAGE,
+ DEFAULT_PROMTAIL_IMAGE,
+ DEFAULT_SNMP_GATEWAY_IMAGE,
+ # other constant values
+ CEPH_CONF,
+ CEPH_CONF_DIR,
+ CEPH_DEFAULT_CONF,
+ CEPH_DEFAULT_KEYRING,
+ CEPH_DEFAULT_PUBKEY,
+ CEPH_KEYRING,
+ CEPH_PUBKEY,
+ CONTAINER_INIT,
+ CUSTOM_PS1,
+ DATA_DIR,
+ DATA_DIR_MODE,
+ DATEFMT,
+ DEFAULT_RETRY,
+ DEFAULT_TIMEOUT,
+ LATEST_STABLE_RELEASE,
+ LOGROTATE_DIR,
+ LOG_DIR,
+ LOG_DIR_MODE,
+ SYSCTL_DIR,
+ UNIT_DIR,
+)
+from cephadmlib.context import CephadmContext
+from cephadmlib.context_getters import (
+ fetch_configs,
+ fetch_custom_config_files,
+ fetch_endpoints,
+ fetch_meta,
+ get_config_and_keyring,
+ get_parm,
+ read_configuration_source,
+ should_log_to_journald,
+)
+from cephadmlib.exceptions import (
+ ClusterAlreadyExists,
+ Error,
+ UnauthorizedRegistryError,
+)
+from cephadmlib.exe_utils import find_executable, find_program
+from cephadmlib.call_wrappers import (
+ CallVerbosity,
+ async_run,
+ call,
+ call_throws,
+ call_timeout,
+ concurrent_tasks,
+)
+from cephadmlib.container_engines import (
+ Docker,
+ Podman,
+ check_container_engine,
+ find_container_engine,
+ pull_command,
+ registry_login,
+)
+from cephadmlib.data_utils import (
+ dict_get,
+ dict_get_join,
+ get_legacy_config_fsid,
+ is_fsid,
+ normalize_image_digest,
+ try_convert_datetime,
+ read_config,
+ with_units_to_int,
+)
+from cephadmlib.file_utils import (
+ get_file_timestamp,
+ makedirs,
+ pathify,
+ populate_files,
+ read_file,
+ recursive_chown,
+ touch,
+ write_new,
+ write_tmp,
+)
+from cephadmlib.net_utils import (
+ build_addrv_params,
+ EndPoint,
+ check_ip_port,
+ check_subnet,
+ get_fqdn,
+ get_hostname,
+ get_ip_addresses,
+ get_short_hostname,
+ ip_in_subnets,
+ is_ipv6,
+ parse_mon_addrv,
+ parse_mon_ip,
+ port_in_use,
+ unwrap_ipv6,
+ wrap_ipv6,
+)
+from cephadmlib.locking import FileLock
+from cephadmlib.daemon_identity import DaemonIdentity, DaemonSubIdentity
+from cephadmlib.packagers import create_packager, Packager
+from cephadmlib.logging import cephadm_init_logging, Highlight, LogDestination
+from cephadmlib.systemd import check_unit, check_units
+from cephadmlib.container_types import (
+ CephContainer,
+ InitContainer,
+ is_container_running,
+ extract_uid_gid,
+)
+from cephadmlib.decorators import (
+ deprecated_command,
+ executes_early,
+ require_image
+)
+from cephadmlib.host_facts import HostFacts, list_networks
+from cephadmlib.ssh import authorize_ssh_key, check_ssh_connectivity
+from cephadmlib.daemon_form import (
+ DaemonForm,
+ UnexpectedDaemonTypeError,
+ create as daemon_form_create,
+ register as register_daemon_form,
+)
+from cephadmlib.deploy import DeploymentType
+from cephadmlib.container_daemon_form import ContainerDaemonForm
+from cephadmlib.sysctl import install_sysctl, migrate_sysctl_dir
+from cephadmlib.firewalld import Firewalld, update_firewalld
+from cephadmlib import templating
- injected_argv = [...]
- e.g.,
-
- injected_argv = ['ls']
+FuncT = TypeVar('FuncT', bound=Callable)
- For reading stdin from the '--config-json -' argument,
- injected_stdin = '...'
-"""
-cached_stdin = None
+logger = logging.getLogger()
##################################
-async def run_func(func: Callable, cmd: str) -> subprocess.CompletedProcess:
- logger.debug(f'running function {func.__name__}, with parms: {cmd}')
- response = func(cmd)
- return response
-
-
-async def concurrent_tasks(func: Callable, cmd_list: List[str]) -> List[Any]:
- tasks = []
- for cmd in cmd_list:
- tasks.append(run_func(func, cmd))
-
- data = await asyncio.gather(*tasks)
-
- return data
-
-
-class EndPoint:
- """EndPoint representing an ip:port format"""
-
- def __init__(self, ip: str, port: int) -> None:
- self.ip = ip
- self.port = port
-
- def __str__(self) -> str:
- return f'{self.ip}:{self.port}'
-
- def __repr__(self) -> str:
- return f'{self.ip}:{self.port}'
-
-
class ContainerInfo:
def __init__(self, container_id: str,
image_name: str,
@@ -171,235 +205,220 @@ class ContainerInfo:
and self.start == other.start
and self.version == other.version)
+##################################
-class DeploymentType(Enum):
- # Fresh deployment of a daemon.
- DEFAULT = 'Deploy'
- # Redeploying a daemon. Works the same as fresh
- # deployment minus port checking.
- REDEPLOY = 'Redeploy'
- # Reconfiguring a daemon. Rewrites config
- # files and potentially restarts daemon.
- RECONFIG = 'Reconfig'
-
-
-class BaseConfig:
-
- def __init__(self) -> None:
- self.image: str = ''
- self.docker: bool = False
- self.data_dir: str = DATA_DIR
- self.log_dir: str = LOG_DIR
- self.logrotate_dir: str = LOGROTATE_DIR
- self.sysctl_dir: str = SYSCTL_DIR
- self.unit_dir: str = UNIT_DIR
- self.verbose: bool = False
- self.timeout: Optional[int] = DEFAULT_TIMEOUT
- self.retry: int = DEFAULT_RETRY
- self.env: List[str] = []
- self.memory_request: Optional[int] = None
- self.memory_limit: Optional[int] = None
- self.log_to_journald: Optional[bool] = None
-
- self.container_init: bool = CONTAINER_INIT
- self.container_engine: Optional[ContainerEngine] = None
-
- def set_from_args(self, args: argparse.Namespace) -> None:
- argdict: Dict[str, Any] = vars(args)
- for k, v in argdict.items():
- if hasattr(self, k):
- setattr(self, k, v)
-
-
-class CephadmContext:
-
- def __init__(self) -> None:
- self.__dict__['_args'] = None
- self.__dict__['_conf'] = BaseConfig()
-
- def set_args(self, args: argparse.Namespace) -> None:
- self._conf.set_from_args(args)
- self._args = args
-
- def has_function(self) -> bool:
- return 'func' in self._args
-
- def __contains__(self, name: str) -> bool:
- return hasattr(self, name)
-
- def __getattr__(self, name: str) -> Any:
- if '_conf' in self.__dict__ and hasattr(self._conf, name):
- return getattr(self._conf, name)
- elif '_args' in self.__dict__ and hasattr(self._args, name):
- return getattr(self._args, name)
- else:
- return super().__getattribute__(name)
-
- def __setattr__(self, name: str, value: Any) -> None:
- if hasattr(self._conf, name):
- setattr(self._conf, name, value)
- elif hasattr(self._args, name):
- setattr(self._args, name, value)
- else:
- super().__setattr__(name, value)
-
-
-class ContainerEngine:
- def __init__(self) -> None:
- self.path = find_program(self.EXE)
-
- @property
- def EXE(self) -> str:
- raise NotImplementedError()
- def __str__(self) -> str:
- return f'{self.EXE} ({self.path})'
+@register_daemon_form
+class Ceph(ContainerDaemonForm):
+ _daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
+ 'crash', 'cephfs-mirror')
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ # TODO: figure out a way to un-special-case osd
+ return daemon_type in cls._daemons and daemon_type != 'osd'
-class Podman(ContainerEngine):
- EXE = 'podman'
+ def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None:
+ self.ctx = ctx
+ self._identity = ident
+ self.user_supplied_config = False
- def __init__(self) -> None:
- super().__init__()
- self._version: Optional[Tuple[int, ...]] = None
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Ceph':
+ return cls(ctx, ident)
@property
- def version(self) -> Tuple[int, ...]:
- if self._version is None:
- raise RuntimeError('Please call `get_version` first')
- return self._version
-
- def get_version(self, ctx: CephadmContext) -> None:
- out, _, _ = call_throws(ctx, [self.path, 'version', '--format', '{{.Client.Version}}'], verbosity=CallVerbosity.QUIET)
- self._version = _parse_podman_version(out)
+ def identity(self) -> DaemonIdentity:
+ return self._identity
+
+ def firewall_service_name(self) -> str:
+ if self.identity.daemon_type == 'mon':
+ return 'ceph-mon'
+ elif self.identity.daemon_type in ['mgr', 'mds']:
+ return 'ceph'
+ return ''
- def __str__(self) -> str:
- version = '.'.join(map(str, self.version))
- return f'{self.EXE} ({self.path}) version {version}'
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ # previous to being a ContainerDaemonForm, this make_var_run
+ # call was hard coded in the deploy path. Eventually, it would be
+ # good to move this somewhere cleaner and avoid needing to know the
+ # uid/gid here.
+ uid, gid = self.uid_gid(ctx)
+ make_var_run(ctx, ctx.fsid, uid, gid)
+ ctr = get_container(ctx, self.identity)
+ ctr = to_deployment_container(ctx, ctr)
+ config_json = fetch_configs(ctx)
+ if self.identity.daemon_type == 'mon' and config_json is not None:
+ if 'crush_location' in config_json:
+ c_loc = config_json['crush_location']
+ # was originally "c.args.extend(['--set-crush-location', c_loc])"
+ # but that doesn't seem to persist in the object after it's passed
+ # in further function calls
+ ctr.args = ctr.args + ['--set-crush-location', c_loc]
+ return ctr
-class Docker(ContainerEngine):
- EXE = 'docker'
+ _uid_gid: Optional[Tuple[int, int]] = None
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ if self._uid_gid is None:
+ self._uid_gid = extract_uid_gid(ctx)
+ return self._uid_gid
-CONTAINER_PREFERENCE = (Podman, Docker) # prefer podman to docker
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+ def get_daemon_args(self) -> List[str]:
+ if self.identity.daemon_type == 'crash':
+ return []
+ r = [
+ '--setuser', 'ceph',
+ '--setgroup', 'ceph',
+ '--default-log-to-file=false',
+ ]
+ log_to_journald = should_log_to_journald(self.ctx)
+ if log_to_journald:
+ r += [
+ '--default-log-to-journald=true',
+ '--default-log-to-stderr=false',
+ ]
+ else:
+ r += [
+ '--default-log-to-stderr=true',
+ '--default-log-stderr-prefix=debug ',
+ ]
+ if self.identity.daemon_type == 'mon':
+ r += [
+ '--default-mon-cluster-log-to-file=false',
+ ]
+ if log_to_journald:
+ r += [
+ '--default-mon-cluster-log-to-journald=true',
+ '--default-mon-cluster-log-to-stderr=false',
+ ]
+ else:
+ r += ['--default-mon-cluster-log-to-stderr=true']
+ return r
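For illustration, tracing the branches above for a hypothetical mon daemon where should_log_to_journald() returns true yields roughly this argument list (a sketch only):

    sketch_mon_args = [
        '--setuser', 'ceph',
        '--setgroup', 'ceph',
        '--default-log-to-file=false',
        '--default-log-to-journald=true',
        '--default-log-to-stderr=false',
        '--default-mon-cluster-log-to-file=false',
        '--default-mon-cluster-log-to-journald=true',
        '--default-mon-cluster-log-to-stderr=false',
    ]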
-# During normal cephadm operations (cephadm ls, gather-facts, etc ) we use:
-# stdout: for JSON output only
-# stderr: for error, debug, info, etc
-logging_config = {
- 'version': 1,
- 'disable_existing_loggers': True,
- 'formatters': {
- 'cephadm': {
- 'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
- },
- },
- 'handlers': {
- 'console': {
- 'level': 'INFO',
- 'class': 'logging.StreamHandler',
- },
- 'log_file': {
- 'level': 'DEBUG',
- 'class': 'logging.handlers.WatchedFileHandler',
- 'formatter': 'cephadm',
- 'filename': '%s/cephadm.log' % LOG_DIR,
- }
- },
- 'loggers': {
- '': {
- 'level': 'DEBUG',
- 'handlers': ['console', 'log_file'],
- }
- }
-}
+ @staticmethod
+ def get_ceph_mounts(
+ ctx: CephadmContext,
+ ident: DaemonIdentity,
+ no_config: bool = False,
+ ) -> Dict[str, str]:
+ # Warning: This is a hack done for more expedient refactoring
+ mounts = _get_container_mounts_for_type(
+ ctx, ident.fsid, ident.daemon_type
+ )
+ data_dir = ident.data_dir(ctx.data_dir)
+ if ident.daemon_type == 'rgw':
+ cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (
+ ident.daemon_id
+ )
+ else:
+ cdata_dir = '/var/lib/ceph/%s/ceph-%s' % (
+ ident.daemon_type,
+ ident.daemon_id,
+ )
+ if ident.daemon_type != 'crash':
+ mounts[data_dir] = cdata_dir + ':z'
+ if not no_config:
+ mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
+ if ident.daemon_type in [
+ 'rbd-mirror',
+ 'cephfs-mirror',
+ 'crash',
+ 'ceph-exporter',
+ ]:
+ # these do not search for their keyrings in a data directory
+ mounts[
+ data_dir + '/keyring'
+ ] = '/etc/ceph/ceph.client.%s.%s.keyring' % (
+ ident.daemon_type,
+ ident.daemon_id,
+ )
+ return mounts
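As a rough sketch, assuming the conventional /var/lib/ceph/<fsid>/<daemon-name> data directory layout and a hypothetical rgw daemon id 'foo', the method above adds mounts along these lines on top of whatever _get_container_mounts_for_type returns:

    sketch_rgw_mounts = {
        '/var/lib/ceph/<fsid>/rgw.foo': '/var/lib/ceph/radosgw/ceph-rgw.foo:z',
        '/var/lib/ceph/<fsid>/rgw.foo/config': '/etc/ceph/ceph.conf:z',
    }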
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ cm = self.get_ceph_mounts(
+ ctx,
+ self.identity,
+ no_config=self.ctx.config and self.user_supplied_config,
+ )
+ mounts.update(cm)
-class ExcludeErrorsFilter(logging.Filter):
- def filter(self, record: logging.LogRecord) -> bool:
- """Only lets through log messages with log level below WARNING ."""
- return record.levelno < logging.WARNING
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(ctx.container_engine.unlimited_pids_option)
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ ident = self.identity
+ if ident.daemon_type == 'rgw':
+ name = 'client.rgw.%s' % ident.daemon_id
+ elif ident.daemon_type == 'rbd-mirror':
+ name = 'client.rbd-mirror.%s' % ident.daemon_id
+ elif ident.daemon_type == 'cephfs-mirror':
+ name = 'client.cephfs-mirror.%s' % ident.daemon_id
+ elif ident.daemon_type == 'crash':
+ name = 'client.crash.%s' % ident.daemon_id
+ elif ident.daemon_type in ['mon', 'mgr', 'mds', 'osd']:
+ name = ident.daemon_name
+ else:
+ raise ValueError(ident)
+ args.extend(['-n', name])
+ if ident.daemon_type != 'crash':
+ args.append('-f')
+ args.extend(self.get_daemon_args())
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728')
-# When cephadm is used as standard binary (bootstrap, rm-cluster, etc) we use:
-# stdout: for debug and info
-# stderr: for errors and warnings
-interactive_logging_config = {
- 'version': 1,
- 'filters': {
- 'exclude_errors': {
- '()': ExcludeErrorsFilter
+ def default_entrypoint(self) -> str:
+ ep = {
+ 'rgw': '/usr/bin/radosgw',
+ 'rbd-mirror': '/usr/bin/rbd-mirror',
+ 'cephfs-mirror': '/usr/bin/cephfs-mirror',
}
- },
- 'disable_existing_loggers': True,
- 'formatters': {
- 'cephadm': {
- 'format': '%(asctime)s %(thread)x %(levelname)s %(message)s'
- },
- },
- 'handlers': {
- 'console_stdout': {
- 'level': 'INFO',
- 'class': 'logging.StreamHandler',
- 'filters': ['exclude_errors'],
- 'stream': sys.stdout
- },
- 'console_stderr': {
- 'level': 'WARNING',
- 'class': 'logging.StreamHandler',
- 'stream': sys.stderr
- },
- 'log_file': {
- 'level': 'DEBUG',
- 'class': 'logging.handlers.WatchedFileHandler',
- 'formatter': 'cephadm',
- 'filename': '%s/cephadm.log' % LOG_DIR,
- }
- },
- 'loggers': {
- '': {
- 'level': 'DEBUG',
- 'handlers': ['console_stdout', 'console_stderr', 'log_file'],
- }
- }
-}
-
-
-class termcolor:
- yellow = '\033[93m'
- red = '\033[31m'
- end = '\033[0m'
-
-
-class Error(Exception):
- pass
-
-
-class ClusterAlreadyExists(Exception):
- pass
-
-
-class TimeoutExpired(Error):
- pass
-
-
-class UnauthorizedRegistryError(Error):
- pass
+ daemon_type = self.identity.daemon_type
+ return ep.get(daemon_type) or f'/usr/bin/ceph-{daemon_type}'
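A minimal sketch of how the @register_daemon_form registry is presumably consumed: daemon_form_create (imported above from cephadmlib.daemon_form, its exact signature not shown in this diff) looks up the registered class whose for_daemon_type() accepts the identity's daemon type and instantiates it, so a 'mon' identity would resolve to the Ceph class above:

    ident = DaemonIdentity('<fsid>', 'mon', 'a')    # placeholder fsid/daemon id
    form = daemon_form_create(ctx, ident)           # assumed to mirror the per-class create()
    ctr = form.container(ctx)                       # fully customized CephContainer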
##################################
-class Ceph(object):
- daemons = ('mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror',
- 'crash', 'cephfs-mirror', 'ceph-exporter')
- gateways = ('iscsi', 'nfs', 'nvmeof')
+@register_daemon_form
+class OSD(Ceph):
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ # TODO: figure out a way to un-special-case osd
+ return daemon_type == 'osd'
-##################################
+ def __init__(
+ self,
+ ctx: CephadmContext,
+ ident: DaemonIdentity,
+ osd_fsid: Optional[str] = None,
+ ) -> None:
+ super().__init__(ctx, ident)
+ self._osd_fsid = osd_fsid
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'OSD':
+ osd_fsid = getattr(ctx, 'osd_fsid', None)
+ if osd_fsid is None:
+ logger.info(
+ 'Creating an OSD daemon form without an OSD FSID value'
+ )
+ return cls(ctx, ident, osd_fsid)
-class OSD(object):
@staticmethod
def get_sysctl_settings() -> List[str]:
return [
@@ -408,11 +427,19 @@ class OSD(object):
'kernel.pid_max = 4194304',
]
+ def firewall_service_name(self) -> str:
+ return 'ceph'
+
+ @property
+ def osd_fsid(self) -> Optional[str]:
+ return self._osd_fsid
+
##################################
-class SNMPGateway:
+@register_daemon_form
+class SNMPGateway(ContainerDaemonForm):
"""Defines an SNMP gateway between Prometheus and SNMP monitoring Frameworks"""
daemon_type = 'snmp-gateway'
SUPPORTED_VERSIONS = ['V2c', 'V3']
@@ -420,6 +447,10 @@ class SNMPGateway:
DEFAULT_PORT = 9464
env_filename = 'snmp-gateway.conf'
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx: CephadmContext,
fsid: str,
@@ -454,6 +485,14 @@ class SNMPGateway:
assert cfgs # assert some config data was found
return cls(ctx, fsid, daemon_id, cfgs, ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'SNMPGateway':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
@staticmethod
def get_version(ctx: CephadmContext, fsid: str, daemon_id: str) -> Optional[str]:
"""Return the version of the notifier from it's http endpoint"""
@@ -485,7 +524,7 @@ class SNMPGateway:
@property
def port(self) -> int:
- endpoints = fetch_tcp_ports(self.ctx)
+ endpoints = fetch_endpoints(self.ctx)
if not endpoints:
return self.DEFAULT_PORT
return endpoints[0].port
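In other words, the gateway falls back to DEFAULT_PORT (9464) when the deployment context carries no endpoints; otherwise the first endpoint's port wins. A quick sketch:

    # fetch_endpoints(ctx) == []                           -> port == 9464
    # fetch_endpoints(ctx) == [EndPoint('10.0.0.5', 9465)] -> port == 9465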
@@ -552,9 +591,27 @@ class SNMPGateway:
if not self.destination:
raise Error('config is missing destination attribute(<ip>:<port>) of the target SNMP listener')
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return self.uid, self.gid
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(f'--env-file={self.conf_file_path}')
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+
##################################
-class Monitoring(object):
+@register_daemon_form
+class Monitoring(ContainerDaemonForm):
"""Define the configs for the monitoring containers"""
port_map = {
@@ -637,6 +694,10 @@ class Monitoring(object):
},
} # type: ignore
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return daemon_type in cls.components
+
@staticmethod
def get_version(ctx, container_id, daemon_type):
# type: (CephadmContext, str, str) -> str
@@ -669,57 +730,218 @@ class Monitoring(object):
version = out.split(' ')[2]
return version
+ @staticmethod
+ def extract_uid_gid(
+ ctx: CephadmContext, daemon_type: str
+ ) -> Tuple[int, int]:
+ if daemon_type == 'prometheus':
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus')
+ elif daemon_type == 'node-exporter':
+ uid, gid = 65534, 65534
+ elif daemon_type == 'grafana':
+ uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana')
+ elif daemon_type == 'loki':
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/loki')
+ elif daemon_type == 'promtail':
+ uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail')
+ elif daemon_type == 'alertmanager':
+ uid, gid = extract_uid_gid(
+ ctx, file_path=['/etc/alertmanager', '/etc/prometheus']
+ )
+ else:
+ raise Error('{} not implemented yet'.format(daemon_type))
+ return uid, gid
+
+ def __init__(self, ctx: CephadmContext, ident: DaemonIdentity) -> None:
+ self.ctx = ctx
+ self._identity = ident
+
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Monitoring':
+ return cls(ctx, ident)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return self._identity
+
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ self._prevalidate(ctx)
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return self.extract_uid_gid(ctx, self.identity.daemon_type)
+
+ def _prevalidate(self, ctx: CephadmContext) -> None:
+ # before being refactored into a ContainerDaemonForm these checks were
+ # done inside the deploy function. This was the only "family" of daemons
+ # that performed these checks in that location
+ daemon_type = self.identity.daemon_type
+ config = fetch_configs(ctx) # type: ignore
+ required_files = self.components[daemon_type].get(
+ 'config-json-files', list()
+ )
+ required_args = self.components[daemon_type].get(
+ 'config-json-args', list()
+ )
+ if required_files:
+ if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore
+ raise Error(
+ '{} deployment requires config-json which must '
+ 'contain file content for {}'.format(
+ daemon_type.capitalize(), ', '.join(required_files)
+ )
+ )
+ if required_args:
+ if not config or not all(c in config.keys() for c in required_args): # type: ignore
+ raise Error(
+ '{} deployment requires config-json which must '
+ 'contain arg for {}'.format(
+ daemon_type.capitalize(), ', '.join(required_args)
+ )
+ )
+
+ def get_daemon_args(self) -> List[str]:
+ ctx = self.ctx
+ daemon_type = self.identity.daemon_type
+ metadata = self.components[daemon_type]
+ r = list(metadata.get('args', []))
+ # set ip and port to bind to for node-exporter, alertmanager, prometheus
+ if daemon_type not in ['grafana', 'loki', 'promtail']:
+ ip = ''
+ port = self.port_map[daemon_type][0]
+ meta = fetch_meta(ctx)
+ if meta:
+ if 'ip' in meta and meta['ip']:
+ ip = meta['ip']
+ if 'ports' in meta and meta['ports']:
+ port = meta['ports'][0]
+ r += [f'--web.listen-address={ip}:{port}']
+ if daemon_type == 'prometheus':
+ config = fetch_configs(ctx)
+ retention_time = config.get('retention_time', '15d')
+ retention_size = config.get('retention_size', '0') # default to disabled
+ r += [f'--storage.tsdb.retention.time={retention_time}']
+ r += [f'--storage.tsdb.retention.size={retention_size}']
+ scheme = 'http'
+ host = get_fqdn()
+ # in case host is not an fqdn then we use the IP to
+ # avoid producing a broken web.external-url link
+ if '.' not in host:
+ ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname())
+ # use the first ipv4 (if any) otherwise use the first ipv6
+ addr = next(iter(ipv4_addrs or ipv6_addrs), None)
+ host = wrap_ipv6(addr) if addr else host
+ r += [f'--web.external-url={scheme}://{host}:{port}']
+ if daemon_type == 'alertmanager':
+ config = fetch_configs(ctx)
+ peers = config.get('peers', list()) # type: ignore
+ for peer in peers:
+ r += ['--cluster.peer={}'.format(peer)]
+ try:
+ r += [f'--web.config.file={config["web_config"]}']
+ except KeyError:
+ pass
+ # some alertmanager versions, by default, look elsewhere for a config
+ r += ['--config.file=/etc/alertmanager/alertmanager.yml']
+ if daemon_type == 'promtail':
+ r += ['--config.expand-env']
+ if daemon_type == 'prometheus':
+ config = fetch_configs(ctx)
+ try:
+ r += [f'--web.config.file={config["web_config"]}']
+ except KeyError:
+ pass
+ if daemon_type == 'node-exporter':
+ config = fetch_configs(ctx)
+ try:
+ r += [f'--web.config.file={config["web_config"]}']
+ except KeyError:
+ pass
+ r += ['--path.procfs=/host/proc',
+ '--path.sysfs=/host/sys',
+ '--path.rootfs=/rootfs']
+ return r
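As a sketch of the output: a prometheus daemon whose meta reports ip 10.1.2.3 and port 9095, with default retention and no web_config entry, would get roughly the following in addition to any static args from the components table (host shown as a placeholder):

    sketch_prometheus_args = [
        '--web.listen-address=10.1.2.3:9095',
        '--storage.tsdb.retention.time=15d',
        '--storage.tsdb.retention.size=0',
        '--web.external-url=http://<fqdn-or-ip>:9095',
    ]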
+
+ def _get_container_mounts(self, data_dir: str) -> Dict[str, str]:
+ ctx = self.ctx
+ daemon_type = self.identity.daemon_type
+ mounts: Dict[str, str] = {}
+ log_dir = get_log_dir(self.identity.fsid, ctx.log_dir)
+ if daemon_type == 'prometheus':
+ mounts[
+ os.path.join(data_dir, 'etc/prometheus')
+ ] = '/etc/prometheus:Z'
+ mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z'
+ elif daemon_type == 'loki':
+ mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z'
+ mounts[os.path.join(data_dir, 'data')] = '/loki:Z'
+ elif daemon_type == 'promtail':
+ mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z'
+ mounts[log_dir] = '/var/log/ceph:z'
+ mounts[os.path.join(data_dir, 'data')] = '/promtail:Z'
+ elif daemon_type == 'node-exporter':
+ mounts[
+ os.path.join(data_dir, 'etc/node-exporter')
+ ] = '/etc/node-exporter:Z'
+ mounts['/proc'] = '/host/proc:ro'
+ mounts['/sys'] = '/host/sys:ro'
+ mounts['/'] = '/rootfs:ro'
+ elif daemon_type == 'grafana':
+ mounts[
+ os.path.join(data_dir, 'etc/grafana/grafana.ini')
+ ] = '/etc/grafana/grafana.ini:Z'
+ mounts[
+ os.path.join(data_dir, 'etc/grafana/provisioning/datasources')
+ ] = '/etc/grafana/provisioning/datasources:Z'
+ mounts[
+ os.path.join(data_dir, 'etc/grafana/certs')
+ ] = '/etc/grafana/certs:Z'
+ mounts[
+ os.path.join(data_dir, 'data/grafana.db')
+ ] = '/var/lib/grafana/grafana.db:Z'
+ elif daemon_type == 'alertmanager':
+ mounts[
+ os.path.join(data_dir, 'etc/alertmanager')
+ ] = '/etc/alertmanager:Z'
+ return mounts
+
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ uid, _ = self.uid_gid(ctx)
+ monitoring_args = [
+ '--user',
+ str(uid),
+ # FIXME: disable cpu/memory limits for the time being (not supported
+ # by ubuntu 18.04 kernel!)
+ ]
+ args.extend(monitoring_args)
+ if self.identity.daemon_type == 'node-exporter':
+ # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys',
+ # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation
+ # between the node-exporter container and the host to avoid selinux denials
+ args.extend(['--security-opt', 'label=disable'])
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+
+ def default_entrypoint(self) -> str:
+ return ''
+
##################################
-@contextmanager
-def write_new(
- destination: Union[str, Path],
- *,
- owner: Optional[Tuple[int, int]] = None,
- perms: Optional[int] = DEFAULT_MODE,
- encoding: Optional[str] = None,
-) -> Generator[IO, None, None]:
- """Write a new file in a robust manner, optionally specifying the owner,
- permissions, or encoding. This function takes care to never leave a file in
- a partially-written state due to a crash or power outage by writing to
- temporary file and then renaming that temp file over to the final
- destination once all data is written. Note that the temporary files can be
- leaked but only for a "crash" or power outage - regular exceptions will
- clean up the temporary file.
- """
- destination = os.path.abspath(destination)
- tempname = f'{destination}.new'
- open_kwargs: Dict[str, Any] = {}
- if encoding:
- open_kwargs['encoding'] = encoding
- try:
- with open(tempname, 'w', **open_kwargs) as fh:
- yield fh
- fh.flush()
- os.fsync(fh.fileno())
- if owner is not None:
- os.fchown(fh.fileno(), *owner)
- if perms is not None:
- os.fchmod(fh.fileno(), perms)
- except Exception:
- os.unlink(tempname)
- raise
- os.rename(tempname, destination)
-
-
-def populate_files(config_dir, config_files, uid, gid):
- # type: (str, Dict, int, int) -> None
- """create config files for different services"""
- for fname in config_files:
- config_file = os.path.join(config_dir, fname)
- config_content = dict_get_join(config_files, fname)
- logger.info('Write file: %s' % (config_file))
- with write_new(config_file, owner=(uid, gid), encoding='utf-8') as f:
- f.write(config_content)
-
-
-class NFSGanesha(object):
+@register_daemon_form
+class NFSGanesha(ContainerDaemonForm):
"""Defines a NFS-Ganesha container"""
daemon_type = 'nfs'
@@ -732,6 +954,10 @@ class NFSGanesha(object):
'nfs': 2049,
}
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx,
fsid,
@@ -760,7 +986,15 @@ class NFSGanesha(object):
# type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
- def get_container_mounts(self, data_dir):
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'NFSGanesha':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
+ def _get_container_mounts(self, data_dir):
# type: (str) -> Dict[str, str]
mounts = dict()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
@@ -773,6 +1007,12 @@ class NFSGanesha(object):
'/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
return mounts
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
@staticmethod
def get_container_envs():
# type: () -> List[str]
@@ -853,10 +1093,52 @@ class NFSGanesha(object):
with write_new(keyring_path, owner=(uid, gid)) as f:
f.write(self.rgw.get('keyring', ''))
+ def firewall_service_name(self) -> str:
+ return 'nfs'
+
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def customize_container_endpoints(
+ self, endpoints: List[EndPoint], deployment_type: DeploymentType
+ ) -> None:
+ if deployment_type == DeploymentType.DEFAULT and not endpoints:
+ nfs_ports = list(NFSGanesha.port_map.values())
+ endpoints.extend([EndPoint('0.0.0.0', p) for p in nfs_ports])
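So a fresh (DeploymentType.DEFAULT) deployment with no explicitly requested endpoints exposes the port_map entries on all interfaces; with the single 'nfs': 2049 mapping defined above that amounts to:

    # endpoints == [EndPoint('0.0.0.0', 2049)]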
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ # TODO: extract ganesha uid/gid (997, 994) ?
+ return extract_uid_gid(ctx)
+
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.extend(self.get_container_envs())
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(ctx.container_engine.unlimited_pids_option)
+
+ def default_entrypoint(self) -> str:
+ return self.entrypoint
+
##################################
-class CephIscsi(object):
+@register_daemon_form
+class CephIscsi(ContainerDaemonForm):
"""Defines a Ceph-Iscsi container"""
daemon_type = 'iscsi'
@@ -864,6 +1146,10 @@ class CephIscsi(object):
required_files = ['iscsi-gateway.cfg']
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx,
fsid,
@@ -888,28 +1174,48 @@ class CephIscsi(object):
return cls(ctx, fsid, daemon_id,
fetch_configs(ctx), ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephIscsi':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
@staticmethod
- def get_container_mounts(data_dir, log_dir):
+ def _get_container_mounts(data_dir, log_dir):
# type: (str, str) -> Dict[str, str]
mounts = dict()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
mounts[os.path.join(data_dir, 'iscsi-gateway.cfg')] = '/etc/ceph/iscsi-gateway.cfg:z'
mounts[os.path.join(data_dir, 'configfs')] = '/sys/kernel/config'
+ mounts[os.path.join(data_dir, 'tcmu-runner-entrypoint.sh')] = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
mounts[log_dir] = '/var/log:z'
mounts['/dev'] = '/dev'
return mounts
- @staticmethod
- def get_container_binds():
- # type: () -> List[List[str]]
- binds = []
- lib_modules = ['type=bind',
- 'source=/lib/modules',
- 'destination=/lib/modules',
- 'ro=true']
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ # Removes the ending '.tcmu' from data_dir as tcmu-runner uses the same
+ # data_dir as rbd-target-api
+ if data_dir.endswith('.tcmu'):
+ data_dir = re.sub(r'\.tcmu$', '', data_dir)
+ log_dir = get_log_dir(self.identity.fsid, ctx.log_dir)
+ mounts.update(CephIscsi._get_container_mounts(data_dir, log_dir))
+
+ def customize_container_binds(
+ self, ctx: CephadmContext, binds: List[List[str]]
+ ) -> None:
+ lib_modules = [
+ 'type=bind',
+ 'source=/lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true',
+ ]
binds.append(lib_modules)
- return binds
@staticmethod
def get_version(ctx, container_id):
@@ -960,9 +1266,19 @@ class CephIscsi(object):
configfs_dir = os.path.join(data_dir, 'configfs')
makedirs(configfs_dir, uid, gid, 0o755)
+ # set up the tcmu-runner entrypoint script
+ # to be mounted into the container. For more info
+ # on why we need this script, see the
+ # tcmu_runner_entrypoint_script function
+ self.files['tcmu-runner-entrypoint.sh'] = self.tcmu_runner_entrypoint_script()
+
# populate files from the config-json
populate_files(data_dir, self.files, uid, gid)
+ # we want the tcmu runner entrypoint script to be executable
+ # populate_files will give it 0o600 by default
+ os.chmod(os.path.join(data_dir, 'tcmu-runner-entrypoint.sh'), 0o700)
+
@staticmethod
def configfs_mount_umount(data_dir, mount=True):
# type: (str, bool) -> List[str]
@@ -975,27 +1291,96 @@ class CephIscsi(object):
'umount {0}; fi'.format(mount_path)
return cmd.split()
+ @staticmethod
+ def tcmu_runner_entrypoint_script() -> str:
+ # since we are having tcmu-runner be a background
+ # process in its systemd unit (rbd-target-api being
+ # the main process) systemd will not restart it when
+ # it fails. In order to try and get around that for now
+ # we can have a script mounted in the container that
+ # attempts to do the restarting for us. This script
+ # can then become the entrypoint for the tcmu-runner
+ # container
+
+ # This is intended to be dropped for a better solution
+ # for at least the squid release onward
+ return """#!/bin/bash
+RUN_DIR=/var/run/tcmu-runner
+
+if [ ! -d "${RUN_DIR}" ] ; then
+ mkdir -p "${RUN_DIR}"
+fi
+
+rm -rf "${RUN_DIR}"/*
+
+while true
+do
+ touch "${RUN_DIR}"/start-up-$(date -Ins)
+ /usr/bin/tcmu-runner
+
+ # If we got around 3 kills/segfaults in the last minute,
+ # don't start anymore
+ if [ $(find "${RUN_DIR}" -type f -cmin -1 | wc -l) -ge 3 ] ; then
+ exit 0
+ fi
+
+ sleep 1
+done
+"""
+
def get_tcmu_runner_container(self):
# type: () -> CephContainer
# daemon_id is used to generate the cid and pid files used by podman, but as both tcmu-runner
# and rbd-target-api have the same daemon_id, it conflicts and prevents the second container from
# starting. '.tcmu' is appended to the daemon_id to fix that.
- tcmu_container = get_deployment_container(self.ctx, self.fsid, self.daemon_type, str(self.daemon_id) + '.tcmu')
- tcmu_container.entrypoint = '/usr/bin/tcmu-runner'
+ subident = DaemonSubIdentity(
+ self.fsid, self.daemon_type, self.daemon_id, 'tcmu'
+ )
+ tcmu_container = to_deployment_container(
+ self.ctx, get_container(self.ctx, subident)
+ )
+ # TODO: Eventually we don't want to run tcmu-runner through this script.
+ # This is intended to be a workaround backported to older releases
+ # and should eventually be removed in at least squid onward
+ tcmu_container.entrypoint = '/usr/local/scripts/tcmu-runner-entrypoint.sh'
tcmu_container.cname = self.get_container_name(desc='tcmu')
return tcmu_container
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return extract_uid_gid(ctx)
+
+ def default_entrypoint(self) -> str:
+ return self.entrypoint
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(ctx.container_engine.unlimited_pids_option)
##################################
-class CephNvmeof(object):
+@register_daemon_form
+class CephNvmeof(ContainerDaemonForm):
"""Defines a Ceph-Nvmeof container"""
daemon_type = 'nvmeof'
required_files = ['ceph-nvmeof.conf']
default_image = DEFAULT_NVMEOF_IMAGE
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx,
fsid,
@@ -1020,8 +1405,16 @@ class CephNvmeof(object):
return cls(ctx, fsid, daemon_id,
fetch_configs(ctx), ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephNvmeof':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
@staticmethod
- def get_container_mounts(data_dir: str) -> Dict[str, str]:
+ def _get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'config')] = '/etc/ceph/ceph.conf:z'
mounts[os.path.join(data_dir, 'keyring')] = '/etc/ceph/keyring:z'
@@ -1031,16 +1424,22 @@ class CephNvmeof(object):
mounts['/dev/vfio/vfio'] = '/dev/vfio/vfio'
return mounts
- @staticmethod
- def get_container_binds():
- # type: () -> List[List[str]]
- binds = []
- lib_modules = ['type=bind',
- 'source=/lib/modules',
- 'destination=/lib/modules',
- 'ro=true']
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
+ def customize_container_binds(
+ self, ctx: CephadmContext, binds: List[List[str]]
+ ) -> None:
+ lib_modules = [
+ 'type=bind',
+ 'source=/lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true',
+ ]
binds.append(lib_modules)
- return binds
@staticmethod
def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
@@ -1110,11 +1509,32 @@ class CephNvmeof(object):
'vm.nr_hugepages = 4096',
]
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return 167, 167 # TODO: need to get the uid/gid properly
+
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(ctx.container_engine.unlimited_pids_option)
+ args.extend(['--ulimit', 'memlock=-1:-1'])
+ args.extend(['--ulimit', 'nofile=10240'])
+ args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
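Taken together, the customization above appends the engine's unlimited-pids flag plus the ulimit and capability options; a sketch of the resulting extra container arguments (the exact pids flag comes from the engine class in cephadmlib and is assumed here):

    # ['<unlimited-pids-option>',
    #  '--ulimit', 'memlock=-1:-1',
    #  '--ulimit', 'nofile=10240',
    #  '--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE']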
+
##################################
-class CephExporter(object):
+@register_daemon_form
+class CephExporter(ContainerDaemonForm):
"""Defines a Ceph exporter container"""
daemon_type = 'ceph-exporter'
@@ -1124,6 +1544,10 @@ class CephExporter(object):
'ceph-exporter': DEFAULT_PORT,
}
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx: CephadmContext,
fsid: str, daemon_id: Union[int, str],
@@ -1150,11 +1574,13 @@ class CephExporter(object):
return cls(ctx, fsid, daemon_id,
fetch_configs(ctx), ctx.image)
- @staticmethod
- def get_container_mounts() -> Dict[str, str]:
- mounts = dict()
- mounts['/var/run/ceph'] = '/var/run/ceph:z'
- return mounts
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephExporter':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
def get_daemon_args(self) -> List[str]:
args = [
@@ -1170,16 +1596,59 @@ class CephExporter(object):
if not os.path.isdir(self.sock_dir):
raise Error(f'Directory does not exist. Got: {self.sock_dir}')
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return extract_uid_gid(ctx)
+
+ def config_and_keyring(
+ self, ctx: CephadmContext
+ ) -> Tuple[Optional[str], Optional[str]]:
+ return get_config_and_keyring(ctx)
+
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ cm = Ceph.get_ceph_mounts(ctx, self.identity)
+ mounts.update(cm)
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ name = 'client.ceph-exporter.%s' % self.identity.daemon_id
+ args.extend(['-n', name, '-f'])
+ args.extend(self.get_daemon_args())
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.append(ctx.container_engine.unlimited_pids_option)
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728')
+
+ def default_entrypoint(self) -> str:
+ return self.entrypoint
+
##################################
-class HAproxy(object):
+@register_daemon_form
+class HAproxy(ContainerDaemonForm):
"""Defines an HAproxy container"""
daemon_type = 'haproxy'
required_files = ['haproxy.cfg']
default_image = DEFAULT_HAPROXY_IMAGE
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx: CephadmContext,
fsid: str, daemon_id: Union[int, str],
@@ -1200,6 +1669,14 @@ class HAproxy(object):
return cls(ctx, fsid, daemon_id, fetch_configs(ctx),
ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'HAproxy':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
if not os.path.isdir(data_dir):
@@ -1241,16 +1718,22 @@ class HAproxy(object):
cname = '%s-%s' % (cname, desc)
return cname
- def extract_uid_gid_haproxy(self) -> Tuple[int, int]:
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
# better directory for this?
return extract_uid_gid(self.ctx, file_path='/var/lib')
@staticmethod
- def get_container_mounts(data_dir: str) -> Dict[str, str]:
+ def _get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy'
return mounts
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
@staticmethod
def get_sysctl_settings() -> List[str]:
return [
@@ -1259,15 +1742,37 @@ class HAproxy(object):
'net.ipv4.ip_nonlocal_bind = 1',
]
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(
+ ['--user=root']
+ ) # haproxy 2.4 defaults to a different user
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+
+
##################################
-class Keepalived(object):
+@register_daemon_form
+class Keepalived(ContainerDaemonForm):
"""Defines an Keepalived container"""
daemon_type = 'keepalived'
required_files = ['keepalived.conf']
default_image = DEFAULT_KEEPALIVED_IMAGE
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
ctx: CephadmContext,
fsid: str, daemon_id: Union[int, str],
@@ -1288,6 +1793,14 @@ class Keepalived(object):
return cls(ctx, fsid, daemon_id,
fetch_configs(ctx), ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Keepalived':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""Create files under the container data dir"""
if not os.path.isdir(data_dir):
@@ -1345,20 +1858,42 @@ class Keepalived(object):
'net.ipv4.ip_nonlocal_bind = 1',
]
- def extract_uid_gid_keepalived(self) -> Tuple[int, int]:
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
# better directory for this?
return extract_uid_gid(self.ctx, file_path='/var/lib')
@staticmethod
- def get_container_mounts(data_dir: str) -> Dict[str, str]:
+ def _get_container_mounts(data_dir: str) -> Dict[str, str]:
mounts = dict()
mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
return mounts
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.extend(self.get_container_envs())
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+
+
##################################
-class Tracing(object):
+@register_daemon_form
+class Tracing(ContainerDaemonForm):
"""Define the configs for the jaeger tracing containers"""
components: Dict[str, Dict[str, Any]] = {
@@ -1377,6 +1912,10 @@ class Tracing(object):
},
} # type: ignore
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return daemon_type in cls.components
+
@staticmethod
def set_configuration(config: Dict[str, str], daemon_type: str) -> None:
if daemon_type in ['jaeger-collector', 'jaeger-query']:
@@ -1391,13 +1930,75 @@ class Tracing(object):
'--processor.jaeger-compact.server-host-port=6799'
]
+ def __init__(self, ident: DaemonIdentity) -> None:
+ self._identity = ident
+ self._configured = False
+
+ def _configure(self, ctx: CephadmContext) -> None:
+ if self._configured:
+ return
+ config = fetch_configs(ctx)
+ # Currently, this method side-effects the class attribute, and that
+ # is unpleasant. In the future it would be nice to move all of
+ # set_configuration into _configure and only modify each class's data
+ # independently
+ self.set_configuration(config, self.identity.daemon_type)
+ self._configured = True
+
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'Tracing':
+ return cls(ident)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return self._identity
+
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ ctr = get_container(ctx, self.identity)
+ return to_deployment_container(ctx, ctr)
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return 65534, 65534
+
+ def get_daemon_args(self) -> List[str]:
+ return self.components[self.identity.daemon_type].get(
+ 'daemon_args', []
+ )
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ self._configure(ctx)
+ # earlier code did an explicit check if the daemon type was jaeger-agent
+ # and would only call get_daemon_args if that was true. However, since
+ # the function only returns a non-empty list in the case of jaeger-agent
+ # that check is unnecessary and is not brought over.
+ args.extend(self.get_daemon_args())
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ self._configure(ctx)
+ envs.extend(
+ self.components[self.identity.daemon_type].get('envs', [])
+ )
+
+ def default_entrypoint(self) -> str:
+ return ''
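The _configured flag makes configuration idempotent: whichever customize_* hook runs first triggers the single fetch_configs/set_configuration call and later hooks reuse it. A small sketch with placeholder identity values:

    tracing = Tracing.create(ctx, DaemonIdentity('<fsid>', 'jaeger-agent', 'host1'))
    tracing.customize_container_envs(ctx, envs=[])   # fetches config, runs set_configuration
    tracing.customize_process_args(ctx, args=[])     # _configured is already True, no re-fetch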
+
+
##################################
-class CustomContainer(object):
+@register_daemon_form
+class CustomContainer(ContainerDaemonForm):
"""Defines a custom container"""
daemon_type = 'container'
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
def __init__(self,
fsid: str, daemon_id: Union[int, str],
config_json: Dict, image: str) -> None:
@@ -1424,6 +2025,14 @@ class CustomContainer(object):
return cls(fsid, daemon_id,
fetch_configs(ctx), ctx.image)
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CustomContainer':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
"""
Create dirs/files below the container data directory.
@@ -1455,7 +2064,7 @@ class CustomContainer(object):
def get_container_envs(self) -> List[str]:
return self.envs
- def get_container_mounts(self, data_dir: str) -> Dict[str, str]:
+ def _get_container_mounts(self, data_dir: str) -> Dict[str, str]:
"""
Get the volume mounts. Relative source paths will be located below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
@@ -1477,7 +2086,13 @@ class CustomContainer(object):
mounts[source] = destination
return mounts
- def get_container_binds(self, data_dir: str) -> List[List[str]]:
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
+ def _get_container_binds(self, data_dir: str) -> List[List[str]]:
"""
Get the bind mounts. Relative `source=...` paths will be located below
`/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
@@ -1505,60 +2120,71 @@ class CustomContainer(object):
data_dir, match.group(1)))
return binds
-##################################
+ def customize_container_binds(
+ self, ctx: CephadmContext, binds: List[List[str]]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ binds.extend(self._get_container_binds(data_dir))
+ # Cache the container so we don't need to rebuild it again when calling
+ # into init_containers
+ _container: Optional[CephContainer] = None
-def touch(file_path: str, uid: Optional[int] = None, gid: Optional[int] = None) -> None:
- Path(file_path).touch()
- if uid and gid:
- os.chown(file_path, uid, gid)
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ if self._container is None:
+ ctr = get_container(
+ ctx,
+ self.identity,
+ privileged=self.privileged,
+ ptrace=ctx.allow_ptrace,
+ )
+ self._container = to_deployment_container(ctx, ctr)
+ return self._container
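Because the built container is cached, repeated calls hand back the same object, which is what lets init_containers() below derive its InitContainers from the already constructed primary. A sketch with a placeholder identity:

    cc = CustomContainer.create(ctx, ident)
    assert cc.container(ctx) is cc.container(ctx)   # built once, then reused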
+ def init_containers(self, ctx: CephadmContext) -> List[InitContainer]:
+ primary = self.container(ctx)
+ init_containers: List[Dict[str, Any]] = getattr(
+ ctx, 'init_containers', []
+ )
+ return [
+ InitContainer.from_primary_and_opts(ctx, primary, ic_opts)
+ for ic_opts in init_containers
+ ]
-##################################
+ def customize_container_endpoints(
+ self, endpoints: List[EndPoint], deployment_type: DeploymentType
+ ) -> None:
+ if deployment_type == DeploymentType.DEFAULT:
+ endpoints.extend([EndPoint('0.0.0.0', p) for p in self.ports])
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.extend(self.get_container_envs())
-def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> Any:
- """
- Helper function to get a key from a dictionary.
- :param d: The dictionary to process.
- :param key: The name of the key to get.
- :param default: The default value in case the key does not
- exist. Default is `None`.
- :param require: Set to `True` if the key is required. An
- exception will be raised if the key does not exist in
- the given dictionary.
- :return: Returns the value of the given key.
- :raises: :exc:`self.Error` if the given key does not exist
- and `require` is set to `True`.
- """
- if require and key not in d.keys():
- raise Error('{} missing from dict'.format(key))
- return d.get(key, default) # type: ignore
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_container_args())
-##################################
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+ def default_entrypoint(self) -> str:
+ return self.entrypoint or ''
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return self.uid, self.gid
-def dict_get_join(d: Dict[str, Any], key: str) -> Any:
- """
- Helper function to get the value of a given key from a dictionary.
- `List` values will be converted to a string by joining them with a
- line break.
- :param d: The dictionary to process.
- :param key: The name of the key to get.
- :return: Returns the value of the given key. If it was a `list`, it
- will be joining with a line break.
- """
- value = d.get(key)
- if isinstance(value, list):
- value = '\n'.join(map(str, value))
- return value
##################################
def get_supported_daemons():
# type: () -> List[str]
- supported_daemons = list(Ceph.daemons)
+ supported_daemons = ceph_daemons()
supported_daemons.extend(Monitoring.components)
supported_daemons.append(NFSGanesha.daemon_type)
supported_daemons.append(CephIscsi.daemon_type)
@@ -1572,536 +2198,15 @@ def get_supported_daemons():
assert len(supported_daemons) == len(set(supported_daemons))
return supported_daemons
-##################################
-
-
-class PortOccupiedError(Error):
- pass
-
-
-def attempt_bind(ctx, s, address, port):
- # type: (CephadmContext, socket.socket, str, int) -> None
- try:
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- s.bind((address, port))
- except OSError as e:
- if e.errno == errno.EADDRINUSE:
- msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e)
- logger.warning(msg)
- raise PortOccupiedError(msg)
- else:
- raise e
- except Exception as e:
- raise Error(e)
- finally:
- s.close()
-
-def port_in_use(ctx: CephadmContext, endpoint: EndPoint) -> bool:
- """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
- logger.info('Verifying port %s ...' % str(endpoint))
-
- def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
- try:
- s = socket.socket(af, socket.SOCK_STREAM)
- attempt_bind(ctx, s, address, endpoint.port)
- except PortOccupiedError:
- return True
- except OSError as e:
- if e.errno in (errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL):
- # Ignore EAFNOSUPPORT and EADDRNOTAVAIL as two interfaces are
- # being tested here and one might intentionally be disabled.
- # In that case no error should be raised.
- return False
- else:
- raise e
- return False
-
- if endpoint.ip != '0.0.0.0' and endpoint.ip != '::':
- if is_ipv6(endpoint.ip):
- return _port_in_use(socket.AF_INET6, endpoint.ip)
- else:
- return _port_in_use(socket.AF_INET, endpoint.ip)
-
- return any(_port_in_use(af, address) for af, address in (
- (socket.AF_INET, '0.0.0.0'),
- (socket.AF_INET6, '::')
- ))
-
-
-def check_ip_port(ctx, ep):
- # type: (CephadmContext, EndPoint) -> None
- if not ctx.skip_ping_check:
- logger.info(f'Verifying IP {ep.ip} port {ep.port} ...')
- if is_ipv6(ep.ip):
- s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- ip = unwrap_ipv6(ep.ip)
- else:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- ip = ep.ip
- attempt_bind(ctx, s, ip, ep.port)
+def ceph_daemons() -> List[str]:
+ cds = list(Ceph._daemons)
+ cds.append(CephExporter.daemon_type)
+ return cds
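As a rough illustration only (the exact membership comes from Ceph._daemons, which is outside this hunk), the helper is expected to return the core ceph daemon types plus the exporter:

    # Hypothetical output; actual entries depend on Ceph._daemons in this version.
    ceph_daemons()
    # -> ['mon', 'mgr', 'osd', 'mds', 'rgw', 'rbd-mirror', 'cephfs-mirror', 'crash', 'ceph-exporter']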
##################################
-# this is an abbreviated version of
-# https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py
-# that drops all of the compatibility (this is Unix/Linux only).
-
-class Timeout(TimeoutError):
- """
- Raised when the lock could not be acquired in *timeout*
- seconds.
- """
-
- def __init__(self, lock_file: str) -> None:
- """
- """
- #: The path of the file lock.
- self.lock_file = lock_file
- return None
-
- def __str__(self) -> str:
- temp = "The file lock '{}' could not be acquired."\
- .format(self.lock_file)
- return temp
-
-
-class _Acquire_ReturnProxy(object):
- def __init__(self, lock: 'FileLock') -> None:
- self.lock = lock
- return None
-
- def __enter__(self) -> 'FileLock':
- return self.lock
-
- def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
- self.lock.release()
- return None
-
-
-class FileLock(object):
- def __init__(self, ctx: CephadmContext, name: str, timeout: int = -1) -> None:
- if not os.path.exists(LOCK_DIR):
- os.mkdir(LOCK_DIR, 0o700)
- self._lock_file = os.path.join(LOCK_DIR, name + '.lock')
- self.ctx = ctx
-
- # The file descriptor for the *_lock_file* as it is returned by the
- # os.open() function.
- # This file lock is only NOT None, if the object currently holds the
- # lock.
- self._lock_file_fd: Optional[int] = None
- self.timeout = timeout
- # The lock counter is used for implementing the nested locking
- # mechanism. Whenever the lock is acquired, the counter is increased and
- # the lock is only released, when this value is 0 again.
- self._lock_counter = 0
- return None
-
- @property
- def is_locked(self) -> bool:
- return self._lock_file_fd is not None
-
- def acquire(self, timeout: Optional[int] = None, poll_intervall: float = 0.05) -> _Acquire_ReturnProxy:
- """
- Acquires the file lock or fails with a :exc:`Timeout` error.
- .. code-block:: python
- # You can use this method in the context manager (recommended)
- with lock.acquire():
- pass
- # Or use an equivalent try-finally construct:
- lock.acquire()
- try:
- pass
- finally:
- lock.release()
- :arg float timeout:
- The maximum time waited for the file lock.
- If ``timeout < 0``, there is no timeout and this method will
- block until the lock could be acquired.
- If ``timeout`` is None, the default :attr:`~timeout` is used.
- :arg float poll_intervall:
- We check once in *poll_intervall* seconds if we can acquire the
- file lock.
- :raises Timeout:
- if the lock could not be acquired in *timeout* seconds.
- .. versionchanged:: 2.0.0
- This method now returns a *proxy* object instead of *self*,
- so that it can be used in a with statement without side effects.
- """
-
- # Use the default timeout, if no timeout is provided.
- if timeout is None:
- timeout = self.timeout
-
- # Increment the number right at the beginning.
- # We can still undo it, if something fails.
- self._lock_counter += 1
-
- lock_id = id(self)
- lock_filename = self._lock_file
- start_time = time.time()
- try:
- while True:
- if not self.is_locked:
- logger.log(QUIET_LOG_LEVEL, 'Acquiring lock %s on %s', lock_id,
- lock_filename)
- self._acquire()
-
- if self.is_locked:
- logger.log(QUIET_LOG_LEVEL, 'Lock %s acquired on %s', lock_id,
- lock_filename)
- break
- elif timeout >= 0 and time.time() - start_time > timeout:
- logger.warning('Timeout acquiring lock %s on %s', lock_id,
- lock_filename)
- raise Timeout(self._lock_file)
- else:
- logger.log(
- QUIET_LOG_LEVEL,
- 'Lock %s not acquired on %s, waiting %s seconds ...',
- lock_id, lock_filename, poll_intervall
- )
- time.sleep(poll_intervall)
- except Exception:
- # Something did go wrong, so decrement the counter.
- self._lock_counter = max(0, self._lock_counter - 1)
-
- raise
- return _Acquire_ReturnProxy(lock=self)
-
- def release(self, force: bool = False) -> None:
- """
- Releases the file lock.
- Please note that the lock is only completely released if the lock
- counter is 0.
- Also note that the lock file itself is not automatically deleted.
- :arg bool force:
- If true, the lock counter is ignored and the lock is released in
- every case.
- """
- if self.is_locked:
- self._lock_counter -= 1
-
- if self._lock_counter == 0 or force:
- # lock_id = id(self)
- # lock_filename = self._lock_file
-
- # Can't log in shutdown:
- # File "/usr/lib64/python3.9/logging/__init__.py", line 1175, in _open
- # NameError: name 'open' is not defined
- # logger.debug('Releasing lock %s on %s', lock_id, lock_filename)
- self._release()
- self._lock_counter = 0
- # logger.debug('Lock %s released on %s', lock_id, lock_filename)
-
- return None
-
- def __enter__(self) -> 'FileLock':
- self.acquire()
- return self
-
- def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
- self.release()
- return None
-
- def __del__(self) -> None:
- self.release(force=True)
- return None
-
- def _acquire(self) -> None:
- open_mode = os.O_RDWR | os.O_CREAT | os.O_TRUNC
- fd = os.open(self._lock_file, open_mode)
-
- try:
- fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- except (IOError, OSError):
- os.close(fd)
- else:
- self._lock_file_fd = fd
- return None
-
- def _release(self) -> None:
- # Do not remove the lockfile:
- #
- # https://github.com/benediktschmitt/py-filelock/issues/31
- # https://stackoverflow.com/questions/17708885/flock-removing-locked-file-without-race-condition
- fd = self._lock_file_fd
- self._lock_file_fd = None
- fcntl.flock(fd, fcntl.LOCK_UN) # type: ignore
- os.close(fd) # type: ignore
- return None
-
-
-##################################
-# Popen wrappers, lifted from ceph-volume
-
-class CallVerbosity(Enum):
- #####
- # Format:
- # Normal Operation: <log-level-when-no-errors>, Errors: <log-level-when-error>
- #
- # NOTE: QUIET is a custom log level only used when --verbose is passed
- #####
-
- # Normal Operation: None, Errors: None
- SILENT = 0
- # Normal Operation: QUIET, Error: QUIET
- QUIET = 1
- # Normal Operation: DEBUG, Error: DEBUG
- DEBUG = 2
- # Normal Operation: QUIET, Error: INFO
- QUIET_UNLESS_ERROR = 3
- # Normal Operation: DEBUG, Error: INFO
- VERBOSE_ON_FAILURE = 4
- # Normal Operation: INFO, Error: INFO
- VERBOSE = 5
-
- def success_log_level(self) -> int:
- _verbosity_level_to_log_level = {
- self.SILENT: 0,
- self.QUIET: QUIET_LOG_LEVEL,
- self.DEBUG: logging.DEBUG,
- self.QUIET_UNLESS_ERROR: QUIET_LOG_LEVEL,
- self.VERBOSE_ON_FAILURE: logging.DEBUG,
- self.VERBOSE: logging.INFO
- }
- return _verbosity_level_to_log_level[self] # type: ignore
-
- def error_log_level(self) -> int:
- _verbosity_level_to_log_level = {
- self.SILENT: 0,
- self.QUIET: QUIET_LOG_LEVEL,
- self.DEBUG: logging.DEBUG,
- self.QUIET_UNLESS_ERROR: logging.INFO,
- self.VERBOSE_ON_FAILURE: logging.INFO,
- self.VERBOSE: logging.INFO
- }
- return _verbosity_level_to_log_level[self] # type: ignore
-
-
-# disable coverage for the next block. this is copy-n-paste
-# from other code for compatibility on older python versions
-if sys.version_info < (3, 8): # pragma: no cover
- import itertools
- import threading
- import warnings
- from asyncio import events
-
- class ThreadedChildWatcher(asyncio.AbstractChildWatcher):
- """Threaded child watcher implementation.
- The watcher uses a thread per process
- to wait for the process to finish.
- It doesn't require subscription to the POSIX signal,
- but thread creation is not free.
- The watcher has O(1) complexity; its performance doesn't depend
- on the number of spawned processes.
- """
-
- def __init__(self) -> None:
- self._pid_counter = itertools.count(0)
- self._threads: Dict[Any, Any] = {}
-
- def is_active(self) -> bool:
- return True
-
- def close(self) -> None:
- self._join_threads()
-
- def _join_threads(self) -> None:
- """Internal: Join all non-daemon threads"""
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive() and not thread.daemon]
- for thread in threads:
- thread.join()
-
- def __enter__(self) -> Any:
- return self
-
- def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
- pass
-
- def __del__(self, _warn: Any = warnings.warn) -> None:
- threads = [thread for thread in list(self._threads.values())
- if thread.is_alive()]
- if threads:
- _warn(f'{self.__class__} has registered but not finished child processes',
- ResourceWarning,
- source=self)
-
- def add_child_handler(self, pid: Any, callback: Any, *args: Any) -> None:
- loop = events.get_event_loop()
- thread = threading.Thread(target=self._do_waitpid,
- name=f'waitpid-{next(self._pid_counter)}',
- args=(loop, pid, callback, args),
- daemon=True)
- self._threads[pid] = thread
- thread.start()
-
- def remove_child_handler(self, pid: Any) -> bool:
- # asyncio never calls remove_child_handler() !!!
- # The method is a no-op but is implemented because
- # the abstract base class requires it
- return True
-
- def attach_loop(self, loop: Any) -> None:
- pass
-
- def _do_waitpid(self, loop: Any, expected_pid: Any, callback: Any, args: Any) -> None:
- assert expected_pid > 0
-
- try:
- pid, status = os.waitpid(expected_pid, 0)
- except ChildProcessError:
- # The child process is already reaped
- # (may happen if waitpid() is called elsewhere).
- pid = expected_pid
- returncode = 255
- logger.warning(
- 'Unknown child process pid %d, will report returncode 255',
- pid)
- else:
- if os.WIFEXITED(status):
- returncode = os.WEXITSTATUS(status)
- elif os.WIFSIGNALED(status):
- returncode = -os.WTERMSIG(status)
- else:
- raise ValueError(f'unknown wait status {status}')
- if loop.get_debug():
- logger.debug('process %s exited with returncode %s',
- expected_pid, returncode)
-
- if loop.is_closed():
- logger.warning('Loop %r that handles pid %r is closed', loop, pid)
- else:
- loop.call_soon_threadsafe(callback, pid, returncode, *args)
-
- self._threads.pop(expected_pid)
-
- # unlike SafeChildWatcher which handles SIGCHLD in the main thread,
- # ThreadedChildWatcher runs in a separated thread, hence allows us to
- # run create_subprocess_exec() in non-main thread, see
- # https://bugs.python.org/issue35621
- asyncio.set_child_watcher(ThreadedChildWatcher())
-
-
-try:
- from asyncio import run as async_run # type: ignore[attr-defined]
-except ImportError: # pragma: no cover
- # disable coverage for this block. it should be a copy-n-paste
- # from newer libs for compatibility on older python versions
- def async_run(coro): # type: ignore
- loop = asyncio.new_event_loop()
- try:
- asyncio.set_event_loop(loop)
- return loop.run_until_complete(coro)
- finally:
- try:
- loop.run_until_complete(loop.shutdown_asyncgens())
- finally:
- asyncio.set_event_loop(None)
- loop.close()
-
-
-def call(ctx: CephadmContext,
- command: List[str],
- desc: Optional[str] = None,
- verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
- timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs: Any) -> Tuple[str, str, int]:
- """
- Wrap subprocess.Popen to
-
- - log stdout/stderr to a logger,
- - decode utf-8
- - cleanly return out, err, returncode
-
- :param timeout: timeout in seconds
- """
-
- prefix = command[0] if desc is None else desc
- if prefix:
- prefix += ': '
- timeout = timeout or ctx.timeout
-
- async def run_with_timeout() -> Tuple[str, str, int]:
- process = await asyncio.create_subprocess_exec(
- *command,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- env=os.environ.copy())
- assert process.stdout
- assert process.stderr
- try:
- stdout, stderr = await asyncio.wait_for(
- process.communicate(),
- timeout,
- )
- except asyncio.TimeoutError:
- # try to terminate the process assuming it is still running. It's
- # possible that even after killing the process it will not
- # complete, particularly if it is D-state. If that happens the
- # process.wait call will block, but we're no worse off than before
- # when the timeout did not work. Additionally, there are other
- # corner-cases we could try and handle here but we decided to start
- # simple.
- process.kill()
- await process.wait()
- logger.info(prefix + f'timeout after {timeout} seconds')
- return '', '', 124
- else:
- assert process.returncode is not None
- return (
- stdout.decode('utf-8'),
- stderr.decode('utf-8'),
- process.returncode,
- )
-
- stdout, stderr, returncode = async_run(run_with_timeout())
- log_level = verbosity.success_log_level()
- if returncode != 0:
- log_level = verbosity.error_log_level()
- logger.log(log_level, f'Non-zero exit code {returncode} from {" ".join(command)}')
- for line in stdout.splitlines():
- logger.log(log_level, prefix + 'stdout ' + line)
- for line in stderr.splitlines():
- logger.log(log_level, prefix + 'stderr ' + line)
- return stdout, stderr, returncode
-
-
-def call_throws(
- ctx: CephadmContext,
- command: List[str],
- desc: Optional[str] = None,
- verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
- timeout: Optional[int] = DEFAULT_TIMEOUT,
- **kwargs: Any) -> Tuple[str, str, int]:
- out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
- if ret:
- for s in (out, err):
- if s.strip() and len(s.splitlines()) <= 2: # readable message?
- raise RuntimeError(f'Failed command: {" ".join(command)}: {s}')
- raise RuntimeError('Failed command: %s' % ' '.join(command))
- return out, err, ret
-
-
-def call_timeout(ctx, command, timeout):
- # type: (CephadmContext, List[str], int) -> int
- logger.debug('Running command (timeout=%s): %s'
- % (timeout, ' '.join(command)))
-
- def raise_timeout(command, timeout):
- # type: (List[str], int) -> NoReturn
- msg = 'Command `%s` timed out after %s seconds' % (command, timeout)
- logger.debug(msg)
- raise TimeoutExpired(msg)
-
- try:
- return subprocess.call(command, timeout=timeout, env=os.environ.copy())
- except subprocess.TimeoutExpired:
- raise_timeout(command, timeout)
-
##################################
@@ -2142,114 +2247,6 @@ def is_available(ctx, what, func):
time.sleep(2)
-def read_config(fn):
- # type: (Optional[str]) -> ConfigParser
- cp = ConfigParser()
- if fn:
- cp.read(fn)
- return cp
-
-
-def pathify(p):
- # type: (str) -> str
- p = os.path.expanduser(p)
- return os.path.abspath(p)
-
-
-def get_file_timestamp(fn):
- # type: (str) -> Optional[str]
- try:
- mt = os.path.getmtime(fn)
- return datetime.datetime.fromtimestamp(
- mt, tz=datetime.timezone.utc
- ).strftime(DATEFMT)
- except Exception:
- return None
-
-
-def try_convert_datetime(s):
- # type: (str) -> Optional[str]
- # This is super irritating because
- # 1) podman and docker use different formats
- # 2) python's strptime can't parse either one
- #
- # I've seen:
- # docker 18.09.7: 2020-03-03T09:21:43.636153304Z
- # podman 1.7.0: 2020-03-03T15:52:30.136257504-06:00
- # 2020-03-03 15:52:30.136257504 -0600 CST
- # (In the podman case, there is a different string format for
- # 'inspect' and 'inspect --format {{.Created}}'!!)
-
- # In *all* cases, the 9 digit second precision is too much for
- # python's strptime. Shorten it to 6 digits.
- p = re.compile(r'(\.[\d]{6})[\d]*')
- s = p.sub(r'\1', s)
-
- # replace trailing Z with -0000, since (on python 3.6.8) it won't parse
- if s and s[-1] == 'Z':
- s = s[:-1] + '-0000'
-
- # cut off the redundant 'CST' part that strptime can't parse, if
- # present.
- v = s.split(' ')
- s = ' '.join(v[0:3])
-
- # try parsing with several format strings
- fmts = [
- '%Y-%m-%dT%H:%M:%S.%f%z',
- '%Y-%m-%d %H:%M:%S.%f %z',
- ]
- for f in fmts:
- try:
- # return timestamp normalized to UTC, rendered as DATEFMT.
- return datetime.datetime.strptime(s, f).astimezone(tz=datetime.timezone.utc).strftime(DATEFMT)
- except ValueError:
- pass
- return None
-
-
-def _parse_podman_version(version_str):
- # type: (str) -> Tuple[int, ...]
- def to_int(val: str, org_e: Optional[Exception] = None) -> int:
- if not val and org_e:
- raise org_e
- try:
- return int(val)
- except ValueError as e:
- return to_int(val[0:-1], org_e or e)
-
- return tuple(map(to_int, version_str.split('.')))
-
-
-def get_hostname():
- # type: () -> str
- return socket.gethostname()
-
-
-def get_short_hostname():
- # type: () -> str
- return get_hostname().split('.', 1)[0]
-
-
-def get_fqdn():
- # type: () -> str
- return socket.getfqdn() or socket.gethostname()
-
-
-def get_ip_addresses(hostname: str) -> Tuple[List[str], List[str]]:
- items = socket.getaddrinfo(hostname, None,
- flags=socket.AI_CANONNAME,
- type=socket.SOCK_STREAM)
- ipv4_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET]
- ipv6_addresses = [i[4][0] for i in items if i[0] == socket.AF_INET6]
- return ipv4_addresses, ipv6_addresses
-
-
-def get_arch():
- # type: () -> str
- return platform.uname().machine
-
-
def generate_service_id():
# type: () -> str
return get_short_hostname() + '.' + ''.join(random.choice(string.ascii_lowercase)
@@ -2279,15 +2276,6 @@ def make_fsid():
return str(uuid.uuid1())
-def is_fsid(s):
- # type: (str) -> bool
- try:
- uuid.UUID(s)
- except ValueError:
- return False
- return True
-
-
def validate_fsid(func: FuncT) -> FuncT:
@wraps(func)
def _validate_fsid(ctx: CephadmContext) -> Any:
@@ -2354,7 +2342,8 @@ def infer_config(func: FuncT) -> FuncT:
def _infer_config(ctx: CephadmContext) -> Any:
def config_path(daemon_type: str, daemon_name: str) -> str:
- data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_name)
+ ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_name)
+ data_dir = ident.data_dir(ctx.data_dir)
return os.path.join(data_dir, 'config')
def get_mon_daemon_name(fsid: str) -> Optional[str]:
@@ -2403,7 +2392,7 @@ For information regarding the latest stable release:
https://docs.ceph.com/docs/{}/cephadm/install
""".format(LATEST_STABLE_RELEASE)
for line in warn.splitlines():
- logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
+ logger.warning(line, extra=Highlight.WARNING.extra())
return DEFAULT_IMAGE
@@ -2424,19 +2413,6 @@ def infer_image(func: FuncT) -> FuncT:
return cast(FuncT, _infer_image)
-def require_image(func: FuncT) -> FuncT:
- """
- Require the global --image flag to be set
- """
- @wraps(func)
- def _require_image(ctx: CephadmContext) -> Any:
- if not ctx.image:
- raise Error('This command requires the global --image option to be set')
- return func(ctx)
-
- return cast(FuncT, _require_image)
-
-
def default_image(func: FuncT) -> FuncT:
@wraps(func)
def _default_image(ctx: CephadmContext) -> Any:
@@ -2471,27 +2447,6 @@ def update_default_image(ctx: CephadmContext) -> None:
ctx.image = _get_default_image(ctx)
-def executes_early(func: FuncT) -> FuncT:
- """Decorator that indicates the command function is meant to have no
- dependencies and no environmental requirements and can therefore be
- executed as non-root and with no logging, etc. Commands that have this
- decorator applied must be simple and self-contained.
- """
- cast(Any, func)._execute_early = True
- return func
-
-
-def deprecated_command(func: FuncT) -> FuncT:
- @wraps(func)
- def _deprecated_command(ctx: CephadmContext) -> Any:
- logger.warning(f'Deprecated command used: {func}')
- if NO_DEPRECATED:
- raise Error('running deprecated commands disabled')
- return func(ctx)
-
- return cast(FuncT, _deprecated_command)
-
-
def get_container_info(ctx: CephadmContext, daemon_filter: str, by_name: bool) -> Optional[ContainerInfo]:
"""
:param ctx: Cephadm context
@@ -2541,7 +2496,7 @@ def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional
container_info = None
daemon_name = ctx.name if ('name' in ctx and ctx.name and '.' in ctx.name) else None
- daemons_ls = [daemon_name] if daemon_name is not None else Ceph.daemons # daemon types: 'mon', 'mgr', etc
+ daemons_ls = [daemon_name] if daemon_name is not None else ceph_daemons() # daemon types: 'mon', 'mgr', etc
for daemon in daemons_ls:
container_info = get_container_info(ctx, daemon, daemon_name is not None)
if container_info is not None:
@@ -2559,32 +2514,6 @@ def infer_local_ceph_image(ctx: CephadmContext, container_path: str) -> Optional
return None
-def write_tmp(s, uid, gid):
- # type: (str, int, int) -> IO[str]
- tmp_f = tempfile.NamedTemporaryFile(mode='w',
- prefix='ceph-tmp')
- os.fchown(tmp_f.fileno(), uid, gid)
- tmp_f.write(s)
- tmp_f.flush()
-
- return tmp_f
-
-
-def makedirs(dir, uid, gid, mode):
- # type: (str, int, int, int) -> None
- if not os.path.exists(dir):
- os.makedirs(dir, mode=mode)
- else:
- os.chmod(dir, mode)
- os.chown(dir, uid, gid)
- os.chmod(dir, mode) # the above is masked by umask...
-
-
-def get_data_dir(fsid, data_dir, t, n):
- # type: (str, str, str, Union[int, str]) -> str
- return os.path.join(data_dir, fsid, '%s.%s' % (t, n))
-
-
def get_log_dir(fsid, log_dir):
# type: (str, str) -> str
return os.path.join(log_dir, fsid)
@@ -2600,12 +2529,16 @@ def make_data_dir_base(fsid, data_dir, uid, gid):
return data_dir_base
-def make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=None, gid=None):
- # type: (CephadmContext, str, str, Union[int, str], Optional[int], Optional[int]) -> str
+def make_data_dir(
+ ctx: CephadmContext,
+ ident: 'DaemonIdentity',
+ uid: Optional[int] = None,
+ gid: Optional[int] = None,
+) -> str:
if uid is None or gid is None:
uid, gid = extract_uid_gid(ctx)
- make_data_dir_base(fsid, ctx.data_dir, uid, gid)
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ make_data_dir_base(ident.fsid, ctx.data_dir, uid, gid)
+ data_dir = ident.data_dir(ctx.data_dir)
makedirs(data_dir, uid, gid, DATA_DIR_MODE)
return data_dir
@@ -2696,90 +2629,14 @@ def move_files(ctx, src, dst, uid=None, gid=None):
os.chown(dst_file, uid, gid)
-def recursive_chown(path: str, uid: int, gid: int) -> None:
- for dirpath, dirnames, filenames in os.walk(path):
- os.chown(dirpath, uid, gid)
- for filename in filenames:
- os.chown(os.path.join(dirpath, filename), uid, gid)
-
-
-# copied from distutils
-def find_executable(executable: str, path: Optional[str] = None) -> Optional[str]:
- """Tries to find 'executable' in the directories listed in 'path'.
- A string listing directories separated by 'os.pathsep'; defaults to
- os.environ['PATH']. Returns the complete filename or None if not found.
+def get_unit_name(
+ fsid: str, daemon_type: str, daemon_id: Union[str, int]
+) -> str:
+ """Return the name of the systemd unit given an fsid, a daemon_type,
+ and the daemon_id.
"""
- _, ext = os.path.splitext(executable)
- if (sys.platform == 'win32') and (ext != '.exe'):
- executable = executable + '.exe' # pragma: no cover
-
- if os.path.isfile(executable):
- return executable
-
- if path is None:
- path = os.environ.get('PATH', None)
- if path is None:
- try:
- path = os.confstr('CS_PATH')
- except (AttributeError, ValueError):
- # os.confstr() or CS_PATH is not available
- path = os.defpath
- # bpo-35755: Don't use os.defpath if the PATH environment variable is
- # set to an empty string
-
- # PATH='' doesn't match, whereas PATH=':' looks in the current directory
- if not path:
- return None
-
- paths = path.split(os.pathsep)
- for p in paths:
- f = os.path.join(p, executable)
- if os.path.isfile(f):
- # the file exists, we have a shot at spawn working
- return f
- return None
-
-
-def find_program(filename):
- # type: (str) -> str
- name = find_executable(filename)
- if name is None:
- raise ValueError('%s not found' % filename)
- return name
-
-
-def find_container_engine(ctx: CephadmContext) -> Optional[ContainerEngine]:
- if ctx.docker:
- return Docker()
- else:
- for i in CONTAINER_PREFERENCE:
- try:
- return i()
- except Exception:
- pass
- return None
-
-
-def check_container_engine(ctx: CephadmContext) -> ContainerEngine:
- engine = ctx.container_engine
- if not isinstance(engine, CONTAINER_PREFERENCE):
- # See https://github.com/python/mypy/issues/8993
- exes: List[str] = [i.EXE for i in CONTAINER_PREFERENCE] # type: ignore
- raise Error('No container engine binary found ({}). Try run `apt/dnf/yum/zypper install <container engine>`'.format(' or '.join(exes)))
- elif isinstance(engine, Podman):
- engine.get_version(ctx)
- if engine.version < MIN_PODMAN_VERSION:
- raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION)
- return engine
-
-
-def get_unit_name(fsid, daemon_type, daemon_id=None):
- # type: (str, str, Optional[Union[int, str]]) -> str
- # accept either name or type + id
- if daemon_id is not None:
- return 'ceph-%s@%s.%s' % (fsid, daemon_type, daemon_id)
- else:
- return 'ceph-%s@%s' % (fsid, daemon_type)
+ # TODO: fully replace get_unit_name with DaemonIdentity instances
+ return DaemonIdentity(fsid, daemon_type, daemon_id).unit_name
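Assuming DaemonIdentity.unit_name keeps the legacy format of the implementation removed above ('ceph-<fsid>@<type>.<id>'), a call would look like:

    # Sketch based on the removed legacy code; the actual string comes from
    # DaemonIdentity.unit_name, which is defined outside this hunk.
    get_unit_name('9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'mon', 'host1')
    # -> 'ceph-9b9d7609-f4d5-4aba-94c8-effa764d96c9@mon.host1'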
def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> str:
@@ -2790,90 +2647,6 @@ def get_unit_name_by_daemon_name(ctx: CephadmContext, fsid: str, name: str) -> s
raise Error('Failed to get unit name for {}'.format(daemon))
-def check_unit(ctx, unit_name):
- # type: (CephadmContext, str) -> Tuple[bool, str, bool]
- # NOTE: we ignore the exit code here because systemctl outputs
- # various exit codes based on the state of the service, but the
- # string result is more explicit (and sufficient).
- enabled = False
- installed = False
- try:
- out, err, code = call(ctx, ['systemctl', 'is-enabled', unit_name],
- verbosity=CallVerbosity.QUIET)
- if code == 0:
- enabled = True
- installed = True
- elif 'disabled' in out:
- installed = True
- except Exception as e:
- logger.warning('unable to run systemctl: %s' % e)
- enabled = False
- installed = False
-
- state = 'unknown'
- try:
- out, err, code = call(ctx, ['systemctl', 'is-active', unit_name],
- verbosity=CallVerbosity.QUIET)
- out = out.strip()
- if out in ['active']:
- state = 'running'
- elif out in ['inactive']:
- state = 'stopped'
- elif out in ['failed', 'auto-restart']:
- state = 'error'
- else:
- state = 'unknown'
- except Exception as e:
- logger.warning('unable to run systemctl: %s' % e)
- state = 'unknown'
- return (enabled, state, installed)
-
-
-def check_units(ctx, units, enabler=None):
- # type: (CephadmContext, List[str], Optional[Packager]) -> bool
- for u in units:
- (enabled, state, installed) = check_unit(ctx, u)
- if enabled and state == 'running':
- logger.info('Unit %s is enabled and running' % u)
- return True
- if enabler is not None:
- if installed:
- logger.info('Enabling unit %s' % u)
- enabler.enable_service(u)
- return False
-
-
-def is_container_running(ctx: CephadmContext, c: 'CephContainer') -> bool:
- if ctx.name.split('.', 1)[0] in ['agent', 'cephadm-exporter']:
- # these are non-containerized daemon types
- return False
- return bool(get_running_container_name(ctx, c))
-
-
-def get_running_container_name(ctx: CephadmContext, c: 'CephContainer') -> Optional[str]:
- for name in [c.cname, c.old_cname]:
- out, err, ret = call(ctx, [
- ctx.container_engine.path, 'container', 'inspect',
- '--format', '{{.State.Status}}', name
- ])
- if out.strip() == 'running':
- return name
- return None
-
-
-def get_legacy_config_fsid(cluster, legacy_dir=None):
- # type: (str, Optional[str]) -> Optional[str]
- config_file = '/etc/ceph/%s.conf' % cluster
- if legacy_dir is not None:
- config_file = os.path.abspath(legacy_dir + config_file)
-
- if os.path.exists(config_file):
- config = read_config(config_file)
- if config.has_section('global') and config.has_option('global', 'fsid'):
- return config.get('global', 'fsid')
- return None
-
-
def get_legacy_daemon_fsid(ctx, cluster,
daemon_type, daemon_id, legacy_dir=None):
# type: (CephadmContext, str, str, Union[int, str], Optional[str]) -> Optional[str]
@@ -2895,130 +2668,19 @@ def get_legacy_daemon_fsid(ctx, cluster,
return fsid
-def should_log_to_journald(ctx: CephadmContext) -> bool:
- if ctx.log_to_journald is not None:
- return ctx.log_to_journald
- return isinstance(ctx.container_engine, Podman) and \
- ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION
-
-
-def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
- # type: (CephadmContext, str, str, Union[int, str]) -> List[str]
- r = list() # type: List[str]
-
- if daemon_type in Ceph.daemons and daemon_type not in ['crash', 'ceph-exporter']:
- r += [
- '--setuser', 'ceph',
- '--setgroup', 'ceph',
- '--default-log-to-file=false',
- ]
- log_to_journald = should_log_to_journald(ctx)
- if log_to_journald:
- r += [
- '--default-log-to-journald=true',
- '--default-log-to-stderr=false',
- ]
- else:
- r += [
- '--default-log-to-stderr=true',
- '--default-log-stderr-prefix=debug ',
- ]
- if daemon_type == 'mon':
- r += [
- '--default-mon-cluster-log-to-file=false',
- ]
- if log_to_journald:
- r += [
- '--default-mon-cluster-log-to-journald=true',
- '--default-mon-cluster-log-to-stderr=false',
- ]
- else:
- r += ['--default-mon-cluster-log-to-stderr=true']
- elif daemon_type in Monitoring.components:
- metadata = Monitoring.components[daemon_type]
- r += metadata.get('args', list())
- # set ip and port to bind to for nodeexporter,alertmanager,prometheus
- if daemon_type not in ['grafana', 'loki', 'promtail']:
- ip = ''
- port = Monitoring.port_map[daemon_type][0]
- meta = fetch_meta(ctx)
- if meta:
- if 'ip' in meta and meta['ip']:
- ip = meta['ip']
- if 'ports' in meta and meta['ports']:
- port = meta['ports'][0]
- r += [f'--web.listen-address={ip}:{port}']
- if daemon_type == 'prometheus':
- config = fetch_configs(ctx)
- retention_time = config.get('retention_time', '15d')
- retention_size = config.get('retention_size', '0') # default to disabled
- r += [f'--storage.tsdb.retention.time={retention_time}']
- r += [f'--storage.tsdb.retention.size={retention_size}']
- scheme = 'http'
- host = get_fqdn()
- # in case host is not an fqdn then we use the IP to
- # avoid producing a broken web.external-url link
- if '.' not in host:
- ipv4_addrs, ipv6_addrs = get_ip_addresses(get_hostname())
- # use the first ipv4 (if any) otherwise use the first ipv6
- addr = next(iter(ipv4_addrs or ipv6_addrs), None)
- host = wrap_ipv6(addr) if addr else host
- r += [f'--web.external-url={scheme}://{host}:{port}']
- if daemon_type == 'alertmanager':
- config = fetch_configs(ctx)
- peers = config.get('peers', list()) # type: ignore
- for peer in peers:
- r += ['--cluster.peer={}'.format(peer)]
- try:
- r += [f'--web.config.file={config["web_config"]}']
- except KeyError:
- pass
- # some alertmanager versions, by default, look elsewhere for a config
- r += ['--config.file=/etc/alertmanager/alertmanager.yml']
- if daemon_type == 'promtail':
- r += ['--config.expand-env']
- if daemon_type == 'prometheus':
- config = fetch_configs(ctx)
- try:
- r += [f'--web.config.file={config["web_config"]}']
- except KeyError:
- pass
- if daemon_type == 'node-exporter':
- config = fetch_configs(ctx)
- try:
- r += [f'--web.config.file={config["web_config"]}']
- except KeyError:
- pass
- r += ['--path.procfs=/host/proc',
- '--path.sysfs=/host/sys',
- '--path.rootfs=/rootfs']
- elif daemon_type == 'jaeger-agent':
- r.extend(Tracing.components[daemon_type]['daemon_args'])
- elif daemon_type == NFSGanesha.daemon_type:
- nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
- r += nfs_ganesha.get_daemon_args()
- elif daemon_type == CephExporter.daemon_type:
- ceph_exporter = CephExporter.init(ctx, fsid, daemon_id)
- r.extend(ceph_exporter.get_daemon_args())
- elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(ctx, fsid, daemon_id)
- r += haproxy.get_daemon_args()
- elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(ctx, fsid, daemon_id)
- r.extend(cc.get_daemon_args())
- elif daemon_type == SNMPGateway.daemon_type:
- sc = SNMPGateway.init(ctx, fsid, daemon_id)
- r.extend(sc.get_daemon_args())
-
- return r
-
-
-def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
- config=None, keyring=None):
- # type: (CephadmContext, str, str, Union[int, str], int, int, Optional[str], Optional[str]) -> None
- data_dir = make_data_dir(ctx, fsid, daemon_type, daemon_id, uid=uid, gid=gid)
+def create_daemon_dirs(
+ ctx: CephadmContext,
+ ident: 'DaemonIdentity',
+ uid: int,
+ gid: int,
+ config: Optional[str] = None,
+ keyring: Optional[str] = None,
+) -> None:
+ # unpack fsid and daemon_type from ident because they're used very frequently
+ fsid, daemon_type = ident.fsid, ident.daemon_type
+ data_dir = make_data_dir(ctx, ident, uid=uid, gid=gid)
- if daemon_type in Ceph.daemons:
+ if daemon_type in ceph_daemons():
make_log_dir(ctx, fsid, uid=uid, gid=gid)
if config:
@@ -3038,8 +2700,7 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
config_dir = ''
data_dir_root = ''
if daemon_type == 'prometheus':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/prometheus'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'alerting'), uid, gid, 0o755)
@@ -3047,8 +2708,7 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid)
recursive_chown(os.path.join(data_dir_root, 'data'), uid, gid)
elif daemon_type == 'grafana':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/grafana'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'certs'), uid, gid, 0o755)
@@ -3056,26 +2716,22 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
touch(os.path.join(data_dir_root, 'data', 'grafana.db'), uid, gid)
elif daemon_type == 'alertmanager':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/alertmanager'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, config_dir, 'data'), uid, gid, 0o755)
elif daemon_type == 'promtail':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/promtail'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
elif daemon_type == 'loki':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/loki'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
makedirs(os.path.join(data_dir_root, 'data'), uid, gid, 0o755)
elif daemon_type == 'node-exporter':
- data_dir_root = get_data_dir(fsid, ctx.data_dir,
- daemon_type, daemon_id)
+ data_dir_root = ident.data_dir(ctx.data_dir)
config_dir = 'etc/node-exporter'
makedirs(os.path.join(data_dir_root, config_dir), uid, gid, 0o755)
recursive_chown(os.path.join(data_dir_root, 'etc'), uid, gid)
@@ -3095,42 +2751,49 @@ def create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid,
f.write(content)
elif daemon_type == NFSGanesha.daemon_type:
- nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
+ nfs_ganesha = NFSGanesha.init(ctx, fsid, ident.daemon_id)
nfs_ganesha.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CephIscsi.daemon_type:
- ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, fsid, ident.daemon_id)
ceph_iscsi.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CephNvmeof.daemon_type:
- ceph_nvmeof = CephNvmeof.init(ctx, fsid, daemon_id)
+ ceph_nvmeof = CephNvmeof.init(ctx, fsid, ident.daemon_id)
ceph_nvmeof.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(ctx, fsid, daemon_id)
+ haproxy = HAproxy.init(ctx, fsid, ident.daemon_id)
haproxy.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == Keepalived.daemon_type:
- keepalived = Keepalived.init(ctx, fsid, daemon_id)
+ keepalived = Keepalived.init(ctx, fsid, ident.daemon_id)
keepalived.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(ctx, fsid, daemon_id)
+ cc = CustomContainer.init(ctx, fsid, ident.daemon_id)
cc.create_daemon_dirs(data_dir, uid, gid)
elif daemon_type == SNMPGateway.daemon_type:
- sg = SNMPGateway.init(ctx, fsid, daemon_id)
+ sg = SNMPGateway.init(ctx, fsid, ident.daemon_id)
sg.create_daemon_conf()
- _write_custom_conf_files(ctx, daemon_type, str(daemon_id), fsid, uid, gid)
+ _write_custom_conf_files(ctx, ident, uid, gid)
-def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: str, fsid: str, uid: int, gid: int) -> None:
+def _write_custom_conf_files(
+ ctx: CephadmContext, ident: 'DaemonIdentity', uid: int, gid: int
+) -> None:
# mostly making this its own function to make unit testing easier
ccfiles = fetch_custom_config_files(ctx)
if not ccfiles:
return
- custom_config_dir = os.path.join(ctx.data_dir, fsid, 'custom_config_files', f'{daemon_type}.{daemon_id}')
+ custom_config_dir = os.path.join(
+ ctx.data_dir,
+ ident.fsid,
+ 'custom_config_files',
+ f'{ident.daemon_type}.{ident.daemon_id}',
+ )
if not os.path.exists(custom_config_dir):
makedirs(custom_config_dir, uid, gid, 0o755)
mandatory_keys = ['mount_path', 'content']
@@ -3139,183 +2802,48 @@ def _write_custom_conf_files(ctx: CephadmContext, daemon_type: str, daemon_id: s
file_path = os.path.join(custom_config_dir, os.path.basename(ccf['mount_path']))
with write_new(file_path, owner=(uid, gid), encoding='utf-8') as f:
f.write(ccf['content'])
+ # temporary workaround to make custom config files work for tcmu-runner
+ # container we deploy with iscsi until iscsi is refactored
+ if ident.daemon_type == 'iscsi':
+ tcmu_config_dir = custom_config_dir + '.tcmu'
+ if not os.path.exists(tcmu_config_dir):
+ makedirs(tcmu_config_dir, uid, gid, 0o755)
+ tcmu_file_path = os.path.join(tcmu_config_dir, os.path.basename(ccf['mount_path']))
+ with write_new(tcmu_file_path, owner=(uid, gid), encoding='utf-8') as f:
+ f.write(ccf['content'])
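To make the layout concrete, a hedged sketch of the config blob this function consumes and the files it would write, assuming the default /var/lib/ceph data dir:

    # Illustrative input; 'mount_path' and 'content' are the mandatory keys.
    ccfiles = [{'mount_path': '/etc/iscsi-gateway.cfg', 'content': '[config]\n...'}]
    # For daemon 'iscsi.x' in cluster <fsid> this would be written to:
    #   /var/lib/ceph/<fsid>/custom_config_files/iscsi.x/iscsi-gateway.cfg
    #   /var/lib/ceph/<fsid>/custom_config_files/iscsi.x.tcmu/iscsi-gateway.cfg  (tcmu workaround)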
+
+
+def get_container_binds(
+ ctx: CephadmContext, ident: 'DaemonIdentity'
+) -> List[List[str]]:
+ binds: List[List[str]] = list()
+ daemon = daemon_form_create(ctx, ident)
+ assert isinstance(daemon, ContainerDaemonForm)
+ daemon.customize_container_binds(ctx, binds)
+ return binds
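Usage is now uniform across daemon types: build a DaemonIdentity and let the registered daemon form fill in its binds. A minimal sketch with placeholder identity values:

    # Sketch: the form registered for 'iscsi' supplies the bind mounts.
    ident = DaemonIdentity('9b9d7609-f4d5-4aba-94c8-effa764d96c9', 'iscsi', 'x')
    binds = get_container_binds(ctx, ident)  # List[List[str]] of --mount style entries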
-def get_parm(option: str) -> Dict[str, str]:
- js = _get_config_json(option)
- # custom_config_files is a special field that may be in the config
- # dict. It is used for mounting custom config files into daemon's containers
- # and should be accessed through the "fetch_custom_config_files" function.
- # For get_parm we need to discard it.
- js.pop('custom_config_files', None)
- return js
-
-
-def _get_config_json(option: str) -> Dict[str, Any]:
- if not option:
- return dict()
-
- global cached_stdin
- if option == '-':
- if cached_stdin is not None:
- j = cached_stdin
- else:
- j = sys.stdin.read()
- cached_stdin = j
- else:
- # inline json string
- if option[0] == '{' and option[-1] == '}':
- j = option
- # json file
- elif os.path.exists(option):
- with open(option, 'r') as f:
- j = f.read()
- else:
- raise Error('Config file {} not found'.format(option))
-
- try:
- js = json.loads(j)
- except ValueError as e:
- raise Error('Invalid JSON in {}: {}'.format(option, e))
- else:
- return js
-
-
-def fetch_meta(ctx: CephadmContext) -> Dict[str, Any]:
- """Return a dict containing metadata about a deployment.
- """
- meta = getattr(ctx, 'meta_properties', None)
- if meta is not None:
- return meta
- mjson = getattr(ctx, 'meta_json', None)
- if mjson is not None:
- meta = json.loads(mjson) or {}
- ctx.meta_properties = meta
- return meta
- return {}
-
-
-def fetch_configs(ctx: CephadmContext) -> Dict[str, str]:
- """Return a dict containing arbitrary configuration parameters.
- This function filters out the key 'custom_config_files' which
- must not be part of a deployment's configuration key-value pairs.
- To access custom configuration file data, use `fetch_custom_config_files`.
- """
- # ctx.config_blobs is *always* a dict. it is created once when
- # a command is parsed/processed and stored "forever"
- cfg_blobs = getattr(ctx, 'config_blobs', None)
- if cfg_blobs:
- cfg_blobs = dict(cfg_blobs)
- cfg_blobs.pop('custom_config_files', None)
- return cfg_blobs
- # ctx.config_json is the legacy equivalent of config_blobs. it is a
- # string that either contains json or refers to a file name where
- # the file contains json.
- cfg_json = getattr(ctx, 'config_json', None)
- if cfg_json:
- jdata = _get_config_json(cfg_json) or {}
- jdata.pop('custom_config_files', None)
- return jdata
- return {}
-
-
-def fetch_custom_config_files(ctx: CephadmContext) -> List[Dict[str, Any]]:
- """Return a list containing dicts that can be used to populate
- custom configuration files for containers.
- """
- # NOTE: this function works like the opposite of fetch_configs.
- # instead of filtering out custom_config_files, it returns only
- # the content in that key.
- cfg_blobs = getattr(ctx, 'config_blobs', None)
- if cfg_blobs:
- return cfg_blobs.get('custom_config_files', [])
- cfg_json = getattr(ctx, 'config_json', None)
- if cfg_json:
- jdata = _get_config_json(cfg_json)
- return jdata.get('custom_config_files', [])
- return []
-
-
-def fetch_tcp_ports(ctx: CephadmContext) -> List[EndPoint]:
- """Return a list of Endpoints, which have a port and ip attribute
+def get_container_mounts_for_type(
+ ctx: CephadmContext, fsid: str, daemon_type: str
+) -> Dict[str, str]:
+ """Return a dictionary mapping container-external paths to container-internal
+ paths given an fsid and daemon_type.
"""
- ports = getattr(ctx, 'tcp_ports', None)
- if ports is None:
- ports = []
- if isinstance(ports, str):
- ports = list(map(int, ports.split()))
- port_ips: Dict[str, str] = {}
- port_ips_attr: Union[str, Dict[str, str], None] = getattr(ctx, 'port_ips', None)
- if isinstance(port_ips_attr, str):
- port_ips = json.loads(port_ips_attr)
- elif port_ips_attr is not None:
- # if it's not None or a str, assume it's already the dict we want
- port_ips = port_ips_attr
-
- endpoints: List[EndPoint] = []
- for port in ports:
- if str(port) in port_ips:
- endpoints.append(EndPoint(port_ips[str(port)], port))
- else:
- endpoints.append(EndPoint('0.0.0.0', port))
-
- return endpoints
-
-
-def get_config_and_keyring(ctx):
- # type: (CephadmContext) -> Tuple[Optional[str], Optional[str]]
- config = None
- keyring = None
-
- d = fetch_configs(ctx)
- if d:
- config = d.get('config')
- keyring = d.get('keyring')
- if config and keyring:
- return config, keyring
-
- if 'config' in ctx and ctx.config:
- try:
- with open(ctx.config, 'r') as f:
- config = f.read()
- except FileNotFoundError as e:
- raise Error(e)
-
- if 'key' in ctx and ctx.key:
- keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
- elif 'keyring' in ctx and ctx.keyring:
- try:
- with open(ctx.keyring, 'r') as f:
- keyring = f.read()
- except FileNotFoundError as e:
- raise Error(e)
-
- return config, keyring
-
-
-def get_container_binds(ctx, fsid, daemon_type, daemon_id):
- # type: (CephadmContext, str, str, Union[int, str, None]) -> List[List[str]]
- binds = list()
-
- if daemon_type == CephIscsi.daemon_type:
- binds.extend(CephIscsi.get_container_binds())
- if daemon_type == CephNvmeof.daemon_type:
- binds.extend(CephNvmeof.get_container_binds())
- elif daemon_type == CustomContainer.daemon_type:
- assert daemon_id
- cc = CustomContainer.init(ctx, fsid, daemon_id)
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- binds.extend(cc.get_container_binds(data_dir))
-
- return binds
+ mounts = _get_container_mounts_for_type(ctx, fsid, daemon_type)
+ _update_podman_mounts(ctx, mounts)
+ return mounts
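The returned dict maps host paths to in-container paths (optionally with ':z'/':ro' suffixes). A hedged illustration for an OSD, limited to mappings visible in this hunk:

    # Illustrative only; the real set depends on daemon_type and host state.
    get_container_mounts_for_type(ctx, fsid, 'osd')
    # -> {'/var/run/ceph/<fsid>': '/var/run/ceph:z', '/dev': '/dev', '/run/udev': '/run/udev', ...}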
-def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
- no_config=False):
- # type: (CephadmContext, str, str, Union[int, str, None], Optional[bool]) -> Dict[str, str]
+def _get_container_mounts_for_type(
+ ctx: CephadmContext, fsid: str, daemon_type: str
+) -> Dict[str, str]:
+ """The main implementation of get_container_mounts_for_type minus the call
+ to _update_podman_mounts so that this can be called from
+ get_container_mounts.
+ """
mounts = dict()
- if daemon_type in Ceph.daemons:
+ if daemon_type in ceph_daemons():
if fsid:
run_path = os.path.join('/var/run/ceph', fsid)
if os.path.exists(run_path):
@@ -3329,20 +2857,6 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
journald_sock_dir = '/run/systemd/journal'
mounts[journald_sock_dir] = journald_sock_dir
- if daemon_type in Ceph.daemons and daemon_id:
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- if daemon_type == 'rgw':
- cdata_dir = '/var/lib/ceph/radosgw/ceph-rgw.%s' % (daemon_id)
- else:
- cdata_dir = '/var/lib/ceph/%s/ceph-%s' % (daemon_type, daemon_id)
- if daemon_type != 'crash':
- mounts[data_dir] = cdata_dir + ':z'
- if not no_config:
- mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
- if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash', 'ceph-exporter']:
- # these do not search for their keyrings in a data directory
- mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type, daemon_id)
-
if daemon_type in ['mon', 'osd', 'clusterless-ceph-volume']:
mounts['/dev'] = '/dev' # FIXME: narrow this down?
mounts['/run/udev'] = '/run/udev'
@@ -3367,97 +2881,56 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
if ctx.shared_ceph_folder: # make easy manager modules/ceph-volume development
ceph_folder = pathify(ctx.shared_ceph_folder)
if os.path.exists(ceph_folder):
+ cephadm_binary = ceph_folder + '/src/cephadm/cephadm'
+ if not os.path.exists(pathify(cephadm_binary)):
+ raise Error("cephadm binary does not exist. Please run './build.sh cephadm' from ceph/src/cephadm/ directory.")
+ mounts[cephadm_binary] = '/usr/sbin/cephadm'
mounts[ceph_folder + '/src/ceph-volume/ceph_volume'] = '/usr/lib/python3.6/site-packages/ceph_volume'
- mounts[ceph_folder + '/src/cephadm/cephadm.py'] = '/usr/sbin/cephadm'
mounts[ceph_folder + '/src/pybind/mgr'] = '/usr/share/ceph/mgr'
mounts[ceph_folder + '/src/python-common/ceph'] = '/usr/lib/python3.6/site-packages/ceph'
mounts[ceph_folder + '/monitoring/ceph-mixin/dashboards_out'] = '/etc/grafana/dashboards/ceph-dashboard'
mounts[ceph_folder + '/monitoring/ceph-mixin/prometheus_alerts.yml'] = '/etc/prometheus/ceph/ceph_default_alerts.yml'
else:
- logger.error('{}{}{}'.format(termcolor.red,
- 'Ceph shared source folder does not exist.',
- termcolor.end))
+ logger.error(
+ 'Ceph shared source folder does not exist.',
+ extra=Highlight.FAILURE.extra())
except AttributeError:
pass
+ return mounts
- if daemon_type in Monitoring.components and daemon_id:
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- log_dir = get_log_dir(fsid, ctx.log_dir)
- if daemon_type == 'prometheus':
- mounts[os.path.join(data_dir, 'etc/prometheus')] = '/etc/prometheus:Z'
- mounts[os.path.join(data_dir, 'data')] = '/prometheus:Z'
- elif daemon_type == 'loki':
- mounts[os.path.join(data_dir, 'etc/loki')] = '/etc/loki:Z'
- mounts[os.path.join(data_dir, 'data')] = '/loki:Z'
- elif daemon_type == 'promtail':
- mounts[os.path.join(data_dir, 'etc/promtail')] = '/etc/promtail:Z'
- mounts[log_dir] = '/var/log/ceph:z'
- mounts[os.path.join(data_dir, 'data')] = '/promtail:Z'
- elif daemon_type == 'node-exporter':
- mounts[os.path.join(data_dir, 'etc/node-exporter')] = '/etc/node-exporter:Z'
- mounts['/proc'] = '/host/proc:ro'
- mounts['/sys'] = '/host/sys:ro'
- mounts['/'] = '/rootfs:ro'
- elif daemon_type == 'grafana':
- mounts[os.path.join(data_dir, 'etc/grafana/grafana.ini')] = '/etc/grafana/grafana.ini:Z'
- mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
- mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
- mounts[os.path.join(data_dir, 'data/grafana.db')] = '/var/lib/grafana/grafana.db:Z'
- elif daemon_type == 'alertmanager':
- mounts[os.path.join(data_dir, 'etc/alertmanager')] = '/etc/alertmanager:Z'
-
- if daemon_type == NFSGanesha.daemon_type:
- assert daemon_id
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
- mounts.update(nfs_ganesha.get_container_mounts(data_dir))
-
- if daemon_type == HAproxy.daemon_type:
- assert daemon_id
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- mounts.update(HAproxy.get_container_mounts(data_dir))
-
- if daemon_type == CephNvmeof.daemon_type:
- assert daemon_id
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- mounts.update(CephNvmeof.get_container_mounts(data_dir))
-
- if daemon_type == CephIscsi.daemon_type:
- assert daemon_id
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- # Remove the trailing ".tcmu" from data_dir since tcmu-runner uses the same data_dir
- # as rbd-runner-api
- if data_dir.endswith('.tcmu'):
- data_dir = re.sub(r'\.tcmu$', '', data_dir)
- log_dir = get_log_dir(fsid, ctx.log_dir)
- mounts.update(CephIscsi.get_container_mounts(data_dir, log_dir))
-
- if daemon_type == Keepalived.daemon_type:
- assert daemon_id
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- mounts.update(Keepalived.get_container_mounts(data_dir))
-
- if daemon_type == CustomContainer.daemon_type:
- assert daemon_id
- cc = CustomContainer.init(ctx, fsid, daemon_id)
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
- mounts.update(cc.get_container_mounts(data_dir))
-
- # Modifications podman makes to /etc/hosts causes issues with
- # certain daemons (specifically referencing "host.containers.internal" entry
- # being added to /etc/hosts in this case). To avoid that, but still
- # allow users to use /etc/hosts for hostname resolution, we can
- # mount the host's /etc/hosts file.
- # https://tracker.ceph.com/issues/58532
- # https://tracker.ceph.com/issues/57018
- if isinstance(ctx.container_engine, Podman):
- if os.path.exists('/etc/hosts'):
- if '/etc/hosts' not in mounts:
- mounts['/etc/hosts'] = '/etc/hosts:ro'
+def get_container_mounts(
+ ctx: CephadmContext, ident: 'DaemonIdentity', no_config: bool = False
+) -> Dict[str, str]:
+ """Return a dictionary mapping container-external paths to container-internal
+ paths given a daemon identity.
+ Setting `no_config` will skip mapping a daemon-specific ceph.conf file.
+ """
+ # unpack daemon_type from ident because it's used very frequently
+ daemon_type = ident.daemon_type
+ mounts: Dict[str, str] = {}
+
+ assert ident.fsid
+ assert ident.daemon_id
+ # Ceph daemon types are special-cased here because of the no_config
+ # option which JJM thinks is *only* used by cephadm shell
+ if daemon_type in ceph_daemons():
+ mounts = Ceph.get_ceph_mounts(ctx, ident, no_config=no_config)
+ else:
+ daemon = daemon_form_create(ctx, ident)
+ assert isinstance(daemon, ContainerDaemonForm)
+ daemon.customize_container_mounts(ctx, mounts)
+
+ _update_podman_mounts(ctx, mounts)
return mounts
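A short usage sketch; per the comment above, no_config is only expected from the cephadm shell path. The identity values are placeholders:

    ident = DaemonIdentity(ctx.fsid, 'mon', 'a')
    mounts = get_container_mounts(ctx, ident)                        # normal deployment
    shell_mounts = get_container_mounts(ctx, ident, no_config=True)  # skip per-daemon ceph.conf mapping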
+def _update_podman_mounts(ctx: CephadmContext, mounts: Dict[str, str]) -> None:
+ """Update the given mounts dict with mounts specific to podman."""
+ if isinstance(ctx.container_engine, Podman):
+ ctx.container_engine.update_mounts(ctx, mounts)
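The podman-specific adjustments now live on the engine object. Based on the /etc/hosts workaround removed from get_container_mounts above, update_mounts is presumed to behave roughly like this sketch (the real implementation is Podman.update_mounts and may differ):

    def _podman_update_mounts_sketch(mounts: Dict[str, str]) -> None:
        # Mount the host's /etc/hosts read-only so podman's host.containers.internal
        # entry does not confuse daemons (see tracker issues 58532 and 57018).
        if os.path.exists('/etc/hosts') and '/etc/hosts' not in mounts:
            mounts['/etc/hosts'] = '/etc/hosts:ro'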
+
+
def get_ceph_volume_container(ctx: CephadmContext,
privileged: bool = True,
cname: str = '',
@@ -3485,151 +2958,106 @@ def get_ceph_volume_container(ctx: CephadmContext,
)
-def set_pids_limit_unlimited(ctx: CephadmContext, container_args: List[str]) -> None:
- # set container's pids-limit to unlimited rather than default (Docker 4096 / Podman 2048)
- # Useful for daemons like iscsi where the default pids-limit limits the number of luns
- # per iscsi target or rgw where increasing the rgw_thread_pool_size to a value near
- # the default pids-limit may cause the container to crash.
- if (
- isinstance(ctx.container_engine, Podman)
- and ctx.container_engine.version >= PIDS_LIMIT_UNLIMITED_PODMAN_VERSION
- ):
- container_args.append('--pids-limit=-1')
- else:
- container_args.append('--pids-limit=0')
-
-
-def get_container(ctx: CephadmContext,
- fsid: str, daemon_type: str, daemon_id: Union[int, str],
- privileged: bool = False,
- ptrace: bool = False,
- container_args: Optional[List[str]] = None) -> 'CephContainer':
+def get_container(
+ ctx: CephadmContext,
+ ident: 'DaemonIdentity',
+ privileged: bool = False,
+ ptrace: bool = False,
+ container_args: Optional[List[str]] = None,
+) -> 'CephContainer':
entrypoint: str = ''
- name: str = ''
- ceph_args: List[str] = []
+ d_args: List[str] = []
envs: List[str] = []
host_network: bool = True
+ binds: List[List[str]] = []
+ mounts: Dict[str, str] = {}
- if daemon_type in Ceph.daemons:
- envs.append('TCMALLOC_MAX_TOTAL_THREAD_CACHE_BYTES=134217728')
+ daemon_type = ident.daemon_type
if container_args is None:
container_args = []
- if daemon_type in Ceph.daemons or daemon_type in Ceph.gateways:
- set_pids_limit_unlimited(ctx, container_args)
+ if Ceph.for_daemon_type(daemon_type) or OSD.for_daemon_type(daemon_type):
+ ceph_daemon = daemon_form_create(ctx, ident)
+ assert isinstance(ceph_daemon, ContainerDaemonForm)
+ entrypoint = ceph_daemon.default_entrypoint()
+ ceph_daemon.customize_container_envs(ctx, envs)
+ ceph_daemon.customize_container_args(ctx, container_args)
+ ceph_daemon.customize_process_args(ctx, d_args)
+ mounts = get_container_mounts(ctx, ident)
if daemon_type in ['mon', 'osd']:
# mon and osd need privileged in order for libudev to query devices
privileged = True
- if daemon_type == 'rgw':
- entrypoint = '/usr/bin/radosgw'
- name = 'client.rgw.%s' % daemon_id
- elif daemon_type == 'rbd-mirror':
- entrypoint = '/usr/bin/rbd-mirror'
- name = 'client.rbd-mirror.%s' % daemon_id
- elif daemon_type == 'cephfs-mirror':
- entrypoint = '/usr/bin/cephfs-mirror'
- name = 'client.cephfs-mirror.%s' % daemon_id
- elif daemon_type == 'crash':
- entrypoint = '/usr/bin/ceph-crash'
- name = 'client.crash.%s' % daemon_id
- elif daemon_type in ['mon', 'mgr', 'mds', 'osd']:
- entrypoint = '/usr/bin/ceph-' + daemon_type
- name = '%s.%s' % (daemon_type, daemon_id)
- elif daemon_type in Monitoring.components:
- entrypoint = ''
+ if daemon_type in Monitoring.components:
+ monitoring = Monitoring.create(ctx, ident)
+ entrypoint = monitoring.default_entrypoint()
+ monitoring.customize_container_args(ctx, container_args)
+ monitoring.customize_process_args(ctx, d_args)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type in Tracing.components:
- entrypoint = ''
- name = '%s.%s' % (daemon_type, daemon_id)
- config = fetch_configs(ctx)
- Tracing.set_configuration(config, daemon_type)
- envs.extend(Tracing.components[daemon_type].get('envs', []))
+ tracing = Tracing.create(ctx, ident)
+ entrypoint = tracing.default_entrypoint()
+ tracing.customize_container_envs(ctx, envs)
+ tracing.customize_process_args(ctx, d_args)
elif daemon_type == NFSGanesha.daemon_type:
- entrypoint = NFSGanesha.entrypoint
- name = '%s.%s' % (daemon_type, daemon_id)
- envs.extend(NFSGanesha.get_container_envs())
+ nfs_ganesha = NFSGanesha.create(ctx, ident)
+ entrypoint = nfs_ganesha.default_entrypoint()
+ nfs_ganesha.customize_container_envs(ctx, envs)
+ nfs_ganesha.customize_container_args(ctx, container_args)
+ nfs_ganesha.customize_process_args(ctx, d_args)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == CephExporter.daemon_type:
- entrypoint = CephExporter.entrypoint
- name = 'client.ceph-exporter.%s' % daemon_id
+ ceph_exporter = CephExporter.create(ctx, ident)
+ entrypoint = ceph_exporter.default_entrypoint()
+ ceph_exporter.customize_container_envs(ctx, envs)
+ ceph_exporter.customize_container_args(ctx, container_args)
+ ceph_exporter.customize_process_args(ctx, d_args)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == HAproxy.daemon_type:
- name = '%s.%s' % (daemon_type, daemon_id)
- container_args.extend(['--user=root']) # haproxy 2.4 defaults to a different user
+ haproxy = HAproxy.create(ctx, ident)
+ haproxy.customize_container_args(ctx, container_args)
+ haproxy.customize_process_args(ctx, d_args)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == Keepalived.daemon_type:
- name = '%s.%s' % (daemon_type, daemon_id)
- envs.extend(Keepalived.get_container_envs())
- container_args.extend(['--cap-add=NET_ADMIN', '--cap-add=NET_RAW'])
+ keepalived = Keepalived.create(ctx, ident)
+ keepalived.customize_container_envs(ctx, envs)
+ keepalived.customize_container_args(ctx, container_args)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == CephNvmeof.daemon_type:
- name = '%s.%s' % (daemon_type, daemon_id)
- container_args.extend(['--ulimit', 'memlock=-1:-1'])
- container_args.extend(['--ulimit', 'nofile=10240'])
- container_args.extend(['--cap-add=SYS_ADMIN', '--cap-add=CAP_SYS_NICE'])
+ nvmeof = CephNvmeof.create(ctx, ident)
+ nvmeof.customize_container_args(ctx, container_args)
+ binds = get_container_binds(ctx, ident)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == CephIscsi.daemon_type:
- entrypoint = CephIscsi.entrypoint
- name = '%s.%s' % (daemon_type, daemon_id)
+ iscsi = CephIscsi.create(ctx, ident)
+ entrypoint = iscsi.default_entrypoint()
+ iscsi.customize_container_args(ctx, container_args)
# So the container can modprobe iscsi_target_mod and have write perms
# to configfs we need to make this a privileged container.
privileged = True
+ binds = get_container_binds(ctx, ident)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(ctx, fsid, daemon_id)
- entrypoint = cc.entrypoint
+ cc = CustomContainer.init(ctx, ident.fsid, ident.daemon_id)
+ entrypoint = cc.default_entrypoint()
host_network = False
- envs.extend(cc.get_container_envs())
- container_args.extend(cc.get_container_args())
-
- if daemon_type in Monitoring.components:
- uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
- monitoring_args = [
- '--user',
- str(uid),
- # FIXME: disable cpu/memory limits for the time being (not supported
- # by ubuntu 18.04 kernel!)
- ]
- container_args.extend(monitoring_args)
- if daemon_type == 'node-exporter':
- # in order to support setting '--path.procfs=/host/proc','--path.sysfs=/host/sys',
- # '--path.rootfs=/rootfs' for node-exporter we need to disable selinux separation
- # between the node-exporter container and the host to avoid selinux denials
- container_args.extend(['--security-opt', 'label=disable'])
- elif daemon_type == 'crash':
- ceph_args = ['-n', name]
- elif daemon_type in Ceph.daemons:
- ceph_args = ['-n', name, '-f']
+ cc.customize_container_envs(ctx, envs)
+ cc.customize_container_args(ctx, container_args)
+ cc.customize_process_args(ctx, d_args)
+ binds = get_container_binds(ctx, ident)
+ mounts = get_container_mounts(ctx, ident)
elif daemon_type == SNMPGateway.daemon_type:
- sg = SNMPGateway.init(ctx, fsid, daemon_id)
- container_args.append(
- f'--env-file={sg.conf_file_path}'
- )
-
- # if using podman, set -d, --conmon-pidfile & --cidfile flags
- # so service can have Type=Forking
- if isinstance(ctx.container_engine, Podman):
- runtime_dir = '/run'
- container_args.extend([
- '-d', '--log-driver', 'journald',
- '--conmon-pidfile',
- runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, daemon_id),
- '--cidfile',
- runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id),
- ])
- if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION and not ctx.no_cgroups_split:
- container_args.append('--cgroups=split')
- # if /etc/hosts doesn't exist, we can be confident
- # users aren't using it for host name resolution
- # and adding --no-hosts avoids bugs created in certain daemons
- # by modifications podman makes to /etc/hosts
- # https://tracker.ceph.com/issues/58532
- # https://tracker.ceph.com/issues/57018
- if not os.path.exists('/etc/hosts'):
- container_args.extend(['--no-hosts'])
+ sg = SNMPGateway.create(ctx, ident)
+ sg.customize_container_args(ctx, container_args)
+ sg.customize_process_args(ctx, d_args)
+ _update_container_args_for_podman(ctx, ident, container_args)
return CephContainer.for_daemon(
ctx,
- fsid=fsid,
- daemon_type=daemon_type,
- daemon_id=str(daemon_id),
+ ident=ident,
entrypoint=entrypoint,
- args=ceph_args + get_daemon_args(ctx, fsid, daemon_type, daemon_id),
+ args=d_args,
container_args=container_args,
- volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
+ volume_mounts=mounts,
+ bind_mounts=binds,
envs=envs,
privileged=privileged,
ptrace=ptrace,
@@ -3637,42 +3065,20 @@ def get_container(ctx: CephadmContext,
)
-def extract_uid_gid(ctx, img='', file_path='/var/lib/ceph'):
- # type: (CephadmContext, str, Union[str, List[str]]) -> Tuple[int, int]
-
- if not img:
- img = ctx.image
-
- if isinstance(file_path, str):
- paths = [file_path]
- else:
- paths = file_path
-
- ex: Optional[Tuple[str, RuntimeError]] = None
-
- for fp in paths:
- try:
- out = CephContainer(
- ctx,
- image=img,
- entrypoint='stat',
- args=['-c', '%u %g', fp]
- ).run(verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- uid, gid = out.split(' ')
- return int(uid), int(gid)
- except RuntimeError as e:
- ex = (fp, e)
- if ex:
- raise Error(f'Failed to extract uid/gid for path {ex[0]}: {ex[1]}')
-
- raise RuntimeError('uid/gid not found')
+def _update_container_args_for_podman(
+ ctx: CephadmContext, ident: DaemonIdentity, container_args: List[str]
+) -> None:
+ if not isinstance(ctx.container_engine, Podman):
+ return
+ service_name = f'{ident.unit_name}.service'
+ container_args.extend(
+ ctx.container_engine.service_args(ctx, service_name)
+ )
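For illustration: service_args() is implemented by the Podman engine class in a
separate module; judging by the inline flags this refactor removes, a minimal
sketch of the flags returned for a systemd service name could look like the
following (the function name and exact paths here are assumptions):

    def podman_service_args_sketch(service_name: str) -> List[str]:
        # detach, log via journald, and write pid/cid files so the systemd
        # unit can use Type=forking
        runtime_dir = '/run'
        return [
            '-d', '--log-driver', 'journald',
            '--conmon-pidfile', f'{runtime_dir}/{service_name}-pid',
            '--cidfile', f'{runtime_dir}/{service_name}-cid',
        ]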
def deploy_daemon(
ctx: CephadmContext,
- fsid: str,
- daemon_type: str,
- daemon_id: Union[int, str],
+ ident: 'DaemonIdentity',
c: Optional['CephContainer'],
uid: int,
gid: int,
@@ -3684,6 +3090,7 @@ def deploy_daemon(
init_containers: Optional[List['InitContainer']] = None,
) -> None:
endpoints = endpoints or []
+ daemon_type = ident.daemon_type
# only check port in use if fresh deployment since service
# we are redeploying/reconfiguring will already be using the port
if deployment_type == DeploymentType.DEFAULT:
@@ -3697,7 +3104,7 @@ def deploy_daemon(
else:
raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, endpoints)), daemon_type))
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ data_dir = ident.data_dir(ctx.data_dir)
if deployment_type == DeploymentType.RECONFIG and not os.path.exists(data_dir):
raise Error('cannot reconfig, data path %s does not exist' % data_dir)
if daemon_type == 'mon' and not os.path.exists(data_dir):
@@ -3710,23 +3117,24 @@ def deploy_daemon(
tmp_config = write_tmp(config, uid, gid)
# --mkfs
- create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid)
- mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', daemon_id)
- log_dir = get_log_dir(fsid, ctx.log_dir)
+ create_daemon_dirs(ctx, ident, uid, gid)
+ assert ident.daemon_type == 'mon'
+ mon_dir = ident.data_dir(ctx.data_dir)
+ log_dir = get_log_dir(ident.fsid, ctx.log_dir)
CephContainer(
ctx,
image=ctx.image,
entrypoint='/usr/bin/ceph-mon',
args=[
'--mkfs',
- '-i', str(daemon_id),
- '--fsid', fsid,
+ '-i', ident.daemon_id,
+ '--fsid', ident.fsid,
'-c', '/tmp/config',
'--keyring', '/tmp/keyring',
- ] + get_daemon_args(ctx, fsid, 'mon', daemon_id),
+ ] + Ceph.create(ctx, ident).get_daemon_args(),
volume_mounts={
log_dir: '/var/log/ceph:z',
- mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (daemon_id),
+ mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (ident.daemon_id),
tmp_keyring.name: '/tmp/keyring:z',
tmp_config.name: '/tmp/config:z',
},
@@ -3737,11 +3145,7 @@ def deploy_daemon(
f.write(config)
else:
# dirs, conf, keyring
- create_daemon_dirs(
- ctx,
- fsid, daemon_type, daemon_id,
- uid, gid,
- config, keyring)
+ create_daemon_dirs(ctx, ident, uid, gid, config, keyring)
# only write out unit files and start daemon
# with systemd if this is not a reconfig
@@ -3750,13 +3154,20 @@ def deploy_daemon(
config_js = fetch_configs(ctx)
assert isinstance(config_js, dict)
- cephadm_agent = CephadmAgent(ctx, fsid, daemon_id)
+ cephadm_agent = CephadmAgent(ctx, ident.fsid, ident.daemon_id)
cephadm_agent.deploy_daemon_unit(config_js)
else:
if c:
- deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
- c, osd_fsid=osd_fsid, endpoints=endpoints,
- init_containers=init_containers)
+ deploy_daemon_units(
+ ctx,
+ ident,
+ uid,
+ gid,
+ c,
+ osd_fsid=osd_fsid,
+ endpoints=endpoints,
+ init_containers=init_containers,
+ )
else:
raise RuntimeError('attempting to deploy a daemon without a container image')
@@ -3767,23 +3178,22 @@ def deploy_daemon(
with write_new(data_dir + '/unit.configured', owner=(uid, gid)) as f:
f.write('mtime is time we were last configured\n')
- update_firewalld(ctx, daemon_type)
+ update_firewalld(ctx, daemon_form_create(ctx, ident))
# Open ports explicitly required for the daemon
- if endpoints:
- fw = Firewalld(ctx)
- fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
- fw.apply_rules()
+ if not ('skip_firewalld' in ctx and ctx.skip_firewalld):
+ if endpoints:
+ fw = Firewalld(ctx)
+ fw.open_ports([e.port for e in endpoints] + fw.external_ports.get(daemon_type, []))
+ fw.apply_rules()
# If this was a reconfig and the daemon is not a Ceph daemon, restart it
# so it can pick up potential changes to its configuration files
- if deployment_type == DeploymentType.RECONFIG and daemon_type not in Ceph.daemons:
+ if deployment_type == DeploymentType.RECONFIG and daemon_type not in ceph_daemons():
# ceph daemons do not need a restart; others (presumably) do to pick
# up the new config
- call_throws(ctx, ['systemctl', 'reset-failed',
- get_unit_name(fsid, daemon_type, daemon_id)])
- call_throws(ctx, ['systemctl', 'restart',
- get_unit_name(fsid, daemon_type, daemon_id)])
+ call_throws(ctx, ['systemctl', 'reset-failed', ident.unit_name])
+ call_throws(ctx, ['systemctl', 'restart', ident.unit_name])
def _bash_cmd(
@@ -3904,11 +3314,9 @@ def clean_cgroup(ctx: CephadmContext, fsid: str, unit_name: str) -> None:
def deploy_daemon_units(
ctx: CephadmContext,
- fsid: str,
+ ident: 'DaemonIdentity',
uid: int,
gid: int,
- daemon_type: str,
- daemon_id: Union[int, str],
container: 'CephContainer',
enable: bool = True,
start: bool = True,
@@ -3918,23 +3326,30 @@ def deploy_daemon_units(
) -> None:
# cmd
- data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
+ # unpack values from ident because they're used very frequently
+ fsid = ident.fsid
+ daemon_type = ident.daemon_type
+ daemon_id = ident.daemon_id
+
+ data_dir = ident.data_dir(ctx.data_dir)
run_file_path = data_dir + '/unit.run'
meta_file_path = data_dir + '/unit.meta'
with write_new(run_file_path) as f, write_new(meta_file_path) as metaf:
f.write('set -e\n')
- if daemon_type in Ceph.daemons:
+ if daemon_type in ceph_daemons():
install_path = find_program('install')
f.write('{install_path} -d -m0770 -o {uid} -g {gid} /var/run/ceph/{fsid}\n'.format(install_path=install_path, fsid=fsid, uid=uid, gid=gid))
# pre-start cmd(s)
if daemon_type == 'osd':
assert osd_fsid
- _write_osd_unit_run_commands(ctx, f, daemon_type, str(daemon_id), fsid, osd_fsid, data_dir, uid, gid)
+ _write_osd_unit_run_commands(
+ ctx, f, ident, osd_fsid, data_dir, uid, gid
+ )
elif daemon_type == CephIscsi.daemon_type:
- _write_iscsi_unit_run_commands(ctx, f, daemon_type, str(daemon_id), fsid, data_dir)
+ _write_iscsi_unit_run_commands(ctx, f, ident, data_dir)
init_containers = init_containers or []
if init_containers:
_write_init_container_cmds_clean(ctx, f, init_containers[0])
@@ -3964,9 +3379,9 @@ def deploy_daemon_units(
_write_stop_actions(ctx, cast(TextIO, f), container, timeout)
if daemon_type == 'osd':
assert osd_fsid
- _write_osd_unit_poststop_commands(ctx, f, daemon_type, str(daemon_id), fsid, osd_fsid)
+ _write_osd_unit_poststop_commands(ctx, f, ident, osd_fsid)
elif daemon_type == CephIscsi.daemon_type:
- _write_iscsi_unit_poststop_commands(ctx, f, daemon_type, str(daemon_id), fsid, data_dir)
+ _write_iscsi_unit_poststop_commands(ctx, f, ident, data_dir)
# post-stop command(s)
with write_new(data_dir + '/unit.stop') as f:
@@ -3977,7 +3392,7 @@ def deploy_daemon_units(
f.write(container.image + '\n')
# sysctl
- install_sysctl(ctx, fsid, daemon_type)
+ install_sysctl(ctx, fsid, daemon_form_create(ctx, ident))
# systemd
install_base_units(ctx, fsid)
@@ -4013,9 +3428,7 @@ def _write_stop_actions(
def _write_osd_unit_run_commands(
ctx: CephadmContext,
f: IO,
- daemon_type: str,
- daemon_id: str,
- fsid: str,
+ ident: 'DaemonIdentity',
osd_fsid: str,
data_dir: str,
uid: int,
@@ -4023,7 +3436,7 @@ def _write_osd_unit_run_commands(
) -> None:
# osds have a pre-start step
simple_fn = os.path.join('/etc/ceph/osd',
- '%s-%s.json.adopted-by-cephadm' % (daemon_id, osd_fsid))
+ '%s-%s.json.adopted-by-cephadm' % (ident.daemon_id, osd_fsid))
if os.path.exists(simple_fn):
f.write('# Simple OSDs need chown on startup:\n')
for n in ['block', 'block.db', 'block.wal']:
@@ -4032,11 +3445,14 @@ def _write_osd_unit_run_commands(
else:
# if ceph-volume does not support 'ceph-volume activate', we must
# do 'ceph-volume lvm activate'.
+ fsid = ident.fsid
+ daemon_type = ident.daemon_type
+ daemon_id = ident.daemon_id
test_cv = get_ceph_volume_container(
ctx,
args=['activate', '--bad-option'],
- volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
+ volume_mounts=get_container_mounts(ctx, ident),
+ bind_mounts=get_container_binds(ctx, ident),
cname='ceph-%s-%s.%s-activate-test' % (fsid, daemon_type, daemon_id),
)
out, err, ret = call(ctx, test_cv.run_cmd(), verbosity=CallVerbosity.SILENT)
@@ -4061,282 +3477,51 @@ def _write_osd_unit_run_commands(
prestart = get_ceph_volume_container(
ctx,
args=cmd,
- volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
+ volume_mounts=get_container_mounts(ctx, ident),
+ bind_mounts=get_container_binds(ctx, ident),
cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
)
_write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
def _write_iscsi_unit_run_commands(
- ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, data_dir: str
+ ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str
) -> None:
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
- ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
_write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runner container', background=True)
def _write_osd_unit_poststop_commands(
- ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, osd_fsid: str
+ ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', osd_fsid: str
) -> None:
poststop = get_ceph_volume_container(
ctx,
args=[
'lvm', 'deactivate',
- str(daemon_id), osd_fsid,
+ ident.daemon_id, osd_fsid,
],
- volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
- bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
- cname='ceph-%s-%s.%s-deactivate' % (fsid, daemon_type,
- daemon_id),
+ volume_mounts=get_container_mounts(ctx, ident),
+ bind_mounts=get_container_binds(ctx, ident),
+ cname='ceph-%s-%s.%s-deactivate' % (ident.fsid, ident.daemon_type, ident.daemon_id),
)
_write_container_cmd_to_bash(ctx, f, poststop, 'deactivate osd')
def _write_iscsi_unit_poststop_commands(
- ctx: CephadmContext, f: IO, daemon_type: str, daemon_id: str, fsid: str, data_dir: str
+ ctx: CephadmContext, f: IO, ident: 'DaemonIdentity', data_dir: str
) -> None:
# make sure we also stop the tcmu container
runtime_dir = '/run'
- ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
+ ceph_iscsi = CephIscsi.init(ctx, ident.fsid, ident.daemon_id)
tcmu_container = ceph_iscsi.get_tcmu_runner_container()
f.write('! ' + ' '.join(tcmu_container.stop_cmd()) + '\n')
- f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
- f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, str(daemon_id) + '.tcmu') + '\n')
+ f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-pid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
+ f.write('! ' + 'rm ' + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (ident.fsid, ident.daemon_type, ident.daemon_id + '.tcmu') + '\n')
f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
-class Firewalld(object):
-
- # for specifying ports we should always open when opening
- # ports for a daemon of that type. Main use case is for ports
- # that we should open when deploying the daemon type but that
- # the daemon itself may not necessarily need to bind to the port.
- # This needs to be handled differently as we don't want to fail
- # deployment if the port cannot be bound to but we still want to
- # open the port in the firewall.
- external_ports: Dict[str, List[int]] = {
- 'iscsi': [3260] # 3260 is the well known iSCSI port
- }
-
- def __init__(self, ctx):
- # type: (CephadmContext) -> None
- self.ctx = ctx
- self.available = self.check()
-
- def check(self):
- # type: () -> bool
- self.cmd = find_executable('firewall-cmd')
- if not self.cmd:
- logger.debug('firewalld does not appear to be present')
- return False
- (enabled, state, _) = check_unit(self.ctx, 'firewalld.service')
- if not enabled:
- logger.debug('firewalld.service is not enabled')
- return False
- if state != 'running':
- logger.debug('firewalld.service is not running')
- return False
-
- logger.info('firewalld ready')
- return True
-
- def enable_service_for(self, daemon_type):
- # type: (str) -> None
- if not self.available:
- logger.debug('Not possible to enable service <%s>. firewalld.service is not available' % daemon_type)
- return
-
- if daemon_type == 'mon':
- svc = 'ceph-mon'
- elif daemon_type in ['mgr', 'mds', 'osd']:
- svc = 'ceph'
- elif daemon_type == NFSGanesha.daemon_type:
- svc = 'nfs'
- else:
- return
-
- if not self.cmd:
- raise RuntimeError('command not defined')
-
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
- if ret:
- logger.info('Enabling firewalld service %s in current zone...' % svc)
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-service', svc])
- if ret:
- raise RuntimeError(
- 'unable to add service %s to current zone: %s' % (svc, err))
- else:
- logger.debug('firewalld service %s is enabled in current zone' % svc)
-
- def open_ports(self, fw_ports):
- # type: (List[int]) -> None
- if not self.available:
- logger.debug('Not possible to open ports <%s>. firewalld.service is not available' % fw_ports)
- return
-
- if not self.cmd:
- raise RuntimeError('command not defined')
-
- for port in fw_ports:
- tcp_port = str(port) + '/tcp'
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
- if ret:
- logger.info('Enabling firewalld port %s in current zone...' % tcp_port)
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-port', tcp_port])
- if ret:
- raise RuntimeError('unable to add port %s to current zone: %s' %
- (tcp_port, err))
- else:
- logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
-
- def close_ports(self, fw_ports):
- # type: (List[int]) -> None
- if not self.available:
- logger.debug('Not possible to close ports <%s>. firewalld.service is not available' % fw_ports)
- return
-
- if not self.cmd:
- raise RuntimeError('command not defined')
-
- for port in fw_ports:
- tcp_port = str(port) + '/tcp'
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-port', tcp_port], verbosity=CallVerbosity.DEBUG)
- if not ret:
- logger.info('Disabling port %s in current zone...' % tcp_port)
- out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--remove-port', tcp_port])
- if ret:
- raise RuntimeError('unable to remove port %s from current zone: %s' %
- (tcp_port, err))
- else:
- logger.info(f'Port {tcp_port} disabled')
- else:
- logger.info(f'firewalld port {tcp_port} already closed')
-
- def apply_rules(self):
- # type: () -> None
- if not self.available:
- return
-
- if not self.cmd:
- raise RuntimeError('command not defined')
-
- call_throws(self.ctx, [self.cmd, '--reload'])
-
-
-def update_firewalld(ctx, daemon_type):
- # type: (CephadmContext, str) -> None
- if not ('skip_firewalld' in ctx and ctx.skip_firewalld):
- firewall = Firewalld(ctx)
- firewall.enable_service_for(daemon_type)
- firewall.apply_rules()
-
-
-def install_sysctl(ctx: CephadmContext, fsid: str, daemon_type: str) -> None:
- """
- Set up sysctl settings
- """
- def _write(conf: Path, lines: List[str]) -> None:
- lines = [
- '# created by cephadm',
- '',
- *lines,
- '',
- ]
- with write_new(conf, owner=None, perms=None) as f:
- f.write('\n'.join(lines))
-
- conf = Path(ctx.sysctl_dir).joinpath(f'90-ceph-{fsid}-{daemon_type}.conf')
- lines: List = []
-
- if daemon_type == 'osd':
- lines = OSD.get_sysctl_settings()
- elif daemon_type == 'haproxy':
- lines = HAproxy.get_sysctl_settings()
- elif daemon_type == 'keepalived':
- lines = Keepalived.get_sysctl_settings()
- elif daemon_type == CephNvmeof.daemon_type:
- lines = CephNvmeof.get_sysctl_settings()
- lines = filter_sysctl_settings(ctx, lines)
-
- # apply the sysctl settings
- if lines:
- Path(ctx.sysctl_dir).mkdir(mode=0o755, exist_ok=True)
- _write(conf, lines)
- call_throws(ctx, ['sysctl', '--system'])
-
-
-def sysctl_get(ctx: CephadmContext, variable: str) -> Union[str, None]:
- """
- Read a sysctl setting by executing 'sysctl -b {variable}'
- """
- out, err, code = call(ctx, ['sysctl', '-b', variable])
- return out or None
-
-
-def filter_sysctl_settings(ctx: CephadmContext, lines: List[str]) -> List[str]:
- """
- Given a list of sysctl settings, examine the system's current configuration
- and return those which are not currently set as described.
- """
- def test_setting(desired_line: str) -> bool:
- # Remove any comments
- comment_start = desired_line.find('#')
- if comment_start != -1:
- desired_line = desired_line[:comment_start]
- desired_line = desired_line.strip()
- if not desired_line or desired_line.isspace():
- return False
- setting, desired_value = map(lambda s: s.strip(), desired_line.split('='))
- if not setting or not desired_value:
- return False
- actual_value = sysctl_get(ctx, setting)
- return desired_value != actual_value
- return list(filter(test_setting, lines))
-
-
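For illustration, the filtering rule above can be restated in a self-contained
form with a stubbed set of current values (the _sketch name and sample settings
are invented; the real function queries the live system via `sysctl -b`):

    def filter_sysctl_settings_sketch(desired: List[str], current: Dict[str, str]) -> List[str]:
        kept = []
        for line in desired:
            line = line.split('#', 1)[0].strip()   # drop comments / blank lines
            if not line or '=' not in line:
                continue
            key, value = (part.strip() for part in line.split('=', 1))
            if current.get(key) != value:          # keep only settings not yet applied
                kept.append(line)
        return kept

    # filter_sysctl_settings_sketch(
    #     ['# tuning', 'fs.aio-max-nr = 1048576', 'vm.swappiness = 10'],
    #     {'fs.aio-max-nr': '1048576'})
    # -> ['vm.swappiness = 10']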
-def migrate_sysctl_dir(ctx: CephadmContext, fsid: str) -> None:
- """
- Cephadm once used '/usr/lib/sysctl.d' for storing sysctl configuration.
- This moves it to '/etc/sysctl.d'.
- """
- deprecated_location: str = '/usr/lib/sysctl.d'
- deprecated_confs: List[str] = glob(f'{deprecated_location}/90-ceph-{fsid}-*.conf')
- if not deprecated_confs:
- return
-
- file_count: int = len(deprecated_confs)
- logger.info(f'Found {file_count} sysctl files in deprecated location {deprecated_location}. Starting migration.')
- for conf in deprecated_confs:
- try:
- shutil.move(conf, ctx.sysctl_dir)
- file_count -= 1
- except shutil.Error as err:
- if str(err).endswith('already exists'):
- logger.warning(f'Destination file already exists. Deleting {conf}.')
- try:
- os.unlink(conf)
- file_count -= 1
- except OSError as del_err:
- logger.warning(f'Could not remove {conf}: {del_err}.')
- else:
- logger.warning(f'Could not move {conf} from {deprecated_location} to {ctx.sysctl_dir}: {err}')
-
- # Log successful migration
- if file_count == 0:
- logger.info(f'Successfully migrated sysctl config to {ctx.sysctl_dir}.')
- return
-
- # Log partially successful / unsuccessful migration
- files_processed: int = len(deprecated_confs)
- if file_count < files_processed:
- status: str = f'partially successful (failed {file_count}/{files_processed})'
- elif file_count == files_processed:
- status = 'unsuccessful'
- logger.warning(f'Migration of sysctl configuration {status}. You may want to perform a migration manually.')
-
-
def install_base_units(ctx, fsid):
# type: (CephadmContext, str) -> None
"""
@@ -4419,534 +3604,24 @@ def install_base_units(ctx, fsid):
""" % (fsid, ' '.join(targets), '|'.join(targets)))
-def get_unit_file(ctx, fsid):
- # type: (CephadmContext, str) -> str
- extra_args = ''
- if isinstance(ctx.container_engine, Podman):
- extra_args = ('ExecStartPre=-/bin/rm -f %t/%n-pid %t/%n-cid\n'
- 'ExecStopPost=-/bin/rm -f %t/%n-pid %t/%n-cid\n'
- 'Type=forking\n'
- 'PIDFile=%t/%n-pid\n')
- if ctx.container_engine.version >= CGROUPS_SPLIT_PODMAN_VERSION:
- extra_args += 'Delegate=yes\n'
-
- docker = isinstance(ctx.container_engine, Docker)
- u = """# generated by cephadm
-[Unit]
-Description=Ceph %i for {fsid}
-
-# According to:
-# http://www.freedesktop.org/wiki/Software/systemd/NetworkTarget
-# these can be removed once ceph-mon will dynamically change network
-# configuration.
-After=network-online.target local-fs.target time-sync.target{docker_after}
-Wants=network-online.target local-fs.target time-sync.target
-{docker_requires}
-
-PartOf=ceph-{fsid}.target
-Before=ceph-{fsid}.target
-
-[Service]
-LimitNOFILE=1048576
-LimitNPROC=1048576
-EnvironmentFile=-/etc/environment
-ExecStart=/bin/bash {data_dir}/{fsid}/%i/unit.run
-ExecStop=-/bin/bash -c 'bash {data_dir}/{fsid}/%i/unit.stop'
-ExecStopPost=-/bin/bash {data_dir}/{fsid}/%i/unit.poststop
-KillMode=none
-Restart=on-failure
-RestartSec=10s
-TimeoutStartSec=200
-TimeoutStopSec=120
-StartLimitInterval=30min
-StartLimitBurst=5
-{extra_args}
-[Install]
-WantedBy=ceph-{fsid}.target
-""".format(fsid=fsid,
- data_dir=ctx.data_dir,
- extra_args=extra_args,
- # if docker, we depend on docker.service
- docker_after=' docker.service' if docker else '',
- docker_requires='Requires=docker.service\n' if docker else '')
-
- return u
+def get_unit_file(ctx: CephadmContext, fsid: str) -> str:
+ has_docker_engine = isinstance(ctx.container_engine, Docker)
+ has_podman_engine = isinstance(ctx.container_engine, Podman)
+ has_podman_split_version = (
+ has_podman_engine and ctx.container_engine.supports_split_cgroups
+ )
+ return templating.render(
+ ctx,
+ templating.Templates.ceph_service,
+ fsid=fsid,
+ has_docker_engine=has_docker_engine,
+ has_podman_engine=has_podman_engine,
+ has_podman_split_version=has_podman_split_version,
+ )
##################################
-class DaemonIdentity:
- def __init__(
- self,
- fsid: str,
- daemon_type: str,
- daemon_id: Union[int, str],
- subcomponent: str = '',
- ) -> None:
- self._fsid = fsid
- self._daemon_type = daemon_type
- self._daemon_id = str(daemon_id)
- self._subcomponent = subcomponent
-
- @property
- def fsid(self) -> str:
- return self._fsid
-
- @property
- def daemon_type(self) -> str:
- return self._daemon_type
-
- @property
- def daemon_id(self) -> str:
- return self._daemon_id
-
- @property
- def subcomponent(self) -> str:
- return self._subcomponent
-
- @property
- def legacy_container_name(self) -> str:
- return 'ceph-%s-%s.%s' % (self.fsid, self.daemon_type, self.daemon_id)
-
- @property
- def container_name(self) -> str:
- name = f'ceph-{self.fsid}-{self.daemon_type}-{self.daemon_id}'
- if self.subcomponent:
- name = f'{name}-{self.subcomponent}'
- return name.replace('.', '-')
-
- def _replace(
- self,
- *,
- fsid: Optional[str] = None,
- daemon_type: Optional[str] = None,
- daemon_id: Union[None, int, str] = None,
- subcomponent: Optional[str] = None,
- ) -> 'DaemonIdentity':
- return self.__class__(
- fsid=self.fsid if fsid is None else fsid,
- daemon_type=(
- self.daemon_type if daemon_type is None else daemon_type
- ),
- daemon_id=self.daemon_id if daemon_id is None else daemon_id,
- subcomponent=(
- self.subcomponent if subcomponent is None else subcomponent
- ),
- )
-
- @classmethod
- def from_name(cls, fsid: str, name: str) -> 'DaemonIdentity':
- daemon_type, daemon_id = name.split('.', 1)
- return cls(fsid, daemon_type, daemon_id)
-
- @classmethod
- def from_context(cls, ctx: 'CephadmContext') -> 'DaemonIdentity':
- return cls.from_name(ctx.fsid, ctx.name)
-
-
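For reference, the naming rules encoded by the DaemonIdentity class removed
above (it remains in use throughout the patch, so it is presumably imported
from another module now; the identity values below are invented and the output
follows the two properties shown):

    ident = DaemonIdentity('64c0f258', 'mgr', 'host1.abc')
    ident.container_name         # -> 'ceph-64c0f258-mgr-host1-abc'  (dots become dashes)
    ident.legacy_container_name  # -> 'ceph-64c0f258-mgr.host1.abc'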
-class BasicContainer:
- def __init__(
- self,
- ctx: CephadmContext,
- *,
- image: str,
- entrypoint: str,
- identity: Optional['DaemonIdentity'],
- args: Optional[List[str]] = None,
- container_args: Optional[List[str]] = None,
- envs: Optional[List[str]] = None,
- volume_mounts: Optional[Dict[str, str]] = None,
- bind_mounts: Optional[List[List[str]]] = None,
- network: str = '',
- ipc: str = '',
- init: bool = False,
- ptrace: bool = False,
- privileged: bool = False,
- remove: bool = False,
- memory_request: Optional[str] = None,
- memory_limit: Optional[str] = None,
- ) -> None:
- self.ctx = ctx
- self.image = image
- self.entrypoint = entrypoint
- self.identity = identity
- self.args = args or []
- self.container_args = container_args or []
- self.envs = envs or []
- self.volume_mounts = volume_mounts or {}
- self.bind_mounts = bind_mounts or []
- self.network = network
- self.ipc = ipc
- self.init = init
- self.ptrace = ptrace
- self.privileged = privileged
- self.remove = remove
- self.memory_request = memory_request
- self.memory_limit = memory_limit
-
- @property
- def _container_engine(self) -> str:
- return self.ctx.container_engine.path
-
- @property
- def _using_podman(self) -> bool:
- return isinstance(self.ctx.container_engine, Podman)
-
- @property
- def _using_docker(self) -> bool:
- return isinstance(self.ctx.container_engine, Docker)
-
- @property
- def cname(self) -> str:
- assert self.identity
- return self.identity.container_name
-
- def build_run_cmd(self) -> List[str]:
- cmd_args: List[str] = [self._container_engine]
- cmd_args.append('run')
- if self.remove:
- cmd_args.append('--rm')
- if self.ipc:
- cmd_args.append(f'--ipc={self.ipc}')
- # some containers (ahem, haproxy) override this, but we want a fast
- # shutdown always (and, more importantly, a successful exit even if we
- # fall back to SIGKILL).
- cmd_args.append('--stop-signal=SIGTERM')
-
- if isinstance(self.ctx.container_engine, Podman):
- if os.path.exists('/etc/ceph/podman-auth.json'):
- cmd_args.append('--authfile=/etc/ceph/podman-auth.json')
-
- if isinstance(self.ctx.container_engine, Docker):
- cmd_args.extend(['--ulimit', 'nofile=1048576'])
-
- if self.memory_request:
- cmd_args.extend(['-e', 'POD_MEMORY_REQUEST', str(self.memory_request)])
- if self.memory_limit:
- cmd_args.extend(['-e', 'POD_MEMORY_LIMIT', str(self.memory_limit)])
- cmd_args.extend(['--memory', str(self.memory_limit)])
-
- if self.network:
- cmd_args.append(f'--net={self.network}')
- if self.entrypoint:
- cmd_args.extend(['--entrypoint', self.entrypoint])
- if self.privileged:
- cmd_args.extend([
- '--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk'])
- if self.ptrace and not self.privileged:
- # if privileged, the SYS_PTRACE cap is already added
- # in addition, --cap-add and --privileged are mutually
- # exclusive since podman >= 2.0
- cmd_args.append('--cap-add=SYS_PTRACE')
- if self.init:
- cmd_args.append('--init')
- if self.cname:
- cmd_args.extend(['--name', self.cname])
-
- envs: List[str] = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- ]
- if self.envs:
- for env in self.envs:
- envs.extend(['-e', env])
-
- vols: List[str] = []
- vols = sum(
- [
- ['-v', '%s:%s' % (host_dir, container_dir)]
- for host_dir, container_dir in self.volume_mounts.items()
- ],
- [],
- )
-
- binds: List[str] = []
- binds = sum(
- [
- ['--mount', '{}'.format(','.join(bind))]
- for bind in self.bind_mounts
- ],
- [],
- )
-
- return (
- cmd_args
- + self.container_args
- + envs
- + vols
- + binds
- + [self.image]
- + self.args
- )
-
- def build_rm_cmd(self, cname: str = '', storage: bool = False) -> List[str]:
- cmd = [
- self._container_engine,
- 'rm',
- '-f',
- ]
- if storage:
- cmd.append('--storage')
- cmd.append(cname or self.cname)
- return cmd
-
- def build_stop_cmd(self, cname: str = '', timeout: Optional[int] = None) -> List[str]:
- cmd = [self._container_engine, 'stop']
- if timeout is not None:
- cmd.extend(('-t', str(timeout)))
- cmd.append(cname or self.cname)
- return cmd
-
-
-class CephContainer(BasicContainer):
- def __init__(self,
- ctx: CephadmContext,
- image: str,
- entrypoint: str,
- args: List[str] = [],
- volume_mounts: Dict[str, str] = {},
- identity: Optional['DaemonIdentity'] = None,
- cname: str = '',
- container_args: List[str] = [],
- envs: Optional[List[str]] = None,
- privileged: bool = False,
- ptrace: bool = False,
- bind_mounts: Optional[List[List[str]]] = None,
- init: Optional[bool] = None,
- host_network: bool = True,
- memory_request: Optional[str] = None,
- memory_limit: Optional[str] = None,
- ) -> None:
- self.ctx = ctx
- self.image = image
- self.entrypoint = entrypoint
- self.args = args
- self.volume_mounts = volume_mounts
- self.identity = identity
- self._cname = cname
- self.container_args = container_args
- self.envs = envs or []
- self.privileged = privileged
- self.ptrace = ptrace
- self.bind_mounts = bind_mounts if bind_mounts else []
- self.init = init if init else ctx.container_init
- self.host_network = host_network
- self.memory_request = memory_request
- self.memory_limit = memory_limit
- self.remove = True
- self.ipc = 'host'
- self.network = 'host' if self.host_network else ''
-
- @classmethod
- def for_daemon(cls,
- ctx: CephadmContext,
- fsid: str,
- daemon_type: str,
- daemon_id: str,
- entrypoint: str,
- args: List[str] = [],
- volume_mounts: Dict[str, str] = {},
- container_args: List[str] = [],
- envs: Optional[List[str]] = None,
- privileged: bool = False,
- ptrace: bool = False,
- bind_mounts: Optional[List[List[str]]] = None,
- init: Optional[bool] = None,
- host_network: bool = True,
- memory_request: Optional[str] = None,
- memory_limit: Optional[str] = None,
- ) -> 'CephContainer':
- ident = DaemonIdentity(fsid, daemon_type, daemon_id)
- return cls(
- ctx,
- image=ctx.image,
- entrypoint=entrypoint,
- args=args,
- volume_mounts=volume_mounts,
- identity=ident,
- container_args=container_args,
- envs=envs,
- privileged=privileged,
- ptrace=ptrace,
- bind_mounts=bind_mounts,
- init=init,
- host_network=host_network,
- memory_request=memory_request,
- memory_limit=memory_limit,
- )
-
- @property
- def cname(self) -> str:
- """
- podman adds the current container name to the /etc/hosts
- file. Turns out, python's `socket.getfqdn()` differs from
- `hostname -f` when the container name contains dots:
-
- # podman run --name foo.bar.baz.com ceph/ceph /bin/bash
- [root@sebastians-laptop /]# cat /etc/hosts
- 127.0.0.1 localhost
- ::1 localhost
- 127.0.1.1 sebastians-laptop foo.bar.baz.com
- [root@sebastians-laptop /]# hostname -f
- sebastians-laptop
- [root@sebastians-laptop /]# python3 -c 'import socket; print(socket.getfqdn())'
- foo.bar.baz.com
-
- Fascinatingly, this doesn't happen when using dashes.
- """
- if not self._cname and self.identity:
- return self.identity.container_name
- return self._cname.replace('.', '-')
-
- @cname.setter
- def cname(self, val: str) -> None:
- self._cname = val
-
- @property
- def old_cname(self) -> str:
- if not self._cname and self.identity:
- return self.identity.legacy_container_name
- return self._cname
-
- def run_cmd(self) -> List[str]:
- if not (self.envs and self.envs[0].startswith('NODE_NAME=')):
- self.envs.insert(0, 'NODE_NAME=%s' % get_hostname())
- return self.build_run_cmd()
-
- def rm_cmd(self, old_cname: bool = False, storage: bool = False) -> List[str]:
- return self.build_rm_cmd(
- cname=self.old_cname if old_cname else self.cname,
- storage=storage,
- )
-
- def stop_cmd(self, old_cname: bool = False, timeout: Optional[int] = None) -> List[str]:
- return self.build_stop_cmd(
- cname=self.old_cname if old_cname else self.cname,
- timeout=timeout,
- )
-
- def shell_cmd(self, cmd: List[str]) -> List[str]:
- cmd_args: List[str] = [
- str(self.ctx.container_engine.path),
- 'run',
- '--rm',
- '--ipc=host',
- ]
- envs: List[str] = [
- '-e', 'CONTAINER_IMAGE=%s' % self.image,
- '-e', 'NODE_NAME=%s' % get_hostname(),
- ]
- vols: List[str] = []
- binds: List[str] = []
-
- if self.host_network:
- cmd_args.append('--net=host')
- if self.ctx.no_hosts:
- cmd_args.append('--no-hosts')
- if self.privileged:
- cmd_args.extend([
- '--privileged',
- # let OSD etc read block devs that haven't been chowned
- '--group-add=disk',
- ])
- if self.init:
- cmd_args.append('--init')
- if self.envs:
- for env in self.envs:
- envs.extend(['-e', env])
-
- vols = sum(
- [['-v', '%s:%s' % (host_dir, container_dir)]
- for host_dir, container_dir in self.volume_mounts.items()], [])
- binds = sum([['--mount', '{}'.format(','.join(bind))]
- for bind in self.bind_mounts], [])
-
- return cmd_args + self.container_args + envs + vols + binds + [
- '--entrypoint', cmd[0],
- self.image,
- ] + cmd[1:]
-
- def exec_cmd(self, cmd):
- # type: (List[str]) -> List[str]
- cname = get_running_container_name(self.ctx, self)
- if not cname:
- raise Error('unable to find container "{}"'.format(self.cname))
- return [
- str(self.ctx.container_engine.path),
- 'exec',
- ] + self.container_args + [
- self.cname,
- ] + cmd
-
- def run(self, timeout=DEFAULT_TIMEOUT, verbosity=CallVerbosity.VERBOSE_ON_FAILURE):
- # type: (Optional[int], CallVerbosity) -> str
- out, _, _ = call_throws(self.ctx, self.run_cmd(),
- desc=self.entrypoint, timeout=timeout, verbosity=verbosity)
- return out
-
-
-class InitContainer(BasicContainer):
- @classmethod
- def from_primary_and_opts(
- cls,
- ctx: CephadmContext,
- primary: 'CephContainer',
- opts: Dict[str, Any],
- data_dir: str = '',
- ) -> 'InitContainer':
- if not opts:
- raise Error('no init container values provided')
- # volume mounts are specified relative to a dir in custom container
- # if we are going to inherit the dirs from the primary then we
- # just copy it. If not, we have to convert the relative paths
- # into absolute paths.
- assert primary.identity
- vmounts = opts.get('volume_mounts')
- if not vmounts:
- vmounts = primary.volume_mounts
- else:
- data_dir = data_dir or get_data_dir(
- primary.identity.fsid,
- ctx.data_dir,
- primary.identity.daemon_type,
- primary.identity.daemon_id,
- )
- vmounts = {
- os.path.join(data_dir, src): dst
- for src, dst in vmounts.items()
- }
- return cls(
- ctx,
- identity=primary.identity._replace(subcomponent='init'),
- image=opts.get('image', primary.image),
- entrypoint=opts.get('entrypoint', primary.entrypoint),
- # note: args is not inherited from primary container
- args=opts.get('entrypoint_args', []),
- volume_mounts=vmounts,
- envs=opts.get('envs', primary.envs),
- # note: privileged is not inherited from primary container
- # we really ought to minimize running stuff as privileged
- privileged=opts.get('privileged', False),
- init=False,
- ptrace=primary.ptrace,
- remove=False,
- memory_request=primary.memory_request,
- memory_limit=primary.memory_limit,
- )
- # Things we are currently not handling:
- # container_args, bind_mounts, network, ipc
-
- def run_cmd(self) -> List[str]:
- return self.build_run_cmd()
-
- def rm_cmd(self, storage: bool = False) -> List[str]:
- return self.build_rm_cmd(storage=storage)
-
-
-#####################################
-
class MgrListener(Thread):
def __init__(self, agent: 'CephadmAgent') -> None:
self.agent = agent
@@ -5014,7 +3689,8 @@ class MgrListener(Thread):
self.agent.wakeup()
-class CephadmAgent():
+@register_daemon_form
+class CephadmAgent(DaemonForm):
daemon_type = 'agent'
default_port = 8498
@@ -5029,6 +3705,18 @@ class CephadmAgent():
'listener.key',
]
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
+ @classmethod
+ def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'CephadmAgent':
+ return cls(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
def __init__(self, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str] = ''):
self.ctx = ctx
self.fsid = fsid
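For illustration: @register_daemon_form and daemon_form_create() are defined in
other modules added by this refactor; the register/dispatch pattern used by the
for_daemon_type()/create() classmethods above plausibly reduces to a sketch
like the one below (the _sketch names are invented):

    _FORMS_SKETCH: List[type] = []

    def register_daemon_form_sketch(cls: type) -> type:
        _FORMS_SKETCH.append(cls)
        return cls

    def daemon_form_create_sketch(ctx: CephadmContext, ident: 'DaemonIdentity') -> Any:
        for cls in _FORMS_SKETCH:
            if cls.for_daemon_type(ident.daemon_type):
                return cls.create(ctx, ident)
        raise KeyError(ident.daemon_type)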
@@ -5102,24 +3790,8 @@ class CephadmAgent():
return ('set -e\n' + f'{py3} {binary_path} agent --fsid {self.fsid} --daemon-id {self.daemon_id} &\n')
def unit_file(self) -> str:
- return """#generated by cephadm
-[Unit]
-Description=cephadm agent for cluster {fsid}
-
-PartOf=ceph-{fsid}.target
-Before=ceph-{fsid}.target
-
-[Service]
-Type=forking
-ExecStart=/bin/bash {data_dir}/unit.run
-Restart=on-failure
-RestartSec=10s
-
-[Install]
-WantedBy=ceph-{fsid}.target
-""".format(
- fsid=self.fsid,
- data_dir=self.daemon_dir
+ return templating.render(
+ self.ctx, templating.Templates.agent_service, agent=self
)
def shutdown(self) -> None:
@@ -5306,7 +3978,11 @@ WantedBy=ceph-{fsid}.target
'enabled': 'true' if enabled else 'false',
'state': state,
}
- c = CephContainer.for_daemon(self.ctx, self.ctx.fsid, daemon_type, daemon_id, 'bash')
+ c = CephContainer.for_daemon(
+ self.ctx,
+ DaemonIdentity(self.ctx.fsid, daemon_type, daemon_id),
+ 'bash',
+ )
container_id: Optional[str] = None
for name in (c.cname, c.old_cname):
if name in name_id_mapping:
@@ -5447,21 +4123,73 @@ def command_agent(ctx: CephadmContext) -> None:
def command_version(ctx):
# type: (CephadmContext) -> int
import importlib
+ import zipimport
+ import types
+ vmod: Optional[types.ModuleType]
+ zmod: Optional[types.ModuleType]
try:
- vmod = importlib.import_module('_version')
+ vmod = importlib.import_module('_cephadmmeta.version')
+ zmod = vmod
except ImportError:
- print('cephadm version UNKNOWN')
- return 1
- _unset = '<UNSET>'
- print('cephadm version {0} ({1}) {2} ({3})'.format(
- getattr(vmod, 'CEPH_GIT_NICE_VER', _unset),
- getattr(vmod, 'CEPH_GIT_VER', _unset),
- getattr(vmod, 'CEPH_RELEASE_NAME', _unset),
- getattr(vmod, 'CEPH_RELEASE_TYPE', _unset),
- ))
+ vmod = zmod = None
+ if vmod is None:
+ # fallback to earlier location
+ try:
+ vmod = importlib.import_module('_version')
+ except ImportError:
+ pass
+ if zmod is None:
+ # fallback to outer package, for zip import module
+ try:
+ zmod = importlib.import_module('_cephadmmeta')
+ except ImportError:
+ zmod = None
+
+ if not ctx.verbose:
+ if vmod is None:
+ print('cephadm version UNKNOWN')
+ return 1
+ _unset = '<UNSET>'
+ print(
+ 'cephadm version {0} ({1}) {2} ({3})'.format(
+ getattr(vmod, 'CEPH_GIT_NICE_VER', _unset),
+ getattr(vmod, 'CEPH_GIT_VER', _unset),
+ getattr(vmod, 'CEPH_RELEASE_NAME', _unset),
+ getattr(vmod, 'CEPH_RELEASE_TYPE', _unset),
+ )
+ )
+ return 0
+
+ out: Dict[str, Any] = {'name': 'cephadm'}
+ ceph_vars = [
+ 'CEPH_GIT_NICE_VER',
+ 'CEPH_GIT_VER',
+ 'CEPH_RELEASE_NAME',
+ 'CEPH_RELEASE_TYPE',
+ ]
+ for var in ceph_vars:
+ value = getattr(vmod, var, None)
+ if value is not None:
+ out[var.lower()] = value
+
+ loader = getattr(zmod, '__loader__', None)
+ if loader and isinstance(loader, zipimport.zipimporter):
+ try:
+ deps_info = json.loads(loader.get_data('_cephadmmeta/deps.json'))
+ out['bundled_packages'] = deps_info
+ except OSError:
+ pass
+ files = getattr(loader, '_files', {})
+ out['zip_root_entries'] = sorted(
+ {p.split('/')[0] for p in files.keys()}
+ )
+
+ json.dump(out, sys.stdout, indent=2)
+ print()
return 0
+
##################################
@@ -5488,14 +4216,7 @@ def _pull_image(ctx, image, insecure=False):
'Digest did not match, expected',
]
- cmd = [ctx.container_engine.path, 'pull', image]
- if isinstance(ctx.container_engine, Podman):
- if insecure:
- cmd.append('--tls-verify=false')
-
- if os.path.exists('/etc/ceph/podman-auth.json'):
- cmd.append('--authfile=/etc/ceph/podman-auth.json')
- cmd_str = ' '.join(cmd)
+ cmd = pull_command(ctx, image, insecure=insecure)
for sleep_secs in [1, 4, 25]:
out, err, ret = call(ctx, cmd, verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
@@ -5505,6 +4226,7 @@ def _pull_image(ctx, image, insecure=False):
if 'unauthorized' in err:
raise UnauthorizedRegistryError()
+ cmd_str = ' '.join(cmd)
if not any(pattern in err for pattern in ignorelist):
raise Error('Failed command: %s' % cmd_str)
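For illustration: pull_command() now lives in a separate module; judging by the
inline logic removed in this hunk, it plausibly builds the same command (sketch
only; the real name and signature may differ):

    def pull_command_sketch(ctx: CephadmContext, image: str, insecure: bool = False) -> List[str]:
        cmd = [ctx.container_engine.path, 'pull', image]
        if isinstance(ctx.container_engine, Podman):
            if insecure:
                cmd.append('--tls-verify=false')
            if os.path.exists('/etc/ceph/podman-auth.json'):
                cmd.append('--authfile=/etc/ceph/podman-auth.json')
        return cmd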
@@ -5535,33 +4257,6 @@ def command_inspect_image(ctx):
return 0
-def normalize_image_digest(digest: str) -> str:
- """
- Normal case:
- >>> normalize_image_digest('ceph/ceph', 'docker.io')
- 'docker.io/ceph/ceph'
-
- No change:
- >>> normalize_image_digest('quay.ceph.io/ceph/ceph', 'docker.io')
- 'quay.ceph.io/ceph/ceph'
-
- >>> normalize_image_digest('docker.io/ubuntu', 'docker.io')
- 'docker.io/ubuntu'
-
- >>> normalize_image_digest('localhost/ceph', 'docker.io')
- 'localhost/ceph'
- """
- known_shortnames = [
- 'ceph/ceph',
- 'ceph/daemon',
- 'ceph/daemon-base',
- ]
- for image in known_shortnames:
- if digest.startswith(image):
- return f'{DEFAULT_REGISTRY}/{digest}'
- return digest
-
-
def get_image_info_from_inspect(out, image):
# type: (str, str) -> Dict[str, Union[str,List[str]]]
image_id, digests = out.split(',', 1)
@@ -5577,131 +4272,6 @@ def get_image_info_from_inspect(out, image):
##################################
-def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
- """Determine whether the given string is a valid subnet
-
- :param subnets: subnet string, a single definition or comma separated list of CIDR subnets
- :returns: return code, IP version list of the subnets and msg describing any validation errors
- """
-
- rc = 0
- versions = set()
- errors = []
- subnet_list = subnets.split(',')
- for subnet in subnet_list:
- # ensure the format of the string is as expected address/netmask
- subnet = subnet.strip()
- if not re.search(r'\/\d+$', subnet):
- rc = 1
- errors.append(f'{subnet} is not in CIDR format (address/netmask)')
- continue
- try:
- v = ipaddress.ip_network(subnet).version
- versions.add(v)
- except ValueError as e:
- rc = 1
- errors.append(f'{subnet} invalid: {str(e)}')
-
- return rc, list(versions), ', '.join(errors)
-
-
-def unwrap_ipv6(address):
- # type: (str) -> str
- if address.startswith('[') and address.endswith(']'):
- return address[1: -1]
- return address
-
-
-def wrap_ipv6(address):
- # type: (str) -> str
-
- # We cannot assume it's already wrapped, or even an IPv6 address: if
- # it's already wrapped it won't parse (same as a hostname) and will trigger
- # the ValueError, so we return it unchanged
- try:
- if ipaddress.ip_address(address).version == 6:
- return f'[{address}]'
- except ValueError:
- pass
-
- return address
-
-
-def is_ipv6(address):
- # type: (str) -> bool
- address = unwrap_ipv6(address)
- try:
- return ipaddress.ip_address(address).version == 6
- except ValueError:
- logger.warning('Address: {} is not a valid IP address'.format(address))
- return False
-
-
-def ip_in_subnets(ip_addr: str, subnets: str) -> bool:
- """Determine if the ip_addr belongs to any of the subnets list."""
- subnet_list = [x.strip() for x in subnets.split(',')]
- for subnet in subnet_list:
- ip_address = unwrap_ipv6(ip_addr) if is_ipv6(ip_addr) else ip_addr
- if ipaddress.ip_address(ip_address) in ipaddress.ip_network(subnet):
- return True
- return False
-
-
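For illustration, the subnet helpers removed above behave like this (addresses
are invented):

    ip_in_subnets('192.168.1.5', '192.168.1.0/24, 10.0.0.0/8')  # -> True
    ip_in_subnets('[fe80::1]', 'fe80::/64')                     # -> True (IPv6 is unwrapped first)
    ip_in_subnets('172.16.0.9', '10.0.0.0/8')                   # -> False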
-def parse_mon_addrv(addrv_arg: str) -> List[EndPoint]:
- """Parse mon-addrv param into a list of mon end points."""
- r = re.compile(r':(\d+)$')
- addrv_args = []
- addr_arg = addrv_arg
- if addr_arg[0] != '[' or addr_arg[-1] != ']':
- raise Error(f'--mon-addrv value {addr_arg} must use square brackets')
-
- for addr in addr_arg[1: -1].split(','):
- hasport = r.findall(addr)
- if not hasport:
- raise Error(f'--mon-addrv value {addr_arg} must include port number')
- port_str = hasport[0]
- addr = re.sub(r'^v\d+:', '', addr) # strip off v1: or v2: prefix
- base_ip = addr[0:-(len(port_str)) - 1]
- addrv_args.append(EndPoint(base_ip, int(port_str)))
-
- return addrv_args
-
-
-def parse_mon_ip(mon_ip: str) -> List[EndPoint]:
- """Parse mon-ip param into a list of mon end points."""
- r = re.compile(r':(\d+)$')
- addrv_args = []
- hasport = r.findall(mon_ip)
- if hasport:
- port_str = hasport[0]
- base_ip = mon_ip[0:-(len(port_str)) - 1]
- addrv_args.append(EndPoint(base_ip, int(port_str)))
- else:
- # No port provided: use fixed ports for ceph monitor
- addrv_args.append(EndPoint(mon_ip, 3300))
- addrv_args.append(EndPoint(mon_ip, 6789))
-
- return addrv_args
-
-
-def build_addrv_params(addrv: List[EndPoint]) -> str:
- """Convert mon end-points (ip:port) into the format: [v[1|2]:ip:port1]"""
- if len(addrv) > 2:
- raise Error('Detected a local mon-addrv list with more than 2 entries.')
- port_to_ver: Dict[int, str] = {6789: 'v1', 3300: 'v2'}
- addr_arg_list: List[str] = []
- for ep in addrv:
- if ep.port in port_to_ver:
- ver = port_to_ver[ep.port]
- else:
- ver = 'v2' # default mon protocol version if port is not provided
- logger.warning(f'Using msgr2 protocol for unrecognized port {ep}')
- addr_arg_list.append(f'{ver}:{ep.ip}:{ep.port}')
-
- addr_arg = '[{0}]'.format(','.join(addr_arg_list))
- return addr_arg
-
-
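For illustration, a round trip through the two mon address helpers removed
above (the address is invented):

    parse_mon_ip('10.1.2.3')
    # -> [EndPoint('10.1.2.3', 3300), EndPoint('10.1.2.3', 6789)]
    parse_mon_ip('10.1.2.3:3300')
    # -> [EndPoint('10.1.2.3', 3300)]
    build_addrv_params(parse_mon_ip('10.1.2.3'))
    # -> '[v2:10.1.2.3:3300,v1:10.1.2.3:6789]'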
def get_public_net_from_cfg(ctx: CephadmContext) -> Optional[str]:
"""Get mon public network from configuration file."""
cp = read_config(ctx.config)
@@ -5920,8 +4490,9 @@ def prepare_create_mon(
monmap_path: str
) -> Tuple[str, str]:
logger.info('Creating mon...')
- create_daemon_dirs(ctx, fsid, 'mon', mon_id, uid, gid)
- mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', mon_id)
+ ident = DaemonIdentity(fsid, 'mon', mon_id)
+ create_daemon_dirs(ctx, ident, uid, gid)
+ mon_dir = ident.data_dir(ctx.data_dir)
log_dir = get_log_dir(fsid, ctx.log_dir)
out = CephContainer(
ctx,
@@ -5934,7 +4505,7 @@ def prepare_create_mon(
'-c', '/dev/null',
'--monmap', '/tmp/monmap',
'--keyring', '/tmp/keyring',
- ] + get_daemon_args(ctx, fsid, 'mon', mon_id),
+ ] + Ceph.create(ctx, ident).get_daemon_args(),
volume_mounts={
log_dir: '/var/log/ceph:z',
mon_dir: '/var/lib/ceph/mon/ceph-%s:z' % (mon_id),
@@ -5951,10 +4522,10 @@ def create_mon(
uid: int, gid: int,
fsid: str, mon_id: str
) -> None:
- mon_c = get_container(ctx, fsid, 'mon', mon_id)
+ ident = DaemonIdentity(fsid, 'mon', mon_id)
+ mon_c = get_container(ctx, ident)
ctx.meta_properties = {'service_name': 'mon'}
- deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
- config=None, keyring=None)
+ deploy_daemon(ctx, ident, mon_c, uid, gid)
def wait_for_mon(
@@ -5997,14 +4568,23 @@ def create_mgr(
) -> None:
logger.info('Creating mgr...')
mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
- mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
+ ident = DaemonIdentity(fsid, 'mgr', mgr_id)
+ mgr_c = get_container(ctx, ident)
# Note: the default port used by the Prometheus node exporter is opened in fw
ctx.meta_properties = {'service_name': 'mgr'}
endpoints = [EndPoint('0.0.0.0', 9283), EndPoint('0.0.0.0', 8765)]
if not ctx.skip_monitoring_stack:
endpoints.append(EndPoint('0.0.0.0', 8443))
- deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
- config=config, keyring=mgr_keyring, endpoints=endpoints)
+ deploy_daemon(
+ ctx,
+ ident,
+ mgr_c,
+ uid,
+ gid,
+ config=config,
+ keyring=mgr_keyring,
+ endpoints=endpoints,
+ )
# wait for the service to become available
logger.info('Waiting for mgr to start...')
@@ -6429,26 +5009,26 @@ def rollback(func: FuncT) -> FuncT:
raise
except (KeyboardInterrupt, Exception) as e:
logger.error(f'{type(e).__name__}: {e}')
- if ctx.cleanup_on_failure:
+ if ctx.no_cleanup_on_failure:
logger.info('\n\n'
'\t***************\n'
- '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically,\n'
- '\tto disable this behaviour you can pass the --no-cleanup-on-failure flag. In case of any previous\n'
- '\tbroken installation user must use the following command to completely delete the broken cluster:\n\n'
- '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
+ '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically. To change\n'
+ '\tthis behaviour, do not pass the --no-cleanup-on-failure flag. To remove this broken cluster manually, please run:\n\n'
+ f'\t > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n'
+ '\tIn case of any previous broken installation, users must use the rm-cluster command to delete the broken cluster:\n\n'
+ '\t > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
'\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
'\t***************\n\n')
- _rm_cluster(ctx, keep_logs=False, zap_osds=False)
else:
logger.info('\n\n'
'\t***************\n'
- '\tCephadm hit an issue during cluster installation. Current cluster files will NOT BE DELETED automatically to change\n'
- '\tthis behaviour you can pass the --cleanup-on-failure. To remove this broken cluster manually please run:\n\n'
- f'\t > cephadm rm-cluster --force --fsid {ctx.fsid}\n\n'
- '\tin case of any previous broken installation user must use the rm-cluster command to delete the broken cluster:\n\n'
- '\t > cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
+ '\tCephadm hit an issue during cluster installation. Current cluster files will be deleted automatically.\n'
+ '\tTo disable this behaviour you can pass the --no-cleanup-on-failure flag. In case of any previous\n'
+ '\tbroken installation, users must use the following command to completely delete the broken cluster:\n\n'
+ '\t> cephadm rm-cluster --force --zap-osds --fsid <fsid>\n\n'
'\tfor more information please refer to https://docs.ceph.com/en/latest/cephadm/operations/#purging-a-cluster\n'
'\t***************\n\n')
+ _rm_cluster(ctx, keep_logs=False, zap_osds=False)
raise
return cast(FuncT, _rollback)
@@ -6620,6 +5200,16 @@ def command_bootstrap(ctx):
'-i', '/var/lib/ceph/user.conf'],
{tmp.name: '/var/lib/ceph/user.conf:z'})
+ if getattr(ctx, 'log_dest', None):
+ ldkey = 'mgr/cephadm/cephadm_log_destination'
+ cp = read_config(ctx.config)
+ if cp.has_option('mgr', ldkey):
+ logger.info('The cephadm log destination is set by the config file, ignoring cli option')
+ else:
+ value = ','.join(sorted(getattr(ctx, 'log_dest')))
+ logger.info('Setting cephadm log destination to match logging for cephadm bootstrap: %s', value)
+ cli(['config', 'set', 'mgr', ldkey, value, '--force'])
+
# wait for mgr to restart (after enabling a module)
def wait_for_mgr_restart() -> None:
# first get latest mgrmap epoch from the mon. try newer 'mgr
@@ -6717,6 +5307,7 @@ def command_bootstrap(ctx):
def command_registry_login(ctx: CephadmContext) -> int:
+ logger.info('Logging into custom registry.')
if ctx.registry_json:
logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
d = get_parm(ctx.registry_json)
@@ -6742,91 +5333,47 @@ def command_registry_login(ctx: CephadmContext) -> int:
return 0
-def registry_login(ctx: CephadmContext, url: Optional[str], username: Optional[str], password: Optional[str]) -> None:
- logger.info('Logging into custom registry.')
- try:
- engine = ctx.container_engine
- cmd = [engine.path, 'login',
- '-u', username, '-p', password,
- url]
- if isinstance(engine, Podman):
- cmd.append('--authfile=/etc/ceph/podman-auth.json')
- out, _, _ = call_throws(ctx, cmd)
- if isinstance(engine, Podman):
- os.chmod('/etc/ceph/podman-auth.json', DEFAULT_MODE)
- except Exception:
- raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username))
-
##################################
-def extract_uid_gid_monitoring(ctx, daemon_type):
- # type: (CephadmContext, str) -> Tuple[int, int]
-
- if daemon_type == 'prometheus':
- uid, gid = extract_uid_gid(ctx, file_path='/etc/prometheus')
- elif daemon_type == 'node-exporter':
- uid, gid = 65534, 65534
- elif daemon_type == 'grafana':
- uid, gid = extract_uid_gid(ctx, file_path='/var/lib/grafana')
- elif daemon_type == 'loki':
- uid, gid = extract_uid_gid(ctx, file_path='/etc/loki')
- elif daemon_type == 'promtail':
- uid, gid = extract_uid_gid(ctx, file_path='/etc/promtail')
- elif daemon_type == 'alertmanager':
- uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus'])
- else:
- raise Error('{} not implemented yet'.format(daemon_type))
- return uid, gid
-
-
-def get_deployment_container(ctx: CephadmContext,
- fsid: str, daemon_type: str, daemon_id: Union[int, str],
- privileged: bool = False,
- ptrace: bool = False,
- container_args: Optional[List[str]] = None) -> 'CephContainer':
- # wrapper for get_container specifically for containers made during the `cephadm deploy`
- # command. Adds some extra things such as extra container args and custom config files
- c = get_container(ctx, fsid, daemon_type, daemon_id, privileged, ptrace, container_args)
+def to_deployment_container(
+ ctx: CephadmContext, ctr: CephContainer
+) -> CephContainer:
+ """Given a standard ceph container instance return a CephContainer
+ prepared for a deployment as a daemon, having the extra args and
+ custom configurations added.
+ NOTE: The `ctr` object is mutated before being returned.
+ """
if 'extra_container_args' in ctx and ctx.extra_container_args:
- c.container_args.extend(ctx.extra_container_args)
+ ctr.container_args.extend(ctx.extra_container_args)
if 'extra_entrypoint_args' in ctx and ctx.extra_entrypoint_args:
- c.args.extend(ctx.extra_entrypoint_args)
+ ctr.args.extend(ctx.extra_entrypoint_args)
ccfiles = fetch_custom_config_files(ctx)
if ccfiles:
mandatory_keys = ['mount_path', 'content']
for conf in ccfiles:
if all(k in conf for k in mandatory_keys):
mount_path = conf['mount_path']
+ assert ctr.identity
file_path = os.path.join(
ctx.data_dir,
- fsid,
+ ctr.identity.fsid,
'custom_config_files',
- f'{daemon_type}.{daemon_id}',
+ ctr.identity.daemon_name,
os.path.basename(mount_path)
)
- c.volume_mounts[file_path] = mount_path
- return c
+ ctr.volume_mounts[file_path] = mount_path
+ return ctr
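# Illustrative usage (assumed, not part of the patch): to_deployment_container() is meant
# to wrap a container already built for a daemon, e.g.:
#
#     ctr = daemon.container(ctx)               # plain CephContainer for the daemon
#     ctr = to_deployment_container(ctx, ctr)    # add extra args / custom config mounts
#
# It mutates and returns the same object, so reassignment is optional.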
-def get_deployment_init_containers(
- ctx: CephadmContext,
- primary_container: 'CephContainer',
-) -> List['InitContainer']:
- init_containers: List[Dict[str, Any]] = getattr(ctx, 'init_containers', [])
- return [
- InitContainer.from_primary_and_opts(ctx, primary_container, ic_opts)
- for ic_opts in init_containers
- ]
-
-
-def get_deployment_type(ctx: CephadmContext, daemon_type: str, daemon_id: str) -> DeploymentType:
+def get_deployment_type(
+ ctx: CephadmContext, ident: 'DaemonIdentity',
+) -> DeploymentType:
deployment_type: DeploymentType = DeploymentType.DEFAULT
if ctx.reconfig:
deployment_type = DeploymentType.RECONFIG
- unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id)
- (_, state, _) = check_unit(ctx, unit_name)
- if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ctx.fsid, daemon_type, daemon_id, 'bash')):
+ (_, state, _) = check_unit(ctx, ident.unit_name)
+ if state == 'running' or is_container_running(ctx, CephContainer.for_daemon(ctx, ident, 'bash')):
# if reconfig was set, that takes priority over redeploy. If
# this is considered a fresh deployment at this stage,
# mark it as a redeploy to avoid port checking
@@ -6845,20 +5392,6 @@ def command_deploy(ctx):
_common_deploy(ctx)
-def read_configuration_source(ctx: CephadmContext) -> Dict[str, Any]:
- """Read a JSON configuration based on the `ctx.source` value."""
- source = '-'
- if 'source' in ctx and ctx.source:
- source = ctx.source
- if source == '-':
- config_data = json.load(sys.stdin)
- else:
- with open(source, 'rb') as fh:
- config_data = json.load(fh)
- logger.debug('Loaded deploy configuration: %r', config_data)
- return config_data
-
-
def apply_deploy_config_to_ctx(
config_data: Dict[str, Any],
ctx: CephadmContext,
@@ -6897,173 +5430,75 @@ def command_deploy_from(ctx: CephadmContext) -> None:
configuration parameters from an input JSON configuration file.
"""
config_data = read_configuration_source(ctx)
+ logger.debug('Loaded deploy configuration: %r', config_data)
apply_deploy_config_to_ctx(config_data, ctx)
_common_deploy(ctx)
def _common_deploy(ctx: CephadmContext) -> None:
- daemon_type, daemon_id = ctx.name.split('.', 1)
- if daemon_type not in get_supported_daemons():
- raise Error('daemon type %s not recognized' % daemon_type)
+ ident = DaemonIdentity.from_context(ctx)
+ if ident.daemon_type not in get_supported_daemons():
+ raise Error('daemon type %s not recognized' % ident.daemon_type)
lock = FileLock(ctx, ctx.fsid)
lock.acquire()
- deployment_type = get_deployment_type(ctx, daemon_type, daemon_id)
+ deployment_type = get_deployment_type(ctx, ident)
# Migrate sysctl conf files from /usr/lib to /etc
migrate_sysctl_dir(ctx, ctx.fsid)
# Get and check ports explicitly required to be opened
- endpoints = fetch_tcp_ports(ctx)
- _dispatch_deploy(ctx, daemon_type, daemon_id, endpoints, deployment_type)
-
+ endpoints = fetch_endpoints(ctx)
-def _dispatch_deploy(
- ctx: CephadmContext,
- daemon_type: str,
- daemon_id: str,
- daemon_endpoints: List[EndPoint],
- deployment_type: DeploymentType,
-) -> None:
- if daemon_type in Ceph.daemons:
- config, keyring = get_config_and_keyring(ctx)
- uid, gid = extract_uid_gid(ctx)
- make_var_run(ctx, ctx.fsid, uid, gid)
-
- config_json = fetch_configs(ctx)
-
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
- ptrace=ctx.allow_ptrace)
-
- if daemon_type == 'mon' and config_json is not None:
- if 'crush_location' in config_json:
- c_loc = config_json['crush_location']
- # was originally "c.args.extend(['--set-crush-location', c_loc])"
- # but that doesn't seem to persist in the object after it's passed
- # in further function calls
- c.args = c.args + ['--set-crush-location', c_loc]
-
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- config=config, keyring=keyring,
- osd_fsid=ctx.osd_fsid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type in Monitoring.components:
- # monitoring daemon - prometheus, grafana, alertmanager, node-exporter
- # Default Checks
- # make sure provided config-json is sufficient
- config = fetch_configs(ctx) # type: ignore
- required_files = Monitoring.components[daemon_type].get('config-json-files', list())
- required_args = Monitoring.components[daemon_type].get('config-json-args', list())
- if required_files:
- if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore
- raise Error('{} deployment requires config-json which must '
- 'contain file content for {}'.format(daemon_type.capitalize(), ', '.join(required_files)))
- if required_args:
- if not config or not all(c in config.keys() for c in required_args): # type: ignore
- raise Error('{} deployment requires config-json which must '
- 'contain arg for {}'.format(daemon_type.capitalize(), ', '.join(required_args)))
-
- uid, gid = extract_uid_gid_monitoring(ctx, daemon_type)
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type == NFSGanesha.daemon_type:
- # only check ports if this is a fresh deployment
- if deployment_type == DeploymentType.DEFAULT and not daemon_endpoints:
- nfs_ports = list(NFSGanesha.port_map.values())
- daemon_endpoints = [EndPoint('0.0.0.0', p) for p in nfs_ports]
-
- config, keyring = get_config_and_keyring(ctx)
- # TODO: extract ganesha uid/gid (997, 994) ?
- uid, gid = extract_uid_gid(ctx)
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- config=config, keyring=keyring,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type == CephIscsi.daemon_type:
- config, keyring = get_config_and_keyring(ctx)
- uid, gid = extract_uid_gid(ctx)
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- config=config, keyring=keyring,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
- elif daemon_type == CephNvmeof.daemon_type:
- config, keyring = get_config_and_keyring(ctx)
- uid, gid = 167, 167 # TODO: need to get properly the uid/gid
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- config=config, keyring=keyring,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
- elif daemon_type in Tracing.components:
- uid, gid = 65534, 65534
- c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
- elif daemon_type == HAproxy.daemon_type:
- haproxy = HAproxy.init(ctx, ctx.fsid, daemon_id)
- uid, gid = haproxy.extract_uid_gid_haproxy()
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type == Keepalived.daemon_type:
- keepalived = Keepalived.init(ctx, ctx.fsid, daemon_id)
- uid, gid = keepalived.extract_uid_gid_keepalived()
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type == CustomContainer.daemon_type:
- cc = CustomContainer.init(ctx, ctx.fsid, daemon_id)
- # only check ports if this is a fresh deployment
- if deployment_type == DeploymentType.DEFAULT:
- daemon_endpoints.extend([EndPoint('0.0.0.0', p) for p in cc.ports])
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id,
- privileged=cc.privileged,
- ptrace=ctx.allow_ptrace)
- ics = get_deployment_init_containers(
- ctx,
- c,
- )
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
- uid=cc.uid, gid=cc.gid, config=None,
- keyring=None,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints,
- init_containers=ics)
-
- elif daemon_type == CephadmAgent.daemon_type:
+ if ident.daemon_type == CephadmAgent.daemon_type:
# get current user gid and uid
uid = os.getuid()
gid = os.getgid()
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, None,
- uid, gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
-
- elif daemon_type == SNMPGateway.daemon_type:
- sc = SNMPGateway.init(ctx, ctx.fsid, daemon_id)
- c = get_deployment_container(ctx, ctx.fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, ctx.fsid, daemon_type, daemon_id, c,
- sc.uid, sc.gid,
- deployment_type=deployment_type,
- endpoints=daemon_endpoints)
+ deploy_daemon(
+ ctx,
+ ident,
+ None,
+ uid,
+ gid,
+ deployment_type=deployment_type,
+ endpoints=endpoints,
+ )
else:
- raise Error('daemon type {} not implemented in command_deploy function'
- .format(daemon_type))
+ try:
+ _deploy_daemon_container(ctx, ident, endpoints, deployment_type)
+ except UnexpectedDaemonTypeError:
+ raise Error('daemon type {} not implemented in command_deploy function'
+ .format(ident.daemon_type))
+
+
+def _deploy_daemon_container(
+ ctx: CephadmContext,
+ ident: 'DaemonIdentity',
+ daemon_endpoints: List[EndPoint],
+ deployment_type: DeploymentType,
+) -> None:
+ daemon = daemon_form_create(ctx, ident)
+ assert isinstance(daemon, ContainerDaemonForm)
+ daemon.customize_container_endpoints(daemon_endpoints, deployment_type)
+ ctr = daemon.container(ctx)
+ ics = daemon.init_containers(ctx)
+ config, keyring = daemon.config_and_keyring(ctx)
+ uid, gid = daemon.uid_gid(ctx)
+ deploy_daemon(
+ ctx,
+ ident,
+ ctr,
+ uid,
+ gid,
+ config=config,
+ keyring=keyring,
+ deployment_type=deployment_type,
+ endpoints=daemon_endpoints,
+ osd_fsid=daemon.osd_fsid,
+ init_containers=ics,
+ )
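# Rough usage sketch (hypothetical values, not from the patch): the generic path above
# replaces the long per-daemon-type if/elif chain; a caller only needs an identity:
#
#     ident = DaemonIdentity(ctx.fsid, 'grafana', 'host1')
#     _deploy_daemon_container(ctx, ident, [], DeploymentType.DEFAULT)
#
# daemon_form_create(ctx, ident) is expected to select the matching ContainerDaemonForm.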
##################################
@@ -7071,8 +5506,7 @@ def _dispatch_deploy(
@infer_image
def command_run(ctx):
# type: (CephadmContext) -> int
- (daemon_type, daemon_id) = ctx.name.split('.', 1)
- c = get_container(ctx, ctx.fsid, daemon_type, daemon_id)
+ c = get_container(ctx, DaemonIdentity.from_context(ctx))
command = c.run_cmd()
return call_timeout(ctx, command, ctx.timeout)
@@ -7100,7 +5534,7 @@ def command_shell(ctx):
daemon_type = 'osd' # get the most mounts
daemon_id = None
- if ctx.fsid and daemon_type in Ceph.daemons:
+ if ctx.fsid and daemon_type in ceph_daemons():
make_log_dir(ctx, ctx.fsid)
if daemon_id and not ctx.fsid:
@@ -7118,9 +5552,15 @@ def command_shell(ctx):
ctx.keyring = CEPH_DEFAULT_KEYRING
container_args: List[str] = ['-i']
- mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id,
- no_config=True if ctx.config else False)
- binds = get_container_binds(ctx, ctx.fsid, daemon_type, daemon_id)
+ if ctx.fsid and daemon_id:
+ ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_id)
+ mounts = get_container_mounts(
+ ctx, ident, no_config=bool(ctx.config),
+ )
+ binds = get_container_binds(ctx, ident)
+ else:
+ mounts = get_container_mounts_for_type(ctx, ctx.fsid, daemon_type)
+ binds = []
if ctx.config:
mounts[pathify(ctx.config)] = '/etc/ceph/ceph.conf:z'
if ctx.keyring:
@@ -7174,6 +5614,10 @@ def command_shell(ctx):
privileged=True)
command = c.shell_cmd(command)
+ if ctx.dry_run:
+ print(' '.join(shlex.quote(arg) for arg in command))
+ return 0
+
return call_timeout(ctx, command, ctx.timeout)
##################################
@@ -7225,7 +5669,7 @@ def command_ceph_volume(ctx):
lock.acquire()
(uid, gid) = (0, 0) # ceph-volume runs as root
- mounts = get_container_mounts(ctx, ctx.fsid, 'osd', None)
+ mounts = get_container_mounts_for_type(ctx, ctx.fsid, 'osd')
tmp_config = None
tmp_keyring = None
@@ -7256,6 +5700,24 @@ def command_ceph_volume(ctx):
##################################
+def command_unit_install(ctx):
+ # type: (CephadmContext) -> int
+ if not ctx.fsid:
+ raise Error('must pass --fsid to specify cluster')
+
+ fsid = ctx.fsid
+ install_base_units(ctx, fsid)
+ unit = get_unit_file(ctx, fsid)
+ unit_file = 'ceph-%s@.service' % (fsid)
+ with open(ctx.unit_dir + '/' + unit_file + '.new', 'w') as f:
+ f.write(unit)
+ os.rename(ctx.unit_dir + '/' + unit_file + '.new',
+ ctx.unit_dir + '/' + unit_file)
+ call_throws(ctx, ['systemctl', 'daemon-reload'])
+
+ return 0
+
+
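# The unit file write above follows the usual write-then-rename pattern so the live
# ceph-<fsid>@.service is replaced atomically; a minimal standalone sketch of the idea
# (paths illustrative):
#
#     tmp = path + '.new'
#     with open(tmp, 'w') as f:
#         f.write(unit_text)
#     os.rename(tmp, path)   # atomic replace on the same filesystem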
@infer_fsid
def command_unit(ctx):
# type: (CephadmContext) -> int
@@ -7296,97 +5758,6 @@ def command_logs(ctx):
##################################
-def list_networks(ctx):
- # type: (CephadmContext) -> Dict[str,Dict[str, Set[str]]]
-
- # sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag,
- # so we'll need to use a regex to parse 'ip' command output.
- #
- # out, _, _ = call_throws(['ip', '-j', 'route', 'ls'])
- # j = json.loads(out)
- # for x in j:
- res = _list_ipv4_networks(ctx)
- res.update(_list_ipv6_networks(ctx))
- return res
-
-
-def _list_ipv4_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
- execstr: Optional[str] = find_executable('ip')
- if not execstr:
- raise FileNotFoundError("unable to find 'ip' command")
- out, _, _ = call_throws(ctx, [execstr, 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- return _parse_ipv4_route(out)
-
-
-def _parse_ipv4_route(out: str) -> Dict[str, Dict[str, Set[str]]]:
- r = {} # type: Dict[str, Dict[str, Set[str]]]
- p = re.compile(r'^(\S+) (?:via \S+)? ?dev (\S+) (.*)scope link (.*)src (\S+)')
- for line in out.splitlines():
- m = p.findall(line)
- if not m:
- continue
- net = m[0][0]
- if '/' not in net: # aggregate /32 mask for single host sub-networks
- net += '/32'
- iface = m[0][1]
- ip = m[0][4]
- if net not in r:
- r[net] = {}
- if iface not in r[net]:
- r[net][iface] = set()
- r[net][iface].add(ip)
- return r
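# Worked example for the removed parser (illustrative input line from 'ip route ls'):
#     10.1.2.0/24 dev eth0 proto kernel scope link src 10.1.2.3
# _parse_ipv4_route() would return {'10.1.2.0/24': {'eth0': {'10.1.2.3'}}}; a bare host
# route without a mask is recorded under a '/32' network as per the aggregation above.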
-
-
-def _list_ipv6_networks(ctx: CephadmContext) -> Dict[str, Dict[str, Set[str]]]:
- execstr: Optional[str] = find_executable('ip')
- if not execstr:
- raise FileNotFoundError("unable to find 'ip' command")
- routes, _, _ = call_throws(ctx, [execstr, '-6', 'route', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- ips, _, _ = call_throws(ctx, [execstr, '-6', 'addr', 'ls'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- return _parse_ipv6_route(routes, ips)
-
-
-def _parse_ipv6_route(routes: str, ips: str) -> Dict[str, Dict[str, Set[str]]]:
- r = {} # type: Dict[str, Dict[str, Set[str]]]
- route_p = re.compile(r'^(\S+) dev (\S+) proto (\S+) metric (\S+) .*pref (\S+)$')
- ip_p = re.compile(r'^\s+inet6 (\S+)/(.*)scope (.*)$')
- iface_p = re.compile(r'^(\d+): (\S+): (.*)$')
- for line in routes.splitlines():
- m = route_p.findall(line)
- if not m or m[0][0].lower() == 'default':
- continue
- net = m[0][0]
- if '/' not in net: # aggregate /128 mask for single host sub-networks
- net += '/128'
- iface = m[0][1]
- if iface == 'lo': # skip loopback devices
- continue
- if net not in r:
- r[net] = {}
- if iface not in r[net]:
- r[net][iface] = set()
-
- iface = None
- for line in ips.splitlines():
- m = ip_p.findall(line)
- if not m:
- m = iface_p.findall(line)
- if m:
- # drop @... suffix, if present
- iface = m[0][1].split('@')[0]
- continue
- ip = m[0][0]
- # find the network it belongs to
- net = [n for n in r.keys()
- if ipaddress.ip_address(ip) in ipaddress.ip_network(n)]
- if net and iface in r[net[0]]:
- assert iface
- r[net[0]][iface].add(ip)
-
- return r
-
-
def command_list_networks(ctx):
# type: (CephadmContext) -> None
r = list_networks(ctx)
@@ -7406,27 +5777,6 @@ def command_ls(ctx):
print(json.dumps(ls, indent=4))
-def with_units_to_int(v: str) -> int:
- if v.endswith('iB'):
- v = v[:-2]
- elif v.endswith('B'):
- v = v[:-1]
- mult = 1
- if v[-1].upper() == 'K':
- mult = 1024
- v = v[:-1]
- elif v[-1].upper() == 'M':
- mult = 1024 * 1024
- v = v[:-1]
- elif v[-1].upper() == 'G':
- mult = 1024 * 1024 * 1024
- v = v[:-1]
- elif v[-1].upper() == 'T':
- mult = 1024 * 1024 * 1024 * 1024
- v = v[:-1]
- return int(float(v) * mult)
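# Worked examples for the removed helper (note it uses binary multipliers even for
# 'KB'/'GB' style suffixes):
#     with_units_to_int('1.5GiB') == 1610612736   # 1.5 * 1024**3
#     with_units_to_int('2KB')    == 2048         # 2 * 1024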
-
-
def list_daemons(ctx, detail=True, legacy_dir=None):
# type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]]
host_version: Optional[str] = None
@@ -7553,7 +5903,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
if daemon_type == CephNvmeof.daemon_type:
version = CephNvmeof.get_version(ctx, container_id)
elif not version:
- if daemon_type in Ceph.daemons:
+ if daemon_type in ceph_daemons():
out, err, code = call(ctx,
[container_path, 'exec', container_id,
'ceph', '-v'],
@@ -7584,7 +5934,8 @@ def list_daemons(ctx, detail=True, legacy_dir=None):
'haproxy', '-v'],
verbosity=CallVerbosity.QUIET)
if not code and \
- out.startswith('HA-Proxy version '):
+ out.startswith('HA-Proxy version ') or \
+ out.startswith('HAProxy version '):
version = out.split(' ')[2]
seen_versions[image_id] = version
elif daemon_type == 'keepalived':
@@ -7693,7 +6044,9 @@ def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None):
def get_container_stats(ctx: CephadmContext, container_path: str, fsid: str, daemon_type: str, daemon_id: str) -> Tuple[str, str, int]:
- c = CephContainer.for_daemon(ctx, fsid, daemon_type, daemon_id, 'bash')
+ c = CephContainer.for_daemon(
+ ctx, DaemonIdentity(fsid, daemon_type, daemon_id), 'bash'
+ )
out, err, code = '', '', -1
for name in (c.cname, c.old_cname):
cmd = [
@@ -7739,7 +6092,7 @@ def command_adopt(ctx):
lock.acquire()
# call correct adoption
- if daemon_type in Ceph.daemons:
+ if daemon_type in ceph_daemons():
command_adopt_ceph(ctx, daemon_type, daemon_id, fsid)
elif daemon_type == 'prometheus':
command_adopt_prometheus(ctx, daemon_id, fsid)
@@ -7905,8 +6258,12 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid):
# data
logger.info('Moving data...')
- data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
- uid=uid, gid=gid)
+ data_dir_dst = make_data_dir(
+ ctx,
+ DaemonIdentity(fsid, daemon_type, daemon_id),
+ uid=uid,
+ gid=gid,
+ )
move_files(ctx, glob(os.path.join(data_dir_src, '*')),
data_dir_dst,
uid=uid, gid=gid)
@@ -7974,18 +6331,25 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid):
logger.info('Creating new units...')
make_var_run(ctx, fsid, uid, gid)
- c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
- enable=True, # unconditionally enable the new unit
- start=(state == 'running' or ctx.force_start),
- osd_fsid=osd_fsid)
- update_firewalld(ctx, daemon_type)
+ ident = DaemonIdentity(fsid, daemon_type, daemon_id)
+ c = get_container(ctx, ident)
+ deploy_daemon_units(
+ ctx,
+ ident,
+ uid,
+ gid,
+ c,
+ enable=True, # unconditionally enable the new unit
+ start=(state == 'running' or ctx.force_start),
+ osd_fsid=osd_fsid,
+ )
+ update_firewalld(ctx, daemon_form_create(ctx, ident))
def command_adopt_prometheus(ctx, daemon_id, fsid):
# type: (CephadmContext, str, str) -> None
daemon_type = 'prometheus'
- (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type)
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['prometheus']
@@ -7993,8 +6357,12 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
_stop_and_disable(ctx, 'prometheus')
- data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
- uid=uid, gid=gid)
+ data_dir_dst = make_data_dir(
+ ctx,
+ DaemonIdentity(fsid, daemon_type, daemon_id),
+ uid=uid,
+ gid=gid,
+ )
# config
config_src = '/etc/prometheus/prometheus.yml'
@@ -8010,17 +6378,25 @@ def command_adopt_prometheus(ctx, daemon_id, fsid):
copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
make_var_run(ctx, fsid, uid, gid)
- c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
- update_firewalld(ctx, daemon_type)
+ ident = DaemonIdentity(fsid, daemon_type, daemon_id)
+ c = get_container(ctx, ident)
+ deploy_daemon(
+ ctx,
+ ident,
+ c,
+ uid,
+ gid,
+ deployment_type=DeploymentType.REDEPLOY,
+ endpoints=endpoints,
+ )
+ update_firewalld(ctx, daemon_form_create(ctx, ident))
def command_adopt_grafana(ctx, daemon_id, fsid):
# type: (CephadmContext, str, str) -> None
daemon_type = 'grafana'
- (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type)
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['grafana']
@@ -8028,8 +6404,13 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
_stop_and_disable(ctx, 'grafana-server')
- data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
- uid=uid, gid=gid)
+ ident = DaemonIdentity(fsid, daemon_type, daemon_id)
+ data_dir_dst = make_data_dir(
+ ctx,
+ ident,
+ uid=uid,
+ gid=gid,
+ )
# config
config_src = '/etc/grafana/grafana.ini'
@@ -8069,17 +6450,24 @@ def command_adopt_grafana(ctx, daemon_id, fsid):
copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
make_var_run(ctx, fsid, uid, gid)
- c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
- update_firewalld(ctx, daemon_type)
+ c = get_container(ctx, ident)
+ deploy_daemon(
+ ctx,
+ ident,
+ c,
+ uid,
+ gid,
+ deployment_type=DeploymentType.REDEPLOY,
+ endpoints=endpoints,
+ )
+ update_firewalld(ctx, daemon_form_create(ctx, ident))
def command_adopt_alertmanager(ctx, daemon_id, fsid):
# type: (CephadmContext, str, str) -> None
daemon_type = 'alertmanager'
- (uid, gid) = extract_uid_gid_monitoring(ctx, daemon_type)
+ (uid, gid) = Monitoring.extract_uid_gid(ctx, daemon_type)
# should try to set the ports we know cephadm defaults
# to for these services in the firewall.
ports = Monitoring.port_map['alertmanager']
@@ -8087,8 +6475,13 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
_stop_and_disable(ctx, 'prometheus-alertmanager')
- data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id,
- uid=uid, gid=gid)
+ ident = DaemonIdentity(fsid, daemon_type, daemon_id)
+ data_dir_dst = make_data_dir(
+ ctx,
+ ident,
+ uid=uid,
+ gid=gid,
+ )
# config
config_src = '/etc/prometheus/alertmanager.yml'
@@ -8104,10 +6497,17 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid):
copy_tree(ctx, [data_src], data_dst, uid=uid, gid=gid)
make_var_run(ctx, fsid, uid, gid)
- c = get_container(ctx, fsid, daemon_type, daemon_id)
- deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
- deployment_type=DeploymentType.REDEPLOY, endpoints=endpoints)
- update_firewalld(ctx, daemon_type)
+ c = get_container(ctx, ident)
+ deploy_daemon(
+ ctx,
+ ident,
+ c,
+ uid,
+ gid,
+ deployment_type=DeploymentType.REDEPLOY,
+ endpoints=endpoints,
+ )
+ update_firewalld(ctx, daemon_form_create(ctx, ident))
def _adjust_grafana_ini(filename):
@@ -8174,7 +6574,8 @@ def command_rm_daemon(ctx):
call(ctx, ['rm', '-rf', rgw_asok_path],
verbosity=CallVerbosity.DEBUG)
- data_dir = get_data_dir(ctx.fsid, ctx.data_dir, daemon_type, daemon_id)
+ ident = DaemonIdentity(ctx.fsid, daemon_type, daemon_id)
+ data_dir = ident.data_dir(ctx.data_dir)
if daemon_type in ['mon', 'osd', 'prometheus'] and \
not ctx.force_delete_data:
# rename it out of the way -- do not delete
@@ -8188,7 +6589,7 @@ def command_rm_daemon(ctx):
else:
call_throws(ctx, ['rm', '-rf', data_dir])
- endpoints = fetch_tcp_ports(ctx)
+ endpoints = fetch_endpoints(ctx)
ports: List[int] = [e.port for e in endpoints]
if ports:
try:
@@ -8205,7 +6606,9 @@ def command_rm_daemon(ctx):
def _zap(ctx: CephadmContext, what: str) -> None:
- mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None)
+ mounts = get_container_mounts_for_type(
+ ctx, ctx.fsid, 'clusterless-ceph-volume'
+ )
c = get_ceph_volume_container(ctx,
args=['lvm', 'zap', '--destroy', what],
volume_mounts=mounts,
@@ -8219,7 +6622,9 @@ def _zap_osds(ctx: CephadmContext) -> None:
# assume fsid lock already held
# list
- mounts = get_container_mounts(ctx, ctx.fsid, 'clusterless-ceph-volume', None)
+ mounts = get_container_mounts_for_type(
+ ctx, ctx.fsid, 'clusterless-ceph-volume'
+ )
c = get_ceph_volume_container(ctx,
args=['inventory', '--format', 'json'],
volume_mounts=mounts,
@@ -8291,7 +6696,7 @@ def _rm_cluster(ctx: CephadmContext, keep_logs: bool, zap_osds: bool) -> None:
continue
if d['style'] != 'cephadm:v1':
continue
- disable_systemd_service(get_unit_name(ctx.fsid, d['name']))
+ disable_systemd_service('ceph-%s@%s' % (ctx.fsid, d['name']))
# cluster units
for unit_name in ['ceph-%s.target' % ctx.fsid]:
@@ -8422,152 +6827,6 @@ def command_check_host(ctx: CephadmContext) -> None:
##################################
-def get_ssh_vars(ssh_user: str) -> Tuple[int, int, str]:
- try:
- s_pwd = pwd.getpwnam(ssh_user)
- except KeyError:
- raise Error('Cannot find uid/gid for ssh-user: %s' % (ssh_user))
-
- ssh_uid = s_pwd.pw_uid
- ssh_gid = s_pwd.pw_gid
- ssh_dir = os.path.join(s_pwd.pw_dir, '.ssh')
- return ssh_uid, ssh_gid, ssh_dir
-
-
-def authorize_ssh_key(ssh_pub_key: str, ssh_user: str) -> bool:
- """Authorize the public key for the provided ssh user"""
-
- def key_in_file(path: str, key: str) -> bool:
- if not os.path.exists(path):
- return False
- with open(path) as f:
- lines = f.readlines()
- for line in lines:
- if line.strip() == key.strip():
- return True
- return False
-
- logger.info(f'Adding key to {ssh_user}@localhost authorized_keys...')
- if ssh_pub_key is None or ssh_pub_key.isspace():
- raise Error('Trying to authorize an empty ssh key')
-
- ssh_pub_key = ssh_pub_key.strip()
- ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
- if not os.path.exists(ssh_dir):
- makedirs(ssh_dir, ssh_uid, ssh_gid, 0o700)
-
- auth_keys_file = '%s/authorized_keys' % ssh_dir
- if key_in_file(auth_keys_file, ssh_pub_key):
- logger.info(f'key already in {ssh_user}@localhost authorized_keys...')
- return False
-
- add_newline = False
- if os.path.exists(auth_keys_file):
- with open(auth_keys_file, 'r') as f:
- f.seek(0, os.SEEK_END)
- if f.tell() > 0:
- f.seek(f.tell() - 1, os.SEEK_SET) # go to last char
- if f.read() != '\n':
- add_newline = True
-
- with open(auth_keys_file, 'a') as f:
- os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
- os.fchmod(f.fileno(), DEFAULT_MODE) # just in case we created it
- if add_newline:
- f.write('\n')
- f.write(ssh_pub_key + '\n')
-
- return True
-
-
-def revoke_ssh_key(key: str, ssh_user: str) -> None:
- """Revoke the public key authorization for the ssh user"""
- ssh_uid, ssh_gid, ssh_dir = get_ssh_vars(ssh_user)
- auth_keys_file = '%s/authorized_keys' % ssh_dir
- deleted = False
- if os.path.exists(auth_keys_file):
- with open(auth_keys_file, 'r') as f:
- lines = f.readlines()
- _, filename = tempfile.mkstemp()
- with open(filename, 'w') as f:
- os.fchown(f.fileno(), ssh_uid, ssh_gid)
- os.fchmod(f.fileno(), DEFAULT_MODE) # secure access to the keys file
- for line in lines:
- if line.strip() == key.strip():
- deleted = True
- else:
- f.write(line)
-
- if deleted:
- shutil.move(filename, auth_keys_file)
- else:
- logger.warning('Cannot find the ssh key to be deleted')
-
-
-def check_ssh_connectivity(ctx: CephadmContext) -> None:
-
- def cmd_is_available(cmd: str) -> bool:
- if shutil.which(cmd) is None:
- logger.warning(f'Command not found: {cmd}')
- return False
- return True
-
- if not cmd_is_available('ssh') or not cmd_is_available('ssh-keygen'):
- logger.warning('Cannot check ssh connectivity. Skipping...')
- return
-
- ssh_priv_key_path = ''
- ssh_pub_key_path = ''
- ssh_signed_cert_path = ''
- if ctx.ssh_private_key and ctx.ssh_public_key:
- # let's use the keys provided by the user
- ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
- ssh_pub_key_path = pathify(ctx.ssh_public_key.name)
- elif ctx.ssh_private_key and ctx.ssh_signed_cert:
- # CA signed keys use case
- ssh_priv_key_path = pathify(ctx.ssh_private_key.name)
- ssh_signed_cert_path = pathify(ctx.ssh_signed_cert.name)
- else:
- # no custom keys, let's generate some random keys just for this check
- ssh_priv_key_path = f'/tmp/ssh_key_{uuid.uuid1()}'
- ssh_pub_key_path = f'{ssh_priv_key_path}.pub'
- ssh_key_gen_cmd = ['ssh-keygen', '-q', '-t', 'rsa', '-N', '', '-C', '', '-f', ssh_priv_key_path]
- _, _, code = call(ctx, ssh_key_gen_cmd)
- if code != 0:
- logger.warning('Cannot generate keys to check ssh connectivity.')
- return
-
- if ssh_signed_cert_path:
- logger.info('Verification for CA signed keys authentication not implemented. Skipping ...')
- elif ssh_pub_key_path:
- logger.info('Verifying ssh connectivity using standard pubkey authentication ...')
- with open(ssh_pub_key_path, 'r') as f:
- key = f.read().strip()
- new_key = authorize_ssh_key(key, ctx.ssh_user)
- ssh_cfg_file_arg = ['-F', pathify(ctx.ssh_config.name)] if ctx.ssh_config else []
- _, _, code = call(ctx, ['ssh', '-o StrictHostKeyChecking=no',
- *ssh_cfg_file_arg, '-i', ssh_priv_key_path,
- '-o PasswordAuthentication=no',
- f'{ctx.ssh_user}@{get_hostname()}',
- 'sudo echo'])
-
- # we only remove the key if it's a new one. In case the user has provided
- # some already existing key then we don't alter authorized_keys file
- if new_key:
- revoke_ssh_key(key, ctx.ssh_user)
-
- pub_key_msg = '- The public key file configured by --ssh-public-key is valid\n' if ctx.ssh_public_key else ''
- prv_key_msg = '- The private key file configured by --ssh-private-key is valid\n' if ctx.ssh_private_key else ''
- ssh_cfg_msg = '- The ssh configuration file configured by --ssh-config is valid\n' if ctx.ssh_config else ''
- err_msg = f"""
-** Please verify your user's ssh configuration and make sure:
-- User {ctx.ssh_user} must have passwordless sudo access
-{pub_key_msg}{prv_key_msg}{ssh_cfg_msg}
-"""
- if code != 0:
- raise Error(err_msg)
-
-
def command_prepare_host(ctx: CephadmContext) -> None:
logger.info('Verifying podman|docker is present...')
pkg = None
@@ -8631,505 +6890,6 @@ class CustomValidation(argparse.Action):
##################################
-def get_distro():
- # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
- distro = None
- distro_version = None
- distro_codename = None
- with open('/etc/os-release', 'r') as f:
- for line in f.readlines():
- line = line.strip()
- if '=' not in line or line.startswith('#'):
- continue
- (var, val) = line.split('=', 1)
- if val[0] == '"' and val[-1] == '"':
- val = val[1:-1]
- if var == 'ID':
- distro = val.lower()
- elif var == 'VERSION_ID':
- distro_version = val.lower()
- elif var == 'VERSION_CODENAME':
- distro_codename = val.lower()
- return distro, distro_version, distro_codename
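# Illustrative example for the removed get_distro(): an /etc/os-release containing
#     ID="ubuntu"
#     VERSION_ID="22.04"
#     VERSION_CODENAME=jammy
# yields ('ubuntu', '22.04', 'jammy'); values are lower-cased and surrounding double
# quotes stripped.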
-
-
-class Packager(object):
- def __init__(self, ctx: CephadmContext,
- stable: Optional[str] = None, version: Optional[str] = None,
- branch: Optional[str] = None, commit: Optional[str] = None):
- assert \
- (stable and not version and not branch and not commit) or \
- (not stable and version and not branch and not commit) or \
- (not stable and not version and branch) or \
- (not stable and not version and not branch and not commit)
- self.ctx = ctx
- self.stable = stable
- self.version = version
- self.branch = branch
- self.commit = commit
-
- def validate(self) -> None:
- """Validate parameters before writing any state to disk."""
- pass
-
- def add_repo(self) -> None:
- raise NotImplementedError
-
- def rm_repo(self) -> None:
- raise NotImplementedError
-
- def install(self, ls: List[str]) -> None:
- raise NotImplementedError
-
- def install_podman(self) -> None:
- raise NotImplementedError
-
- def query_shaman(self, distro: str, distro_version: Any, branch: Optional[str], commit: Optional[str]) -> str:
- # query shaman
- logger.info('Fetching repo metadata from shaman and chacra...')
- shaman_url = 'https://shaman.ceph.com/api/repos/ceph/{branch}/{sha1}/{distro}/{distro_version}/repo/?arch={arch}'.format(
- distro=distro,
- distro_version=distro_version,
- branch=branch,
- sha1=commit or 'latest',
- arch=get_arch()
- )
- try:
- shaman_response = urlopen(shaman_url)
- except HTTPError as err:
- logger.error('repository not found in shaman (might not be available yet)')
- raise Error('%s, failed to fetch %s' % (err, shaman_url))
- chacra_url = ''
- try:
- chacra_url = shaman_response.geturl()
- chacra_response = urlopen(chacra_url)
- except HTTPError as err:
- logger.error('repository not found in chacra (might not be available yet)')
- raise Error('%s, failed to fetch %s' % (err, chacra_url))
- return chacra_response.read().decode('utf-8')
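# Example of the URL built above (made-up parameters): for branch 'main', no explicit
# commit, on CentOS 9 / x86_64 it would be
#     https://shaman.ceph.com/api/repos/ceph/main/latest/centos/9/repo/?arch=x86_64
# and the body fetched from the redirected chacra URL is the repo file content that
# add_repo() later writes to disk.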
-
- def repo_gpgkey(self) -> Tuple[str, str]:
- if self.ctx.gpg_url:
- return self.ctx.gpg_url, 'manual'
- if self.stable or self.version:
- return 'https://download.ceph.com/keys/release.gpg', 'release'
- else:
- return 'https://download.ceph.com/keys/autobuild.gpg', 'autobuild'
-
- def enable_service(self, service: str) -> None:
- """
- Start and enable the service (typically using systemd).
- """
- call_throws(self.ctx, ['systemctl', 'enable', '--now', service])
-
-
-class Apt(Packager):
- DISTRO_NAMES = {
- 'ubuntu': 'ubuntu',
- 'debian': 'debian',
- }
-
- def __init__(self, ctx: CephadmContext,
- stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
- distro: Optional[str], distro_version: Optional[str], distro_codename: Optional[str]) -> None:
- super(Apt, self).__init__(ctx, stable=stable, version=version,
- branch=branch, commit=commit)
- assert distro
- self.ctx = ctx
- self.distro = self.DISTRO_NAMES[distro]
- self.distro_codename = distro_codename
- self.distro_version = distro_version
-
- def repo_path(self) -> str:
- return '/etc/apt/sources.list.d/ceph.list'
-
- def add_repo(self) -> None:
-
- url, name = self.repo_gpgkey()
- logger.info('Installing repo GPG key from %s...' % url)
- try:
- response = urlopen(url)
- except HTTPError as err:
- logger.error('failed to fetch GPG repo key from %s: %s' % (
- url, err))
- raise Error('failed to fetch GPG key')
- key = response.read()
- with open('/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name, 'wb') as f:
- f.write(key)
-
- if self.version:
- content = 'deb %s/debian-%s/ %s main\n' % (
- self.ctx.repo_url, self.version, self.distro_codename)
- elif self.stable:
- content = 'deb %s/debian-%s/ %s main\n' % (
- self.ctx.repo_url, self.stable, self.distro_codename)
- else:
- content = self.query_shaman(self.distro, self.distro_codename, self.branch,
- self.commit)
-
- logger.info('Installing repo file at %s...' % self.repo_path())
- with open(self.repo_path(), 'w') as f:
- f.write(content)
-
- self.update()
-
- def rm_repo(self) -> None:
- for name in ['autobuild', 'release', 'manual']:
- p = '/etc/apt/trusted.gpg.d/ceph.%s.gpg' % name
- if os.path.exists(p):
- logger.info('Removing repo GPG key %s...' % p)
- os.unlink(p)
- if os.path.exists(self.repo_path()):
- logger.info('Removing repo at %s...' % self.repo_path())
- os.unlink(self.repo_path())
-
- if self.distro == 'ubuntu':
- self.rm_kubic_repo()
-
- def install(self, ls: List[str]) -> None:
- logger.info('Installing packages %s...' % ls)
- call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls)
-
- def update(self) -> None:
- logger.info('Updating package list...')
- call_throws(self.ctx, ['apt-get', 'update'])
-
- def install_podman(self) -> None:
- if self.distro == 'ubuntu':
- logger.info('Setting up repo for podman...')
- self.add_kubic_repo()
- self.update()
-
- logger.info('Attempting podman install...')
- try:
- self.install(['podman'])
- except Error:
- logger.info('Podman did not work. Falling back to docker...')
- self.install(['docker.io'])
-
- def kubic_repo_url(self) -> str:
- return 'https://download.opensuse.org/repositories/devel:/kubic:/' \
- 'libcontainers:/stable/xUbuntu_%s/' % self.distro_version
-
- def kubic_repo_path(self) -> str:
- return '/etc/apt/sources.list.d/devel:kubic:libcontainers:stable.list'
-
- def kubic_repo_gpgkey_url(self) -> str:
- return '%s/Release.key' % self.kubic_repo_url()
-
- def kubic_repo_gpgkey_path(self) -> str:
- return '/etc/apt/trusted.gpg.d/kubic.release.gpg'
-
- def add_kubic_repo(self) -> None:
- url = self.kubic_repo_gpgkey_url()
- logger.info('Installing repo GPG key from %s...' % url)
- try:
- response = urlopen(url)
- except HTTPError as err:
- logger.error('failed to fetch GPG repo key from %s: %s' % (
- url, err))
- raise Error('failed to fetch GPG key')
- key = response.read().decode('utf-8')
- tmp_key = write_tmp(key, 0, 0)
- keyring = self.kubic_repo_gpgkey_path()
- call_throws(self.ctx, ['apt-key', '--keyring', keyring, 'add', tmp_key.name])
-
- logger.info('Installing repo file at %s...' % self.kubic_repo_path())
- content = 'deb %s /\n' % self.kubic_repo_url()
- with open(self.kubic_repo_path(), 'w') as f:
- f.write(content)
-
- def rm_kubic_repo(self) -> None:
- keyring = self.kubic_repo_gpgkey_path()
- if os.path.exists(keyring):
- logger.info('Removing repo GPG key %s...' % keyring)
- os.unlink(keyring)
-
- p = self.kubic_repo_path()
- if os.path.exists(p):
- logger.info('Removing repo at %s...' % p)
- os.unlink(p)
-
-
-class YumDnf(Packager):
- DISTRO_NAMES = {
- 'centos': ('centos', 'el'),
- 'rhel': ('centos', 'el'),
- 'scientific': ('centos', 'el'),
- 'rocky': ('centos', 'el'),
- 'almalinux': ('centos', 'el'),
- 'ol': ('centos', 'el'),
- 'fedora': ('fedora', 'fc'),
- 'mariner': ('mariner', 'cm'),
- }
-
- def __init__(self, ctx: CephadmContext,
- stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
- distro: Optional[str], distro_version: Optional[str]) -> None:
- super(YumDnf, self).__init__(ctx, stable=stable, version=version,
- branch=branch, commit=commit)
- assert distro
- assert distro_version
- self.ctx = ctx
- self.major = int(distro_version.split('.')[0])
- self.distro_normalized = self.DISTRO_NAMES[distro][0]
- self.distro_code = self.DISTRO_NAMES[distro][1] + str(self.major)
- if (self.distro_code == 'fc' and self.major >= 30) or \
- (self.distro_code == 'el' and self.major >= 8):
- self.tool = 'dnf'
- elif (self.distro_code == 'cm'):
- self.tool = 'tdnf'
- else:
- self.tool = 'yum'
-
- def custom_repo(self, **kw: Any) -> str:
- """
- Repo files need special care in that a whole line should not be present
- if there is no value for it. Because we were using `format()` we could
- not conditionally add a line for a repo file. So the end result would
- contain a key with a missing value (say if we were passing `None`).
-
- For example, it could look like::
-
- [ceph repo]
- name= ceph repo
- proxy=
- gpgcheck=
-
- Which breaks. This function allows us to conditionally add lines,
- preserving an order and being more careful.
-
- Previously, and for historical purposes, this is how the template used
- to look::
-
- custom_repo =
- [{repo_name}]
- name={name}
- baseurl={baseurl}
- enabled={enabled}
- gpgcheck={gpgcheck}
- type={_type}
- gpgkey={gpgkey}
- proxy={proxy}
-
- """
- lines = []
-
- # by using tuples (vs a dict) we preserve the order of what we want to
- # return, like starting with a [repo name]
- tmpl = (
- ('reponame', '[%s]'),
- ('name', 'name=%s'),
- ('baseurl', 'baseurl=%s'),
- ('enabled', 'enabled=%s'),
- ('gpgcheck', 'gpgcheck=%s'),
- ('_type', 'type=%s'),
- ('gpgkey', 'gpgkey=%s'),
- ('proxy', 'proxy=%s'),
- ('priority', 'priority=%s'),
- )
-
- for line in tmpl:
- tmpl_key, tmpl_value = line # key values from tmpl
-
- # ensure that there is an actual value (not None nor empty string)
- if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''):
- lines.append(tmpl_value % kw.get(tmpl_key))
-
- return '\n'.join(lines)
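# Worked example (hypothetical arguments): custom_repo(reponame='Ceph', name='Ceph $basearch',
# baseurl='https://example.com/rpm-reef/el9/$basearch', enabled=1, gpgcheck=1) produces:
#     [Ceph]
#     name=Ceph $basearch
#     baseurl=https://example.com/rpm-reef/el9/$basearch
#     enabled=1
#     gpgcheck=1
# Keys whose value is None or '' are omitted entirely, which is the point of the helper.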
-
- def repo_path(self) -> str:
- return '/etc/yum.repos.d/ceph.repo'
-
- def repo_baseurl(self) -> str:
- assert self.stable or self.version
- if self.version:
- return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.version,
- self.distro_code)
- else:
- return '%s/rpm-%s/%s' % (self.ctx.repo_url, self.stable,
- self.distro_code)
-
- def validate(self) -> None:
- if self.distro_code.startswith('fc'):
- raise Error('Ceph team does not build Fedora specific packages and therefore cannot add repos for this distro')
- if self.distro_code == 'el7':
- if self.stable and self.stable >= 'pacific':
- raise Error('Ceph does not support pacific or later for this version of this linux distro and therefore cannot add a repo for it')
- if self.version and self.version.split('.')[0] >= '16':
- raise Error('Ceph does not support 16.y.z or later for this version of this linux distro and therefore cannot add a repo for it')
-
- if self.stable or self.version:
- # we know that yum & dnf require there to be a
- # $base_url/$arch/repodata/repomd.xml so we can test if this URL
- # is gettable in order to validate the inputs
- test_url = self.repo_baseurl() + '/noarch/repodata/repomd.xml'
- try:
- urlopen(test_url)
- except HTTPError as err:
- logger.error('unable to fetch repo metadata: %r', err)
- raise Error('failed to fetch repository metadata. please check'
- ' the provided parameters are correct and try again')
-
- def add_repo(self) -> None:
- if self.stable or self.version:
- content = ''
- for n, t in {
- 'Ceph': '$basearch',
- 'Ceph-noarch': 'noarch',
- 'Ceph-source': 'SRPMS'}.items():
- content += '[%s]\n' % (n)
- content += self.custom_repo(
- name='Ceph %s' % t,
- baseurl=self.repo_baseurl() + '/' + t,
- enabled=1,
- gpgcheck=1,
- gpgkey=self.repo_gpgkey()[0],
- )
- content += '\n\n'
- else:
- content = self.query_shaman(self.distro_normalized, self.major,
- self.branch,
- self.commit)
-
- logger.info('Writing repo to %s...' % self.repo_path())
- with open(self.repo_path(), 'w') as f:
- f.write(content)
-
- if self.distro_code.startswith('el'):
- logger.info('Enabling EPEL...')
- call_throws(self.ctx, [self.tool, 'install', '-y', 'epel-release'])
-
- def rm_repo(self) -> None:
- if os.path.exists(self.repo_path()):
- os.unlink(self.repo_path())
-
- def install(self, ls: List[str]) -> None:
- logger.info('Installing packages %s...' % ls)
- call_throws(self.ctx, [self.tool, 'install', '-y'] + ls)
-
- def install_podman(self) -> None:
- self.install(['podman'])
-
-
-class Zypper(Packager):
- DISTRO_NAMES = [
- 'sles',
- 'opensuse-tumbleweed',
- 'opensuse-leap'
- ]
-
- def __init__(self, ctx: CephadmContext,
- stable: Optional[str], version: Optional[str], branch: Optional[str], commit: Optional[str],
- distro: Optional[str], distro_version: Optional[str]) -> None:
- super(Zypper, self).__init__(ctx, stable=stable, version=version,
- branch=branch, commit=commit)
- assert distro is not None
- self.ctx = ctx
- self.tool = 'zypper'
- self.distro = 'opensuse'
- self.distro_version = '15.1'
- if 'tumbleweed' not in distro and distro_version is not None:
- self.distro_version = distro_version
-
- def custom_repo(self, **kw: Any) -> str:
- """
- See YumDnf for format explanation.
- """
- lines = []
-
- # by using tuples (vs a dict) we preserve the order of what we want to
- # return, like starting with a [repo name]
- tmpl = (
- ('reponame', '[%s]'),
- ('name', 'name=%s'),
- ('baseurl', 'baseurl=%s'),
- ('enabled', 'enabled=%s'),
- ('gpgcheck', 'gpgcheck=%s'),
- ('_type', 'type=%s'),
- ('gpgkey', 'gpgkey=%s'),
- ('proxy', 'proxy=%s'),
- ('priority', 'priority=%s'),
- )
-
- for line in tmpl:
- tmpl_key, tmpl_value = line # key values from tmpl
-
- # ensure that there is an actual value (not None nor empty string)
- if tmpl_key in kw and kw.get(tmpl_key) not in (None, ''):
- lines.append(tmpl_value % kw.get(tmpl_key))
-
- return '\n'.join(lines)
-
- def repo_path(self) -> str:
- return '/etc/zypp/repos.d/ceph.repo'
-
- def repo_baseurl(self) -> str:
- assert self.stable or self.version
- if self.version:
- return '%s/rpm-%s/%s' % (self.ctx.repo_url,
- self.stable, self.distro)
- else:
- return '%s/rpm-%s/%s' % (self.ctx.repo_url,
- self.stable, self.distro)
-
- def add_repo(self) -> None:
- if self.stable or self.version:
- content = ''
- for n, t in {
- 'Ceph': '$basearch',
- 'Ceph-noarch': 'noarch',
- 'Ceph-source': 'SRPMS'}.items():
- content += '[%s]\n' % (n)
- content += self.custom_repo(
- name='Ceph %s' % t,
- baseurl=self.repo_baseurl() + '/' + t,
- enabled=1,
- gpgcheck=1,
- gpgkey=self.repo_gpgkey()[0],
- )
- content += '\n\n'
- else:
- content = self.query_shaman(self.distro, self.distro_version,
- self.branch,
- self.commit)
-
- logger.info('Writing repo to %s...' % self.repo_path())
- with open(self.repo_path(), 'w') as f:
- f.write(content)
-
- def rm_repo(self) -> None:
- if os.path.exists(self.repo_path()):
- os.unlink(self.repo_path())
-
- def install(self, ls: List[str]) -> None:
- logger.info('Installing packages %s...' % ls)
- call_throws(self.ctx, [self.tool, 'in', '-y'] + ls)
-
- def install_podman(self) -> None:
- self.install(['podman'])
-
-
-def create_packager(ctx: CephadmContext,
- stable: Optional[str] = None, version: Optional[str] = None,
- branch: Optional[str] = None, commit: Optional[str] = None) -> Packager:
- distro, distro_version, distro_codename = get_distro()
- if distro in YumDnf.DISTRO_NAMES:
- return YumDnf(ctx, stable=stable, version=version,
- branch=branch, commit=commit,
- distro=distro, distro_version=distro_version)
- elif distro in Apt.DISTRO_NAMES:
- return Apt(ctx, stable=stable, version=version,
- branch=branch, commit=commit,
- distro=distro, distro_version=distro_version,
- distro_codename=distro_codename)
- elif distro in Zypper.DISTRO_NAMES:
- return Zypper(ctx, stable=stable, version=version,
- branch=branch, commit=commit,
- distro=distro, distro_version=distro_version)
- raise Error('Distro %s version %s not supported' % (distro, distro_version))
-
-
def command_add_repo(ctx: CephadmContext) -> None:
if ctx.version and ctx.release:
raise Error('you can specify either --release or --version but not both')
@@ -9206,843 +6966,6 @@ def command_rescan_disks(ctx: CephadmContext) -> str:
return f'Ok. {len(all_scan_files)} adapters detected: {len(scan_files)} rescanned, {len(skipped)} skipped, {len(failures)} failed ({elapsed:.2f}s)'
-##################################
-
-
-def get_ipv4_address(ifname):
- # type: (str) -> str
- def _extract(sock: socket.socket, offset: int) -> str:
- return socket.inet_ntop(
- socket.AF_INET,
- fcntl.ioctl(
- sock.fileno(),
- offset,
- struct.pack('256s', bytes(ifname[:15], 'utf-8'))
- )[20:24])
-
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- addr = _extract(s, 35093) # '0x8915' = SIOCGIFADDR
- dq_mask = _extract(s, 35099) # 0x891b = SIOCGIFNETMASK
- except OSError:
- # interface does not have an ipv4 address
- return ''
-
- dec_mask = sum([bin(int(i)).count('1')
- for i in dq_mask.split('.')])
- return '{}/{}'.format(addr, dec_mask)
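# For the removed get_ipv4_address(): SIOCGIFNETMASK returns a dotted-quad mask, so a
# mask of 255.255.255.0 counts 8+8+8+0 set bits and an address of 10.1.2.3 would be
# reported as '10.1.2.3/24' (example values).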
-
-
-def get_ipv6_address(ifname):
- # type: (str) -> str
- if not os.path.exists('/proc/net/if_inet6'):
- return ''
-
- raw = read_file(['/proc/net/if_inet6'])
- data = raw.splitlines()
- # based on docs @ https://www.tldp.org/HOWTO/Linux+IPv6-HOWTO/ch11s04.html
- # field 0 is ipv6, field 2 is scope
- for iface_setting in data:
- field = iface_setting.split()
- if field[-1] == ifname:
- ipv6_raw = field[0]
- ipv6_fmtd = ':'.join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)])
- # apply naming rules using ipaddress module
- ipv6 = ipaddress.ip_address(ipv6_fmtd)
- return '{}/{}'.format(str(ipv6), int('0x{}'.format(field[2]), 16))
- return ''
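# For the removed get_ipv6_address(): /proc/net/if_inet6 stores the address as 32 hex
# digits, so e.g. 'fe80000000000000020c29fffe123456' is re-grouped into
# 'fe80:0000:0000:0000:020c:29ff:fe12:3456', normalised by ipaddress to
# 'fe80::20c:29ff:fe12:3456', and a third hex field of '40' yields the '/64' suffix
# (example values).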
-
-
-def bytes_to_human(num, mode='decimal'):
- # type: (float, str) -> str
- """Convert a bytes value into it's human-readable form.
-
- :param num: number, in bytes, to convert
- :param mode: Either decimal (default) or binary to determine divisor
- :returns: string representing the bytes value in a more readable format
- """
- unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB']
- divisor = 1000.0
- yotta = 'YB'
-
- if mode == 'binary':
- unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB']
- divisor = 1024.0
- yotta = 'YiB'
-
- for unit in unit_list:
- if abs(num) < divisor:
- return '%3.1f%s' % (num, unit)
- num /= divisor
- return '%.1f%s' % (num, yotta)
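# Worked examples for the removed bytes_to_human():
#     bytes_to_human(1048576, mode='binary') == '1.0MiB'
#     bytes_to_human(1000)                   == '1.0KB'   # decimal mode divides by 1000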
-
-
-def read_file(path_list, file_name=''):
- # type: (List[str], str) -> str
- """Returns the content of the first file found within the `path_list`
-
- :param path_list: list of file paths to search
- :param file_name: optional file_name to be applied to a file path
- :returns: content of the file or 'Unknown'
- """
- for path in path_list:
- if file_name:
- file_path = os.path.join(path, file_name)
- else:
- file_path = path
- if os.path.exists(file_path):
- with open(file_path, 'rb') as f:
- try:
- content = f.read().decode('utf-8', 'ignore').strip()
- except OSError:
- # sysfs may populate the file, but for devices like
- # virtio reads can fail
- return 'Unknown'
- else:
- return content
- return 'Unknown'
-
-##################################
-
-
-class Enclosure:
- def __init__(self, enc_id: str, enc_path: str, dev_path: str):
- """External disk enclosure metadata
-
- Args:
- :param enc_id: enclosure id (normally a WWN)
- :param enc_path: sysfs path to HBA attached to the enclosure
- e.g. /sys/class/scsi_generic/sg11/device/enclosure/0:0:9:0
- :param dev_path: sysfs path to the generic scsi device for the enclosure HBA
- e.g. /sys/class/scsi_generic/sg2
- """
- self._path: str = dev_path
- self._dev_path: str = os.path.join(dev_path, 'device')
- self._enc_path: str = enc_path
- self.ses_paths: List[str] = []
- self.path_count: int = 0
- self.vendor: str = ''
- self.model: str = ''
- self.enc_id: str = enc_id
- self.components: Union[int, str] = 0
- self.device_lookup: Dict[str, str] = {}
- self.device_count: int = 0
- self.slot_map: Dict[str, Dict[str, str]] = {}
-
- self._probe()
-
- def _probe(self) -> None:
- """Analyse the dev paths to identify enclosure related information"""
-
- self.vendor = read_file([os.path.join(self._dev_path, 'vendor')])
- self.model = read_file([os.path.join(self._dev_path, 'model')])
- self.components = read_file([os.path.join(self._enc_path, 'components')])
- slot_paths = glob(os.path.join(self._enc_path, '*', 'slot'))
- for slot_path in slot_paths:
- slot = read_file([slot_path])
- serial_path = os.path.join(os.path.dirname(slot_path), 'device', 'vpd_pg80')
- serial = ''
- if os.path.exists(serial_path):
- serial_raw = read_file([serial_path])
- serial = (''.join(char for char in serial_raw if char in string.printable)).strip()
- self.device_lookup[serial] = slot
- slot_dir = os.path.dirname(slot_path)
- self.slot_map[slot] = {
- 'status': read_file([os.path.join(slot_dir, 'status')]),
- 'fault': read_file([os.path.join(slot_dir, 'fault')]),
- 'locate': read_file([os.path.join(slot_dir, 'locate')]),
- 'serial': serial,
- }
-
- self.device_count = len(self.device_lookup)
- self.update(os.path.basename(self._path))
-
- def update(self, dev_id: str) -> None:
- """Update an enclosure object with a related sg device name
-
- :param dev_id (str): device name e.g. sg2
- """
- self.ses_paths.append(dev_id)
- self.path_count = len(self.ses_paths)
-
- def _dump(self) -> Dict[str, Any]:
- """Return a dict representation of the object"""
- return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
-
- def __str__(self) -> str:
- """Return a formatted json representation of the object as a string"""
- return json.dumps(self._dump(), indent=2)
-
- def __repr__(self) -> str:
- """Return a json representation of the object as a string"""
- return json.dumps(self._dump())
-
- def as_json(self) -> Dict[str, Any]:
- """Return a dict representing the object"""
- return self._dump()
-
-
-class HostFacts():
- _dmi_path_list = ['/sys/class/dmi/id']
- _nic_path_list = ['/sys/class/net']
- _apparmor_path_list = ['/etc/apparmor']
- _disk_vendor_workarounds = {
- '0x1af4': 'Virtio Block Device'
- }
- _excluded_block_devices = ('sr', 'zram', 'dm-', 'loop', 'md')
- _sg_generic_glob = '/sys/class/scsi_generic/*'
-
- def __init__(self, ctx: CephadmContext):
- self.ctx: CephadmContext = ctx
- self.cpu_model: str = 'Unknown'
- self.sysctl_options: Dict[str, str] = self._populate_sysctl_options()
- self.cpu_count: int = 0
- self.cpu_cores: int = 0
- self.cpu_threads: int = 0
- self.interfaces: Dict[str, Any] = {}
-
- self._meminfo: List[str] = read_file(['/proc/meminfo']).splitlines()
- self._get_cpuinfo()
- self._process_nics()
- self.arch: str = platform.processor()
- self.kernel: str = platform.release()
- self._enclosures = self._discover_enclosures()
- self._block_devices = self._get_block_devs()
- self._device_list = self._get_device_info()
-
- def _populate_sysctl_options(self) -> Dict[str, str]:
- sysctl_options = {}
- out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.QUIET_UNLESS_ERROR)
- if out:
- for line in out.splitlines():
- option, value = line.split('=')
- sysctl_options[option.strip()] = value.strip()
- return sysctl_options
-
- def _discover_enclosures(self) -> Dict[str, Enclosure]:
- """Build a dictionary of discovered scsi enclosures
-
- Enclosures are detected by walking the scsi generic sysfs hierarchy.
- Any device tree that holds an 'enclosure' subdirectory is interpreted as
- an enclosure. Once identified, the enclosure directory is analysed to
- identify key descriptors that will help relate disks to enclosures and
- disks to enclosure slots.
-
- :return: Dict[str, Enclosure]: a map of enclosure id (hex) to enclosure object
- """
- sg_paths: List[str] = glob(HostFacts._sg_generic_glob)
- enclosures: Dict[str, Enclosure] = {}
-
- for sg_path in sg_paths:
- enc_path = os.path.join(sg_path, 'device', 'enclosure')
- if os.path.exists(enc_path):
- enc_dirs = glob(os.path.join(enc_path, '*'))
- if len(enc_dirs) != 1:
- # incomplete enclosure spec - expecting ONE dir in the format
- # host(adapter):bus:target:lun e.g. 16:0:0:0
- continue
- enc_path = enc_dirs[0]
- enc_id = read_file([os.path.join(enc_path, 'id')])
- if enc_id in enclosures:
- enclosures[enc_id].update(os.path.basename(sg_path))
- continue
-
- enclosure = Enclosure(enc_id, enc_path, sg_path)
- enclosures[enc_id] = enclosure
-
- return enclosures
-
- @property
- def enclosures(self) -> Dict[str, Dict[str, Any]]:
- """Dump the enclosure objects as dicts"""
- return {k: v._dump() for k, v in self._enclosures.items()}
-
- @property
- def enclosure_count(self) -> int:
- """Return the number of enclosures detected"""
- return len(self._enclosures.keys())
-
- def _get_cpuinfo(self):
- # type: () -> None
- """Determine cpu information via /proc/cpuinfo"""
- raw = read_file(['/proc/cpuinfo'])
- output = raw.splitlines()
- cpu_set = set()
-
- for line in output:
- field = [f.strip() for f in line.split(':')]
- if 'model name' in line:
- self.cpu_model = field[1]
- if 'physical id' in line:
- cpu_set.add(field[1])
- if 'siblings' in line:
- self.cpu_threads = int(field[1].strip())
- if 'cpu cores' in line:
- self.cpu_cores = int(field[1].strip())
- self.cpu_count = len(cpu_set)
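
The same /proc/cpuinfo bookkeeping can be shown in isolation; a sketch under the assumption of a standard Linux /proc layout (cpu_topology is an illustrative name): sockets are counted from distinct 'physical id' values, while 'cpu cores' and 'siblings' give cores and threads per socket.

# Hypothetical sketch: derive socket/core/thread counts from /proc/cpuinfo.
from typing import Tuple


def cpu_topology(path: str = '/proc/cpuinfo') -> Tuple[int, int, int]:
    """Return (sockets, cores_per_socket, threads_per_socket)."""
    sockets = set()
    cores = threads = 0
    with open(path) as f:
        for line in f:
            if ':' not in line:
                continue
            key, value = (part.strip() for part in line.split(':', 1))
            if key == 'physical id':
                sockets.add(value)
            elif key == 'cpu cores' and value:
                cores = int(value)
            elif key == 'siblings' and value:
                threads = int(value)
    return len(sockets), cores, threads
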
-
- def _get_block_devs(self):
- # type: () -> List[str]
- """Determine the list of block devices by looking at /sys/block"""
- return [dev for dev in os.listdir('/sys/block')
- if not dev.startswith(HostFacts._excluded_block_devices)]
-
- @property
- def operating_system(self):
- # type: () -> str
- """Determine OS version"""
- raw_info = read_file(['/etc/os-release'])
- os_release = raw_info.splitlines()
- rel_str = 'Unknown'
- rel_dict = dict()
-
- for line in os_release:
- if '=' in line:
- var_name, var_value = line.split('=')
- rel_dict[var_name] = var_value.strip('"')
-
- # Would normally use PRETTY_NAME, but NAME and VERSION are more
- # consistent
- if all(_v in rel_dict for _v in ['NAME', 'VERSION']):
- rel_str = '{} {}'.format(rel_dict['NAME'], rel_dict['VERSION'])
- return rel_str
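
A compact sketch of the same /etc/os-release parsing (os_description is a hypothetical helper): build a key/value map and prefer the NAME and VERSION fields over PRETTY_NAME.

# Hypothetical sketch: derive an OS description from /etc/os-release.
from typing import Dict


def os_description(path: str = '/etc/os-release') -> str:
    fields: Dict[str, str] = {}
    with open(path) as f:
        for line in f:
            if '=' in line:
                key, value = line.rstrip('\n').split('=', 1)
                fields[key] = value.strip('"')
    if 'NAME' in fields and 'VERSION' in fields:
        return '{} {}'.format(fields['NAME'], fields['VERSION'])
    return 'Unknown'
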
-
- @property
- def hostname(self):
- # type: () -> str
- """Return the hostname"""
- return platform.node()
-
- @property
- def shortname(self) -> str:
- return platform.node().split('.', 1)[0]
-
- @property
- def fqdn(self) -> str:
- return get_fqdn()
-
- @property
- def subscribed(self):
- # type: () -> str
-        """High-level check to see if the host is subscribed to receive updates/support"""
- def _red_hat():
- # type: () -> str
- # RHEL 7 and RHEL 8
- entitlements_dir = '/etc/pki/entitlement'
- if os.path.exists(entitlements_dir):
- pems = glob('{}/*.pem'.format(entitlements_dir))
- if len(pems) >= 2:
- return 'Yes'
-
- return 'No'
-
- os_name = self.operating_system
- if os_name.upper().startswith('RED HAT'):
- return _red_hat()
-
- return 'Unknown'
-
- @property
- def hdd_count(self):
- # type: () -> int
- """Return a count of HDDs (spinners)"""
- return len(self.hdd_list)
-
- def _get_capacity(self, dev):
- # type: (str) -> int
- """Determine the size of a given device
-
-        The kernel always bases device size calculations on a 512-byte
-        sector. For more information see
- https://git.kernel.org/pub/scm/linux/kernel/git/stable/linux.git/tree/include/linux/types.h?h=v5.15.63#n120
- """
- size_path = os.path.join('/sys/block', dev, 'size')
- size_blocks = int(read_file([size_path]))
- return size_blocks * 512
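
Because the kernel publishes /sys/block/<dev>/size as a count of 512-byte sectors regardless of the drive's logical block size, capacity in bytes is simply that count multiplied by 512. A minimal sketch (device_capacity_bytes is an illustrative name):

# Hypothetical sketch: byte capacity of a block device from sysfs.
import os


def device_capacity_bytes(dev: str) -> int:
    size_path = os.path.join('/sys/block', dev, 'size')
    with open(size_path) as f:
        return int(f.read().strip()) * 512


# e.g. device_capacity_bytes('sda') returns the whole-disk size in bytes
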
-
- def _get_capacity_by_type(self, disk_type='hdd'):
- # type: (str) -> int
- """Return the total capacity of a category of device (flash or hdd)"""
- capacity: int = 0
- for dev in self._device_list:
- if dev['disk_type'] == disk_type:
- disk_capacity = cast(int, dev.get('disk_size_bytes', 0))
- capacity += disk_capacity
- return capacity
-
- def _get_device_info(self):
- # type: () -> List[Dict[str, object]]
- """Return a 'pretty' name list for each unique device in the `dev_list`"""
- disk_list = list()
-
- # serial_num_lookup is a dict of serial number -> List of devices with that serial number
- serial_num_lookup: Dict[str, List[str]] = {}
-
- # make a map of devname -> disk path. this path name may indicate the physical slot
- # of a drive (phyXX)
- disk_path_map: Dict[str, str] = {}
- for path in glob('/dev/disk/by-path/*'):
- tgt_raw = Path(path).resolve()
- tgt = os.path.basename(str(tgt_raw))
- disk_path_map[tgt] = path
-
- # make a map of holder (dm-XX) -> full mpath name
- dm_device_map: Dict[str, str] = {}
- for mpath in glob('/dev/mapper/mpath*'):
- tgt_raw = Path(mpath).resolve()
- tgt = os.path.basename(str(tgt_raw))
- dm_device_map[tgt] = mpath
-
- # main loop to process all eligible block devices
- for dev in self._block_devices:
- enclosure_id = ''
- enclosure_slot = ''
- scsi_addr = ''
- mpath = ''
-
- disk_model = read_file(['/sys/block/{}/device/model'.format(dev)]).strip()
- disk_rev = read_file(['/sys/block/{}/device/rev'.format(dev)]).strip()
- disk_wwid = read_file(['/sys/block/{}/device/wwid'.format(dev)]).strip()
- vendor = read_file(['/sys/block/{}/device/vendor'.format(dev)]).strip()
- rotational = read_file(['/sys/block/{}/queue/rotational'.format(dev)])
- holders_raw = glob('/sys/block/{}/holders/*'.format(dev))
- if len(holders_raw) == 1:
- # mpath will have 1 holder entry
- holder = os.path.basename(holders_raw[0])
- mpath = dm_device_map.get(holder, '')
-
- disk_type = 'hdd' if rotational == '1' else 'flash'
- scsi_addr_path = glob('/sys/block/{}/device/bsg/*'.format(dev))
- if len(scsi_addr_path) == 1:
- scsi_addr = os.path.basename(scsi_addr_path[0])
-
- # vpd_pg80 isn't guaranteed (libvirt, vmware for example)
- serial_raw = read_file(['/sys/block/{}/device/vpd_pg80'.format(dev)])
- serial = (''.join(i for i in serial_raw if i in string.printable)).strip()
- if serial.lower() == 'unknown':
- serial = ''
- else:
- if serial in serial_num_lookup:
- serial_num_lookup[serial].append(dev)
- else:
- serial_num_lookup[serial] = [dev]
- for enc_id, enclosure in self._enclosures.items():
- if serial in enclosure.device_lookup.keys():
- enclosure_id = enc_id
- enclosure_slot = enclosure.device_lookup[serial]
-
- disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor)
- disk_size_bytes = self._get_capacity(dev)
- disk_list.append({
- 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)),
- 'vendor': disk_vendor,
- 'model': disk_model,
- 'rev': disk_rev,
- 'wwid': disk_wwid,
- 'dev_name': dev,
- 'disk_size_bytes': disk_size_bytes,
- 'disk_type': disk_type,
- 'serial': serial,
- 'alt_dev_name': '',
- 'scsi_addr': scsi_addr,
- 'enclosure_id': enclosure_id,
- 'enclosure_slot': enclosure_slot,
- 'path_id': disk_path_map.get(dev, ''),
- 'mpath': mpath,
- })
-
- # process the devices to drop duplicate physical devs based on matching
- # the unique serial number
- disk_list_unique: List[Dict[str, Any]] = []
- serials_seen: List[str] = []
- for dev in disk_list:
- serial = str(dev['serial'])
- if serial:
- if serial in serials_seen:
- continue
- else:
- serials_seen.append(serial)
- devs = serial_num_lookup[serial].copy()
- devs.remove(str(dev['dev_name']))
- dev['alt_dev_name'] = ','.join(devs)
- disk_list_unique.append(dev)
-
- return disk_list_unique
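
The de-duplication pass above can be summarised in a standalone sketch (dedupe_by_serial is a hypothetical helper operating on dicts with the same keys as the entries built above): multipath devices share a serial number, so the first device with a given serial is kept and the remaining names are recorded as alternates.

# Hypothetical sketch: collapse multipath duplicates by serial number.
from typing import Any, Dict, List


def dedupe_by_serial(disks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    by_serial: Dict[str, List[str]] = {}
    for disk in disks:
        if disk['serial']:
            by_serial.setdefault(disk['serial'], []).append(disk['dev_name'])
    unique: List[Dict[str, Any]] = []
    seen: set = set()
    for disk in disks:
        serial = disk['serial']
        if serial:
            if serial in seen:
                continue  # already represented by an earlier device
            seen.add(serial)
            alts = [d for d in by_serial[serial] if d != disk['dev_name']]
            disk['alt_dev_name'] = ','.join(alts)
        unique.append(disk)
    return unique
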
-
- @property
- def hdd_list(self):
- # type: () -> List[Dict[str, object]]
- """Return a list of devices that are HDDs (spinners)"""
- return [dev for dev in self._device_list if dev['disk_type'] == 'hdd']
-
- @property
- def flash_list(self):
- # type: () -> List[Dict[str, object]]
- """Return a list of devices that are flash based (SSD, NVMe)"""
- return [dev for dev in self._device_list if dev['disk_type'] == 'flash']
-
- @property
- def hdd_capacity_bytes(self):
- # type: () -> int
- """Return the total capacity for all HDD devices (bytes)"""
- return self._get_capacity_by_type(disk_type='hdd')
-
- @property
- def hdd_capacity(self):
- # type: () -> str
- """Return the total capacity for all HDD devices (human readable format)"""
- return bytes_to_human(self.hdd_capacity_bytes)
-
- @property
- def cpu_load(self):
- # type: () -> Dict[str, float]
- """Return the cpu load average data for the host"""
- raw = read_file(['/proc/loadavg']).strip()
- data = raw.split()
- return {
- '1min': float(data[0]),
- '5min': float(data[1]),
- '15min': float(data[2]),
- }
-
- @property
- def flash_count(self):
- # type: () -> int
- """Return the number of flash devices in the system (SSD, NVMe)"""
- return len(self.flash_list)
-
- @property
- def flash_capacity_bytes(self):
- # type: () -> int
- """Return the total capacity for all flash devices (bytes)"""
- return self._get_capacity_by_type(disk_type='flash')
-
- @property
- def flash_capacity(self):
- # type: () -> str
- """Return the total capacity for all Flash devices (human readable format)"""
- return bytes_to_human(self.flash_capacity_bytes)
-
- def _process_nics(self):
- # type: () -> None
- """Look at the NIC devices and extract network related metadata"""
- # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h
- hw_lookup = {
- '1': 'ethernet',
- '32': 'infiniband',
- '772': 'loopback',
- }
-
- for nic_path in HostFacts._nic_path_list:
- if not os.path.exists(nic_path):
- continue
- for iface in os.listdir(nic_path):
-
- if os.path.exists(os.path.join(nic_path, iface, 'bridge')):
- nic_type = 'bridge'
- elif os.path.exists(os.path.join(nic_path, iface, 'bonding')):
- nic_type = 'bonding'
- else:
- nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown')
-
- if nic_type == 'loopback': # skip loopback devices
- continue
-
- lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))]
- upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))]
-
- try:
- mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')]))
- except ValueError:
- mtu = 0
-
- operstate = read_file([os.path.join(nic_path, iface, 'operstate')])
- try:
- speed = int(read_file([os.path.join(nic_path, iface, 'speed')]))
- except (OSError, ValueError):
- # OSError : device doesn't support the ethtool get_link_ksettings
-                    # ValueError : the read failed, so read_file returned 'Unknown' which int() rejects
- #
- # Either way, we show a -1 when speed isn't available
- speed = -1
-
- dev_link = os.path.join(nic_path, iface, 'device')
- if os.path.exists(dev_link):
- iftype = 'physical'
- driver_path = os.path.join(dev_link, 'driver')
- if os.path.exists(driver_path):
- driver = os.path.basename(os.path.realpath(driver_path))
- else:
- driver = 'Unknown'
-
- else:
- iftype = 'logical'
- driver = ''
-
- self.interfaces[iface] = {
- 'mtu': mtu,
- 'upper_devs_list': upper_devs_list,
- 'lower_devs_list': lower_devs_list,
- 'operstate': operstate,
- 'iftype': iftype,
- 'nic_type': nic_type,
- 'driver': driver,
- 'speed': speed,
- 'ipv4_address': get_ipv4_address(iface),
- 'ipv6_address': get_ipv6_address(iface),
- }
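
Interface classification relies only on sysfs; a sketch of that logic in isolation (nic_type is an illustrative name, and the type codes come from include/uapi/linux/if_arp.h as noted above): bridge and bond devices expose dedicated subdirectories, everything else is mapped from the numeric ARPHRD type.

# Hypothetical sketch: classify a network interface from /sys/class/net.
import os

ARPHRD_TYPES = {'1': 'ethernet', '32': 'infiniband', '772': 'loopback'}


def nic_type(iface: str, base: str = '/sys/class/net') -> str:
    path = os.path.join(base, iface)
    if os.path.exists(os.path.join(path, 'bridge')):
        return 'bridge'
    if os.path.exists(os.path.join(path, 'bonding')):
        return 'bonding'
    with open(os.path.join(path, 'type')) as f:
        return ARPHRD_TYPES.get(f.read().strip(), 'Unknown')
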
-
- @property
- def nic_count(self):
- # type: () -> int
- """Return a total count of all physical NICs detected in the host"""
- phys_devs = []
- for iface in self.interfaces:
- if self.interfaces[iface]['iftype'] == 'physical':
- phys_devs.append(iface)
- return len(phys_devs)
-
- def _get_mem_data(self, field_name):
- # type: (str) -> int
- for line in self._meminfo:
- if line.startswith(field_name):
- _d = line.split()
- return int(_d[1])
- return 0
-
- @property
- def memory_total_kb(self):
- # type: () -> int
- """Determine the memory installed (kb)"""
- return self._get_mem_data('MemTotal')
-
- @property
- def memory_free_kb(self):
- # type: () -> int
- """Determine the memory free (not cache, immediately usable)"""
- return self._get_mem_data('MemFree')
-
- @property
- def memory_available_kb(self):
- # type: () -> int
- """Determine the memory available to new applications without swapping"""
- return self._get_mem_data('MemAvailable')
-
- @property
- def vendor(self):
- # type: () -> str
- """Determine server vendor from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'sys_vendor')
-
- @property
- def model(self):
- # type: () -> str
- """Determine server model information from DMI data in sysfs"""
- family = read_file(HostFacts._dmi_path_list, 'product_family')
- product = read_file(HostFacts._dmi_path_list, 'product_name')
- if family == 'Unknown' and product:
- return '{}'.format(product)
-
- return '{} ({})'.format(family, product)
-
- @property
- def bios_version(self):
- # type: () -> str
- """Determine server BIOS version from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'bios_version')
-
- @property
- def bios_date(self):
- # type: () -> str
- """Determine server BIOS date from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'bios_date')
-
- @property
- def chassis_serial(self):
- # type: () -> str
- """Determine chassis serial number from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'chassis_serial')
-
- @property
- def board_serial(self):
- # type: () -> str
- """Determine mainboard serial number from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'board_serial')
-
- @property
- def product_serial(self):
- # type: () -> str
- """Determine server's serial number from DMI data in sysfs"""
- return read_file(HostFacts._dmi_path_list, 'product_serial')
-
- @property
- def timestamp(self):
- # type: () -> float
- """Return the current time as Epoch seconds"""
- return time.time()
-
- @property
- def system_uptime(self):
- # type: () -> float
- """Return the system uptime (in secs)"""
- raw_time = read_file(['/proc/uptime'])
- up_secs, _ = raw_time.split()
- return float(up_secs)
-
- @property
- def kernel_security(self):
- # type: () -> Dict[str, str]
- """Determine the security features enabled in the kernel - SELinux, AppArmor"""
- def _fetch_selinux() -> Dict[str, str]:
- """Get the selinux status"""
- security = {}
- try:
- out, err, code = call(self.ctx, ['sestatus'],
- verbosity=CallVerbosity.QUIET)
- security['type'] = 'SELinux'
- status, mode, policy = '', '', ''
- for line in out.split('\n'):
- if line.startswith('SELinux status:'):
- k, v = line.split(':')
- status = v.strip()
- elif line.startswith('Current mode:'):
- k, v = line.split(':')
- mode = v.strip()
- elif line.startswith('Loaded policy name:'):
- k, v = line.split(':')
- policy = v.strip()
- if status == 'disabled':
- security['description'] = 'SELinux: Disabled'
- else:
- security['description'] = 'SELinux: Enabled({}, {})'.format(mode, policy)
- except Exception as e:
- logger.info('unable to get selinux status: %s' % e)
- return security
-
- def _fetch_apparmor() -> Dict[str, str]:
- """Read the apparmor profiles directly, returning an overview of AppArmor status"""
- security = {}
- for apparmor_path in HostFacts._apparmor_path_list:
- if os.path.exists(apparmor_path):
- security['type'] = 'AppArmor'
- security['description'] = 'AppArmor: Enabled'
- try:
- profiles = read_file(['/sys/kernel/security/apparmor/profiles'])
- if len(profiles) == 0:
- return {}
- except OSError:
- pass
- else:
- summary = {} # type: Dict[str, int]
- for line in profiles.split('\n'):
- item, mode = line.split(' ')
- mode = mode.strip('()')
- if mode in summary:
- summary[mode] += 1
- else:
-                            summary[mode] = 1  # count the first profile seen in this mode
- summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()])
- security = {**security, **summary} # type: ignore
- security['description'] += '({})'.format(summary_str)
-
- return security
- return {}
-
- ret = {}
- if os.path.exists('/sys/kernel/security/lsm'):
- lsm = read_file(['/sys/kernel/security/lsm']).strip()
- if 'selinux' in lsm:
- ret = _fetch_selinux()
- elif 'apparmor' in lsm:
- ret = _fetch_apparmor()
- else:
- return {
- 'type': 'Unknown',
- 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor'
- }
-
- if ret:
- return ret
-
- return {
- 'type': 'None',
- 'description': 'Linux Security Module framework is not available'
- }
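
The decision between the SELinux and AppArmor probes hinges on the comma-separated list in /sys/kernel/security/lsm; a small sketch of that check (active_lsms is a hypothetical helper):

# Hypothetical sketch: list the active Linux Security Modules.
import os
from typing import List


def active_lsms(path: str = '/sys/kernel/security/lsm') -> List[str]:
    if not os.path.exists(path):
        return []  # LSM framework not available
    with open(path) as f:
        return [lsm for lsm in f.read().strip().split(',') if lsm]


# e.g. 'selinux' in active_lsms() selects the SELinux branch above
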
-
- @property
- def selinux_enabled(self) -> bool:
- return (self.kernel_security['type'] == 'SELinux') and \
- (self.kernel_security['description'] != 'SELinux: Disabled')
-
- @property
- def kernel_parameters(self):
- # type: () -> Dict[str, str]
- """Get kernel parameters required/used in Ceph clusters"""
-
- k_param = {}
- out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT)
- if out:
- param_list = out.split('\n')
- param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list}
-
- # return only desired parameters
- if 'net.ipv4.ip_nonlocal_bind' in param_dict:
- k_param['net.ipv4.ip_nonlocal_bind'] = param_dict['net.ipv4.ip_nonlocal_bind']
-
- return k_param
-
- @staticmethod
- def _process_net_data(tcp_file: str, protocol: str = 'tcp') -> List[int]:
- listening_ports = []
- # Connections state documentation
- # tcp - https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/net/tcp_states.h
- # udp - uses 07 (TCP_CLOSE or UNCONN, since udp is stateless. test with netcat -ul <port>)
- listening_state = {
- 'tcp': '0A',
- 'udp': '07'
- }
-
- if protocol not in listening_state.keys():
- return []
-
- if os.path.exists(tcp_file):
- with open(tcp_file) as f:
- tcp_data = f.readlines()[1:]
-
- for con in tcp_data:
- con_info = con.strip().split()
- if con_info[3] == listening_state[protocol]:
- local_port = int(con_info[1].split(':')[1], 16)
- listening_ports.append(local_port)
-
- return listening_ports
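
As the comments above note, /proc/net/tcp encodes the local port in hex within field 1 and the socket state in field 3 (0A is TCP_LISTEN; the stateless UDP files use 07). A self-contained sketch of the same parse (listening_tcp_ports is an illustrative name):

# Hypothetical sketch: listening TCP ports from /proc/net/tcp.
from typing import List


def listening_tcp_ports(path: str = '/proc/net/tcp') -> List[int]:
    ports: List[int] = []
    with open(path) as f:
        for line in f.readlines()[1:]:  # skip the header row
            fields = line.split()
            if len(fields) > 3 and fields[3] == '0A':
                ports.append(int(fields[1].split(':')[1], 16))
    return sorted(set(ports))
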
-
- @property
- def tcp_ports_used(self) -> List[int]:
- return HostFacts._process_net_data('/proc/net/tcp')
-
- @property
- def tcp6_ports_used(self) -> List[int]:
- return HostFacts._process_net_data('/proc/net/tcp6')
-
- @property
- def udp_ports_used(self) -> List[int]:
- return HostFacts._process_net_data('/proc/net/udp', 'udp')
-
- @property
- def udp6_ports_used(self) -> List[int]:
- return HostFacts._process_net_data('/proc/net/udp6', 'udp')
-
- def dump(self):
- # type: () -> str
- """Return the attributes of this HostFacts object as json"""
- data = {
- k: getattr(self, k) for k in dir(self)
- if not k.startswith('_')
- and isinstance(getattr(self, k), (float, int, str, list, dict, tuple))
- }
- return json.dumps(data, indent=2, sort_keys=True)
##################################
@@ -10257,6 +7180,11 @@ def _get_parser():
action='store_true',
help='Show debug-level log messages')
parser.add_argument(
+ '--log-dest',
+ action='append',
+ choices=[v.name for v in LogDestination],
+        help='select one or more destinations for persistent logging')
+ parser.add_argument(
'--timeout',
type=int,
default=DEFAULT_TIMEOUT,
@@ -10287,6 +7215,11 @@ def _get_parser():
parser_version = subparsers.add_parser(
'version', help='get cephadm version')
parser_version.set_defaults(func=command_version)
+ parser_version.add_argument(
+ '--verbose',
+ action='store_true',
+ help='Detailed version information',
+ )
parser_pull = subparsers.add_parser(
'pull', help='pull the default container image')
@@ -10456,6 +7389,10 @@ def _get_parser():
'--no-hosts',
action='store_true',
help='dont pass /etc/hosts through to the container')
+ parser_shell.add_argument(
+ '--dry-run',
+ action='store_true',
+ help='print, but do not execute, the container command to start the shell')
parser_enter = subparsers.add_parser(
'enter', help='run an interactive shell inside a running daemon container')
@@ -10520,6 +7457,10 @@ def _get_parser():
required=True,
help='daemon name (type.id)')
+ parser_unit_install = subparsers.add_parser(
+ 'unit-install', help="Install the daemon's systemd unit")
+ parser_unit_install.set_defaults(func=command_unit_install)
+
parser_logs = subparsers.add_parser(
'logs', help='print journald logs for a daemon container')
parser_logs.set_defaults(func=command_logs)
@@ -10652,22 +7593,10 @@ def _get_parser():
'--allow-overwrite',
action='store_true',
help='allow overwrite of existing --output-* config/keyring/ssh files')
-    # the following logic to have both '--cleanup-on-failure' and '--no-cleanup-on-failure'
-    # has been included in argparse since python v3.9; however, since we have to support
-    # older python versions the following is more generic. Once python v3.9 becomes
-    # the minimum supported version we can implement the same by using the new option
- # argparse.BooleanOptionalAction
- group = parser_bootstrap.add_mutually_exclusive_group()
- group.add_argument(
- '--cleanup-on-failure',
- action='store_true',
- default=True,
- help='Delete cluster files in case of a failed installation')
- group.add_argument(
+ parser_bootstrap.add_argument(
'--no-cleanup-on-failure',
- action='store_const',
- const=False,
- dest='cleanup_on_failure',
+ action='store_true',
+ default=False,
help='Do not delete cluster files in case of a failed installation')
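
The removed comment above points at argparse.BooleanOptionalAction as the eventual replacement once Python 3.9 is the minimum; a minimal standalone sketch of that alternative (not cephadm's actual parser):

# Requires Python >= 3.9: one declaration yields paired --flag / --no-flag options.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--cleanup-on-failure',
    action=argparse.BooleanOptionalAction,
    default=True,
    help='Delete cluster files in case of a failed installation')

args = parser.parse_args(['--no-cleanup-on-failure'])
assert args.cleanup_on_failure is False
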
parser_bootstrap.add_argument(
'--allow-fqdn-hostname',
@@ -10897,43 +7826,6 @@ def cephadm_init_ctx(args: List[str]) -> CephadmContext:
return ctx
-def cephadm_init_logging(ctx: CephadmContext, args: List[str]) -> None:
- """Configure the logging for cephadm as well as updating the system
- to have the expected log dir and logrotate configuration.
- """
- logging.addLevelName(QUIET_LOG_LEVEL, 'QUIET')
- global logger
- if not os.path.exists(LOG_DIR):
- os.makedirs(LOG_DIR)
- operations = ['bootstrap', 'rm-cluster']
- if any(op in args for op in operations):
- dictConfig(interactive_logging_config)
- else:
- dictConfig(logging_config)
-
- logger = logging.getLogger()
- logger.setLevel(QUIET_LOG_LEVEL)
-
- if not os.path.exists(ctx.logrotate_dir + '/cephadm'):
- with open(ctx.logrotate_dir + '/cephadm', 'w') as f:
- f.write("""# created by cephadm
-/var/log/ceph/cephadm.log {
- rotate 7
- daily
- compress
- missingok
- notifempty
- su root root
-}
-""")
-
- if ctx.verbose:
- for handler in logger.handlers:
- if handler.name in ['console', 'log_file', 'console_stdout']:
- handler.setLevel(QUIET_LOG_LEVEL)
- logger.debug('%s\ncephadm %s' % ('-' * 80, args))
-
-
def cephadm_require_root() -> None:
"""Exit if the process is not running as root."""
if os.geteuid() != 0:
@@ -10960,7 +7852,7 @@ def main() -> None:
sys.exit(1)
cephadm_require_root()
- cephadm_init_logging(ctx, av)
+ cephadm_init_logging(ctx, logger, av)
try:
# podman or docker?
ctx.container_engine = find_container_engine(ctx)