Diffstat (limited to 'qa/tasks')
28 files changed, 1384 insertions, 59 deletions
diff --git a/qa/tasks/ceph.py b/qa/tasks/ceph.py index 9b04e3dc675..8f666d2fa9b 100644 --- a/qa/tasks/ceph.py +++ b/qa/tasks/ceph.py @@ -1206,8 +1206,18 @@ def cluster(ctx, config): args.extend([ run.Raw('|'), 'head', '-n', '1', ]) - stdout = mon0_remote.sh(args) - return stdout or None + r = mon0_remote.run( + stdout=BytesIO(), + args=args, + stderr=StringIO(), + ) + stdout = r.stdout.getvalue().decode() + if stdout: + return stdout + stderr = r.stderr.getvalue() + if stderr: + return stderr + return None if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]', config['log_ignorelist']) is not None: diff --git a/qa/tasks/ceph_manager.py b/qa/tasks/ceph_manager.py index 7005c8db0ff..57d22f3b5e6 100644 --- a/qa/tasks/ceph_manager.py +++ b/qa/tasks/ceph_manager.py @@ -2796,6 +2796,59 @@ class CephManager: num += 1 return num + def _print_not_active_clean_pg(self, pgs): + """ + Print the PGs that are not active+clean. + """ + for pg in pgs: + if not (pg['state'].count('active') and + pg['state'].count('clean') and + not pg['state'].count('stale')): + log.debug( + "PG %s is not active+clean, but %s", + pg['pgid'], pg['state'] + ) + + def pg_all_active_clean(self): + """ + Check if all pgs are active+clean + return: True if all pgs are active+clean else False + """ + pgs = self.get_pg_stats() + result = self._get_num_active_clean(pgs) == len(pgs) + if result: + log.debug("All PGs are active+clean") + else: + log.debug("Not all PGs are active+clean") + self._print_not_active_clean_pg(pgs) + return result + + def _print_not_active_pg(self, pgs): + """ + Print the PGs that are not active. + """ + for pg in pgs: + if not (pg['state'].count('active') + and not pg['state'].count('stale')): + log.debug( + "PG %s is not active, but %s", + pg['pgid'], pg['state'] + ) + + def pg_all_active(self): + """ + Check if all pgs are active + return: True if all pgs are active else False + """ + pgs = self.get_pg_stats() + result = self._get_num_active(pgs) == len(pgs) + if result: + log.debug("All PGs are active") + else: + log.debug("Not all PGs are active") + self._print_not_active_pg(pgs) + return result + def is_clean(self): """ True if all pgs are clean @@ -3237,6 +3290,26 @@ class CephManager: self.make_admin_daemon_dir(remote) self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart() + def get_crush_rule_id(self, crush_rule_name): + """ + Get crush rule id by name + :returns: int -- crush rule id + """ + out = self.raw_cluster_cmd('osd', 'crush', 'rule', 'dump', '--format=json') + j = json.loads('\n'.join(out.split('\n')[1:])) + for rule in j: + if rule['rule_name'] == crush_rule_name: + return rule['rule_id'] + assert False, 'rule %s not found' % crush_rule_name + + def get_mon_dump_json(self): + """ + mon dump --format=json converted to a python object + :returns: the python object + """ + out = self.raw_cluster_cmd('mon', 'dump', '--format=json') + return json.loads('\n'.join(out.split('\n')[1:])) + def get_mon_status(self, mon): """ Extract all the monitor status information from the cluster @@ -3340,6 +3413,23 @@ class CephManager: self.log(task_status) return task_status + # Stretch mode related functions + def is_degraded_stretch_mode(self): + """ + Return whether the cluster is in degraded stretch mode + """ + try: + osdmap = self.get_osd_dump_json() + stretch_mode = osdmap.get('stretch_mode', {}) + degraded_stretch_mode = stretch_mode.get('degraded_stretch_mode', 0) + self.log("is_degraded_stretch_mode: {0}".format(degraded_stretch_mode)) + return degraded_stretch_mode == 1 + except (TypeError, 
AttributeError) as e: + # Log the error or handle it as needed + self.log("Error accessing degraded_stretch_mode: {0}".format(e)) + return False + + def utility_task(name): """ Generate ceph_manager subtask corresponding to ceph_manager diff --git a/qa/tasks/cephadm.py b/qa/tasks/cephadm.py index dab61c2c700..0cde6050718 100644 --- a/qa/tasks/cephadm.py +++ b/qa/tasks/cephadm.py @@ -475,12 +475,16 @@ def ceph_log(ctx, config): run.Raw('|'), 'head', '-n', '1', ]) r = ctx.ceph[cluster_name].bootstrap_remote.run( - stdout=StringIO(), + stdout=BytesIO(), args=args, + stderr=StringIO(), ) - stdout = r.stdout.getvalue() - if stdout != '': + stdout = r.stdout.getvalue().decode() + if stdout: return stdout + stderr = r.stderr.getvalue() + if stderr: + return stderr return None # NOTE: technically the first and third arg to first_in_ceph_log diff --git a/qa/tasks/cephfs/cephfs_test_case.py b/qa/tasks/cephfs/cephfs_test_case.py index c1312ec5efc..21b96d2b22b 100644 --- a/qa/tasks/cephfs/cephfs_test_case.py +++ b/qa/tasks/cephfs/cephfs_test_case.py @@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase): def get_session_data(self, client_id): return self._session_by_id(client_id) - def _session_list(self): - ls_data = self.fs.mds_asok(['session', 'ls']) + def _session_list(self, rank=None, status=None): + ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status) ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']] return ls_data @@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase): def perf_dump(self, rank=None, status=None): return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status) - def wait_until_evicted(self, client_id, timeout=30): + def wait_until_evicted(self, client_id, rank=None, timeout=30): def is_client_evicted(): - ls = self._session_list() + ls = self._session_list(rank=rank) for s in ls: if s['id'] == client_id: return False diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py index 2b7fd2ee569..3846ef23f97 100644 --- a/qa/tasks/cephfs/filesystem.py +++ b/qa/tasks/cephfs/filesystem.py @@ -649,6 +649,8 @@ class FilesystemBase(MDSClusterBase): def set_session_timeout(self, timeout): self.set_var("session_timeout", "%d" % timeout) + def set_session_autoclose(self, autoclose_time): + self.set_var("session_autoclose", "%d" % autoclose_time) def set_allow_standby_replay(self, yes): self.set_var("allow_standby_replay", yes) diff --git a/qa/tasks/cephfs/test_exports.py b/qa/tasks/cephfs/test_exports.py index e5ad18dd662..468378fce3d 100644 --- a/qa/tasks/cephfs/test_exports.py +++ b/qa/tasks/cephfs/test_exports.py @@ -153,6 +153,8 @@ class TestExportPin(CephFSTestCase): # vstart.sh sets mds_debug_subtrees to True. That causes a ESubtreeMap # to be written out every event. Yuck! 
self.config_set('mds', 'mds_debug_subtrees', False) + # make sure ESubtreeMap is written frequently enough: + self.config_set('mds', 'mds_log_minor_segments_per_major_segment', '4') self.config_rm('mds', 'mds bal split size') # don't split /top self.mount_a.run_shell_payload("rm -rf 1") @@ -724,3 +726,91 @@ class TestDumpExportStates(CephFSTestCase): self._test_freeze_tree(state, 0) self.assertTrue(type(state['notify_ack_waiting']) is list) + +class TestKillExports(CephFSTestCase): + MDSS_REQUIRED = 2 + CLIENTS_REQUIRED = 1 + + def setUp(self): + CephFSTestCase.setUp(self) + + self.fs.set_max_mds(self.MDSS_REQUIRED) + self.status = self.fs.wait_for_daemons() + + self.mount_a.run_shell_payload('mkdir -p test/export') + + def tearDown(self): + super().tearDown() + + def _kill_export_as(self, rank, kill): + self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status) + + def _export_dir(self, path, source, target): + self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status) + + def _wait_failover(self): + self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout) + + def _clear_coredump(self, rank): + crash_rank = self.fs.get_rank(rank=rank, status=self.status) + self.delete_mds_coredump(crash_rank['name']) + + def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True): + self._kill_export_as(exporter_rank, kill_at) + self._export_dir("/test", exporter_rank, importer_rank) + self._wait_failover() + self._clear_coredump(exporter_rank) + + if restart: + self.fs.rank_restart(rank=exporter_rank, status=self.status) + self.status = self.fs.wait_for_daemons() + + def test_session_cleanup(self): + """ + Test importer's session cleanup after an export subtree task is interrupted. + Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter + to restart while the state is 'acking'. 
+ + See https://tracker.ceph.com/issues/61459 + """ + + kill_export_at = [9, 10] + + exporter_rank = 0 + importer_rank = 1 + + for kill in kill_export_at: + log.info(f"kill_export_at: {kill}") + self._run_kill_export(kill, exporter_rank, importer_rank) + + if len(self._session_list(importer_rank, self.status)) > 0: + client_id = self.mount_a.get_global_id() + self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status) + + # timeout if buggy + self.wait_until_evicted(client_id, importer_rank) + + # for multiple tests + self.mount_a.remount() + + def test_client_eviction(self): + # modify the timeout so that we don't have to wait too long + timeout = 30 + self.fs.set_session_timeout(timeout) + self.fs.set_session_autoclose(timeout + 5) + + kill_export_at = [9, 10] + + exporter_rank = 0 + importer_rank = 1 + + for kill in kill_export_at: + log.info(f"kill_export_at: {kill}") + self._run_kill_export(kill, exporter_rank, importer_rank) + + client_id = self.mount_a.get_global_id() + self.wait_until_evicted(client_id, importer_rank, timeout + 10) + time.sleep(1) + + # failed if buggy + self.mount_a.ls() diff --git a/qa/tasks/cephfs/test_failover.py b/qa/tasks/cephfs/test_failover.py index 29af1e76a4f..46139163ddd 100644 --- a/qa/tasks/cephfs/test_failover.py +++ b/qa/tasks/cephfs/test_failover.py @@ -1,3 +1,4 @@ +import re import time import signal import logging @@ -342,6 +343,60 @@ class TestClusterResize(CephFSTestCase): self.fs.wait_for_daemons(timeout=90) +class TestFailoverBeaconHealth(CephFSTestCase): + CLIENTS_REQUIRED = 1 + MDSS_REQUIRED = 1 + + def initiate_journal_replay(self, num_files=100): + """ Initiate journal replay by creating files and restarting mds server.""" + + self.config_set("mds", "mds_delay_journal_replay_for_testing", "5000") + self.mounts[0].test_files = [str(x) for x in range(num_files)] + self.mounts[0].create_files() + self.fs.fail() + self.fs.set_joinable() + + def test_replay_beacon_estimated_time(self): + """ + That beacon emits warning message with estimated time to complete replay + """ + self.initiate_journal_replay() + self.wait_for_health("MDS_ESTIMATED_REPLAY_TIME", 60) + # remove the config so that replay finishes and the cluster + # is HEALTH_OK + self.config_rm("mds", "mds_delay_journal_replay_for_testing") + self.wait_for_health_clear(timeout=60) + + def test_replay_estimated_time_accuracy(self): + self.initiate_journal_replay(250) + def replay_complete(): + health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True) + codes = [s for s in health['checks']] + return 'MDS_ESTIMATED_REPLAY_TIME' not in codes + + def get_estimated_time(): + completion_percentage = 0.0 + time_duration = pending_duration = 0 + with safe_while(sleep=5, tries=360) as proceed: + while proceed(): + health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True) + codes = [s for s in health['checks']] + if 'MDS_ESTIMATED_REPLAY_TIME' in codes: + message = health['checks']['MDS_ESTIMATED_REPLAY_TIME']['detail'][0]['message'] + ### sample warning string: "mds.a(mds.0): replay: 50.0446% complete - elapsed time: 582s, estimated time remaining: 581s" + m = re.match(".* replay: (\d+(\.\d+)?)% complete - elapsed time: (\d+)s, estimated time remaining: (\d+)s", message) + if not m: + continue + completion_percentage = float(m.group(1)) + time_duration = int(m.group(3)) + pending_duration = int(m.group(4)) + log.debug(f"MDS_ESTIMATED_REPLAY_TIME is present in health: {message}, duration: {time_duration}, 
completion_percentage: {completion_percentage}") + if completion_percentage >= 50: + return (completion_percentage, time_duration, pending_duration) + _, _, pending_duration = get_estimated_time() + # wait for 25% more time to avoid false negative failures + self.wait_until_true(replay_complete, timeout=pending_duration * 1.25) + class TestFailover(CephFSTestCase): CLIENTS_REQUIRED = 1 MDSS_REQUIRED = 2 diff --git a/qa/tasks/cephfs/test_nfs.py b/qa/tasks/cephfs/test_nfs.py index faa35be6926..0a1c07dce04 100644 --- a/qa/tasks/cephfs/test_nfs.py +++ b/qa/tasks/cephfs/test_nfs.py @@ -369,6 +369,45 @@ class TestNFS(MgrTestCase): except CommandFailedError as e: self.fail(f"expected read/write of a file to be successful but failed with {e.exitstatus}") + def _mnt_nfs(self, pseudo_path, port, ip): + ''' + Mount created export + :param pseudo_path: It is the pseudo root name + :param port: Port of deployed nfs cluster + :param ip: IP of deployed nfs cluster + ''' + tries = 3 + while True: + try: + self.ctx.cluster.run( + args=['sudo', 'mount', '-t', 'nfs', '-o', f'port={port}', + f'{ip}:{pseudo_path}', '/mnt']) + break + except CommandFailedError: + if tries: + tries -= 1 + time.sleep(2) + continue + raise + + self.ctx.cluster.run(args=['sudo', 'chmod', '1777', '/mnt']) + + def _test_fio(self, pseudo_path, port, ip): + ''' + run fio with libaio on /mnt/fio + :param mnt_path: nfs mount point + ''' + try: + self._mnt_nfs(pseudo_path, port, ip) + self.ctx.cluster.run(args=['mkdir', '/mnt/fio']) + fio_cmd=['sudo', 'fio', '--ioengine=libaio', '-directory=/mnt/fio', '--filename=fio.randrw.test', '--name=job', '--bs=16k', '--direct=1', '--group_reporting', '--iodepth=128', '--randrepeat=0', '--norandommap=1', '--thread=2', '--ramp_time=20s', '--offset_increment=5%', '--size=5G', '--time_based', '--runtime=300', '--ramp_time=1s', '--percentage_random=0', '--rw=randrw', '--rwmixread=50'] + self.ctx.cluster.run(args=fio_cmd) + except CommandFailedError as e: + self.fail(f"expected fio to be successful but failed with {e.exitstatus}") + finally: + self.ctx.cluster.run(args=['sudo', 'rm', '-rf', '/mnt/fio']) + self.ctx.cluster.run(args=['sudo', 'umount', '/mnt']) + def _write_to_read_only_export(self, pseudo_path, port, ip): ''' Check if write to read only export fails @@ -627,6 +666,18 @@ class TestNFS(MgrTestCase): self._test_data_read_write(self.pseudo_path, port, ip) self._test_delete_cluster() + def test_async_io_fio(self): + ''' + Test async io using fio. 
Expect completion without hang or crash + ''' + self._test_create_cluster() + self._create_export(export_id='1', create_fs=True, + extra_cmd=['--pseudo-path', self.pseudo_path]) + port, ip = self._get_port_ip_info() + self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed') + self._test_fio(self.pseudo_path, port, ip) + self._test_delete_cluster() + def test_cluster_info(self): ''' Test cluster info outputs correct ip and hostname diff --git a/qa/tasks/check_counter.py b/qa/tasks/check_counter.py index 40818f3f475..1f63b6a0bd4 100644 --- a/qa/tasks/check_counter.py +++ b/qa/tasks/check_counter.py @@ -1,11 +1,14 @@ import logging import json +import errno from teuthology.task import Task from teuthology import misc from tasks import ceph_manager +from tasks.cephfs.filesystem import MDSCluster +from teuthology.exceptions import CommandFailedError log = logging.getLogger(__name__) @@ -61,6 +64,9 @@ class CheckCounter(Task): mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=self.ctx, logger=log.getChild('ceph_manager')) active_mgr = json.loads(mon_manager.raw_cluster_cmd("mgr", "dump", "--format=json-pretty"))["active_name"] + mds_cluster = MDSCluster(self.ctx) + status = mds_cluster.status() + for daemon_type, counters in targets.items(): # List of 'a', 'b', 'c'... daemon_ids = list(misc.all_roles_of_type(self.ctx.cluster, daemon_type)) @@ -80,13 +86,31 @@ class CheckCounter(Task): else: log.debug("Getting stats from {0}".format(daemon_id)) - manager = self.ctx.managers[cluster_name] - proc = manager.admin_socket(daemon_type, daemon_id, ["perf", "dump"]) - response_data = proc.stdout.getvalue().strip() + if daemon_type == 'mds': + mds_info = status.get_mds(daemon_id) + if not mds_info: + continue + mds = f"mds.{mds_info['gid']}" + if mds_info['state'] != "up:active": + log.debug(f"skipping {mds}") + continue + log.debug(f"Getting stats from {mds}") + try: + proc = mon_manager.raw_cluster_cmd("tell", mds, "perf", "dump", + "--format=json-pretty") + response_data = proc.strip() + except CommandFailedError as e: + if e.exitstatus == errno.ENOENT: + log.debug(f"Failed to do 'perf dump' on {mds}") + continue + else: + manager = self.ctx.managers[cluster_name] + proc = manager.admin_socket(daemon_type, daemon_id, ["perf", "dump"]) + response_data = proc.stdout.getvalue().strip() if response_data: perf_dump = json.loads(response_data) else: - log.warning("No admin socket response from {0}, skipping".format(daemon_id)) + log.warning("No response from {0}, skipping".format(daemon_id)) continue minval = '' diff --git a/qa/tasks/kafka.py b/qa/tasks/kafka.py index 5e6c208ca30..833f03babf6 100644 --- a/qa/tasks/kafka.py +++ b/qa/tasks/kafka.py @@ -4,6 +4,7 @@ Deploy and configure Kafka for Teuthology import contextlib import logging import time +import os from teuthology import misc as teuthology from teuthology import contextutil @@ -33,6 +34,13 @@ def install_kafka(ctx, config): assert isinstance(config, dict) log.info('Installing Kafka...') + # programmatically find a nearby mirror so as not to hammer archive.apache.org + apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \ + "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1" + log.info("determining apache mirror by running: " + apache_mirror_cmd) + apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/) + log.info("chosen apache mirror is " + apache_mirror_url_front) + for (client, _) in config.items(): (remote,) = 
ctx.cluster.only(client).remotes.keys() test_dir=teuthology.get_testdir(ctx) @@ -40,7 +48,8 @@ def install_kafka(ctx, config): kafka_file = kafka_prefix + current_version + '.tgz' - link1 = 'https://archive.apache.org/dist/kafka/' + current_version + '/' + kafka_file + link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \ + current_version + '/' + kafka_file ctx.cluster.only(client).run( args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1], ) diff --git a/qa/tasks/kafka_failover.py b/qa/tasks/kafka_failover.py new file mode 100644 index 00000000000..3ca60ab84fc --- /dev/null +++ b/qa/tasks/kafka_failover.py @@ -0,0 +1,244 @@ +""" +Deploy and configure Kafka for Teuthology +""" +import contextlib +import logging +import time +import os + +from teuthology import misc as teuthology +from teuthology import contextutil +from teuthology.orchestra import run + +log = logging.getLogger(__name__) + +def get_kafka_version(config): + for client, client_config in config.items(): + if 'kafka_version' in client_config: + kafka_version = client_config.get('kafka_version') + return kafka_version + +kafka_prefix = 'kafka_2.13-' + +def get_kafka_dir(ctx, config): + kafka_version = get_kafka_version(config) + current_version = kafka_prefix + kafka_version + return '{tdir}/{ver}'.format(tdir=teuthology.get_testdir(ctx),ver=current_version) + + +@contextlib.contextmanager +def install_kafka(ctx, config): + """ + Downloading the kafka tar file. + """ + assert isinstance(config, dict) + log.info('Installing Kafka...') + + # programmatically find a nearby mirror so as not to hammer archive.apache.org + apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \ + "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1" + log.info("determining apache mirror by running: " + apache_mirror_cmd) + apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/) + log.info("chosen apache mirror is " + apache_mirror_url_front) + + for (client, _) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + test_dir=teuthology.get_testdir(ctx) + current_version = get_kafka_version(config) + + kafka_file = kafka_prefix + current_version + '.tgz' + + link1 = '{apache_mirror_url_front}/kafka/'.format(apache_mirror_url_front=apache_mirror_url_front) + \ + current_version + '/' + kafka_file + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'wget', link1], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=test_dir), run.Raw('&&'), 'tar', '-xvzf', kafka_file], + ) + + kafka_dir = get_kafka_dir(ctx, config) + # create config for second broker + second_broker_config_name = "server2.properties" + second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) + second_broker_data_logs_escaped = "{}/logs".format(second_broker_data).replace("/", "\/") + + ctx.cluster.only(client).run( + args=['cd', '{tdir}'.format(tdir=kafka_dir), run.Raw('&&'), + 'cp', '{tdir}/config/server.properties'.format(tdir=kafka_dir), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'mkdir', '-p', '{tdir}/data'.format(tdir=kafka_dir) + ], + ) + + # edit config + ctx.cluster.only(client).run( + args=['sed', '-i', 's/broker.id=0/broker.id=1/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, 
second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/localhost:19092/g', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'sed', '-i', 's/log.dirs=\/tmp\/kafka-logs/log.dirs={}/g'.format(second_broker_data_logs_escaped), '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name), run.Raw('&&'), + 'cat', '{tdir}/config/{second_broker_config_name}'.format(tdir=kafka_dir, second_broker_config_name=second_broker_config_name) + ] + ) + + try: + yield + finally: + log.info('Removing packaged dependencies of Kafka...') + test_dir=get_kafka_dir(ctx, config) + current_version = get_kafka_version(config) + for (client,_) in config.items(): + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/logs'.format(tdir=test_dir)], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', test_dir], + ) + + ctx.cluster.only(client).run( + args=['rm', '-rf', '{tdir}/{doc}'.format(tdir=teuthology.get_testdir(ctx),doc=kafka_file)], + ) + + +@contextlib.contextmanager +def run_kafka(ctx,config): + """ + This includes two parts: + 1. Starting Zookeeper service + 2. Starting Kafka service + """ + assert isinstance(config, dict) + log.info('Bringing up Zookeeper and Kafka services...') + for (client,_) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + kafka_dir = get_kafka_dir(ctx, config) + + second_broker_data = "{tdir}/data/broker02".format(tdir=kafka_dir) + second_broker_java_log_dir = "{}/java_logs".format(second_broker_data) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './zookeeper-server-start.sh', + '{tir}/config/zookeeper.properties'.format(tir=kafka_dir), + run.Raw('&'), 'exit' + ], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + './kafka-server-start.sh', + '{tir}/config/server.properties'.format(tir=get_kafka_dir(ctx, config)), + run.Raw('&'), 'exit' + ], + ) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=kafka_dir), run.Raw('&&'), + run.Raw('LOG_DIR={second_broker_java_log_dir}'.format(second_broker_java_log_dir=second_broker_java_log_dir)), + './kafka-server-start.sh', '{tdir}/config/server2.properties'.format(tdir=kafka_dir), + run.Raw('&'), 'exit' + ], + ) + + try: + yield + finally: + log.info('Stopping Zookeeper and Kafka Services...') + + for (client, _) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-server-stop.sh', + '{tir}/config/kafka.properties'.format(tir=get_kafka_dir(ctx, config)), + ], + ) + + time.sleep(5) + + ctx.cluster.only(client).run( + args=['cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './zookeeper-server-stop.sh', + '{tir}/config/zookeeper.properties'.format(tir=get_kafka_dir(ctx, config)), + ], + ) + + time.sleep(5) + + ctx.cluster.only(client).run(args=['killall', '-9', 'java']) + + +@contextlib.contextmanager +def 
run_admin_cmds(ctx,config): + """ + Running Kafka Admin commands in order to check the working of producer anf consumer and creation of topic. + """ + assert isinstance(config, dict) + log.info('Checking kafka server through producer/consumer commands...') + for (client,_) in config.items(): + (remote,) = ctx.cluster.only(client).remotes.keys() + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-topics.sh', '--create', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + 'echo', "First", run.Raw('|'), + './kafka-console-producer.sh', '--topic', 'quickstart-events', + '--bootstrap-server', 'localhost:9092' + ], + ) + + ctx.cluster.only(client).run( + args=[ + 'cd', '{tdir}/bin'.format(tdir=get_kafka_dir(ctx, config)), run.Raw('&&'), + './kafka-console-consumer.sh', '--topic', 'quickstart-events', + '--from-beginning', + '--bootstrap-server', 'localhost:9092', + run.Raw('&'), 'exit' + ], + ) + + try: + yield + finally: + pass + + +@contextlib.contextmanager +def task(ctx,config): + """ + Following is the way how to run kafka:: + tasks: + - kafka: + client.0: + kafka_version: 2.6.0 + """ + assert config is None or isinstance(config, list) \ + or isinstance(config, dict), \ + "task kafka only supports a list or dictionary for configuration" + + all_clients = ['client.{id}'.format(id=id_) + for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')] + if config is None: + config = all_clients + if isinstance(config, list): + config = dict.fromkeys(config) + + log.debug('Kafka config is %s', config) + + with contextutil.nested( + lambda: install_kafka(ctx=ctx, config=config), + lambda: run_kafka(ctx=ctx, config=config), + lambda: run_admin_cmds(ctx=ctx, config=config), + ): + yield + diff --git a/qa/tasks/mgr/dashboard/helper.py b/qa/tasks/mgr/dashboard/helper.py index e6a7c35a23d..55355048a36 100644 --- a/qa/tasks/mgr/dashboard/helper.py +++ b/qa/tasks/mgr/dashboard/helper.py @@ -220,13 +220,11 @@ class DashboardTestCase(MgrTestCase): # To avoid any issues with e.g. unlink bugs, we destroy and recreate # the filesystem rather than just doing a rm -rf of files - cls.mds_cluster.mds_stop() - cls.mds_cluster.mds_fail() cls.mds_cluster.delete_all_filesystems() + cls.mds_cluster.mds_restart() # to reset any run-time configs, etc. cls.fs = None # is now invalid! cls.fs = cls.mds_cluster.newfs(create=True) - cls.fs.mds_restart() # In case some test messed with auth caps, reset them # pylint: disable=not-an-iterable diff --git a/qa/tasks/mgr/dashboard/test_mgr_module.py b/qa/tasks/mgr/dashboard/test_mgr_module.py index d6a368905b6..1dbdef23d34 100644 --- a/qa/tasks/mgr/dashboard/test_mgr_module.py +++ b/qa/tasks/mgr/dashboard/test_mgr_module.py @@ -4,6 +4,7 @@ from __future__ import absolute_import import logging import requests +from urllib3.exceptions import MaxRetryError from .helper import (DashboardTestCase, JLeaf, JList, JObj, module_options_object_schema, module_options_schema, @@ -24,10 +25,11 @@ class MgrModuleTestCase(DashboardTestCase): def _check_connection(): try: # Try reaching an API endpoint successfully. 
+ logger.info('Trying to reach the REST API endpoint') self._get('/api/mgr/module') if self._resp.status_code == 200: return True - except requests.ConnectionError: + except (MaxRetryError, requests.ConnectionError): pass return False diff --git a/qa/tasks/mgr/dashboard/test_rbd.py b/qa/tasks/mgr/dashboard/test_rbd.py index a872645e33e..83b3bf520c2 100644 --- a/qa/tasks/mgr/dashboard/test_rbd.py +++ b/qa/tasks/mgr/dashboard/test_rbd.py @@ -869,7 +869,19 @@ class RbdTest(DashboardTestCase): self.assertEqual(clone_format_version, 2) self.assertStatus(200) + # if empty list is sent, then the config will remain as it is value = [] + res = [{'section': "global", 'value': "2"}] + self._post('/api/cluster_conf', { + 'name': config_name, + 'value': value + }) + self.wait_until_equal( + lambda: _get_config_by_name(config_name), + res, + timeout=60) + + value = [{'section': "global", 'value': ""}] self._post('/api/cluster_conf', { 'name': config_name, 'value': value diff --git a/qa/tasks/mgr/dashboard/test_rgw.py b/qa/tasks/mgr/dashboard/test_rgw.py index 5c7b0329675..a9071bc2a3a 100644 --- a/qa/tasks/mgr/dashboard/test_rgw.py +++ b/qa/tasks/mgr/dashboard/test_rgw.py @@ -785,7 +785,7 @@ class RgwUserSubuserTest(RgwTestCase): 'access': 'readwrite', 'key_type': 'swift' }) - self.assertStatus(200) + self.assertStatus(201) data = self.jsonBody() subuser = self.find_object_in_list('id', 'teuth-test-user:tux', data) self.assertIsInstance(subuser, object) @@ -808,7 +808,7 @@ class RgwUserSubuserTest(RgwTestCase): 'access_key': 'yyy', 'secret_key': 'xxx' }) - self.assertStatus(200) + self.assertStatus(201) data = self.jsonBody() subuser = self.find_object_in_list('id', 'teuth-test-user:hugo', data) self.assertIsInstance(subuser, object) diff --git a/qa/tasks/mgr/mgr_test_case.py b/qa/tasks/mgr/mgr_test_case.py index 9032e0e2658..4a5506391f2 100644 --- a/qa/tasks/mgr/mgr_test_case.py +++ b/qa/tasks/mgr/mgr_test_case.py @@ -1,5 +1,6 @@ import json import logging +import socket from unittest import SkipTest @@ -229,15 +230,22 @@ class MgrTestCase(CephTestCase): """ # Start handing out ports well above Ceph's range. 
assign_port = min_port + ip_addr = cls.mgr_cluster.get_mgr_map()['active_addr'].split(':')[0] for mgr_id in cls.mgr_cluster.mgr_ids: cls.mgr_cluster.mgr_stop(mgr_id) cls.mgr_cluster.mgr_fail(mgr_id) + for mgr_id in cls.mgr_cluster.mgr_ids: - log.debug("Using port {0} for {1} on mgr.{2}".format( - assign_port, module_name, mgr_id - )) + # Find a port that isn't in use + while True: + if not cls.is_port_in_use(ip_addr, assign_port): + break + log.debug(f"Port {assign_port} in use, trying next") + assign_port += 1 + + log.debug(f"Using port {assign_port} for {module_name} on mgr.{mgr_id}") cls.mgr_cluster.set_module_localized_conf(module_name, mgr_id, config_name, str(assign_port), @@ -255,3 +263,8 @@ class MgrTestCase(CephTestCase): mgr_map['active_name'], mgr_map['active_gid'])) return done cls.wait_until_true(is_available, timeout=30) + + @classmethod + def is_port_in_use(cls, ip_addr: str, port: int) -> bool: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex((ip_addr, port)) == 0 diff --git a/qa/tasks/notification_tests.py b/qa/tasks/notification_tests.py index b4697a6f797..f1eae3c89c4 100644 --- a/qa/tasks/notification_tests.py +++ b/qa/tasks/notification_tests.py @@ -220,7 +220,7 @@ def run_tests(ctx, config): for client, client_config in config.items(): (remote,) = ctx.cluster.only(client).remotes.keys() - attr = ["!kafka_test", "!data_path_v2_kafka_test", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] + attr = ["!kafka_test", "!data_path_v2_kafka_test", "!kafka_failover", "!amqp_test", "!amqp_ssl_test", "!kafka_security_test", "!modification_required", "!manual_test", "!http_test"] if 'extra_attr' in client_config: attr = client_config.get('extra_attr') diff --git a/qa/tasks/nvme_loop.py b/qa/tasks/nvme_loop.py index fef270ea085..fdec467a16d 100644 --- a/qa/tasks/nvme_loop.py +++ b/qa/tasks/nvme_loop.py @@ -70,7 +70,7 @@ def task(ctx, config): remote.run(args=['lsblk'], stdout=StringIO()) p = remote.run(args=['sudo', 'nvme', 'list', '-o', 'json'], stdout=StringIO()) new_devs = [] - # `nvme list -o json` will return the following output: + # `nvme list -o json` will return one of the following output: '''{ "Devices" : [ { @@ -91,13 +91,112 @@ def task(ctx, config): } ] }''' + '''{ + "Devices":[ + { + "HostNQN":"nqn.2014-08.org.nvmexpress:uuid:00000000-0000-0000-0000-0cc47ada6ba4", + "HostID":"898a0e10-da2d-4a42-8017-d9c445089d0c", + "Subsystems":[ + { + "Subsystem":"nvme-subsys0", + "SubsystemNQN":"nqn.2014.08.org.nvmexpress:80868086CVFT623300LN400BGN INTEL SSDPEDMD400G4", + "Controllers":[ + { + "Controller":"nvme0", + "Cntlid":"0", + "SerialNumber":"CVFT623300LN400BGN", + "ModelNumber":"INTEL SSDPEDMD400G4", + "Firmware":"8DV101H0", + "Transport":"pcie", + "Address":"0000:02:00.0", + "Slot":"2", + "Namespaces":[ + { + "NameSpace":"nvme0n1", + "Generic":"ng0n1", + "NSID":1, + "UsedBytes":400088457216, + "MaximumLBA":781422768, + "PhysicalSize":400088457216, + "SectorSize":512 + } + ], + "Paths":[ + ] + } + ], + "Namespaces":[ + ] + } + ] + } + ] + } + ''' + '''{ + "Devices":[ + { + "HostNQN":"nqn.2014-08.org.nvmexpress:uuid:00000000-0000-0000-0000-0cc47ada6ba4", + "HostID":"898a0e10-da2d-4a42-8017-d9c445089d0c", + "Subsystems":[ + { + "Subsystem":"nvme-subsys0", + "SubsystemNQN":"nqn.2014.08.org.nvmexpress:80868086CVFT534400C2400BGN INTEL SSDPEDMD400G4", + "Controllers":[ + { + "Controller":"nvme0", + "Cntlid":"0", + "SerialNumber":"CVFT534400C2400BGN", + "ModelNumber":"INTEL 
SSDPEDMD400G4", + "Firmware":"8DV101H0", + "Transport":"pcie", + "Address":"0000:02:00.0", + "Slot":"2", + "Namespaces":[ + { + "NameSpace":"nvme0n1", + "Generic":"ng0n1", + "NSID":1, + "UsedBytes":400088457216, + "MaximumLBA":781422768, + "PhysicalSize":400088457216, + "SectorSize":512 + } + ], + "Paths":[ + ] + } + ], + "Namespaces":[ + ] + } + ] + } + ] + } + ''' nvme_list = json.loads(p.stdout.getvalue()) for device in nvme_list['Devices']: - dev = device['DevicePath'] - vendor = device['ModelNumber'] - if dev.startswith('/dev/') and vendor == 'Linux': - new_devs.append(dev) - bluestore_zap(remote, dev) + try: + # first try format 1 / older format + dev = device['DevicePath'] + vendor = device['ModelNumber'] + if dev.startswith('/dev/') and vendor == 'Linux': + new_devs.append(dev) + bluestore_zap(remote, dev) + except KeyError: + for subsystem in device['Subsystems']: + # format 2 + if 'Namespaces' in subsystem and subsystem['Namespaces']: + dev = '/dev/' + subsystem['Namespaces'][0]['NameSpace'] + # try format 3 last + else: + dev = '/dev/' + subsystem['Controllers'][0]['Namespaces'][0]['NameSpace'] + # vendor is the same for format 2 and 3 + vendor = subsystem['Controllers'][0]['ModelNumber'] + if vendor == 'Linux': + new_devs.append(dev) + bluestore_zap(remote, dev) log.info(f'new_devs {new_devs}') assert len(new_devs) <= len(devs) if len(new_devs) == len(devs): diff --git a/qa/tasks/nvmeof.py b/qa/tasks/nvmeof.py index 42e357294d9..691a6f7dd86 100644 --- a/qa/tasks/nvmeof.py +++ b/qa/tasks/nvmeof.py @@ -128,12 +128,11 @@ class Nvmeof(Task): total_images = int(self.namespaces_count) * int(self.subsystems_count) log.info(f'[nvmeof]: creating {total_images} images') + rbd_create_cmd = [] for i in range(1, total_images + 1): imagename = self.image_name_prefix + str(i) - log.info(f'[nvmeof]: rbd create {poolname}/{imagename} --size {self.rbd_size}') - _shell(self.ctx, self.cluster_name, self.remote, [ - 'rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}' - ]) + rbd_create_cmd += ['rbd', 'create', f'{poolname}/{imagename}', '--size', f'{self.rbd_size}', run.Raw(';')] + _shell(self.ctx, self.cluster_name, self.remote, rbd_create_cmd) for role, i in daemons.items(): remote, id_ = i @@ -251,9 +250,9 @@ class NvmeofThrasher(Thrasher, Greenlet): daemon_max_thrash_times: For now, NVMeoF daemons have limitation that each daemon can - be thrashed only 3 times in span of 30 mins. This option + be thrashed only 5 times in span of 30 mins. This option allows to set the amount of times it could be thrashed in a period - of time. (default: 3) + of time. (default: 5) daemon_max_thrash_period: This option goes with the above option. 
It sets the period of time over which each daemons can be thrashed for daemon_max_thrash_times @@ -306,17 +305,17 @@ class NvmeofThrasher(Thrasher, Greenlet): self.max_thrash_daemons = int(self.config.get('max_thrash', len(self.daemons) - 1)) # Limits on thrashing each daemon - self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 3)) + self.daemon_max_thrash_times = int(self.config.get('daemon_max_thrash_times', 5)) self.daemon_max_thrash_period = int(self.config.get('daemon_max_thrash_period', 30 * 60)) # seconds self.min_thrash_delay = int(self.config.get('min_thrash_delay', 60)) self.max_thrash_delay = int(self.config.get('max_thrash_delay', self.min_thrash_delay + 30)) - self.min_revive_delay = int(self.config.get('min_revive_delay', 100)) + self.min_revive_delay = int(self.config.get('min_revive_delay', 60)) self.max_revive_delay = int(self.config.get('max_revive_delay', self.min_revive_delay + 30)) def _get_devices(self, remote): GET_DEVICE_CMD = "sudo nvme list --output-format=json | " \ - "jq -r '.Devices | sort_by(.NameSpace) | .[] | select(.ModelNumber == \"Ceph bdev Controller\") | .DevicePath'" + "jq -r '.Devices[].Subsystems[] | select(.Controllers | all(.ModelNumber == \"Ceph bdev Controller\")) | .Namespaces | sort_by(.NSID) | .[] | .NameSpace'" devices = remote.sh(GET_DEVICE_CMD).split() return devices @@ -347,6 +346,7 @@ class NvmeofThrasher(Thrasher, Greenlet): run.Raw('&&'), 'ceph', 'orch', 'ps', '--daemon-type', 'nvmeof', run.Raw('&&'), 'ceph', 'health', 'detail', run.Raw('&&'), 'ceph', '-s', + run.Raw('&&'), 'sudo', 'nvme', 'list', ] for dev in self.devices: check_cmd += [ @@ -421,13 +421,11 @@ class NvmeofThrasher(Thrasher, Greenlet): while not self.stopping.is_set(): killed_daemons = defaultdict(list) - weight = 1.0 / len(self.daemons) - count = 0 + thrash_daemon_num = self.rng.randint(1, self.max_thrash_daemons) + selected_daemons = self.rng.sample(self.daemons, thrash_daemon_num) for daemon in self.daemons: - skip = self.rng.uniform(0.0, 1.0) - if weight <= skip: - self.log('skipping daemon {label} with skip ({skip}) > weight ({weight})'.format( - label=daemon.id_, skip=skip, weight=weight)) + if daemon not in selected_daemons: + self.log(f'skipping daemon {daemon.id_} ...') continue # For now, nvmeof daemons can only be thrashed 3 times in last 30mins. 
@@ -445,17 +443,11 @@ class NvmeofThrasher(Thrasher, Greenlet): continue self.log('kill {label}'.format(label=daemon.id_)) - # daemon.stop() kill_method = self.kill_daemon(daemon) killed_daemons[kill_method].append(daemon) daemons_thrash_history[daemon.id_] += [datetime.now()] - # only thrash max_thrash_daemons amount of daemons - count += 1 - if count >= self.max_thrash_daemons: - break - if killed_daemons: iteration_summary = "thrashed- " for kill_method in killed_daemons: @@ -468,7 +460,7 @@ class NvmeofThrasher(Thrasher, Greenlet): self.log(f'waiting for {revive_delay} secs before reviving') time.sleep(revive_delay) # blocking wait - self.log('done waiting before reviving') + self.log(f'done waiting before reviving - iteration #{len(summary)}: {iteration_summary}') self.do_checks() self.switch_task() @@ -487,7 +479,7 @@ class NvmeofThrasher(Thrasher, Greenlet): if thrash_delay > 0.0: self.log(f'waiting for {thrash_delay} secs before thrashing') time.sleep(thrash_delay) # blocking - self.log('done waiting before thrashing') + self.log('done waiting before thrashing - everything should be up now') self.do_checks() self.switch_task() diff --git a/qa/tasks/radosgw_admin.py b/qa/tasks/radosgw_admin.py index 3b98702acca..fb82378761b 100644 --- a/qa/tasks/radosgw_admin.py +++ b/qa/tasks/radosgw_admin.py @@ -16,6 +16,7 @@ import logging import time import datetime import sys +import errno from io import StringIO from queue import Queue @@ -725,6 +726,40 @@ def task(ctx, config): (err, out) = rgwadmin(ctx, client, ['user', 'rm', '--tenant', tenant_name, '--uid', 'tenanteduser'], check_status=True) + account_id = 'RGW12312312312312312' + account_name = 'testacct' + rgwadmin(ctx, client, [ + 'account', 'create', + '--account-id', account_id, + '--account-name', account_name, + ], check_status=True) + rgwadmin(ctx, client, [ + 'user', 'create', + '--account-id', account_id, + '--uid', 'testacctuser', + '--display-name', 'accountuser', + '--gen-access-key', + '--gen-secret', + ], check_status=True) + + # TESTCASE 'bucket link', 'bucket', 'account user', 'fails' + (err, out) = rgwadmin(ctx, client, ['bucket', 'link', '--bucket', bucket_name, '--uid', 'testacctuser']) + assert err == errno.EINVAL + + rgwadmin(ctx, client, ['user', 'rm', '--uid', 'testacctuser'], check_status=True) + + # TESTCASE 'bucket link', 'bucket', 'account', 'succeeds' + rgwadmin(ctx, client, + ['bucket', 'link', '--bucket', bucket_name, '--account-id', account_id], + check_status=True) + + # relink the bucket to the first user and delete the account + rgwadmin(ctx, client, + ['bucket', 'link', '--bucket', bucket_name, '--uid', user1], + check_status=True) + rgwadmin(ctx, client, ['account', 'rm', '--account-id', account_id], + check_status=True) + # TESTCASE 'object-rm', 'object', 'rm', 'remove object', 'succeeds, object is removed' # upload an object diff --git a/qa/tasks/rgw_multisite.py b/qa/tasks/rgw_multisite.py index e83a54efc2b..f93ca017fa2 100644 --- a/qa/tasks/rgw_multisite.py +++ b/qa/tasks/rgw_multisite.py @@ -361,6 +361,8 @@ def create_zonegroup(cluster, gateways, period, config): if endpoints: # replace client names with their gateway endpoints config['endpoints'] = extract_gateway_endpoints(gateways, endpoints) + if not config.get('api_name'): # otherwise it will be set to an empty string + config['api_name'] = config['name'] zonegroup = multisite.ZoneGroup(config['name'], period) # `zonegroup set` needs --default on command line, and 'is_master' in json args = is_default_arg(config) diff --git a/qa/tasks/rook.py 
b/qa/tasks/rook.py index 6cb75173966..fae5ef3bf00 100644 --- a/qa/tasks/rook.py +++ b/qa/tasks/rook.py @@ -8,7 +8,7 @@ import json import logging import os import yaml -from io import BytesIO +from io import BytesIO, StringIO from tarfile import ReadError from tasks.ceph_manager import CephManager @@ -235,10 +235,14 @@ def ceph_log(ctx, config): r = ctx.rook[cluster_name].remote.run( stdout=BytesIO(), args=args, + stderr=StringIO(), ) stdout = r.stdout.getvalue().decode() if stdout: return stdout + stderr = r.stderr.getvalue() + if stderr: + return stderr return None if first_in_ceph_log('\[ERR\]|\[WRN\]|\[SEC\]', diff --git a/qa/tasks/s3a_hadoop.py b/qa/tasks/s3a_hadoop.py index 7b77359fcf2..4518a6f397c 100644 --- a/qa/tasks/s3a_hadoop.py +++ b/qa/tasks/s3a_hadoop.py @@ -1,5 +1,6 @@ import contextlib import logging +import os from teuthology import misc from teuthology.orchestra import run @@ -40,7 +41,7 @@ def task(ctx, config): # get versions maven_major = config.get('maven-major', 'maven-3') - maven_version = config.get('maven-version', '3.6.3') + maven_version = config.get('maven-version', '3.9.9') hadoop_ver = config.get('hadoop-version', '2.9.2') bucket_name = config.get('bucket-name', 's3atest') access_key = config.get('access-key', 'EGAQRD2ULOIFKFSKCT4F') @@ -48,11 +49,19 @@ def task(ctx, config): 'secret-key', 'zi816w1vZKfaSM85Cl0BxXTwSLyN7zB4RbTswrGb') + # programmatically find a nearby mirror so as not to hammer archive.apache.org + apache_mirror_cmd="curl 'https://www.apache.org/dyn/closer.cgi' 2>/dev/null | " \ + "grep -o '<strong>[^<]*</strong>' | sed 's/<[^>]*>//g' | head -n 1" + log.info("determining apache mirror by running: " + apache_mirror_cmd) + apache_mirror_url_front = os.popen(apache_mirror_cmd).read().rstrip() # note: includes trailing slash (/) + log.info("chosen apache mirror is " + apache_mirror_url_front) + # set versions for cloning the repo apache_maven = 'apache-maven-{maven_version}-bin.tar.gz'.format( maven_version=maven_version) - maven_link = 'http://archive.apache.org/dist/maven/' + \ - '{maven_major}/{maven_version}/binaries/'.format(maven_major=maven_major, maven_version=maven_version) + apache_maven + maven_link = '{apache_mirror_url_front}/maven/'.format(apache_mirror_url_front=apache_mirror_url_front) + \ + '{maven_major}/{maven_version}/binaries/'.format(maven_major=maven_major, maven_version=maven_version) + \ + apache_maven hadoop_git = 'https://github.com/apache/hadoop' hadoop_rel = 'hadoop-{ver} rel/release-{ver}'.format(ver=hadoop_ver) if hadoop_ver == 'trunk': @@ -204,6 +213,7 @@ def run_s3atest(client, maven_version, testdir, test_options): run.Raw('&&'), run.Raw(rm_test), run.Raw('&&'), + run.Raw('JAVA_HOME=$(alternatives --list | grep jre_1.8.0 | head -n 1 | awk \'{print $3}\')'), run.Raw(run_test), run.Raw(test_options) ] diff --git a/qa/tasks/s3tests.py b/qa/tasks/s3tests.py index 6d7b39d5892..85ab97d23cd 100644 --- a/qa/tasks/s3tests.py +++ b/qa/tasks/s3tests.py @@ -57,6 +57,17 @@ def download(ctx, config): 'git', 'reset', '--hard', sha1, ], ) + if client_config.get('boto3_extensions'): + ctx.cluster.only(client).run( + args=['mkdir', + '-p', + '/home/ubuntu/.aws/models/s3/2006-03-01/'] + ) + (remote,) = ctx.cluster.only(client).remotes.keys() + remote_file = '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json' + local_file = '{qadir}/../examples/rgw/boto3/service-2.sdk-extras.json'.format(qadir=ctx.config.get('suite_path')) + remote.put_file(local_file, remote_file) + try: yield finally: @@ -70,6 +81,17 @@ def 
download(ctx, config): '{tdir}/s3-tests-{client}'.format(tdir=testdir, client=client), ], ) + if client_config.get('boto3_extensions'): + ctx.cluster.only(client).run( + args=[ + 'rm', '-rf', '/home/ubuntu/.aws/models/s3/2006-03-01/service-2.sdk-extras.json', + ], + ) + ctx.cluster.only(client).run( + args=[ + 'cd', '/home/ubuntu/', run.Raw('&&'), 'rmdir', '-p', '.aws/models/s3/2006-03-01/', + ], + ) def _config_user(s3tests_conf, section, user, email): @@ -444,8 +466,10 @@ def run_tests(ctx, config): attrs += ['not fails_with_subdomain'] if not client_config.get('with-sse-s3'): attrs += ['not sse_s3'] - + attrs += client_config.get('extra_attrs', []) + if 'bucket_logging' not in attrs: + attrs += ['not bucket_logging'] if 'unit_test_scan' in client_config and client_config['unit_test_scan']: xmlfile_id = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S--") + str(uuid.uuid4()) xmlpath= f'{testdir}/archive/s3test-{xmlfile_id}.xml' diff --git a/qa/tasks/s3tests_java.py b/qa/tasks/s3tests_java.py index 3e20e10d06c..a58aa6cf0b4 100644 --- a/qa/tasks/s3tests_java.py +++ b/qa/tasks/s3tests_java.py @@ -284,6 +284,7 @@ class S3tests_java(Task): args = ['cd', '{tdir}/s3-tests-java'.format(tdir=testdir), run.Raw('&&'), + run.Raw('JAVA_HOME=$(alternatives --list | grep jre_1.8.0 | head -n 1 | awk \'{print $3}\')'), '/opt/gradle/gradle/bin/gradle', 'clean', 'test', '--rerun-tasks', '--no-build-cache', ] diff --git a/qa/tasks/stretch_mode_disable_enable.py b/qa/tasks/stretch_mode_disable_enable.py new file mode 100644 index 00000000000..a84a85bb307 --- /dev/null +++ b/qa/tasks/stretch_mode_disable_enable.py @@ -0,0 +1,547 @@ +import logging +from tasks.mgr.mgr_test_case import MgrTestCase + +log = logging.getLogger(__name__) + +class TestStretchMode(MgrTestCase): + """ + Test the stretch mode feature of Ceph + """ + POOL = 'stretch_pool' + CLUSTER = "ceph" + WRITE_PERIOD = 10 + RECOVERY_PERIOD = WRITE_PERIOD * 6 + SUCCESS_HOLD_TIME = 7 + STRETCH_CRUSH_RULE = 'stretch_rule' + STRETCH_CRUSH_RULE_ID = None + STRETCH_BUCKET_TYPE = 'datacenter' + TIEBREAKER_MON_NAME = 'e' + DEFAULT_POOL_TYPE = 'replicated' + DEFAULT_POOL_CRUSH_RULE = 'replicated_rule' + DEFAULT_POOL_SIZE = 3 + DEFAULT_POOL_MIN_SIZE = 2 + DEFAULT_POOL_CRUSH_RULE_ID = None + # This dictionary maps the datacenter to the osd ids and hosts + DC_OSDS = { + 'dc1': { + "host01": [0, 1], + "host02": [2, 3], + }, + 'dc2': { + "host03": [4, 5], + "host04": [6, 7], + }, + } + DC_MONS = { + 'dc1': { + "host01": ['a'], + "host02": ['b'], + }, + 'dc2': { + "host03": ['c'], + "host04": ['d'], + }, + 'dc3': { + "host05": ['e'], + } + } + def _osd_count(self): + """ + Get the number of OSDs in the cluster. + """ + osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json() + return len(osd_map['osds']) + + def setUp(self): + """ + Setup the cluster and + ensure we have a clean condition before the test. 
+ """ + # Ensure we have at least 6 OSDs + super(TestStretchMode, self).setUp() + self.DEFAULT_POOL_CRUSH_RULE_ID = self.mgr_cluster.mon_manager.get_crush_rule_id(self.DEFAULT_POOL_CRUSH_RULE) + self.STRETCH_CRUSH_RULE_ID = self.mgr_cluster.mon_manager.get_crush_rule_id(self.STRETCH_CRUSH_RULE) + if self._osd_count() < 4: + self.skipTest("Not enough OSDS!") + + # Remove any filesystems so that we can remove their pools + if self.mds_cluster: + self.mds_cluster.mds_stop() + self.mds_cluster.mds_fail() + self.mds_cluster.delete_all_filesystems() + + # Remove all other pools + for pool in self.mgr_cluster.mon_manager.get_osd_dump_json()['pools']: + try: + self.mgr_cluster.mon_manager.remove_pool(pool['pool_name']) + except: + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'pool', 'delete', + pool['pool_name'], + pool['pool_name'], + '--yes-i-really-really-mean-it') + + def _setup_pool( + self, + pool_name=POOL, + pg_num=16, + pool_type=DEFAULT_POOL_TYPE, + crush_rule=DEFAULT_POOL_CRUSH_RULE, + size=None, + min_size=None + ): + """ + Create a pool, set its size and pool if specified. + """ + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'pool', 'create', pool_name, str(pg_num), pool_type, crush_rule) + + if size is not None: + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'pool', 'set', pool_name, 'size', str(size)) + + if min_size is not None: + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'pool', 'set', pool_name, 'min_size', str(min_size)) + + def _write_some_data(self, t): + """ + Write some data to the pool to simulate a workload. + """ + args = [ + "rados", "-p", self.POOL, "bench", str(t), "write", "-t", "16"] + self.mgr_cluster.admin_remote.run(args=args, wait=True) + + def _get_all_mons_from_all_dc(self): + """ + Get all mons from all datacenters. + """ + return [mon for dc in self.DC_MONS.values() for mons in dc.values() for mon in mons] + + def _bring_back_mon(self, mon): + """ + Bring back the mon. + """ + try: + self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).restart() + except Exception: + log.error("Failed to bring back mon.{}".format(str(mon))) + pass + + def _get_host(self, osd): + """ + Get the host of the osd. + """ + for dc, nodes in self.DC_OSDS.items(): + for node, osds in nodes.items(): + if osd in osds: + return node + return None + + def _move_osd_back_to_host(self, osd): + """ + Move the osd back to the host. + """ + host = self._get_host(osd) + assert host is not None, "The host of osd {} is not found.".format(osd) + log.debug("Moving osd.%d back to %s", osd, host) + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'crush', 'move', 'osd.{}'.format(str(osd)), + 'host={}'.format(host) + ) + + def tearDown(self): + """ + Clean up the cluster after the test. + """ + # Remove the pool + if self.POOL in self.mgr_cluster.mon_manager.pools: + self.mgr_cluster.mon_manager.remove_pool(self.POOL) + + osd_map = self.mgr_cluster.mon_manager.get_osd_dump_json() + for osd in osd_map['osds']: + # mark all the osds in + if osd['weight'] == 0.0: + self.mgr_cluster.mon_manager.raw_cluster_cmd( + 'osd', 'in', str(osd['osd'])) + # Bring back all the osds and move it back to the host. + if osd['up'] == 0: + self.mgr_cluster.mon_manager.revive_osd(osd['osd']) + self._move_osd_back_to_host(osd['osd']) + + # Bring back all the mons + mons = self._get_all_mons_from_all_dc() + for mon in mons: + self._bring_back_mon(mon) + super(TestStretchMode, self).tearDown() + + def _kill_osd(self, osd): + """ + Kill the osd. 
+ """ + try: + self.ctx.daemons.get_daemon('osd', osd, self.CLUSTER).stop() + except Exception: + log.error("Failed to stop osd.{}".format(str(osd))) + pass + + def _get_osds_data(self, want_osds): + """ + Get the osd data + """ + all_osds_data = \ + self.mgr_cluster.mon_manager.get_osd_dump_json()['osds'] + return [ + osd_data for osd_data in all_osds_data + if int(osd_data['osd']) in want_osds + ] + + def _get_osds_by_dc(self, dc): + """ + Get osds by datacenter. + """ + ret = [] + for host, osds in self.DC_OSDS[dc].items(): + ret.extend(osds) + return ret + + def _fail_over_all_osds_in_dc(self, dc): + """ + Fail over all osds in specified <datacenter> + """ + if not isinstance(dc, str): + raise ValueError("dc must be a string") + if dc not in self.DC_OSDS: + raise ValueError( + "dc must be one of the following: %s" % self.DC_OSDS.keys() + ) + log.debug("Failing over all osds in %s", dc) + osds = self._get_osds_by_dc(dc) + # fail over all the OSDs in the DC + log.debug("OSDs to failed over: %s", osds) + for osd_id in osds: + self._kill_osd(osd_id) + # wait until all the osds are down + self.wait_until_true( + lambda: all([int(osd['up']) == 0 + for osd in self._get_osds_data(osds)]), + timeout=self.RECOVERY_PERIOD + ) + + def _check_mons_out_of_quorum(self, want_mons): + """ + Check if the mons are not in quorum. + """ + quorum_names = self.mgr_cluster.mon_manager.get_mon_quorum_names() + return all([mon not in quorum_names for mon in want_mons]) + + def _kill_mon(self, mon): + """ + Kill the mon. + """ + try: + self.ctx.daemons.get_daemon('mon', mon, self.CLUSTER).stop() + except Exception: + log.error("Failed to stop mon.{}".format(str(mon))) + pass + + def _get_mons_by_dc(self, dc): + """ + Get mons by datacenter. + """ + ret = [] + for host, mons in self.DC_MONS[dc].items(): + ret.extend(mons) + return ret + + def _fail_over_all_mons_in_dc(self, dc): + """ + Fail over all mons in the specified <datacenter> + """ + if not isinstance(dc, str): + raise ValueError("dc must be a string") + if dc not in self.DC_MONS: + raise ValueError("dc must be one of the following: %s" % + ", ".join(self.DC_MONS.keys())) + log.debug("Failing over all mons %s", dc) + mons = self._get_mons_by_dc(dc) + log.debug("Mons to be failed over: %s", mons) + for mon in mons: + self._kill_mon(mon) + # wait until all the mons are out of quorum + self.wait_until_true( + lambda: self._check_mons_out_of_quorum(mons), + timeout=self.RECOVERY_PERIOD + ) + + def _stretch_mode_enabled_correctly(self): + """ + Evaluate whether the stretch mode is enabled correctly. + by checking the OSDMap and MonMap. 
+ """ + # Checking the OSDMap + osdmap = self.mgr_cluster.mon_manager.get_osd_dump_json() + for pool in osdmap['pools']: + # expects crush_rule to be stretch_rule + self.assertEqual( + self.STRETCH_CRUSH_RULE_ID, + pool['crush_rule'] + ) + # expects pool size to be 4 + self.assertEqual( + 4, + pool['size'] + ) + # expects pool min_size to be 2 + self.assertEqual( + 2, + pool['min_size'] + ) + # expects pool is_stretch_pool flag to be true + self.assertEqual( + True, + pool['is_stretch_pool'] + ) + # expects peering_crush_bucket_count = 2 (always this value for stretch mode) + self.assertEqual( + 2, + pool['peering_crush_bucket_count'] + ) + # expects peering_crush_bucket_target = 2 (always this value for stretch mode) + self.assertEqual( + 2, + pool['peering_crush_bucket_target'] + ) + # expects peering_crush_bucket_barrier = 8 (crush type of datacenter is 8) + self.assertEqual( + 8, + pool['peering_crush_bucket_barrier'] + ) + # expects stretch_mode_enabled to be True + self.assertEqual( + True, + osdmap['stretch_mode']['stretch_mode_enabled'] + ) + # expects stretch_mode_bucket_count to be 2 + self.assertEqual( + 2, + osdmap['stretch_mode']['stretch_bucket_count'] + ) + # expects degraded_stretch_mode to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['degraded_stretch_mode'] + ) + # expects recovering_stretch_mode to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['recovering_stretch_mode'] + ) + # expects stretch_mode_bucket to be 8 (datacenter crush type = 8) + self.assertEqual( + 8, + osdmap['stretch_mode']['stretch_mode_bucket'] + ) + # Checking the MonMap + monmap = self.mgr_cluster.mon_manager.get_mon_dump_json() + # expects stretch_mode to be True + self.assertEqual( + True, + monmap['stretch_mode'] + ) + # expects disallowed_leaders to be tiebreaker_mon + self.assertEqual( + self.TIEBREAKER_MON_NAME, + monmap['disallowed_leaders'] + ) + # expects tiebreaker_mon to be tiebreaker_mon + self.assertEqual( + self.TIEBREAKER_MON_NAME, + monmap['tiebreaker_mon'] + ) + + def _stretch_mode_disabled_correctly(self): + """ + Evaluate whether the stretch mode is disabled correctly. + by checking the OSDMap and MonMap. 
+ """ + # Checking the OSDMap + osdmap = self.mgr_cluster.mon_manager.get_osd_dump_json() + for pool in osdmap['pools']: + # expects crush_rule to be default + self.assertEqual( + self.DEFAULT_POOL_CRUSH_RULE_ID, + pool['crush_rule'] + ) + # expects pool size to be default + self.assertEqual( + self.DEFAULT_POOL_SIZE, + pool['size'] + ) + # expects pool min_size to be default + self.assertEqual( + self.DEFAULT_POOL_MIN_SIZE, + pool['min_size'] + ) + # expects pool is_stretch_pool flag to be false + self.assertEqual( + False, + pool['is_stretch_pool'] + ) + # expects peering_crush_bucket_count = 0 + self.assertEqual( + 0, + pool['peering_crush_bucket_count'] + ) + # expects peering_crush_bucket_target = 0 + self.assertEqual( + 0, + pool['peering_crush_bucket_target'] + ) + # expects peering_crush_bucket_barrier = 0 + self.assertEqual( + 0, + pool['peering_crush_bucket_barrier'] + ) + # expects stretch_mode_enabled to be False + self.assertEqual( + False, + osdmap['stretch_mode']['stretch_mode_enabled'] + ) + # expects stretch_mode_bucket to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['stretch_bucket_count'] + ) + # expects degraded_stretch_mode to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['degraded_stretch_mode'] + ) + # expects recovering_stretch_mode to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['recovering_stretch_mode'] + ) + # expects stretch_mode_bucket to be 0 + self.assertEqual( + 0, + osdmap['stretch_mode']['stretch_mode_bucket'] + ) + # Checking the MonMap + monmap = self.mgr_cluster.mon_manager.get_mon_dump_json() + # expects stretch_mode to be False + self.assertEqual( + False, + monmap['stretch_mode'] + ) + # expects disallowed_leaders to be empty + self.assertEqual( + "", + monmap['disallowed_leaders'] + ) + # expects tiebreaker_mon to be empty + self.assertEqual( + "", + monmap['tiebreaker_mon'] + ) + + def test_disable_stretch_mode(self): + """ + Test disabling stretch mode with the following scenario: + 1. Healthy Stretch Mode + 2. 
Degraded Stretch Mode + """ + # Create a pool + self._setup_pool(self.POOL, 16, 'replicated', self.STRETCH_CRUSH_RULE, 4, 2) + # Write some data to the pool + self._write_some_data(self.WRITE_PERIOD) + # disable stretch mode without --yes-i-really-mean-it (expects -EPERM 1) + self.assertEqual( + 1, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'disable_stretch_mode' + )) + # Disable stretch mode with non-existent crush rule (expects -EINVAL 22) + self.assertEqual( + 22, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'disable_stretch_mode', + 'non_existent_rule', + '--yes-i-really-mean-it' + )) + # Disable stretch mode with the current stretch rule (expect -EINVAL 22) + self.assertEqual( + 22, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'disable_stretch_mode', + self.STRETCH_CRUSH_RULE, + '--yes-i-really-mean-it', + + )) + # Disable stretch mode without crush rule (expect success 0) + self.assertEqual( + 0, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'disable_stretch_mode', + '--yes-i-really-mean-it' + )) + # Check if stretch mode is disabled correctly + self._stretch_mode_disabled_correctly() + # all PGs are active + clean + self.wait_until_true_and_hold( + lambda: self.mgr_cluster.mon_manager.pg_all_active_clean(), + timeout=self.RECOVERY_PERIOD, + success_hold_time=self.SUCCESS_HOLD_TIME + ) + # write some data to the pool + self._write_some_data(self.WRITE_PERIOD) + # Enable stretch mode + self.assertEqual( + 0, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'enable_stretch_mode', + self.TIEBREAKER_MON_NAME, + self.STRETCH_CRUSH_RULE, + self.STRETCH_BUCKET_TYPE + )) + self._stretch_mode_enabled_correctly() + # all PGs are active + clean + self.wait_until_true_and_hold( + lambda: self.mgr_cluster.mon_manager.pg_all_active_clean(), + timeout=self.RECOVERY_PERIOD, + success_hold_time=self.SUCCESS_HOLD_TIME + ) + # write some data to the pool + # self._write_some_data(self.WRITE_PERIOD) + # Bring down dc1 + self._fail_over_all_osds_in_dc('dc1') + self._fail_over_all_mons_in_dc('dc1') + # should be in degraded stretch mode + self.wait_until_true_and_hold( + lambda: self.mgr_cluster.mon_manager.is_degraded_stretch_mode(), + timeout=self.RECOVERY_PERIOD, + success_hold_time=self.SUCCESS_HOLD_TIME + ) + # Disable stretch mode with valid crush rule (expect success 0) + self.assertEqual( + 0, + self.mgr_cluster.mon_manager.raw_cluster_cmd_result( + 'mon', + 'disable_stretch_mode', + self.DEFAULT_POOL_CRUSH_RULE, + '--yes-i-really-mean-it' + )) + # Check if stretch mode is disabled correctly + self._stretch_mode_disabled_correctly() + # all PGs are active + self.wait_until_true_and_hold( + lambda: self.mgr_cluster.mon_manager.pg_all_active(), + timeout=self.RECOVERY_PERIOD, + success_hold_time=self.SUCCESS_HOLD_TIME + ) diff --git a/qa/tasks/thrashosds-health.yaml b/qa/tasks/thrashosds-health.yaml index b70583a75e1..dbde1ced0db 100644 --- a/qa/tasks/thrashosds-health.yaml +++ b/qa/tasks/thrashosds-health.yaml @@ -30,3 +30,4 @@ overrides: - out of quorum - noscrub - nodeep-scrub + - is down diff --git a/qa/tasks/vstart_runner.py b/qa/tasks/vstart_runner.py index ca929ba05b4..2ed21431330 100644 --- a/qa/tasks/vstart_runner.py +++ b/qa/tasks/vstart_runner.py @@ -233,6 +233,11 @@ class LocalRemoteProcess(object): else: self.stderr.write(err) + def _handle_subprocess_output(self, output, stream): + if isinstance(stream, StringIO): + return rm_nonascii_chars(output) + return output + def wait(self, 
timeout=None): # Null subproc.stdin so communicate() does not try flushing/closing it # again. @@ -250,7 +255,8 @@ class LocalRemoteProcess(object): return out, err = self.subproc.communicate(timeout=timeout) - out, err = rm_nonascii_chars(out), rm_nonascii_chars(err) + out = self._handle_subprocess_output(out, self.stdout) + err = self._handle_subprocess_output(err, self.stderr) self._write_stdout(out) self._write_stderr(err) |