Diffstat (limited to 'src/pybind/mgr/cephadm/module.py')
-rw-r--r--  src/pybind/mgr/cephadm/module.py | 435
1 file changed, 256 insertions(+), 179 deletions(-)
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index f8f0efc9d28..6690153d435 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -30,6 +30,7 @@ import multiprocessing.pool
import subprocess
from prettytable import PrettyTable
+from ceph.cephadm.images import DefaultImages
from ceph.deployment import inventory
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
@@ -101,6 +102,7 @@ from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall
from .configchecks import CephadmConfigChecks
from .offline_watcher import OfflineHostWatcher
from .tuned_profiles import TunedProfileUtils
+from .ceph_volume import CephVolume
try:
import asyncssh
@@ -129,28 +131,7 @@ def os_exit_noop(status: int) -> None:
os._exit = os_exit_noop # type: ignore
-
-# Default container images -----------------------------------------------------
DEFAULT_IMAGE = 'quay.io/ceph/ceph'
-DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.51.0'
-DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.7.0'
-DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:1.2.17'
-DEFAULT_LOKI_IMAGE = 'quay.io/ceph/loki:3.0.0'
-DEFAULT_PROMTAIL_IMAGE = 'quay.io/ceph/promtail:3.0.0'
-DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.27.0'
-DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/grafana:10.4.8'
-DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
-DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
-DEFAULT_SNMP_GATEWAY_IMAGE = 'quay.io/ceph/snmp-notifier:v1.2.1'
-DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
-DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
-DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
-DEFAULT_NGINX_IMAGE = 'quay.io/ceph/nginx:sclorg-nginx-126'
-DEFAULT_OAUTH2_PROXY_IMAGE = 'quay.io/oauth2-proxy/oauth2-proxy:v7.6.0'
-DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
-DEFAULT_SAMBA_IMAGE = 'quay.io/samba.org/samba-server:devbuilds-centos-amd64'
-DEFAULT_SAMBA_METRICS_IMAGE = 'quay.io/samba.org/samba-metrics:latest'
-# ------------------------------------------------------------------------------
def host_exists(hostname_position: int = 1) -> Callable:
@@ -237,96 +218,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
runtime=True,
),
Option(
- 'container_image_prometheus',
- default=DEFAULT_PROMETHEUS_IMAGE,
- desc='Prometheus container image',
- ),
- Option(
- 'container_image_nvmeof',
- default=DEFAULT_NVMEOF_IMAGE,
- desc='Nvme-of container image',
- ),
- Option(
- 'container_image_grafana',
- default=DEFAULT_GRAFANA_IMAGE,
- desc='Prometheus container image',
- ),
- Option(
- 'container_image_alertmanager',
- default=DEFAULT_ALERT_MANAGER_IMAGE,
- desc='Prometheus container image',
- ),
- Option(
- 'container_image_node_exporter',
- default=DEFAULT_NODE_EXPORTER_IMAGE,
- desc='Prometheus container image',
- ),
- Option(
- 'container_image_loki',
- default=DEFAULT_LOKI_IMAGE,
- desc='Loki container image',
- ),
- Option(
- 'container_image_promtail',
- default=DEFAULT_PROMTAIL_IMAGE,
- desc='Promtail container image',
- ),
- Option(
- 'container_image_haproxy',
- default=DEFAULT_HAPROXY_IMAGE,
- desc='HAproxy container image',
- ),
- Option(
- 'container_image_keepalived',
- default=DEFAULT_KEEPALIVED_IMAGE,
- desc='Keepalived container image',
- ),
- Option(
- 'container_image_snmp_gateway',
- default=DEFAULT_SNMP_GATEWAY_IMAGE,
- desc='SNMP Gateway container image',
- ),
- Option(
- 'container_image_nginx',
- default=DEFAULT_NGINX_IMAGE,
- desc='Nginx container image',
- ),
- Option(
- 'container_image_oauth2_proxy',
- default=DEFAULT_OAUTH2_PROXY_IMAGE,
- desc='oauth2-proxy container image',
- ),
- Option(
- 'container_image_elasticsearch',
- default=DEFAULT_ELASTICSEARCH_IMAGE,
- desc='elasticsearch container image',
- ),
- Option(
- 'container_image_jaeger_agent',
- default=DEFAULT_JAEGER_AGENT_IMAGE,
- desc='Jaeger agent container image',
- ),
- Option(
- 'container_image_jaeger_collector',
- default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
- desc='Jaeger collector container image',
- ),
- Option(
- 'container_image_jaeger_query',
- default=DEFAULT_JAEGER_QUERY_IMAGE,
- desc='Jaeger query container image',
- ),
- Option(
- 'container_image_samba',
- default=DEFAULT_SAMBA_IMAGE,
- desc='Samba/SMB container image',
- ),
- Option(
- 'container_image_samba_metrics',
- default=DEFAULT_SAMBA_METRICS_IMAGE,
- desc='Samba/SMB metrics exporter container image',
- ),
- Option(
'warn_on_stray_hosts',
type='bool',
default=True,
@@ -562,6 +453,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
desc="Default address for RedFish API (oob management)."
),
]
+ for image in DefaultImages:
+ MODULE_OPTIONS.append(Option(image.key, default=image.image_ref, desc=image.desc))
def __init__(self, *args: Any, **kwargs: Any):
super(CephadmOrchestrator, self).__init__(*args, **kwargs)
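For context on the loop above: it assumes each DefaultImages member exposes
key, image_ref and desc attributes. A minimal sketch of that shape (values
taken from the defaults this commit removes; the structure is assumed from the
usage here rather than copied from the real ceph.cephadm.images module):

    from enum import Enum

    class DefaultImages(Enum):
        # (key, image_ref, desc) per member; two entries shown for brevity
        PROMETHEUS = ('container_image_prometheus',
                      'quay.io/prometheus/prometheus:v2.51.0',
                      'Prometheus container image')
        LOKI = ('container_image_loki',
                'quay.io/ceph/loki:3.0.0',
                'Loki container image')

        def __init__(self, key: str, image_ref: str, desc: str) -> None:
            self.key = key
            self.image_ref = image_ref
            self.desc = desc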
@@ -792,6 +685,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
# as part of the handling of stray daemons
self.recently_altered_daemons: Dict[str, datetime.datetime] = {}
+ self.ceph_volume: CephVolume = CephVolume(self)
+
def shutdown(self) -> None:
self.log.debug('shutdown')
self._worker_pool.close()
@@ -819,30 +714,33 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
security_enabled = self.secure_monitoring_stack or mgmt_gw_enabled
return security_enabled, mgmt_gw_enabled, oauth2_proxy_enabled
- def get_mgmt_gw_internal_endpoint(self) -> Optional[str]:
+ def _get_mgmt_gw_endpoint(self, is_internal: bool) -> Optional[str]:
mgmt_gw_daemons = self.cache.get_daemons_by_service('mgmt-gateway')
if not mgmt_gw_daemons:
return None
dd = mgmt_gw_daemons[0]
assert dd.hostname is not None
- mgmt_gw_addr = self.get_fqdn(dd.hostname)
- mgmt_gw_internal_endpoint = build_url(scheme='https', host=mgmt_gw_addr, port=MgmtGatewayService.INTERNAL_SERVICE_PORT)
- return f'{mgmt_gw_internal_endpoint}/internal'
+ mgmt_gw_spec = cast(MgmtGatewaySpec, self.spec_store['mgmt-gateway'].spec)
+ mgmt_gw_addr = mgmt_gw_spec.virtual_ip if mgmt_gw_spec.virtual_ip is not None else self.get_fqdn(dd.hostname)
- def get_mgmt_gw_external_endpoint(self) -> Optional[str]:
- mgmt_gw_daemons = self.cache.get_daemons_by_service('mgmt-gateway')
- if not mgmt_gw_daemons:
- return None
+ if is_internal:
+ mgmt_gw_port: Optional[int] = MgmtGatewayService.INTERNAL_SERVICE_PORT
+ protocol = 'https'
+ endpoint_suffix = '/internal'
+ else:
+ mgmt_gw_port = dd.ports[0] if dd.ports else None
+ protocol = 'http' if mgmt_gw_spec.disable_https else 'https'
+ endpoint_suffix = ''
- dd = mgmt_gw_daemons[0]
- assert dd.hostname is not None
- mgmt_gw_port = dd.ports[0] if dd.ports else None
- mgmt_gw_addr = self.get_fqdn(dd.hostname)
- mgmt_gw_spec = cast(MgmtGatewaySpec, self.spec_store['mgmt-gateway'].spec)
- protocol = 'http' if mgmt_gw_spec.disable_https else 'https'
- mgmt_gw_external_endpoint = build_url(scheme=protocol, host=mgmt_gw_addr, port=mgmt_gw_port)
- return mgmt_gw_external_endpoint
+ mgmt_gw_endpoint = build_url(scheme=protocol, host=mgmt_gw_addr, port=mgmt_gw_port)
+ return f'{mgmt_gw_endpoint}{endpoint_suffix}'
+
+ def get_mgmt_gw_internal_endpoint(self) -> Optional[str]:
+ return self._get_mgmt_gw_endpoint(is_internal=True)
+
+ def get_mgmt_gw_external_endpoint(self) -> Optional[str]:
+ return self._get_mgmt_gw_endpoint(is_internal=False)
def _get_cephadm_binary_path(self) -> str:
import hashlib
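Both public helpers now share one URL-building path. A comment sketch of the
resulting endpoints, assuming a gateway daemon on gw1.example.com (host and
port values are illustrative, not taken from this diff):

    # internal: always https, MgmtGatewayService.INTERNAL_SERVICE_PORT, plus
    # the '/internal' suffix:
    #     https://gw1.example.com:29443/internal
    # external: first published daemon port (if any); http only when the spec
    # sets disable_https; virtual_ip, when set, replaces the host FQDN:
    #     https://gw1.example.com:8443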
@@ -1897,7 +1795,7 @@ Then run the following:
self.inventory.add_host(spec)
self.offline_hosts_remove(spec.hostname)
if spec.status == 'maintenance':
- self._set_maintenance_healthcheck()
+ self.set_maintenance_healthcheck()
self.event.set() # refresh stray health check
self.log.info('Added host %s' % spec.hostname)
return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
@@ -2068,6 +1966,7 @@ Then run the following:
self.ssh.reset_con(host)
# if host was in offline host list, we should remove it now.
self.offline_hosts_remove(host)
+ self.set_maintenance_healthcheck()
self.event.set() # refresh stray health check
self.log.info('Removed host %s' % host)
return "Removed {} host '{}'".format('offline' if offline else '', host)
@@ -2182,7 +2081,7 @@ Then run the following:
self.log.info(msg)
return msg
- def _set_maintenance_healthcheck(self) -> None:
+ def set_maintenance_healthcheck(self) -> None:
"""Raise/update or clear the maintenance health check as needed"""
in_maintenance = self.inventory.get_host_with_state("maintenance")
@@ -2266,12 +2165,12 @@ Then run the following:
self.inventory._inventory[hostname] = tgt_host
self.inventory.save()
- self._set_maintenance_healthcheck()
+ self.set_maintenance_healthcheck()
return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
@handle_orch_error
@host_exists()
- def exit_host_maintenance(self, hostname: str) -> str:
+ def exit_host_maintenance(self, hostname: str, force: bool = False, offline: bool = False) -> str:
"""Exit maintenance mode and return a host to an operational state
Returning from maintenance will enable the clusters systemd target and
@@ -2279,6 +2178,8 @@ Then run the following:
host has osd daemons
:param hostname: (str) host name
+ :param force: (bool) force removal of the host from maintenance mode
+ :param offline: (bool) to remove hosts that are offline from maintenance mode
:raises OrchestratorError: Unable to return from maintenance, or unset the
noout flag
@@ -2287,37 +2188,74 @@ Then run the following:
if tgt_host['status'] != "maintenance":
raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
- with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
- outs, errs, _code = self.wait_async(
- CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
- 'host-maintenance', ['exit'], error_ok=True))
- returned_msg = errs[0].split('\n')[-1]
- if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
- raise OrchestratorError(
- f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
-
- if "osd" in self.cache.get_daemon_types(hostname):
- crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
- rc, _out, _err = self.mon_command({
- 'prefix': 'osd unset-group',
- 'flags': 'noout',
- 'who': [crush_node],
- 'format': 'json'
- })
- if rc:
+ # Given we do not regularly check maintenance mode hosts for being offline,
+ # we have no idea at this point whether the host is online or not.
+ # Keep in mind this goes both ways, as users could have run
+ # "ceph cephadm check-host <hostname>" when the host was in maintenance
+        # mode and offline and the host could have since come online. The
+        # following "cephadm check-host" command is run purely so we know
+        # whether the host is online or offline, as the two cases must be
+        # handled differently
+ try:
+ with self.async_timeout_handler(hostname, 'cephadm check-host'):
+ outs, errs, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(
+ hostname, cephadmNoImage,
+ 'check-host', [], error_ok=False
+ )
+ )
+ except OrchestratorError:
+ pass
+
+ host_offline = hostname in self.offline_hosts
+
+ if host_offline and not offline:
+ raise OrchestratorValidationError(
+ f'{hostname} is offline, please use --offline and --force to take this host out of maintenance mode')
+
+ if not host_offline and offline:
+ raise OrchestratorValidationError(
+                f'{hostname} is online; please take the host out of maintenance mode without --offline.')
+
+ if offline and not force:
+ raise OrchestratorValidationError("Taking an offline host out of maintenance mode requires --force")
+
+ # no point trying these parts if we know the host is offline
+ if not host_offline:
+ with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
+ outs, errs, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
+ 'host-maintenance', ['exit'], error_ok=True))
+ returned_msg = errs[0].split('\n')[-1]
+ if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')):
self.log.warning(
- f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
- raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
- else:
- self.log.info(
- f"exit maintenance request has UNSET for the noout group on host {hostname}")
+ f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
+ if not force:
+ raise OrchestratorError(
+ f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
+
+ if "osd" in self.cache.get_daemon_types(hostname):
+ crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
+ rc, _out, _err = self.mon_command({
+ 'prefix': 'osd unset-group',
+ 'flags': 'noout',
+ 'who': [crush_node],
+ 'format': 'json'
+ })
+ if rc:
+ self.log.warning(
+ f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
+ if not force:
+ raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
+ else:
+ self.log.info(
+ f"exit maintenance request has UNSET for the noout group on host {hostname}")
# update the host record status
tgt_host['status'] = ""
self.inventory._inventory[hostname] = tgt_host
self.inventory.save()
- self._set_maintenance_healthcheck()
+ self.set_maintenance_healthcheck()
return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
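Hypothetical call sites for the new signature (mgr stands for a running
CephadmOrchestrator instance; the matching "ceph orch host maintenance exit"
flags are wired up outside this file):

    mgr.exit_host_maintenance('host1')                            # online host
    mgr.exit_host_maintenance('host1', force=True, offline=True)  # offline host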
@@ -2522,7 +2460,7 @@ Then run the following:
@handle_orch_error
def service_action(self, action: str, service_name: str) -> List[str]:
- if service_name not in self.spec_store.all_specs.keys():
+ if service_name not in self.spec_store.all_specs.keys() and service_name != 'osd':
raise OrchestratorError(f'Invalid service name "{service_name}".'
+ ' View currently running services using "ceph orch ls"')
dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
@@ -3001,8 +2939,16 @@ Then run the following:
daemon_names.append(dd.name())
return daemon_names
- alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
- prometheus_user, prometheus_password = self._get_prometheus_credentials()
+ prom_cred_hash = None
+ alertmgr_cred_hash = None
+ security_enabled, mgmt_gw_enabled, _ = self._get_security_config()
+ if security_enabled:
+ alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
+ prometheus_user, prometheus_password = self._get_prometheus_credentials()
+ if prometheus_user and prometheus_password:
+ prom_cred_hash = f'{utils.md5_hash(prometheus_user + prometheus_password)}'
+ if alertmanager_user and alertmanager_password:
+ alertmgr_cred_hash = f'{utils.md5_hash(alertmanager_user + alertmanager_password)}'
deps = []
if daemon_type == 'haproxy':
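The move from the builtin hash() (used further down in the old code) to
utils.md5_hash matters here: string hashing is salted per interpreter process,
so the old dependency strings changed on every mgr restart and could trigger
spurious monitoring-stack redeploys. A sketch of the kind of stable digest
helper assumed here (the real one lives in cephadm's utils module):

    import hashlib

    def md5_hash(value: str) -> str:
        # stable across processes and restarts, unlike the builtin hash()
        return hashlib.md5(value.encode()).hexdigest()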
@@ -3049,9 +2995,10 @@ Then run the following:
else:
deps = [self.get_mgr_ip()]
elif daemon_type == 'prometheus':
- # for prometheus we add the active mgr as an explicit dependency,
- # this way we force a redeploy after a mgr failover
- deps.append(self.get_active_mgr().name())
+ if not mgmt_gw_enabled:
+ # for prometheus we add the active mgr as an explicit dependency,
+ # this way we force a redeploy after a mgr failover
+ deps.append(self.get_active_mgr().name())
deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
deps.append(str(self.service_discovery_port))
# prometheus yaml configuration file (generated by prometheus.yml.j2) contains
@@ -3068,22 +3015,20 @@ Then run the following:
deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
deps += [d.name() for d in self.cache.get_daemons_by_service('mgmt-gateway')]
deps += [d.name() for d in self.cache.get_daemons_by_service('oauth2-proxy')]
- security_enabled, _, _ = self._get_security_config()
- if security_enabled:
- if prometheus_user and prometheus_password:
- deps.append(f'{hash(prometheus_user + prometheus_password)}')
- if alertmanager_user and alertmanager_password:
- deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
+ if prom_cred_hash is not None:
+ deps.append(prom_cred_hash)
+ if alertmgr_cred_hash is not None:
+ deps.append(alertmgr_cred_hash)
elif daemon_type == 'grafana':
deps += get_daemon_names(['prometheus', 'loki', 'mgmt-gateway', 'oauth2-proxy'])
- security_enabled, _, _ = self._get_security_config()
- if security_enabled and prometheus_user and prometheus_password:
- deps.append(f'{hash(prometheus_user + prometheus_password)}')
+ if prom_cred_hash is not None:
+ deps.append(prom_cred_hash)
elif daemon_type == 'alertmanager':
- deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway', 'mgmt-gateway', 'oauth2-proxy'])
- security_enabled, _, _ = self._get_security_config()
- if security_enabled and alertmanager_user and alertmanager_password:
- deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
+ deps += get_daemon_names(['alertmanager', 'snmp-gateway', 'mgmt-gateway', 'oauth2-proxy'])
+ if not mgmt_gw_enabled:
+ deps += get_daemon_names(['mgr'])
+ if alertmgr_cred_hash is not None:
+ deps.append(alertmgr_cred_hash)
elif daemon_type == 'promtail':
deps += get_daemon_names(['loki'])
elif daemon_type in ['ceph-exporter', 'node-exporter']:
@@ -3095,9 +3040,7 @@ Then run the following:
deps.append(build_url(host=dd.hostname, port=port).lstrip('/'))
deps = sorted(deps)
elif daemon_type == 'mgmt-gateway':
- # url_prefix for monitoring daemons depends on the presence of mgmt-gateway
- # while dashboard urls depend on the mgr daemons
- deps += get_daemon_names(['mgr', 'grafana', 'prometheus', 'alertmanager', 'oauth2-proxy'])
+ deps = MgmtGatewayService.get_dependencies(self)
else:
# this daemon type doesn't need deps mgmt
pass
@@ -3468,6 +3411,33 @@ Then run the following:
return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
@handle_orch_error
+ def tuned_profile_add_settings(self, profile_name: str, settings: dict) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+                f"Tuned profile {profile_name} does not exist. Cannot add settings."
+ )
+ self.tuned_profiles.add_settings(profile_name, settings)
+ results = [
+ f"Added setting {key} with value {value} to tuned profile {profile_name}"
+ for key, value in settings.items()
+ ]
+ self._kick_serve_loop()
+ return "\n".join(results)
+
+ @handle_orch_error
+ def tuned_profile_rm_settings(self, profile_name: str, settings: List[str]) -> str:
+ if profile_name not in self.tuned_profiles:
+ raise OrchestratorError(
+                f"Tuned profile {profile_name} does not exist. Cannot remove settings."
+            )
+        self.tuned_profiles.rm_settings(profile_name, settings)
+        results = [
+            f'Removed setting {setting} from tuned profile {profile_name}'
+            for setting in settings
+        ]
+ self._kick_serve_loop()
+ return "\n".join(results)
+
+ @handle_orch_error
def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
if profile_name not in self.tuned_profiles:
raise OrchestratorError(
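A hypothetical caller of the two batch helpers added above (profile name and
settings are illustrative; as with the other entry points, @handle_orch_error
wraps the returned string):

    mgr.tuned_profile_add_settings('my-profile', {
        'vm.swappiness': '10',
        'net.core.somaxconn': '1024',
    })
    mgr.tuned_profile_rm_settings('my-profile', ['vm.swappiness'])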
@@ -3610,7 +3580,12 @@ Then run the following:
return "Scheduled %s update..." % spec.service_name()
@handle_orch_error
- def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
+ def apply(
+ self,
+ specs: Sequence[GenericSpec],
+ no_overwrite: bool = False,
+ continue_on_error: bool = True
+ ) -> List[str]:
results = []
for spec in specs:
if no_overwrite:
@@ -3622,7 +3597,14 @@ Then run the following:
results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
% (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
continue
- results.append(self._apply(spec))
+ try:
+ res = self._apply(spec)
+ results.append(res)
+ except Exception as e:
+ if continue_on_error:
+ results.append(f'Failed to apply spec for {spec}: {str(e)}')
+ else:
+ raise e
return results
@handle_orch_error
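With the default continue_on_error=True, a failing spec contributes an error
string to the results list instead of aborting the whole batch; passing
continue_on_error=False restores fail-fast behaviour. A sketch (spec objects
hypothetical):

    results = mgr.apply([good_spec, bad_spec, other_spec])
    # -> ['Scheduled ... update...',
    #     'Failed to apply spec for ...: ...',
    #     'Scheduled ... update...']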
@@ -3829,8 +3811,55 @@ Then run the following:
return self.upgrade.upgrade_stop()
@handle_orch_error
+ def replace_device(self,
+ hostname: str,
+ device: str,
+ clear: bool = False,
+ yes_i_really_mean_it: bool = False) -> Any:
+ output: str = ''
+
+ self.ceph_volume.lvm_list.get_data(hostname=hostname)
+
+ if clear:
+ output = self.ceph_volume.clear_replace_header(hostname, device)
+ else:
+ osds_to_zap: List[str] = []
+ if hostname not in list(self.inventory.keys()):
+                raise OrchestratorError(f'{hostname} is not a valid host.')
+
+            if device not in self.ceph_volume.lvm_list.all_devices():
+                raise OrchestratorError(f"{device} doesn't appear to be used by any OSD; it is not a valid device on {hostname}.")
+
+ device_osd_mapping = self.ceph_volume.lvm_list.device_osd_mapping()
+ osds_to_zap = device_osd_mapping[device]['osd_ids']
+
+ if self.ceph_volume.lvm_list.is_shared_device(device):
+ if not yes_i_really_mean_it:
+ raise OrchestratorError(f'{device} is a shared device.\n'
+ f'Replacing {device} implies destroying OSD(s): {osds_to_zap}.\n'
+ 'Please, *be very careful*, this can be a very dangerous operation.\n'
+ 'If you know what you are doing, pass --yes-i-really-mean-it')
+ if not self.to_remove_osds.rm_util.safe_to_destroy([int(osd_id) for osd_id in osds_to_zap]):
+ raise OrchestratorError(f"Destroying OSD(s) {osds_to_zap} would cause some PGs to be undersized/degraded.\n"
+ 'Refusing to proceed.')
+ replace_block: bool = self.ceph_volume.lvm_list.is_block_device(device)
+ replace_db: bool = self.ceph_volume.lvm_list.is_db_device(device)
+ replace_wal: bool = self.ceph_volume.lvm_list.is_wal_device(device)
+
+ self.remove_osds(list(osds_to_zap),
+ replace_block=replace_block,
+ replace_db=replace_db,
+ replace_wal=replace_wal)
+
+ output = f'Scheduled to destroy osds: {osds_to_zap} and mark {device} as being replaced.'
+ return output
+
+ @handle_orch_error
def remove_osds(self, osd_ids: List[str],
replace: bool = False,
+ replace_block: bool = False,
+ replace_db: bool = False,
+ replace_wal: bool = False,
force: bool = False,
zap: bool = False,
no_destroy: bool = False) -> str:
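Taken together, a replacement request might look like this from the
orchestrator API (device paths hypothetical; the yes_i_really_mean_it guard
above only applies to shared DB/WAL devices):

    mgr.replace_device('host1', '/dev/sdb')              # plain data device
    mgr.replace_device('host1', '/dev/nvme0n1',          # shared device,
                       yes_i_really_mean_it=True)        # destroys several OSDs
    mgr.replace_device('host1', '/dev/sdb', clear=True)  # undo: clear the
                                                         # replace header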
@@ -3853,6 +3882,9 @@ Then run the following:
try:
self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
replace=replace,
+ replace_block=replace_block,
+ replace_db=replace_db,
+ replace_wal=replace_wal,
force=force,
zap=zap,
no_destroy=no_destroy,
@@ -3893,6 +3925,51 @@ Then run the following:
return self.to_remove_osds.all_osds()
@handle_orch_error
+ def set_osd_spec(self, service_name: str, osd_ids: List[str]) -> str:
+ """
+        Update the unit.meta file of the given OSDs with the target service name
+ """
+ if service_name not in self.spec_store:
+ raise OrchestratorError(f"Cannot find service '{service_name}' in the inventory. "
+ "Please try again after applying an OSD service that matches "
+ "the service name to which you want to attach OSDs.")
+
+ daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
+ update_osd = defaultdict(list)
+ for daemon in daemons:
+ if daemon.daemon_id in osd_ids and daemon.hostname:
+ update_osd[daemon.hostname].append(daemon.daemon_id)
+
+ if not update_osd:
+ raise OrchestratorError(f"Unable to find OSDs: {osd_ids}")
+
+ failed_osds = []
+ success_osds = []
+ for host in update_osd:
+ osds = ",".join(update_osd[host])
+ # run cephadm command with all host osds on specific host,
+ # if it fails, continue with other hosts
+ try:
+ with self.async_timeout_handler(host):
+ outs, errs, _code = self.wait_async(
+ CephadmServe(self)._run_cephadm(host,
+ cephadmNoImage,
+ 'update-osd-service',
+ ['--service-name', service_name, '--osd-ids', osds]))
+ if _code:
+                    self.log.error(f"Failed to update the service for OSD(s) {osds}. Cephadm error: {errs}")
+ failed_osds.extend(update_osd[host])
+ else:
+ success_osds.extend(update_osd[host])
+ except Exception:
+ self.log.exception(f"Failed to set service name for {osds}")
+ failed_osds.extend(update_osd[host])
+ self.cache.invalidate_host_daemons(host)
+ self._kick_serve_loop()
+        return f"Updated service for OSD(s) {','.join(success_osds)}" + (f"; failed for OSD(s) {','.join(failed_osds)}" if failed_osds else "")
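A hypothetical invocation, attaching two existing OSDs to an already-applied
spec (the spec name and IDs are illustrative):

    # rewrites unit.meta on the relevant hosts via 'cephadm update-osd-service'
    mgr.set_osd_spec('osd.all-available-devices', ['3', '7'])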
+
+ @handle_orch_error
+ @host_exists()
def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
"""
Drain all daemons from a host.