summaryrefslogtreecommitdiffstats
path: root/qa/tasks/stretch_cluster.py
blob: 48acf0025ee95e9f06b2e9448d7a1de8fda5dcb9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
import json
import logging
import random
from tasks.mgr.mgr_test_case import MgrTestCase
from time import sleep

log = logging.getLogger(__name__)


class TestStretchCluster(MgrTestCase):
    """
    Test the stretch cluster feature.

    The tests create a replicated pool, optionally turn it into a stretch
    pool with `osd pool stretch set`, and then stop and restart OSDs and
    MONs one datacenter at a time while checking the PG states and the
    MON quorum. The cluster layout the tests rely on is hard-coded in
    DC_OSDS and DC_MONS below.
    """
    # Define some constants
    # Name of the pool created by _setup_pool() and removed in tearDown()
    POOL = 'pool_stretch'
    # Cluster name used when looking up daemons in self.ctx.daemons
    CLUSTER = "ceph"
    # Duration argument the tests hand to `rados bench`
    WRITE_PERIOD = 10
    # Passed as `timeout` to the wait_until_true*() helpers
    RECOVERY_PERIOD = WRITE_PERIOD * 6
    # Passed as `success_hold_time` to wait_until_true_and_hold()
    SUCCESS_HOLD_TIME = 7
    # This dictionary maps the datacenter to the osd ids and hosts
    # (datacenter name -> {host name -> osd id})
    DC_OSDS = {
        'dc1': {
            "node-1": 0,
            "node-2": 1,
            "node-3": 2,
        },
        'dc2': {
            "node-4": 3,
            "node-5": 4,
            "node-6": 5,
        },
        'dc3': {
            "node-7": 6,
            "node-8": 7,
            "node-9": 8,
        }
    }

    # This dictionary maps the datacenter to the mon ids and hosts
    # (datacenter name -> {host name -> mon id})
    DC_MONS = {
        'dc1': {
            "node-1": 'a',
            "node-2": 'b',
            "node-3": 'c',
        },
        'dc2': {
            "node-4": 'd',
            "node-5": 'e',
            "node-6": 'f',
        },
        'dc3': {
            "node-7": 'g',
            "node-8": 'h',
            "node-9": 'i',
        }
    }
    # The three PEERING_CRUSH_BUCKET_* values, CRUSH_RULE, SIZE and MIN_SIZE
    # are the arguments the tests pass to `osd pool stretch set`.
    # NOTE(review): their exact meaning is defined by that command, not by
    # this file - confirm against the Ceph documentation.
    PEERING_CRUSH_BUCKET_COUNT = 2
    PEERING_CRUSH_BUCKET_TARGET = 3
    PEERING_CRUSH_BUCKET_BARRIER = 'datacenter'
    # Also used as the pool's crush_rule in _setup_pool()
    CRUSH_RULE = 'replicated_rule_custom'
    SIZE = 6
    MIN_SIZE = 3
    # BUCKET_MAX is SIZE / PEERING_CRUSH_BUCKET_TARGET rounded up; the tests
    # use it as the largest number of OSDs of one datacenter that may show
    # up in a PG's acting set.
    BUCKET_MAX = SIZE // PEERING_CRUSH_BUCKET_TARGET
    if (BUCKET_MAX * PEERING_CRUSH_BUCKET_TARGET) < SIZE:
        BUCKET_MAX += 1

    def setUp(self):
        """
        Prepare a clean cluster before each test.

        Skips the test when the cluster has fewer than 6 OSDs, then
        removes every filesystem (when an MDS cluster is present) and
        every existing pool.
        """
        super(TestStretchCluster, self).setUp()
        # The tests need at least 6 OSDs
        if self._osd_count() < 6:
            self.skipTest("Not enough OSDS!")

        # Filesystems have to go first so that their pools can be removed
        mds_cluster = self.mds_cluster
        if mds_cluster:
            mds_cluster.mds_stop()
            mds_cluster.mds_fail()
            mds_cluster.delete_all_filesystems()

        # Remove all other pools
        mon_manager = self.mgr_cluster.mon_manager
        existing_pools = mon_manager.get_osd_dump_json()['pools']
        for pool_info in existing_pools:
            mon_manager.remove_pool(pool_info['pool_name'])

    def tearDown(self):
        """
        Clean up the cluster after the test.

        Removes the test pool, marks in every OSD whose weight is 0.0,
        restarts every OSD that is reported down and moves it back to its
        host, and restarts every MON listed in DC_MONS.

        The base-class tearDown() runs in a `finally` clause so that a
        failure while restoring the cluster does not skip it; the original
        exception still propagates.
        """
        try:
            mon_manager = self.mgr_cluster.mon_manager
            # Remove the pool
            if self.POOL in mon_manager.pools:
                mon_manager.remove_pool(self.POOL)

            osd_map = mon_manager.get_osd_dump_json()
            for osd in osd_map['osds']:
                # mark all the osds in
                if osd['weight'] == 0.0:
                    mon_manager.raw_cluster_cmd(
                        'osd', 'in', str(osd['osd']))
                # Bring back all the osds and move it back to the host.
                if osd['up'] == 0:
                    self._bring_back_osd(osd['osd'])
                    self._move_osd_back_to_host(osd['osd'])

            # Bring back all the MONS
            for mon in self._get_all_mons_from_all_dc():
                self._bring_back_mon(mon)
        finally:
            super(TestStretchCluster, self).tearDown()

    def _setup_pool(self, size=None, min_size=None, rule=None):
        """
        Create the test pool and optionally configure it.

        min_size is forwarded to create_pool(); size and rule, when given,
        are applied afterwards with `osd pool set`.
        """
        mon_manager = self.mgr_cluster.mon_manager
        mon_manager.create_pool(self.POOL, min_size=min_size)

        pool_set = ('osd', 'pool', 'set', self.POOL)
        if size is not None:
            mon_manager.raw_cluster_cmd(*pool_set, 'size', str(size))
        if rule is not None:
            mon_manager.raw_cluster_cmd(*pool_set, 'crush_rule', rule)

    def _osd_count(self):
        """
        Get the number of OSDs in the cluster.
        """
        osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json()
        return len(osd_map['osds'])

    def _write_some_data(self, t):
        """
        Generate a write workload against the test pool.

        Runs `rados -p POOL bench t write -t 16` on the admin remote and
        waits for the command to finish.
        """
        bench_cmd = ["rados", "-p", self.POOL, "bench"]
        bench_cmd += [str(t), "write", "-t", "16"]
        self.mgr_cluster.admin_remote.run(args=bench_cmd, wait=True)

    def _get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.mgr_cluster.mon_manager.raw_cluster_cmd(
                'pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        try:
            return j['pg_map']['pg_stats']
        except KeyError:
            return j['pg_stats']

    def _get_active_pg(self, pgs):
        """
        Get the number of active PGs
        """
        num_active = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num_active += 1
        return num_active

    def _get_active_clean_pg(self, pgs):
        """
        Get the number of active+clean PGs
        """
        num_active_clean = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num_active_clean += 1
        return num_active_clean

    def _get_acting_set(self, pgs):
        """
        Get the acting set of the PGs
        """
        acting_set = []
        for pg in pgs:
            acting_set.append(pg['acting'])
        return acting_set

    def _surviving_osds_in_acting_set_dont_exceed(self, n, osds):
        """
        Check if the acting set of the PGs doesn't contain more
        than n OSDs of the surviving DC.
        NOTE: Only call this function after we set the pool to stretch.

        n: largest number of OSDs from `osds` allowed in one acting set
        osds: ids of the OSDs of the surviving DC

        Returns False, after logging the offending acting set, as soon as
        one PG exceeds the limit, and True otherwise.
        """
        # Build the lookup set once instead of once per PG
        surviving = set(osds)
        pgs = self._get_pg_stats()
        for acting in self._get_acting_set(pgs):
            log.debug("Acting set: %s", acting)
            intersect = list(set(acting) & surviving)
            if len(intersect) > n:
                # Adjacent literals instead of a backslash continuation:
                # the continuation put the next line's indentation into
                # the logged message.
                log.error(
                    "Acting set: %s contains more than %d "
                    "OSDs from the same %s which are: %s",
                    acting, n, self.PEERING_CRUSH_BUCKET_BARRIER,
                    intersect
                )
                return False
        return True

    def _print_not_active_clean_pg(self, pgs):
        """
        Log, at debug level, every PG that is not active+clean
        (or that is stale).
        """
        for pg in pgs:
            state = pg['state']
            active_clean = (
                'active' in state and
                'clean' in state and
                'stale' not in state
            )
            if active_clean:
                continue
            log.debug(
                "PG %s is not active+clean, but %s",
                pg['pgid'], state
            )

    def _print_not_active_pg(self, pgs):
        """
        Log, at debug level, every PG that is not active (or that is stale).
        """
        for pg in pgs:
            state = pg['state']
            if 'active' in state and 'stale' not in state:
                continue
            log.debug(
                "PG %s is not active, but %s",
                pg['pgid'], state
            )

    def _pg_all_active_clean(self):
        """
        Return True when every PG is active+clean.

        When that is not the case, the offending PGs are logged.
        """
        pgs = self._get_pg_stats()
        if self._get_active_clean_pg(pgs) == len(pgs):
            log.debug("All PGs are active+clean")
            return True
        log.debug("Not all PGs are active+clean")
        self._print_not_active_clean_pg(pgs)
        return False

    def _pg_all_active(self):
        """
        Return True when every PG is active.

        When that is not the case, the offending PGs are logged.
        """
        pgs = self._get_pg_stats()
        if self._get_active_pg(pgs) == len(pgs):
            log.debug("All PGs are active")
            return True
        log.debug("Not all PGs are active")
        self._print_not_active_pg(pgs)
        return False

    def _pg_all_unavailable(self):
        """
        Check if all pgs are unavailable.
        """
        pgs = self._get_pg_stats()
        return self._get_active_pg(pgs) == 0

    def _pg_partial_active(self):
        """
        Check if some pgs are active.
        """
        pgs = self._get_pg_stats()
        return 0 < self._get_active_pg(pgs) <= len(pgs)

    def _kill_osd(self, osd):
        """
        Stop the daemon of the given OSD.

        Best effort: a failure to stop the daemon is logged, together with
        its traceback, and is not propagated to the caller.
        """
        try:
            self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).stop()
        except Exception:
            # log.exception keeps the traceback, so the reason for the
            # failure is not lost
            log.exception("Failed to stop osd.%s", osd)

    def _get_osds_by_dc(self, dc):
        """
        Get osds by datacenter.
        """
        return [osd for _, osd in self.DC_OSDS[dc].items()]

    def _get_all_osds_from_all_dc(self):
        """
        Get all osds from all datacenters.
        """
        return [osd for nodes in self.DC_OSDS.values()
                for osd in nodes.values()]

    def _get_osds_data(self, want_osds):
        """
        Get the osd data
        """
        all_osds_data = \
            self.mgr_cluster.mon_manager.get_osd_dump_json()['osds']
        return [
            osd_data for osd_data in all_osds_data
            if int(osd_data['osd']) in want_osds
        ]

    def _get_host(self, osd):
        """
        Get the host of the osd.
        """
        for dc, nodes in self.DC_OSDS.items():
            for node, osd_id in nodes.items():
                if osd_id == osd:
                    return node
        return None

    def _move_osd_back_to_host(self, osd):
        """
        Move the OSD back under its host bucket with `osd crush move`.

        The host is looked up in DC_OSDS; an OSD that is not listed there
        fails the assertion.
        """
        host = self._get_host(osd)
        assert host is not None, "The host of osd {} is not found.".format(osd)
        log.debug("Moving osd.%d back to %s", osd, host)
        osd_name = 'osd.{}'.format(str(osd))
        location = 'host={}'.format(host)
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            'osd', 'crush', 'move', osd_name, location)

    def _bring_back_osd(self, osd):
        """
        Restart the daemon of the given OSD.

        Best effort: a failure to restart the daemon is logged, together
        with its traceback, and is not propagated to the caller.
        """
        try:
            self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).restart()
        except Exception:
            # log.exception keeps the traceback, so the reason for the
            # failure is not lost
            log.exception("Failed to bring back osd.%s", osd)

    def _bring_back_all_osds_in_dc(self, dc):
        """
        Bring back all osds in the specified <datacenter>
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_OSDS:
            raise ValueError("dc must be one of the following: %s" %
                             self.DC_OSDS.keys())
        log.debug("Bringing back %s", dc)
        osds = self._get_osds_by_dc(dc)
        # Bring back all the osds in the DC and move it back to the host.
        for osd_id in osds:
            # Bring back the osd
            self._bring_back_osd(osd_id)
            # Wait until the osd is up since we need it to be up before we can
            # move it back to the host
            self.wait_until_true(
                lambda: all([int(osd['up']) == 1
                            for osd in self._get_osds_data([osd_id])]),
                timeout=self.RECOVERY_PERIOD
            )
            # Move the osd back to the host
            self._move_osd_back_to_host(osd_id)

    def _fail_over_all_osds_in_dc(self, dc):
        """
        Fail over all osds in specified <datacenter>
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_OSDS:
            raise ValueError(
                "dc must be one of the following: %s" % self.DC_OSDS.keys()
                )
        log.debug("Failing over %s", dc)
        osds = self._get_osds_by_dc(dc)
        # fail over all the OSDs in the DC
        for osd_id in osds:
            self._kill_osd(osd_id)
        # wait until all the osds are down
        self.wait_until_true(
            lambda: all([int(osd['up']) == 0
                        for osd in self._get_osds_data(osds)]),
            timeout=self.RECOVERY_PERIOD
        )

    def _fail_over_one_osd_from_dc(self, dc):
        """
        Fail over one random OSD from the specified <datacenter>
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_OSDS:
            raise ValueError("dc must be one of the following: %s" %
                             self.DC_OSDS.keys())
        log.debug("Failing over one random OSD from %s", dc)
        # filter out failed osds
        osds_data = self._get_osds_data(self._get_osds_by_dc(dc))
        osds = [int(osd['osd']) for osd in osds_data if int(osd['up']) == 1]
        # fail over one random OSD in the DC
        osd_id = random.choice(osds)
        self._kill_osd(osd_id)
        # wait until the osd is down
        self.wait_until_true(
            lambda: int(self._get_osds_data([osd_id])[0]['up']) == 0,
            timeout=self.RECOVERY_PERIOD
        )

    def _fail_over_one_mon_from_dc(self, dc, no_wait=False):
        """
        Fail over one random mon from the specified <datacenter>
        no_wait: if True, don't wait for the mon to be out of quorum
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_MONS:
            raise ValueError("dc must be one of the following: %s" %
                             ", ".join(self.DC_MONS.keys()))
        log.debug("Failing over one random mon from %s", dc)
        mons = self._get_mons_by_dc(dc)
        # filter out failed mons
        mon_quorum = self.mgr_cluster.mon_manager.get_mon_quorum_names()
        mons = [mon for mon in mons if mon in mon_quorum]
        # fail over one random mon in the DC
        mon = random.choice(mons)
        self._kill_mon(mon)
        if no_wait:
            return
        else:
            # wait until the mon is out of quorum
            self.wait_until_true(
                lambda: self._check_mons_out_of_quorum([mon]),
                timeout=self.RECOVERY_PERIOD
            )

    def _fail_over_all_mons_in_dc(self, dc):
        """
        Fail over all mons in the specified <datacenter>
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_MONS:
            raise ValueError("dc must be one of the following: %s" %
                             ", ".join(self.DC_MONS.keys()))
        log.debug("Failing over %s", dc)
        mons = self._get_mons_by_dc(dc)
        for mon in mons:
            self._kill_mon(mon)
        # wait until all the mons are out of quorum
        self.wait_until_true(
            lambda: self._check_mons_out_of_quorum(mons),
            timeout=self.RECOVERY_PERIOD
        )

    def _kill_mon(self, mon):
        """
        Stop the daemon of the given MON.

        Best effort: a failure to stop the daemon is logged, together with
        its traceback, and is not propagated to the caller.
        """
        try:
            self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).stop()
        except Exception:
            # log.exception keeps the traceback, so the reason for the
            # failure is not lost
            log.exception("Failed to stop mon.%s", mon)

    def _get_mons_by_dc(self, dc):
        """
        Get mons by datacenter.
        """
        return [mon for _, mon in self.DC_MONS[dc].items()]

    def _get_all_mons_from_all_dc(self):
        """
        Get all mons from all datacenters.
        """
        return [mon for nodes in self.DC_MONS.values()
                for mon in nodes.values()]

    def _check_mons_out_of_quorum(self, want_mons):
        """
        Check if the mons are not in quorum.
        """
        quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names()
        return all([mon not in quorum_names for mon in want_mons])

    def _check_mons_in_quorum(self, want_mons):
        """
        Check if the mons are in quorum.
        """
        quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names()
        return all([mon in quorum_names for mon in want_mons])

    def _check_mon_quorum_size(self, size):
        """
        Check if the mon quorum size is equal to <size>
        """
        return len(self.mgr_cluster.mon_manager.get_mon_quorum_names()) == size

    def _bring_back_mon(self, mon):
        """
        Restart the daemon of the given MON.

        Best effort: a failure to restart the daemon is logged, together
        with its traceback, and is not propagated to the caller.
        """
        try:
            self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).restart()
        except Exception:
            # log.exception keeps the traceback, so the reason for the
            # failure is not lost
            log.exception("Failed to bring back mon.%s", mon)

    def _bring_back_all_mons_in_dc(self, dc):
        """
        Bring back all mons in the specified <datacenter>
        """
        if not isinstance(dc, str):
            raise ValueError("dc must be a string")
        if dc not in self.DC_MONS:
            raise ValueError("dc must be one of the following: %s" %
                             ", ".join(self.DC_MONS.keys()))
        log.debug("Bringing back %s", dc)
        mons = self._get_mons_by_dc(dc)
        for mon in mons:
            self._bring_back_mon(mon)
        # wait until all the mons are up
        self.wait_until_true(
            lambda: self._check_mons_in_quorum(mons),
            timeout=self.RECOVERY_PERIOD
        )

    def _no_reply_to_mon_command(self):
        """
        Check if the cluster is inaccessible.
        """
        try:
            self.mgr_cluster.mon_manager.raw_cluster_cmd('status')
            return False
        except Exception:
            return True

    def test_mon_failures_in_stretch_pool(self):
        """
        Exercise MON failures while the pool is a stretch pool.

        Three scenarios run back to back:
        1. all MONs of DC1 down: the MONs of DC2 and DC3 must be in quorum;
        2. one more MON of DC2 down: the quorum size must be 5;
        3. a second MON of DC2 down: the cluster must stop answering mon
           commands, until the MONs of DC2 are brought back.
        """
        self._setup_pool(self.SIZE, min_size=self.MIN_SIZE,
                         rule=self.CRUSH_RULE)
        self._write_some_data(self.WRITE_PERIOD)

        # Turn the pool into a stretch pool
        stretch_args = [
            self.POOL,
            str(self.PEERING_CRUSH_BUCKET_COUNT),
            str(self.PEERING_CRUSH_BUCKET_TARGET),
            self.PEERING_CRUSH_BUCKET_BARRIER,
            self.CRUSH_RULE,
            str(self.SIZE),
            str(self.MIN_SIZE),
        ]
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            'osd', 'pool', 'stretch', 'set', *stretch_args)

        # Keyword arguments shared by every "wait and hold" check below
        hold = {
            'timeout': self.RECOVERY_PERIOD,
            'success_hold_time': self.SUCCESS_HOLD_TIME,
        }

        # SCENARIO 1: MONS in DC1 down

        self._fail_over_all_mons_in_dc('dc1')
        mons_dc2_dc3 = self._get_mons_by_dc('dc2')
        mons_dc2_dc3 = mons_dc2_dc3 + self._get_mons_by_dc('dc3')

        def survivors_in_quorum():
            return self._check_mons_in_quorum(mons_dc2_dc3)

        # The MONs of DC2 and DC3 are expected to be in quorum
        self.wait_until_true_and_hold(survivors_in_quorum, **hold)

        # SCENARIO 2: MONS in DC1 down + 1 MON in DC2 down

        self._fail_over_one_mon_from_dc('dc2')

        def quorum_of_five():
            return self._check_mon_quorum_size(5)

        # The quorum is expected to have 5 members
        self.wait_until_true_and_hold(quorum_of_five, **hold)

        # SCENARIO 3: MONS in DC1 down + 2 MONS in DC2 down

        # Do not wait for the quorum here: stop the MON, then just give
        # it 30 seconds to drop out.
        self._fail_over_one_mon_from_dc('dc2', no_wait=True)
        sleep(30)
        # The cluster is expected to be inaccessible
        self.wait_until_true(
            self._no_reply_to_mon_command,
            timeout=self.RECOVERY_PERIOD,
        )
        # Bring back all mons in DC2 to unblock the cluster
        self._bring_back_all_mons_in_dc('dc2')
        # The MONs of DC2 and DC3 are expected to be in quorum again
        self.wait_until_true_and_hold(survivors_in_quorum, **hold)

    def test_set_stretch_pool_no_active_pgs(self):
        """
        Test setting a pool to stretch cluster and check whether
        it prevents PGs from going active when there are not
        enough buckets available in the acting set of PGs to
        go active.

        The same OSD fail-over sequence is run twice: first on a plain
        pool, then after the pool has been set to stretch.
        """
        self._setup_pool(
            self.SIZE,
            min_size=self.MIN_SIZE,
            rule=self.CRUSH_RULE
        )
        self._write_some_data(self.WRITE_PERIOD)
        # 1. We test the case where we didn't make the pool stretch
        #   and we expect the PGs to go active even when there is only
        #   one bucket available in the acting set of PGs.

        # Fail over osds in DC1 expects PGs to be 100% active
        self._fail_over_all_osds_in_dc('dc1')
        self.wait_until_true_and_hold(
            lambda: self._pg_all_active(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )
        # Fail over osds in DC2 expects PGs to be partially active
        self._fail_over_all_osds_in_dc('dc2')
        # NOTE(review): this predicate used to return the bound method
        # itself, which is always truthy, so the check never ran. It is
        # now really evaluated - confirm the expectation holds on the
        # target cluster layout.
        self.wait_until_true_and_hold(
            lambda: self._pg_partial_active(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )

        # Bring back osds in DC1 expects PGs to be 100% active
        self._bring_back_all_osds_in_dc('dc1')
        self.wait_until_true_and_hold(
            lambda: self._pg_all_active(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )
        # Bring back osds DC2 expects PGs to be 100% active+clean
        self._bring_back_all_osds_in_dc('dc2')
        self.wait_until_true_and_hold(
            lambda: self._pg_all_active_clean(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )
        # 2. We test the case where we make the pool stretch
        #   and we expect the PGs to not go active even when there is only
        #   one bucket available in the acting set of PGs.

        # Set the pool to stretch
        self.mgr_cluster.mon_manager.raw_cluster_cmd(
            'osd', 'pool', 'stretch', 'set',
            self.POOL, str(self.PEERING_CRUSH_BUCKET_COUNT),
            str(self.PEERING_CRUSH_BUCKET_TARGET),
            self.PEERING_CRUSH_BUCKET_BARRIER,
            self.CRUSH_RULE, str(self.SIZE), str(self.MIN_SIZE))

        # Fail over osds in DC1 expects PGs to be 100% active
        self._fail_over_all_osds_in_dc('dc1')
        self.wait_until_true_and_hold(lambda: self._pg_all_active(),
                                      timeout=self.RECOVERY_PERIOD,
                                      success_hold_time=self.SUCCESS_HOLD_TIME)

        # Fail over 1 random OSD from DC2 expects PGs to be 100% active
        self._fail_over_one_osd_from_dc('dc2')
        self.wait_until_true_and_hold(lambda: self._pg_all_active(),
                                      timeout=self.RECOVERY_PERIOD,
                                      success_hold_time=self.SUCCESS_HOLD_TIME)

        # Fail over osds in DC2 completely expects PGs to be 100% inactive
        self._fail_over_all_osds_in_dc('dc2')
        # The predicate has to be called: returning the bound method made
        # this check pass unconditionally.
        self.wait_until_true_and_hold(lambda: self._pg_all_unavailable(),
                                      timeout=self.RECOVERY_PERIOD,
                                      success_hold_time=self.SUCCESS_HOLD_TIME)

        # We expect that there will be no more than BUCKET_MAX osds from DC3
        # in the acting set of the PGs.
        self.wait_until_true(
            lambda: self._surviving_osds_in_acting_set_dont_exceed(
                        self.BUCKET_MAX,
                        self._get_osds_by_dc('dc3')
                    ),
            timeout=self.RECOVERY_PERIOD)

        # Bring back osds in DC1 expects PGs to be 100% active
        self._bring_back_all_osds_in_dc('dc1')
        self.wait_until_true_and_hold(
            lambda: self._pg_all_active(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME)

        # Bring back osds in DC2 expects PGs to be 100% active+clean
        self._bring_back_all_osds_in_dc('dc2')
        self.wait_until_true_and_hold(
            lambda: self._pg_all_active_clean(),
            timeout=self.RECOVERY_PERIOD,
            success_hold_time=self.SUCCESS_HOLD_TIME
        )