Diffstat (limited to 'src/pybind/mgr/cephadm/inventory.py')
-rw-r--r--    src/pybind/mgr/cephadm/inventory.py    138
1 file changed, 99 insertions, 39 deletions
diff --git a/src/pybind/mgr/cephadm/inventory.py b/src/pybind/mgr/cephadm/inventory.py
index 9d8816aa60f..f309504155f 100644
--- a/src/pybind/mgr/cephadm/inventory.py
+++ b/src/pybind/mgr/cephadm/inventory.py
@@ -21,6 +21,10 @@ SPEC_STORE_PREFIX = "spec."
 
 
 class Inventory:
+    """
+    The inventory stores a HostSpec for all hosts persistently.
+    """
+
     def __init__(self, mgr: 'CephadmOrchestrator'):
         self.mgr = mgr
         # load inventory
@@ -37,25 +41,25 @@ class Inventory:
     def __contains__(self, host: str) -> bool:
         return host in self._inventory
 
-    def assert_host(self, host):
+    def assert_host(self, host: str) -> None:
         if host not in self._inventory:
             raise OrchestratorError('host %s does not exist' % host)
 
-    def add_host(self, spec: HostSpec):
+    def add_host(self, spec: HostSpec) -> None:
         self._inventory[spec.hostname] = spec.to_json()
         self.save()
 
-    def rm_host(self, host: str):
+    def rm_host(self, host: str) -> None:
         self.assert_host(host)
         del self._inventory[host]
         self.save()
 
-    def set_addr(self, host, addr):
+    def set_addr(self, host: str, addr: str) -> None:
         self.assert_host(host)
         self._inventory[host]['addr'] = addr
         self.save()
 
-    def add_label(self, host, label):
+    def add_label(self, host: str, label: str) -> None:
         self.assert_host(host)
 
         if 'labels' not in self._inventory[host]:
@@ -64,7 +68,7 @@ class Inventory:
             self._inventory[host]['labels'].append(label)
         self.save()
 
-    def rm_label(self, host, label):
+    def rm_label(self, host: str, label: str) -> None:
         self.assert_host(host)
 
         if 'labels' not in self._inventory[host]:
@@ -73,7 +77,7 @@ class Inventory:
             self._inventory[host]['labels'].remove(label)
         self.save()
 
-    def get_addr(self, host) -> str:
+    def get_addr(self, host: str) -> str:
         self.assert_host(host)
         return self._inventory[host].get('addr', host)
 
@@ -85,7 +89,7 @@ class Inventory:
             else:
                 yield h
 
-    def spec_from_dict(self, info) -> HostSpec:
+    def spec_from_dict(self, info: dict) -> HostSpec:
         hostname = info['hostname']
         return HostSpec(
             hostname,
@@ -97,7 +101,7 @@ class Inventory:
     def all_specs(self) -> List[HostSpec]:
         return list(map(self.spec_from_dict, self._inventory.values()))
 
-    def save(self):
+    def save(self) -> None:
         self.mgr.set_store('inventory', json.dumps(self._inventory))
 
 
@@ -164,12 +168,44 @@ class SpecStore():
 
 
 class HostCache():
+    """
+    HostCache stores different things:
+
+    1. `daemons`: Deployed daemons O(daemons)
+
+       They're part of the configuration nowadays and need to be
+       persistent. The name "daemon cache" is unfortunately a bit misleading.
+       Like for example we really need to know where daemons are deployed on
+       hosts that are offline.
+
+    2. `devices`: ceph-volume inventory cache O(hosts)
+
+       As soon as this is populated, it becomes more or less read-only.
+
+    3. `networks`: network interfaces for each host. O(hosts)
+
+       This is needed in order to deploy MONs. As this is mostly read-only.
+
+    4. `last_etc_ceph_ceph_conf` O(hosts)
+
+       Stores the last refresh time for the /etc/ceph/ceph.conf. Used
+       to avoid deploying new configs when failing over to a new mgr.
+
+    5. `scheduled_daemon_actions`: O(daemons)
+
+       Used to run daemon actions after deploying a daemon. We need to
+       store it persistently, in order to stay consistent across
+       MGR failovers.
+    """
+
     def __init__(self, mgr):
         # type: (CephadmOrchestrator) -> None
         self.mgr: CephadmOrchestrator = mgr
         self.daemons = {}  # type: Dict[str, Dict[str, orchestrator.DaemonDescription]]
         self.last_daemon_update = {}  # type: Dict[str, datetime.datetime]
         self.devices = {}  # type: Dict[str, List[inventory.Device]]
+        self.facts = {}  # type: Dict[str, Dict[str, Any]]
+        self.last_facts_update = {}  # type: Dict[str, datetime.datetime]
         self.osdspec_previews = {}  # type: Dict[str, List[Dict[str, Any]]]
         self.networks = {}  # type: Dict[str, Dict[str, List[str]]]
         self.last_device_update = {}  # type: Dict[str, datetime.datetime]
@@ -244,13 +280,18 @@ class HostCache():
         self.daemons[host] = dm
         self.last_daemon_update[host] = datetime.datetime.utcnow()
 
+    def update_host_facts(self, host, facts):
+        # type: (str, Dict[str, Dict[str, Any]]) -> None
+        self.facts[host] = facts
+        self.last_facts_update[host] = datetime.datetime.utcnow()
+
     def update_host_devices_networks(self, host, dls, nets):
         # type: (str, List[inventory.Device], Dict[str,List[str]]) -> None
         self.devices[host] = dls
         self.networks[host] = nets
         self.last_device_update[host] = datetime.datetime.utcnow()
 
-    def update_daemon_config_deps(self, host, name, deps, stamp):
+    def update_daemon_config_deps(self, host: str, name: str, deps: List[str], stamp: datetime.datetime) -> None:
         self.daemon_config_deps[host][name] = {
             'deps': deps,
             'last_config': stamp,
         }
@@ -289,7 +330,7 @@ class HostCache():
             del self.last_device_update[host]
         self.mgr.event.set()
 
-    def distribute_new_registry_login_info(self):
+    def distribute_new_registry_login_info(self) -> None:
         self.registry_login_queue = set(self.mgr.inventory.keys())
 
     def save_host(self, host: str) -> None:
@@ -303,17 +344,21 @@ class HostCache():
             j['last_daemon_update'] = datetime_to_str(self.last_daemon_update[host])
         if host in self.last_device_update:
             j['last_device_update'] = datetime_to_str(self.last_device_update[host])
-        for name, dd in self.daemons[host].items():
-            j['daemons'][name] = dd.to_json()
-        for d in self.devices[host]:
-            j['devices'].append(d.to_json())
-        j['networks'] = self.networks[host]
-        for name, depi in self.daemon_config_deps[host].items():
-            j['daemon_config_deps'][name] = {
-                'deps': depi.get('deps', []),
-                'last_config': datetime_to_str(depi['last_config']),
-            }
-        if self.osdspec_previews[host]:
+        if host in self.daemons:
+            for name, dd in self.daemons[host].items():
+                j['daemons'][name] = dd.to_json()
+        if host in self.devices:
+            for d in self.devices[host]:
+                j['devices'].append(d.to_json())
+        if host in self.networks:
+            j['networks'] = self.networks[host]
+        if host in self.daemon_config_deps:
+            for name, depi in self.daemon_config_deps[host].items():
+                j['daemon_config_deps'][name] = {
+                    'deps': depi.get('deps', []),
+                    'last_config': datetime_to_str(depi['last_config']),
+                }
+        if host in self.osdspec_previews and self.osdspec_previews[host]:
             j['osdspec_previews'] = self.osdspec_previews[host]
 
         if host in self.last_host_check:
@@ -321,7 +366,7 @@ class HostCache():
         if host in self.last_etc_ceph_ceph_conf:
             j['last_etc_ceph_ceph_conf'] = datetime_to_str(self.last_etc_ceph_ceph_conf[host])
 
-        if self.scheduled_daemon_actions.get(host, {}):
+        if host in self.scheduled_daemon_actions:
            j['scheduled_daemon_actions'] = self.scheduled_daemon_actions[host]
 
         self.mgr.set_store(HOST_CACHE_PREFIX + host, json.dumps(j))
@@ -332,6 +377,10 @@ class HostCache():
             del self.daemons[host]
         if host in self.devices:
             del self.devices[host]
+        if host in self.facts:
+            del self.facts[host]
+        if host in self.last_facts_update:
+            del self.last_facts_update[host]
         if host in self.osdspec_previews:
             del self.osdspec_previews[host]
         if host in self.loading_osdspec_preview:
@@ -371,7 +420,7 @@ class HostCache():
         raise orchestrator.OrchestratorError(f'Unable to find {daemon_name} daemon(s)')
 
     def get_daemons_with_volatile_status(self) -> Iterator[Tuple[str, Dict[str, orchestrator.DaemonDescription]]]:
-        def alter(host, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
+        def alter(host: str, dd_orig: orchestrator.DaemonDescription) -> orchestrator.DaemonDescription:
             dd = copy(dd_orig)
             if host in self.mgr.offline_hosts:
                 dd.status = -1
@@ -408,7 +457,7 @@ class HostCache():
                 r.append(name)
         return r
 
-    def get_daemon_last_config_deps(self, host, name) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
+    def get_daemon_last_config_deps(self, host: str, name: str) -> Tuple[Optional[List[str]], Optional[datetime.datetime]]:
         if host in self.daemon_config_deps:
             if name in self.daemon_config_deps[host]:
                 return self.daemon_config_deps[host][name].get('deps', []), \
@@ -429,6 +478,17 @@ class HostCache():
             return True
         return False
 
+    def host_needs_facts_refresh(self, host):
+        # type: (str) -> bool
+        if host in self.mgr.offline_hosts:
+            logger.debug(f'Host "{host}" marked as offline. Skipping gather facts refresh')
+            return False
+        cutoff = datetime.datetime.utcnow() - datetime.timedelta(
+            seconds=self.mgr.facts_cache_timeout)
+        if host not in self.last_facts_update or self.last_facts_update[host] < cutoff:
+            return True
+        return False
+
     def host_had_daemon_refresh(self, host: str) -> bool:
         """
         ... at least once.
@@ -453,7 +513,7 @@ class HostCache():
             return True
         return False
 
-    def host_needs_osdspec_preview_refresh(self, host):
+    def host_needs_osdspec_preview_refresh(self, host: str) -> bool:
         if host in self.mgr.offline_hosts:
             logger.debug(f'Host "{host}" marked as offline. Skipping osdspec preview refresh')
             return False
@@ -470,7 +530,7 @@ class HostCache():
             seconds=self.mgr.host_check_interval)
         return host not in self.last_host_check or self.last_host_check[host] < cutoff
 
-    def host_needs_new_etc_ceph_ceph_conf(self, host: str):
+    def host_needs_new_etc_ceph_ceph_conf(self, host: str) -> bool:
         if not self.mgr.manage_etc_ceph_ceph_conf:
             return False
         if self.mgr.paused:
@@ -488,7 +548,7 @@ class HostCache():
         # already up to date:
         return False
 
-    def update_last_etc_ceph_ceph_conf(self, host: str):
+    def update_last_etc_ceph_ceph_conf(self, host: str) -> None:
        if not self.mgr.last_monmap:
            return
        self.last_etc_ceph_ceph_conf[host] = datetime.datetime.utcnow()
@@ -506,12 +566,12 @@ class HostCache():
         assert host in self.daemons
         self.daemons[host][dd.name()] = dd
 
-    def rm_daemon(self, host, name):
+    def rm_daemon(self, host: str, name: str) -> None:
         if host in self.daemons:
             if name in self.daemons[host]:
                 del self.daemons[host][name]
 
-    def daemon_cache_filled(self):
+    def daemon_cache_filled(self) -> bool:
         """
         i.e. we have checked the daemons for each hosts at least once.
         excluding offline hosts.
@@ -522,7 +582,7 @@ class HostCache():
         return all((self.host_had_daemon_refresh(h) or h in self.mgr.offline_hosts)
                    for h in self.get_hosts())
 
-    def schedule_daemon_action(self, host: str, daemon_name: str, action: str):
+    def schedule_daemon_action(self, host: str, daemon_name: str, action: str) -> None:
         priorities = {
             'start': 1,
             'restart': 2,
@@ -540,14 +600,14 @@ class HostCache():
             self.scheduled_daemon_actions[host] = {}
         self.scheduled_daemon_actions[host][daemon_name] = action
 
-    def rm_scheduled_daemon_action(self, host: str, daemon_name: str):
+    def rm_scheduled_daemon_action(self, host: str, daemon_name: str) -> None:
         if host in self.scheduled_daemon_actions:
             if daemon_name in self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host][daemon_name]
             if not self.scheduled_daemon_actions[host]:
                 del self.scheduled_daemon_actions[host]
 
-    def get_scheduled_daemon_action(self, host, daemon) -> Optional[str]:
+    def get_scheduled_daemon_action(self, host: str, daemon: str) -> Optional[str]:
         return self.scheduled_daemon_actions.get(host, {}).get(daemon)
 
 
@@ -571,12 +631,12 @@ class EventStore():
         # limit to five events for now.
         self.events[event.kind_subject()] = self.events[event.kind_subject()][-5:]
 
-    def for_service(self, spec: ServiceSpec, level, message) -> None:
+    def for_service(self, spec: ServiceSpec, level: str, message: str) -> None:
         e = OrchestratorEvent(datetime.datetime.utcnow(), 'service', spec.service_name(),
                               level, message)
         self.add(e)
 
-    def from_orch_error(self, e: OrchestratorError):
+    def from_orch_error(self, e: OrchestratorError) -> None:
         if e.event_subject is not None:
             self.add(OrchestratorEvent(
                 datetime.datetime.utcnow(),
@@ -586,11 +646,11 @@ class EventStore():
                 str(e)
             ))
 
-    def for_daemon(self, daemon_name, level, message):
+    def for_daemon(self, daemon_name: str, level: str, message: str) -> None:
         e = OrchestratorEvent(datetime.datetime.utcnow(), 'daemon', daemon_name, level, message)
         self.add(e)
 
-    def for_daemon_from_exception(self, daemon_name, e: Exception):
+    def for_daemon_from_exception(self, daemon_name: str, e: Exception) -> None:
         self.for_daemon(
             daemon_name,
             "ERROR",
@@ -615,8 +675,8 @@ class EventStore():
         for k_s in unknowns:
             del self.events[k_s]
 
-    def get_for_service(self, name) -> List[OrchestratorEvent]:
+    def get_for_service(self, name: str) -> List[OrchestratorEvent]:
         return self.events.get('service:' + name, [])
 
-    def get_for_daemon(self, name) -> List[OrchestratorEvent]:
+    def get_for_daemon(self, name: str) -> List[OrchestratorEvent]:
         return self.events.get('daemon:' + name, [])
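
The new facts cache follows the same refresh-gating pattern as the existing last_daemon_update and last_device_update caches: update_host_facts() records a per-host timestamp, and host_needs_facts_refresh() compares that timestamp against a cutoff derived from the mgr's facts_cache_timeout option. The standalone sketch below is not part of this commit; it only illustrates the cutoff logic, and the module-level FACTS_CACHE_TIMEOUT constant is a stand-in for the mgr option.

    import datetime
    from typing import Dict

    # Hypothetical stand-in for the mgr option read by host_needs_facts_refresh().
    FACTS_CACHE_TIMEOUT = 60  # seconds

    last_facts_update: Dict[str, datetime.datetime] = {}

    def host_needs_facts_refresh(host: str) -> bool:
        # A host needs a refresh if facts were never gathered, or if the last
        # gather is older than the configured timeout.
        cutoff = datetime.datetime.utcnow() - datetime.timedelta(seconds=FACTS_CACHE_TIMEOUT)
        return host not in last_facts_update or last_facts_update[host] < cutoff

    def update_host_facts(host: str) -> None:
        # Mirrors HostCache.update_host_facts(): record when facts were last gathered.
        last_facts_update[host] = datetime.datetime.utcnow()

    if __name__ == '__main__':
        assert host_needs_facts_refresh('host1')      # never gathered
        update_host_facts('host1')
        assert not host_needs_facts_refresh('host1')  # still fresh within the timeout

As in the diff, a host with no recorded timestamp always reports that it needs a refresh, which is what seeds the first gather after a mgr failover.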