diff options
Diffstat (limited to 'src/ceph-volume/ceph_volume/util/disk.py')
-rw-r--r-- | src/ceph-volume/ceph_volume/util/disk.py | 47 |
1 files changed, 31 insertions, 16 deletions
diff --git a/src/ceph-volume/ceph_volume/util/disk.py b/src/ceph-volume/ceph_volume/util/disk.py index 78c140597d6..921e61a4534 100644 --- a/src/ceph-volume/ceph_volume/util/disk.py +++ b/src/ceph-volume/ceph_volume/util/disk.py @@ -7,7 +7,7 @@ import json from ceph_volume import process, allow_loop_devices from ceph_volume.api import lvm from ceph_volume.util.system import get_file_contents -from typing import Dict, List, Any +from typing import Dict, List, Any, Union, Optional logger = logging.getLogger(__name__) @@ -251,7 +251,9 @@ def lsblk(device, columns=None, abspath=False): return result[0] -def lsblk_all(device='', columns=None, abspath=False): +def lsblk_all(device: str = '', + columns: Optional[List[str]] = None, + abspath: bool = False) -> List[Dict[str, str]]: """ Create a dictionary of identifying values for a device using ``lsblk``. Each supported column is a key, in its *raw* format (all uppercase @@ -332,7 +334,6 @@ def lsblk_all(device='', columns=None, abspath=False): if device: base_command.append('--nodeps') base_command.append(device) - out, err, rc = process.call(base_command) if rc != 0: @@ -346,12 +347,21 @@ def lsblk_all(device='', columns=None, abspath=False): return result -def is_device(dev): +def is_device(dev: str) -> bool: """ - Boolean to determine if a given device is a block device (**not** - a partition!) + Determines whether the given path corresponds to a block device (not a partition). + + This function checks whether the provided device path represents a valid block device, + such as a physical disk (/dev/sda) or an allowed loop device, but excludes partitions + (/dev/sdc1). It performs several validation steps, including file existence, path format, + device type, and additional checks for loop devices if allowed. + + Args: + dev (str): The path to the device (e.g., "/dev/sda"). - For example: /dev/sda would return True, but not /dev/sdc1 + Returns: + bool: True if the path corresponds to a valid block device (not a partition), + otherwise False. """ if not os.path.exists(dev): return False @@ -363,7 +373,7 @@ def is_device(dev): TYPE = lsblk(dev).get('TYPE') if TYPE: - return TYPE in ['disk', 'mpath'] + return TYPE in ['disk', 'mpath', 'loop'] # fallback to stat return _stat_is_device(os.lstat(dev).st_mode) and not is_partition(dev) @@ -857,13 +867,14 @@ def get_devices(_sys_block_path='/sys/block', device=''): device_slaves = os.listdir(os.path.join(sysdir, 'slaves')) metadata['partitions'] = get_partitions_facts(sysdir) + metadata['device_nodes'] = [] if device_slaves: - metadata['device_nodes'] = ','.join(device_slaves) + metadata['device_nodes'].extend(device_slaves) else: if block[2] == 'part': - metadata['device_nodes'] = block[3] + metadata['device_nodes'].append(block[3]) else: - metadata['device_nodes'] = devname + metadata['device_nodes'].append(devname) metadata['actuators'] = None if os.path.isdir(sysdir + "/queue/independent_access_ranges/"): @@ -979,7 +990,7 @@ def _dd_read(device: str, count: int, skip: int = 0) -> str: return result -def _dd_write(device: str, data: str, skip: int = 0) -> None: +def _dd_write(device: str, data: Union[str, bytes], skip: int = 0) -> None: """Write bytes to a device Args: @@ -991,10 +1002,14 @@ def _dd_write(device: str, data: str, skip: int = 0) -> None: OSError: If there is an error opening or writing to the device. Exception: If any other error occurs during the write operation. """ + + if isinstance(data, str): + data = data.encode('utf-8') + try: with open(device, 'r+b') as b: b.seek(skip) - b.write(data.encode('utf-8')) + b.write(data) except OSError: logger.warning(f"Can't write to {device}") raise @@ -1370,8 +1385,8 @@ class UdevData: """ result: str = self.path if self.is_lvm: - vg: str = self.environment.get('DM_VG_NAME') - lv: str = self.environment.get('DM_LV_NAME') + vg: str = self.environment.get('DM_VG_NAME', '') + lv: str = self.environment.get('DM_LV_NAME', '') result = f'/dev/{vg}/{lv}' return result @@ -1385,6 +1400,6 @@ class UdevData: """ result: str = self.path if self.is_lvm: - name: str = self.environment.get('DM_NAME') + name: str = self.environment.get('DM_NAME', '') result = f'/dev/mapper/{name}' return result |