summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/diskprediction/module.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pybind/mgr/diskprediction/module.py')
-rw-r--r--src/pybind/mgr/diskprediction/module.py402
1 files changed, 402 insertions, 0 deletions
diff --git a/src/pybind/mgr/diskprediction/module.py b/src/pybind/mgr/diskprediction/module.py
new file mode 100644
index 00000000000..4ae2a7976a1
--- /dev/null
+++ b/src/pybind/mgr/diskprediction/module.py
@@ -0,0 +1,402 @@
+"""
+A diskprediction module
+"""
+from __future__ import absolute_import
+
+from datetime import datetime
+import errno
+import json
+from mgr_module import MgrModule
+import os
+from threading import Event
+
+from .common import DP_MGR_STAT_ENABLED, DP_MGR_STAT_DISABLED
+from .task import MetricsRunner, PredictionRunner, SmartRunner
+
+
+DP_AGENTS = [MetricsRunner, SmartRunner, PredictionRunner]
+
+
+class Module(MgrModule):
+
+ OPTIONS = [
+ {
+ 'name': 'diskprediction_config_mode',
+ 'default': 'local'
+ },
+ {
+ 'name': 'diskprediction_server',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_port',
+ 'default': '31400'
+ },
+ {
+ 'name': 'diskprediction_user',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_password',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_upload_metrics_interval',
+ 'default': '600'
+ },
+ {
+ 'name': 'diskprediction_upload_smart_interval',
+ 'default': '43200'
+ },
+ {
+ 'name': 'diskprediction_retrieve_prediction_interval',
+ 'default': '43200'
+ },
+ {
+ 'name': 'diskprediction_cert_context',
+ 'default': ''
+ },
+ {
+ 'name': 'diskprediction_ssl_target_name_override',
+ 'default': 'api.diskprophet.com'
+ },
+ {
+ 'name': 'diskprediction_default_authority',
+ 'default': 'api.diskprophet.com'
+ }
+ ]
+
+ COMMANDS = [
+ {
+ 'cmd': 'device set-prediction-mode '
+ 'name=mode,type=CephString,req=true',
+ 'desc': 'config disk prediction mode [\"cloud\"|\"local\"]',
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'device show-prediction-config',
+ 'desc': 'Prints diskprediction configuration',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device set-cloud-prediction-config '
+ 'name=server,type=CephString,req=true '
+ 'name=user,type=CephString,req=true '
+ 'name=password,type=CephString,req=true '
+ 'name=certfile,type=CephString,req=true '
+ 'name=port,type=CephString,req=false ',
+ 'desc': 'Configure Disk Prediction service',
+ 'perm': 'rw'
+ },
+ {
+ 'cmd': 'device get-predicted-status '
+ 'name=dev_id,type=CephString,req=true',
+ 'desc': 'Get physical device predicted result',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device debug metrics-forced',
+ 'desc': 'Run metrics agent forced',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device debug prediction-forced',
+ 'desc': 'Run prediction agent forced',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device debug smart-forced',
+ 'desc': 'Run smart agent forced',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'device predict-life-expectancy '
+ 'name=dev_id,type=CephString,req=true',
+ 'desc': 'Predict life expectancy with local predictor',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'diskprediction self-test',
+ 'desc': 'Prints hello world to mgr.x.log',
+ 'perm': 'r'
+ },
+ {
+ 'cmd': 'diskprediction status',
+ 'desc': 'Check diskprediction status',
+ 'perm': 'r'
+ }
+ ]
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.status = {'status': DP_MGR_STAT_DISABLED}
+ self.shutdown_event = Event()
+ self._agents = []
+ self._activated_cloud = False
+ self._activated_local = False
+ self._prediction_result = {}
+ self.config = dict()
+
+ @property
+ def config_keys(self):
+ return dict((o['name'], o.get('default', None)) for o in self.OPTIONS)
+
+ def set_config_option(self, option, value):
+ if option not in self.config_keys.keys():
+ raise RuntimeError('{0} is a unknown configuration '
+ 'option'.format(option))
+
+ if option in ['diskprediction_port',
+ 'diskprediction_upload_metrics_interval',
+ 'diskprediction_upload_smart_interval',
+ 'diskprediction_retrieve_prediction_interval']:
+ if not str(value).isdigit():
+ raise RuntimeError('invalid {} configured. Please specify '
+ 'a valid integer {}'.format(option, value))
+
+ self.log.debug('Setting in-memory config option %s to: %s', option,
+ value)
+ self.set_config(option, value)
+ self.config[option] = value
+
+ return True
+
+ def get_configuration(self, key):
+ return self.get_config(key, self.config_keys[key])
+
+ def _show_prediction_config(self, inbuf, cmd):
+ self.show_module_config()
+ return 0, json.dumps(self.config, indent=4), ''
+
+ def _set_prediction_mode(self, inbuf, cmd):
+ self.status = {}
+ str_mode = cmd.get('mode', 'cloud')
+ if str_mode.lower() not in ['cloud', 'local']:
+ return -errno.EINVAL, '', 'invalid configuration, enable=[cloud|local]'
+ try:
+ self.set_config('diskprediction_config_mode', str_mode)
+ for _agent in self._agents:
+ _agent.event.set()
+ return (0,
+ 'success to config disk prediction mode: %s'
+ % str_mode.lower(), 0)
+ except Exception as e:
+ return -errno.EINVAL, '', str(e)
+
+ def _self_test(self, inbuf, cmd):
+ from .test.test_agents import test_agents
+ test_agents(self)
+ return 0, 'self-test completed', ''
+
+ def _set_ssl_target_name(self, inbuf, cmd):
+ str_ssl_target = cmd.get('ssl_target_name', '')
+ try:
+ self.set_config('diskprediction_ssl_target_name_override', str_ssl_target)
+ return (0,
+ 'success to config ssl target name', 0)
+ except Exception as e:
+ return -errno.EINVAL, '', str(e)
+
+ def _set_ssl_default_authority(self, inbuf, cmd):
+ str_ssl_authority = cmd.get('ssl_authority', '')
+ try:
+ self.set_config('diskprediction_default_authority', str_ssl_authority)
+ return (0,
+ 'success to config ssl default authority', 0)
+ except Exception as e:
+ return -errno.EINVAL, '', str(e)
+
+ def _get_predicted_status(self, inbuf, cmd):
+ physical_data = dict()
+ try:
+ if not self._prediction_result:
+ for _agent in self._agents:
+ if isinstance(_agent, PredictionRunner):
+ _agent.event.set()
+ break
+ pre_data = self._prediction_result.get(cmd['dev_id'])
+ if pre_data:
+ p_data = pre_data.get('prediction', {})
+ if not p_data.get('predicted'):
+ predicted = ''
+ else:
+ predicted = datetime.fromtimestamp(int(
+ p_data.get('predicted')) / (1000 ** 3))
+ d_data = {
+ 'near_failure': p_data.get('near_failure'),
+ 'predicted': str(predicted),
+ 'serial_number': pre_data.get('serial_number'),
+ 'disk_wwn': pre_data.get('disk_wwn'),
+ 'attachment': p_data.get('disk_name', '')
+ }
+ physical_data[cmd['dev_id']] = d_data
+ msg = json.dumps(d_data, indent=4)
+ else:
+ msg = 'device %s predicted data not ready' % cmd['dev_id']
+ except Exception as e:
+ if str(e).find('No such file') >= 0:
+ msg = 'unable to get device {} predicted data'.format(
+ cmd['dev_id'])
+ else:
+ msg = 'unable to get osd {} predicted data, {}'.format(
+ cmd['dev_id'], str(e))
+ self.log.error(msg)
+ return -errno.EINVAL, '', msg
+ return 0, msg, ''
+
+ def _set_cloud_prediction_config(self, inbuf, cmd):
+ trusted_certs = ''
+ str_cert_path = cmd.get('certfile', '')
+ if os.path.exists(str_cert_path):
+ with open(str_cert_path, 'rb') as f:
+ trusted_certs = f.read()
+ self.set_config_option(
+ 'diskprediction_cert_context', trusted_certs)
+ for _agent in self._agents:
+ _agent.event.set()
+ self.set_config('diskprediction_server', cmd['server'])
+ self.set_config('diskprediction_user', cmd['user'])
+ self.set_config('diskprediction_password', cmd['password'])
+ if cmd.get('port'):
+ self.set_config('diskprediction_port', cmd['port'])
+ return 0, 'succeed to config cloud mode connection', ''
+ else:
+ return -errno.EINVAL, '', 'certification file not existed'
+
+ def _debug_prediction_forced(self, inbuf, cmd):
+ msg = ''
+ for _agent in self._agents:
+ if isinstance(_agent, PredictionRunner):
+ msg = 'run prediction agent successfully'
+ _agent.event.set()
+ return 0, msg, ''
+
+ def _debug_metrics_forced(self, inbuf, cmd):
+ msg = ''
+ for _agent in self._agents:
+ if isinstance(_agent, MetricsRunner):
+ msg = 'run metrics agent successfully'
+ _agent.event.set()
+ return 0, msg, ''
+
+ def _debug_smart_forced(self, inbuf, cmd):
+ msg = ' '
+ for _agent in self._agents:
+ if isinstance(_agent, SmartRunner):
+ msg = 'run smart agent successfully'
+ _agent.event.set()
+ return 0, msg, ''
+
+ def _status(self, inbuf, cmd):
+ return 0, json.dumps(self.status), ''
+
+ def _predict_life_expectancy(self, inbuf, cmd):
+ assert cmd['dev_id']
+ from .common.localpredictor import LocalPredictor, gen_configuration
+ conf = gen_configuration(mgr_inst=self)
+ obj_predictor = LocalPredictor(conf)
+ result = obj_predictor.query_info('', cmd['dev_id'], '')
+ if result.status_code == 200:
+ near_failure = result.json()['near_failure']
+ if near_failure.lower() == 'good':
+ return 0, '>6w', ''
+ elif near_failure.lower() == 'warning':
+ return 0, '>=2w and <=6w', ''
+ elif near_failure.lower() == 'bad':
+ return 0, '<2w', ''
+ else:
+ return 0, 'unknown', ''
+ else:
+ return -errno.ENAVAIL, '', result.content
+
+ def handle_command(self, inbuf, cmd):
+ for o_cmd in self.COMMANDS:
+ if cmd['prefix'] == o_cmd['cmd'][:len(cmd['prefix'])]:
+ fun_name = ''
+ avgs = o_cmd['cmd'].split(' ')
+ for avg in avgs:
+ if avg.lower() == 'diskprediction':
+ continue
+ if avg.lower() == 'device':
+ continue
+ if '=' in avg or ',' in avg or not avg:
+ continue
+ fun_name += '_%s' % avg.replace('-', '_')
+ if fun_name:
+ fun = getattr(
+ self, fun_name)
+ if fun:
+ return fun(inbuf, cmd)
+ return -errno.EINVAL, '', 'cmd not found'
+
+ def show_module_config(self):
+ self.fsid = self.get('mon_map')['fsid']
+ self.log.debug('Found Ceph fsid %s', self.fsid)
+
+ for key, default in self.config_keys.items():
+ self.set_config_option(key, self.get_config(key, default))
+
+ def serve(self):
+ self.log.info('Starting diskprediction module')
+ self.status = {'status': DP_MGR_STAT_ENABLED}
+
+ while True:
+ if self.get_configuration('diskprediction_config_mode').lower() == 'cloud':
+ enable_cloud = True
+ else:
+ enable_cloud = False
+ # Enable cloud mode prediction process
+ if enable_cloud and not self._activated_cloud:
+ if self._activated_local:
+ self.stop_disk_prediction()
+ self.start_cloud_disk_prediction()
+ # Enable local mode prediction process
+ elif not enable_cloud and not self._activated_local:
+ if self._activated_cloud:
+ self.stop_disk_prediction()
+ self.start_local_disk_prediction()
+
+ self.shutdown_event.wait(5)
+ if self.shutdown_event.is_set():
+ break
+ self.stop_disk_prediction()
+
+ def start_cloud_disk_prediction(self):
+ assert not self._activated_cloud
+ for dp_agent in DP_AGENTS:
+ obj_agent = dp_agent(self)
+ if obj_agent:
+ obj_agent.start()
+ else:
+ raise Exception('failed to start task %s' % obj_agent.task_name)
+ self._agents.append(obj_agent)
+ self._activated_cloud = True
+ self.log.info('start cloud disk prediction')
+
+ def start_local_disk_prediction(self):
+ assert not self._activated_local
+ for dp_agent in [PredictionRunner]:
+ obj_agent = dp_agent(self)
+ if obj_agent:
+ obj_agent.start()
+ else:
+ raise Exception('failed to start task %s' % obj_agent.task_name)
+ self._agents.append(obj_agent)
+ self._activated_local = True
+ self.log.info('start local model disk prediction')
+
+ def stop_disk_prediction(self):
+ assert self._activated_local or self._activated_cloud
+ self.status = {'status': DP_MGR_STAT_DISABLED}
+ while self._agents:
+ dp_agent = self._agents.pop()
+ dp_agent.terminate()
+ dp_agent.join(5)
+ del dp_agent
+ self._activated_local = False
+ self._activated_cloud = False
+ self.log.info('stop disk prediction')
+
+ def shutdown(self):
+ self.shutdown_event.set()
+ super(Module, self).shutdown()