summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/tests/test_api_auditing.py
blob: d22410e847b81c9599f50551c8dc1df4cd203219 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import re
import json
import cherrypy
import mock

from .helper import ControllerTestCase
from ..controllers import RESTController, Controller
from ..tools import RequestLoggingTool
from .. import mgr


# pylint: disable=W0613
@Controller('/foo', secure=False)
class FooResource(RESTController):
    def create(self, password):
        pass

    def get(self, key):
        pass

    def delete(self, key):
        pass

    def set(self, key, password, secret_key=None):
        pass


class ApiAuditingTest(ControllerTestCase):
    settings = {}

    def __init__(self, *args, **kwargs):
        cherrypy.tools.request_logging = RequestLoggingTool()
        cherrypy.config.update({'tools.request_logging.on': True})
        super(ApiAuditingTest, self).__init__(*args, **kwargs)

    @classmethod
    def mock_set_config(cls, key, val):
        cls.settings[key] = val

    @classmethod
    def mock_get_config(cls, key, default=None):
        return cls.settings.get(key, default)

    @classmethod
    def setUpClass(cls):
        mgr.get_config.side_effect = cls.mock_get_config
        mgr.set_config.side_effect = cls.mock_set_config

    @classmethod
    def setup_server(cls):
        cls.setup_controllers([FooResource])

    def setUp(self):
        mgr.cluster_log = mock.Mock()
        mgr.set_config('AUDIT_API_ENABLED', True)
        mgr.set_config('AUDIT_API_LOG_PAYLOAD', True)

    def _validate_cluster_log_msg(self, path, method, user, params):
        channel, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertEqual(channel, 'audit')
        pattern = r'^\[DASHBOARD\] from=\'(.+)\' path=\'(.+)\' ' \
                  'method=\'(.+)\' user=\'(.+)\' params=\'(.+)\'$'
        m = re.match(pattern, msg)
        self.assertEqual(m.group(2), path)
        self.assertEqual(m.group(3), method)
        self.assertEqual(m.group(4), user)
        self.assertDictEqual(json.loads(m.group(5)), params)

    def test_no_audit(self):
        mgr.set_config('AUDIT_API_ENABLED', False)
        self._delete('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_no_payload(self):
        mgr.set_config('AUDIT_API_LOG_PAYLOAD', False)
        self._delete('/foo/test1')
        _, _, msg = mgr.cluster_log.call_args_list[0][0]
        self.assertNotIn('params=', msg)

    def test_no_audit_get(self):
        self._get('/foo/test1')
        mgr.cluster_log.assert_not_called()

    def test_audit_put(self):
        self._put('/foo/test1', {'password': 'y', 'secret_key': 1234})
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'PUT', 'None',
                                       {'key': 'test1',
                                        'password': '***',
                                        'secret_key': '***'})

    def test_audit_post(self):
        with mock.patch('dashboard.services.auth.JwtManager.get_username',
                        return_value='hugo'):
            self._post('/foo?password=1234')
            mgr.cluster_log.assert_called_once()
            self._validate_cluster_log_msg('/foo', 'POST', 'hugo',
                                           {'password': '***'})

    def test_audit_delete(self):
        self._delete('/foo/test1')
        mgr.cluster_log.assert_called_once()
        self._validate_cluster_log_msg('/foo/test1', 'DELETE',
                                       'None', {'key': 'test1'})