summaryrefslogtreecommitdiffstats
path: root/src/ceph-node-proxy/ceph_node_proxy/reporter.py
blob: 4e9c4e2798b313496b3acd05b652dfdff755b9a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import time
import json
from ceph_node_proxy.util import get_logger, http_req, BaseThread
from urllib.error import HTTPError, URLError
from typing import Dict, Any


class Reporter(BaseThread):
    def __init__(self,
                 system: Any,
                 cephx: Dict[str, Any],
                 reporter_scheme: str = 'https',
                 reporter_hostname: str = '',
                 reporter_port: str = '443',
                 reporter_endpoint: str = '/node-proxy/data') -> None:
        super().__init__()
        self.system = system
        self.data: Dict[str, Any] = {}
        self.stop: bool = False
        self.cephx = cephx
        self.data['cephx'] = self.cephx['cephx']
        self.reporter_scheme: str = reporter_scheme
        self.reporter_hostname: str = reporter_hostname
        self.reporter_port: str = reporter_port
        self.reporter_endpoint: str = reporter_endpoint
        self.log = get_logger(__name__)
        self.reporter_url: str = (f'{reporter_scheme}://{reporter_hostname}:'
                                  f'{reporter_port}{reporter_endpoint}')
        self.log.info(f'Reporter url set to {self.reporter_url}')

    def main(self) -> None:
        while not self.stop:
            # Any logic to avoid sending the all the system
            # information every loop can go here. In a real
            # scenario probably we should just send the sub-parts
            # that have changed to minimize the traffic in
            # dense clusters
            self.log.debug('waiting for a lock in reporter loop.')
            with self.system.lock:
                if not self.system.pending_shutdown:
                    self.log.debug('lock acquired in reporter loop.')
                    if self.system.data_ready:
                        self.log.debug('data ready to be sent to the mgr.')
                        if not self.system.get_system() == self.system.previous_data:
                            self.log.info('data has changed since last iteration.')
                            self.data['patch'] = self.system.get_system()
                            try:
                                # TODO: add a timeout parameter to the reporter in the config file
                                self.log.info(f'sending data to {self.reporter_url}')
                                http_req(hostname=self.reporter_hostname,
                                         port=self.reporter_port,
                                         method='POST',
                                         headers={'Content-Type': 'application/json'},
                                         endpoint=self.reporter_endpoint,
                                         scheme=self.reporter_scheme,
                                         data=json.dumps(self.data))
                            except (HTTPError, URLError) as e:
                                self.log.error(f"The reporter couldn't send data to the mgr: {e}")
                                raise
                                # Need to add a new parameter 'max_retries' to the reporter if it can't
                                # send the data for more than x times, maybe the daemon should stop altogether
                            else:
                                self.system.previous_data = self.system.get_system()
                        else:
                            self.log.debug('no diff, not sending data to the mgr.')
                    time.sleep(5)
            self.log.debug('lock released in reporter loop.')
        self.log.debug('exiting reporter loop.')
        raise SystemExit(0)