diff options
Diffstat (limited to 'src/ceph-volume/ceph_volume/util/disk.py')
-rw-r--r-- | src/ceph-volume/ceph_volume/util/disk.py | 520 |
1 files changed, 480 insertions, 40 deletions
diff --git a/src/ceph-volume/ceph_volume/util/disk.py b/src/ceph-volume/ceph_volume/util/disk.py index 95d69da8d5e..921e61a4534 100644 --- a/src/ceph-volume/ceph_volume/util/disk.py +++ b/src/ceph-volume/ceph_volume/util/disk.py @@ -3,10 +3,11 @@ import os import re import stat import time -from ceph_volume import process +import json +from ceph_volume import process, allow_loop_devices from ceph_volume.api import lvm from ceph_volume.util.system import get_file_contents -from typing import Dict, List, Any +from typing import Dict, List, Any, Union, Optional logger = logging.getLogger(__name__) @@ -250,7 +251,9 @@ def lsblk(device, columns=None, abspath=False): return result[0] -def lsblk_all(device='', columns=None, abspath=False): +def lsblk_all(device: str = '', + columns: Optional[List[str]] = None, + abspath: bool = False) -> List[Dict[str, str]]: """ Create a dictionary of identifying values for a device using ``lsblk``. Each supported column is a key, in its *raw* format (all uppercase @@ -331,7 +334,6 @@ def lsblk_all(device='', columns=None, abspath=False): if device: base_command.append('--nodeps') base_command.append(device) - out, err, rc = process.call(base_command) if rc != 0: @@ -345,12 +347,21 @@ def lsblk_all(device='', columns=None, abspath=False): return result -def is_device(dev): +def is_device(dev: str) -> bool: """ - Boolean to determine if a given device is a block device (**not** - a partition!) + Determines whether the given path corresponds to a block device (not a partition). + + This function checks whether the provided device path represents a valid block device, + such as a physical disk (/dev/sda) or an allowed loop device, but excludes partitions + (/dev/sdc1). It performs several validation steps, including file existence, path format, + device type, and additional checks for loop devices if allowed. + + Args: + dev (str): The path to the device (e.g., "/dev/sda"). - For example: /dev/sda would return True, but not /dev/sdc1 + Returns: + bool: True if the path corresponds to a valid block device (not a partition), + otherwise False. """ if not os.path.exists(dev): return False @@ -362,7 +373,7 @@ def is_device(dev): TYPE = lsblk(dev).get('TYPE') if TYPE: - return TYPE in ['disk', 'mpath'] + return TYPE in ['disk', 'mpath', 'loop'] # fallback to stat return _stat_is_device(os.lstat(dev).st_mode) and not is_partition(dev) @@ -727,31 +738,6 @@ def is_mapper_device(device_name): return device_name.startswith(('/dev/mapper', '/dev/dm-')) -class AllowLoopDevices(object): - allow = False - warned = False - - @classmethod - def __call__(cls): - val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower() - if val not in ("false", 'no', '0'): - cls.allow = True - if not cls.warned: - logger.warning( - "CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your " - "environment, so we will allow the use of unattached loop" - " devices as disks. This feature is intended for " - "development purposes only and will never be supported in" - " production. Issues filed based on this behavior will " - "likely be ignored." - ) - cls.warned = True - return cls.allow - - -allow_loop_devices = AllowLoopDevices() - - def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_path: str = '/sys/dev/block', device: str = '') -> List[List[str]]: def holder_inner_loop() -> bool: for holder in holders: @@ -795,9 +781,20 @@ def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_pat result.append([name, kname, "part", partitions[partition]]) return sorted(result, key=lambda x: x[0]) -def get_partitions(_sys_dev_block_path ='/sys/dev/block') -> List[str]: +def get_partitions(_sys_dev_block_path: str ='/sys/dev/block') -> Dict[str, str]: + """ + Retrieves a dictionary mapping partition system names to their parent device names. + + Args: + _sys_dev_block_path (str, optional): The path to the system's block device directory. + Defaults to '/sys/dev/block'. + + Returns: + Dict[str, str]: A dictionary where the keys are partition system names, and the values are + the corresponding parent device names. + """ devices: List[str] = os.listdir(_sys_dev_block_path) - result: Dict[str, str] = dict() + result: Dict[str, str] = {} for device in devices: device_path: str = os.path.join(_sys_dev_block_path, device) is_partition: bool = int(get_file_contents(os.path.join(device_path, 'partition'), '0')) > 0 @@ -831,7 +828,7 @@ def get_devices(_sys_block_path='/sys/block', device=''): for block in block_devs: metadata: Dict[str, Any] = {} if block[2] == 'lvm': - block[1] = lvm.get_lv_path_from_mapper(block[1]) + block[1] = UdevData(block[1]).slashed_path devname = os.path.basename(block[0]) diskname = block[1] if block[2] not in block_types: @@ -870,13 +867,14 @@ def get_devices(_sys_block_path='/sys/block', device=''): device_slaves = os.listdir(os.path.join(sysdir, 'slaves')) metadata['partitions'] = get_partitions_facts(sysdir) + metadata['device_nodes'] = [] if device_slaves: - metadata['device_nodes'] = ','.join(device_slaves) + metadata['device_nodes'].extend(device_slaves) else: if block[2] == 'part': - metadata['device_nodes'] = block[3] + metadata['device_nodes'].append(block[3]) else: - metadata['device_nodes'] = devname + metadata['device_nodes'].append(devname) metadata['actuators'] = None if os.path.isdir(sysdir + "/queue/independent_access_ranges/"): @@ -963,3 +961,445 @@ def get_lvm_mappers(sys_block_path: str = '/sys/block') -> List[str]: result.append(f'/dev/mapper/{name.strip()}') result.append(f'/dev/{device}') return result + +def _dd_read(device: str, count: int, skip: int = 0) -> str: + """Read bytes from a device + + Args: + device (str): The device to read bytes from. + count (int): The number of bytes to read. + skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0. + + Returns: + str: A string containing the read bytes. + """ + result: str = '' + try: + with open(device, 'rb') as b: + b.seek(skip) + data: bytes = b.read(count) + result = data.decode('utf-8').replace('\x00', '') + except OSError: + logger.warning(f"Can't read from {device}") + pass + except UnicodeDecodeError: + pass + except Exception as e: + logger.error(f"An error occurred while reading from {device}: {e}") + raise + + return result + +def _dd_write(device: str, data: Union[str, bytes], skip: int = 0) -> None: + """Write bytes to a device + + Args: + device (str): The device to write bytes to. + data (str): The data to write to the device. + skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0. + + Raises: + OSError: If there is an error opening or writing to the device. + Exception: If any other error occurs during the write operation. + """ + + if isinstance(data, str): + data = data.encode('utf-8') + + try: + with open(device, 'r+b') as b: + b.seek(skip) + b.write(data) + except OSError: + logger.warning(f"Can't write to {device}") + raise + except Exception as e: + logger.error(f"An error occurred while writing to {device}: {e}") + raise + +def get_bluestore_header(device: str) -> Dict[str, Any]: + """Retrieve BlueStore header information from a given device. + + This function retrieves BlueStore header information from the specified 'device'. + It first checks if the device exists. If the device does not exist, a RuntimeError + is raised. Then, it calls the 'ceph-bluestore-tool' command to show the label + information of the device. If the command execution is successful, it parses the + JSON output containing the BlueStore header information and returns it as a dictionary. + + Args: + device (str): The path to the device. + + Returns: + Dict[str, Any]: A dictionary containing BlueStore header information. + """ + data: Dict[str, Any] = {} + + if os.path.exists(device): + out, err, rc = process.call([ + 'ceph-bluestore-tool', 'show-label', + '--dev', device], verbose_on_failure=False) + if rc: + logger.debug(f'device {device} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}') + else: + data = json.loads(''.join(out)) + else: + logger.warning(f'device {device} not found.') + return data + +def bluestore_info(device: str, bluestore_labels: Dict[str, Any]) -> Dict[str, Any]: + """Build a dict representation of a BlueStore header + + Args: + device (str): The path of the BlueStore device. + bluestore_labels (Dict[str, Any]): Plain text output from `ceph-bluestore-tool show-label` + + Returns: + Dict[str, Any]: Generated dict representation of the BlueStore header + """ + result: Dict[str, Any] = {} + result['osd_uuid'] = bluestore_labels[device]['osd_uuid'] + if bluestore_labels[device]['description'] == 'main': + whoami = bluestore_labels[device]['whoami'] + result.update({ + 'type': bluestore_labels[device].get('type', 'bluestore'), + 'osd_id': int(whoami), + 'ceph_fsid': bluestore_labels[device]['ceph_fsid'], + 'device': device, + }) + if bluestore_labels[device].get('db_device_uuid', ''): + result['db_device_uuid'] = bluestore_labels[device].get('db_device_uuid') + if bluestore_labels[device].get('wal_device_uuid', ''): + result['wal_device_uuid'] = bluestore_labels[device].get('wal_device_uuid') + elif bluestore_labels[device]['description'] == 'bluefs db': + result['device_db'] = device + elif bluestore_labels[device]['description'] == 'bluefs wal': + result['device_wal'] = device + return result + +def get_block_device_holders(sys_block: str = '/sys/block') -> Dict[str, Any]: + """Get a dictionary of device mappers with their corresponding parent devices. + + This function retrieves information about device mappers and their parent devices + from the '/sys/block' directory. It iterates through each directory within 'sys_block', + and for each directory, it checks if a 'holders' directory exists. If so, it lists + the contents of the 'holders' directory and constructs a dictionary where the keys + are the device mappers and the values are their corresponding parent devices. + + Args: + sys_block (str, optional): The path to the '/sys/block' directory. Defaults to '/sys/block'. + + Returns: + Dict[str, Any]: A dictionary where keys are device mappers (e.g., '/dev/mapper/...') and + values are their corresponding parent devices (e.g., '/dev/sdX'). + """ + result: Dict[str, Any] = {} + for b in os.listdir(sys_block): + path: str = os.path.join(sys_block, b, 'holders') + if os.path.exists(path): + for h in os.listdir(path): + result[f'/dev/{h}'] = f'/dev/{b}' + + return result + +def has_holders(device: str) -> bool: + """Check if a given device has any associated holders. + + This function determines whether the specified device has associated holders + (e.g., other devices that depend on it) by checking if the device's real path + appears in the values of the dictionary returned by `get_block_device_holders`. + + Args: + device (str): The path to the device (e.g., '/dev/sdX') to check. + + Returns: + bool: True if the device has holders, False otherwise. + """ + return os.path.realpath(device) in get_block_device_holders().values() + +def get_parent_device_from_mapper(mapper: str, abspath: bool = True) -> str: + """Get the parent device corresponding to a given device mapper. + + This function retrieves the parent device corresponding to a given device mapper + from the dictionary returned by the 'get_block_device_holders' function. It first + checks if the specified 'mapper' exists. If it does, it resolves the real path of + the mapper using 'os.path.realpath'. Then, it attempts to retrieve the parent device + from the dictionary. If the mapper is not found in the dictionary, an empty string + is returned. + + Args: + mapper (str): The path to the device mapper. + abspath (bool, optional): If True (default), returns the absolute path of the parent device. + If False, returns only the basename of the parent device. + + Returns: + str: The parent device corresponding to the given device mapper, or an empty string + if the mapper is not found in the dictionary of device mappers. + """ + result: str = '' + if os.path.exists(mapper): + _mapper: str = os.path.realpath(mapper) + try: + result = get_block_device_holders()[_mapper] + if not abspath: + result = os.path.basename(result) + except KeyError: + pass + return result + +def get_lvm_mapper_path_from_dm(path: str, sys_block: str = '/sys/block') -> str: + """Retrieve the logical volume path for a given device. + + This function takes the path of a device and returns the corresponding + logical volume path by reading the 'dm/name' file within the sysfs + directory. + + Args: + path (str): The device path for which to retrieve the logical volume path. + sys_block (str, optional): The base sysfs block directory. Defaults to '/sys/block'. + + Returns: + str: The device mapper path in the 'dashed form' of '/dev/mapper/vg-lv'. + """ + result: str = '' + dev: str = os.path.basename(path) + sys_block_path: str = os.path.join(sys_block, dev, 'dm/name') + if os.path.exists(sys_block_path): + with open(sys_block_path, 'r') as f: + content: str = f.read() + result = f'/dev/mapper/{content}' + return result.strip() + + +class BlockSysFs: + def __init__(self, + path: str, + sys_dev_block: str = '/sys/dev/block', + sys_block: str = '/sys/block') -> None: + """ + Initializes a BlockSysFs object. + + Args: + path (str): The path to the block device. + sys_dev_block (str, optional): Path to the sysfs directory containing block devices. + Defaults to '/sys/dev/block'. + sys_block (str, optional): Path to the sysfs directory containing block information. + Defaults to '/sys/block'. + """ + self.path: str = path + self.name: str = os.path.basename(os.path.realpath(self.path)) + self.sys_dev_block: str = sys_dev_block + self.sys_block: str = sys_block + + @property + def is_partition(self) -> bool: + """ + Checks if the current block device is a partition. + + Returns: + bool: True if it is a partition, False otherwise. + """ + path: str = os.path.join(self.get_sys_dev_block_path, 'partition') + return os.path.exists(path) + + @property + def holders(self) -> List[str]: + """ + Retrieves the holders of the current block device. + + Returns: + List[str]: A list of holders (other devices) associated with this block device. + """ + result: List[str] = [] + path: str = os.path.join(self.get_sys_dev_block_path, 'holders') + if os.path.exists(path): + result = os.listdir(path) + return result + + @property + def get_sys_dev_block_path(self) -> str: + """ + Gets the sysfs path for the current block device. + + Returns: + str: The sysfs path corresponding to this block device. + """ + sys_dev_block_path: str = '' + devices: List[str] = os.listdir(self.sys_dev_block) + for device in devices: + path = os.path.join(self.sys_dev_block, device) + if os.path.realpath(path).split('/')[-1:][0] == self.name: + sys_dev_block_path = path + return sys_dev_block_path + + @property + def has_active_mappers(self) -> bool: + """ + Checks if there are any active device mappers for the current block device. + + Returns: + bool: True if active mappers exist, False otherwise. + """ + return len(self.active_mappers()) > 0 + + @property + def has_active_dmcrypt_mapper(self) -> bool: + """ + Checks if there is an active dm-crypt (disk encryption) mapper for the current block device. + + Returns: + bool: True if an active dm-crypt mapper exists, False otherwise. + """ + return any(value.get('type') == 'CRYPT' for value in self.active_mappers().values()) + + def active_mappers(self) -> Dict[str, Any]: + """ + Retrieves information about active device mappers for the current block device. + + Returns: + Dict[str, Any]: A dictionary containing details about active device mappers. + Keys are the holders, and values provide details like type, + dm-crypt metadata, and LVM UUIDs. + """ + result: Dict[str, Any] = {} + for holder in self.holders: + path: str = os.path.join(self.sys_block, holder, 'dm/uuid') + if os.path.exists(path): + result[holder] = {} + with open(path, 'r') as f: + content: str = f.read().strip() + content_split: List[str] = content.split('-', maxsplit=3) + mapper_type: str = content_split[0] + result[holder]['type'] = mapper_type + if mapper_type == 'CRYPT': + result[holder]['dmcrypt_type'] = content_split[1] + result[holder]['dmcrypt_uuid'] = content_split[2] + result[holder]['dmcrypt_mapping'] = content_split[3] + if mapper_type == 'LVM': + result[holder]['uuid'] = content_split[1] + return result + +class UdevData: + """ + Class representing udev data for a specific device. + This class extracts and stores relevant information about the device from udev files. + + Attributes: + ----------- + path : str + The initial device path (e.g., /dev/sda). + realpath : str + The resolved real path of the device. + stats : os.stat_result + The result of the os.stat() call to retrieve device metadata. + major : int + The device's major number. + minor : int + The device's minor number. + udev_data_path : str + The path to the udev metadata for the device (e.g., /run/udev/data/b<major>:<minor>). + symlinks : List[str] + A list of symbolic links pointing to the device. + id : str + A unique identifier for the device. + environment : Dict[str, str] + A dictionary containing environment variables extracted from the udev data. + group : str + The group associated with the device. + queue : str + The queue associated with the device. + version : str + The version of the device or its metadata. + """ + def __init__(self, path: str) -> None: + """Initialize an instance of the UdevData class and load udev information. + + Args: + path (str): The path to the device to be analyzed (e.g., /dev/sda). + + Raises: + RuntimeError: Raised if no udev data file is found for the specified device. + """ + if not os.path.exists(path): + raise RuntimeError(f'{path} not found.') + self.path: str = path + self.realpath: str = os.path.realpath(self.path) + self.stats: os.stat_result = os.stat(self.realpath) + self.major: int = os.major(self.stats.st_rdev) + self.minor: int = os.minor(self.stats.st_rdev) + self.udev_data_path: str = f'/run/udev/data/b{self.major}:{self.minor}' + self.symlinks: List[str] = [] + self.id: str = '' + self.environment: Dict[str, str] = {} + self.group: str = '' + self.queue: str = '' + self.version: str = '' + + if not os.path.exists(self.udev_data_path): + raise RuntimeError(f'No udev data could be retrieved for {self.path}') + + with open(self.udev_data_path, 'r') as f: + content: str = f.read().strip() + self.raw_data: List[str] = content.split('\n') + + for line in self.raw_data: + data_type, data = line.split(':', 1) + if data_type == 'S': + self.symlinks.append(data) + if data_type == 'I': + self.id = data + if data_type == 'E': + key, value = data.split('=') + self.environment[key] = value + if data_type == 'G': + self.group = data + if data_type == 'Q': + self.queue = data + if data_type == 'V': + self.version = data + + @property + def is_dm(self) -> bool: + """Check if the device is a device mapper (DM). + + Returns: + bool: True if the device is a device mapper, otherwise False. + """ + return 'DM_UUID' in self.environment.keys() + + @property + def is_lvm(self) -> bool: + """Check if the device is a Logical Volume Manager (LVM) volume. + + Returns: + bool: True if the device is an LVM volume, otherwise False. + """ + return self.environment.get('DM_UUID', '').startswith('LVM') + + @property + def slashed_path(self) -> str: + """Get the LVM path structured with slashes. + + Returns: + str: A path using slashes if the device is an LVM volume (e.g., /dev/vgname/lvname), + otherwise the original path. + """ + result: str = self.path + if self.is_lvm: + vg: str = self.environment.get('DM_VG_NAME', '') + lv: str = self.environment.get('DM_LV_NAME', '') + result = f'/dev/{vg}/{lv}' + return result + + @property + def dashed_path(self) -> str: + """Get the LVM path structured with dashes. + + Returns: + str: A path using dashes if the device is an LVM volume (e.g., /dev/mapper/vgname-lvname), + otherwise the original path. + """ + result: str = self.path + if self.is_lvm: + name: str = self.environment.get('DM_NAME', '') + result = f'/dev/mapper/{name}' + return result |