import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

import orchestrator
from ceph.deployment.service_spec import PlacementSpec, SMBSpec
from mgr_module import MgrModule, Option, OptionLevel

from . import (
cli,
fs,
handler,
mon_store,
rados_store,
resources,
results,
sqlite_store,
utils,
)
from .enums import AuthMode, JoinSourceType, UserGroupSourceType
from .proto import AccessAuthorizer, ConfigStore, Simplified

if TYPE_CHECKING:
    import sqlite3

log = logging.getLogger(__name__)


class Module(orchestrator.OrchestratorClientMixin, MgrModule):
MODULE_OPTIONS: List[Option] = [
Option(
'update_orchestration',
type='bool',
default=True,
desc='automatically update orchestration when smb resources are changed',
),
Option(
'internal_store_backend',
level=OptionLevel.DEV,
type='str',
default='',
            desc='set internal store backend. for development and testing only',
),
]

    def __init__(self, *args: str, **kwargs: Any) -> None:
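        # The keyword arguments popped below let tests inject replacement
        # stores, a path resolver, and an authorizer; normal deployments
        # fall back to the defaults constructed in this method.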
internal_store = kwargs.pop('internal_store', None)
priv_store = kwargs.pop('priv_store', None)
public_store = kwargs.pop('public_store', None)
path_resolver = kwargs.pop('path_resolver', None)
authorizer = kwargs.pop('authorizer', None)
uo = kwargs.pop('update_orchestration', None)
super().__init__(*args, **kwargs)
# the update_orchestration property only works post-init
update_orchestration = self.update_orchestration if uo is None else uo
if internal_store is not None:
self._internal_store = internal_store
            log.info('Using internal_store passed to class: %r', internal_store)
else:
self._internal_store = self._backend_store(
self.internal_store_backend
)
self._priv_store = priv_store or mon_store.MonKeyConfigStore(self)
self._public_store = (
public_store or rados_store.RADOSConfigStore.init(self)
)
path_resolver = path_resolver or fs.CachingCephFSPathResolver(self)
        # mypy requires an explicit cast here to settle on AccessAuthorizer,
        # even though path_resolver above resolves without one.
authorizer = cast(
AccessAuthorizer, authorizer or fs.FileSystemAuthorizer(self)
)
self._handler = handler.ClusterConfigHandler(
internal_store=self._internal_store,
priv_store=self._priv_store,
public_store=self._public_store,
path_resolver=path_resolver,
authorizer=authorizer,
orch=(self if update_orchestration else None),
)

    def _backend_store(self, store_conf: str = '') -> ConfigStore:
        # store_conf lets developers, and possibly testers, experiment with
        # certain backend options at run time. It is not meant to be a
        # formal or friendly interface.
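        # The expected format is '<name>[;key=value;...]', e.g. 'db' or
        # 'db;opt=value' (illustrative option names; only the backend names
        # 'default', 'mon', and 'db' handled below are recognized).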
name = 'default'
opts = {}
if store_conf:
parts = [v.strip() for v in store_conf.split(';')]
assert parts
name = parts[0]
opts = dict(p.split('=', 1) for p in parts[1:])
if name == 'default':
log.info('Using default backend: sqlite3 with mirroring')
mc_store = mon_store.ModuleConfigStore(self)
db_store = sqlite_store.mgr_sqlite3_db_with_mirroring(
self, mc_store, opts
)
return db_store
if name == 'mon':
log.info('Using specified backend: module config internal store')
return mon_store.ModuleConfigStore(self)
if name == 'db':
log.info('Using specified backend: mgr pool sqlite3 db')
return sqlite_store.mgr_sqlite3_db(self, opts)
raise ValueError(f'invalid internal store: {name}')

    @property
def update_orchestration(self) -> bool:
return cast(
bool,
self.get_module_option('update_orchestration', True),
)

    @property
def internal_store_backend(self) -> str:
return cast(
str,
self.get_module_option('internal_store_backend', ''),
)

    @cli.SMBCommand('apply', perm='rw')
def apply_resources(self, inbuf: str) -> results.ResultGroup:
"""Create, update, or remove smb configuration resources based on YAML
or JSON specs
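
        A hypothetical input, in YAML form (the accepted schema is defined
        by the resources module)::

            resource_type: ceph.smb.cluster
            cluster_id: mycluster
            auth_mode: user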
"""
try:
return self._handler.apply(resources.load_text(inbuf))
except resources.InvalidResourceError as err:
# convert the exception into a result and return it as the only
# item in the result group
return results.ResultGroup(
[results.InvalidResourceResult(err.resource_data, str(err))]
)

    @cli.SMBCommand('cluster ls', perm='r')
def cluster_ls(self) -> List[str]:
"""List smb clusters by ID"""
        return list(self._handler.cluster_ids())

    @cli.SMBCommand('cluster create', perm='rw')
def cluster_create(
self,
cluster_id: str,
auth_mode: AuthMode,
domain_realm: str = '',
domain_join_ref: Optional[List[str]] = None,
domain_join_user_pass: Optional[List[str]] = None,
user_group_ref: Optional[List[str]] = None,
define_user_pass: Optional[List[str]] = None,
custom_dns: Optional[List[str]] = None,
placement: Optional[str] = None,
) -> results.Result:
"""Create an smb cluster"""
domain_settings = None
user_group_settings = None
to_apply: List[resources.SMBResource] = []
if domain_realm or domain_join_ref or domain_join_user_pass:
join_sources: List[resources.JoinSource] = []
# create join auth resource references
for djref in domain_join_ref or []:
join_sources.append(
resources.JoinSource(
source_type=JoinSourceType.RESOURCE,
ref=djref,
)
)
# as a "short cut" allow passing username%password combos on the
# command line for testing / automation where the auth tokens are
# single use or don't really matter security wise
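            # e.g. 'Administrator%Passw0rd' splits into the username
            # 'Administrator' and password 'Passw0rd' (illustrative values)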
for djunpw in domain_join_user_pass or []:
try:
username, password = djunpw.split('%', 1)
except ValueError:
raise ValueError(
'a domain join username & password value'
' must contain a "%" separator'
)
rname = utils.rand_name(cluster_id)
join_sources.append(
resources.JoinSource(
source_type=JoinSourceType.RESOURCE,
ref=rname,
)
)
to_apply.append(
resources.JoinAuth(
auth_id=rname,
auth=resources.JoinAuthValues(
username=username,
password=password,
),
linked_to_cluster=cluster_id,
)
)
domain_settings = resources.DomainSettings(
realm=domain_realm,
join_sources=join_sources,
)
        # We don't permit creating groups on the command line: it is a bit
        # too complex for very little payoff. We do support a very simple
        # <username>%<password> split for creating users, but using the
        # declarative resources is much preferred for managing these.
user_group_settings = []
if user_group_ref:
user_group_settings += [
resources.UserGroupSource(
source_type=UserGroupSourceType.RESOURCE, ref=r
)
for r in user_group_ref
]
        if define_user_pass:
            users = []
            for unpw in define_user_pass:
                try:
                    username, password = unpw.split('%', 1)
                except ValueError:
                    raise ValueError(
                        'a user & password value must contain a "%" separator'
                    )
                users.append({'name': username, 'password': password})
rname = utils.rand_name(cluster_id)
user_group_settings.append(
resources.UserGroupSource(
source_type=UserGroupSourceType.RESOURCE, ref=rname
)
)
to_apply.append(
resources.UsersAndGroups(
users_groups_id=rname,
values=resources.UserGroupSettings(
users=users,
groups=[],
),
linked_to_cluster=cluster_id,
)
)
pspec = resources.WrappedPlacementSpec.wrap(
PlacementSpec.from_string(placement)
)
cluster = resources.Cluster(
cluster_id=cluster_id,
auth_mode=auth_mode,
domain_settings=domain_settings,
user_group_settings=user_group_settings,
custom_dns=custom_dns,
placement=pspec,
)
to_apply.append(cluster)
return self._handler.apply(to_apply, create_only=True).squash(cluster)

    @cli.SMBCommand('cluster rm', perm='rw')
def cluster_rm(self, cluster_id: str) -> results.Result:
"""Remove an smb cluster"""
cluster = resources.RemovedCluster(cluster_id=cluster_id)
return self._handler.apply([cluster]).one()

    @cli.SMBCommand('share ls', perm='r')
def share_ls(self, cluster_id: str) -> List[str]:
"""List smb shares in a cluster by ID"""
return [
shid
for cid, shid in self._handler.share_ids()
if cid == cluster_id
]

    @cli.SMBCommand('share create', perm='rw')
def share_create(
self,
cluster_id: str,
share_id: str,
cephfs_volume: str,
path: str,
# plain old 'name' conflicts with builtin options to the `ceph` command.
# use `share_name` to avoid having to `ceph -- smb share create ...`.
share_name: str = '',
subvolume: str = '',
readonly: bool = False,
) -> results.Result:
"""Create an smb share"""
share = resources.Share(
cluster_id=cluster_id,
share_id=share_id,
name=share_name,
readonly=readonly,
cephfs=resources.CephFSStorage(
volume=cephfs_volume,
path=path,
subvolume=subvolume,
),
)
return self._handler.apply([share], create_only=True).one()

    @cli.SMBCommand('share rm', perm='rw')
def share_rm(self, cluster_id: str, share_id: str) -> results.Result:
"""Remove an smb share"""
share = resources.RemovedShare(
cluster_id=cluster_id, share_id=share_id
)
return self._handler.apply([share]).one()

    @cli.SMBCommand('show', perm='r')
def show(self, resource_names: Optional[List[str]] = None) -> Simplified:
"""Show resources fetched from the local config store based on resource
type or resource type and id(s).
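
        Illustrative names (the accepted forms are determined by the
        handler's matching logic): 'ceph.smb.cluster' for all clusters,
        'ceph.smb.cluster.mycluster' for one cluster by ID.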
"""
if not resource_names:
resources = self._handler.all_resources()
else:
try:
resources = self._handler.matching_resources(resource_names)
except handler.InvalidResourceMatch as err:
raise cli.InvalidInputValue(str(err)) from err
if len(resources) == 1:
return resources[0].to_simplified()
return {'resources': [r.to_simplified() for r in resources]}

    @cli.SMBCommand('dump cluster-config', perm='r')
def dump_config(self, cluster_id: str) -> Dict[str, Any]:
"""DEBUG: Generate an example configuration"""
# TODO: Remove this command prior to release
return self._handler.generate_config(cluster_id)

    @cli.SMBCommand('dump service-spec', perm='r')
def dump_service_spec(self, cluster_id: str) -> Dict[str, Any]:
"""DEBUG: Generate an example smb service spec"""
# TODO: Remove this command prior to release
return dict(
self._handler.generate_smb_service_spec(cluster_id).to_json()
)

    @cli.SMBCommand('dump everything', perm='r')
def dump_everything(self) -> Dict[str, Any]:
"""DEBUG: Show me everything"""
# TODO: Remove this command prior to release
        everything: Dict[str, Any] = {}
        stores = [
            ('PUBLIC', self._public_store),
            ('PRIV', self._priv_store),
            ('INTERNAL', self._internal_store),
        ]
        for label, store in stores:
            log.warning('dumping %s', label)
            everything[label] = {}
            for key in store:
                e = store[key]
                log.warning('dumping e: %s %r', e.uri, e.full_key)
                everything[label][e.uri] = e.get()
        return everything

    def submit_smb_spec(self, spec: SMBSpec) -> None:
"""Submit a new or updated smb spec object to ceph orchestration."""
completion = self.apply_smb(spec)
orchestrator.raise_if_exception(completion)

    def remove_smb_service(self, service_name: str) -> None:
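        """Remove an smb service from ceph orchestration."""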
completion = self.remove_service(service_name)
orchestrator.raise_if_exception(completion)

    def maybe_upgrade(self, db: 'sqlite3.Connection', version: int) -> None:
        # Our db tables are managed by our abstraction layer, via a store
        # class, not directly by the mgr module. Skip the mgr module's
        # default schema loading and have the internal store prepare its
        # own tables.
if not isinstance(self._internal_store, sqlite_store.SqliteStore):
return
log.debug('Preparing db tables')
self._internal_store.prepare(db.cursor())