summaryrefslogtreecommitdiffstats
path: root/src/cephadm/cephadmlib/daemons/custom.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/cephadm/cephadmlib/daemons/custom.py')
-rw-r--r--src/cephadm/cephadmlib/daemons/custom.py222
1 files changed, 222 insertions, 0 deletions
diff --git a/src/cephadm/cephadmlib/daemons/custom.py b/src/cephadm/cephadmlib/daemons/custom.py
new file mode 100644
index 00000000000..e833c80c9a5
--- /dev/null
+++ b/src/cephadm/cephadmlib/daemons/custom.py
@@ -0,0 +1,222 @@
+import logging
+import os
+import re
+
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from ..container_daemon_form import ContainerDaemonForm, daemon_to_container
+from ..container_types import CephContainer, InitContainer
+from ..context import CephadmContext
+from ..context_getters import fetch_configs
+from ..daemon_form import register as register_daemon_form
+from ..daemon_identity import DaemonIdentity
+from ..data_utils import dict_get, dict_get_join
+from ..deploy import DeploymentType
+from ..deployment_utils import to_deployment_container
+from ..file_utils import write_new, makedirs
+from ..net_utils import EndPoint
+
+
+logger = logging.getLogger()
+
+
+@register_daemon_form
+class CustomContainer(ContainerDaemonForm):
+ """Defines a custom container"""
+
+ daemon_type = 'container'
+
+ @classmethod
+ def for_daemon_type(cls, daemon_type: str) -> bool:
+ return cls.daemon_type == daemon_type
+
+ def __init__(
+ self,
+ fsid: str,
+ daemon_id: Union[int, str],
+ config_json: Dict,
+ image: str,
+ ) -> None:
+ self.fsid = fsid
+ self.daemon_id = daemon_id
+ self.image = image
+
+ # config-json options
+ self.entrypoint = dict_get(config_json, 'entrypoint')
+ self.uid = dict_get(config_json, 'uid', 65534) # nobody
+ self.gid = dict_get(config_json, 'gid', 65534) # nobody
+ self.volume_mounts = dict_get(config_json, 'volume_mounts', {})
+ self.args = dict_get(config_json, 'args', [])
+ self.envs = dict_get(config_json, 'envs', [])
+ self.privileged = dict_get(config_json, 'privileged', False)
+ self.bind_mounts = dict_get(config_json, 'bind_mounts', [])
+ self.ports = dict_get(config_json, 'ports', [])
+ self.dirs = dict_get(config_json, 'dirs', [])
+ self.files = dict_get(config_json, 'files', {})
+
+ @classmethod
+ def init(
+ cls, ctx: CephadmContext, fsid: str, daemon_id: Union[int, str]
+ ) -> 'CustomContainer':
+ return cls(fsid, daemon_id, fetch_configs(ctx), ctx.image)
+
+ @classmethod
+ def create(
+ cls, ctx: CephadmContext, ident: DaemonIdentity
+ ) -> 'CustomContainer':
+ return cls.init(ctx, ident.fsid, ident.daemon_id)
+
+ @property
+ def identity(self) -> DaemonIdentity:
+ return DaemonIdentity(self.fsid, self.daemon_type, self.daemon_id)
+
+ def create_daemon_dirs(self, data_dir: str, uid: int, gid: int) -> None:
+ """
+ Create dirs/files below the container data directory.
+ """
+ logger.info(
+ 'Creating custom container configuration '
+ 'dirs/files in {} ...'.format(data_dir)
+ )
+
+ if not os.path.isdir(data_dir):
+ raise OSError('data_dir is not a directory: %s' % data_dir)
+
+ for dir_path in self.dirs:
+ logger.info('Creating directory: {}'.format(dir_path))
+ dir_path = os.path.join(data_dir, dir_path.strip('/'))
+ makedirs(dir_path, uid, gid, 0o755)
+
+ for file_path in self.files:
+ logger.info('Creating file: {}'.format(file_path))
+ content = dict_get_join(self.files, file_path)
+ file_path = os.path.join(data_dir, file_path.strip('/'))
+ with write_new(
+ file_path, owner=(uid, gid), encoding='utf-8'
+ ) as f:
+ f.write(content)
+
+ def get_daemon_args(self) -> List[str]:
+ return []
+
+ def get_container_args(self) -> List[str]:
+ return self.args
+
+ def get_container_envs(self) -> List[str]:
+ return self.envs
+
+ def _get_container_mounts(self, data_dir: str) -> Dict[str, str]:
+ """
+ Get the volume mounts. Relative source paths will be located below
+ `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+ Example:
+ {
+ /foo/conf: /conf
+ foo/conf: /conf
+ }
+ becomes
+ {
+ /foo/conf: /conf
+ /var/lib/ceph/<cluster-fsid>/<daemon-name>/foo/conf: /conf
+ }
+ """
+ mounts = {}
+ for source, destination in self.volume_mounts.items():
+ source = os.path.join(data_dir, source)
+ mounts[source] = destination
+ return mounts
+
+ def customize_container_mounts(
+ self, ctx: CephadmContext, mounts: Dict[str, str]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ mounts.update(self._get_container_mounts(data_dir))
+
+ def _get_container_binds(self, data_dir: str) -> List[List[str]]:
+ """
+ Get the bind mounts. Relative `source=...` paths will be located below
+ `/var/lib/ceph/<cluster-fsid>/<daemon-name>`.
+
+ Example:
+ [
+ 'type=bind',
+ 'source=lib/modules',
+ 'destination=/lib/modules',
+ 'ro=true'
+ ]
+ becomes
+ [
+ ...
+ 'source=/var/lib/ceph/<cluster-fsid>/<daemon-name>/lib/modules',
+ ...
+ ]
+ """
+ binds = self.bind_mounts.copy()
+ for bind in binds:
+ for index, value in enumerate(bind):
+ match = re.match(r'^source=(.+)$', value)
+ if match:
+ bind[index] = 'source={}'.format(
+ os.path.join(data_dir, match.group(1))
+ )
+ return binds
+
+ def customize_container_binds(
+ self, ctx: CephadmContext, binds: List[List[str]]
+ ) -> None:
+ data_dir = self.identity.data_dir(ctx.data_dir)
+ binds.extend(self._get_container_binds(data_dir))
+
+ # Cache the container so we don't need to rebuild it again when calling
+ # into init_containers
+ _container: Optional[CephContainer] = None
+
+ def container(self, ctx: CephadmContext) -> CephContainer:
+ if self._container is None:
+ ctr = daemon_to_container(
+ ctx,
+ self,
+ host_network=False,
+ privileged=self.privileged,
+ ptrace=ctx.allow_ptrace,
+ )
+ self._container = to_deployment_container(ctx, ctr)
+ return self._container
+
+ def init_containers(self, ctx: CephadmContext) -> List[InitContainer]:
+ primary = self.container(ctx)
+ init_containers: List[Dict[str, Any]] = getattr(
+ ctx, 'init_containers', []
+ )
+ return [
+ InitContainer.from_primary_and_opts(ctx, primary, ic_opts)
+ for ic_opts in init_containers
+ ]
+
+ def customize_container_endpoints(
+ self, endpoints: List[EndPoint], deployment_type: DeploymentType
+ ) -> None:
+ if deployment_type == DeploymentType.DEFAULT:
+ endpoints.extend([EndPoint('0.0.0.0', p) for p in self.ports])
+
+ def customize_container_envs(
+ self, ctx: CephadmContext, envs: List[str]
+ ) -> None:
+ envs.extend(self.get_container_envs())
+
+ def customize_container_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_container_args())
+
+ def customize_process_args(
+ self, ctx: CephadmContext, args: List[str]
+ ) -> None:
+ args.extend(self.get_daemon_args())
+
+ def default_entrypoint(self) -> str:
+ return self.entrypoint or ''
+
+ def uid_gid(self, ctx: CephadmContext) -> Tuple[int, int]:
+ return self.uid, self.gid