Diffstat (limited to 'src/python-common')
19 files changed, 960 insertions, 175 deletions
diff --git a/src/python-common/CMakeLists.txt b/src/python-common/CMakeLists.txt index e89bbe2feef..08660342a6a 100644 --- a/src/python-common/CMakeLists.txt +++ b/src/python-common/CMakeLists.txt @@ -3,5 +3,5 @@ distutils_install_module(ceph) if(WITH_TESTS) include(AddCephTest) - add_tox_test(python-common TOX_ENVS py3 lint) + add_tox_test(python-common TOX_ENVS __tox_defaults__) endif() diff --git a/src/python-common/ceph/cephadm/__init__.py b/src/python-common/ceph/cephadm/__init__.py new file mode 100644 index 00000000000..3c74dfd3941 --- /dev/null +++ b/src/python-common/ceph/cephadm/__init__.py @@ -0,0 +1,2 @@ +# this directory is meant for things that will be shared only between +# the cephadm binary and cephadm mgr module diff --git a/src/python-common/ceph/cephadm/images.py b/src/python-common/ceph/cephadm/images.py new file mode 100644 index 00000000000..5b3c7421205 --- /dev/null +++ b/src/python-common/ceph/cephadm/images.py @@ -0,0 +1,57 @@ +# Default container images ----------------------------------------------------- + +from typing import NamedTuple +from enum import Enum + + +class ContainerImage(NamedTuple): + image_ref: str # reference to default container image + key: str # image key + desc: str # description of image + + def __repr__(self) -> str: + return self.image_ref + + +def _create_image(image_ref: str, key: str) -> ContainerImage: + _img_prefix = 'container_image_' + description = key.replace('_', ' ').capitalize() + return ContainerImage( + image_ref, + f'{_img_prefix}{key}', + f'{description} container image' + ) + + +class DefaultImages(Enum): + PROMETHEUS = _create_image('quay.io/prometheus/prometheus:v2.51.0', 'prometheus') + LOKI = _create_image('docker.io/grafana/loki:3.0.0', 'loki') + PROMTAIL = _create_image('docker.io/grafana/promtail:3.0.0', 'promtail') + NODE_EXPORTER = _create_image('quay.io/prometheus/node-exporter:v1.7.0', 'node_exporter') + ALERTMANAGER = _create_image('quay.io/prometheus/alertmanager:v0.27.0', 'alertmanager') + GRAFANA = _create_image('quay.io/ceph/grafana:10.4.8', 'grafana') + HAPROXY = _create_image('quay.io/ceph/haproxy:2.3', 'haproxy') + KEEPALIVED = _create_image('quay.io/ceph/keepalived:2.2.4', 'keepalived') + NVMEOF = _create_image('quay.io/ceph/nvmeof:1.4', 'nvmeof') + SNMP_GATEWAY = _create_image('docker.io/maxwo/snmp-notifier:v1.2.1', 'snmp_gateway') + ELASTICSEARCH = _create_image('quay.io/omrizeneva/elasticsearch:6.8.23', 'elasticsearch') + JAEGER_COLLECTOR = _create_image('quay.io/jaegertracing/jaeger-collector:1.29', + 'jaeger_collector') + JAEGER_AGENT = _create_image('quay.io/jaegertracing/jaeger-agent:1.29', 'jaeger_agent') + JAEGER_QUERY = _create_image('quay.io/jaegertracing/jaeger-query:1.29', 'jaeger_query') + SAMBA = _create_image('quay.io/samba.org/samba-server:devbuilds-centos-amd64', 'samba') + SAMBA_METRICS = _create_image('quay.io/samba.org/samba-metrics:latest', 'samba_metrics') + NGINX = _create_image('quay.io/ceph/nginx:sclorg-nginx-126', 'nginx') + OAUTH2_PROXY = _create_image('quay.io/oauth2-proxy/oauth2-proxy:v7.6.0', 'oauth2_proxy') + + @property + def image_ref(self) -> str: + return self.value.image_ref + + @property + def key(self) -> str: + return self.value.key + + @property + def desc(self) -> str: + return self.value.desc diff --git a/src/python-common/ceph/deployment/drive_group.py b/src/python-common/ceph/deployment/drive_group.py index cf24fc0efa7..43175aa79fb 100644 --- a/src/python-common/ceph/deployment/drive_group.py +++ b/src/python-common/ceph/deployment/drive_group.py @@ -2,7 
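A quick aside on the new DefaultImages enum above: each member wraps a ContainerImage NamedTuple, so callers can iterate the enum to build an option table in one pass. A minimal sketch of such usage (hypothetical caller code, not part of this change):

    from ceph.cephadm.images import DefaultImages

    # Map each cephadm config option name to its default image reference.
    defaults = {img.key: img.image_ref for img in DefaultImages}
    assert defaults['container_image_prometheus'] == 'quay.io/prometheus/prometheus:v2.51.0'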
+2,7 @@ import enum import yaml from ceph.deployment.inventory import Device -from ceph.deployment.service_spec import ( +from ceph.deployment.service_spec import ( # noqa: F401 (type comments) CustomConfig, GeneralArgList, PlacementSpec, @@ -11,7 +11,7 @@ from ceph.deployment.service_spec import ( from ceph.deployment.hostspec import SpecValidationError try: - from typing import Optional, List, Dict, Any, Union + from typing import Optional, List, Dict, Any, Union # noqa: F401 except ImportError: pass @@ -166,7 +166,7 @@ class DriveGroupSpec(ServiceSpec): """ _supported_features = [ - "encrypted", "block_wal_size", "osds_per_device", + "encrypted", "tpm2", "block_wal_size", "osds_per_device", "db_slots", "wal_slots", "block_db_size", "placement", "service_id", "service_type", "data_devices", "db_devices", "wal_devices", "journal_devices", "data_directories", "osds_per_device", "objectstore", "osd_id_claims", @@ -185,6 +185,7 @@ class DriveGroupSpec(ServiceSpec): osds_per_device=None, # type: Optional[int] objectstore='bluestore', # type: str encrypted=False, # type: bool + tpm2=False, # type: bool db_slots=None, # type: Optional[int] wal_slots=None, # type: Optional[int] osd_id_claims=None, # type: Optional[Dict[str, List[str]]] @@ -248,6 +249,9 @@ class DriveGroupSpec(ServiceSpec): #: ``true`` or ``false`` self.encrypted = encrypted + #: ``true`` or ``false`` + self.tpm2 = tpm2 + #: How many OSDs per DB device self.db_slots = db_slots diff --git a/src/python-common/ceph/deployment/drive_selection/filter.py b/src/python-common/ceph/deployment/drive_selection/filter.py index 0da1b5c3901..28f63ddc2f2 100644 --- a/src/python-common/ceph/deployment/drive_selection/filter.py +++ b/src/python-common/ceph/deployment/drive_selection/filter.py @@ -15,12 +15,10 @@ logger = logging.getLogger(__name__) class FilterGenerator(object): - def __init__(self, device_filter): - # type: (DeviceSelection) -> None + def __init__(self, device_filter: DeviceSelection) -> None: self.device_filter = device_filter - def __iter__(self): - # type: () -> Generator[Matcher, None, None] + def __iter__(self) -> Generator[Matcher, None, None]: if self.device_filter.actuators: yield EqualityMatcher('actuators', self.device_filter.actuators) if self.device_filter.size: diff --git a/src/python-common/ceph/deployment/drive_selection/matchers.py b/src/python-common/ceph/deployment/drive_selection/matchers.py index df502410aeb..a6a2147ce9e 100644 --- a/src/python-common/ceph/deployment/drive_selection/matchers.py +++ b/src/python-common/ceph/deployment/drive_selection/matchers.py @@ -1,8 +1,9 @@ # -*- coding: utf-8 -*- -from typing import Tuple, Optional, Any, Union, Iterator +# TODO: remove noqa and update to python3/mypy style type annotations +from typing import Tuple, Optional, Any, Union, Iterator # noqa: F401 -from ceph.deployment.inventory import Device +from ceph.deployment.inventory import Device # noqa: F401 import re import logging diff --git a/src/python-common/ceph/deployment/drive_selection/selector.py b/src/python-common/ceph/deployment/drive_selection/selector.py index 041f1ed3044..85fc95cf394 100644 --- a/src/python-common/ceph/deployment/drive_selection/selector.py +++ b/src/python-common/ceph/deployment/drive_selection/selector.py @@ -3,7 +3,7 @@ import logging from typing import List, Optional, Dict, Callable from ..inventory import Device -from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError +from ..drive_group import DriveGroupSpec, DeviceSelection, DriveGroupValidationError 
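The new tpm2 flag accepts the same ``true``/``false`` values as encrypted and, as the translate.py hunk later in this diff shows, maps to ceph-volume's --with-tpm switch. A minimal sketch of an OSD spec that opts in (hypothetical values):

    from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
    from ceph.deployment.service_spec import PlacementSpec

    # dmcrypt-encrypted OSDs with the encryption key enrolled in TPM2
    spec = DriveGroupSpec(
        service_id='tpm2_osds',
        placement=PlacementSpec(host_pattern='*'),
        data_devices=DeviceSelection(all=True),
        encrypted=True,
        tpm2=True,
    )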
# noqa: F401 from .filter import FilterGenerator from .matchers import _MatchInvalid @@ -131,6 +131,10 @@ class DriveSelection(object): for disk in self.disks: logger.debug("Processing disk {}".format(disk.path)) + if disk.being_replaced: + logger.debug('Ignoring disk {} as it is being replaced.'.format(disk.path)) + continue + if not disk.available and not disk.ceph_device: logger.debug( ("Ignoring disk {}. " diff --git a/src/python-common/ceph/deployment/inventory.py b/src/python-common/ceph/deployment/inventory.py index a3023882108..29475e94d82 100644 --- a/src/python-common/ceph/deployment/inventory.py +++ b/src/python-common/ceph/deployment/inventory.py @@ -1,5 +1,5 @@ try: - from typing import List, Optional, Dict, Any, Union + from typing import List, Optional, Dict, Any, Union # noqa: F401 except ImportError: pass # for type checking @@ -54,7 +54,8 @@ class Device(object): 'human_readable_type', 'device_id', 'lsm_data', - 'crush_device_class' + 'crush_device_class', + 'being_replaced' ] def __init__(self, @@ -67,7 +68,8 @@ class Device(object): lsm_data=None, # type: Optional[Dict[str, Dict[str, str]]] created=None, # type: Optional[datetime.datetime] ceph_device=None, # type: Optional[bool] - crush_device_class=None # type: Optional[str] + crush_device_class=None, # type: Optional[str] + being_replaced=None, # type: Optional[bool] ): self.path = path @@ -80,6 +82,7 @@ class Device(object): self.created = created if created is not None else datetime_now() self.ceph_device = ceph_device self.crush_device_class = crush_device_class + self.being_replaced = being_replaced def __eq__(self, other): # type: (Any) -> bool @@ -129,7 +132,8 @@ class Device(object): 'lvs': self.lvs if self.lvs else 'None', 'available': str(self.available), 'ceph_device': str(self.ceph_device), - 'crush_device_class': str(self.crush_device_class) + 'crush_device_class': str(self.crush_device_class), + 'being_replaced': str(self.being_replaced) } if not self.available and self.rejected_reasons: device_desc['rejection reasons'] = self.rejected_reasons diff --git a/src/python-common/ceph/deployment/service_spec.py b/src/python-common/ceph/deployment/service_spec.py index a69b3a25dcd..6869d5b2188 100644 --- a/src/python-common/ceph/deployment/service_spec.py +++ b/src/python-common/ceph/deployment/service_spec.py @@ -5,7 +5,7 @@ import enum from collections import OrderedDict from contextlib import contextmanager from functools import wraps -from ipaddress import ip_network, ip_address +from ipaddress import ip_network, ip_address, ip_interface from typing import ( Any, Callable, @@ -25,7 +25,9 @@ from typing import ( import yaml from ceph.deployment.hostspec import HostSpec, SpecValidationError, assert_valid_host -from ceph.deployment.utils import unwrap_ipv6, valid_addr +from ceph.deployment.utils import unwrap_ipv6, valid_addr, verify_non_negative_int +from ceph.deployment.utils import verify_positive_int, verify_non_negative_number +from ceph.deployment.utils import verify_boolean, verify_enum from ceph.utils import is_hex ServiceSpecT = TypeVar('ServiceSpecT', bound='ServiceSpec') @@ -527,8 +529,8 @@ pattern_type=PatternType.fnmatch)) labels = [x for x in strings if 'label:' in x] if len(labels) > 1: raise SpecValidationError('more than one label provided: {}'.format(labels)) - for l in labels: - strings.remove(l) + for lbl in labels: + strings.remove(lbl) label = labels[0][6:] if labels else None host_patterns = strings @@ -701,7 +703,7 @@ class ArgumentSpec: if isinstance(data, str): return cls(data, 
split=True, origin=cls.OriginalType.STRING) if 'argument' not in data: - raise SpecValidationError(f'ArgumentSpec must have an "argument" field') + raise SpecValidationError('ArgumentSpec must have an "argument" field') for k in data.keys(): if k not in cls._fields: raise SpecValidationError(f'ArgumentSpec got an unknown field {k!r}') @@ -766,6 +768,7 @@ class ServiceSpec(object): 'grafana', 'ingress', 'mgmt-gateway', + 'oauth2-proxy', 'iscsi', 'jaeger-agent', 'jaeger-collector', @@ -821,6 +824,7 @@ class ServiceSpec(object): 'alertmanager': AlertManagerSpec, 'ingress': IngressSpec, 'mgmt-gateway': MgmtGatewaySpec, + 'oauth2-proxy': OAuth2ProxySpec, 'container': CustomContainerSpec, 'grafana': GrafanaSpec, 'node-exporter': MonitoringSpec, @@ -1206,7 +1210,7 @@ class RGWSpec(ServiceSpec): rgw_zonegroup: Optional[str] = None, rgw_zone: Optional[str] = None, rgw_frontend_port: Optional[int] = None, - rgw_frontend_ssl_certificate: Optional[List[str]] = None, + rgw_frontend_ssl_certificate: Optional[Union[str, List[str]]] = None, rgw_frontend_type: Optional[str] = None, rgw_frontend_extra_args: Optional[List[str]] = None, unmanaged: bool = False, @@ -1221,11 +1225,13 @@ class RGWSpec(ServiceSpec): rgw_realm_token: Optional[str] = None, update_endpoints: Optional[bool] = False, zone_endpoints: Optional[str] = None, # comma separated endpoints list - zonegroup_hostnames: Optional[str] = None, + zonegroup_hostnames: Optional[List[str]] = None, rgw_user_counters_cache: Optional[bool] = False, rgw_user_counters_cache_size: Optional[int] = None, rgw_bucket_counters_cache: Optional[bool] = False, rgw_bucket_counters_cache_size: Optional[int] = None, + generate_cert: bool = False, + disable_multisite_sync_traffic: Optional[bool] = None, ): assert service_type == 'rgw', service_type @@ -1255,7 +1261,8 @@ class RGWSpec(ServiceSpec): #: Port of the RGW daemons self.rgw_frontend_port: Optional[int] = rgw_frontend_port #: List of SSL certificates - self.rgw_frontend_ssl_certificate: Optional[List[str]] = rgw_frontend_ssl_certificate + self.rgw_frontend_ssl_certificate: Optional[Union[str, List[str]]] \ + = rgw_frontend_ssl_certificate #: civetweb or beast (default: beast). See :ref:`rgw_frontends` self.rgw_frontend_type: Optional[str] = rgw_frontend_type #: List of extra arguments for rgw_frontend in the form opt=value. See :ref:`rgw_frontends` @@ -1275,6 +1282,10 @@ class RGWSpec(ServiceSpec): self.rgw_bucket_counters_cache = rgw_bucket_counters_cache #: Used to set number of entries in each cache of bucket counters self.rgw_bucket_counters_cache_size = rgw_bucket_counters_cache_size + #: Whether we should generate a cert/key for the user if not provided + self.generate_cert = generate_cert + #: Used to make RGW not do multisite replication so it can dedicate to IO + self.disable_multisite_sync_traffic = disable_multisite_sync_traffic def get_port_start(self) -> List[int]: return [self.get_port()] @@ -1303,6 +1314,14 @@ class RGWSpec(ServiceSpec): 'Additional rgw type parameters can be passed using rgw_frontend_extra_args.' 
) + if self.generate_cert and not self.ssl: + raise SpecValidationError('"ssl" field must be set to true when "generate_cert" ' + 'is set to true') + + if self.generate_cert and self.rgw_frontend_ssl_certificate: + raise SpecValidationError('"generate_cert" field and "rgw_frontend_ssl_certificate" ' + 'field are mutually exclusive') + yaml.add_representer(RGWSpec, ServiceSpec.yaml_representer) @@ -1313,31 +1332,51 @@ class NvmeofServiceSpec(ServiceSpec): service_id: Optional[str] = None, name: Optional[str] = None, group: Optional[str] = None, + addr: Optional[str] = None, + addr_map: Optional[Dict[str, str]] = None, port: Optional[int] = None, pool: Optional[str] = None, enable_auth: bool = False, state_update_notify: Optional[bool] = True, state_update_interval_sec: Optional[int] = 5, enable_spdk_discovery_controller: Optional[bool] = False, + enable_key_encryption: Optional[bool] = True, + encryption_key: Optional[str] = None, + rebalance_period_sec: Optional[int] = 7, + max_gws_in_grp: Optional[int] = 16, + max_ns_to_change_lb_grp: Optional[int] = 8, omap_file_lock_duration: Optional[int] = 20, omap_file_lock_retries: Optional[int] = 30, omap_file_lock_retry_sleep_interval: Optional[float] = 1.0, omap_file_update_reloads: Optional[int] = 10, enable_prometheus_exporter: Optional[bool] = True, + prometheus_port: Optional[int] = 10008, + prometheus_stats_interval: Optional[int] = 10, bdevs_per_cluster: Optional[int] = 32, verify_nqns: Optional[bool] = True, + verify_keys: Optional[bool] = True, allowed_consecutive_spdk_ping_failures: Optional[int] = 1, spdk_ping_interval_in_seconds: Optional[float] = 2.0, + ping_spdk_under_lock: Optional[bool] = False, + max_hosts_per_namespace: Optional[int] = 8, + max_namespaces_with_netmask: Optional[int] = 1000, + max_subsystems: Optional[int] = 128, + max_namespaces: Optional[int] = 1024, + max_namespaces_per_subsystem: Optional[int] = 256, + max_hosts_per_subsystem: Optional[int] = 32, server_key: Optional[str] = None, server_cert: Optional[str] = None, client_key: Optional[str] = None, client_cert: Optional[str] = None, root_ca_cert: Optional[str] = None, + # unused and duplicate of tgt_path below, consider removing spdk_path: Optional[str] = None, + spdk_mem_size: Optional[int] = None, tgt_path: Optional[str] = None, spdk_timeout: Optional[float] = 60.0, spdk_log_level: Optional[str] = '', spdk_protocol_log_level: Optional[str] = 'WARNING', + spdk_log_file_dir: Optional[str] = '', rpc_socket_dir: Optional[str] = '/var/tmp/', rpc_socket_name: Optional[str] = 'spdk.sock', conn_retries: Optional[int] = 10, @@ -1345,6 +1384,9 @@ class NvmeofServiceSpec(ServiceSpec): transport_tcp_options: Optional[Dict[str, int]] = {"in_capsule_data_size": 8192, "max_io_qpairs_per_ctrlr": 7}, tgt_cmd_extra_args: Optional[str] = None, + iobuf_options: Optional[Dict[str, int]] = None, + discovery_addr: Optional[str] = None, + discovery_addr_map: Optional[Dict[str, str]] = None, discovery_port: Optional[int] = None, log_level: Optional[str] = 'INFO', log_files_enabled: Optional[bool] = True, @@ -1356,6 +1398,7 @@ class NvmeofServiceSpec(ServiceSpec): log_directory: Optional[str] = '/var/log/ceph/', monitor_timeout: Optional[float] = 1.0, enable_monitor_client: bool = True, + monitor_client_log_file_dir: Optional[str] = '', placement: Optional[PlacementSpec] = None, unmanaged: bool = False, preview_only: bool = False, @@ -1376,6 +1419,10 @@ class NvmeofServiceSpec(ServiceSpec): #: RADOS pool where ceph-nvmeof config data is stored. 
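Returning to the two RGW validation rules added at the top of this hunk: generate_cert asks cephadm to mint the certificate itself, so it requires ssl to be enabled and excludes a user-supplied certificate. A small sketch (hypothetical spec values, assuming the existing ssl field on RGWSpec):

    from ceph.deployment.service_spec import RGWSpec
    from ceph.deployment.hostspec import SpecValidationError

    RGWSpec(service_id='default.east', ssl=True, generate_cert=True).validate()  # ok

    conflicting = RGWSpec(service_id='default.east', ssl=True, generate_cert=True,
                          rgw_frontend_ssl_certificate='<PEM blob>')
    try:
        conflicting.validate()
    except SpecValidationError as exc:
        print(exc)  # the two fields are mutually exclusive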
self.pool = pool + #: ``addr`` address of the nvmeof gateway + self.addr = addr + #: ``addr_map`` per node address map of the nvmeof gateways + self.addr_map = addr_map #: ``port`` port of the nvmeof gateway self.port = port or 5500 #: ``name`` name of the nvmeof gateway @@ -1390,10 +1437,26 @@ class NvmeofServiceSpec(ServiceSpec): self.state_update_interval_sec = state_update_interval_sec #: ``enable_spdk_discovery_controller`` SPDK or ceph-nvmeof discovery service self.enable_spdk_discovery_controller = enable_spdk_discovery_controller + #: ``enable_key_encryption`` encrypt DHCHAP and PSK keys before saving in OMAP + self.enable_key_encryption = enable_key_encryption + #: ``encryption_key`` gateway encryption key + self.encryption_key = encryption_key + #: ``rebalance_period_sec`` number of seconds between cycles of auto namespace rebalancing + self.rebalance_period_sec = rebalance_period_sec + #: ``max_gws_in_grp`` max number of gateways in one group + self.max_gws_in_grp = max_gws_in_grp + #: ``max_ns_to_change_lb_grp`` max number of namespaces before switching to a new lb group + self.max_ns_to_change_lb_grp = max_ns_to_change_lb_grp #: ``enable_prometheus_exporter`` enables Prometheus exporter self.enable_prometheus_exporter = enable_prometheus_exporter + #: ``prometheus_port`` Prometheus port + self.prometheus_port = prometheus_port or 10008 + #: ``prometheus_stats_interval`` Prometheus get stats interval + self.prometheus_stats_interval = prometheus_stats_interval #: ``verify_nqns`` enables verification of subsystem and host NQNs for validity self.verify_nqns = verify_nqns + #: ``verify_keys`` enables verification of PSK and DHCHAP keys in the gateway + self.verify_keys = verify_keys #: ``omap_file_lock_duration`` number of seconds before automatically unlock OMAP file lock self.omap_file_lock_duration = omap_file_lock_duration #: ``omap_file_lock_retries`` number of retries to lock OMAP file before giving up @@ -1402,10 +1465,24 @@ class NvmeofServiceSpec(ServiceSpec): self.omap_file_lock_retry_sleep_interval = omap_file_lock_retry_sleep_interval #: ``omap_file_update_reloads`` number of attempt to reload OMAP when it differs from local self.omap_file_update_reloads = omap_file_update_reloads + #: ``max_hosts_per_namespace`` max number of hosts per namespace + self.max_hosts_per_namespace = max_hosts_per_namespace + #: ``max_namespaces_with_netmask`` max number of namespaces which are not auto visible + self.max_namespaces_with_netmask = max_namespaces_with_netmask + #: ``max_subsystems`` max number of subsystems + self.max_subsystems = max_subsystems + #: ``max_namespaces`` max number of namespaces on all subsystems + self.max_namespaces = max_namespaces + #: ``max_namespaces_per_subsystem`` max number of namespaces per one subsystem + self.max_namespaces_per_subsystem = max_namespaces_per_subsystem + #: ``max_hosts_per_subsystem`` max number of hosts per subsystem + self.max_hosts_per_subsystem = max_hosts_per_subsystem #: ``allowed_consecutive_spdk_ping_failures`` # of ping failures before aborting gateway self.allowed_consecutive_spdk_ping_failures = allowed_consecutive_spdk_ping_failures #: ``spdk_ping_interval_in_seconds`` sleep interval in seconds between SPDK pings self.spdk_ping_interval_in_seconds = spdk_ping_interval_in_seconds + #: ``ping_spdk_under_lock`` whether or not we should perform SPDK ping under the RPC lock + self.ping_spdk_under_lock = ping_spdk_under_lock #: ``bdevs_per_cluster`` number of bdevs per cluster self.bdevs_per_cluster = bdevs_per_cluster #:
``server_key`` gateway server key @@ -1418,19 +1495,23 @@ class NvmeofServiceSpec(ServiceSpec): self.client_cert = client_cert #: ``root_ca_cert`` CA cert for server/client certs self.root_ca_cert = root_ca_cert - #: ``spdk_path`` path to SPDK + #: ``spdk_path`` path is unused and duplicate of tgt_path below, consider removing self.spdk_path = spdk_path or '/usr/local/bin/nvmf_tgt' + #: ``spdk_mem_size`` memory size in MB for DPDK + self.spdk_mem_size = spdk_mem_size #: ``tgt_path`` nvmeof target path self.tgt_path = tgt_path or '/usr/local/bin/nvmf_tgt' #: ``spdk_timeout`` SPDK connectivity timeout self.spdk_timeout = spdk_timeout #: ``spdk_log_level`` the SPDK log level self.spdk_log_level = spdk_log_level - #: ``spdk_protocol_log_level`` the SPDK-GW protocol log level + #: ``spdk_protocol_log_level`` the SPDK protocol log level self.spdk_protocol_log_level = spdk_protocol_log_level or 'WARNING' - #: ``rpc_socket_dir`` the SPDK socket file directory + #: ``spdk_log_file_dir`` the SPDK log output file directory + self.spdk_log_file_dir = spdk_log_file_dir + #: ``rpc_socket_dir`` the SPDK RPC socket file directory self.rpc_socket_dir = rpc_socket_dir or '/var/tmp/' - #: ``rpc_socket_name`` the SPDK socket file name + #: ``rpc_socket_name`` the SPDK RPC socket file name self.rpc_socket_name = rpc_socket_name or 'spdk.sock' #: ``conn_retries`` ceph connection retries number self.conn_retries = conn_retries @@ -1440,6 +1521,12 @@ class NvmeofServiceSpec(ServiceSpec): self.transport_tcp_options: Optional[Dict[str, int]] = transport_tcp_options #: ``tgt_cmd_extra_args`` extra arguments for the nvmf_tgt process self.tgt_cmd_extra_args = tgt_cmd_extra_args + #: List of extra arguments for SPDK iobuf in the form opt=value + self.iobuf_options: Optional[Dict[str, int]] = iobuf_options + #: ``discovery_addr`` address of the discovery service + self.discovery_addr = discovery_addr + #: ``discovery_addr_map`` per node address map of the discovery service + self.discovery_addr_map = discovery_addr_map #: ``discovery_port`` port of the discovery service self.discovery_port = discovery_port or 8009 #: ``log_level`` the nvmeof gateway log level @@ -1462,9 +1549,11 @@ class NvmeofServiceSpec(ServiceSpec): self.monitor_timeout = monitor_timeout #: ``enable_monitor_client`` whether to connect to the ceph monitor or not self.enable_monitor_client = enable_monitor_client + #: ``monitor_client_log_file_dir`` the monitor client log output file directory + self.monitor_client_log_file_dir = monitor_client_log_file_dir def get_port_start(self) -> List[int]: - return [5500, 4420, 8009] + return [self.port, 4420, self.discovery_port] def validate(self) -> None: # TODO: what other parameters should be validated as part of this function? @@ -1473,6 +1562,7 @@ if not self.pool: raise SpecValidationError('Cannot add NVMEOF: No Pool specified') + verify_boolean(self.enable_auth, "Enable authentication") if self.enable_auth: if not all([self.server_key, self.server_cert, self.client_key, self.client_cert, self.root_ca_cert]): @@ -1487,112 +1577,65 @@ if self.transports not in ['tcp']: raise SpecValidationError('Invalid transport. Valid values are tcp') - if self.log_level: - if self.log_level.lower() not in ['debug', - 'info', - 'warning', - 'error', - 'critical']: - raise SpecValidationError( - 'Invalid log level. 
Valid values are: debug, info, warning, error, critial') - - if self.spdk_log_level: - if self.spdk_log_level.lower() not in ['debug', - 'info', - 'warning', - 'error', - 'notice']: - raise SpecValidationError( - 'Invalid SPDK log level. Valid values are: ' - 'DEBUG, INFO, WARNING, ERROR, NOTICE') - - if self.spdk_protocol_log_level: - if self.spdk_protocol_log_level.lower() not in ['debug', - 'info', - 'warning', - 'error', - 'notice']: - raise SpecValidationError( - 'Invalid SPDK protocol log level. Valid values are: ' - 'DEBUG, INFO, WARNING, ERROR, NOTICE') + verify_enum(self.log_level, "log level", ['debug', 'info', 'warning', 'error', 'critical']) + verify_enum(self.spdk_log_level, "SPDK log level", + ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'NOTICE']) + verify_enum(self.spdk_protocol_log_level, "SPDK protocol log level", + ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'NOTICE']) + verify_positive_int(self.bdevs_per_cluster, "Bdevs per cluster") + if self.bdevs_per_cluster is not None and self.bdevs_per_cluster < 1: + raise SpecValidationError("Bdevs per cluster should be at least 1") + verify_non_negative_number(self.spdk_ping_interval_in_seconds, "SPDK ping interval") if ( - self.spdk_ping_interval_in_seconds + self.spdk_ping_interval_in_seconds is not None and self.spdk_ping_interval_in_seconds < 1.0 ): raise SpecValidationError("SPDK ping interval should be at least 1 second") + verify_non_negative_int(self.allowed_consecutive_spdk_ping_failures, + "Allowed consecutive SPDK ping failures") if ( - self.allowed_consecutive_spdk_ping_failures + self.allowed_consecutive_spdk_ping_failures is not None and self.allowed_consecutive_spdk_ping_failures < 1 ): raise SpecValidationError("Allowed consecutive SPDK ping failures should be at least 1") - if ( - self.state_update_interval_sec - and self.state_update_interval_sec < 0 - ): - raise SpecValidationError("State update interval can't be negative") - - if ( - self.omap_file_lock_duration - and self.omap_file_lock_duration < 0 - ): - raise SpecValidationError("OMAP file lock duration can't be negative") - - if ( - self.omap_file_lock_retries - and self.omap_file_lock_retries < 0 - ): - raise SpecValidationError("OMAP file lock retries can't be negative") - - if ( - self.omap_file_update_reloads - and self.omap_file_update_reloads < 0 - ): - raise SpecValidationError("OMAP file reloads can't be negative") - - if ( - self.spdk_timeout - and self.spdk_timeout < 0.0 - ): - raise SpecValidationError("SPDK timeout can't be negative") - - if ( - self.conn_retries - and self.conn_retries < 0 - ): - raise SpecValidationError("Connection retries can't be negative") - - if ( - self.max_log_file_size_in_mb - and self.max_log_file_size_in_mb < 0 - ): - raise SpecValidationError("Log file size can't be negative") - - if ( - self.max_log_files_count - and self.max_log_files_count < 0 - ): - raise SpecValidationError("Log files count can't be negative") - - if ( - self.max_log_directory_backups - and self.max_log_directory_backups < 0 - ): - raise SpecValidationError("Log file directory backups can't be negative") - - if ( - self.monitor_timeout - and self.monitor_timeout < 0.0 - ): - raise SpecValidationError("Monitor timeout can't be negative") - - if self.port and self.port < 0: - raise SpecValidationError("Port can't be negative") - - if self.discovery_port and self.discovery_port < 0: - raise SpecValidationError("Discovery port can't be negative") + verify_non_negative_int(self.state_update_interval_sec, "State update interval") + 
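The verify_* helpers used above are defined in ceph/deployment/utils.py later in this diff; they all treat None as "unset" and raise SpecValidationError on a bad value, which is what lets the long chain of if-blocks collapse into one call per field. A short sketch:

    from ceph.deployment.hostspec import SpecValidationError
    from ceph.deployment.utils import verify_non_negative_int, verify_positive_int

    verify_non_negative_int(None, 'Port')  # unset fields pass through
    verify_non_negative_int(0, 'Port')     # zero is fine for non-negative fields
    try:
        verify_positive_int(0, 'Max subsystems')
    except SpecValidationError as exc:
        print(exc)  # Max subsystems must be greater than zero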
verify_non_negative_int(self.rebalance_period_sec, "Rebalance period") + verify_non_negative_int(self.max_gws_in_grp, "Max gateways in group") + verify_non_negative_int(self.max_ns_to_change_lb_grp, + "Max namespaces to change load balancing group") + verify_non_negative_int(self.omap_file_lock_duration, "OMAP file lock duration") + verify_non_negative_number(self.omap_file_lock_retry_sleep_interval, + "OMAP file lock sleep interval") + verify_non_negative_int(self.omap_file_lock_retries, "OMAP file lock retries") + verify_non_negative_int(self.omap_file_update_reloads, "OMAP file reloads") + verify_non_negative_number(self.spdk_timeout, "SPDK timeout") + verify_non_negative_int(self.max_log_file_size_in_mb, "Log file size") + verify_non_negative_int(self.max_log_files_count, "Log files count") + verify_non_negative_int(self.max_log_directory_backups, "Log file directory backups") + verify_non_negative_int(self.max_hosts_per_namespace, "Max hosts per namespace") + verify_non_negative_int(self.max_namespaces_with_netmask, "Max namespaces with netmask") + verify_positive_int(self.max_subsystems, "Max subsystems") + verify_positive_int(self.max_namespaces, "Max namespaces") + verify_positive_int(self.max_namespaces_per_subsystem, "Max namespaces per subsystem") + verify_positive_int(self.max_hosts_per_subsystem, "Max hosts per subsystem") + verify_non_negative_number(self.monitor_timeout, "Monitor timeout") + verify_non_negative_int(self.port, "Port") + verify_non_negative_int(self.discovery_port, "Discovery port") + verify_non_negative_int(self.prometheus_port, "Prometheus port") + verify_non_negative_int(self.prometheus_stats_interval, "Prometheus stats interval") + verify_boolean(self.state_update_notify, "State update notify") + verify_boolean(self.enable_spdk_discovery_controller, "Enable SPDK discovery controller") + verify_boolean(self.enable_key_encryption, "Enable key encryption") + verify_boolean(self.enable_prometheus_exporter, "Enable Prometheus exporter") + verify_boolean(self.verify_nqns, "Verify NQNs") + verify_boolean(self.verify_keys, "Verify Keys") + verify_boolean(self.log_files_enabled, "Log files enabled") + verify_boolean(self.log_files_rotation_enabled, "Log files rotation enabled") + verify_boolean(self.verbose_log_messages, "Verbose log messages") + verify_boolean(self.enable_monitor_client, "Enable monitor client") yaml.add_representer(NvmeofServiceSpec, ServiceSpec.yaml_representer) @@ -1756,7 +1799,7 @@ class IngressSpec(ServiceSpec): if not self.keepalive_only and not self.frontend_port: raise SpecValidationError( 'Cannot add ingress: No frontend_port specified') - if not self.monitor_port: + if not self.keepalive_only and not self.monitor_port: raise SpecValidationError( 'Cannot add ingress: No monitor_port specified') if not self.virtual_ip and not self.virtual_ips_list: @@ -1785,6 +1828,7 @@ class MgmtGatewaySpec(ServiceSpec): networks: Optional[List[str]] = None, placement: Optional[PlacementSpec] = None, disable_https: Optional[bool] = False, + enable_auth: Optional[bool] = False, port: Optional[int] = None, ssl_certificate: Optional[str] = None, ssl_certificate_key: Optional[str] = None, @@ -1797,6 +1841,8 @@ class MgmtGatewaySpec(ServiceSpec): ssl_stapling_verify: Optional[str] = None, ssl_protocols: Optional[List[str]] = None, ssl_ciphers: Optional[List[str]] = None, + enable_health_check_endpoint: bool = False, + virtual_ip: Optional[str] = None, preview_only: bool = False, unmanaged: bool = False, extra_container_args: Optional[GeneralArgList] = None, @@ 
-1816,6 +1862,8 @@ class MgmtGatewaySpec(ServiceSpec): ) #: Is a flag to disable HTTPS. If True, the server will use unsecure HTTP self.disable_https = disable_https + #: Is a flag to enable SSO auth. Requires oauth2-proxy to be active for SSO authentication. + self.enable_auth = enable_auth #: The port number on which the server will listen self.port = port #: A multi-line string that contains the SSL certificate @@ -1840,6 +1888,8 @@ class MgmtGatewaySpec(ServiceSpec): self.ssl_protocols = ssl_protocols #: List of supported secure SSL ciphers. Changing this list may reduce system security. self.ssl_ciphers = ssl_ciphers + self.enable_health_check_endpoint = enable_health_check_endpoint + self.virtual_ip = virtual_ip def get_port_start(self) -> List[int]: ports = [] @@ -1906,6 +1956,129 @@ class MgmtGatewaySpec(ServiceSpec): yaml.add_representer(MgmtGatewaySpec, ServiceSpec.yaml_representer) +class OAuth2ProxySpec(ServiceSpec): + def __init__(self, + service_type: str = 'oauth2-proxy', + service_id: Optional[str] = None, + config: Optional[Dict[str, str]] = None, + networks: Optional[List[str]] = None, + placement: Optional[PlacementSpec] = None, + https_address: Optional[str] = None, + provider_display_name: Optional[str] = None, + client_id: Optional[str] = None, + client_secret: Optional[str] = None, + oidc_issuer_url: Optional[str] = None, + redirect_url: Optional[str] = None, + cookie_secret: Optional[str] = None, + ssl_certificate: Optional[str] = None, + ssl_certificate_key: Optional[str] = None, + allowlist_domains: Optional[List[str]] = None, + unmanaged: bool = False, + extra_container_args: Optional[GeneralArgList] = None, + extra_entrypoint_args: Optional[GeneralArgList] = None, + custom_configs: Optional[List[CustomConfig]] = None, + ): + assert service_type == 'oauth2-proxy' + + super(OAuth2ProxySpec, self).__init__( + 'oauth2-proxy', service_id=service_id, + placement=placement, config=config, + networks=networks, + extra_container_args=extra_container_args, + extra_entrypoint_args=extra_entrypoint_args, + custom_configs=custom_configs + ) + #: The address for HTTPS connections, formatted as 'host:port'. + self.https_address = https_address + #: The display name for the identity provider (IDP) in the UI. + self.provider_display_name = provider_display_name + #: The client ID for authenticating with the identity provider. + self.client_id = client_id + #: The client secret for authenticating with the identity provider. + self.client_secret = client_secret + #: The URL of the OpenID Connect (OIDC) issuer. + self.oidc_issuer_url = oidc_issuer_url + #: The URL oauth2-proxy will redirect to after a successful login. If not provided + # cephadm will calculate automatically the value of this url. + self.redirect_url = redirect_url + #: The secret key used for signing cookies. Its length must be 16, + # 24, or 32 bytes to create an AES cipher. + self.cookie_secret = cookie_secret + #: The multi-line SSL certificate for encrypting communications. + self.ssl_certificate = ssl_certificate + #: The multi-line SSL certificate private key for decrypting communications. + self.ssl_certificate_key = ssl_certificate_key + #: List of allowed domains for safe redirection after login or logout, + # preventing unauthorized redirects. 
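To make the new service type concrete, a minimal sketch of building and validating an OAuth2ProxySpec (hypothetical IDP values; validate() is defined just below):

    from ceph.deployment.service_spec import OAuth2ProxySpec

    spec = OAuth2ProxySpec(
        provider_display_name='My IDP',
        client_id='ceph-dashboard',
        client_secret='s3cr3t',
        oidc_issuer_url='https://keycloak.example.com/realms/ceph',
    )
    spec.validate()  # non-empty strings, parseable issuer URL, optional cookie secret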
+ self.allowlist_domains = allowlist_domains + self.unmanaged = unmanaged + + def get_port_start(self) -> List[int]: + ports = [4180] + return ports + + def validate(self) -> None: + super(OAuth2ProxySpec, self).validate() + self._validate_non_empty_string(self.provider_display_name, "provider_display_name") + self._validate_non_empty_string(self.client_id, "client_id") + self._validate_non_empty_string(self.client_secret, "client_secret") + self._validate_cookie_secret(self.cookie_secret) + self._validate_url(self.oidc_issuer_url, "oidc_issuer_url") + if self.redirect_url is not None: + self._validate_url(self.redirect_url, "redirect_url") + if self.https_address is not None: + self._validate_https_address(self.https_address) + + def _validate_non_empty_string(self, value: Optional[str], field_name: str) -> None: + if not value or not isinstance(value, str) or not value.strip(): + raise SpecValidationError(f"Invalid {field_name}: Must be a non-empty string.") + + def _validate_url(self, url: Optional[str], field_name: str) -> None: + from urllib.parse import urlparse + try: + result = urlparse(url) + except Exception as e: + raise SpecValidationError(f"Invalid {field_name}: {e}. Must be a valid URL.") + else: + if not all([result.scheme, result.netloc]): + raise SpecValidationError(f"Error parsing {field_name} field: Must be a valid URL.") + + def _validate_https_address(self, https_address: Optional[str]) -> None: + from urllib.parse import urlparse + result = urlparse(f'http://{https_address}') + # Check if netloc contains a valid IP or hostname and a port + if not result.netloc or ':' not in result.netloc: + raise SpecValidationError("Invalid https_address: Valid format [IP|hostname]:port.") + # Split netloc into hostname and port + hostname, port = result.netloc.rsplit(':', 1) + # Validate port + if not port.isdigit() or not (0 <= int(port) <= 65535): + raise SpecValidationError("Invalid https_address: Port must be between 0 and 65535.") + + def _validate_cookie_secret(self, cookie_secret: Optional[str]) -> None: + if cookie_secret is None: + return + if not isinstance(cookie_secret, str): + raise SpecValidationError("Invalid cookie_secret: Must be a non-empty string.") + + import base64 + import binascii + try: + # Try decoding the cookie_secret as base64 + decoded_secret = base64.urlsafe_b64decode(cookie_secret) + length = len(decoded_secret) + except binascii.Error: + # If decoding fails, consider it as a plain string + length = len(cookie_secret.encode('utf-8')) + + if length not in [16, 24, 32]: + raise SpecValidationError(f"cookie_secret is {length} bytes " + "but must be 16, 24, or 32 bytes to create an AES cipher.") + + +yaml.add_representer(OAuth2ProxySpec, ServiceSpec.yaml_representer) + + class InitContainerSpec(object): """An init container is not a service that lives on its own, but rather is used to run and exit prior to a service container starting in order @@ -2161,6 +2334,7 @@ class AlertManagerSpec(MonitoringSpec): user_data: Optional[Dict[str, Any]] = None, config: Optional[Dict[str, str]] = None, networks: Optional[List[str]] = None, + only_bind_port_on_networks: bool = False, port: Optional[int] = None, secure: bool = False, extra_container_args: Optional[GeneralArgList] = None, @@ -2191,6 +2365,7 @@ class AlertManagerSpec(MonitoringSpec): # <webhook_configs> configuration. 
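One practical note on the cookie_secret check in the oauth2-proxy spec above: the value may be either a plain string or urlsafe-base64, as long as it decodes to 16, 24, or 32 bytes. One way to generate a compliant secret (a sketch, not part of this change):

    import base64
    import os

    # 32 random bytes, urlsafe-base64 encoded -> decodes back to 32 bytes
    cookie_secret = base64.urlsafe_b64encode(os.urandom(32)).decode()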
self.user_data = user_data or {} self.secure = secure + self.only_bind_port_on_networks = only_bind_port_on_networks def get_port_start(self) -> List[int]: return [self.get_port(), 9094] @@ -2237,7 +2412,7 @@ class GrafanaSpec(MonitoringSpec): self.protocol = protocol # whether ports daemons for this service bind to should - # bind to only hte networks listed in networks param, or + # bind to only the networks listed in networks param, or # to all networks. Defaults to false which is saying to bind # on all networks. self.only_bind_port_on_networks = only_bind_port_on_networks @@ -2704,6 +2879,9 @@ class CephExporterSpec(ServiceSpec): self.prio_limit = prio_limit self.stats_period = stats_period + def get_port_start(self) -> List[int]: + return [self.port or 9926] + def validate(self) -> None: super(CephExporterSpec, self).validate() @@ -2718,9 +2896,131 @@ yaml.add_representer(CephExporterSpec, ServiceSpec.yaml_representer) +class SMBClusterPublicIPSpec: + # The SMBClusterIPSpec must be able to translate between what cephadm + # knows about the system, networks using network addresses, and what + # ctdb wants, an IP combined with a prefixlen and device names. + def __init__( + self, + address: str, + destination: Union[str, List[str], None] = None, + ) -> None: + self.address = address + self.destination = destination + self.validate() + + def validate(self) -> None: + if not self.address: + raise SpecValidationError('address value missing') + if '/' not in self.address: + raise SpecValidationError( + 'a combined address and prefix length is required' + ) + # in the future we may want to enhance this to take IPs only and figure + # out the prefixlen automatically. However, we are going to start simple and + # require prefix lengths just like ctdb itself does. + try: + # cache the parsed interface address internally + self._addr_iface = ip_interface(self.address) + except ValueError as err: + raise SpecValidationError( + f'Cannot parse interface address {self.address}' + ) from err + # we strongly prefer /{prefixlen} form, even if the user supplied + # a netmask + self.address = self._addr_iface.with_prefixlen + + self._destinations = [] + if not self.destination: + return + if isinstance(self.destination, str): + _dests = [self.destination] + elif isinstance(self.destination, list) and all( + isinstance(v, str) for v in self.destination + ): + _dests = self.destination + else: + raise ValueError( + 'destination field must be a string or list of strings' + ) + for dest in _dests: + try: + dnet = ip_network(dest) + except ValueError as err: + raise SpecValidationError( + f'Cannot parse network value {dest}' + ) from err + self._destinations.append(dnet) + + def __eq__(self, other: Any) -> bool: + try: + return ( + other.address == self.address + and other.destination == self.destination + ) + except AttributeError: + return NotImplemented + + def __repr__(self) -> str: + return ( + f'SMBClusterPublicIPSpec({self.address!r}, {self.destination!r})' + ) + + def to_json(self) -> Dict[str, Any]: + """Return a JSON-compatible representation of the SMBClusterPublicIPSpec.""" + out: Dict[str, Any] = {'address': self.address} + if self.destination: + out['destination'] = self.destination + return out + + def to_strict(self) -> Dict[str, Any]: + """Return a strictly formed expanded JSON-compatible representation of + the spec. This is not round-trip-able. + """ + # The strict form always contains destination as a list of strings. 
+ dests = [n.with_prefixlen for n in self._destinations] + if not dests: + dests = [self._addr_iface.network.with_prefixlen] + return { + 'address': self.address, + 'destinations': dests, + } + + @classmethod + def from_json(cls, spec: Dict[str, Any]) -> 'SMBClusterPublicIPSpec': + if 'address' not in spec: + raise SpecValidationError( + 'SMB cluster public IP spec missing required field: address' + ) + return cls(spec['address'], spec.get('destination')) + + @classmethod + def convert_list( + cls, arg: Optional[List[Any]] + ) -> Optional[List['SMBClusterPublicIPSpec']]: + if arg is None: + return None + assert isinstance(arg, list) + out = [] + for value in arg: + if isinstance(value, cls): + out.append(value) + elif hasattr(value, 'to_json'): + out.append(cls.from_json(value.to_json())) + elif isinstance(value, dict): + out.append(cls.from_json(value)) + else: + raise SpecValidationError( + f"Unknown type for SMBClusterPublicIPSpec: {type(value)}" + ) + return out + + class SMBSpec(ServiceSpec): service_type = 'smb' - _valid_features = {'domain'} + _valid_features = {'domain', 'clustered'} + _default_cluster_meta_obj = 'cluster.meta.json' + _default_cluster_lock_obj = 'cluster.meta.lock' def __init__( self, @@ -2764,6 +3064,16 @@ # automatically added to the ceph keyring provided to the samba # container. include_ceph_users: Optional[List[str]] = None, + # cluster_meta_uri - a pseudo-uri that resolves to a (rados) object + # that will store information about the state of samba cluster members + cluster_meta_uri: Optional[str] = None, + # cluster_lock_uri - a pseudo-uri that resolves to a (rados) object + # that will be used by CTDB for a cluster leader / recovery lock. + cluster_lock_uri: Optional[str] = None, + # cluster_public_addrs - A list of SMB cluster public IP specs. + # If supplied, these will be used to establish floating virtual ips + # managed by Samba CTDB cluster subsystem. 
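The cluster_public_addrs entries mentioned above are the SMBClusterPublicIPSpec objects defined earlier in this hunk; the round trip between the user-facing form and the strict ctdb-facing form looks like this (hypothetical addresses):

    from ceph.deployment.service_spec import SMBClusterPublicIPSpec

    ip = SMBClusterPublicIPSpec('192.168.4.61/24', destination='192.168.4.0/24')
    ip.to_json()    # {'address': '192.168.4.61/24', 'destination': '192.168.4.0/24'}
    ip.to_strict()  # {'address': '192.168.4.61/24', 'destinations': ['192.168.4.0/24']}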
+ cluster_public_addrs: Optional[List[SMBClusterPublicIPSpec]] = None, # --- general tweaks --- extra_container_args: Optional[GeneralArgList] = None, extra_entrypoint_args: Optional[GeneralArgList] = None, @@ -2791,6 +3101,11 @@ self.user_sources = user_sources or [] self.custom_dns = custom_dns or [] self.include_ceph_users = include_ceph_users or [] + self.cluster_meta_uri = cluster_meta_uri + self.cluster_lock_uri = cluster_lock_uri + self.cluster_public_addrs = SMBClusterPublicIPSpec.convert_list( + cluster_public_addrs + ) self.validate() def validate(self) -> None: @@ -2801,7 +3116,51 @@ if self.features: invalid = set(self.features).difference(self._valid_features) if invalid: - raise ValueError(f'invalid feature flags: {", ".join(invalid)}') + raise ValueError( + f'invalid feature flags: {", ".join(invalid)}' + ) + if 'clustered' in self.features and not self.cluster_meta_uri: + # derive a cluster meta uri from config uri by default (if possible) + self.cluster_meta_uri = self._derive_cluster_uri( + self.config_uri, + self._default_cluster_meta_obj, + ) + if 'clustered' not in self.features and self.cluster_meta_uri: + raise ValueError( + 'cluster meta uri unsupported when "clustered" feature not set' + ) + if 'clustered' in self.features and not self.cluster_lock_uri: + # derive a cluster lock uri from config uri by default (if possible) + self.cluster_lock_uri = self._derive_cluster_uri( + self.config_uri, + self._default_cluster_lock_obj, + ) + if 'clustered' not in self.features and self.cluster_lock_uri: + raise ValueError( + 'cluster lock uri unsupported when "clustered" feature not set' + ) + for spec in self.cluster_public_addrs or []: + spec.validate() + + def _derive_cluster_uri(self, uri: str, objname: str) -> str: + if not uri.startswith('rados://'): + raise ValueError('invalid uri scheme for cluster metadata') + parts = uri[8:].split('/') + parts[-1] = objname + uri = 'rados://' + '/'.join(parts) + return uri + + def strict_cluster_ip_specs(self) -> List[Dict[str, Any]]: + return [s.to_strict() for s in (self.cluster_public_addrs or [])] + + def to_json(self) -> "OrderedDict[str, Any]": + obj = super().to_json() + spec = obj.get('spec') + if spec and spec.get('cluster_public_addrs'): + spec['cluster_public_addrs'] = [ + a.to_json() for a in spec['cluster_public_addrs'] + ] + return obj yaml.add_representer(SMBSpec, ServiceSpec.yaml_representer) diff --git a/src/python-common/ceph/deployment/translate.py b/src/python-common/ceph/deployment/translate.py index dd91b33e986..9dfe7cfcf81 100644 --- a/src/python-common/ceph/deployment/translate.py +++ b/src/python-common/ceph/deployment/translate.py @@ -5,7 +5,7 @@ try: except ImportError: pass -from ceph.deployment.drive_selection.selector import DriveSelection +from ceph.deployment.drive_selection.selector import DriveSelection # noqa: F401 logger = logging.getLogger(__name__) @@ -132,6 +132,9 @@ class to_ceph_volume(object): if self.spec.encrypted: cmds[i] += " --dmcrypt" + if self.spec.tpm2: + cmds[i] += " --with-tpm" + if self.spec.osds_per_device: cmds[i] += " --osds-per-device {}".format(self.spec.osds_per_device) diff --git a/src/python-common/ceph/deployment/utils.py b/src/python-common/ceph/deployment/utils.py index f800e373897..758eddc9412 100644 --- a/src/python-common/ceph/deployment/utils.py +++ b/src/python-common/ceph/deployment/utils.py @@ -1,7 +1,9 @@ import ipaddress import socket -from typing import Tuple, Optional +from typing import Tuple, 
Optional, Any from urllib.parse import urlparse +from ceph.deployment.hostspec import SpecValidationError +from numbers import Number def unwrap_ipv6(address): @@ -100,3 +102,50 @@ def valid_addr(addr: str) -> Tuple[bool, str]: if addr[0].isalpha() and '.' in addr: return _dns_lookup(addr, port) return _ip_lookup(addr, port) + + +def verify_numeric(field: Any, field_name: str) -> None: + if field is not None: + if not isinstance(field, Number) or isinstance(field, bool): + raise SpecValidationError(f"{field_name} must be a number") + + +def verify_non_negative_int(field: Any, field_name: str) -> None: + verify_numeric(field, field_name) + if field is not None: + if not isinstance(field, int) or isinstance(field, bool): + raise SpecValidationError(f"{field_name} must be an integer") + if field < 0: + raise SpecValidationError(f"{field_name} can't be negative") + + +def verify_positive_int(field: Any, field_name: str) -> None: + verify_non_negative_int(field, field_name) + if field is not None and field <= 0: + raise SpecValidationError(f"{field_name} must be greater than zero") + + +def verify_non_negative_number(field: Any, field_name: str) -> None: + verify_numeric(field, field_name) + if field is not None: + if field < 0.0: + raise SpecValidationError(f"{field_name} can't be negative") + + +def verify_boolean(field: Any, field_name: str) -> None: + if field is not None: + if not isinstance(field, bool): + raise SpecValidationError(f"{field_name} must be a boolean") + + +def verify_enum(field: Any, field_name: str, allowed: list) -> None: + if field: + allowed_lower = [] + if not isinstance(field, str): + raise SpecValidationError(f"{field_name} must be a string") + for val in allowed: + assert isinstance(val, str) + allowed_lower.append(val.lower()) + if field.lower() not in allowed_lower: + raise SpecValidationError( + f'Invalid {field_name}. Valid values are: {", ".join(allowed)}') diff --git a/src/python-common/ceph/fs/__init__.py b/src/python-common/ceph/fs/__init__.py new file mode 100644 index 00000000000..3988bf129e2 --- /dev/null +++ b/src/python-common/ceph/fs/__init__.py @@ -0,0 +1,3 @@ +import logging + +log = logging.getLogger(__name__) diff --git a/src/python-common/ceph/fs/earmarking.py b/src/python-common/ceph/fs/earmarking.py new file mode 100644 index 00000000000..f4fd4ddf96c --- /dev/null +++ b/src/python-common/ceph/fs/earmarking.py @@ -0,0 +1,168 @@ +""" +Module: CephFS Volume Earmarking + +This module provides the `CephFSVolumeEarmarking` class, which is designed to manage the earmarking +of subvolumes within a CephFS filesystem. The earmarking mechanism allows +administrators to tag specific subvolumes with identifiers that indicate their intended use +such as NFS or SMB, ensuring that only one file service is assigned to a particular subvolume +at a time. This is crucial to prevent data corruption in environments where +mixed protocol support (NFS and SMB) is not yet available. + +Key Features: +- **Set Earmark**: Assigns an earmark to a subvolume. +- **Get Earmark**: Retrieves the existing earmark of a subvolume, if any. +- **Remove Earmark**: Removes the earmark from a subvolume, making it available for reallocation. +- **Validate Earmark**: Ensures that the earmark follows the correct format and only uses +supported top-level scopes. 
+""" + +import errno +import enum +import logging +from typing import List, NamedTuple, Optional, Tuple, Protocol + +log = logging.getLogger(__name__) + +XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark' + + +class FSOperations(Protocol): + """Protocol class representing the file system operations earmarking + classes will perform. + """ + + def setxattr( + self, path: str, key: str, value: bytes, flags: int + ) -> None: ... + + def getxattr(self, path: str, key: str) -> bytes: ... + + +class EarmarkTopScope(enum.Enum): + NFS = "nfs" + SMB = "smb" + + +class EarmarkException(Exception): + def __init__(self, error_code: int, error_message: str) -> None: + self.errno = error_code + self.error_str = error_message + + def to_tuple(self) -> Tuple[int, Optional[str], str]: + return self.errno, "", self.error_str + + def __str__(self) -> str: + return f"{self.errno} ({self.error_str})" + + +class EarmarkContents(NamedTuple): + top: 'EarmarkTopScope' + subsections: List[str] + + +class EarmarkParseError(ValueError): + pass + + +class CephFSVolumeEarmarking: + def __init__(self, fs: FSOperations, path: str) -> None: + self.fs = fs + self.path = path + + def _handle_cephfs_error(self, e: Exception, action: str) -> Optional[str]: + if isinstance(e, ValueError): + raise EarmarkException(errno.EINVAL, f"Invalid earmark specified: {e}") from e + elif isinstance(e, OSError): + if e.errno == errno.ENODATA: + # Return empty string when earmark is not set + log.info(f"No earmark set for the path while {action}. Returning empty result.") + return '' + else: + log.error(f"Error {action} earmark: {e}") + raise EarmarkException(-e.errno, e.strerror) from e + else: + log.error(f"Unexpected error {action} earmark: {e}") + raise EarmarkException(errno.EFAULT, f"Unexpected error {action} earmark: {e}") from e + + @staticmethod + def parse_earmark(value: str) -> Optional[EarmarkContents]: + """ + Parse an earmark value. Returns None if the value is an empty string. + Raises EarmarkParseError if the top-level scope is not valid or the earmark + string is not properly structured. + Returns an EarmarkContents for valid earmark values. + + :param value: The earmark string to parse. + :return: An EarmarkContents instance if valid, None if empty. + """ + if not value: + return None + + parts = value.split('.') + + # Check if the top-level scope is valid + if parts[0] not in (scope.value for scope in EarmarkTopScope): + raise EarmarkParseError(f"Invalid top-level scope: {parts[0]}") + + # Check if all parts are non-empty to ensure valid dot-separated format + if not all(parts): + raise EarmarkParseError("Earmark contains empty sections.") + + # Return parsed earmark with top scope and subsections + return EarmarkContents(top=EarmarkTopScope(parts[0]), subsections=parts[1:]) + + def _validate_earmark(self, earmark: str) -> bool: + """ + Validates the earmark string further by checking specific conditions for scopes like 'smb'. + + :param earmark: The earmark string to validate. + :return: True if valid, False otherwise. 
+ """ + try: + parsed = self.parse_earmark(earmark) + except EarmarkParseError: + return False + + # If parsed is None, it's considered valid since the earmark is empty + if not parsed: + return True + + # Specific validation for 'smb' scope + if parsed.top == EarmarkTopScope.SMB: + # Valid formats: 'smb' or 'smb.cluster.{cluster_id}' + if not (len(parsed.subsections) == 0 or + (len(parsed.subsections) == 2 and + parsed.subsections[0] == 'cluster' and parsed.subsections[1])): + return False + + return True + + def get_earmark(self) -> Optional[str]: + try: + earmark_value = ( + self.fs.getxattr(self.path, XATTR_SUBVOLUME_EARMARK_NAME) + .decode('utf-8') + ) + return earmark_value + except Exception as e: + return self._handle_cephfs_error(e, "getting") + + def set_earmark(self, earmark: str) -> None: + # Validate the earmark before attempting to set it + if not self._validate_earmark(earmark): + raise EarmarkException( + errno.EINVAL, + f"Invalid earmark specified: '{earmark}'. " + "A valid earmark should either be empty or start with 'nfs' or 'smb', " + "followed by dot-separated non-empty components or simply set " + "'smb.cluster.{cluster_id}' for the smb intra-cluster scope." + ) + + try: + self.fs.setxattr(self.path, XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0) + log.info(f"Earmark '{earmark}' set on {self.path}.") + except Exception as e: + self._handle_cephfs_error(e, "setting") + + def clear_earmark(self) -> None: + self.set_earmark("") diff --git a/src/python-common/ceph/rgw/rgwam_core.py b/src/python-common/ceph/rgw/rgwam_core.py index 312d66362ec..2f8f1e92087 100644 --- a/src/python-common/ceph/rgw/rgwam_core.py +++ b/src/python-common/ceph/rgw/rgwam_core.py @@ -236,7 +236,7 @@ class ZonegroupOp: return [] def get(self, zonegroup: EntityKey = None): - ze = ZoneEnv(self.env) + ze = ZoneEnv(self.env, zg=zonegroup) params = ['zonegroup', 'get'] return RGWAdminJSONCmd(ze).run(params) @@ -323,30 +323,33 @@ class PeriodOp: self.env = env def update(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey, commit=True): - master_zone_info = self.get_master_zone(realm, zonegroup) - master_zone = EntityName(master_zone_info['name']) if master_zone_info else zone - master_zonegroup_info = self.get_master_zonegroup(realm) - master_zonegroup = EntityName(master_zonegroup_info['name']) \ - if master_zonegroup_info else zonegroup - ze = ZoneEnv(self.env, realm=realm, zg=master_zonegroup, zone=master_zone) + ze = ZoneEnv(self.env, realm=realm, zg=zonegroup, zone=zone) params = ['period', 'update'] opt_arg_bool(params, '--commit', commit) return RGWAdminJSONCmd(ze).run(params) - def get_master_zone(self, realm, zonegroup=None): + def get_master_zone(self, realm, zonegroup): try: - ze = ZoneEnv(self.env, realm=realm, zg=zonegroup) - params = ['zone', 'get'] - return RGWAdminJSONCmd(ze).run(params) - except RGWAMCmdRunException: + # Fetch the realm period + realm_period = self.get(realm) + zonegroups = realm_period['period_map']['zonegroups'] + + # Find the master zone in the realm period data + for zonegroup_inf in zonegroups: + if zonegroup_inf['name'] == zonegroup.name: + for zone in zonegroup_inf.get('zones', []): + if zone['id'] == zonegroup_inf['master_zone']: + return zone return None - def get_master_zone_ep(self, realm, zonegroup=None): + except RGWAMCmdRunException as e: + log.error(f"Failed to fetch master zone: {e}") + return None + + def get_master_zone_ep(self, realm): try: - ze = ZoneEnv(self.env, realm=realm, zg=zonegroup) - params = ['period', 'get'] - output = 
diff --git a/src/python-common/ceph/rgw/rgwam_core.py b/src/python-common/ceph/rgw/rgwam_core.py
index 312d66362ec..2f8f1e92087 100644
--- a/src/python-common/ceph/rgw/rgwam_core.py
+++ b/src/python-common/ceph/rgw/rgwam_core.py
@@ -236,7 +236,7 @@ class ZonegroupOp:
 return []
 def get(self, zonegroup: EntityKey = None):
- ze = ZoneEnv(self.env)
+ ze = ZoneEnv(self.env, zg=zonegroup)
 params = ['zonegroup', 'get']
 return RGWAdminJSONCmd(ze).run(params)
@@ -323,30 +323,33 @@ class PeriodOp:
 self.env = env
 def update(self, realm: EntityKey, zonegroup: EntityKey, zone: EntityKey, commit=True):
- master_zone_info = self.get_master_zone(realm, zonegroup)
- master_zone = EntityName(master_zone_info['name']) if master_zone_info else zone
- master_zonegroup_info = self.get_master_zonegroup(realm)
- master_zonegroup = EntityName(master_zonegroup_info['name']) \
- if master_zonegroup_info else zonegroup
- ze = ZoneEnv(self.env, realm=realm, zg=master_zonegroup, zone=master_zone)
+ ze = ZoneEnv(self.env, realm=realm, zg=zonegroup, zone=zone)
 params = ['period', 'update']
 opt_arg_bool(params, '--commit', commit)
 return RGWAdminJSONCmd(ze).run(params)
- def get_master_zone(self, realm, zonegroup=None):
+ def get_master_zone(self, realm, zonegroup):
 try:
- ze = ZoneEnv(self.env, realm=realm, zg=zonegroup)
- params = ['zone', 'get']
- return RGWAdminJSONCmd(ze).run(params)
- except RGWAMCmdRunException:
+ # Fetch the realm period
+ realm_period = self.get(realm)
+ zonegroups = realm_period['period_map']['zonegroups']
+
+ # Find the master zone in the realm period data
+ for zonegroup_inf in zonegroups:
+ if zonegroup_inf['name'] == zonegroup.name:
+ for zone in zonegroup_inf.get('zones', []):
+ if zone['id'] == zonegroup_inf['master_zone']:
+ return zone
 return None
- def get_master_zone_ep(self, realm, zonegroup=None):
+ except RGWAMCmdRunException as e:
+ log.error(f"Failed to fetch master zone: {e}")
+ return None
+
+ def get_master_zone_ep(self, realm):
 try:
- ze = ZoneEnv(self.env, realm=realm, zg=zonegroup)
- params = ['period', 'get']
- output = RGWAdminJSONCmd(ze).run(params)
- for zg in output['period_map']['zonegroups']:
+ realm_period = self.get(realm)
+ for zg in realm_period['period_map']['zonegroups']:
 if not bool(zg['is_master']):
 continue
 for zone in zg['zones']:
@@ -358,10 +361,19 @@ class PeriodOp:
 def get_master_zonegroup(self, realm):
 try:
- ze = ZoneEnv(self.env, realm=realm)
- params = ['zonegroup', 'get']
- return RGWAdminJSONCmd(ze).run(params)
- except RGWAMCmdRunException:
+ # Fetch the realm period
+ realm_period = self.get(realm)
+ master_zonegroup_id = realm_period['master_zonegroup']
+ zonegroups = realm_period['period_map']['zonegroups']
+
+ # Find the master zonegroup in the realm period data
+ for zonegroup in zonegroups:
+ if zonegroup['id'] == master_zonegroup_id:
+ return zonegroup
+ return None
+
+ except RGWAMCmdRunException as e:
+ log.error(f"Failed to fetch master zonegroup: {e}")
 return None
 def get(self, realm=None):
@@ -539,7 +551,7 @@ class RGWAM:
 realm = self.create_realm(realm_name)
 zonegroup = self.create_zonegroup(realm, zonegroup_name, zonegroup_is_master=True)
 zone = self.create_zone(realm, zonegroup, zone_name, zone_is_master=True)
- self.update_period(realm, zonegroup)
+ self.update_period(realm, zonegroup, zone)
 # Create system user, normal user and update the master zone
 sys_user = self.create_system_user(realm, zonegroup, zone)
@@ -548,7 +560,7 @@ class RGWAM:
 secret = rgw_acces_key.secret_key if rgw_acces_key else ''
 self.zone_op().modify(zone, zonegroup, None, access_key, secret,
 endpoints=rgw_spec.zone_endpoints)
- self.update_period(realm, zonegroup)
+ self.update_period(realm, zonegroup, zone)
 if start_radosgw and rgw_spec.zone_endpoints is None:
 # Instruct the orchestrator to start RGW daemons, asynchronously; this will
@@ -770,22 +782,43 @@ class RGWAM:
 realms_info = []
 for realm_name in self.realm_op().list():
 realm = self.get_realm(realm_name)
- master_zone_inf = self.period_op().get_master_zone(realm)
- zone_ep = self.period_op().get_master_zone_ep(realm)
- if master_zone_inf and 'system_key' in master_zone_inf:
- access_key = master_zone_inf['system_key']['access_key']
- secret = master_zone_inf['system_key']['secret_key']
- else:
- access_key = ''
- secret = ''
- realms_info.append({"realm_name": realm_name,
- "realm_id": realm.id,
- "master_zone_id": master_zone_inf['id'] if master_zone_inf else '',
- "endpoint": zone_ep[0] if zone_ep else None,
- "access_key": access_key,
- "secret": secret})
+ realm_period = self.period_op().get(realm)
+ master_zone_id = realm_period['master_zone']
+ master_zone_name = self.get_master_zone_name(realm_period, master_zone_id)
+ local_zone_list = self.zone_op().list()
+
+ # Only consider the realm if master_zone_name is in the local zone list
+ if master_zone_name in local_zone_list:
+ master_zone_inf = self.zone_op().get(EntityID(master_zone_id))
+ zone_ep = self.period_op().get_master_zone_ep(realm)
+
+ if master_zone_inf and 'system_key' in master_zone_inf:
+ access_key = master_zone_inf['system_key']['access_key']
+ secret = master_zone_inf['system_key']['secret_key']
+ else:
+ access_key = ''
+ secret = ''
+
+ realms_info.append({
+ "realm_name": realm_name,
+ "realm_id": realm.id,
+ "master_zone_id": master_zone_inf['id'] if master_zone_inf else '',
+ "endpoint": zone_ep[0] if zone_ep else None,
+ "access_key": access_key,
+ "secret": secret
+ })
+
 return realms_info
+ def get_master_zone_name(self, realm_data, master_zone_id):
+ # Find the zonegroups in the period_map
+ zonegroups = realm_data.get('period_map', {}).get('zonegroups', [])
+ for zonegroup in zonegroups:
+ for zone in zonegroup.get('zones', []):
+ if zone.get('id') == master_zone_id:
+ return zone.get('name')
+ return None
+
 def zone_create(self, rgw_spec, start_radosgw, secondary_zone_period_retry_limit=5):
 if not rgw_spec.rgw_realm_token:
@@ -811,7 +844,7 @@ class RGWAM:
 realm_name = realm_info['name']
 realm_id = realm_info['id']
- realm = EntityID(realm_id)
+ realm = EntityKey(realm_name, realm_id)
 period_info = self.period_op().get(realm)
 period = RGWPeriod(period_info)
 logging.info('Period: ' + period.id)
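The refactor above replaces per-entity radosgw-admin calls with lookups into the realm period JSON. A reduced sketch of the document shape those lookups assume; the field names follow the patch, while the concrete values are invented for illustration:

    from typing import Any, Dict, Optional

    # Invented sample data; only the keys referenced by the patch matter.
    realm_period: Dict[str, Any] = {
        'master_zonegroup': 'zg-1',
        'period_map': {
            'zonegroups': [{
                'id': 'zg-1',
                'name': 'default',
                'is_master': True,
                'master_zone': 'zone-1',
                'zones': [{'id': 'zone-1', 'name': 'primary'}],
            }],
        },
    }

    def find_master_zone(period: Dict[str, Any],
                         zonegroup_name: str) -> Optional[Dict[str, Any]]:
        # Mirrors the logic of PeriodOp.get_master_zone(): scan the period
        # map instead of issuing a separate 'zone get' admin command.
        for zg in period['period_map']['zonegroups']:
            if zg['name'] == zonegroup_name:
                for zone in zg.get('zones', []):
                    if zone['id'] == zg['master_zone']:
                        return zone
        return None

    assert find_master_zone(realm_period, 'default')['name'] == 'primary'

Deriving the master zone and zonegroup from the period avoids asking `zone get`/`zonegroup get` for master-election state that only the period reliably carries.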
diff --git a/src/python-common/ceph/tests/test_earmarking.py b/src/python-common/ceph/tests/test_earmarking.py
new file mode 100644
index 00000000000..28c54f0770c
--- /dev/null
+++ b/src/python-common/ceph/tests/test_earmarking.py
@@ -0,0 +1,84 @@
+import pytest
+import errno
+from unittest import mock
+
+from ceph.fs.earmarking import (
+ CephFSVolumeEarmarking,
+ EarmarkException,
+ EarmarkParseError,
+ EarmarkTopScope
+)
+
+XATTR_SUBVOLUME_EARMARK_NAME = 'user.ceph.subvolume.earmark'
+
+
+class TestCephFSVolumeEarmarking:
+
+ @pytest.fixture
+ def mock_fs(self):
+ return mock.Mock()
+
+ @pytest.fixture
+ def earmarking(self, mock_fs):
+ return CephFSVolumeEarmarking(mock_fs, "/test/path")
+
+ def test_parse_earmark_valid(self):
+ earmark_value = "nfs.subsection1.subsection2"
+ result = CephFSVolumeEarmarking.parse_earmark(earmark_value)
+ assert result.top == EarmarkTopScope.NFS
+ assert result.subsections == ["subsection1", "subsection2"]
+
+ def test_parse_earmark_empty_string(self):
+ result = CephFSVolumeEarmarking.parse_earmark("")
+ assert result is None
+
+ def test_parse_earmark_invalid_scope(self):
+ with pytest.raises(EarmarkParseError):
+ CephFSVolumeEarmarking.parse_earmark("invalid.scope")
+
+ def test_parse_earmark_empty_sections(self):
+ with pytest.raises(EarmarkParseError):
+ CephFSVolumeEarmarking.parse_earmark("nfs..section")
+
+ def test_validate_earmark_valid_empty(self, earmarking):
+ assert earmarking._validate_earmark("")
+
+ def test_validate_earmark_valid_smb(self, earmarking):
+ assert earmarking._validate_earmark("smb.cluster.cluster_id")
+
+ def test_validate_earmark_invalid_smb_format(self, earmarking):
+ assert not earmarking._validate_earmark("smb.invalid.format")
+
+ def test_get_earmark_success(self, earmarking):
+ earmarking.fs.getxattr.return_value = b'nfs.valid.earmark'
+ result = earmarking.get_earmark()
+ assert result == 'nfs.valid.earmark'
+
+ def test_get_earmark_handle_error(self, earmarking):
+ earmarking.fs.getxattr.side_effect = OSError(errno.EIO, "I/O error")
+ with pytest.raises(EarmarkException) as excinfo:
+ earmarking.get_earmark()
+ assert excinfo.value.errno == -errno.EIO
+
+ def test_set_earmark_valid(self, earmarking):
+ earmark = "nfs.valid.earmark"
+ earmarking.set_earmark(earmark)
+ earmarking.fs.setxattr.assert_called_with(
+ "/test/path", XATTR_SUBVOLUME_EARMARK_NAME, earmark.encode('utf-8'), 0
+ )
+
+ def test_set_earmark_invalid(self, earmarking):
+ with pytest.raises(EarmarkException) as excinfo:
+ earmarking.set_earmark("invalid.earmark")
+ assert excinfo.value.errno == errno.EINVAL
+
+ def test_set_earmark_handle_error(self, earmarking):
+ earmarking.fs.setxattr.side_effect = OSError(errno.EIO, "I/O error")
+ with pytest.raises(EarmarkException) as excinfo:
+ earmarking.set_earmark("nfs.valid.earmark")
+ assert excinfo.value.errno == -errno.EIO
+
+ def test_clear_earmark(self, earmarking):
+ with mock.patch.object(earmarking, 'set_earmark') as mock_set_earmark:
+ earmarking.clear_earmark()
+ mock_set_earmark.assert_called_once_with("")
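The error-path tests above pin down a convention that is easy to miss: OS-level failures surface as EarmarkException with a negated errno, while validation failures keep a plain positive errno.EINVAL. A hypothetical reconstruction of the translation helper the tests imply; the real _handle_cephfs_error lives in ceph/fs/earmarking.py and may differ in detail:

    import errno

    from ceph.fs.earmarking import EarmarkException

    def _handle_cephfs_error(e: Exception, action: str) -> None:
        # Sketch only: negate the OS errno so callers can tell xattr-level
        # failures (negative) apart from validation errors (positive),
        # matching the `== -errno.EIO` assertions in the tests above.
        if isinstance(e, OSError):
            raise EarmarkException(-e.errno, f'Error {action} earmark: {e}') from e
        # Fallback errno for non-OSError cases is invented for this sketch.
        raise EarmarkException(errno.EFAULT, f'Unexpected error {action} earmark: {e}') from e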
diff --git a/src/python-common/ceph/tests/utils.py b/src/python-common/ceph/tests/utils.py
index 04b8a4e3895..20a39e4666b 100644
--- a/src/python-common/ceph/tests/utils.py
+++ b/src/python-common/ceph/tests/utils.py
@@ -35,8 +35,7 @@ def _mk_device(rotational=True,
 )]
-def _mk_inventory(devices):
- # type: (Any) -> List[Device]
+def _mk_inventory(devices: Any) -> List[Device]:
 devs = []
 for dev_, name in zip(devices, map(chr, range(ord('a'), ord('z')))):
 dev = Device.from_json(dev_.to_json())
diff --git a/src/python-common/ceph/utils.py b/src/python-common/ceph/utils.py
index e92a2d1de7d..0544e9f4173 100644
--- a/src/python-common/ceph/utils.py
+++ b/src/python-common/ceph/utils.py
@@ -167,3 +167,18 @@ def http_req(hostname: str = '',
 log.error(e)
 # handle error here if needed
 raise
+
+
+_TRUE_VALS = {'y', 'yes', 't', 'true', 'on', '1'}
+_FALSE_VALS = {'n', 'no', 'f', 'false', 'off', '0'}
+
+
+def strtobool(value: str) -> bool:
+ """Convert a string to a boolean value.
+ Based on a similar function once available as distutils.util.strtobool.
+ """
+ if value.lower() in _TRUE_VALS:
+ return True
+ if value.lower() in _FALSE_VALS:
+ return False
+ raise ValueError(f'invalid truth value {value!r}')
diff --git a/src/python-common/requirements-lint.txt b/src/python-common/requirements-lint.txt
deleted file mode 100644
index 2a7142182c2..00000000000
--- a/src/python-common/requirements-lint.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-flake8==3.7.8
-rstcheck==3.3.1
diff --git a/src/python-common/tox.ini b/src/python-common/tox.ini
index 313a4334d51..e0b59c700ca 100644
--- a/src/python-common/tox.ini
+++ b/src/python-common/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py3, mypy, lint
+envlist = lint, rstcheck, mypy, py3
 skip_missing_interpreters = true
 [testenv:py3]
@@ -26,9 +26,13 @@ exclude =
 __pycache__
 [testenv:lint]
-deps =
- -rrequirements-lint.txt
+deps =
+ flake8
 commands =
 flake8 {posargs:ceph}
- rstcheck --report info --debug README.rst
+
+[testenv:rstcheck]
+deps =
+ rstcheck
+commands =
+ rstcheck --report-level info README.rst
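Since distutils (and with it distutils.util.strtobool) was removed in Python 3.12, the new ceph.utils.strtobool fills that gap locally. A quick usage sketch; the input strings are arbitrary examples:

    from ceph.utils import strtobool

    assert strtobool('Yes') is True   # matching is case-insensitive
    assert strtobool('0') is False
    try:
        strtobool('maybe')            # anything outside the two value sets
    except ValueError as e:
        print(e)                      # invalid truth value 'maybe'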