summaryrefslogtreecommitdiffstats
path: root/src/ceph-volume/ceph_volume/util/disk.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph-volume/ceph_volume/util/disk.py')
-rw-r--r--src/ceph-volume/ceph_volume/util/disk.py520
1 files changed, 480 insertions, 40 deletions
diff --git a/src/ceph-volume/ceph_volume/util/disk.py b/src/ceph-volume/ceph_volume/util/disk.py
index 95d69da8d5e..921e61a4534 100644
--- a/src/ceph-volume/ceph_volume/util/disk.py
+++ b/src/ceph-volume/ceph_volume/util/disk.py
@@ -3,10 +3,11 @@ import os
import re
import stat
import time
-from ceph_volume import process
+import json
+from ceph_volume import process, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util.system import get_file_contents
-from typing import Dict, List, Any
+from typing import Dict, List, Any, Union, Optional
logger = logging.getLogger(__name__)
@@ -250,7 +251,9 @@ def lsblk(device, columns=None, abspath=False):
return result[0]
-def lsblk_all(device='', columns=None, abspath=False):
+def lsblk_all(device: str = '',
+ columns: Optional[List[str]] = None,
+ abspath: bool = False) -> List[Dict[str, str]]:
"""
Create a dictionary of identifying values for a device using ``lsblk``.
Each supported column is a key, in its *raw* format (all uppercase
@@ -331,7 +334,6 @@ def lsblk_all(device='', columns=None, abspath=False):
if device:
base_command.append('--nodeps')
base_command.append(device)
-
out, err, rc = process.call(base_command)
if rc != 0:
@@ -345,12 +347,21 @@ def lsblk_all(device='', columns=None, abspath=False):
return result
-def is_device(dev):
+def is_device(dev: str) -> bool:
"""
- Boolean to determine if a given device is a block device (**not**
- a partition!)
+ Determines whether the given path corresponds to a block device (not a partition).
+
+ This function checks whether the provided device path represents a valid block device,
+ such as a physical disk (/dev/sda) or an allowed loop device, but excludes partitions
+ (/dev/sdc1). It performs several validation steps, including file existence, path format,
+ device type, and additional checks for loop devices if allowed.
+
+ Args:
+ dev (str): The path to the device (e.g., "/dev/sda").
- For example: /dev/sda would return True, but not /dev/sdc1
+ Returns:
+ bool: True if the path corresponds to a valid block device (not a partition),
+ otherwise False.
"""
if not os.path.exists(dev):
return False
@@ -362,7 +373,7 @@ def is_device(dev):
TYPE = lsblk(dev).get('TYPE')
if TYPE:
- return TYPE in ['disk', 'mpath']
+ return TYPE in ['disk', 'mpath', 'loop']
# fallback to stat
return _stat_is_device(os.lstat(dev).st_mode) and not is_partition(dev)
@@ -727,31 +738,6 @@ def is_mapper_device(device_name):
return device_name.startswith(('/dev/mapper', '/dev/dm-'))
-class AllowLoopDevices(object):
- allow = False
- warned = False
-
- @classmethod
- def __call__(cls):
- val = os.environ.get("CEPH_VOLUME_ALLOW_LOOP_DEVICES", "false").lower()
- if val not in ("false", 'no', '0'):
- cls.allow = True
- if not cls.warned:
- logger.warning(
- "CEPH_VOLUME_ALLOW_LOOP_DEVICES is set in your "
- "environment, so we will allow the use of unattached loop"
- " devices as disks. This feature is intended for "
- "development purposes only and will never be supported in"
- " production. Issues filed based on this behavior will "
- "likely be ignored."
- )
- cls.warned = True
- return cls.allow
-
-
-allow_loop_devices = AllowLoopDevices()
-
-
def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_path: str = '/sys/dev/block', device: str = '') -> List[List[str]]:
def holder_inner_loop() -> bool:
for holder in holders:
@@ -795,9 +781,20 @@ def get_block_devs_sysfs(_sys_block_path: str = '/sys/block', _sys_dev_block_pat
result.append([name, kname, "part", partitions[partition]])
return sorted(result, key=lambda x: x[0])
-def get_partitions(_sys_dev_block_path ='/sys/dev/block') -> List[str]:
+def get_partitions(_sys_dev_block_path: str ='/sys/dev/block') -> Dict[str, str]:
+ """
+ Retrieves a dictionary mapping partition system names to their parent device names.
+
+ Args:
+ _sys_dev_block_path (str, optional): The path to the system's block device directory.
+ Defaults to '/sys/dev/block'.
+
+ Returns:
+ Dict[str, str]: A dictionary where the keys are partition system names, and the values are
+ the corresponding parent device names.
+ """
devices: List[str] = os.listdir(_sys_dev_block_path)
- result: Dict[str, str] = dict()
+ result: Dict[str, str] = {}
for device in devices:
device_path: str = os.path.join(_sys_dev_block_path, device)
is_partition: bool = int(get_file_contents(os.path.join(device_path, 'partition'), '0')) > 0
@@ -831,7 +828,7 @@ def get_devices(_sys_block_path='/sys/block', device=''):
for block in block_devs:
metadata: Dict[str, Any] = {}
if block[2] == 'lvm':
- block[1] = lvm.get_lv_path_from_mapper(block[1])
+ block[1] = UdevData(block[1]).slashed_path
devname = os.path.basename(block[0])
diskname = block[1]
if block[2] not in block_types:
@@ -870,13 +867,14 @@ def get_devices(_sys_block_path='/sys/block', device=''):
device_slaves = os.listdir(os.path.join(sysdir, 'slaves'))
metadata['partitions'] = get_partitions_facts(sysdir)
+ metadata['device_nodes'] = []
if device_slaves:
- metadata['device_nodes'] = ','.join(device_slaves)
+ metadata['device_nodes'].extend(device_slaves)
else:
if block[2] == 'part':
- metadata['device_nodes'] = block[3]
+ metadata['device_nodes'].append(block[3])
else:
- metadata['device_nodes'] = devname
+ metadata['device_nodes'].append(devname)
metadata['actuators'] = None
if os.path.isdir(sysdir + "/queue/independent_access_ranges/"):
@@ -963,3 +961,445 @@ def get_lvm_mappers(sys_block_path: str = '/sys/block') -> List[str]:
result.append(f'/dev/mapper/{name.strip()}')
result.append(f'/dev/{device}')
return result
+
+def _dd_read(device: str, count: int, skip: int = 0) -> str:
+ """Read bytes from a device
+
+ Args:
+ device (str): The device to read bytes from.
+ count (int): The number of bytes to read.
+ skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
+
+ Returns:
+ str: A string containing the read bytes.
+ """
+ result: str = ''
+ try:
+ with open(device, 'rb') as b:
+ b.seek(skip)
+ data: bytes = b.read(count)
+ result = data.decode('utf-8').replace('\x00', '')
+ except OSError:
+ logger.warning(f"Can't read from {device}")
+ pass
+ except UnicodeDecodeError:
+ pass
+ except Exception as e:
+ logger.error(f"An error occurred while reading from {device}: {e}")
+ raise
+
+ return result
+
+def _dd_write(device: str, data: Union[str, bytes], skip: int = 0) -> None:
+ """Write bytes to a device
+
+ Args:
+ device (str): The device to write bytes to.
+ data (str): The data to write to the device.
+ skip (int, optional): The number of bytes to skip at the beginning. Defaults to 0.
+
+ Raises:
+ OSError: If there is an error opening or writing to the device.
+ Exception: If any other error occurs during the write operation.
+ """
+
+ if isinstance(data, str):
+ data = data.encode('utf-8')
+
+ try:
+ with open(device, 'r+b') as b:
+ b.seek(skip)
+ b.write(data)
+ except OSError:
+ logger.warning(f"Can't write to {device}")
+ raise
+ except Exception as e:
+ logger.error(f"An error occurred while writing to {device}: {e}")
+ raise
+
+def get_bluestore_header(device: str) -> Dict[str, Any]:
+ """Retrieve BlueStore header information from a given device.
+
+ This function retrieves BlueStore header information from the specified 'device'.
+ It first checks if the device exists. If the device does not exist, a RuntimeError
+ is raised. Then, it calls the 'ceph-bluestore-tool' command to show the label
+ information of the device. If the command execution is successful, it parses the
+ JSON output containing the BlueStore header information and returns it as a dictionary.
+
+ Args:
+ device (str): The path to the device.
+
+ Returns:
+ Dict[str, Any]: A dictionary containing BlueStore header information.
+ """
+ data: Dict[str, Any] = {}
+
+ if os.path.exists(device):
+ out, err, rc = process.call([
+ 'ceph-bluestore-tool', 'show-label',
+ '--dev', device], verbose_on_failure=False)
+ if rc:
+ logger.debug(f'device {device} is not BlueStore; ceph-bluestore-tool failed to get info from device: {out}\n{err}')
+ else:
+ data = json.loads(''.join(out))
+ else:
+ logger.warning(f'device {device} not found.')
+ return data
+
+def bluestore_info(device: str, bluestore_labels: Dict[str, Any]) -> Dict[str, Any]:
+ """Build a dict representation of a BlueStore header
+
+ Args:
+ device (str): The path of the BlueStore device.
+ bluestore_labels (Dict[str, Any]): Plain text output from `ceph-bluestore-tool show-label`
+
+ Returns:
+ Dict[str, Any]: Generated dict representation of the BlueStore header
+ """
+ result: Dict[str, Any] = {}
+ result['osd_uuid'] = bluestore_labels[device]['osd_uuid']
+ if bluestore_labels[device]['description'] == 'main':
+ whoami = bluestore_labels[device]['whoami']
+ result.update({
+ 'type': bluestore_labels[device].get('type', 'bluestore'),
+ 'osd_id': int(whoami),
+ 'ceph_fsid': bluestore_labels[device]['ceph_fsid'],
+ 'device': device,
+ })
+ if bluestore_labels[device].get('db_device_uuid', ''):
+ result['db_device_uuid'] = bluestore_labels[device].get('db_device_uuid')
+ if bluestore_labels[device].get('wal_device_uuid', ''):
+ result['wal_device_uuid'] = bluestore_labels[device].get('wal_device_uuid')
+ elif bluestore_labels[device]['description'] == 'bluefs db':
+ result['device_db'] = device
+ elif bluestore_labels[device]['description'] == 'bluefs wal':
+ result['device_wal'] = device
+ return result
+
+def get_block_device_holders(sys_block: str = '/sys/block') -> Dict[str, Any]:
+ """Get a dictionary of device mappers with their corresponding parent devices.
+
+ This function retrieves information about device mappers and their parent devices
+ from the '/sys/block' directory. It iterates through each directory within 'sys_block',
+ and for each directory, it checks if a 'holders' directory exists. If so, it lists
+ the contents of the 'holders' directory and constructs a dictionary where the keys
+ are the device mappers and the values are their corresponding parent devices.
+
+ Args:
+ sys_block (str, optional): The path to the '/sys/block' directory. Defaults to '/sys/block'.
+
+ Returns:
+ Dict[str, Any]: A dictionary where keys are device mappers (e.g., '/dev/mapper/...') and
+ values are their corresponding parent devices (e.g., '/dev/sdX').
+ """
+ result: Dict[str, Any] = {}
+ for b in os.listdir(sys_block):
+ path: str = os.path.join(sys_block, b, 'holders')
+ if os.path.exists(path):
+ for h in os.listdir(path):
+ result[f'/dev/{h}'] = f'/dev/{b}'
+
+ return result
+
+def has_holders(device: str) -> bool:
+ """Check if a given device has any associated holders.
+
+ This function determines whether the specified device has associated holders
+ (e.g., other devices that depend on it) by checking if the device's real path
+ appears in the values of the dictionary returned by `get_block_device_holders`.
+
+ Args:
+ device (str): The path to the device (e.g., '/dev/sdX') to check.
+
+ Returns:
+ bool: True if the device has holders, False otherwise.
+ """
+ return os.path.realpath(device) in get_block_device_holders().values()
+
+def get_parent_device_from_mapper(mapper: str, abspath: bool = True) -> str:
+ """Get the parent device corresponding to a given device mapper.
+
+ This function retrieves the parent device corresponding to a given device mapper
+ from the dictionary returned by the 'get_block_device_holders' function. It first
+ checks if the specified 'mapper' exists. If it does, it resolves the real path of
+ the mapper using 'os.path.realpath'. Then, it attempts to retrieve the parent device
+ from the dictionary. If the mapper is not found in the dictionary, an empty string
+ is returned.
+
+ Args:
+ mapper (str): The path to the device mapper.
+ abspath (bool, optional): If True (default), returns the absolute path of the parent device.
+ If False, returns only the basename of the parent device.
+
+ Returns:
+ str: The parent device corresponding to the given device mapper, or an empty string
+ if the mapper is not found in the dictionary of device mappers.
+ """
+ result: str = ''
+ if os.path.exists(mapper):
+ _mapper: str = os.path.realpath(mapper)
+ try:
+ result = get_block_device_holders()[_mapper]
+ if not abspath:
+ result = os.path.basename(result)
+ except KeyError:
+ pass
+ return result
+
+def get_lvm_mapper_path_from_dm(path: str, sys_block: str = '/sys/block') -> str:
+ """Retrieve the logical volume path for a given device.
+
+ This function takes the path of a device and returns the corresponding
+ logical volume path by reading the 'dm/name' file within the sysfs
+ directory.
+
+ Args:
+ path (str): The device path for which to retrieve the logical volume path.
+ sys_block (str, optional): The base sysfs block directory. Defaults to '/sys/block'.
+
+ Returns:
+ str: The device mapper path in the 'dashed form' of '/dev/mapper/vg-lv'.
+ """
+ result: str = ''
+ dev: str = os.path.basename(path)
+ sys_block_path: str = os.path.join(sys_block, dev, 'dm/name')
+ if os.path.exists(sys_block_path):
+ with open(sys_block_path, 'r') as f:
+ content: str = f.read()
+ result = f'/dev/mapper/{content}'
+ return result.strip()
+
+
+class BlockSysFs:
+ def __init__(self,
+ path: str,
+ sys_dev_block: str = '/sys/dev/block',
+ sys_block: str = '/sys/block') -> None:
+ """
+ Initializes a BlockSysFs object.
+
+ Args:
+ path (str): The path to the block device.
+ sys_dev_block (str, optional): Path to the sysfs directory containing block devices.
+ Defaults to '/sys/dev/block'.
+ sys_block (str, optional): Path to the sysfs directory containing block information.
+ Defaults to '/sys/block'.
+ """
+ self.path: str = path
+ self.name: str = os.path.basename(os.path.realpath(self.path))
+ self.sys_dev_block: str = sys_dev_block
+ self.sys_block: str = sys_block
+
+ @property
+ def is_partition(self) -> bool:
+ """
+ Checks if the current block device is a partition.
+
+ Returns:
+ bool: True if it is a partition, False otherwise.
+ """
+ path: str = os.path.join(self.get_sys_dev_block_path, 'partition')
+ return os.path.exists(path)
+
+ @property
+ def holders(self) -> List[str]:
+ """
+ Retrieves the holders of the current block device.
+
+ Returns:
+ List[str]: A list of holders (other devices) associated with this block device.
+ """
+ result: List[str] = []
+ path: str = os.path.join(self.get_sys_dev_block_path, 'holders')
+ if os.path.exists(path):
+ result = os.listdir(path)
+ return result
+
+ @property
+ def get_sys_dev_block_path(self) -> str:
+ """
+ Gets the sysfs path for the current block device.
+
+ Returns:
+ str: The sysfs path corresponding to this block device.
+ """
+ sys_dev_block_path: str = ''
+ devices: List[str] = os.listdir(self.sys_dev_block)
+ for device in devices:
+ path = os.path.join(self.sys_dev_block, device)
+ if os.path.realpath(path).split('/')[-1:][0] == self.name:
+ sys_dev_block_path = path
+ return sys_dev_block_path
+
+ @property
+ def has_active_mappers(self) -> bool:
+ """
+ Checks if there are any active device mappers for the current block device.
+
+ Returns:
+ bool: True if active mappers exist, False otherwise.
+ """
+ return len(self.active_mappers()) > 0
+
+ @property
+ def has_active_dmcrypt_mapper(self) -> bool:
+ """
+ Checks if there is an active dm-crypt (disk encryption) mapper for the current block device.
+
+ Returns:
+ bool: True if an active dm-crypt mapper exists, False otherwise.
+ """
+ return any(value.get('type') == 'CRYPT' for value in self.active_mappers().values())
+
+ def active_mappers(self) -> Dict[str, Any]:
+ """
+ Retrieves information about active device mappers for the current block device.
+
+ Returns:
+ Dict[str, Any]: A dictionary containing details about active device mappers.
+ Keys are the holders, and values provide details like type,
+ dm-crypt metadata, and LVM UUIDs.
+ """
+ result: Dict[str, Any] = {}
+ for holder in self.holders:
+ path: str = os.path.join(self.sys_block, holder, 'dm/uuid')
+ if os.path.exists(path):
+ result[holder] = {}
+ with open(path, 'r') as f:
+ content: str = f.read().strip()
+ content_split: List[str] = content.split('-', maxsplit=3)
+ mapper_type: str = content_split[0]
+ result[holder]['type'] = mapper_type
+ if mapper_type == 'CRYPT':
+ result[holder]['dmcrypt_type'] = content_split[1]
+ result[holder]['dmcrypt_uuid'] = content_split[2]
+ result[holder]['dmcrypt_mapping'] = content_split[3]
+ if mapper_type == 'LVM':
+ result[holder]['uuid'] = content_split[1]
+ return result
+
+class UdevData:
+ """
+ Class representing udev data for a specific device.
+ This class extracts and stores relevant information about the device from udev files.
+
+ Attributes:
+ -----------
+ path : str
+ The initial device path (e.g., /dev/sda).
+ realpath : str
+ The resolved real path of the device.
+ stats : os.stat_result
+ The result of the os.stat() call to retrieve device metadata.
+ major : int
+ The device's major number.
+ minor : int
+ The device's minor number.
+ udev_data_path : str
+ The path to the udev metadata for the device (e.g., /run/udev/data/b<major>:<minor>).
+ symlinks : List[str]
+ A list of symbolic links pointing to the device.
+ id : str
+ A unique identifier for the device.
+ environment : Dict[str, str]
+ A dictionary containing environment variables extracted from the udev data.
+ group : str
+ The group associated with the device.
+ queue : str
+ The queue associated with the device.
+ version : str
+ The version of the device or its metadata.
+ """
+ def __init__(self, path: str) -> None:
+ """Initialize an instance of the UdevData class and load udev information.
+
+ Args:
+ path (str): The path to the device to be analyzed (e.g., /dev/sda).
+
+ Raises:
+ RuntimeError: Raised if no udev data file is found for the specified device.
+ """
+ if not os.path.exists(path):
+ raise RuntimeError(f'{path} not found.')
+ self.path: str = path
+ self.realpath: str = os.path.realpath(self.path)
+ self.stats: os.stat_result = os.stat(self.realpath)
+ self.major: int = os.major(self.stats.st_rdev)
+ self.minor: int = os.minor(self.stats.st_rdev)
+ self.udev_data_path: str = f'/run/udev/data/b{self.major}:{self.minor}'
+ self.symlinks: List[str] = []
+ self.id: str = ''
+ self.environment: Dict[str, str] = {}
+ self.group: str = ''
+ self.queue: str = ''
+ self.version: str = ''
+
+ if not os.path.exists(self.udev_data_path):
+ raise RuntimeError(f'No udev data could be retrieved for {self.path}')
+
+ with open(self.udev_data_path, 'r') as f:
+ content: str = f.read().strip()
+ self.raw_data: List[str] = content.split('\n')
+
+ for line in self.raw_data:
+ data_type, data = line.split(':', 1)
+ if data_type == 'S':
+ self.symlinks.append(data)
+ if data_type == 'I':
+ self.id = data
+ if data_type == 'E':
+ key, value = data.split('=')
+ self.environment[key] = value
+ if data_type == 'G':
+ self.group = data
+ if data_type == 'Q':
+ self.queue = data
+ if data_type == 'V':
+ self.version = data
+
+ @property
+ def is_dm(self) -> bool:
+ """Check if the device is a device mapper (DM).
+
+ Returns:
+ bool: True if the device is a device mapper, otherwise False.
+ """
+ return 'DM_UUID' in self.environment.keys()
+
+ @property
+ def is_lvm(self) -> bool:
+ """Check if the device is a Logical Volume Manager (LVM) volume.
+
+ Returns:
+ bool: True if the device is an LVM volume, otherwise False.
+ """
+ return self.environment.get('DM_UUID', '').startswith('LVM')
+
+ @property
+ def slashed_path(self) -> str:
+ """Get the LVM path structured with slashes.
+
+ Returns:
+ str: A path using slashes if the device is an LVM volume (e.g., /dev/vgname/lvname),
+ otherwise the original path.
+ """
+ result: str = self.path
+ if self.is_lvm:
+ vg: str = self.environment.get('DM_VG_NAME', '')
+ lv: str = self.environment.get('DM_LV_NAME', '')
+ result = f'/dev/{vg}/{lv}'
+ return result
+
+ @property
+ def dashed_path(self) -> str:
+ """Get the LVM path structured with dashes.
+
+ Returns:
+ str: A path using dashes if the device is an LVM volume (e.g., /dev/mapper/vgname-lvname),
+ otherwise the original path.
+ """
+ result: str = self.path
+ if self.is_lvm:
+ name: str = self.environment.get('DM_NAME', '')
+ result = f'/dev/mapper/{name}'
+ return result