1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
|
import threading
import functools
import os
import uuid
try:
from typing import List
except ImportError:
pass # just for type checking
try:
from kubernetes import client, config
from kubernetes.client.rest import ApiException
kubernetes_imported = True
except ImportError:
kubernetes_imported = False
client = None
config = None
from mgr_module import MgrModule
import orchestrator
from .rook_cluster import RookCluster
# Module-level registry of completions.  RookReadCompletion and
# RookWriteCompletion append themselves here when constructed, and
# RookOrchestrator.serve() periodically runs wait() over the list and then
# drops the entries that report is_complete.
all_completions = []
class RookReadCompletion(orchestrator.ReadCompletion):
    """
    Read completion whose work is a single callback run inline.

    All reads are simply API calls: avoid spawning
    huge numbers of threads by just running them
    inline when someone calls wait()
    """

    def __init__(self, cb):
        """
        :param cb: zero-argument callable; whatever it returns is exposed
                   through ``result`` after ``execute()`` has run.
        """
        super(RookReadCompletion, self).__init__()

        self.message = "<read op>"
        self.cb = cb
        self._complete = False
        self._result = None

        # XXX hacky global: every completion registers itself in the
        # module-level list (mutated in place, so no rebinding is needed).
        all_completions.append(self)

    @property
    def is_complete(self):
        """True once execute() has run the callback to the end."""
        return self._complete

    @property
    def result(self):
        """Return value of the callback, or None before execute() ran."""
        return self._result

    def execute(self):
        """Run the callback, store its return value and mark completion."""
        outcome = self.cb()
        self._result = outcome
        self._complete = True
class RookWriteCompletion(orchestrator.WriteCompletion):
    """
    Writes are a two-phase thing, firstly sending
    the write to the k8s API (fast) and then waiting
    for the corresponding change to appear in the
    Ceph cluster (slow)
    """

    # XXX kubernetes bindings call_api already usefully has
    # a completion= param that uses threads.  Maybe just
    # use that?
    def __init__(self, execute_cb, complete_cb, message):
        """
        :param execute_cb: zero-argument callable performing the write;
                           invoked at most once successfully by execute()
        :param complete_cb: zero-argument callable whose return value is
                            stored as ``effective``, or None to treat the
                            write as effective as soon as it was executed
        :param message: human readable description of the operation
        """
        super(RookWriteCompletion, self).__init__()

        self.message = message
        self.id = str(uuid.uuid4())

        self.execute_cb = execute_cb
        self.complete_cb = complete_cb

        # Result of k8s API call, this is set if executed==True
        self._result = None

        # Executed means I executed my k8s API call, it may or may
        # not have succeeded
        self.executed = False

        self.effective = False
        self.error = None

        # XXX hacky global: every completion registers itself in the
        # module-level list (mutated in place, so no rebinding is needed).
        all_completions.append(self)

    @property
    def result(self):
        """Return value of execute_cb, or None if it has not run yet."""
        return self._result

    @property
    def is_errored(self):
        """True when an error has been recorded on this completion."""
        return self.error is not None

    @property
    def is_persistent(self):
        """True when the write was executed and no error is recorded."""
        return (not self.is_errored) and self.executed

    @property
    def is_effective(self):
        """Current value of the ``effective`` flag."""
        return self.effective

    def execute(self):
        """
        Run the first phase if it has not run yet, then (re-)evaluate
        whether the write has become effective.
        """
        if not self.executed:
            self._result = self.execute_cb()
            self.executed = True

        if self.effective:
            # Nothing left to check.
            return

        # TODO: check self.result for API errors
        check = self.complete_cb
        if check is None:
            self.effective = True
        else:
            self.effective = check()
def deferred_read(f):
    """
    Decorator for RookOrchestrator methods: instead of running the method,
    the decorated call returns a RookReadCompletion that will invoke it
    (with the arguments captured at call time) when executed.
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        def run():
            return f(*args, **kwargs)

        return RookReadCompletion(run)

    return wrapper
class RookEnv(object):
    """Rook-related settings taken from the process environment."""

    def __init__(self):
        env = os.environ

        # POD_NAMESPACE already exists for Rook 0.9
        self.namespace = env.get('POD_NAMESPACE', 'rook-ceph')

        # ROOK_CEPH_CLUSTER_CRD_NAME is new in Rook 1.0; fall back to the
        # namespace when it is not set.
        self.cluster_name = env.get('ROOK_CEPH_CLUSTER_CRD_NAME',
                                    self.namespace)

        self.operator_namespace = env.get('ROOK_OPERATOR_NAMESPACE',
                                          "rook-ceph-system")
        self.crd_version = env.get('ROOK_CEPH_CLUSTER_CRD_VERSION', 'v1')
        self.api_name = "ceph.rook.io/" + self.crd_version

    def api_version_match(self):
        """Return True when the configured CRD version is 'v1'."""
        return 'v1' == self.crd_version

    def has_namespace(self):
        """Return True when POD_NAMESPACE is present in the environment."""
        return 'POD_NAMESPACE' in os.environ
class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
    """
    Orchestrator module that acts on the Ceph cluster through Rook, using
    the Kubernetes API (via the `kubernetes` python client) and the
    RookCluster helper.
    """

    MODULE_OPTIONS = [
        # TODO: configure k8s API addr instead of assuming local
    ]

    def _progress(self, *args, **kwargs):
        """Forward a call to the `progress` module, if it is available."""
        try:
            self.remote("progress", *args, **kwargs)
        except ImportError:
            # If the progress module is disabled that's fine,
            # they just won't see the output.
            pass

    def wait(self, completions):
        """
        Execute every completion in `completions` that is not complete yet.

        :param completions: iterable of RookReadCompletion and/or
                            RookWriteCompletion objects
        :return: True when all completions are complete after this pass,
                 False when at least one of them is still incomplete
        :raises TypeError: when an element is neither a RookReadCompletion
                           nor a RookWriteCompletion
        """
        self.log.info("wait: completions={0}".format(completions))

        incomplete = False

        # Our `wait` implementation is very simple because everything's
        # just an API call.
        for c in completions:
            if not isinstance(c, (RookReadCompletion, RookWriteCompletion)):
                raise TypeError(
                    "wait() requires list of completions, not {0}".format(
                        c.__class__
                    ))

            if c.is_complete:
                continue

            if not c.is_read:
                self._progress("update", c.id, c.message, 0.5)

            try:
                c.execute()
            except Exception as e:
                # Record the failure on the completion instead of letting
                # one bad completion abort the whole pass.
                self.log.exception("Completion {0} threw an exception:".format(
                    c.message
                ))
                c.error = e
                c._complete = True
                if not c.is_read:
                    self._progress("complete", c.id)
            else:
                if c.is_complete and not c.is_read:
                    self._progress("complete", c.id)

            if not c.is_complete:
                incomplete = True

        return not incomplete

    @staticmethod
    def can_run():
        """
        :return: (True, '') when the `kubernetes` module was imported and
                 the Rook CRD version is supported, else (False, reason)
        """
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        if not RookEnv().api_version_match():
            return False, "Rook version unsupported."
        return True, ''

    def available(self):
        """
        :return: (True, "") when the Kubernetes API answers a pod listing,
                 else (False, reason)
        """
        if not kubernetes_imported:
            return False, "`kubernetes` python module not found"
        elif not self._rook_env.has_namespace():
            return False, "ceph-mgr not running in Rook cluster"

        try:
            self.k8s.list_namespaced_pod(self._rook_env.cluster_name)
        except ApiException as e:
            return False, "Cannot reach Kubernetes API: {}".format(e)
        else:
            return True, ""

    def __init__(self, *args, **kwargs):
        super(RookOrchestrator, self).__init__(*args, **kwargs)

        # Set by serve() once _k8s and _rook_cluster have been created.
        self._initialized = threading.Event()
        self._k8s = None
        self._rook_cluster = None
        self._rook_env = RookEnv()

        # Set by shutdown() to make the serve() loop exit.
        self._shutdown = threading.Event()

    def shutdown(self):
        """Ask the serve() loop to terminate."""
        self._shutdown.set()

    @property
    def k8s(self):
        """Kubernetes API object; blocks until serve() has initialized it."""
        self._initialized.wait()
        return self._k8s

    @property
    def rook_cluster(self):
        # type: () -> RookCluster
        """RookCluster helper; blocks until serve() has initialized it."""
        self._initialized.wait()
        return self._rook_cluster

    def serve(self):
        """
        Set up the Kubernetes client and the RookCluster helper, then poll
        the registered completions every 5 seconds until shutdown() is
        called.
        """
        global all_completions

        # For deployed clusters, we should always be running inside
        # a Rook cluster.  For development convenience, also support
        # running outside (reading ~/.kube config)
        #
        # Decide on POD_NAMESPACE (the same check available() uses):
        # RookEnv.cluster_name always falls back to a non-empty default,
        # so testing it would never select the development branch.
        if self._rook_env.has_namespace():
            config.load_incluster_config()
            cluster_name = self._rook_env.cluster_name
        else:
            self.log.warning("DEVELOPMENT ONLY: Reading kube config from ~")
            config.load_kube_config()

            cluster_name = "rook-ceph"

            # So that I can do port forwarding from my workstation - jcsp
            from kubernetes.client import configuration
            configuration.verify_ssl = False

        self._k8s = client.CoreV1Api()

        try:
            # XXX mystery hack -- I need to do an API call from
            # this context, or subsequent API usage from handle_command
            # fails with SSLError('bad handshake').  Suspect some kind of
            # thread context setup in SSL lib?
            self._k8s.list_namespaced_pod(cluster_name)
        except ApiException:
            # Ignore here to make self.available() fail with a proper
            # error message
            pass

        self._rook_cluster = RookCluster(
            self._k8s,
            self._rook_env)

        self._initialized.set()

        while not self._shutdown.is_set():
            # XXX hack (or is it?) to kick all completions periodically,
            # in case we had a caller that wait()'ed on them long enough
            # to get persistence but not long enough to get completion
            self.wait(all_completions)
            all_completions = [c for c in all_completions if not c.is_complete]

            self._shutdown.wait(5)

        # TODO: watch Rook for config changes to complain/update if
        # things look a bit out of sync?

    @deferred_read
    def get_inventory(self, node_filter=None, refresh=False):
        """
        Build one orchestrator.InventoryNode per node reported by
        RookCluster.get_discovered_devices().

        Because of @deferred_read the call returns a RookReadCompletion;
        the body below runs when that completion is executed.

        :param node_filter: optional filter; only its `nodes` attribute is
                            honoured, filtering by `labels` raises
                            NotImplementedError
        :param refresh: not used by this implementation
        :return: list of orchestrator.InventoryNode
        """
        node_list = None
        if node_filter and node_filter.nodes:
            # Explicit node list
            node_list = node_filter.nodes
        elif node_filter and node_filter.labels:
            # TODO: query k8s API to resolve to node list, and pass
            # it into RookCluster.get_discovered_devices
            raise NotImplementedError()

        discovered = self.rook_cluster.get_discovered_devices(node_list)

        result = []
        for node_name, node_devs in discovered.items():
            inventory = []
            for d in node_devs:
                dev = orchestrator.InventoryDevice()

                # XXX CAUTION!  https://github.com/rook/rook/issues/1716
                # Passing this through for the sake of completeness but it
                # is not trustworthy!
                dev.blank = d['empty']
                dev.type = 'hdd' if d['rotational'] else 'ssd'
                dev.id = d['name']
                dev.size = d['size']

                if d['filesystem'] == "" and not d['rotational']:
                    # Empty or partitioned SSD
                    partitioned_space = sum(
                        p['size'] for p in d['Partitions'])
                    dev.metadata_space_free = max(
                        0, d['size'] - partitioned_space)

                inventory.append(dev)

            result.append(orchestrator.InventoryNode(node_name, inventory))

        return result

    @deferred_read
    def describe_service(self, service_type=None, service_id=None, node_name=None):
        """
        Describe the pods returned by RookCluster.describe_pods() as
        orchestrator.ServiceDescription objects.

        Because of @deferred_read the call returns a RookReadCompletion;
        the body below runs when that completion is executed.

        :param service_type: one of "mds", "osd", "mgr", "mon", "nfs" or
                             None
        :param service_id: passed through to describe_pods()
        :param node_name: passed through to describe_pods()
        :return: list of orchestrator.ServiceDescription; pods whose type
                 is not recognised are skipped
        """
        # NOTE(review): "rgw" pods are handled below but "rgw" is not an
        # accepted service_type here -- confirm whether that is intended.
        assert service_type in ("mds", "osd", "mgr", "mon", "nfs", None), \
            "{0} unsupported".format(service_type)

        pods = self.rook_cluster.describe_pods(service_type, service_id, node_name)

        result = []
        for p in pods:
            sd = orchestrator.ServiceDescription()
            sd.nodename = p['nodename']
            sd.container_id = p['name']
            sd.service_type = p['labels']['app'].replace('rook-ceph-', '')

            if sd.service_type == "osd":
                sd.service_instance = "%s" % p['labels']["ceph-osd-id"]
            elif sd.service_type == "mds":
                sd.service = p['labels']['rook_file_system']
                pfx = "{0}-".format(sd.service)
                sd.service_instance = p['labels']['ceph_daemon_id'].replace(pfx, '', 1)
            elif sd.service_type == "mon":
                sd.service_instance = p['labels']["mon"]
            elif sd.service_type == "mgr":
                sd.service_instance = p['labels']["mgr"]
            elif sd.service_type == "nfs":
                sd.service = p['labels']['ceph_nfs']
                sd.service_instance = p['labels']['instance']
                sd.rados_config_location = self.rook_cluster.get_nfs_conf_url(sd.service, sd.service_instance)
            elif sd.service_type == "rgw":
                sd.service = p['labels']['rgw']
                sd.service_instance = p['labels']['ceph_daemon_id']
            else:
                # Unknown type -- skip it
                continue

            result.append(sd)

        return result

    def _service_add_decorate(self, typename, spec, func):
        """Wrap `func(spec)` in a RookWriteCompletion labelled with typename."""
        return RookWriteCompletion(lambda: func(spec), None,
                                   "Creating {0} services for {1}".format(typename, spec.name))

    def add_stateless_service(self, service_type, spec):
        """
        :param service_type: "mds", "rgw" or "nfs"
        :param spec: service spec handed to the matching RookCluster.add_*
        :return: RookWriteCompletion
        :raises NotImplementedError: for any other service_type
        """
        # assert isinstance(spec, orchestrator.StatelessServiceSpec)
        if service_type == "mds":
            return self._service_add_decorate("Filesystem", spec,
                                              self.rook_cluster.add_filesystem)
        elif service_type == "rgw":
            return self._service_add_decorate("RGW", spec,
                                              self.rook_cluster.add_objectstore)
        elif service_type == "nfs":
            return self._service_add_decorate("NFS", spec,
                                              self.rook_cluster.add_nfsgw)
        else:
            raise NotImplementedError(service_type)

    def remove_stateless_service(self, service_type, service_id):
        """Return a RookWriteCompletion that calls RookCluster.rm_service()."""
        return RookWriteCompletion(
            lambda: self.rook_cluster.rm_service(service_type, service_id), None,
            "Removing {0} services for {1}".format(service_type, service_id))

    def update_mons(self, num, hosts):
        """
        :param num: desired mon count
        :param hosts: must be empty
        :return: RookWriteCompletion calling RookCluster.update_mon_count()
        :raises RuntimeError: when a host list is given
        """
        if hosts:
            raise RuntimeError("Host list is not supported by rook.")

        return RookWriteCompletion(
            lambda: self.rook_cluster.update_mon_count(num), None,
            "Updating mon count to {0}".format(num))

    def update_stateless_service(self, svc_type, spec):
        """
        :param svc_type: only "nfs" is supported
        :param spec: spec providing `name` and `count`
        :return: RookWriteCompletion calling RookCluster.update_nfs_count()
        :raises NotImplementedError: for any other svc_type
        """
        # only nfs is currently supported
        if svc_type != "nfs":
            raise NotImplementedError(svc_type)

        num = spec.count
        return RookWriteCompletion(
            lambda: self.rook_cluster.update_nfs_count(spec.name, num), None,
            "Updating NFS server count in {0} to {1}".format(spec.name, num))

    def create_osds(self, drive_group, all_hosts):
        # type: (orchestrator.DriveGroupSpec, List[str]) -> RookWriteCompletion
        """
        Create OSDs for a drive group that resolves to exactly one host.

        :param drive_group: drive group; its data device paths and data
                            directories are the creation targets
        :param all_hosts: host names the drive group is resolved against
        :return: RookWriteCompletion that becomes effective once an OSD pod
                 on the host is matched to one of the targets
        :raises RuntimeError: when the host is not a node of the Kubernetes
                              cluster, or the Rook cluster cannot create
                              individual OSDs
        """
        hosts = drive_group.hosts(all_hosts)
        assert len(hosts) == 1
        host = hosts[0]

        targets = []
        if drive_group.data_devices:
            targets += drive_group.data_devices.paths
        if drive_group.data_directories:
            targets += drive_group.data_directories

        if not self.rook_cluster.node_exists(host):
            # Report the host name itself rather than the list repr.
            raise RuntimeError("Node '{0}' is not in the Kubernetes "
                               "cluster".format(host))

        # Validate whether cluster CRD can accept individual OSD
        # creations (i.e. not useAllDevices)
        if not self.rook_cluster.can_create_osd():
            raise RuntimeError("Rook cluster configuration does not "
                               "support OSD creation.")

        def execute():
            return self.rook_cluster.add_osds(drive_group, all_hosts)

        def is_complete():
            # Find OSD pods on this host
            pods = self._k8s.list_namespaced_pod(
                self._rook_env.namespace,
                label_selector="rook_cluster={},app=rook-ceph-osd".format(
                    self._rook_env.cluster_name),
                field_selector="spec.nodeName={0}".format(host)).items
            pod_osd_ids = {int(p.metadata.labels['ceph-osd-id'])
                           for p in pods}

            self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))

            found = []
            osdmap = self.get("osd_map")
            for osd in osdmap['osds']:
                osd_id = osd['osd']
                if osd_id not in pod_osd_ids:
                    continue

                metadata = self.get_metadata('osd', "%s" % osd_id)
                # Metadata may be missing; don't blow up while logging it.
                devices = metadata.get('devices') if metadata else None
                if devices is not None and devices in targets:
                    found.append(osd_id)
                else:
                    self.log.info("ignoring osd {0} {1}".format(
                        osd_id, devices
                    ))

            # `found` is always a list, so test for emptiness: the write
            # is only effective once at least one matching OSD showed up.
            return len(found) > 0

        return RookWriteCompletion(execute, is_complete,
                                   "Creating OSD on {0}:{1}".format(
                                       host, targets
                                   ))
|