import logging

from typing import Any, Dict, List, Tuple, cast, Optional

from ceph.deployment.service_spec import ServiceSpec, SMBSpec
from orchestrator import DaemonDescription

from .cephadmservice import (
    AuthEntity,
    CephService,
    CephadmDaemonDeploySpec,
    simplified_keyring,
)

logger = logging.getLogger(__name__)


class SMBService(CephService):
TYPE = 'smb'
DEFAULT_EXPORTER_PORT = 9922
smb_pool = '.smb' # minor layering violation. try to clean up later.

    def config(self, spec: ServiceSpec) -> None:
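        """Apply service-level configuration: (re)initialize the ctdb
        cluster metadata when the spec enables clustering.
        """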
assert self.TYPE == spec.service_type
smb_spec = cast(SMBSpec, spec)
self._configure_cluster_meta(smb_spec)

    def ranked(self, spec: ServiceSpec) -> bool:
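        """Return True if the service's daemons are rank-tracked; smb
        daemons are assigned ranks only when the spec enables the
        'clustered' feature.
        """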
smb_spec = cast(SMBSpec, spec)
return 'clustered' in smb_spec.features

    def fence_old_ranks(
self,
spec: ServiceSpec,
rank_map: Dict[int, Dict[int, Optional[str]]],
num_ranks: int,
) -> None:
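        """Rank fencing is not implemented for smb; the request is logged
        and otherwise ignored.
        """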
logger.warning(
'fence_old_ranks: Unsupported %r %r', rank_map, num_ranks
)

    def prepare_create(
self, daemon_spec: CephadmDaemonDeploySpec
) -> CephadmDaemonDeploySpec:
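        """Populate the deploy spec's final config and deps before the
        daemon is created.
        """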
assert self.TYPE == daemon_spec.daemon_type
logger.debug('smb prepare_create')
daemon_spec.final_config, daemon_spec.deps = self.generate_config(
daemon_spec
)
return daemon_spec

    def generate_config(
self, daemon_spec: CephadmDaemonDeploySpec
) -> Tuple[Dict[str, Any], List[str]]:
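        """Assemble the config blobs handed to the smb daemon container:
        cluster identity, config/join/user sources, optional ctdb
        clustering settings, and the ceph config and keyring. Returns the
        blobs plus an (empty) list of deps.
        """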
logger.debug('smb generate_config')
assert self.TYPE == daemon_spec.daemon_type
smb_spec = cast(
SMBSpec, self.mgr.spec_store[daemon_spec.service_name].spec
)
config_blobs: Dict[str, Any] = {}
config_blobs['cluster_id'] = smb_spec.cluster_id
config_blobs['features'] = smb_spec.features
config_blobs['config_uri'] = smb_spec.config_uri
if smb_spec.join_sources:
config_blobs['join_sources'] = smb_spec.join_sources
if smb_spec.user_sources:
config_blobs['user_sources'] = smb_spec.user_sources
if smb_spec.custom_dns:
config_blobs['custom_dns'] = smb_spec.custom_dns
if smb_spec.cluster_meta_uri:
config_blobs['cluster_meta_uri'] = smb_spec.cluster_meta_uri
if smb_spec.cluster_lock_uri:
config_blobs['cluster_lock_uri'] = smb_spec.cluster_lock_uri
cluster_public_addrs = smb_spec.strict_cluster_ip_specs()
if cluster_public_addrs:
config_blobs['cluster_public_addrs'] = cluster_public_addrs
ceph_users = smb_spec.include_ceph_users or []
config_blobs.update(
self._ceph_config_and_keyring_for(
smb_spec, daemon_spec.daemon_id, ceph_users
)
)
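        # image and port used for the samba metrics exporter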
config_blobs['metrics_image'] = (
self.mgr.container_image_samba_metrics
)
config_blobs['metrics_port'] = SMBService.DEFAULT_EXPORTER_PORT
logger.debug('smb generate_config: %r', config_blobs)
self._configure_cluster_meta(smb_spec, daemon_spec)
return config_blobs, []

    def config_dashboard(
        self, daemon_descrs: List[DaemonDescription]
    ) -> None:
        # TODO: determine what, if any, dashboard integration smb needs
        logger.warning('config_dashboard is a no-op')

    def get_auth_entity(self, daemon_id: str, host: str = "") -> AuthEntity:
# We want a clear, distinct auth entity for fetching the config versus
# data path access.
return AuthEntity(f'client.{self.TYPE}.config.{daemon_id}')

    def ignore_possible_stray(
        self, service_type: str, daemon_id: str, name: str
    ) -> bool:
        """Called to decide if a possible stray service should be ignored
        because it "virtually" belongs to a service.

        This is mainly needed when properly managed services spawn layered
        ceph services with different names (ctdb, for example).
        """
        if service_type == 'ctdb':
            # In the future it would be good if the ctdb service registered
            # with a name/key we could associate with a cephadm-deployed smb
            # service. For now, simply suppress the stray-service warning
            # for all ctdb lock helpers using the cluster.
            logger.debug('ignoring possibly stray ctdb service: %s', name)
            return True
        return False

    def _allow_config_key_command(self, name: str) -> str:
        # permit the samba container read access to the mon config-key
        # store, restricted to keys like smb/config/<cluster_id>/*
        return f'allow command "config-key get" with "key" prefix "smb/config/{name}/"'

    def _pool_caps_from_uri(self, uri: str) -> List[str]:
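        """Return the osd caps needed to access the pool (and namespace)
        behind a rados:// uri. The special smb pool gets extra caps for
        ctdb cluster management.
        """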
if not uri.startswith('rados://'):
logger.warning("ignoring unexpected uri scheme: %r", uri)
return []
part = uri[8:].rstrip('/')
        if part.count('/') > 1:
            # assumes no extra "/"s in the pool, namespace, or object names
            pool, ns, _ = part.split('/', 2)
        elif '/' in part:
            pool, _ = part.split('/', 1)
            ns = ''
        else:
            # a bare pool name, with no namespace or object component
            pool, ns = part, ''
if pool != self.smb_pool:
logger.debug('extracted pool %r from uri %r', pool, uri)
return [f'allow r pool={pool}']
logger.debug(
'found smb pool in uri [pool=%r, ns=%r]: %r', pool, ns, uri
)
# enhanced caps for smb pools to be used for ctdb mgmt
return [
# TODO - restrict this read access to the namespace too?
f'allow r pool={pool}',
# the x perm is needed to lock the cluster meta object
f'allow rwx pool={pool} namespace={ns} object_prefix cluster.meta.',
]

    def _expand_osd_caps(self, smb_spec: SMBSpec) -> str:
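        """Collect the deduplicated set of osd caps for every rados:// uri
        in the spec.
        """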
caps = set()
uris = [smb_spec.config_uri]
uris.extend(smb_spec.join_sources or [])
uris.extend(smb_spec.user_sources or [])
for uri in uris:
for cap in self._pool_caps_from_uri(uri):
caps.add(cap)
return ', '.join(caps)

    def _key_for_user(self, entity: str) -> str:
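        """Fetch an existing user's keyring via the `auth get` mon command."""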
ret, keyring, err = self.mgr.mon_command(
{
'prefix': 'auth get',
'entity': entity,
}
)
if ret != 0:
raise ValueError(f'no auth key for user: {entity!r}')
return '\n' + simplified_keyring(entity, keyring)

    def _ceph_config_and_keyring_for(
self, smb_spec: SMBSpec, daemon_id: str, ceph_users: List[str]
) -> Dict[str, str]:
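        """Return a minimal ceph.conf and a keyring containing the daemon's
        config-access entity plus any requested data-path users.
        """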
ackc = self._allow_config_key_command(smb_spec.cluster_id)
wanted_caps = ['mon', f'allow r, {ackc}']
osd_caps = self._expand_osd_caps(smb_spec)
if osd_caps:
wanted_caps.append('osd')
wanted_caps.append(osd_caps)
entity = self.get_auth_entity(daemon_id)
keyring = self.get_keyring_with_caps(entity, wanted_caps)
# add additional data-path users to the ceph keyring
for ceph_user in ceph_users:
keyring += self._key_for_user(ceph_user)
return {
'config': self.mgr.get_minimal_ceph_conf(),
'keyring': keyring,
'config_auth_entity': entity,
}

    def _configure_cluster_meta(
self,
smb_spec: SMBSpec,
daemon_spec: Optional[CephadmDaemonDeploySpec] = None,
) -> None:
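        """Write the map of smb daemons (and their ranks) to the ctdb
        cluster metadata object. A no-op unless the spec enables the
        'clustered' feature.
        """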
if 'clustered' not in smb_spec.features:
logger.debug(
'smb clustering disabled: %s: lacks feature flag',
smb_spec.service_name(),
)
return
uri = smb_spec.cluster_meta_uri
if not uri:
            logger.error(
                'smb spec (%s) has clustering enabled but no cluster_meta_uri',
                smb_spec.service_name(),
            )
return
logger.info('configuring smb/ctdb cluster metadata')
name = smb_spec.service_name()
rank_map = self.mgr.spec_store[name].rank_map or {}
daemons = self.mgr.cache.get_daemons_by_service(name)
logger.debug(
'smb cluster meta: name=%r rank_map=%r daemons=%r daemon_spec=%r',
name,
rank_map,
daemons,
daemon_spec,
)
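        # the clustermeta helpers are provided by the smb mgr module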
from smb import clustermeta
smb_dmap: clustermeta.DaemonMap = {}
for dd in daemons:
assert dd.daemon_type and dd.daemon_id
assert dd.hostname
host_ip = dd.ip or self.mgr.inventory.get_addr(dd.hostname)
smb_dmap[dd.name()] = {
'daemon_type': dd.daemon_type,
'daemon_id': dd.daemon_id,
'hostname': dd.hostname,
'host_ip': host_ip,
# specific ctdb_ip? (someday?)
}
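        # include the daemon currently being deployed, if any; it may not
        # be present in the daemon cache yet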
if daemon_spec:
host_ip = daemon_spec.ip or self.mgr.inventory.get_addr(
daemon_spec.host
)
smb_dmap[daemon_spec.name()] = {
'daemon_type': daemon_spec.daemon_type,
'daemon_id': daemon_spec.daemon_id,
'hostname': daemon_spec.host,
'host_ip': host_ip,
# specific ctdb_ip? (someday?)
}
logger.debug("smb daemon map: %r", smb_dmap)
with clustermeta.rados_object(self.mgr, uri) as cmeta:
cmeta.sync_ranks(rank_map, smb_dmap)