summaryrefslogtreecommitdiffstats
path: root/src/ceph-volume/ceph_volume/util/disk.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/ceph-volume/ceph_volume/util/disk.py')
-rw-r--r--src/ceph-volume/ceph_volume/util/disk.py47
1 files changed, 31 insertions, 16 deletions
diff --git a/src/ceph-volume/ceph_volume/util/disk.py b/src/ceph-volume/ceph_volume/util/disk.py
index 78c140597d6..921e61a4534 100644
--- a/src/ceph-volume/ceph_volume/util/disk.py
+++ b/src/ceph-volume/ceph_volume/util/disk.py
@@ -7,7 +7,7 @@ import json
from ceph_volume import process, allow_loop_devices
from ceph_volume.api import lvm
from ceph_volume.util.system import get_file_contents
-from typing import Dict, List, Any
+from typing import Dict, List, Any, Union, Optional
logger = logging.getLogger(__name__)
@@ -251,7 +251,9 @@ def lsblk(device, columns=None, abspath=False):
return result[0]
-def lsblk_all(device='', columns=None, abspath=False):
+def lsblk_all(device: str = '',
+ columns: Optional[List[str]] = None,
+ abspath: bool = False) -> List[Dict[str, str]]:
"""
Create a dictionary of identifying values for a device using ``lsblk``.
Each supported column is a key, in its *raw* format (all uppercase
@@ -332,7 +334,6 @@ def lsblk_all(device='', columns=None, abspath=False):
if device:
base_command.append('--nodeps')
base_command.append(device)
-
out, err, rc = process.call(base_command)
if rc != 0:
@@ -346,12 +347,21 @@ def lsblk_all(device='', columns=None, abspath=False):
return result
-def is_device(dev):
+def is_device(dev: str) -> bool:
"""
- Boolean to determine if a given device is a block device (**not**
- a partition!)
+ Determines whether the given path corresponds to a block device (not a partition).
+
+ This function checks whether the provided device path represents a valid block device,
+ such as a physical disk (/dev/sda) or an allowed loop device, but excludes partitions
+ (/dev/sdc1). It performs several validation steps, including file existence, path format,
+ device type, and additional checks for loop devices if allowed.
+
+ Args:
+ dev (str): The path to the device (e.g., "/dev/sda").
- For example: /dev/sda would return True, but not /dev/sdc1
+ Returns:
+ bool: True if the path corresponds to a valid block device (not a partition),
+ otherwise False.
"""
if not os.path.exists(dev):
return False
@@ -363,7 +373,7 @@ def is_device(dev):
TYPE = lsblk(dev).get('TYPE')
if TYPE:
- return TYPE in ['disk', 'mpath']
+ return TYPE in ['disk', 'mpath', 'loop']
# fallback to stat
return _stat_is_device(os.lstat(dev).st_mode) and not is_partition(dev)
@@ -857,13 +867,14 @@ def get_devices(_sys_block_path='/sys/block', device=''):
device_slaves = os.listdir(os.path.join(sysdir, 'slaves'))
metadata['partitions'] = get_partitions_facts(sysdir)
+ metadata['device_nodes'] = []
if device_slaves:
- metadata['device_nodes'] = ','.join(device_slaves)
+ metadata['device_nodes'].extend(device_slaves)
else:
if block[2] == 'part':
- metadata['device_nodes'] = block[3]
+ metadata['device_nodes'].append(block[3])
else:
- metadata['device_nodes'] = devname
+ metadata['device_nodes'].append(devname)
metadata['actuators'] = None
if os.path.isdir(sysdir + "/queue/independent_access_ranges/"):
@@ -979,7 +990,7 @@ def _dd_read(device: str, count: int, skip: int = 0) -> str:
return result
-def _dd_write(device: str, data: str, skip: int = 0) -> None:
+def _dd_write(device: str, data: Union[str, bytes], skip: int = 0) -> None:
"""Write bytes to a device
Args:
@@ -991,10 +1002,14 @@ def _dd_write(device: str, data: str, skip: int = 0) -> None:
OSError: If there is an error opening or writing to the device.
Exception: If any other error occurs during the write operation.
"""
+
+ if isinstance(data, str):
+ data = data.encode('utf-8')
+
try:
with open(device, 'r+b') as b:
b.seek(skip)
- b.write(data.encode('utf-8'))
+ b.write(data)
except OSError:
logger.warning(f"Can't write to {device}")
raise
@@ -1370,8 +1385,8 @@ class UdevData:
"""
result: str = self.path
if self.is_lvm:
- vg: str = self.environment.get('DM_VG_NAME')
- lv: str = self.environment.get('DM_LV_NAME')
+ vg: str = self.environment.get('DM_VG_NAME', '')
+ lv: str = self.environment.get('DM_LV_NAME', '')
result = f'/dev/{vg}/{lv}'
return result
@@ -1385,6 +1400,6 @@ class UdevData:
"""
result: str = self.path
if self.is_lvm:
- name: str = self.environment.get('DM_NAME')
+ name: str = self.environment.get('DM_NAME', '')
result = f'/dev/mapper/{name}'
return result