path: root/qa/tasks
Diffstat (limited to 'qa/tasks')
-rw-r--r--  qa/tasks/ceph.py                          |  14
-rw-r--r--  qa/tasks/ceph_manager.py                  |  90
-rw-r--r--  qa/tasks/cephadm.py                       |  10
-rw-r--r--  qa/tasks/cephfs/cephfs_test_case.py       |   8
-rw-r--r--  qa/tasks/cephfs/filesystem.py             |   2
-rw-r--r--  qa/tasks/cephfs/test_exports.py           |  90
-rw-r--r--  qa/tasks/cephfs/test_failover.py          |  55
-rw-r--r--  qa/tasks/cephfs/test_nfs.py               |  51
-rw-r--r--  qa/tasks/check_counter.py                 |  32
-rw-r--r--  qa/tasks/kafka.py                         |  11
-rw-r--r--  qa/tasks/kafka_failover.py                | 244
-rw-r--r--  qa/tasks/mgr/dashboard/helper.py          |   4
-rw-r--r--  qa/tasks/mgr/dashboard/test_mgr_module.py |   4
-rw-r--r--  qa/tasks/mgr/dashboard/test_rbd.py        |  12
-rw-r--r--  qa/tasks/mgr/dashboard/test_rgw.py        |   4
-rw-r--r--  qa/tasks/mgr/mgr_test_case.py             |  19
-rw-r--r--  qa/tasks/notification_tests.py            |   2
-rw-r--r--  qa/tasks/nvme_loop.py                     | 111
-rw-r--r--  qa/tasks/nvmeof.py                        |  38
-rw-r--r--  qa/tasks/radosgw_admin.py                 |  35
-rw-r--r--  qa/tasks/rgw_multisite.py                 |   2
-rw-r--r--  qa/tasks/rook.py                          |   6
-rw-r--r--  qa/tasks/s3a_hadoop.py                    |  16
-rw-r--r--  qa/tasks/s3tests.py                       |  26
-rw-r--r--  qa/tasks/s3tests_java.py                  |   1
-rw-r--r--  qa/tasks/stretch_mode_disable_enable.py   | 547
-rw-r--r--  qa/tasks/thrashosds-health.yaml           |   1
-rw-r--r--  qa/tasks/vstart_runner.py                 |   8
28 files changed, 1384 insertions, 59 deletions
diff --git a/qa/tasks/ceph.py b/qa/tasks/ceph.py
index 9b04e3dc675..8f666d2fa9b 100644
--- a/qa/tasks/ceph.py
+++ b/qa/tasks/ceph.py
@@ -1206,8 +1206,18 @@ def cluster(ctx, config):
args.extend([
run.Raw('|'), 'head', '-n', '1',
])
- stdout = mon0_remote.sh(args)
- return stdout or None
+ r = mon0_remote.run(
+ stdout=BytesIO(),
+ args=args,
+ stderr=StringIO(),
+ )
+ stdout = r.stdout.getvalue().decode()
+ if stdout:
+ return stdout
+ stderr = r.stderr.getvalue()
+ if stderr:
+ return stderr
+ return None
if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]',
config['log_ignorelist']) is not None:
diff --git a/qa/tasks/ceph_manager.py b/qa/tasks/ceph_manager.py
index 7005c8db0ff..57d22f3b5e6 100644
--- a/qa/tasks/ceph_manager.py
+++ b/qa/tasks/ceph_manager.py
@@ -2796,6 +2796,59 @@ class CephManager:
num += 1
return num
+ def _print_not_active_clean_pg(self, pgs):
+ """
+ Print the PGs that are not active+clean.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active') and
+ pg['state'].count('clean') and
+ not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active+clean, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def pg_all_active_clean(self):
+ """
+ Check if all pgs are active+clean
+ return: True if all pgs are active+clean else False
+ """
+ pgs = self.get_pg_stats()
+ result = self._get_num_active_clean(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active+clean")
+ else:
+ log.debug("Not all PGs are active+clean")
+ self._print_not_active_clean_pg(pgs)
+ return result
+
+ def _print_not_active_pg(self, pgs):
+ """
+ Print the PGs that are not active.
+ """
+ for pg in pgs:
+ if not (pg['state'].count('active')
+ and not pg['state'].count('stale')):
+ log.debug(
+ "PG %s is not active, but %s",
+ pg['pgid'], pg['state']
+ )
+
+ def pg_all_active(self):
+ """
+ Check if all pgs are active
+ return: True if all pgs are active else False
+ """
+ pgs = self.get_pg_stats()
+ result = self._get_num_active(pgs) == len(pgs)
+ if result:
+ log.debug("All PGs are active")
+ else:
+ log.debug("Not all PGs are active")
+ self._print_not_active_pg(pgs)
+ return result
+
def is_clean(self):
"""
True if all pgs are clean
@@ -3237,6 +3290,26 @@ class CephManager:
self.make_admin_daemon_dir(remote)
self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
+ def get_crush_rule_id(self, crush_rule_name):
+ """
+ Get crush rule id by name
+ :returns: int -- crush rule id
+ """
+ out = self.raw_cluster_cmd('osd', 'crush', 'rule', 'dump', '--format=json')
+ j = json.loads('\n'.join(out.split('\n')[1:]))
+ for rule in j:
+ if rule['rule_name'] == crush_rule_name:
+ return rule['rule_id']
+ assert False, 'rule %s not found' % crush_rule_name
+
+ def get_mon_dump_json(self):
+ """
+ mon dump --format=json converted to a python object
+ :returns: the python object
+ """
+ out = self.raw_cluster_cmd('mon', 'dump', '--format=json')
+ return json.loads('\n'.join(out.split('\n')[1:]))
+
def get_mon_status(self, mon):
"""
Extract all the monitor status information from the cluster
@@ -3340,6 +3413,23 @@ class CephManager:
self.log(task_status)
return task_status
+ # Stretch mode related functions
+ def is_degraded_stretch_mode(self):
+ """
+ Return whether the cluster is in degraded stretch mode
+ """
+ try:
+ osdmap = self.get_osd_dump_json()
+ stretch_mode = osdmap.get('stretch_mode', {})
+ degraded_stretch_mode = stretch_mode.get('degraded_stretch_mode', 0)
+ self.log("is_degraded_stretch_mode: {0}".format(degraded_stretch_mode))
+ return degraded_stretch_mode == 1
+ except (TypeError, AttributeError) as e:
+ # Log the error or handle it as needed
+ self.log("Error accessing degraded_stretch_mode: {0}".format(e))
+ return False
+
+
def utility_task(name):
"""
Generate ceph_manager subtask corresponding to ceph_manager
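Note: the PG helpers added above (pg_all_active_clean / pg_all_active) are plain predicates; a minimal sketch of how a caller might poll them follows. The manager argument, timeout and interval are illustrative assumptions, not part of this change.

import time

def wait_for_all_active_clean(manager, timeout=300, interval=10):
    # Poll CephManager.pg_all_active_clean() (added above) until it reports
    # success or the (illustrative) timeout expires.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if manager.pg_all_active_clean():
            return True
        time.sleep(interval)
    raise RuntimeError('PGs did not become active+clean within %ds' % timeout)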
diff --git a/qa/tasks/cephadm.py b/qa/tasks/cephadm.py
index dab61c2c700..0cde6050718 100644
--- a/qa/tasks/cephadm.py
+++ b/qa/tasks/cephadm.py
@@ -475,12 +475,16 @@ def ceph_log(ctx, config):
run.Raw('|'), 'head', '-n', '1',
])
r = ctx.ceph[cluster_name].bootstrap_remote.run(
- stdout=StringIO(),
+ stdout=BytesIO(),
args=args,
+ stderr=StringIO(),
)
- stdout = r.stdout.getvalue()
- if stdout != '':
+ stdout = r.stdout.getvalue().decode()
+ if stdout:
return stdout
+ stderr = r.stderr.getvalue()
+ if stderr:
+ return stderr
return None
# NOTE: technically the first and third arg to first_in_ceph_log
diff --git a/qa/tasks/cephfs/cephfs_test_case.py b/qa/tasks/cephfs/cephfs_test_case.py
index c1312ec5efc..21b96d2b22b 100644
--- a/qa/tasks/cephfs/cephfs_test_case.py
+++ b/qa/tasks/cephfs/cephfs_test_case.py
@@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase):
def get_session_data(self, client_id):
return self._session_by_id(client_id)
- def _session_list(self):
- ls_data = self.fs.mds_asok(['session', 'ls'])
+ def _session_list(self, rank=None, status=None):
+ ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status)
ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
return ls_data
@@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase):
def perf_dump(self, rank=None, status=None):
return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)
- def wait_until_evicted(self, client_id, timeout=30):
+ def wait_until_evicted(self, client_id, rank=None, timeout=30):
def is_client_evicted():
- ls = self._session_list()
+ ls = self._session_list(rank=rank)
for s in ls:
if s['id'] == client_id:
return False
diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py
index 2b7fd2ee569..3846ef23f97 100644
--- a/qa/tasks/cephfs/filesystem.py
+++ b/qa/tasks/cephfs/filesystem.py
@@ -649,6 +649,8 @@ class FilesystemBase(MDSClusterBase):
def set_session_timeout(self, timeout):
self.set_var("session_timeout", "%d" % timeout)
+ def set_session_autoclose(self, autoclose_time):
+ self.set_var("session_autoclose", "%d" % autoclose_time)
def set_allow_standby_replay(self, yes):
self.set_var("allow_standby_replay", yes)
diff --git a/qa/tasks/cephfs/test_exports.py b/qa/tasks/cephfs/test_exports.py
index e5ad18dd662..468378fce3d 100644
--- a/qa/tasks/cephfs/test_exports.py
+++ b/qa/tasks/cephfs/test_exports.py
@@ -153,6 +153,8 @@ class TestExportPin(CephFSTestCase):
# vstart.sh sets mds_debug_subtrees to True. That causes a ESubtreeMap
# to be written out every event. Yuck!
self.config_set('mds', 'mds_debug_subtrees', False)
+ # make sure ESubtreeMap is written frequently enough:
+ self.config_set('mds', 'mds_log_minor_segments_per_major_segment', '4')
self.config_rm('mds', 'mds bal split size') # don't split /top
self.mount_a.run_shell_payload("rm -rf 1")
@@ -724,3 +726,91 @@ class TestDumpExportStates(CephFSTestCase):
self._test_freeze_tree(state, 0)
self.assertTrue(type(state['notify_ack_waiting']) is list)
+
+class TestKillExports(CephFSTestCase):
+ MDSS_REQUIRED = 2
+ CLIENTS_REQUIRED = 1
+
+ def setUp(self):
+ CephFSTestCase.setUp(self)
+
+ self.fs.set_max_mds(self.MDSS_REQUIRED)
+ self.status = self.fs.wait_for_daemons()
+
+ self.mount_a.run_shell_payload('mkdir -p test/export')
+
+ def tearDown(self):
+ super().tearDown()
+
+ def _kill_export_as(self, rank, kill):
+ self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status)
+
+ def _export_dir(self, path, source, target):
+ self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+
+ def _wait_failover(self):
+ self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout)
+
+ def _clear_coredump(self, rank):
+ crash_rank = self.fs.get_rank(rank=rank, status=self.status)
+ self.delete_mds_coredump(crash_rank['name'])
+
+ def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True):
+ self._kill_export_as(exporter_rank, kill_at)
+ self._export_dir("/test", exporter_rank, importer_rank)
+ self._wait_failover()
+ self._clear_coredump(exporter_rank)
+
+ if restart:
+ self.fs.rank_restart(rank=exporter_rank, status=self.status)
+ self.status = self.fs.wait_for_daemons()
+
+ def test_session_cleanup(self):
+ """
+ Test importer's session cleanup after an export subtree task is interrupted.
+ Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter
+ to restart while the state is 'acking'.
+
+ See https://tracker.ceph.com/issues/61459
+ """
+
+ kill_export_at = [9, 10]
+
+ exporter_rank = 0
+ importer_rank = 1
+
+ for kill in kill_export_at:
+ log.info(f"kill_export_at: {kill}")
+ self._run_kill_export(kill, exporter_rank, importer_rank)
+
+ if len(self._session_list(importer_rank, self.status)) > 0:
+ client_id = self.mount_a.get_global_id()
+ self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status)
+
+ # timeout if buggy
+ self.wait_until_evicted(client_id, importer_rank)
+
+ # for multiple tests
+ self.mount_a.remount()
+
+ def test_client_eviction(self):
+ # modify the timeout so that we don't have to wait too long
+ timeout = 30
+ self.fs.set_session_timeout(timeout)
+ self.fs.set_session_autoclose(timeout + 5)
+
+ kill_export_at = [9, 10]
+
+ exporter_rank = 0
+ importer_rank = 1
+
+ for kill in kill_export_at:
+ log.info(f"kill_export_at: {kill}")
+ self._run_kill_export(kill, exporter_rank, importer_rank)
+
+ client_id = self.mount_a.get_global_id()
+ self.wait_until_evicted(client_id, importer_rank, timeout + 10)
+ time.sleep(1)
+
+ # failed if buggy
+ self.mount_a.ls()
diff --git a/qa/tasks/cephfs/test_failover.py b/qa/tasks/cephfs/test_failover.py
index 29af1e76a4f..46139163ddd 100644
--- a/qa/tasks/cephfs/test_failover.py
+++ b/qa/tasks/cephfs/test_failover.py
@@ -1,3 +1,4 @@
+import re
import time
import signal
import logging
@@ -342,6 +343,60 @@ class TestClusterResize(CephFSTestCase):
self.fs.wait_for_daemons(timeout=90)
+class TestFailoverBeaconHealth(CephFSTestCase):
+ CLIENTS_REQUIRED = 1
+ MDSS_REQUIRED = 1
+
+ def initiate_journal_replay(self, num_files=100):
+ """ Initiate journal replay by creating files and restarting mds server."""
+
+ self.config_set("mds", "mds_delay_journal_replay_for_testing", "5000")
+ self.mounts[0].test_files = [str(x) for x in range(num_files)]
+ self.mounts[0].create_files()
+ self.fs.fail()
+ self.fs.set_joinable()
+
+ def test_replay_beacon_estimated_time(self):
+ """
+ That beacon emits warning message with estimated time to complete replay
+ """
+ self.initiate_journal_replay()
+ self.wait_for_health("MDS_ESTIMATED_REPLAY_TIME", 60)
+ # remove the config so that replay finishes and the cluster
+ # is HEALTH_OK
+ self.config_rm("mds", "mds_delay_journal_replay_for_testing")
+ self.wait_for_health_clear(timeout=60)
+
+ def test_replay_estimated_time_accuracy(self):
+ self.initiate_journal_replay(250)
+ def replay_complete():
+ health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True)
+ codes = [s for s in health['checks']]
+ return 'MDS_ESTIMATED_REPLAY_TIME' not in codes
+
+ def get_estimated_time():
+ completion_percentage = 0.0
+ time_duration = pending_duration = 0
+ with safe_while(sleep=5, tries=360) as proceed:
+ while proceed():
+ health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True)
+ codes = [s for s in health['checks']]
+ if 'MDS_ESTIMATED_REPLAY_TIME' in codes:
+ message = health['checks']['MDS_ESTIMATED_REPLAY_TIME']['detail'][0]['message']
+ ### sample warning string: "mds.a(mds.0): replay: 50.0446% complete - elapsed time: 582s, estimated time remaining: 581s"
+ m = re.match(".* replay: (\d+(\.\d+)?)% complete - elapsed time: (\d+)s, estimated time remaining: (\d+)s", message)
+ if not m:
+ continue
+ completion_percentage = float(m.group(1))
+ time_duration = int(m.group(3))
+ pending_duration = int(m.group(4))
+ log.debug(f"MDS_ESTIMATED_REPLAY_TIME is present in health: {message}, duration: {time_duration}, completion_percentage: {completion_percentage}")
+ if completion_percentage >= 50:
+ return (completion_percentage, time_duration, pending_duration)
+ _, _, pending_duration = get_estimated_time()
+ # wait for 25% more time to avoid false negative failures
+ self.wait_until_true(replay_complete, timeout=pending_duration * 1.25)
+
class TestFailover(CephFSTestCase):
CLIENTS_REQUIRED = 1
MDSS_REQUIRED = 2
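For reference, the MDS_ESTIMATED_REPLAY_TIME parsing in test_replay_estimated_time_accuracy can be exercised standalone; this sketch reuses the regex and the sample warning string quoted in the test, everything else is scaffolding.

import re

# Sample warning string quoted in the test above.
msg = ('mds.a(mds.0): replay: 50.0446% complete - elapsed time: 582s, '
       'estimated time remaining: 581s')
m = re.match(r'.* replay: (\d+(\.\d+)?)% complete - elapsed time: (\d+)s, '
             r'estimated time remaining: (\d+)s', msg)
assert m is not None
completion_percentage = float(m.group(1))   # 50.0446
elapsed_seconds = int(m.group(3))           # 582
remaining_seconds = int(m.group(4))         # 581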
diff --git a/qa/tasks/cephfs/test_nfs.py b/qa/tasks/cephfs/test_nfs.py
index faa35be6926..0a1c07dce04 100644
--- a/qa/tasks/cephfs/test_nfs.py
+++ b/qa/tasks/cephfs/test_nfs.py
@@ -369,6 +369,45 @@ class TestNFS(MgrTestCase):
except CommandFailedError as e:
self.fail(f"expected read/write of a file to be successful but failed with {e.exitstatus}")
+ def _mnt_nfs(self, pseudo_path, port, ip):
+ '''
+ Mount created export
+ :param pseudo_path: It is the pseudo root name
+ :param port: Port of deployed nfs cluster
+ :param ip: IP of deployed nfs cluster
+ '''
+ tries = 3
+ while True:
+ try:
+ self.ctx.cluster.run(
+ args=['sudo', 'mount', '-t', 'nfs', '-o', f'port={port}',
+ f'{ip}:{pseudo_path}', '/mnt'])
+ break
+ except CommandFailedError:
+ if tries:
+ tries -= 1
+ time.sleep(2)
+ continue
+ raise
+
+ self.ctx.cluster.run(args=['sudo', 'chmod', '1777', '/mnt'])
+
+ def _test_fio(self, pseudo_path, port, ip):
+ '''
+ run fio with libaio on /mnt/fio
+ :param mnt_path: nfs mount point
+ '''
+ try:
+ self._mnt_nfs(pseudo_path, port, ip)
+ self.ctx.cluster.run(args=['mkdir', '/mnt/fio'])
+ fio_cmd=['sudo', 'fio', '--ioengine=libaio', '-directory=/mnt/fio', '--filename=fio.randrw.test', '--name=job', '--bs=16k', '--direct=1', '--group_reporting', '--iodepth=128', '--randrepeat=0', '--norandommap=1', '--thread=2', '--ramp_time=20s', '--offset_increment=5%', '--size=5G', '--time_based', '--runtime=300', '--ramp_time=1s', '--percentage_random=0', '--rw=randrw', '--rwmixread=50']
+ self.ctx.cluster.run(args=fio_cmd)
+ except CommandFailedError as e:
+ self.fail(f"expected fio to be successful but failed with {e.exitstatus}")
+ finally:
+ self.ctx.cluster.run(args=['sudo', 'rm', '-rf', '/mnt/fio'])
+ self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
+
def _write_to_read_only_export(self, pseudo_path, port, ip):
'''
Check if write to read only export fails
@@ -627,6 +666,18 @@ class TestNFS(MgrTestCase):
self._test_data_read_write(self.pseudo_path, port, ip)
self._test_delete_cluster()
+ def test_async_io_fio(self):
+ '''
+ Test async io using fio. Expect completion without hang or crash
+ '''
+ self._test_create_cluster()
+ self._create_export(export_id='1', create_fs=True,
+ extra_cmd=['--pseudo-path', self.pseudo_path])
+ port, ip = self._get_port_ip_info()
+ self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed')
+ self._test_fio(self.pseudo_path, port, ip)
+ self._test_delete_cluster()
+
def test_cluster_info(self):
'''
Test cluster info outputs correct ip and hostname
diff --git a/qa/tasks/check_counter.py b/qa/tasks/check_counter.py
index 40818f3f475..1f63b6a0bd4 100644
--- a/qa/tasks/check_counter.py
+++ b/qa/tasks/check_counter.py
@@ -1,11 +1,14 @@
import logging
import json
+import errno
from teuthology.task import Task
from teuthology import misc
from tasks import ceph_manager
+from tasks.cephfs.filesystem import MDSCluster
+from teuthology.exceptions import CommandFailedError
log = logging.getLogger(__name__)
@@ -61,6 +64,9 @@ class CheckCounter(Task):
mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=self.ctx, logger=log.getChild('ceph_manager'))
active_mgr = json.loads(mon_manager.raw_cluster_cmd("mgr", "dump", "--format=json-pretty"))["active_name"]
+ mds_cluster = MDSCluster(self.ctx)
+ status = mds_cluster.status()
+
for daemon_type, counters in targets.items():
# List of 'a', 'b', 'c'...
daemon_ids = list(misc.all_roles_of_type(self.ctx.cluster, daemon_type))
@@ -80,13 +86,31 @@ class CheckCounter(Task):
else:
log.debug("Getting stats from {0}".format(daemon_id))
- manager = self.ctx.managers[cluster_name]
- proc = manager.admin_socket(daemon_type, daemon_id, ["perf", "dump"])
- response_data = proc.stdout.getvalue().strip()
+ if daemon_type == 'mds':
+ mds_info = status.get_mds(daemon_id)
+ if not mds_info:
+ continue
+ mds = f"mds.{mds_info['gid']}"
+ if mds_info['state'] != "up:active":
+ log.debug(f"skipping {mds}")
+ continue
+ log.debug(f"Getting stats from {mds}")
+ try:
+ proc = mon_manager.raw_cluster_cmd("tell", mds, "perf", "dump",
+ "--format=json-pretty")
+ response_data = proc.strip()
+ except CommandFailedError as e:
+ if e.exitstatus == errno.ENOENT:
+ log.debug(f"Failed to do 'perf dump' on {mds}")
+ continue
+ else:
+ manager = self.ctx.managers[cluster_name]
+ proc = manager.admin_socket(daemon_type, daemon_id, ["perf", "dump"])
+ response_data = proc.stdout.getvalue().strip()
if response_data:
perf_dump = json.loads(response_data)
else:
- log.warning("No admin socket response from {0}, skipping".format(daemon_id))
+ log.warning("No response from {0}, skipping".format(daemon_id))
continue
minval = ''
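The check_counter change above fetches MDS perf counters with `ceph tell` instead of the admin socket; a hedged sketch of that code path as a standalone helper (the command string and the raw_cluster_cmd call mirror the diff, the helper name is made up):

import json

def mds_perf_dump(mon_manager, mds_gid):
    # Equivalent of the new code path: ask the MDS for its perf counters via
    # `ceph tell mds.<gid> perf dump --format=json-pretty` and parse the JSON.
    out = mon_manager.raw_cluster_cmd('tell', f'mds.{mds_gid}', 'perf', 'dump',
                                      '--format=json-pretty')
    out = out.strip()
    return json.loads(out) if out else None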
diff --git a/qa/tasks/kafka.py b/qa/tasks/kafka.py
index 5e6c208ca30..833f03babf6 100644
--- a/qa/tasks/kafka.py
+++ b/qa/tasks/kafka.py
@@ -4,6 +4,7 @@ Deploy and configure Kafka for Teuthology
import contextlib
import logging
import time
+import os
from teuthology import misc as teuthology
from teuthology import contextutil
@@ -33,6 +34,13 @@ def install_kafka(ctx, config):
assert isinstance(config, dict)
log.info('Installing Kafka...')
+ # programmatically find a nearby mirror so as not to hammer archive.apache.org
+ apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+ "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+ log.info("determining apache mirror by running: " + apache_mirror_cmd)
+ apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+ log.info("chosen apache mirror is " + apache_mirror_url_front)
+
for (client, _) in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
test_dir=teuthology.get_testdir(ctx)
@@ -40,7 +48,8 @@ def install_kafka(ctx, config):
kafka_file = kafka_prefix + current_version + '.tgz'
- link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/' + kafka_file
+ link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+ current_version + '/' + kafka_file
ctx.cluster.only(client).run(
args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
)
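The mirror-discovery one-liner above shells out to curl/grep/sed; purely for clarity, here is a rough Python equivalent. It is not part of the change, and the assumption that the first <strong> element on closer.cgi is the preferred mirror comes straight from that pipeline.

import re
import urllib.request

def find_apache_mirror():
    # Fetch closer.cgi and take the first <strong>...</strong> entry, which is
    # assumed to be the preferred mirror URL (it includes a trailing slash).
    html = urllib.request.urlopen('https://www.apache.org/dyn/closer.cgi').read().decode()
    m = re.search(r'<strong>([^<]*)</strong>', html)
    return m.group(1).rstrip() if m else 'https://archive.apache.org/dist/'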
diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py
new file mode 100644
index 00000000000..3ca60ab84fc
--- /dev/null
+++ b/qa/tasks/kafka_failover.py
@@ -0,0 +1,244 @@
+"""
+Deploy and configure Kafka for Teuthology
+"""
+import contextlib
+import logging
+import time
+import os
+
+from teuthology import misc as teuthology
+from teuthology import contextutil
+from teuthology.orchestra import run
+
+log = logging.getLogger(__name__)
+
+def get_kafka_version(config):
+ for client, client_config in config.items():
+ if 'kafka_version' in client_config:
+ kafka_version = client_config.get('kafka_version')
+ return kafka_version
+
+kafka_prefix = 'kafka_2.13-'
+
+def get_kafka_dir(ctx, config):
+ kafka_version = get_kafka_version(config)
+ current_version = kafka_prefix + kafka_version
+ return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version)
+
+
+@contextlib.contextmanager
+def install_kafka(ctx, config):
+ """
+ Downloading the kafka tar file.
+ """
+ assert isinstance(config, dict)
+ log.info('Installing Kafka...')
+
+ # programmatically find a nearby mirror so as not to hammer archive.apache.org
+ apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+ "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+ log.info("determining apache mirror by running: " + apache_mirror_cmd)
+ apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+ log.info("chosen apache mirror is " + apache_mirror_url_front)
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ test_dir=teuthology.get_testdir(ctx)
+ current_version = get_kafka_version(config)
+
+ kafka_file = kafka_prefix + current_version + '.tgz'
+
+ link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+ current_version + '/' + kafka_file
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file],
+ )
+
+ kafka_dir = get_kafka_dir(ctx, config)
+ # create config for second broker
+ second_broker_config_name = "server2.properties"
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+ second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/")
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'),
+ 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir)
+ ],
+ )
+
+ # edit config
+ ctx.cluster.only(client).run(
+ args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'),
+ 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name)
+ ]
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Removing packaged dependencies of Kafka...')
+ test_dir=get_kafka_dir(ctx, config)
+ current_version = get_kafka_version(config)
+ for (client,_) in config.items():
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', test_dir],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)],
+ )
+
+
+@contextlib.contextmanager
+def run_kafka(ctx,config):
+ """
+ This includes two parts:
+ 1. Starting Zookeeper service
+ 2. Starting Kafka service
+ """
+ assert isinstance(config, dict)
+ log.info('Bringing up Zookeeper and Kafka services...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ kafka_dir = get_kafka_dir(ctx, config)
+
+ second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir)
+ second_broker_java_log_dir = "{}/java_logs".format(second_broker_data)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './zookeeper-server-start.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ './kafka-server-start.sh',
+ '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'),
+ run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)),
+ './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir),
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ log.info('Stopping Zookeeper and Kafka Services...')
+
+ for (client, _) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-server-stop.sh',
+ '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(
+ args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './zookeeper-server-stop.sh',
+ '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)),
+ ],
+ )
+
+ time.sleep(5)
+
+ ctx.cluster.only(client).run(args=['killall', '-9', 'java'])
+
+
+@contextlib.contextmanager
+def run_admin_cmds(ctx,config):
+ """
+    Running Kafka Admin commands in order to check the working of producer and consumer and creation of topic.
+ """
+ assert isinstance(config, dict)
+ log.info('Checking kafka server through producer/consumer commands...')
+ for (client,_) in config.items():
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-topics.sh', '--create', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ 'echo', "First", run.Raw('|'),
+ './kafka-console-producer.sh', '--topic', 'quickstart-events',
+ '--bootstrap-server', 'localhost:9092'
+ ],
+ )
+
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'),
+ './kafka-console-consumer.sh', '--topic', 'quickstart-events',
+ '--from-beginning',
+ '--bootstrap-server', 'localhost:9092',
+ run.Raw('&'), 'exit'
+ ],
+ )
+
+ try:
+ yield
+ finally:
+ pass
+
+
+@contextlib.contextmanager
+def task(ctx,config):
+ """
+ Following is the way how to run kafka::
+ tasks:
+ - kafka:
+ client.0:
+ kafka_version: 2.6.0
+ """
+ assert config is None or isinstance(config, list) \
+ or isinstance(config, dict), \
+ "task kafka only supports a list or dictionary for configuration"
+
+ all_clients = ['client.{id}'.format(id=id_)
+ for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
+ if config is None:
+ config = all_clients
+ if isinstance(config, list):
+ config = dict.fromkeys(config)
+
+ log.debug('Kafka config is %s', config)
+
+ with contextutil.nested(
+ lambda: install_kafka(ctx=ctx, config=config),
+ lambda: run_kafka(ctx=ctx, config=config),
+ lambda: run_admin_cmds(ctx=ctx, config=config),
+ ):
+ yield
+
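As a rough orientation for the new kafka_failover task: the install directory it operates on is derived from kafka_prefix and the per-client kafka_version, and the second broker is configured to listen on localhost:19092 next to the default broker on 9092. A tiny sketch, where the client name, version and test dir are assumed example values:

# Hypothetical values mirroring the task docstring above.
config = {'client.0': {'kafka_version': '2.6.0'}}
kafka_prefix = 'kafka_2.13-'
version = next(c['kafka_version'] for c in config.values() if c)
testdir = '/home/ubuntu/cephtest'        # assumed teuthology test dir
kafka_dir = f'{testdir}/{kafka_prefix}{version}'
# -> /home/ubuntu/cephtest/kafka_2.13-2.6.0, with the second broker's config at
#    kafka_dir + '/config/server2.properties' listening on localhost:19092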
diff --git a/qa/tasks/mgr/dashboard/helper.py b/qa/tasks/mgr/dashboard/helper.py
index e6a7c35a23d..55355048a36 100644
--- a/qa/tasks/mgr/dashboard/helper.py
+++ b/qa/tasks/mgr/dashboard/helper.py
@@ -220,13 +220,11 @@ class DashboardTestCase(MgrTestCase):
# To avoid any issues with e.g. unlink bugs, we destroy and recreate
# the filesystem rather than just doing a rm -rf of files
- cls.mds_cluster.mds_stop()
- cls.mds_cluster.mds_fail()
cls.mds_cluster.delete_all_filesystems()
+ cls.mds_cluster.mds_restart() # to reset any run-time configs, etc.
cls.fs = None # is now invalid!
cls.fs = cls.mds_cluster.newfs(create=True)
- cls.fs.mds_restart()
# In case some test messed with auth caps, reset them
# pylint: disable=not-an-iterable
diff --git a/qa/tasks/mgr/dashboard/test_mgr_module.py b/qa/tasks/mgr/dashboard/test_mgr_module.py
index d6a368905b6..1dbdef23d34 100644
--- a/qa/tasks/mgr/dashboard/test_mgr_module.py
+++ b/qa/tasks/mgr/dashboard/test_mgr_module.py
@@ -4,6 +4,7 @@ from __future__ import absolute_import
import logging
import requests
+from urllib3.exceptions import MaxRetryError
from .helper import (DashboardTestCase, JLeaf, JList, JObj,
module_options_object_schema, module_options_schema,
@@ -24,10 +25,11 @@ class MgrModuleTestCase(DashboardTestCase):
def _check_connection():
try:
# Try reaching an API endpoint successfully.
+ logger.info('Trying to reach the REST API endpoint')
self._get('/api/mgr/module')
if self._resp.status_code == 200:
return True
- except requests.ConnectionError:
+ except (MaxRetryError, requests.ConnectionError):
pass
return False
diff --git a/qa/tasks/mgr/dashboard/test_rbd.py b/qa/tasks/mgr/dashboard/test_rbd.py
index a872645e33e..83b3bf520c2 100644
--- a/qa/tasks/mgr/dashboard/test_rbd.py
+++ b/qa/tasks/mgr/dashboard/test_rbd.py
@@ -869,7 +869,19 @@ class RbdTest(DashboardTestCase):
self.assertEqual(clone_format_version, 2)
self.assertStatus(200)
+ # if empty list is sent, then the config will remain as it is
value = []
+ res = [{'section': "global", 'value': "2"}]
+ self._post('/api/cluster_conf', {
+ 'name': config_name,
+ 'value': value
+ })
+ self.wait_until_equal(
+ lambda: _get_config_by_name(config_name),
+ res,
+ timeout=60)
+
+ value = [{'section': "global", 'value': ""}]
self._post('/api/cluster_conf', {
'name': config_name,
'value': value
diff --git a/qa/tasks/mgr/dashboard/test_rgw.py b/qa/tasks/mgr/dashboard/test_rgw.py
index 5c7b0329675..a9071bc2a3a 100644
--- a/qa/tasks/mgr/dashboard/test_rgw.py
+++ b/qa/tasks/mgr/dashboard/test_rgw.py
@@ -785,7 +785,7 @@ class RgwUserSubuserTest(RgwTestCase):
'access': 'readwrite',
'key_type': 'swift'
})
- self.assertStatus(200)
+ self.assertStatus(201)
data = self.jsonBody()
subuser = self.find_object_in_list('id', 'teuth-test-user:tux', data)
self.assertIsInstance(subuser, object)
@@ -808,7 +808,7 @@ class RgwUserSubuserTest(RgwTestCase):
'access_key': 'yyy',
'secret_key': 'xxx'
})
- self.assertStatus(200)
+ self.assertStatus(201)
data = self.jsonBody()
subuser = self.find_object_in_list('id', 'teuth-test-user:hugo', data)
self.assertIsInstance(subuser, object)
diff --git a/qa/tasks/mgr/mgr_test_case.py b/qa/tasks/mgr/mgr_test_case.py
index 9032e0e2658..4a5506391f2 100644
--- a/qa/tasks/mgr/mgr_test_case.py
+++ b/qa/tasks/mgr/mgr_test_case.py
@@ -1,5 +1,6 @@
import json
import logging
+import socket
from unittest import SkipTest
@@ -229,15 +230,22 @@ class MgrTestCase(CephTestCase):
"""
# Start handing out ports well above Ceph's range.
assign_port = min_port
+ ip_addr = cls.mgr_cluster.get_mgr_map()['active_addr'].split(':')[0]
for mgr_id in cls.mgr_cluster.mgr_ids:
cls.mgr_cluster.mgr_stop(mgr_id)
cls.mgr_cluster.mgr_fail(mgr_id)
+
for mgr_id in cls.mgr_cluster.mgr_ids:
- log.debug("Using port {0} for {1} on mgr.{2}".format(
- assign_port, module_name, mgr_id
- ))
+ # Find a port that isn't in use
+ while True:
+ if not cls.is_port_in_use(ip_addr, assign_port):
+ break
+ log.debug(f"Port {assign_port} in use, trying next")
+ assign_port += 1
+
+ log.debug(f"Using port {assign_port} for {module_name} on mgr.{mgr_id}")
cls.mgr_cluster.set_module_localized_conf(module_name, mgr_id,
config_name,
str(assign_port),
@@ -255,3 +263,8 @@ class MgrTestCase(CephTestCase):
mgr_map['active_name'], mgr_map['active_gid']))
return done
cls.wait_until_true(is_available, timeout=30)
+
+ @classmethod
+ def is_port_in_use(cls, ip_addr: str, port: int) -> bool:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ return s.connect_ex((ip_addr, port)) == 0
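The port probe added above returns True when something is already listening; a small standalone illustration of the resulting free-port search (address and starting port are placeholders):

import socket

def next_free_port(ip_addr: str, start: int) -> int:
    # Walk upwards from `start` using the same connect_ex() probe as
    # MgrTestCase.is_port_in_use(): 0 means the connect succeeded (port busy).
    port = start
    while True:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            if s.connect_ex((ip_addr, port)) != 0:
                return port
        port += 1

# e.g. next_free_port('127.0.0.1', 7789)  # placeholder values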
diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py
index b4697a6f797..f1eae3c89c4 100644
--- a/qa/tasks/notification_tests.py
+++ b/qa/tasks/notification_tests.py
@@ -220,7 +220,7 @@ def run_tests(ctx, config):
for client, client_config in config.items():
(remote,) = ctx.cluster.only(client).remotes.keys()
- attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
+ attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"]
if 'extra_attr' in client_config:
attr = client_config.get('extra_attr')
diff --git a/qa/tasks/nvme_loop.py b/qa/tasks/nvme_loop.py
index fef270ea085..fdec467a16d 100644
--- a/qa/tasks/nvme_loop.py
+++ b/qa/tasks/nvme_loop.py
@@ -70,7 +70,7 @@ def task(ctx, config):
remote.run(args=['lsblk'], stdout=StringIO())
p = remote.run(args=['sudo', 'nvme', 'list', '-o', 'json'], stdout=StringIO())
new_devs = []
- # `nvme list -o json` will return the following output:
+    # `nvme list -o json` will return one of the following outputs:
'''{
"Devices" : [
{
@@ -91,13 +91,112 @@ def task(ctx, config):
}
]
}'''
+ '''{
+ "Devices":[
+ {
+ "HostNQN":"nqn.2014-08.org.nvmexpress:uuid:00000000-0000-0000-0000-0cc47ada6ba4",
+ "HostID":"898a0e10-da2d-4a42-8017-d9c445089d0c",
+ "Subsystems":[
+ {
+ "Subsystem":"nvme-subsys0",
+ "SubsystemNQN":"nqn.2014.08.org.nvmexpress:80868086CVFT623300LN400BGN INTEL SSDPEDMD400G4",
+ "Controllers":[
+ {
+ "Controller":"nvme0",
+ "Cntlid":"0",
+ "SerialNumber":"CVFT623300LN400BGN",
+ "ModelNumber":"INTEL SSDPEDMD400G4",
+ "Firmware":"8DV101H0",
+ "Transport":"pcie",
+ "Address":"0000:02:00.0",
+ "Slot":"2",
+ "Namespaces":[
+ {
+ "NameSpace":"nvme0n1",
+ "Generic":"ng0n1",
+ "NSID":1,
+ "UsedBytes":400088457216,
+ "MaximumLBA":781422768,
+ "PhysicalSize":400088457216,
+ "SectorSize":512
+ }
+ ],
+ "Paths":[
+ ]
+ }
+ ],
+ "Namespaces":[
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ '''
+ '''{
+ "Devices":[
+ {
+ "HostNQN":"nqn.2014-08.org.nvmexpress:uuid:00000000-0000-0000-0000-0cc47ada6ba4",
+ "HostID":"898a0e10-da2d-4a42-8017-d9c445089d0c",
+ "Subsystems":[
+ {
+ "Subsystem":"nvme-subsys0",
+ "SubsystemNQN":"nqn.2014.08.org.nvmexpress:80868086CVFT534400C2400BGN INTEL SSDPEDMD400G4",
+ "Controllers":[
+ {
+ "Controller":"nvme0",
+ "Cntlid":"0",
+ "SerialNumber":"CVFT534400C2400BGN",
+ "ModelNumber":"INTEL SSDPEDMD400G4",
+ "Firmware":"8DV101H0",
+ "Transport":"pcie",
+ "Address":"0000:02:00.0",
+ "Slot":"2",
+ "Namespaces":[
+ {
+ "NameSpace":"nvme0n1",
+ "Generic":"ng0n1",
+ "NSID":1,
+ "UsedBytes":400088457216,
+ "MaximumLBA":781422768,
+ "PhysicalSize":400088457216,
+ "SectorSize":512
+ }
+ ],
+ "Paths":[
+ ]
+ }
+ ],
+ "Namespaces":[
+ ]
+ }
+ ]
+ }
+ ]
+ }
+ '''
nvme_list = json.loads(p.stdout.getvalue())
for device in nvme_list['Devices']:
- dev = device['DevicePath']
- vendor = device['ModelNumber']
- if dev.startswith('/dev/') and vendor == 'Linux':
- new_devs.append(dev)
- bluestore_zap(remote, dev)
+ try:
+ # first try format 1 / older format
+ dev = device['DevicePath']
+ vendor = device['ModelNumber']
+ if dev.startswith('/dev/') and vendor == 'Linux':
+ new_devs.append(dev)
+ bluestore_zap(remote, dev)
+ except KeyError:
+ for subsystem in device['Subsystems']:
+ # format 2
+ if 'Namespaces' in subsystem and subsystem['Namespaces']:
+ dev = '/dev/' + subsystem['Namespaces'][0]['NameSpace']
+ # try format 3 last
+ else:
+ dev = '/dev/' + subsystem['Controllers'][0]['Namespaces'][0]['NameSpace']
+ # vendor is the same for format 2 and 3
+ vendor = subsystem['Controllers'][0]['ModelNumber']
+ if vendor == 'Linux':
+ new_devs.append(dev)
+ bluestore_zap(remote, dev)
log.info(f'new_devs {new_devs}')
assert len(new_devs) <= len(devs)
if len(new_devs) == len(devs):
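The branching above covers the three `nvme list -o json` layouts shown in the sample outputs; restated as one standalone function (the dict handling and the '/dev/' prefix follow the diff, the function itself is illustrative):

def nvme_device_paths(nvme_list: dict) -> list:
    # Return (device_path, model) pairs for the flat legacy layout
    # (DevicePath/ModelNumber per device) and the newer Subsystems layouts.
    found = []
    for device in nvme_list.get('Devices', []):
        if 'DevicePath' in device:                      # legacy flat format
            found.append((device['DevicePath'], device['ModelNumber']))
            continue
        for subsystem in device.get('Subsystems', []):  # newer formats
            if subsystem.get('Namespaces'):
                ns = subsystem['Namespaces'][0]['NameSpace']
            else:
                ns = subsystem['Controllers'][0]['Namespaces'][0]['NameSpace']
            found.append(('/dev/' + ns, subsystem['Controllers'][0]['ModelNumber']))
    return found

# With the second sample output above this yields [('/dev/nvme0n1', 'INTEL SSDPEDMD400G4')].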
diff --git a/qa/tasks/nvmeof.py b/qa/tasks/nvmeof.py
index 42e357294d9..691a6f7dd86 100644
--- a/qa/tasks/nvmeof.py
+++ b/qa/tasks/nvmeof.py
@@ -128,12 +128,11 @@ class Nvmeof(Task):
total_images = int(self.namespaces_count) * int(self.subsystems_count)
log.info(f'[nvmeof]: creating {total_images} images')
+ rbd_create_cmd = []
for i in range(1, total_images + 1):
imagename = self.image_name_prefix + str(i)
- log.info(f'[nvmeof]: rbd create {poolname}/{imagename} --size {self.rbd_size}')
- _shell(self.ctx, self.cluster_name, self.remote, [
- 'rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}'
- ])
+ rbd_create_cmd += ['rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}', run.Raw(';')]
+ _shell(self.ctx, self.cluster_name, self.remote, rbd_create_cmd)
for role, i in daemons.items():
remote, id_ = i
@@ -251,9 +250,9 @@ class NvmeofThrasher(Thrasher, Greenlet):
daemon_max_thrash_times:
For now, NVMeoF daemons have limitation that each daemon can
- be thrashed only 3 times in span of 30 mins. This option
+ be thrashed only 5 times in span of 30 mins. This option
allows to set the amount of times it could be thrashed in a period
- of time. (default: 3)
+ of time. (default: 5)
daemon_max_thrash_period:
This option goes with the above option. It sets the period of time
over which each daemons can be thrashed for daemon_max_thrash_times
@@ -306,17 +305,17 @@ class NvmeofThrasher(Thrasher, Greenlet):
self.max_thrash_daemons = int(self.config.get('max_thrash', len(self.daemons) - 1))
# Limits on thrashing each daemon
- self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 3))
+ self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 5))
self.daemon_max_thrash_period = int(self.config.get('daemon_max_thrash_period', 30 * 60)) # seconds
self.min_thrash_delay = int(self.config.get('min_thrash_delay', 60))
self.max_thrash_delay = int(self.config.get('max_thrash_delay', self.min_thrash_delay + 30))
- self.min_revive_delay = int(self.config.get('min_revive_delay', 100))
+ self.min_revive_delay = int(self.config.get('min_revive_delay', 60))
self.max_revive_delay = int(self.config.get('max_revive_delay', self.min_revive_delay + 30))
def _get_devices(self, remote):
GET_DEVICE_CMD = "sudo nvme list --output-format=json | " \
- "jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == \"Ceph bdev Controller\") | .DevicePath'"
+ "jq -r '.Devices[].Subsystems[] | select(.Controllers | all(.ModelNumber == \"Ceph bdev Controller\")) | .Namespaces | sort_by(.NSID) | .[] | .NameSpace'"
devices = remote.sh(GET_DEVICE_CMD).split()
return devices
@@ -347,6 +346,7 @@ class NvmeofThrasher(Thrasher, Greenlet):
run.Raw('&&'), 'ceph', 'orch', 'ps', '--daemon-type', 'nvmeof',
run.Raw('&&'), 'ceph', 'health', 'detail',
run.Raw('&&'), 'ceph', '-s',
+ run.Raw('&&'), 'sudo', 'nvme', 'list',
]
for dev in self.devices:
check_cmd += [
@@ -421,13 +421,11 @@ class NvmeofThrasher(Thrasher, Greenlet):
while not self.stopping.is_set():
killed_daemons = defaultdict(list)
- weight = 1.0 / len(self.daemons)
- count = 0
+ thrash_daemon_num = self.rng.randint(1, self.max_thrash_daemons)
+ selected_daemons = self.rng.sample(self.daemons, thrash_daemon_num)
for daemon in self.daemons:
- skip = self.rng.uniform(0.0, 1.0)
- if weight <= skip:
- self.log('skipping daemon {label} with skip ({skip}) > weight ({weight})'.format(
- label=daemon.id_, skip=skip, weight=weight))
+ if daemon not in selected_daemons:
+ self.log(f'skipping daemon {daemon.id_} ...')
continue
# For now, nvmeof daemons can only be thrashed 3 times in last 30mins.
@@ -445,17 +443,11 @@ class NvmeofThrasher(Thrasher, Greenlet):
continue
self.log('kill {label}'.format(label=daemon.id_))
- # daemon.stop()
kill_method = self.kill_daemon(daemon)
killed_daemons[kill_method].append(daemon)
daemons_thrash_history[daemon.id_] += [datetime.now()]
- # only thrash max_thrash_daemons amount of daemons
- count += 1
- if count >= self.max_thrash_daemons:
- break
-
if killed_daemons:
iteration_summary = "thrashed- "
for kill_method in killed_daemons:
@@ -468,7 +460,7 @@ class NvmeofThrasher(Thrasher, Greenlet):
self.log(f'waiting for {revive_delay} secs before reviving')
time.sleep(revive_delay) # blocking wait
- self.log('done waiting before reviving')
+ self.log(f'done waiting before reviving - iteration #{len(summary)}: {iteration_summary}')
self.do_checks()
self.switch_task()
@@ -487,7 +479,7 @@ class NvmeofThrasher(Thrasher, Greenlet):
if thrash_delay > 0.0:
self.log(f'waiting for {thrash_delay} secs before thrashing')
time.sleep(thrash_delay) # blocking
- self.log('done waiting before thrashing')
+ self.log('done waiting before thrashing - everything should be up now')
self.do_checks()
self.switch_task()
diff --git a/qa/tasks/radosgw_admin.py b/qa/tasks/radosgw_admin.py
index 3b98702acca..fb82378761b 100644
--- a/qa/tasks/radosgw_admin.py
+++ b/qa/tasks/radosgw_admin.py
@@ -16,6 +16,7 @@ import logging
import time
import datetime
import sys
+import errno
from io import StringIO
from queue import Queue
@@ -725,6 +726,40 @@ def task(ctx, config):
(err, out) = rgwadmin(ctx, client, ['user', 'rm', '--tenant', tenant_name, '--uid', 'tenanteduser'],
check_status=True)
+ account_id = 'RGW12312312312312312'
+ account_name = 'testacct'
+ rgwadmin(ctx, client, [
+ 'account', 'create',
+ '--account-id', account_id,
+ '--account-name', account_name,
+ ], check_status=True)
+ rgwadmin(ctx, client, [
+ 'user', 'create',
+ '--account-id', account_id,
+ '--uid', 'testacctuser',
+ '--display-name', 'accountuser',
+ '--gen-access-key',
+ '--gen-secret',
+ ], check_status=True)
+
+ # TESTCASE 'bucket link', 'bucket', 'account user', 'fails'
+ (err, out) = rgwadmin(ctx, client, ['bucket', 'link', '--bucket', bucket_name, '--uid', 'testacctuser'])
+ assert err == errno.EINVAL
+
+ rgwadmin(ctx, client, ['user', 'rm', '--uid', 'testacctuser'], check_status=True)
+
+ # TESTCASE 'bucket link', 'bucket', 'account', 'succeeds'
+ rgwadmin(ctx, client,
+ ['bucket', 'link', '--bucket', bucket_name, '--account-id', account_id],
+ check_status=True)
+
+ # relink the bucket to the first user and delete the account
+ rgwadmin(ctx, client,
+ ['bucket', 'link', '--bucket', bucket_name, '--uid', user1],
+ check_status=True)
+ rgwadmin(ctx, client, ['account', 'rm', '--account-id', account_id],
+ check_status=True)
+
# TESTCASE 'object-rm', 'object', 'rm', 'remove object', 'succeeds, object is removed'
# upload an object
diff --git a/qa/tasks/rgw_multisite.py b/qa/tasks/rgw_multisite.py
index e83a54efc2b..f93ca017fa2 100644
--- a/qa/tasks/rgw_multisite.py
+++ b/qa/tasks/rgw_multisite.py
@@ -361,6 +361,8 @@ def create_zonegroup(cluster, gateways, period, config):
if endpoints:
# replace client names with their gateway endpoints
config['endpoints'] = extract_gateway_endpoints(gateways, endpoints)
+ if not config.get('api_name'): # otherwise it will be set to an empty string
+ config['api_name'] = config['name']
zonegroup = multisite.ZoneGroup(config['name'], period)
# `zonegroup set` needs --default on command line, and 'is_master' in json
args = is_default_arg(config)
diff --git a/qa/tasks/rook.py b/qa/tasks/rook.py
index 6cb75173966..fae5ef3bf00 100644
--- a/qa/tasks/rook.py
+++ b/qa/tasks/rook.py
@@ -8,7 +8,7 @@ import json
import logging
import os
import yaml
-from io import BytesIO
+from io import BytesIO, StringIO
from tarfile import ReadError
from tasks.ceph_manager import CephManager
@@ -235,10 +235,14 @@ def ceph_log(ctx, config):
r = ctx.rook[cluster_name].remote.run(
stdout=BytesIO(),
args=args,
+ stderr=StringIO(),
)
stdout = r.stdout.getvalue().decode()
if stdout:
return stdout
+ stderr = r.stderr.getvalue()
+ if stderr:
+ return stderr
return None
if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]',
diff --git a/qa/tasks/s3a_hadoop.py b/qa/tasks/s3a_hadoop.py
index 7b77359fcf2..4518a6f397c 100644
--- a/qa/tasks/s3a_hadoop.py
+++ b/qa/tasks/s3a_hadoop.py
@@ -1,5 +1,6 @@
import contextlib
import logging
+import os
from teuthology import misc
from teuthology.orchestra import run
@@ -40,7 +41,7 @@ def task(ctx, config):
# get versions
maven_major = config.get('maven-major', 'maven-3')
- maven_version = config.get('maven-version', '3.6.3')
+ maven_version = config.get('maven-version', '3.9.9')
hadoop_ver = config.get('hadoop-version', '2.9.2')
bucket_name = config.get('bucket-name', 's3atest')
access_key = config.get('access-key', 'EGAQRD2ULOIFKFSKCT4F')
@@ -48,11 +49,19 @@ def task(ctx, config):
'secret-key',
'zi816w1vZKfaSM85Cl0BxXTwSLyN7zB4RbTswrGb')
+ # programmatically find a nearby mirror so as not to hammer archive.apache.org
+ apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \
+ "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1"
+ log.info("determining apache mirror by running: " + apache_mirror_cmd)
+ apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/)
+ log.info("chosen apache mirror is " + apache_mirror_url_front)
+
# set versions for cloning the repo
apache_maven = 'apache-maven-{maven_version}-bin.tar.gz'.format(
maven_version=maven_version)
- maven_link = 'http://archive.apache.org/dist/maven/' + \
- '{maven_major}/{maven_version}/binaries/'.format(maven_major=maven_major, maven_version=maven_version) + apache_maven
+ maven_link = '{apache_mirror_url_front}/maven/'.format(apache_mirror_url_front=apache_mirror_url_front) + \
+ '{maven_major}/{maven_version}/binaries/'.format(maven_major=maven_major, maven_version=maven_version) + \
+ apache_maven
hadoop_git = 'https://github.com/apache/hadoop'
hadoop_rel = 'hadoop-{ver} rel/release-{ver}'.format(ver=hadoop_ver)
if hadoop_ver == 'trunk':
@@ -204,6 +213,7 @@ def run_s3atest(client, maven_version, testdir, test_options):
run.Raw('&&'),
run.Raw(rm_test),
run.Raw('&&'),
+ run.Raw('JAVA_HOME=$(alternatives --list | grep jre_1.8.0 | head -n 1 | awk \'{print $3}\')'),
run.Raw(run_test),
run.Raw(test_options)
]
diff --git a/qa/tasks/s3tests.py b/qa/tasks/s3tests.py
index 6d7b39d5892..85ab97d23cd 100644
--- a/qa/tasks/s3tests.py
+++ b/qa/tasks/s3tests.py
@@ -57,6 +57,17 @@ def download(ctx, config):
'git', 'reset', '--hard', sha1,
],
)
+ if client_config.get('boto3_extensions'):
+ ctx.cluster.only(client).run(
+ args=['mkdir',
+ '-p',
+ '/home/ubuntu/.aws/models/s3/2006-03-01/']
+ )
+ (remote,) = ctx.cluster.only(client).remotes.keys()
+ remote_file = '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json'
+ local_file = '{qadir}/../examples/rgw/boto3/service-2.sdk-extras.json'.format(qadir=ctx.config.get('suite_path'))
+ remote.put_file(local_file, remote_file)
+
try:
yield
finally:
@@ -70,6 +81,17 @@ def download(ctx, config):
'{tdir}/s3-tests-{client}'.format(tdir=testdir, client=client),
],
)
+ if client_config.get('boto3_extensions'):
+ ctx.cluster.only(client).run(
+ args=[
+ 'rm', '-rf', '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json',
+ ],
+ )
+ ctx.cluster.only(client).run(
+ args=[
+ 'cd', '/home/ubuntu/', run.Raw('&&'), 'rmdir', '-p', '.aws/models/s3/2006-03-01/',
+ ],
+ )
def _config_user(s3tests_conf, section, user, email):
@@ -444,8 +466,10 @@ def run_tests(ctx, config):
attrs += ['not fails_with_subdomain']
if not client_config.get('with-sse-s3'):
attrs += ['not sse_s3']
-
+
attrs += client_config.get('extra_attrs', [])
+ if 'bucket_logging' not in attrs:
+ attrs += ['not bucket_logging']
if 'unit_test_scan' in client_config and client_config['unit_test_scan']:
xmlfile_id = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S--") + str(uuid.uuid4())
xmlpath= f'{testdir}/archive/s3test-{xmlfile_id}.xml'
diff --git a/qa/tasks/s3tests_java.py b/qa/tasks/s3tests_java.py
index 3e20e10d06c..a58aa6cf0b4 100644
--- a/qa/tasks/s3tests_java.py
+++ b/qa/tasks/s3tests_java.py
@@ -284,6 +284,7 @@ class S3tests_java(Task):
args = ['cd',
'{tdir}/s3-tests-java'.format(tdir=testdir),
run.Raw('&&'),
+ run.Raw('JAVA_HOME=$(alternatives --list | grep jre_1.8.0 | head -n 1 | awk \'{print $3}\')'),
'/opt/gradle/gradle/bin/gradle', 'clean', 'test',
'--rerun-tasks', '--no-build-cache',
]
diff --git a/qa/tasks/stretch_mode_disable_enable.py b/qa/tasks/stretch_mode_disable_enable.py
new file mode 100644
index 00000000000..a84a85bb307
--- /dev/null
+++ b/qa/tasks/stretch_mode_disable_enable.py
@@ -0,0 +1,547 @@
+import logging
+from tasks.mgr.mgr_test_case import MgrTestCase
+
+log = logging.getLogger(__name__)
+
+class TestStretchMode(MgrTestCase):
+ """
+ Test the stretch mode feature of Ceph
+ """
+ POOL = 'stretch_pool'
+ CLUSTER = "ceph"
+ WRITE_PERIOD = 10
+ RECOVERY_PERIOD = WRITE_PERIOD * 6
+ SUCCESS_HOLD_TIME = 7
+ STRETCH_CRUSH_RULE = 'stretch_rule'
+ STRETCH_CRUSH_RULE_ID = None
+ STRETCH_BUCKET_TYPE = 'datacenter'
+ TIEBREAKER_MON_NAME = 'e'
+ DEFAULT_POOL_TYPE = 'replicated'
+ DEFAULT_POOL_CRUSH_RULE = 'replicated_rule'
+ DEFAULT_POOL_SIZE = 3
+ DEFAULT_POOL_MIN_SIZE = 2
+ DEFAULT_POOL_CRUSH_RULE_ID = None
+ # This dictionary maps the datacenter to the osd ids and hosts
+ DC_OSDS = {
+ 'dc1': {
+ "host01": [0, 1],
+ "host02": [2, 3],
+ },
+ 'dc2': {
+ "host03": [4, 5],
+ "host04": [6, 7],
+ },
+ }
+ DC_MONS = {
+ 'dc1': {
+ "host01": ['a'],
+ "host02": ['b'],
+ },
+ 'dc2': {
+ "host03": ['c'],
+ "host04": ['d'],
+ },
+ 'dc3': {
+ "host05": ['e'],
+ }
+ }
+ def _osd_count(self):
+ """
+ Get the number of OSDs in the cluster.
+ """
+ osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json()
+ return len(osd_map['osds'])
+
+ def setUp(self):
+ """
+ Setup the cluster and
+ ensure we have a clean condition before the test.
+ """
+        # Skip the test unless the cluster has at least 4 OSDs
+ super(TestStretchMode, self).setUp()
+ self.DEFAULT_POOL_CRUSH_RULE_ID = self.mgr_cluster.mon_manager.get_crush_rule_id(self.DEFAULT_POOL_CRUSH_RULE)
+ self.STRETCH_CRUSH_RULE_ID = self.mgr_cluster.mon_manager.get_crush_rule_id(self.STRETCH_CRUSH_RULE)
+ if self._osd_count() < 4:
+ self.skipTest("Not enough OSDS!")
+
+ # Remove any filesystems so that we can remove their pools
+ if self.mds_cluster:
+ self.mds_cluster.mds_stop()
+ self.mds_cluster.mds_fail()
+ self.mds_cluster.delete_all_filesystems()
+
+ # Remove all other pools
+ for pool in self.mgr_cluster.mon_manager.get_osd_dump_json()['pools']:
+ try:
+ self.mgr_cluster.mon_manager.remove_pool(pool['pool_name'])
+ except:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'delete',
+ pool['pool_name'],
+ pool['pool_name'],
+ '--yes-i-really-really-mean-it')
+
+ def _setup_pool(
+ self,
+ pool_name=POOL,
+ pg_num=16,
+ pool_type=DEFAULT_POOL_TYPE,
+ crush_rule=DEFAULT_POOL_CRUSH_RULE,
+ size=None,
+ min_size=None
+ ):
+ """
+        Create a pool and set its size and min_size if specified.
+ """
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'create', pool_name, str(pg_num), pool_type, crush_rule)
+
+ if size is not None:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'set', pool_name, 'size', str(size))
+
+ if min_size is not None:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'pool', 'set', pool_name, 'min_size', str(min_size))
+
+ def _write_some_data(self, t):
+ """
+ Write some data to the pool to simulate a workload.
+ """
+ args = [
+ "rados", "-p", self.POOL, "bench", str(t), "write", "-t", "16"]
+ self.mgr_cluster.admin_remote.run(args=args, wait=True)
+
+ def _get_all_mons_from_all_dc(self):
+ """
+ Get all mons from all datacenters.
+ """
+ return [mon for dc in self.DC_MONS.values() for mons in dc.values() for mon in mons]
+
+ def _bring_back_mon(self, mon):
+ """
+ Bring back the mon.
+ """
+ try:
+ self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).restart()
+ except Exception:
+ log.error("Failed to bring back mon.{}".format(str(mon)))
+ pass
+
+ def _get_host(self, osd):
+ """
+ Get the host of the osd.
+ """
+ for dc, nodes in self.DC_OSDS.items():
+ for node, osds in nodes.items():
+ if osd in osds:
+ return node
+ return None
+
+ def _move_osd_back_to_host(self, osd):
+ """
+ Move the osd back to the host.
+ """
+ host = self._get_host(osd)
+ assert host is not None, "The host of osd {} is not found.".format(osd)
+ log.debug("Moving osd.%d back to %s", osd, host)
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'crush', 'move', 'osd.{}'.format(str(osd)),
+ 'host={}'.format(host)
+ )
+
+ def tearDown(self):
+ """
+ Clean up the cluster after the test.
+ """
+ # Remove the pool
+ if self.POOL in self.mgr_cluster.mon_manager.pools:
+ self.mgr_cluster.mon_manager.remove_pool(self.POOL)
+
+ osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json()
+ for osd in osd_map['osds']:
+ # mark all the osds in
+ if osd['weight'] == 0.0:
+ self.mgr_cluster.mon_manager.raw_cluster_cmd(
+ 'osd', 'in', str(osd['osd']))
+ # Bring back all the osds and move it back to the host.
+ if osd['up'] == 0:
+ self.mgr_cluster.mon_manager.revive_osd(osd['osd'])
+ self._move_osd_back_to_host(osd['osd'])
+
+ # Bring back all the mons
+ mons = self._get_all_mons_from_all_dc()
+ for mon in mons:
+ self._bring_back_mon(mon)
+ super(TestStretchMode, self).tearDown()
+
+ def _kill_osd(self, osd):
+ """
+ Kill the osd.
+ """
+ try:
+ self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).stop()
+ except Exception:
+ log.error("Failed to stop osd.{}".format(str(osd)))
+ pass
+
+ def _get_osds_data(self, want_osds):
+ """
+ Get the osd data
+ """
+ all_osds_data = \
+ self.mgr_cluster.mon_manager.get_osd_dump_json()['osds']
+ return [
+ osd_data for osd_data in all_osds_data
+ if int(osd_data['osd']) in want_osds
+ ]
+
+ def _get_osds_by_dc(self, dc):
+ """
+ Get osds by datacenter.
+ """
+ ret = []
+ for host, osds in self.DC_OSDS[dc].items():
+ ret.extend(osds)
+ return ret
+
+ def _fail_over_all_osds_in_dc(self, dc):
+ """
+ Fail over all osds in specified <datacenter>
+ """
+ if not isinstance(dc, str):
+ raise ValueError("dc must be a string")
+ if dc not in self.DC_OSDS:
+ raise ValueError(
+ "dc must be one of the following: %s" % self.DC_OSDS.keys()
+ )
+ log.debug("Failing over all osds in %s", dc)
+ osds = self._get_osds_by_dc(dc)
+ # fail over all the OSDs in the DC
+ log.debug("OSDs to failed over: %s", osds)
+ for osd_id in osds:
+ self._kill_osd(osd_id)
+ # wait until all the osds are down
+ self.wait_until_true(
+ lambda: all([int(osd['up']) == 0
+ for osd in self._get_osds_data(osds)]),
+ timeout=self.RECOVERY_PERIOD
+ )
+
+ def _check_mons_out_of_quorum(self, want_mons):
+ """
+ Check if the mons are not in quorum.
+ """
+ quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names()
+ return all([mon not in quorum_names for mon in want_mons])
+
+ def _kill_mon(self, mon):
+ """
+ Kill the mon.
+ """
+ try:
+ self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).stop()
+ except Exception:
+ log.error("Failed to stop mon.{}".format(str(mon)))
+ pass
+
+ def _get_mons_by_dc(self, dc):
+ """
+ Get mons by datacenter.
+ """
+ ret = []
+ for host, mons in self.DC_MONS[dc].items():
+ ret.extend(mons)
+ return ret
+
+ def _fail_over_all_mons_in_dc(self, dc):
+ """
+ Fail over all mons in the specified <datacenter>
+ """
+ if not isinstance(dc, str):
+ raise ValueError("dc must be a string")
+ if dc not in self.DC_MONS:
+ raise ValueError("dc must be one of the following: %s" %
+ ", ".join(self.DC_MONS.keys()))
+ log.debug("Failing over all mons %s", dc)
+ mons = self._get_mons_by_dc(dc)
+ log.debug("Mons to be failed over: %s", mons)
+ for mon in mons:
+ self._kill_mon(mon)
+ # wait until all the mons are out of quorum
+ self.wait_until_true(
+ lambda: self._check_mons_out_of_quorum(mons),
+ timeout=self.RECOVERY_PERIOD
+ )
+
+ def _stretch_mode_enabled_correctly(self):
+ """
+ Evaluate whether the stretch mode is enabled correctly.
+ by checking the OSDMap and MonMap.
+ """
+ # Checking the OSDMap
+ osdmap = self.mgr_cluster.mon_manager.get_osd_dump_json()
+ for pool in osdmap['pools']:
+ # expects crush_rule to be stretch_rule
+ self.assertEqual(
+ self.STRETCH_CRUSH_RULE_ID,
+ pool['crush_rule']
+ )
+ # expects pool size to be 4
+ self.assertEqual(
+ 4,
+ pool['size']
+ )
+ # expects pool min_size to be 2
+ self.assertEqual(
+ 2,
+ pool['min_size']
+ )
+ # expects pool is_stretch_pool flag to be true
+ self.assertEqual(
+ True,
+ pool['is_stretch_pool']
+ )
+ # expects peering_crush_bucket_count = 2 (always this value for stretch mode)
+ self.assertEqual(
+ 2,
+ pool['peering_crush_bucket_count']
+ )
+ # expects peering_crush_bucket_target = 2 (always this value for stretch mode)
+ self.assertEqual(
+ 2,
+ pool['peering_crush_bucket_target']
+ )
+ # expects peering_crush_bucket_barrier = 8 (crush type of datacenter is 8)
+ self.assertEqual(
+ 8,
+ pool['peering_crush_bucket_barrier']
+ )
+ # expects stretch_mode_enabled to be True
+ self.assertEqual(
+ True,
+ osdmap['stretch_mode']['stretch_mode_enabled']
+ )
+        # expects stretch_bucket_count to be 2
+ self.assertEqual(
+ 2,
+ osdmap['stretch_mode']['stretch_bucket_count']
+ )
+ # expects degraded_stretch_mode to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['degraded_stretch_mode']
+ )
+ # expects recovering_stretch_mode to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['recovering_stretch_mode']
+ )
+ # expects stretch_mode_bucket to be 8 (datacenter crush type = 8)
+ self.assertEqual(
+ 8,
+ osdmap['stretch_mode']['stretch_mode_bucket']
+ )
+ # Checking the MonMap
+ monmap = self.mgr_cluster.mon_manager.get_mon_dump_json()
+ # expects stretch_mode to be True
+ self.assertEqual(
+ True,
+ monmap['stretch_mode']
+ )
+        # expects disallowed_leaders to be the tiebreaker mon
+ self.assertEqual(
+ self.TIEBREAKER_MON_NAME,
+ monmap['disallowed_leaders']
+ )
+        # expects tiebreaker_mon to be the designated tiebreaker mon
+ self.assertEqual(
+ self.TIEBREAKER_MON_NAME,
+ monmap['tiebreaker_mon']
+ )
+
+ def _stretch_mode_disabled_correctly(self):
+ """
+        Evaluate whether stretch mode is disabled correctly
+        by checking the OSDMap and MonMap.
+ """
+ # Checking the OSDMap
+ osdmap = self.mgr_cluster.mon_manager.get_osd_dump_json()
+ for pool in osdmap['pools']:
+ # expects crush_rule to be default
+ self.assertEqual(
+ self.DEFAULT_POOL_CRUSH_RULE_ID,
+ pool['crush_rule']
+ )
+ # expects pool size to be default
+ self.assertEqual(
+ self.DEFAULT_POOL_SIZE,
+ pool['size']
+ )
+ # expects pool min_size to be default
+ self.assertEqual(
+ self.DEFAULT_POOL_MIN_SIZE,
+ pool['min_size']
+ )
+ # expects pool is_stretch_pool flag to be false
+ self.assertEqual(
+ False,
+ pool['is_stretch_pool']
+ )
+ # expects peering_crush_bucket_count = 0
+ self.assertEqual(
+ 0,
+ pool['peering_crush_bucket_count']
+ )
+ # expects peering_crush_bucket_target = 0
+ self.assertEqual(
+ 0,
+ pool['peering_crush_bucket_target']
+ )
+ # expects peering_crush_bucket_barrier = 0
+ self.assertEqual(
+ 0,
+ pool['peering_crush_bucket_barrier']
+ )
+ # expects stretch_mode_enabled to be False
+ self.assertEqual(
+ False,
+ osdmap['stretch_mode']['stretch_mode_enabled']
+ )
+        # expects stretch_bucket_count to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['stretch_bucket_count']
+ )
+ # expects degraded_stretch_mode to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['degraded_stretch_mode']
+ )
+ # expects recovering_stretch_mode to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['recovering_stretch_mode']
+ )
+ # expects stretch_mode_bucket to be 0
+ self.assertEqual(
+ 0,
+ osdmap['stretch_mode']['stretch_mode_bucket']
+ )
+ # Checking the MonMap
+ monmap = self.mgr_cluster.mon_manager.get_mon_dump_json()
+ # expects stretch_mode to be False
+ self.assertEqual(
+ False,
+ monmap['stretch_mode']
+ )
+ # expects disallowed_leaders to be empty
+ self.assertEqual(
+ "",
+ monmap['disallowed_leaders']
+ )
+ # expects tiebreaker_mon to be empty
+ self.assertEqual(
+ "",
+ monmap['tiebreaker_mon']
+ )
+
+ def test_disable_stretch_mode(self):
+ """
+        Test disabling stretch mode in the following scenarios:
+ 1. Healthy Stretch Mode
+ 2. Degraded Stretch Mode
+ """
+        # Create a replicated pool on the stretch crush rule with size 4 and min_size 2
+ self._setup_pool(self.POOL, 16, 'replicated', self.STRETCH_CRUSH_RULE, 4, 2)
+ # Write some data to the pool
+ self._write_some_data(self.WRITE_PERIOD)
+        # Disable stretch mode without --yes-i-really-mean-it (expects -EPERM 1)
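+        # (roughly equivalent to running `ceph mon disable_stretch_mode`
+        # on the CLI without the safety flag)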
+ self.assertEqual(
+ 1,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'disable_stretch_mode'
+ ))
+ # Disable stretch mode with non-existent crush rule (expects -EINVAL 22)
+ self.assertEqual(
+ 22,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'disable_stretch_mode',
+ 'non_existent_rule',
+ '--yes-i-really-mean-it'
+ ))
+        # Disable stretch mode with the current stretch rule (expects -EINVAL 22)
+ self.assertEqual(
+ 22,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'disable_stretch_mode',
+ self.STRETCH_CRUSH_RULE,
+                '--yes-i-really-mean-it'
+            ))
+        # Disable stretch mode without a crush rule (expects success 0)
+ self.assertEqual(
+ 0,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'disable_stretch_mode',
+ '--yes-i-really-mean-it'
+ ))
+ # Check if stretch mode is disabled correctly
+ self._stretch_mode_disabled_correctly()
+        # Wait until all PGs are active+clean
+ self.wait_until_true_and_hold(
+ lambda: self.mgr_cluster.mon_manager.pg_all_active_clean(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ # write some data to the pool
+ self._write_some_data(self.WRITE_PERIOD)
+ # Enable stretch mode
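+        # (roughly `ceph mon enable_stretch_mode <tiebreaker_mon> <crush_rule> <bucket_type>`)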
+ self.assertEqual(
+ 0,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'enable_stretch_mode',
+ self.TIEBREAKER_MON_NAME,
+ self.STRETCH_CRUSH_RULE,
+ self.STRETCH_BUCKET_TYPE
+ ))
+ self._stretch_mode_enabled_correctly()
+        # Wait until all PGs are active+clean
+ self.wait_until_true_and_hold(
+ lambda: self.mgr_cluster.mon_manager.pg_all_active_clean(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+ # Bring down dc1
+ self._fail_over_all_osds_in_dc('dc1')
+ self._fail_over_all_mons_in_dc('dc1')
+        # The cluster should now be in degraded stretch mode
+ self.wait_until_true_and_hold(
+ lambda: self.mgr_cluster.mon_manager.is_degraded_stretch_mode(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
+        # Disable stretch mode with a valid crush rule (expects success 0)
+ self.assertEqual(
+ 0,
+ self.mgr_cluster.mon_manager.raw_cluster_cmd_result(
+ 'mon',
+ 'disable_stretch_mode',
+ self.DEFAULT_POOL_CRUSH_RULE,
+ '--yes-i-really-mean-it'
+ ))
+ # Check if stretch mode is disabled correctly
+ self._stretch_mode_disabled_correctly()
+        # Wait until all PGs are active
+ self.wait_until_true_and_hold(
+ lambda: self.mgr_cluster.mon_manager.pg_all_active(),
+ timeout=self.RECOVERY_PERIOD,
+ success_hold_time=self.SUCCESS_HOLD_TIME
+ )
diff --git a/qa/tasks/thrashosds-health.yaml b/qa/tasks/thrashosds-health.yaml
index b70583a75e1..dbde1ced0db 100644
--- a/qa/tasks/thrashosds-health.yaml
+++ b/qa/tasks/thrashosds-health.yaml
@@ -30,3 +30,4 @@ overrides:
- out of quorum
- noscrub
- nodeep-scrub
+ - is down
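+      # daemons reported "down" are expected while OSDs and mons are
+      # deliberately stopped by thrashing and failover tests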
diff --git a/qa/tasks/vstart_runner.py b/qa/tasks/vstart_runner.py
index ca929ba05b4..2ed21431330 100644
--- a/qa/tasks/vstart_runner.py
+++ b/qa/tasks/vstart_runner.py
@@ -233,6 +233,11 @@ class LocalRemoteProcess(object):
else:
self.stderr.write(err)
+ def _handle_subprocess_output(self, output, stream):
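+        # Non-ASCII characters are stripped only from text (StringIO) streams;
+        # binary output is returned untouched so raw bytes are preserved.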
+ if isinstance(stream, StringIO):
+ return rm_nonascii_chars(output)
+ return output
+
def wait(self, timeout=None):
# Null subproc.stdin so communicate() does not try flushing/closing it
# again.
@@ -250,7 +255,8 @@ class LocalRemoteProcess(object):
return
out, err = self.subproc.communicate(timeout=timeout)
- out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
+ out = self._handle_subprocess_output(out, self.stdout)
+ err = self._handle_subprocess_output(err, self.stderr)
self._write_stdout(out)
self._write_stderr(err)