summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/cephadm/services/osd.py
blob: 80bf92772c49b0740747af1a31583cf365fd34fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
import json
import logging
from asyncio import gather
from threading import Lock
from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING

from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.drive_selection import DriveSelection
from ceph.deployment.inventory import Device
from ceph.utils import datetime_to_str, str_to_datetime

from datetime import datetime
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import SpecialHostLabels
from ceph.utils import datetime_now
from orchestrator import OrchestratorError, DaemonDescription
from mgr_module import MonCommandFailed

from cephadm.services.cephadmservice import CephadmDaemonDeploySpec, CephService

if TYPE_CHECKING:
    from cephadm.module import CephadmOrchestrator

logger = logging.getLogger(__name__)


class OSDService(CephService):
    TYPE = 'osd'

    def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
        logger.debug(f"Processing DriveGroup {drive_group}")
        osd_id_claims = OsdIdClaims(self.mgr)
        if osd_id_claims.get():
            logger.info(
                f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims.get()}")

        async def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
            # skip this host if there has been no change in inventory
            if not self.mgr.cache.osdspec_needs_apply(host, drive_group):
                self.mgr.log.debug("skipping apply of %s on %s (no change)" % (
                    host, drive_group))
                return None
            # skip this host if we cannot schedule here
            if self.mgr.inventory.has_label(host, SpecialHostLabels.DRAIN_DAEMONS):
                return None

            osd_id_claims_for_host = osd_id_claims.filtered_by_host(host)

            cmds: List[str] = self.driveselection_to_ceph_volume(drive_selection,
                                                                 osd_id_claims_for_host)
            if not cmds:
                logger.debug("No data_devices, skipping DriveGroup: {}".format(
                    drive_group.service_id))
                return None

            logger.debug('Applying service osd.%s on host %s...' % (
                drive_group.service_id, host
            ))
            start_ts = datetime_now()
            env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
            ret_msg = await self.create_single_host(
                drive_group, host, cmds,
                replace_osd_ids=osd_id_claims_for_host, env_vars=env_vars
            )
            self.mgr.cache.update_osdspec_last_applied(
                host, drive_group.service_name(), start_ts
            )
            self.mgr.cache.save_host(host)
            return ret_msg

        async def all_hosts() -> List[Optional[str]]:
            futures = [create_from_spec_one(h, ds)
                       for h, ds in self.prepare_drivegroup(drive_group)]
            return await gather(*futures)

        with self.mgr.async_timeout_handler('cephadm deploy (osd daemon)'):
            ret = self.mgr.wait_async(all_hosts())
        return ", ".join(filter(None, ret))

    async def create_single_host(self,
                                 drive_group: DriveGroupSpec,
                                 host: str, cmds: List[str], replace_osd_ids: List[str],
                                 env_vars: Optional[List[str]] = None) -> str:
        for cmd in cmds:
            out, err, code = await self._run_ceph_volume_command(host, cmd, env_vars=env_vars)
            if code == 1 and ', it is already prepared' in '\n'.join(err):
                # HACK: when we create against an existing LV, ceph-volume
                # returns an error and the above message.  To make this
                # command idempotent, tolerate this "error" and continue.
                logger.debug('the device was already prepared; continuing')
                code = 0
            if code:
                raise RuntimeError(
                    'cephadm exited with an error code: %d, stderr:%s' % (
                        code, '\n'.join(err)))
        return await self.deploy_osd_daemons_for_existing_osds(host, drive_group.service_name(),
                                                               replace_osd_ids)

    async def deploy_osd_daemons_for_existing_osds(self, host: str, service_name: str,
                                                   replace_osd_ids: Optional[List[str]] = None) -> str:

        if replace_osd_ids is None:
            replace_osd_ids = OsdIdClaims(self.mgr).filtered_by_host(host)
            assert replace_osd_ids is not None

        # check result: lvm
        osds_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'lvm', 'list',
                '--format', 'json',
            ])
        before_osd_uuid_map = self.mgr.get_osd_uuid_map(only_up=True)
        fsid = self.mgr._cluster_fsid
        osd_uuid_map = self.mgr.get_osd_uuid_map()
        created = []
        for osd_id, osds in osds_elems.items():
            for osd in osds:
                if osd['type'] == 'db':
                    continue
                if osd['tags']['ceph.cluster_fsid'] != fsid:
                    logger.debug('mismatched fsid, skipping %s' % osd)
                    continue
                if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                    # if it exists but is part of the replacement operation, don't skip
                    continue
                if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                    # cephadm daemon instance already exists
                    logger.debug(f'osd id {osd_id} daemon already exists')
                    continue
                if osd_id not in osd_uuid_map:
                    logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                    continue
                if osd_uuid_map.get(osd_id) != osd['tags']['ceph.osd_fsid']:
                    logger.debug('mismatched osd uuid (cluster has %s, osd '
                                 'has %s)' % (
                                     osd_uuid_map.get(osd_id),
                                     osd['tags']['ceph.osd_fsid']))
                    continue

                created.append(osd_id)
                daemon_spec: CephadmDaemonDeploySpec = CephadmDaemonDeploySpec(
                    service_name=service_name,
                    daemon_id=str(osd_id),
                    host=host,
                    daemon_type='osd',
                )
                daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
                await CephadmServe(self.mgr)._create_daemon(
                    daemon_spec,
                    osd_uuid_map=osd_uuid_map)

        # check result: raw
        raw_elems: dict = await CephadmServe(self.mgr)._run_cephadm_json(
            host, 'osd', 'ceph-volume',
            [
                '--',
                'raw', 'list',
                '--format', 'json',
            ])
        for osd_uuid, osd in raw_elems.items():
            if osd.get('ceph_fsid') != fsid:
                continue
            osd_id = str(osd.get('osd_id', '-1'))
            if osd_id in before_osd_uuid_map and osd_id not in replace_osd_ids:
                # if it exists but is part of the replacement operation, don't skip
                continue
            if self.mgr.cache.has_daemon(f'osd.{osd_id}', host):
                # cephadm daemon instance already exists
                logger.debug(f'osd id {osd_id} daemon already exists')
                continue
            if osd_id not in osd_uuid_map:
                logger.debug('osd id {} does not exist in cluster'.format(osd_id))
                continue
            if osd_uuid_map.get(osd_id) != osd_uuid:
                logger.debug('mismatched osd uuid (cluster has %s, osd '
                             'has %s)' % (osd_uuid_map.get(osd_id), osd_uuid))
                continue
            if osd_id in created:
                continue

            created.append(osd_id)
            daemon_spec = CephadmDaemonDeploySpec(
                service_name=service_name,
                daemon_id=osd_id,
                host=host,
                daemon_type='osd',
            )
            daemon_spec.final_config, daemon_spec.deps = self.generate_config(daemon_spec)
            await CephadmServe(self.mgr)._create_daemon(
                daemon_spec,
                osd_uuid_map=osd_uuid_map)

        if created:
            self.mgr.cache.invalidate_host_devices(host)
            self.mgr.cache.invalidate_autotune(host)
            return "Created osd(s) %s on host '%s'" % (','.join(created), host)
        else:
            return "Created no osd(s) on host %s; already created?" % host

    def prepare_drivegroup(self, drive_group: DriveGroupSpec) -> List[Tuple[str, DriveSelection]]:
        # 1) use fn_filter to determine matching_hosts
        matching_hosts = drive_group.placement.filter_matching_hostspecs(
            self.mgr.cache.get_schedulable_hosts())
        # 2) Map the inventory to the InventoryHost object
        host_ds_map = []

        # set osd_id_claims

        def _find_inv_for_host(hostname: str, inventory_dict: dict) -> List[Device]:
            # This is stupid and needs to be loaded with the host
            for _host, _inventory in inventory_dict.items():
                if _host == hostname:
                    return _inventory
            raise OrchestratorError("No inventory found for host: {}".format(hostname))

        # 3) iterate over matching_host and call DriveSelection
        logger.debug(f"Checking matching hosts -> {matching_hosts}")
        for host in matching_hosts:
            inventory_for_host = _find_inv_for_host(host, self.mgr.cache.devices)
            logger.debug(f"Found inventory for host {inventory_for_host}")

            # List of Daemons on that host
            dd_for_spec = self.mgr.cache.get_daemons_by_service(drive_group.service_name())
            dd_for_spec_and_host = [dd for dd in dd_for_spec if dd.hostname == host]

            drive_selection = DriveSelection(drive_group, inventory_for_host,
                                             existing_daemons=len(dd_for_spec_and_host))
            logger.debug(f"Found drive selection {drive_selection}")
            if drive_group.method and drive_group.method == 'raw':
                # ceph-volume can currently only handle a 1:1 mapping
                # of data/db/wal devices for raw mode osds. If db/wal devices
                # are defined and the number does not match the number of data
                # devices, we need to bail out
                if drive_selection.data_devices() and drive_selection.db_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.db_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to db devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.db_devices())} potential db device(s) on host {host}')
                if drive_selection.data_devices() and drive_selection.wal_devices():
                    if len(drive_selection.data_devices()) != len(drive_selection.wal_devices()):
                        raise OrchestratorError('Raw mode only supports a 1:1 ratio of data to wal devices. Found '
                                                f'{len(drive_selection.data_devices())} potential data device(s) and '
                                                f'{len(drive_selection.wal_devices())} potential wal device(s) on host {host}')
            host_ds_map.append((host, drive_selection))
        return host_ds_map

    @staticmethod
    def driveselection_to_ceph_volume(drive_selection: DriveSelection,
                                      osd_id_claims: Optional[List[str]] = None,
                                      preview: bool = False) -> List[str]:
        logger.debug(f"Translating DriveGroup <{drive_selection.spec}> to ceph-volume command")
        cmds: List[str] = translate.to_ceph_volume(drive_selection,
                                                   osd_id_claims, preview=preview).run()
        logger.debug(f"Resulting ceph-volume cmds: {cmds}")
        return cmds

    def get_previews(self, host: str) -> List[Dict[str, Any]]:
        # Find OSDSpecs that match host.
        osdspecs = self.resolve_osdspecs_for_host(host)
        return self.generate_previews(osdspecs, host)

    def generate_previews(self, osdspecs: List[DriveGroupSpec], for_host: str) -> List[Dict[str, Any]]:
        """

        The return should look like this:

        [
          {'data': {<metadata>},
           'osdspec': <name of osdspec>,
           'host': <name of host>,
           'notes': <notes>
           },

           {'data': ...,
            'osdspec': ..,
            'host': ...,
            'notes': ...
           }
        ]

        Note: One host can have multiple previews based on its assigned OSDSpecs.
        """
        self.mgr.log.debug(f"Generating OSDSpec previews for {osdspecs}")
        ret_all: List[Dict[str, Any]] = []
        if not osdspecs:
            return ret_all
        for osdspec in osdspecs:

            # populate osd_id_claims
            osd_id_claims = OsdIdClaims(self.mgr)

            # prepare driveselection
            for host, ds in self.prepare_drivegroup(osdspec):
                if host != for_host:
                    continue

                # driveselection for host
                cmds: List[str] = self.driveselection_to_ceph_volume(ds,
                                                                     osd_id_claims.filtered_by_host(
                                                                         host),
                                                                     preview=True)
                if not cmds:
                    logger.debug("No data_devices, skipping DriveGroup: {}".format(
                        osdspec.service_name()))
                    continue

                # get preview data from ceph-volume
                for cmd in cmds:
                    with self.mgr.async_timeout_handler(host, f'cephadm ceph-volume -- {cmd}'):
                        out, err, code = self.mgr.wait_async(self._run_ceph_volume_command(host, cmd))
                    if out:
                        try:
                            concat_out: Dict[str, Any] = json.loads(' '.join(out))
                        except ValueError:
                            logger.exception('Cannot decode JSON: \'%s\'' % ' '.join(out))
                            concat_out = {}
                        notes = []
                        if (
                            osdspec.data_devices is not None
                            and osdspec.data_devices.limit
                            and (len(concat_out) + ds.existing_daemons) < osdspec.data_devices.limit
                        ):
                            found = len(concat_out)
                            limit = osdspec.data_devices.limit
                            notes.append(
                                f'NOTE: Did not find enough disks matching filter on host {host} to reach data device limit\n'
                                f'(New Devices: {found} | Existing Matching Daemons: {ds.existing_daemons} | Limit: {limit})')
                        ret_all.append({'data': concat_out,
                                        'osdspec': osdspec.service_id,
                                        'host': host,
                                        'notes': notes})
        return ret_all

    def resolve_hosts_for_osdspecs(self,
                                   specs: Optional[List[DriveGroupSpec]] = None
                                   ) -> List[str]:
        osdspecs = []
        if specs:
            osdspecs = [cast(DriveGroupSpec, spec) for spec in specs]
        if not osdspecs:
            self.mgr.log.debug("No OSDSpecs found")
            return []
        return sum([spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()) for spec in osdspecs], [])

    def resolve_osdspecs_for_host(self, host: str,
                                  specs: Optional[List[DriveGroupSpec]] = None) -> List[DriveGroupSpec]:
        matching_specs = []
        self.mgr.log.debug(f"Finding OSDSpecs for host: <{host}>")
        if not specs:
            specs = [cast(DriveGroupSpec, spec) for (sn, spec) in self.mgr.spec_store.spec_preview.items()
                     if spec.service_type == 'osd']
        for spec in specs:
            if host in spec.placement.filter_matching_hostspecs(self.mgr.cache.get_schedulable_hosts()):
                self.mgr.log.debug(f"Found OSDSpecs for host: <{host}> -> <{spec}>")
                matching_specs.append(spec)
        return matching_specs

    async def _run_ceph_volume_command(self, host: str,
                                       cmd: str, env_vars: Optional[List[str]] = None
                                       ) -> Tuple[List[str], List[str], int]:
        self.mgr.inventory.assert_host(host)

        # get bootstrap key
        ret, keyring, err = self.mgr.check_mon_command({
            'prefix': 'auth get',
            'entity': 'client.bootstrap-osd',
        })

        j = json.dumps({
            'config': self.mgr.get_minimal_ceph_conf(),
            'keyring': keyring,
        })

        split_cmd = cmd.split(' ')
        _cmd = ['--config-json', '-', '--']
        _cmd.extend(split_cmd)
        out, err, code = await CephadmServe(self.mgr)._run_cephadm(
            host, 'osd', 'ceph-volume',
            _cmd,
            env_vars=env_vars,
            stdin=j,
            error_ok=True)
        return out, err, code

    def post_remove(self, daemon: DaemonDescription, is_failed_deploy: bool) -> None:
        # Do not remove the osd.N keyring, if we failed to deploy the OSD, because
        # we cannot recover from it. The OSD keys are created by ceph-volume and not by
        # us.
        if not is_failed_deploy:
            super().post_remove(daemon, is_failed_deploy=is_failed_deploy)


class OsdIdClaims(object):
    """
    Retrieve and provide osd ids that can be reused in the cluster
    """

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osd_host_map: Dict[str, List[str]] = dict()
        self.refresh()

    def refresh(self) -> None:
        try:
            ret, out, err = self.mgr.check_mon_command({
                'prefix': 'osd tree',
                'states': ['destroyed'],
                'format': 'json'
            })
        except MonCommandFailed as e:
            logger.exception('osd tree failed')
            raise OrchestratorError(str(e))
        try:
            tree = json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return

        nodes = tree.get('nodes', {})
        for node in nodes:
            if node.get('type') == 'host':
                self.osd_host_map.update(
                    {node.get('name'): [str(_id) for _id in node.get('children', list())]}
                )
        if self.osd_host_map:
            self.mgr.log.info(f"Found osd claims -> {self.osd_host_map}")

    def get(self) -> Dict[str, List[str]]:
        return self.osd_host_map

    def filtered_by_host(self, host: str) -> List[str]:
        """
        Return the list of osd ids that can be reused  in a host

        OSD id claims in CRUSH map are linked to the bare name of
        the hostname. In case of FQDN hostnames the host is searched by the
        bare name
        """
        return self.osd_host_map.get(host.split(".")[0], [])


class RemoveUtil(object):
    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr

    def get_osds_in_cluster(self) -> List[str]:
        osd_map = self.mgr.get_osdmap()
        return [str(x.get('osd')) for x in osd_map.dump().get('osds', [])]

    def osd_df(self) -> dict:
        base_cmd = 'osd df'
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'format': 'json'
        })
        try:
            return json.loads(out)
        except ValueError:
            logger.exception(f'Cannot decode JSON: \'{out}\'')
            return {}

    def get_pg_count(self, osd_id: int, osd_df: Optional[dict] = None) -> int:
        if not osd_df:
            osd_df = self.osd_df()
        osd_nodes = osd_df.get('nodes', [])
        for osd_node in osd_nodes:
            if osd_node.get('id') == int(osd_id):
                return osd_node.get('pgs', -1)
        return -1

    def find_osd_stop_threshold(self, osds: List["OSD"]) -> Optional[List["OSD"]]:
        """
        Cut osd_id list in half until it's ok-to-stop

        :param osds: list of osd_ids
        :return: list of ods_ids that can be stopped at once
        """
        if not osds:
            return []
        while not self.ok_to_stop(osds):
            if len(osds) <= 1:
                # can't even stop one OSD, aborting
                self.mgr.log.debug(
                    "Can't even stop one OSD. Cluster is probably busy. Retrying later..")
                return []

            # This potentially prolongs the global wait time.
            self.mgr.event.wait(1)
            # splitting osd_ids in half until ok_to_stop yields success
            # maybe popping ids off one by one is better here..depends on the cluster size I guess..
            # There's a lot of room for micro adjustments here
            osds = osds[len(osds) // 2:]
        return osds

        # todo start draining
        #  return all([osd.start_draining() for osd in osds])

    def ok_to_stop(self, osds: List["OSD"]) -> bool:
        cmd_args = {
            'prefix': "osd ok-to-stop",
            'ids': [str(osd.osd_id) for osd in osds]
        }
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def set_osd_flag(self, osds: List["OSD"], flag: str) -> bool:
        base_cmd = f"osd {flag}"
        self.mgr.log.debug(f"running cmd: {base_cmd} on ids {osds}")
        ret, out, err = self.mgr.mon_command({
            'prefix': base_cmd,
            'ids': [str(osd.osd_id) for osd in osds]
        })
        if ret != 0:
            self.mgr.log.error(f"Could not set {flag} flag for {osds}. <{err}>")
            return False
        self.mgr.log.info(f"{','.join([str(o) for o in osds])} now {flag}")
        return True

    def get_weight(self, osd: "OSD") -> Optional[float]:
        ret, out, err = self.mgr.mon_command({
            'prefix': 'osd crush tree',
            'format': 'json',
        })
        if ret != 0:
            self.mgr.log.error(f"Could not dump crush weights. <{err}>")
            return None
        j = json.loads(out)
        for n in j.get("nodes", []):
            if n.get("name") == f"osd.{osd.osd_id}":
                self.mgr.log.info(f"{osd} crush weight is {n.get('crush_weight')}")
                return n.get("crush_weight")
        return None

    def reweight_osd(self, osd: "OSD", weight: float) -> bool:
        self.mgr.log.debug(f"running cmd: osd crush reweight on {osd}")
        ret, out, err = self.mgr.mon_command({
            'prefix': "osd crush reweight",
            'name': f"osd.{osd.osd_id}",
            'weight': weight,
        })
        if ret != 0:
            self.mgr.log.error(f"Could not reweight {osd} to {weight}. <{err}>")
            return False
        self.mgr.log.info(f"{osd} weight is now {weight}")
        return True

    def zap_osd(self, osd: "OSD") -> str:
        "Zaps all devices that are associated with an OSD"
        if osd.hostname is not None:
            cmd = ['--', 'lvm', 'zap', '--osd-id', str(osd.osd_id)]
            if osd.replace_block:
                cmd.append('--replace-block')
            if osd.replace_db:
                cmd.append('--replace-db')
            if osd.replace_wal:
                cmd.append('--replace-wal')
            if not osd.no_destroy:
                cmd.append('--destroy')
            with self.mgr.async_timeout_handler(osd.hostname, f'cephadm ceph-volume {" ".join(cmd)}'):
                out, err, code = self.mgr.wait_async(CephadmServe(self.mgr)._run_cephadm(
                    osd.hostname, 'osd', 'ceph-volume',
                    cmd,
                    error_ok=True))
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            if code:
                raise OrchestratorError('Zap failed: %s' % '\n'.join(out + err))
            return '\n'.join(out + err)
        raise OrchestratorError(f"Failed to zap OSD {osd.osd_id} because host was unknown")

    def safe_to_destroy(self, osd_ids: List[int]) -> bool:
        """ Queries the safe-to-destroy flag for OSDs """
        cmd_args = {'prefix': 'osd safe-to-destroy',
                    'ids': [str(x) for x in osd_ids]}
        return self._run_mon_cmd(cmd_args, error_ok=True)

    def destroy_osd(self, osd_id: int) -> bool:
        """ Destroys an OSD (forcefully) """
        cmd_args = {'prefix': 'osd destroy-actual',
                    'id': int(osd_id),
                    'yes_i_really_mean_it': True}
        return self._run_mon_cmd(cmd_args)

    def purge_osd(self, osd_id: int) -> bool:
        """ Purges an OSD from the cluster (forcefully) """
        cmd_args = {
            'prefix': 'osd purge-actual',
            'id': int(osd_id),
            'yes_i_really_mean_it': True
        }
        return self._run_mon_cmd(cmd_args)

    def _run_mon_cmd(self, cmd_args: dict, error_ok: bool = False) -> bool:
        """
        Generic command to run mon_command and evaluate/log the results
        """
        ret, out, err = self.mgr.mon_command(cmd_args)
        if ret != 0:
            self.mgr.log.debug(f"ran {cmd_args} with mon_command")
            if not error_ok:
                self.mgr.log.error(
                    f"cmd: {cmd_args.get('prefix')} failed with: {err}. (errno:{ret})")
            return False
        self.mgr.log.debug(f"cmd: {cmd_args.get('prefix')} returns: {out}")
        return True


class NotFoundError(Exception):
    pass


class OSD:

    def __init__(self,
                 osd_id: int,
                 remove_util: RemoveUtil,
                 drain_started_at: Optional[datetime] = None,
                 process_started_at: Optional[datetime] = None,
                 drain_stopped_at: Optional[datetime] = None,
                 drain_done_at: Optional[datetime] = None,
                 draining: bool = False,
                 started: bool = False,
                 stopped: bool = False,
                 replace: bool = False,
                 replace_block: bool = False,
                 replace_db: bool = False,
                 replace_wal: bool = False,
                 force: bool = False,
                 hostname: Optional[str] = None,
                 zap: bool = False,
                 no_destroy: bool = False,
                 original_weight: Optional[float] = None):
        # the ID of the OSD
        self.osd_id = osd_id

        # when did process (not the actual draining) start
        self.process_started_at = process_started_at

        # when did the drain start
        self.drain_started_at = drain_started_at

        # when did the drain stop
        self.drain_stopped_at = drain_stopped_at

        # when did the drain finish
        self.drain_done_at = drain_done_at

        # did the draining start
        self.draining = draining

        # was the operation started
        self.started = started

        # was the operation stopped
        self.stopped = stopped

        # If this is a replace or remove operation
        self.replace = replace
        # If this is a block device replacement
        self.replace_block = replace_block
        # If this is a db device replacement
        self.replace_db = replace_db
        # If this is a wal device replacement
        self.replace_wal = replace_wal
        # If we wait for the osd to be drained
        self.force = force
        # The name of the node
        self.hostname = hostname

        # mgr obj to make mgr/mon calls
        self.rm_util: RemoveUtil = remove_util

        self.original_weight: Optional[float] = original_weight

        # Whether devices associated with the OSD should be zapped (DATA ERASED)
        self.zap = zap
        # Whether all associated LV devices should be destroyed.
        self.no_destroy = no_destroy

    def start(self) -> None:
        if self.started:
            logger.debug(f"Already started draining {self}")
            return None
        self.started = True
        self.stopped = False
        self.original_weight = self.rm_util.get_weight(self)

    def start_draining(self) -> bool:
        if self.stopped:
            logger.debug(f"Won't start draining {self}. OSD draining is stopped.")
            return False
        if self.any_replace_params:
            self.rm_util.set_osd_flag([self], 'out')
        else:
            self.rm_util.reweight_osd(self, 0.0)
        self.drain_started_at = datetime.utcnow()
        self.draining = True
        logger.debug(f"Started draining {self}.")
        return True

    def stop_draining(self) -> bool:
        if self.any_replace_params:
            self.rm_util.set_osd_flag([self], 'in')
        else:
            if self.original_weight:
                self.rm_util.reweight_osd(self, self.original_weight)
        self.drain_stopped_at = datetime.utcnow()
        self.draining = False
        logger.debug(f"Stopped draining {self}.")
        return True

    def stop(self) -> None:
        if self.stopped:
            logger.debug(f"Already stopped draining {self}")
            return None
        self.started = False
        self.stopped = True
        self.stop_draining()

    @property
    def is_draining(self) -> bool:
        """
        Consider an OSD draining when it is
        actively draining but not yet empty
        """
        return self.draining and not self.is_empty

    @property
    def is_ok_to_stop(self) -> bool:
        return self.rm_util.ok_to_stop([self])

    @property
    def is_empty(self) -> bool:
        if self.get_pg_count() == 0:
            if not self.drain_done_at:
                self.drain_done_at = datetime.utcnow()
                self.draining = False
            return True
        return False

    def safe_to_destroy(self) -> bool:
        return self.rm_util.safe_to_destroy([self.osd_id])

    def down(self) -> bool:
        return self.rm_util.set_osd_flag([self], 'down')

    def destroy(self) -> bool:
        return self.rm_util.destroy_osd(self.osd_id)

    def do_zap(self) -> str:
        return self.rm_util.zap_osd(self)

    def purge(self) -> bool:
        return self.rm_util.purge_osd(self.osd_id)

    def get_pg_count(self) -> int:
        return self.rm_util.get_pg_count(self.osd_id)

    @property
    def exists(self) -> bool:
        return str(self.osd_id) in self.rm_util.get_osds_in_cluster()

    def drain_status_human(self) -> str:
        default_status = 'not started'
        status = 'started' if self.started and not self.draining else default_status
        status = 'draining' if self.draining else status
        status = 'done, waiting for purge' if self.drain_done_at and not self.draining else status
        return status

    def pg_count_str(self) -> str:
        return 'n/a' if self.get_pg_count() < 0 else str(self.get_pg_count())

    def to_json(self) -> dict:
        out: Dict[str, Any] = dict()
        out['osd_id'] = self.osd_id
        out['started'] = self.started
        out['draining'] = self.draining
        out['stopped'] = self.stopped
        out['replace'] = self.replace
        out['replace_block'] = self.replace_block
        out['replace_db'] = self.replace_db
        out['replace_wal'] = self.replace_wal
        out['force'] = self.force
        out['zap'] = self.zap
        out['hostname'] = self.hostname  # type: ignore
        out['original_weight'] = self.original_weight

        for k in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if getattr(self, k):
                out[k] = datetime_to_str(getattr(self, k))
            else:
                out[k] = getattr(self, k)
        return out

    @classmethod
    def from_json(cls, inp: Optional[Dict[str, Any]], rm_util: RemoveUtil) -> Optional["OSD"]:
        if not inp:
            return None
        for date_field in ['drain_started_at', 'drain_stopped_at', 'drain_done_at', 'process_started_at']:
            if inp.get(date_field):
                inp.update({date_field: str_to_datetime(inp.get(date_field, ''))})
        inp.update({'remove_util': rm_util})
        if 'nodename' in inp:
            hostname = inp.pop('nodename')
            inp['hostname'] = hostname
        return cls(**inp)

    @property
    def any_replace_params(self) -> bool:
        return any([self.replace,
                    self.replace_block,
                    self.replace_db,
                    self.replace_wal])

    def __hash__(self) -> int:
        return hash(self.osd_id)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, OSD):
            return NotImplemented
        return self.osd_id == other.osd_id

    def __repr__(self) -> str:
        return f"osd.{self.osd_id}{' (draining)' if self.draining else ''}"


class OSDRemovalQueue(object):

    def __init__(self, mgr: "CephadmOrchestrator") -> None:
        self.mgr: "CephadmOrchestrator" = mgr
        self.osds: Set[OSD] = set()
        self.rm_util = RemoveUtil(mgr)

        # locks multithreaded access to self.osds. Please avoid locking
        # network calls, like mon commands.
        self.lock = Lock()

    def process_removal_queue(self) -> bool:
        """
        Performs actions in the _serve() loop to remove an OSD
        when criteria is met.

        we can't hold self.lock, as we're calling _remove_daemon in the loop
        """

        result: bool = False

        # make sure that we don't run on OSDs that are not in the cluster anymore.
        self.cleanup()

        # find osds that are ok-to-stop and not yet draining
        ready_to_drain_osds = self._ready_to_drain_osds()
        if ready_to_drain_osds:
            # start draining those
            _ = [osd.start_draining() for osd in ready_to_drain_osds]

        all_osds = self.all_osds()

        logger.debug(
            f"{self.queue_size()} OSDs are scheduled "
            f"for removal: {all_osds}")

        # Check all osds for their state and take action (remove, purge etc)
        new_queue: Set[OSD] = set()
        for osd in all_osds:  # type: OSD
            if not osd.force:
                # skip criteria
                if not osd.is_empty:
                    logger.debug(f"{osd} is not empty yet. Waiting a bit more")
                    new_queue.add(osd)
                    continue

            if not osd.safe_to_destroy():
                logger.debug(
                    f"{osd} is not safe-to-destroy yet. Waiting a bit more")
                new_queue.add(osd)
                continue

            # abort criteria
            if not osd.down():
                # also remove it from the remove_osd list and set a health_check warning?
                raise orchestrator.OrchestratorError(
                    f"Could not mark {osd} down")

            # stop and remove daemon
            assert osd.hostname is not None

            if self.mgr.cache.has_daemon(f'osd.{osd.osd_id}'):
                CephadmServe(self.mgr)._remove_daemon(f'osd.{osd.osd_id}', osd.hostname)
                logger.info(f"Successfully removed {osd} on {osd.hostname}")
                result = True
            else:
                logger.info(f"Daemon {osd} on {osd.hostname} was already removed")

            any_replace_params: bool = any([osd.replace,
                                            osd.replace_block,
                                            osd.replace_db,
                                            osd.replace_wal])
            if any_replace_params:
                # mark destroyed in osdmap
                if not osd.destroy():
                    raise orchestrator.OrchestratorError(
                        f"Could not destroy {osd}")
                logger.info(
                    f"Successfully destroyed old {osd} on {osd.hostname}; ready for replacement")
                if any_replace_params:
                    osd.zap = True
            else:
                # purge from osdmap
                if not osd.purge():
                    raise orchestrator.OrchestratorError(f"Could not purge {osd}")
                logger.info(f"Successfully purged {osd} on {osd.hostname}")

            if osd.zap:
                # throws an exception if the zap fails
                logger.info(f"Zapping devices for {osd} on {osd.hostname}")
                osd.do_zap()
                logger.info(f"Successfully zapped devices for {osd} on {osd.hostname}")
            self.mgr.cache.invalidate_host_devices(osd.hostname)
            logger.debug(f"Removing {osd} from the queue.")

        # self could change while this is processing (osds get added from the CLI)
        # The new set is: 'an intersection of all osds that are still not empty/removed (new_queue) and
        # osds that were added while this method was executed'
        with self.lock:
            self.osds.intersection_update(new_queue)
            self._save_to_store()
        return result

    def cleanup(self) -> None:
        # OSDs can always be cleaned up manually. This ensures that we run on existing OSDs
        with self.lock:
            for osd in self._not_in_cluster():
                self.osds.remove(osd)

    def _ready_to_drain_osds(self) -> List["OSD"]:
        """
        Returns OSDs that are ok to stop and not yet draining. Only returns as many OSDs as can
        be accommodated by the 'max_osd_draining_count' config value, considering the number of OSDs
        that are already draining.
        """
        draining_limit = max(1, self.mgr.max_osd_draining_count)
        num_already_draining = len(self.draining_osds())
        num_to_start_draining = max(0, draining_limit - num_already_draining)
        stoppable_osds = self.rm_util.find_osd_stop_threshold(self.idling_osds())
        return [] if stoppable_osds is None else stoppable_osds[:num_to_start_draining]

    def _save_to_store(self) -> None:
        osd_queue = [osd.to_json() for osd in self.osds]
        logger.debug(f"Saving {osd_queue} to store")
        self.mgr.set_store('osd_remove_queue', json.dumps(osd_queue))

    def load_from_store(self) -> None:
        with self.lock:
            for k, v in self.mgr.get_store_prefix('osd_remove_queue').items():
                for osd in json.loads(v):
                    logger.debug(f"Loading osd ->{osd} from store")
                    osd_obj = OSD.from_json(osd, rm_util=self.rm_util)
                    if osd_obj is not None:
                        self.osds.add(osd_obj)

    def as_osd_ids(self) -> List[int]:
        with self.lock:
            return [osd.osd_id for osd in self.osds]

    def queue_size(self) -> int:
        with self.lock:
            return len(self.osds)

    def draining_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_draining]

    def idling_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if not osd.is_draining and not osd.is_empty]

    def empty_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds if osd.is_empty]

    def all_osds(self) -> List["OSD"]:
        with self.lock:
            return [osd for osd in self.osds]

    def _not_in_cluster(self) -> List["OSD"]:
        return [osd for osd in self.osds if not osd.exists]

    def enqueue(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        with self.lock:
            self.osds.add(osd)
        osd.start()

    def rm_by_osd_id(self, osd_id: int) -> None:
        osd: Optional["OSD"] = None
        for o in self.osds:
            if o.osd_id == osd_id:
                osd = o
        if not osd:
            logger.debug(f"Could not find osd with id {osd_id} in queue.")
            raise KeyError(f'No osd with id {osd_id} in removal queue')
        self.rm(osd)

    def rm(self, osd: "OSD") -> None:
        if not osd.exists:
            raise NotFoundError()
        osd.stop()
        with self.lock:
            try:
                logger.debug(f'Removing {osd} from the queue.')
                self.osds.remove(osd)
            except KeyError:
                logger.debug(f"Could not find {osd} in queue.")
                raise KeyError

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, OSDRemovalQueue):
            return False
        with self.lock:
            return self.osds == other.osds