summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/dashboard/services/auth/oauth2.py
blob: 5376107667e0040fa57b69f464b00c6f1adef8d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
from typing import Dict, List
from urllib.parse import quote

import cherrypy
import requests

from ... import mgr
from ...services.auth import BaseAuth, SSOAuth, decode_jwt_segment
from ...tools import prepare_url_prefix
from ..access_control import Role, User, UserAlreadyExists


class OAuth2(SSOAuth):
    LOGIN_URL = 'auth/oauth2/login'
    LOGOUT_URL = 'auth/oauth2/logout'
    sso = True

    class OAuth2Config(BaseAuth.Config):
        pass

    @staticmethod
    def enabled():
        return mgr.get_module_option('sso_oauth2')

    def to_dict(self) -> 'BaseAuth.Config':
        return self.OAuth2Config()

    @classmethod
    def from_dict(cls, s_dict: OAuth2Config) -> 'OAuth2':
        # pylint: disable=unused-argument
        return OAuth2()

    @classmethod
    def get_auth_name(cls):
        return cls.__name__.lower()

    @classmethod
    # pylint: disable=protected-access
    def get_token(cls, request: cherrypy._ThreadLocalProxy) -> str:
        try:
            return request.cookie['token'].value
        except KeyError:
            return request.headers.get('X-Access-Token')

    @classmethod
    def set_token(cls, token: str):
        cherrypy.request.jwt = token
        cherrypy.request.jwt_payload = cls.get_token_payload()
        cherrypy.request.user = cls.get_user(token)

    @classmethod
    def get_token_payload(cls) -> Dict:
        try:
            return cherrypy.request.jwt_payload
        except AttributeError:
            pass
        try:
            return decode_jwt_segment(cherrypy.request.jwt.split(".")[1])
        except AttributeError:
            return {}

    @classmethod
    def set_token_payload(cls, token):
        cherrypy.request.jwt_payload = decode_jwt_segment(token.split(".")[1])

    @classmethod
    def get_user_roles(cls):
        roles: List[Role] = []
        user_roles: List[Role] = []
        try:
            jwt_payload = cherrypy.request.jwt_payload
        except AttributeError:
            raise cherrypy.HTTPError()

        # check for client roes
        if 'resource_access' in jwt_payload:
            # Find the first value where the key is not 'account'
            roles = next((value['roles'] for key, value in jwt_payload['resource_access'].items()
                          if key != "account"), user_roles)
        # check for global roles
        elif 'realm_access' in jwt_payload:
            roles = next((value['roles'] for _, value in jwt_payload['realm_access'].items()),
                         user_roles)
        else:
            raise cherrypy.HTTPError()
        user_roles = Role.map_to_system_roles(roles)
        return user_roles

    @classmethod
    def get_user(cls, token: str) -> User:
        try:
            return cherrypy.request.user
        except AttributeError:
            cls.set_token_payload(token)
            cls._create_user()
        return cherrypy.request.user

    @classmethod
    def _create_user(cls):
        try:
            jwt_payload = cherrypy.request.jwt_payload
        except AttributeError:
            raise cherrypy.HTTPError()
        try:
            user = mgr.ACCESS_CTRL_DB.create_user(
                jwt_payload['sub'], None, jwt_payload['name'], jwt_payload['email'])
        except UserAlreadyExists:
            user = mgr.ACCESS_CTRL_DB.get_user(jwt_payload['sub'])
        user.set_roles(cls.get_user_roles())
        # set user last update to token time issued
        user.last_update = jwt_payload['iat']
        cherrypy.request.user = user

    @classmethod
    def reset_user(cls):
        try:
            mgr.ACCESS_CTRL_DB.delete_user(cherrypy.request.user.username)
            cherrypy.request.user = None
        except AttributeError:
            raise cherrypy.HTTPError()

    @classmethod
    def get_token_iss(cls, token=''):
        if token:
            cls.set_token_payload(token)
        return cls.get_token_payload()['iss']

    @classmethod
    def get_openid_config(cls, iss):
        msg = 'Failed to logout: could not contact IDP'
        try:
            response = requests.get(f'{iss}/.well-known/openid-configuration')
        except requests.exceptions.RequestException:
            raise cherrypy.HTTPError(500, message=msg)
        if response.status_code != 200:
            raise cherrypy.HTTPError(500, message=msg)
        return json.loads(response.text)

    @classmethod
    def get_login_redirect_url(cls, token) -> str:
        url_prefix = prepare_url_prefix(mgr.get_module_option('url_prefix', default=''))
        return f"{url_prefix}/#/login?access_token={token}"

    @classmethod
    def get_logout_redirect_url(cls, token) -> str:
        openid_config = OAuth2.get_openid_config(OAuth2.get_token_iss(token))
        end_session_url = openid_config.get('end_session_endpoint')
        encoded_end_session_url = quote(end_session_url, safe="")
        url_prefix = prepare_url_prefix(mgr.get_module_option('url_prefix', default=''))
        return f'{url_prefix}/oauth2/sign_out?rd={encoded_end_session_url}'