Diffstat (limited to 'qa/tasks/cephfs')
-rw-r--r--  qa/tasks/cephfs/cephfs_test_case.py |   8
-rw-r--r--  qa/tasks/cephfs/filesystem.py       |   2
-rw-r--r--  qa/tasks/cephfs/test_exports.py     | 186
-rw-r--r--  qa/tasks/cephfs/test_failover.py    |  55
-rw-r--r--  qa/tasks/cephfs/test_nfs.py         |  53
5 files changed, 299 insertions(+), 5 deletions(-)
diff --git a/qa/tasks/cephfs/cephfs_test_case.py b/qa/tasks/cephfs/cephfs_test_case.py
index c1312ec5efc..21b96d2b22b 100644
--- a/qa/tasks/cephfs/cephfs_test_case.py
+++ b/qa/tasks/cephfs/cephfs_test_case.py
@@ -252,8 +252,8 @@ class CephFSTestCase(CephTestCase):
def get_session_data(self, client_id):
return self._session_by_id(client_id)
- def _session_list(self):
- ls_data = self.fs.mds_asok(['session', 'ls'])
+ def _session_list(self, rank=None, status=None):
+ ls_data = self.fs.rank_asok(['session', 'ls'], rank=rank, status=status)
ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
return ls_data
@@ -269,9 +269,9 @@ class CephFSTestCase(CephTestCase):
def perf_dump(self, rank=None, status=None):
return self.fs.rank_asok(['perf', 'dump'], rank=rank, status=status)
- def wait_until_evicted(self, client_id, timeout=30):
+ def wait_until_evicted(self, client_id, rank=None, timeout=30):
def is_client_evicted():
- ls = self._session_list()
+ ls = self._session_list(rank=rank)
for s in ls:
if s['id'] == client_id:
return False
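The rank-aware forms of _session_list() and wait_until_evicted() above are what the new export tests further down rely on. A minimal usage sketch inside a hypothetical CephFSTestCase subclass (only the two helpers and fs.rank_asok() come from this patch; the surrounding names and values are illustrative):

    def _evict_from_importer(self, importer_rank, status):
        # list the sessions held by a specific rank rather than the default asok target
        if self._session_list(rank=importer_rank, status=status):
            client_id = self.mount_a.get_global_id()
            # evict through that rank's admin socket ...
            self.fs.rank_asok(['session', 'evict', str(client_id)],
                              rank=importer_rank, status=status)
            # ... then poll the same rank until the session is gone (default timeout 30s)
            self.wait_until_evicted(client_id, rank=importer_rank)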
diff --git a/qa/tasks/cephfs/filesystem.py b/qa/tasks/cephfs/filesystem.py
index 2b7fd2ee569..3846ef23f97 100644
--- a/qa/tasks/cephfs/filesystem.py
+++ b/qa/tasks/cephfs/filesystem.py
@@ -649,6 +649,8 @@ class FilesystemBase(MDSClusterBase):
def set_session_timeout(self, timeout):
self.set_var("session_timeout", "%d" % timeout)
+ def set_session_autoclose(self, autoclose_time):
+ self.set_var("session_autoclose", "%d" % autoclose_time)
def set_allow_standby_replay(self, yes):
self.set_var("allow_standby_replay", yes)
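set_session_autoclose() mirrors the existing set_session_timeout() wrapper; both funnel through set_var(), which issues an 'fs set <fs_name> <var> <value>' on the filesystem. A short sketch of how the eviction test below pairs the two (30/35 are the values that test happens to choose, not required ones):

    timeout = 30
    self.fs.set_session_timeout(timeout)        # fs set <fs_name> session_timeout 30
    self.fs.set_session_autoclose(timeout + 5)  # fs set <fs_name> session_autoclose 35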
diff --git a/qa/tasks/cephfs/test_exports.py b/qa/tasks/cephfs/test_exports.py
index 16de379f54f..468378fce3d 100644
--- a/qa/tasks/cephfs/test_exports.py
+++ b/qa/tasks/cephfs/test_exports.py
@@ -4,6 +4,7 @@ import time
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase
from teuthology.exceptions import CommandFailedError
+from teuthology.contextutil import safe_while, MaxWhileTries
log = logging.getLogger(__name__)
@@ -152,6 +153,8 @@ class TestExportPin(CephFSTestCase):
# vstart.sh sets mds_debug_subtrees to True. That causes an ESubtreeMap
# to be written out every event. Yuck!
self.config_set('mds', 'mds_debug_subtrees', False)
+ # make sure ESubtreeMap is written frequently enough:
+ self.config_set('mds', 'mds_log_minor_segments_per_major_segment', '4')
self.config_rm('mds', 'mds bal split size') # don't split /top
self.mount_a.run_shell_payload("rm -rf 1")
@@ -628,3 +631,186 @@ done
log.info("{0} migrations have occured due to the cluster resizing".format(count))
# rebalancing from 3 -> 2 may cause half of rank 0/1 to move and all of rank 2
self.assertLessEqual((count/len(subtrees_old)), (1.0/3.0/2.0 + 1.0/3.0/2.0 + 1.0/3.0)*1.25) # aka .66 with 25% overbudget
+
+class TestDumpExportStates(CephFSTestCase):
+ MDSS_REQUIRED = 2
+ CLIENTS_REQUIRED = 1
+
+ EXPORT_STATES = ['locking', 'discovering', 'freezing', 'prepping', 'warning', 'exporting']
+
+ def setUp(self):
+ super().setUp()
+
+ self.fs.set_max_mds(self.MDSS_REQUIRED)
+ self.status = self.fs.wait_for_daemons()
+
+ self.mount_a.run_shell_payload('mkdir -p test/export')
+
+ def tearDown(self):
+ super().tearDown()
+
+ def _wait_for_export_target(self, source, target, sleep=2, timeout=10):
+ try:
+ with safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
+ while proceed():
+ info = self.fs.getinfo().get_rank(self.fs.id, source)
+ log.info(f'waiting for rank {target} to be added to the export targets of rank {source}')
+ if target in info['export_targets']:
+ return
+ except MaxWhileTries as e:
+ raise RuntimeError(f'rank {target} was not added to the export targets of rank {source} after {timeout}s') from e
+
+ def _dump_export_state(self, rank):
+ states = self.fs.rank_asok(['dump_export_states'], rank=rank, status=self.status)
+ self.assertTrue(type(states) is list)
+ self.assertEqual(len(states), 1)
+ return states[0]
+
+ def _test_base(self, path, source, target, state_index, kill):
+ self.fs.rank_asok(['config', 'set', 'mds_kill_import_at', str(kill)], rank=target, status=self.status)
+
+ self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+ self._wait_for_export_target(source, target)
+
+ target_rank = self.fs.get_rank(rank=target, status=self.status)
+ self.delete_mds_coredump(target_rank['name'])
+
+ state = self._dump_export_state(source)
+
+ self.assertTrue(type(state['tid']) is int)
+ self.assertEqual(state['path'], path)
+ self.assertEqual(state['state'], self.EXPORT_STATES[state_index])
+ self.assertEqual(state['peer'], target)
+
+ return state
+
+ def _test_state_history(self, state):
+ history = state['state_history']
+ self.assertTrue(type(history) is dict)
+ size = 0
+ for name in self.EXPORT_STATES:
+ self.assertTrue(type(history[name]) is dict)
+ size += 1
+ if name == state['state']:
+ break
+ self.assertEqual(len(history), size)
+
+ def _test_freeze_tree(self, state, waiters):
+ self.assertTrue(type(state['freeze_tree_time']) is float)
+ self.assertEqual(state['unfreeze_tree_waiters'], waiters)
+
+ def test_discovering(self):
+ state = self._test_base('/test', 0, 1, 1, 1)
+
+ self._test_state_history(state)
+ self._test_freeze_tree(state, 0)
+
+ self.assertEqual(state['last_cum_auth_pins'], 0)
+ self.assertEqual(state['num_remote_waiters'], 0)
+
+ def test_prepping(self):
+ client_id = self.mount_a.get_global_id()
+
+ state = self._test_base('/test', 0, 1, 3, 3)
+
+ self._test_state_history(state)
+ self._test_freeze_tree(state, 0)
+
+ self.assertEqual(state['flushed_clients'], [client_id])
+ self.assertTrue(type(state['warning_ack_waiting']) is list)
+
+ def test_exporting(self):
+ state = self._test_base('/test', 0, 1, 5, 5)
+
+ self._test_state_history(state)
+ self._test_freeze_tree(state, 0)
+
+ self.assertTrue(type(state['notify_ack_waiting']) is list)
+
+class TestKillExports(CephFSTestCase):
+ MDSS_REQUIRED = 2
+ CLIENTS_REQUIRED = 1
+
+ def setUp(self):
+ CephFSTestCase.setUp(self)
+
+ self.fs.set_max_mds(self.MDSS_REQUIRED)
+ self.status = self.fs.wait_for_daemons()
+
+ self.mount_a.run_shell_payload('mkdir -p test/export')
+
+ def tearDown(self):
+ super().tearDown()
+
+ def _kill_export_as(self, rank, kill):
+ self.fs.rank_asok(['config', 'set', 'mds_kill_export_at', str(kill)], rank=rank, status=self.status)
+
+ def _export_dir(self, path, source, target):
+ self.fs.rank_asok(['export', 'dir', path, str(target)], rank=source, status=self.status)
+
+ def _wait_failover(self):
+ self.wait_until_true(lambda: self.fs.status().hadfailover(self.status), timeout=self.fs.beacon_timeout)
+
+ def _clear_coredump(self, rank):
+ crash_rank = self.fs.get_rank(rank=rank, status=self.status)
+ self.delete_mds_coredump(crash_rank['name'])
+
+ def _run_kill_export(self, kill_at, exporter_rank=0, importer_rank=1, restart=True):
+ self._kill_export_as(exporter_rank, kill_at)
+ self._export_dir("/test", exporter_rank, importer_rank)
+ self._wait_failover()
+ self._clear_coredump(exporter_rank)
+
+ if restart:
+ self.fs.rank_restart(rank=exporter_rank, status=self.status)
+ self.status = self.fs.wait_for_daemons()
+
+ def test_session_cleanup(self):
+ """
+ Test importer's session cleanup after an export subtree task is interrupted.
+ Set 'mds_kill_export_at' to 9 or 10 so that the importer will wait for the exporter
+ to restart while the state is 'acking'.
+
+ See https://tracker.ceph.com/issues/61459
+ """
+
+ kill_export_at = [9, 10]
+
+ exporter_rank = 0
+ importer_rank = 1
+
+ for kill in kill_export_at:
+ log.info(f"kill_export_at: {kill}")
+ self._run_kill_export(kill, exporter_rank, importer_rank)
+
+ if len(self._session_list(importer_rank, self.status)) > 0:
+ client_id = self.mount_a.get_global_id()
+ self.fs.rank_asok(['session', 'evict', "%s" % client_id], rank=importer_rank, status=self.status)
+
+ # times out if buggy
+ self.wait_until_evicted(client_id, importer_rank)
+
+ # remount so the next iteration starts with a healthy client
+ self.mount_a.remount()
+
+ def test_client_eviction(self):
+ # modify the timeout so that we don't have to wait too long
+ timeout = 30
+ self.fs.set_session_timeout(timeout)
+ self.fs.set_session_autoclose(timeout + 5)
+
+ kill_export_at = [9, 10]
+
+ exporter_rank = 0
+ importer_rank = 1
+
+ for kill in kill_export_at:
+ log.info(f"kill_export_at: {kill}")
+ self._run_kill_export(kill, exporter_rank, importer_rank)
+
+ client_id = self.mount_a.get_global_id()
+ self.wait_until_evicted(client_id, importer_rank, timeout + 10)
+ time.sleep(1)
+
+ # fails if buggy
+ self.mount_a.ls()
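For reference, the dict shape that TestDumpExportStates._dump_export_state() asserts on can be summarized as follows; the keys are the ones the tests read, while the values shown here are purely illustrative:

    example_export_state = {
        'tid': 1,                      # integer transaction id
        'path': '/test',               # exported subtree
        'state': 'discovering',        # one of EXPORT_STATES
        'peer': 1,                     # importer rank
        'state_history': {             # one dict per state reached so far
            'locking': {},
            'discovering': {},
        },
        'freeze_tree_time': 0.001,     # float, seconds
        'unfreeze_tree_waiters': 0,
        'last_cum_auth_pins': 0,
        'num_remote_waiters': 0,
        # later states also carry 'flushed_clients', 'warning_ack_waiting'
        # and 'notify_ack_waiting' (lists), per test_prepping/test_exporting
    }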
diff --git a/qa/tasks/cephfs/test_failover.py b/qa/tasks/cephfs/test_failover.py
index 29af1e76a4f..46139163ddd 100644
--- a/qa/tasks/cephfs/test_failover.py
+++ b/qa/tasks/cephfs/test_failover.py
@@ -1,3 +1,4 @@
+import re
import time
import signal
import logging
@@ -342,6 +343,60 @@ class TestClusterResize(CephFSTestCase):
self.fs.wait_for_daemons(timeout=90)
+class TestFailoverBeaconHealth(CephFSTestCase):
+ CLIENTS_REQUIRED = 1
+ MDSS_REQUIRED = 1
+
+ def initiate_journal_replay(self, num_files=100):
+ """ Initiate journal replay by creating files and restarting mds server."""
+
+ self.config_set("mds", "mds_delay_journal_replay_for_testing", "5000")
+ self.mounts[0].test_files = [str(x) for x in range(num_files)]
+ self.mounts[0].create_files()
+ self.fs.fail()
+ self.fs.set_joinable()
+
+ def test_replay_beacon_estimated_time(self):
+ """
+ That the beacon emits a warning message with the estimated time to complete replay
+ """
+ self.initiate_journal_replay()
+ self.wait_for_health("MDS_ESTIMATED_REPLAY_TIME", 60)
+ # remove the config so that replay finishes and the cluster
+ # is HEALTH_OK
+ self.config_rm("mds", "mds_delay_journal_replay_for_testing")
+ self.wait_for_health_clear(timeout=60)
+
+ def test_replay_estimated_time_accuracy(self):
+ self.initiate_journal_replay(250)
+ def replay_complete():
+ health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True)
+ codes = [s for s in health['checks']]
+ return 'MDS_ESTIMATED_REPLAY_TIME' not in codes
+
+ def get_estimated_time():
+ completion_percentage = 0.0
+ time_duration = pending_duration = 0
+ with safe_while(sleep=5, tries=360) as proceed:
+ while proceed():
+ health = self.ceph_cluster.mon_manager.get_mon_health(debug=False, detail=True)
+ codes = [s for s in health['checks']]
+ if 'MDS_ESTIMATED_REPLAY_TIME' in codes:
+ message = health['checks']['MDS_ESTIMATED_REPLAY_TIME']['detail'][0]['message']
+ ### sample warning string: "mds.a(mds.0): replay: 50.0446% complete - elapsed time: 582s, estimated time remaining: 581s"
+ m = re.match(r".* replay: (\d+(\.\d+)?)% complete - elapsed time: (\d+)s, estimated time remaining: (\d+)s", message)
+ if not m:
+ continue
+ completion_percentage = float(m.group(1))
+ time_duration = int(m.group(3))
+ pending_duration = int(m.group(4))
+ log.debug(f"MDS_ESTIMATED_REPLAY_TIME is present in health: {message}, duration: {time_duration}, completion_percentage: {completion_percentage}")
+ if completion_percentage >= 50:
+ return (completion_percentage, time_duration, pending_duration)
+ _, _, pending_duration = get_estimated_time()
+ # wait for 25% more time to avoid false negative failures
+ self.wait_until_true(replay_complete, timeout=pending_duration * 1.25)
+
class TestFailover(CephFSTestCase):
CLIENTS_REQUIRED = 1
MDSS_REQUIRED = 2
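The MDS_ESTIMATED_REPLAY_TIME detail string parsed by test_replay_estimated_time_accuracy() has a fixed shape. A standalone sketch of just the parsing step, using the sample string quoted in the test (parse_replay_estimate is an illustrative name, not part of the patch):

    import re

    # sample warning taken from the comment in the test above
    SAMPLE = ("mds.a(mds.0): replay: 50.0446% complete - elapsed time: 582s, "
              "estimated time remaining: 581s")

    def parse_replay_estimate(message):
        # returns (percent_complete, elapsed_s, remaining_s), or None if no match
        m = re.match(r".* replay: (\d+(\.\d+)?)% complete - elapsed time: (\d+)s, "
                     r"estimated time remaining: (\d+)s", message)
        if not m:
            return None
        return float(m.group(1)), int(m.group(3)), int(m.group(4))

    assert parse_replay_estimate(SAMPLE) == (50.0446, 582, 581)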
diff --git a/qa/tasks/cephfs/test_nfs.py b/qa/tasks/cephfs/test_nfs.py
index 19076ea44b3..0a1c07dce04 100644
--- a/qa/tasks/cephfs/test_nfs.py
+++ b/qa/tasks/cephfs/test_nfs.py
@@ -55,7 +55,7 @@ class TestNFS(MgrTestCase):
"squash": "none",
"security_label": True,
"protocols": [
- 4
+ 3, 4
],
"transports": [
"TCP"
@@ -369,6 +369,45 @@ class TestNFS(MgrTestCase):
except CommandFailedError as e:
self.fail(f"expected read/write of a file to be successful but failed with {e.exitstatus}")
+ def _mnt_nfs(self, pseudo_path, port, ip):
+ '''
+ Mount created export
+ :param pseudo_path: It is the pseudo root name
+ :param port: Port of deployed nfs cluster
+ :param ip: IP of deployed nfs cluster
+ '''
+ tries = 3
+ while True:
+ try:
+ self.ctx.cluster.run(
+ args=['sudo', 'mount', '-t', 'nfs', '-o', f'port={port}',
+ f'{ip}:{pseudo_path}', '/mnt'])
+ break
+ except CommandFailedError:
+ if tries:
+ tries -= 1
+ time.sleep(2)
+ continue
+ raise
+
+ self.ctx.cluster.run(args=['sudo', 'chmod', '1777', '/mnt'])
+
+ def _test_fio(self, pseudo_path, port, ip):
+ '''
+ Run fio with the libaio ioengine on /mnt/fio
+ :param pseudo_path: Pseudo root name of the export mounted at /mnt
+ :param port: Port of deployed nfs cluster
+ :param ip: IP of deployed nfs cluster
+ '''
+ try:
+ self._mnt_nfs(pseudo_path, port, ip)
+ self.ctx.cluster.run(args=['mkdir', '/mnt/fio'])
+ fio_cmd=['sudo', 'fio', '--ioengine=libaio', '-directory=/mnt/fio', '--filename=fio.randrw.test', '--name=job', '--bs=16k', '--direct=1', '--group_reporting', '--iodepth=128', '--randrepeat=0', '--norandommap=1', '--thread=2', '--ramp_time=20s', '--offset_increment=5%', '--size=5G', '--time_based', '--runtime=300', '--ramp_time=1s', '--percentage_random=0', '--rw=randrw', '--rwmixread=50']
+ self.ctx.cluster.run(args=fio_cmd)
+ except CommandFailedError as e:
+ self.fail(f"expected fio to be successful but failed with {e.exitstatus}")
+ finally:
+ self.ctx.cluster.run(args=['sudo', 'rm', '-rf', '/mnt/fio'])
+ self.ctx.cluster.run(args=['sudo', 'umount', '/mnt'])
+
def _write_to_read_only_export(self, pseudo_path, port, ip):
'''
Check if write to read only export fails
@@ -627,6 +666,18 @@ class TestNFS(MgrTestCase):
self._test_data_read_write(self.pseudo_path, port, ip)
self._test_delete_cluster()
+ def test_async_io_fio(self):
+ '''
+ Test async io using fio. Expect completion without hang or crash
+ '''
+ self._test_create_cluster()
+ self._create_export(export_id='1', create_fs=True,
+ extra_cmd=['--pseudo-path', self.pseudo_path])
+ port, ip = self._get_port_ip_info()
+ self._check_nfs_cluster_status('running', 'NFS Ganesha cluster restart failed')
+ self._test_fio(self.pseudo_path, port, ip)
+ self._test_delete_cluster()
+
def test_cluster_info(self):
'''
Test cluster info outputs correct ip and hostname