Diffstat (limited to 'src/pybind/mgr/cephadm/module.py')
-rw-r--r--   src/pybind/mgr/cephadm/module.py   435
1 file changed, 256 insertions(+), 179 deletions(-)
diff --git a/src/pybind/mgr/cephadm/module.py b/src/pybind/mgr/cephadm/module.py
index f8f0efc9d28..6690153d435 100644
--- a/src/pybind/mgr/cephadm/module.py
+++ b/src/pybind/mgr/cephadm/module.py
@@ -30,6 +30,7 @@ import multiprocessing.pool
 import subprocess
 
 from prettytable import PrettyTable
+from ceph.cephadm.images import DefaultImages
 from ceph.deployment import inventory
 from ceph.deployment.drive_group import DriveGroupSpec
 from ceph.deployment.service_spec import \
@@ -101,6 +102,7 @@ from .utils import CEPH_IMAGE_TYPES, RESCHEDULE_FROM_OFFLINE_HOSTS_TYPES, forall
 from .configchecks import CephadmConfigChecks
 from .offline_watcher import OfflineHostWatcher
 from .tuned_profiles import TunedProfileUtils
+from .ceph_volume import CephVolume
 
 try:
     import asyncssh
@@ -129,28 +131,7 @@ def os_exit_noop(status: int) -> None:
 
 os._exit = os_exit_noop  # type: ignore
 
-
-# Default container images -----------------------------------------------------
 DEFAULT_IMAGE = 'quay.io/ceph/ceph'
-DEFAULT_PROMETHEUS_IMAGE = 'quay.io/prometheus/prometheus:v2.51.0'
-DEFAULT_NODE_EXPORTER_IMAGE = 'quay.io/prometheus/node-exporter:v1.7.0'
-DEFAULT_NVMEOF_IMAGE = 'quay.io/ceph/nvmeof:1.2.17'
-DEFAULT_LOKI_IMAGE = 'quay.io/ceph/loki:3.0.0'
-DEFAULT_PROMTAIL_IMAGE = 'quay.io/ceph/promtail:3.0.0'
-DEFAULT_ALERT_MANAGER_IMAGE = 'quay.io/prometheus/alertmanager:v0.27.0'
-DEFAULT_GRAFANA_IMAGE = 'quay.io/ceph/grafana:10.4.8'
-DEFAULT_HAPROXY_IMAGE = 'quay.io/ceph/haproxy:2.3'
-DEFAULT_KEEPALIVED_IMAGE = 'quay.io/ceph/keepalived:2.2.4'
-DEFAULT_SNMP_GATEWAY_IMAGE = 'quay.io/ceph/snmp-notifier:v1.2.1'
-DEFAULT_ELASTICSEARCH_IMAGE = 'quay.io/omrizeneva/elasticsearch:6.8.23'
-DEFAULT_JAEGER_COLLECTOR_IMAGE = 'quay.io/jaegertracing/jaeger-collector:1.29'
-DEFAULT_JAEGER_AGENT_IMAGE = 'quay.io/jaegertracing/jaeger-agent:1.29'
-DEFAULT_NGINX_IMAGE = 'quay.io/ceph/nginx:sclorg-nginx-126'
-DEFAULT_OAUTH2_PROXY_IMAGE = 'quay.io/oauth2-proxy/oauth2-proxy:v7.6.0'
-DEFAULT_JAEGER_QUERY_IMAGE = 'quay.io/jaegertracing/jaeger-query:1.29'
-DEFAULT_SAMBA_IMAGE = 'quay.io/samba.org/samba-server:devbuilds-centos-amd64'
-DEFAULT_SAMBA_METRICS_IMAGE = 'quay.io/samba.org/samba-metrics:latest'
-# ------------------------------------------------------------------------------
 
 
 def host_exists(hostname_position: int = 1) -> Callable:
@@ -237,96 +218,6 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             runtime=True,
         ),
         Option(
-            'container_image_prometheus',
-            default=DEFAULT_PROMETHEUS_IMAGE,
-            desc='Prometheus container image',
-        ),
-        Option(
-            'container_image_nvmeof',
-            default=DEFAULT_NVMEOF_IMAGE,
-            desc='Nvme-of container image',
-        ),
-        Option(
-            'container_image_grafana',
-            default=DEFAULT_GRAFANA_IMAGE,
-            desc='Prometheus container image',
-        ),
-        Option(
-            'container_image_alertmanager',
-            default=DEFAULT_ALERT_MANAGER_IMAGE,
-            desc='Prometheus container image',
-        ),
-        Option(
-            'container_image_node_exporter',
-            default=DEFAULT_NODE_EXPORTER_IMAGE,
-            desc='Prometheus container image',
-        ),
-        Option(
-            'container_image_loki',
-            default=DEFAULT_LOKI_IMAGE,
-            desc='Loki container image',
-        ),
-        Option(
-            'container_image_promtail',
-            default=DEFAULT_PROMTAIL_IMAGE,
-            desc='Promtail container image',
-        ),
-        Option(
-            'container_image_haproxy',
-            default=DEFAULT_HAPROXY_IMAGE,
-            desc='HAproxy container image',
-        ),
-        Option(
-            'container_image_keepalived',
-            default=DEFAULT_KEEPALIVED_IMAGE,
-            desc='Keepalived container image',
-        ),
-        Option(
-            'container_image_snmp_gateway',
-            default=DEFAULT_SNMP_GATEWAY_IMAGE,
-            desc='SNMP Gateway container image',
-        ),
-        Option(
-            'container_image_nginx',
-            default=DEFAULT_NGINX_IMAGE,
-            desc='Nginx container image',
-        ),
-        Option(
-            'container_image_oauth2_proxy',
-            default=DEFAULT_OAUTH2_PROXY_IMAGE,
-            desc='oauth2-proxy container image',
-        ),
-        Option(
-            'container_image_elasticsearch',
-            default=DEFAULT_ELASTICSEARCH_IMAGE,
-            desc='elasticsearch container image',
-        ),
-        Option(
-            'container_image_jaeger_agent',
-            default=DEFAULT_JAEGER_AGENT_IMAGE,
-            desc='Jaeger agent container image',
-        ),
-        Option(
-            'container_image_jaeger_collector',
-            default=DEFAULT_JAEGER_COLLECTOR_IMAGE,
-            desc='Jaeger collector container image',
-        ),
-        Option(
-            'container_image_jaeger_query',
-            default=DEFAULT_JAEGER_QUERY_IMAGE,
-            desc='Jaeger query container image',
-        ),
-        Option(
-            'container_image_samba',
-            default=DEFAULT_SAMBA_IMAGE,
-            desc='Samba/SMB container image',
-        ),
-        Option(
-            'container_image_samba_metrics',
-            default=DEFAULT_SAMBA_METRICS_IMAGE,
-            desc='Samba/SMB metrics exporter container image',
-        ),
-        Option(
             'warn_on_stray_hosts',
             type='bool',
             default=True,
@@ -562,6 +453,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
             desc="Default address for RedFish API (oob management)."
         ),
     ]
+    for image in DefaultImages:
+        MODULE_OPTIONS.append(Option(image.key, default=image.image_ref, desc=image.desc))
 
     def __init__(self, *args: Any, **kwargs: Any):
         super(CephadmOrchestrator, self).__init__(*args, **kwargs)
@@ -792,6 +685,8 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         # as part of the handling of stray daemons
         self.recently_altered_daemons: Dict[str, datetime.datetime] = {}
 
+        self.ceph_volume: CephVolume = CephVolume(self)
+
     def shutdown(self) -> None:
         self.log.debug('shutdown')
         self._worker_pool.close()
@@ -819,30 +714,33 @@ class CephadmOrchestrator(orchestrator.Orchestrator, MgrModule,
         security_enabled = self.secure_monitoring_stack or mgmt_gw_enabled
         return security_enabled, mgmt_gw_enabled, oauth2_proxy_enabled
 
-    def get_mgmt_gw_internal_endpoint(self) -> Optional[str]:
+    def _get_mgmt_gw_endpoint(self, is_internal: bool) -> Optional[str]:
         mgmt_gw_daemons = self.cache.get_daemons_by_service('mgmt-gateway')
         if not mgmt_gw_daemons:
            return None
 
         dd = mgmt_gw_daemons[0]
         assert dd.hostname is not None
-        mgmt_gw_addr = self.get_fqdn(dd.hostname)
-        mgmt_gw_internal_endpoint = build_url(scheme='https', host=mgmt_gw_addr, port=MgmtGatewayService.INTERNAL_SERVICE_PORT)
-        return f'{mgmt_gw_internal_endpoint}/internal'
+        mgmt_gw_spec = cast(MgmtGatewaySpec, self.spec_store['mgmt-gateway'].spec)
+        mgmt_gw_addr = mgmt_gw_spec.virtual_ip if mgmt_gw_spec.virtual_ip is not None else self.get_fqdn(dd.hostname)
 
-    def get_mgmt_gw_external_endpoint(self) -> Optional[str]:
-        mgmt_gw_daemons = self.cache.get_daemons_by_service('mgmt-gateway')
-        if not mgmt_gw_daemons:
-            return None
+        if is_internal:
+            mgmt_gw_port: Optional[int] = MgmtGatewayService.INTERNAL_SERVICE_PORT
+            protocol = 'https'
+            endpoint_suffix = '/internal'
+        else:
+            mgmt_gw_port = dd.ports[0] if dd.ports else None
+            protocol = 'http' if mgmt_gw_spec.disable_https else 'https'
+            endpoint_suffix = ''
 
-        dd = mgmt_gw_daemons[0]
-        assert dd.hostname is not None
-        mgmt_gw_port = dd.ports[0] if dd.ports else None
-        mgmt_gw_addr = self.get_fqdn(dd.hostname)
-        mgmt_gw_spec = cast(MgmtGatewaySpec, self.spec_store['mgmt-gateway'].spec)
-        protocol = 'http' if mgmt_gw_spec.disable_https else 'https'
-        mgmt_gw_external_endpoint = build_url(scheme=protocol, host=mgmt_gw_addr, port=mgmt_gw_port)
-        return mgmt_gw_external_endpoint
+        mgmt_gw_endpoint = build_url(scheme=protocol, host=mgmt_gw_addr, port=mgmt_gw_port)
+        return f'{mgmt_gw_endpoint}{endpoint_suffix}'
+
+    def get_mgmt_gw_internal_endpoint(self) -> Optional[str]:
+        return self._get_mgmt_gw_endpoint(is_internal=True)
+
+    def get_mgmt_gw_external_endpoint(self) -> Optional[str]:
+        return self._get_mgmt_gw_endpoint(is_internal=False)
 
     def _get_cephadm_binary_path(self) -> str:
         import hashlib
@@ -1897,7 +1795,7 @@ Then run the following:
         self.inventory.add_host(spec)
         self.offline_hosts_remove(spec.hostname)
         if spec.status == 'maintenance':
-            self._set_maintenance_healthcheck()
+            self.set_maintenance_healthcheck()
         self.event.set()  # refresh stray health check
         self.log.info('Added host %s' % spec.hostname)
         return "Added host '{}' with addr '{}'".format(spec.hostname, spec.addr)
@@ -2068,6 +1966,7 @@ Then run the following:
         self.ssh.reset_con(host)
         # if host was in offline host list, we should remove it now.
         self.offline_hosts_remove(host)
+        self.set_maintenance_healthcheck()
         self.event.set()  # refresh stray health check
         self.log.info('Removed host %s' % host)
         return "Removed {} host '{}'".format('offline' if offline else '', host)
@@ -2182,7 +2081,7 @@ Then run the following:
         self.log.info(msg)
         return msg
 
-    def _set_maintenance_healthcheck(self) -> None:
+    def set_maintenance_healthcheck(self) -> None:
         """Raise/update or clear the maintenance health check as needed"""
         in_maintenance = self.inventory.get_host_with_state("maintenance")
 
@@ -2266,12 +2165,12 @@ Then run the following:
         self.inventory._inventory[hostname] = tgt_host
         self.inventory.save()
-        self._set_maintenance_healthcheck()
+        self.set_maintenance_healthcheck()
 
         return f'Daemons for Ceph cluster {self._cluster_fsid} stopped on host {hostname}. Host {hostname} moved to maintenance mode'
 
     @handle_orch_error
     @host_exists()
-    def exit_host_maintenance(self, hostname: str) -> str:
+    def exit_host_maintenance(self, hostname: str, force: bool = False, offline: bool = False) -> str:
         """Exit maintenance mode and return a host to an operational state
 
         Returning from maintenance will enable the clusters systemd target and
@@ -2279,6 +2178,8 @@
         host has osd daemons
 
         :param hostname: (str) host name
+        :param force: (bool) force removal of the host from maintenance mode
+        :param offline: (bool) to remove hosts that are offline from maintenance mode
 
         :raises OrchestratorError: Unable to return from maintenance, or unset the
                                    noout flag
@@ -2287,37 +2188,74 @@
         if tgt_host['status'] != "maintenance":
             raise OrchestratorError(f"Host {hostname} is not in maintenance mode")
 
-        with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
-            outs, errs, _code = self.wait_async(
-                CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
-                                                'host-maintenance', ['exit'], error_ok=True))
-        returned_msg = errs[0].split('\n')[-1]
-        if returned_msg.startswith('failed') or returned_msg.startswith('ERROR'):
-            raise OrchestratorError(
-                f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
-
-        if "osd" in self.cache.get_daemon_types(hostname):
-            crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
-            rc, _out, _err = self.mon_command({
-                'prefix': 'osd unset-group',
-                'flags': 'noout',
-                'who': [crush_node],
-                'format': 'json'
-            })
-            if rc:
+        # Given we do not regularly check maintenance mode hosts for being offline,
+        # we have no idea at this point whether the host is online or not.
+        # Keep in mind this goes both ways, as users could have run
+        # "ceph cephadm check-host <hostname>" when the host was in maintenance
+        # mode and offline and the host could have since come online. This following
+        # "cephadm check-host" command is being run purely so we know if the host
+        # is online or offline, as those should be handled differently
+        try:
+            with self.async_timeout_handler(hostname, 'cephadm check-host'):
+                outs, errs, _code = self.wait_async(
+                    CephadmServe(self)._run_cephadm(
+                        hostname, cephadmNoImage,
+                        'check-host', [], error_ok=False
+                    )
+                )
+        except OrchestratorError:
+            pass
+
+        host_offline = hostname in self.offline_hosts
+
+        if host_offline and not offline:
+            raise OrchestratorValidationError(
+                f'{hostname} is offline, please use --offline and --force to take this host out of maintenance mode')
+
+        if not host_offline and offline:
+            raise OrchestratorValidationError(
+                f'{hostname} is online, please take host out of maintenance mode without --offline.')
+
+        if offline and not force:
+            raise OrchestratorValidationError("Taking an offline host out of maintenance mode requires --force")
+
+        # no point trying these parts if we know the host is offline
+        if not host_offline:
+            with self.async_timeout_handler(hostname, 'cephadm host-maintenance exit'):
+                outs, errs, _code = self.wait_async(
+                    CephadmServe(self)._run_cephadm(hostname, cephadmNoImage,
                                                    'host-maintenance', ['exit'], error_ok=True))
+            returned_msg = errs[0].split('\n')[-1]
+            if (returned_msg.startswith('failed') or returned_msg.startswith('ERROR')):
                 self.log.warning(
-                    f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
-                raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
-            else:
-                self.log.info(
-                    f"exit maintenance request has UNSET for the noout group on host {hostname}")
+                    f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
+                if not force:
+                    raise OrchestratorError(
+                        f"Failed to exit maintenance state for host {hostname}, cluster {self._cluster_fsid}")
+
+            if "osd" in self.cache.get_daemon_types(hostname):
+                crush_node = hostname if '.' not in hostname else hostname.split('.')[0]
+                rc, _out, _err = self.mon_command({
+                    'prefix': 'osd unset-group',
+                    'flags': 'noout',
+                    'who': [crush_node],
+                    'format': 'json'
+                })
+                if rc:
+                    self.log.warning(
+                        f"exit maintenance request failed to UNSET the noout group for {hostname}, (rc={rc})")
+                    if not force:
+                        raise OrchestratorError(f"Unable to set the osds on {hostname} to noout (rc={rc})")
+                else:
+                    self.log.info(
+                        f"exit maintenance request has UNSET for the noout group on host {hostname}")
 
         # update the host record status
         tgt_host['status'] = ""
         self.inventory._inventory[hostname] = tgt_host
         self.inventory.save()
 
-        self._set_maintenance_healthcheck()
+        self.set_maintenance_healthcheck()
 
         return f"Ceph cluster {self._cluster_fsid} on {hostname} has exited maintenance mode"
@@ -2522,7 +2460,7 @@ Then run the following:
 
     @handle_orch_error
     def service_action(self, action: str, service_name: str) -> List[str]:
-        if service_name not in self.spec_store.all_specs.keys():
+        if service_name not in self.spec_store.all_specs.keys() and service_name != 'osd':
             raise OrchestratorError(f'Invalid service name "{service_name}".'
                                     + ' View currently running services using "ceph orch ls"')
         dds: List[DaemonDescription] = self.cache.get_daemons_by_service(service_name)
@@ -3001,8 +2939,16 @@ Then run the following:
                     daemon_names.append(dd.name())
             return daemon_names
 
-        alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
-        prometheus_user, prometheus_password = self._get_prometheus_credentials()
+        prom_cred_hash = None
+        alertmgr_cred_hash = None
+        security_enabled, mgmt_gw_enabled, _ = self._get_security_config()
+        if security_enabled:
+            alertmanager_user, alertmanager_password = self._get_alertmanager_credentials()
+            prometheus_user, prometheus_password = self._get_prometheus_credentials()
+            if prometheus_user and prometheus_password:
+                prom_cred_hash = f'{utils.md5_hash(prometheus_user + prometheus_password)}'
+            if alertmanager_user and alertmanager_password:
+                alertmgr_cred_hash = f'{utils.md5_hash(alertmanager_user + alertmanager_password)}'
 
         deps = []
         if daemon_type == 'haproxy':
@@ -3049,9 +2995,10 @@ Then run the following:
             else:
                 deps = [self.get_mgr_ip()]
         elif daemon_type == 'prometheus':
-            # for prometheus we add the active mgr as an explicit dependency,
-            # this way we force a redeploy after a mgr failover
-            deps.append(self.get_active_mgr().name())
+            if not mgmt_gw_enabled:
+                # for prometheus we add the active mgr as an explicit dependency,
+                # this way we force a redeploy after a mgr failover
+                deps.append(self.get_active_mgr().name())
             deps.append(str(self.get_module_option_ex('prometheus', 'server_port', 9283)))
             deps.append(str(self.service_discovery_port))
             # prometheus yaml configuration file (generated by prometheus.yml.j2) contains
@@ -3068,22 +3015,20 @@ Then run the following:
             deps += [d.name() for d in self.cache.get_daemons_by_service('ceph-exporter')]
             deps += [d.name() for d in self.cache.get_daemons_by_service('mgmt-gateway')]
             deps += [d.name() for d in self.cache.get_daemons_by_service('oauth2-proxy')]
-            security_enabled, _, _ = self._get_security_config()
-            if security_enabled:
-                if prometheus_user and prometheus_password:
-                    deps.append(f'{hash(prometheus_user + prometheus_password)}')
-                if alertmanager_user and alertmanager_password:
-                    deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
+            if prom_cred_hash is not None:
+                deps.append(prom_cred_hash)
+            if alertmgr_cred_hash is not None:
+                deps.append(alertmgr_cred_hash)
         elif daemon_type == 'grafana':
             deps += get_daemon_names(['prometheus', 'loki', 'mgmt-gateway', 'oauth2-proxy'])
-            security_enabled, _, _ = self._get_security_config()
-            if security_enabled and prometheus_user and prometheus_password:
-                deps.append(f'{hash(prometheus_user + prometheus_password)}')
+            if prom_cred_hash is not None:
+                deps.append(prom_cred_hash)
         elif daemon_type == 'alertmanager':
-            deps += get_daemon_names(['mgr', 'alertmanager', 'snmp-gateway', 'mgmt-gateway', 'oauth2-proxy'])
-            security_enabled, _, _ = self._get_security_config()
-            if security_enabled and alertmanager_user and alertmanager_password:
-                deps.append(f'{hash(alertmanager_user + alertmanager_password)}')
+            deps += get_daemon_names(['alertmanager', 'snmp-gateway', 'mgmt-gateway', 'oauth2-proxy'])
+            if not mgmt_gw_enabled:
+                deps += get_daemon_names(['mgr'])
+            if alertmgr_cred_hash is not None:
+                deps.append(alertmgr_cred_hash)
         elif daemon_type == 'promtail':
             deps += get_daemon_names(['loki'])
         elif daemon_type in ['ceph-exporter', 'node-exporter']:
@@ -3095,9 +3040,7 @@ Then run the following:
                     deps.append(build_url(host=dd.hostname, port=port).lstrip('/'))
             deps = sorted(deps)
         elif daemon_type == 'mgmt-gateway':
-            # url_prefix for monitoring daemons depends on the presence of mgmt-gateway
-            # while dashboard urls depend on the mgr daemons
-            deps += get_daemon_names(['mgr', 'grafana', 'prometheus', 'alertmanager', 'oauth2-proxy'])
+            deps = MgmtGatewayService.get_dependencies(self)
         else:
             # this daemon type doesn't need deps mgmt
             pass
@@ -3468,6 +3411,33 @@ Then run the following:
         return f'Added setting {setting} with value {value} to tuned profile {profile_name}'
 
     @handle_orch_error
+    def tuned_profile_add_settings(self, profile_name: str, settings: dict) -> str:
+        if profile_name not in self.tuned_profiles:
+            raise OrchestratorError(
+                f"Tuned profile {profile_name} does not exist. Cannot add setting."
+            )
+        self.tuned_profiles.add_settings(profile_name, settings)
+        results = [
+            f"Added setting {key} with value {value} to tuned profile {profile_name}"
+            for key, value in settings.items()
+        ]
+        self._kick_serve_loop()
+        return "\n".join(results)
+
+    @handle_orch_error
+    def tuned_profile_rm_settings(self, profile_name: str, settings: List[str]) -> str:
+        if profile_name not in self.tuned_profiles:
+            raise OrchestratorError(
+                f"Tuned profile {profile_name} does not exist. Cannot remove setting."
+            )
+        self.tuned_profiles.rm_settings(profile_name, settings)
+        results = [
+            f'Removed setting {settings} from tuned profile {profile_name}'
+        ]
+        self._kick_serve_loop()
+        return "\n".join(results)
+
+    @handle_orch_error
     def tuned_profile_rm_setting(self, profile_name: str, setting: str) -> str:
         if profile_name not in self.tuned_profiles:
             raise OrchestratorError(
@@ -3610,7 +3580,12 @@ Then run the following:
         return "Scheduled %s update..." % spec.service_name()
 
     @handle_orch_error
-    def apply(self, specs: Sequence[GenericSpec], no_overwrite: bool = False) -> List[str]:
+    def apply(
+        self,
+        specs: Sequence[GenericSpec],
+        no_overwrite: bool = False,
+        continue_on_error: bool = True
+    ) -> List[str]:
         results = []
         for spec in specs:
             if no_overwrite:
@@ -3622,7 +3597,14 @@ Then run the following:
                     results.append('Skipped %s service spec. To change %s spec omit --no-overwrite flag'
                                    % (cast(ServiceSpec, spec).service_name(), cast(ServiceSpec, spec).service_name()))
                     continue
-            results.append(self._apply(spec))
+            try:
+                res = self._apply(spec)
+                results.append(res)
+            except Exception as e:
+                if continue_on_error:
+                    results.append(f'Failed to apply spec for {spec}: {str(e)}')
+                else:
+                    raise e
         return results
 
     @handle_orch_error
@@ -3829,8 +3811,55 @@ Then run the following:
         return self.upgrade.upgrade_stop()
 
     @handle_orch_error
+    def replace_device(self,
+                       hostname: str,
+                       device: str,
+                       clear: bool = False,
+                       yes_i_really_mean_it: bool = False) -> Any:
+        output: str = ''
+
+        self.ceph_volume.lvm_list.get_data(hostname=hostname)
+
+        if clear:
+            output = self.ceph_volume.clear_replace_header(hostname, device)
+        else:
+            osds_to_zap: List[str] = []
+            if hostname not in list(self.inventory.keys()):
+                raise OrchestratorError(f'{hostname} invalid host.')
+
+            if device not in self.ceph_volume.lvm_list.all_devices():
+                raise OrchestratorError(f"{device} doesn't appear to be used for an OSD, not a valid device in {hostname}.")
+
+            device_osd_mapping = self.ceph_volume.lvm_list.device_osd_mapping()
+            osds_to_zap = device_osd_mapping[device]['osd_ids']
+
+            if self.ceph_volume.lvm_list.is_shared_device(device):
+                if not yes_i_really_mean_it:
+                    raise OrchestratorError(f'{device} is a shared device.\n'
+                                            f'Replacing {device} implies destroying OSD(s): {osds_to_zap}.\n'
+                                            'Please, *be very careful*, this can be a very dangerous operation.\n'
+                                            'If you know what you are doing, pass --yes-i-really-mean-it')
+            if not self.to_remove_osds.rm_util.safe_to_destroy([int(osd_id) for osd_id in osds_to_zap]):
+                raise OrchestratorError(f"Destroying OSD(s) {osds_to_zap} would cause some PGs to be undersized/degraded.\n"
+                                        'Refusing to proceed.')
+            replace_block: bool = self.ceph_volume.lvm_list.is_block_device(device)
+            replace_db: bool = self.ceph_volume.lvm_list.is_db_device(device)
+            replace_wal: bool = self.ceph_volume.lvm_list.is_wal_device(device)
+
+            self.remove_osds(list(osds_to_zap),
+                             replace_block=replace_block,
+                             replace_db=replace_db,
+                             replace_wal=replace_wal)
+
+            output = f'Scheduled to destroy osds: {osds_to_zap} and mark {device} as being replaced.'
+        return output
+
+    @handle_orch_error
     def remove_osds(self, osd_ids: List[str],
                     replace: bool = False,
+                    replace_block: bool = False,
+                    replace_db: bool = False,
+                    replace_wal: bool = False,
                     force: bool = False,
                     zap: bool = False,
                     no_destroy: bool = False) -> str:
@@ -3853,6 +3882,9 @@ Then run the following:
             try:
                 self.to_remove_osds.enqueue(OSD(osd_id=int(daemon.daemon_id),
                                                 replace=replace,
+                                                replace_block=replace_block,
+                                                replace_db=replace_db,
+                                                replace_wal=replace_wal,
                                                 force=force,
                                                 zap=zap,
                                                 no_destroy=no_destroy,
@@ -3893,6 +3925,51 @@ Then run the following:
         return self.to_remove_osds.all_osds()
 
     @handle_orch_error
+    def set_osd_spec(self, service_name: str, osd_ids: List[str]) -> str:
+        """
+        Update unit.meta file for osd with service name
+        """
+        if service_name not in self.spec_store:
+            raise OrchestratorError(f"Cannot find service '{service_name}' in the inventory. "
+                                    "Please try again after applying an OSD service that matches "
+                                    "the service name to which you want to attach OSDs.")
+
+        daemons: List[orchestrator.DaemonDescription] = self.cache.get_daemons_by_type('osd')
+        update_osd = defaultdict(list)
+        for daemon in daemons:
+            if daemon.daemon_id in osd_ids and daemon.hostname:
+                update_osd[daemon.hostname].append(daemon.daemon_id)
+
+        if not update_osd:
+            raise OrchestratorError(f"Unable to find OSDs: {osd_ids}")
+
+        failed_osds = []
+        success_osds = []
+        for host in update_osd:
+            osds = ",".join(update_osd[host])
+            # run cephadm command with all host osds on specific host,
+            # if it fails, continue with other hosts
+            try:
+                with self.async_timeout_handler(host):
+                    outs, errs, _code = self.wait_async(
+                        CephadmServe(self)._run_cephadm(host,
                                                        cephadmNoImage,
                                                        'update-osd-service',
                                                        ['--service-name', service_name, '--osd-ids', osds]))
+                    if _code:
+                        self.log.error(f"Failed to update service for {osds} osd. Cephadm error: {errs}")
+                        failed_osds.extend(update_osd[host])
+                    else:
+                        success_osds.extend(update_osd[host])
+            except Exception:
+                self.log.exception(f"Failed to set service name for {osds}")
+                failed_osds.extend(update_osd[host])
+            self.cache.invalidate_host_daemons(host)
+        self._kick_serve_loop()
+        return f"Updated service for osd {','.join(success_osds)}" + (f" and failed for {','.join(failed_osds)}" if failed_osds else "")
+
+    @handle_orch_error
+    @host_exists()
     def drain_host(self, hostname: str, force: bool = False, keep_conf_keyring: bool = False, zap_osd_devices: bool = False) -> str:
         """
         Drain all daemons from a host.