summaryrefslogtreecommitdiffstats
path: root/src/pybind/mgr/smb/rados_store.py
blob: 8896350ee412c4670fe73acd7136b578467d7dc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
from typing import (
    TYPE_CHECKING,
    Callable,
    Collection,
    Iterator,
    Optional,
    Tuple,
)

import contextlib
import functools
import json
import logging
import time
import uuid

import rados

from .proto import EntryKey, Self, Simplified

if TYPE_CHECKING:  # pragma: no cover
    from mgr_module import MgrModule

# Size bound (1 MiB) for a stored object: RADOSConfigEntry.read() requests at
# most this many bytes and RADOSConfigEntry.write() asserts that the encoded
# content is smaller than this.
_CHUNK_SIZE = 1024 * 1024
# Default pool name used by RADOSConfigStore when no pool is given.
SMB_POOL = '.smb'

# Module-level logger.
log = logging.getLogger(__name__)


class RADOSConfigEntry:
    """A store entry object for the RADOS pool based store.

    An entry is addressed by a pool name, a RADOS namespace, and an object
    key. Creating an entry object performs no I/O; the object does not have
    to exist in the pool.
    """

    def __init__(
        self, rados: rados.Rados, pool: str, ns: str, key: str
    ) -> None:
        self._rados = rados
        self._pool = pool
        self._ns = ns
        self._key = key
        # Only set while a _shared_ioctx() context is active, so that nested
        # operations on this entry reuse the same open ioctx.
        self._ioctx = None

    @property
    def uri(self) -> str:
        """Returns an identifier for the entry within the store."""
        # The rados://<pool>/<ns>/<key> convention can be found elsewhere
        # in ceph. borrowed here for communicating resource keys to
        # other components.
        return f'rados://{self._pool}/{self._ns}/{self._key}'

    @property
    def full_key(self) -> EntryKey:
        """Return a namespaced key for the entry."""
        return (self._ns, self._key)

    def read(self) -> str:
        """Read a RADOS object.

        Returns the decoded object content, reading at most _CHUNK_SIZE
        bytes. Returns an empty string if the object does not exist.
        """
        log.debug('rados read of %s', self.full_key)
        with self._shared_ioctx() as ioctx:
            ioctx.set_namespace(self._ns)
            try:
                val = ioctx.read(self._key, _CHUNK_SIZE).decode()
            except rados.ObjectNotFound:
                val = ''
        log.debug('rados read result of %s = %r', self.full_key, val)
        return val

    def write(self, content: str) -> None:
        """Write a RADOS object, replacing any previous content.

        The content is encoded as UTF-8 and must encode to fewer than
        _CHUNK_SIZE bytes.
        """
        log.debug('rados write to %s', self.full_key)
        data = content.encode('utf-8')
        # read() only fetches up to _CHUNK_SIZE bytes, so anything larger
        # could not be read back in full.
        assert len(data) < _CHUNK_SIZE
        with self._shared_ioctx() as ioctx:
            ioctx.set_namespace(self._ns)
            ioctx.write_full(self._key, data)

    def get(self) -> Simplified:
        """Get the deserialized store entry value.

        Raises KeyError (with the entry's full key) if the object does not
        exist or has no content.
        """
        if not self.exists():
            raise KeyError(self.full_key)
        data = self.read()
        if not data:
            # empty data is equivalent to object not existing.
            # this may occur if a lock is taken.
            raise KeyError(self.full_key)
        return json.loads(data)

    def set(self, obj: Simplified) -> None:
        """Set the store entry value to that of the serialized value of obj."""
        self.write(json.dumps(obj))

    def remove(self) -> bool:
        """Remove the current entry from the store.

        Returns true if an object was removed and false if there was no
        object to remove.
        """
        log.debug('rados remove of %s', self.full_key)
        with self._shared_ioctx() as ioctx:
            ioctx.set_namespace(self._ns)
            try:
                ioctx.remove_object(self._key)
                removed = True
            except rados.ObjectNotFound:
                removed = False
        log.debug('rados remove result of %s = %r', self.full_key, removed)
        return removed

    def exists(self) -> bool:
        """Returns true if the entry currently exists within the store."""
        log.debug('rados exists of %s', self.full_key)
        try:
            with self._shared_ioctx() as ioctx:
                ioctx.set_namespace(self._ns)
                ioctx.stat(self._key)
            found = True
        except rados.ObjectNotFound:
            found = False
        log.debug('rados exists result of %s = %r', self.full_key, found)
        return found

    @contextlib.contextmanager
    def locked(self, name: str) -> Iterator[None]:
        """Place a rados lock on the object for the duration of the context
        manager. Requires a lock name.

        The ioctx stays open (and shared with nested operations on this
        entry) until the context exits; the lock is released even if the
        body raises.
        """
        with self._shared_ioctx() as ioctx:
            ioctx.set_namespace(self._ns)
            cookie = self._acquire_lock(ioctx, name)
            try:
                yield None
            finally:
                self._release_lock(ioctx, name, cookie)

    @contextlib.contextmanager
    def _shared_ioctx(self) -> Iterator[rados.Ioctx]:
        """Helper for returning a ioctx for nested operations.

        If an ioctx is already open for this entry it is yielded as-is;
        otherwise a new one is opened on the entry's pool and remembered
        until the outermost context exits.
        """
        if self._ioctx is not None:
            yield self._ioctx
            return
        with self._rados.open_ioctx(self._pool) as ioctx:
            self._ioctx = ioctx
            try:
                yield ioctx
            finally:
                self._ioctx = None

    def _acquire_lock(
        self,
        ioctx: rados.Ioctx,
        name: str,
        desc: str = 'rados_store',
        *,
        wait_sec: float = 0.25,
        max_wait: int = 30,
    ) -> str:
        """Acquire an exclusive rados lock on the entry's object.

        Retries while the object is busy, sleeping wait_sec seconds between
        attempts, for roughly max_wait seconds in total. Returns the lock
        cookie on success; re-raises the last rados.ObjectBusy error if the
        lock could not be taken.
        """
        cookie = f'mgr:smb:{uuid.uuid4()}'
        # Always try at least once. Without this, max_wait < wait_sec gives
        # zero iterations and the code below would reference an unbound
        # last_err instead of reporting the real lock failure.
        attempts = max(1, int(max_wait / wait_sec))
        last_err: Optional[Exception] = None
        for _ in range(attempts):
            try:
                ioctx.lock_exclusive(
                    self._key, name, cookie, desc=desc, duration=None
                )
                return cookie
            except rados.ObjectBusy as err:
                log.debug("object busy: %r, %r, %r", self._key, name, cookie)
                last_err = err
                time.sleep(wait_sec)
        log.warning('failed to acquire lock in %ssec: %r', max_wait, last_err)
        # The loop ran at least once and every pass that did not return
        # recorded an error, so last_err is always set here.
        assert last_err is not None
        raise last_err

    def _release_lock(
        self, ioctx: rados.Ioctx, name: str, cookie: str
    ) -> None:
        """Release a rados lock previously taken with the given cookie."""
        ioctx.unlock(self._key, name, cookie)


class RADOSConfigStore:
    """A config store that saves entries in a RADOS pool.

    N.B. The RADOS config store exposes a subset of the RADOS functionality
    to implement a simple key-value store. As the namespaced keys map directly
    to RADOS namespaces and object names this store is suitable for sharing
    configuration items with external components.
    """

    def __init__(
        self,
        rados: rados.Rados,
        pool: str = SMB_POOL,
        init_cb: Optional[Callable] = None,
    ) -> None:
        self._rados = rados
        self._pool = pool
        # An optional initialization callback. If set, the callback will be
        # called once before any get, set, or other data-access call.  This is
        # to support lazily setting up the pool when we start accessing the
        # store contents.
        self._init_cb = init_cb

    def _lazy_init(self) -> None:
        """Run the pending initialization callback, if there is one."""
        if self._init_cb:
            # The callback must actually be invoked. It is cleared only after
            # it returns, so a callback that raises is retried on the next
            # data-access call.
            self._init_cb()
            self._init_cb = None

    def __getitem__(self, key: EntryKey) -> RADOSConfigEntry:
        """Return an entry object given a namespaced entry key. This entry does
        not have to exist in the store.
        """
        self._lazy_init()
        ns, okey = key
        return RADOSConfigEntry(self._rados, self._pool, ns, okey)

    def remove(self, ns: EntryKey) -> bool:
        """Remove an entry from the store. Returns true if an entry was
        present.

        Despite its name, the ns argument is the full namespaced entry key.
        """
        self._lazy_init()
        return self[ns].remove()

    def namespaces(self) -> Collection[str]:
        """Return all namespaces currently in the store."""
        self._lazy_init()
        return {item[0] for item in self}

    def contents(self, ns: str) -> Collection[str]:
        """Return all subkeys currently in the namespace."""
        self._lazy_init()
        return [item[1] for item in self if ns == item[0]]

    def __iter__(self) -> Iterator[EntryKey]:
        """Iterate over all namespaced keys currently in the store.

        The keys are collected eagerly while the ioctx is open; the returned
        iterator walks that collected list.
        """
        self._lazy_init()
        with self._rados.open_ioctx(self._pool) as ioctx:
            ioctx.set_namespace(rados.LIBRADOS_ALL_NSPACES)
            out = [(obj.nspace, obj.key) for obj in ioctx.list_objects()]
        return iter(out)

    @classmethod
    def init(cls, mgr: 'MgrModule', pool: str = SMB_POOL) -> Self:
        """Return a new RADOSConfigStore using the specified pool. The pool
        will be immediately created if it doesn't already exist.
        """
        _init_pool(mgr, pool)
        return cls(mgr.rados, pool=pool)

    @classmethod
    def lazy_init(cls, mgr: 'MgrModule', pool: str = SMB_POOL) -> Self:
        """Return a new RADOSConfigStore using the specified pool. The pool
        will be created when other RADOSConfigStore methods are called if it
        doesn't already exist.
        """
        cb = functools.partial(_init_pool, mgr, pool)
        return cls(mgr.rados, pool=pool, init_cb=cb)


def _init_pool(mgr: 'MgrModule', pool: str) -> None:
    """Create the named pool through the mgr if the osdmap does not list it.

    When the pool is missing it is created and then has the 'smb'
    application enabled on it, both via mon commands. When the pool is
    already present nothing is done.
    """
    osd_dump = mgr.get_osdmap().dump()
    known = set()
    for info in osd_dump.get('pools', []):
        known.add(info['pool_name'])
    if pool not in known:
        log.debug('rados pool %r not found, creating it', pool)
        create_cmd = {
            'prefix': 'osd pool create',
            'pool': pool,
            'yes_i_really_mean_it': True,
        }
        enable_cmd = {
            'prefix': 'osd pool application enable',
            'pool': pool,
            'app': 'smb',
        }
        # Order matters: the pool has to exist before the application can
        # be enabled on it.
        for command in (create_cmd, enable_cmd):
            mgr.check_mon_command(command)


def parse_uri(uri: str) -> Tuple[str, str, str]:
    """Parse a rados-like uri into pool, namespace, and object values.
    Namespace may be an empty string.

    Accepted forms are ``rados://<pool>/<ns>/<key>`` and
    ``rados://<pool>/<key>`` (the latter yields an empty namespace).

    Raises ValueError, naming the offending uri, for anything else.
    """
    if uri.startswith('rados://'):
        parts = uri.removeprefix('rados://').split('/')
        if len(parts) == 3:
            pool, ns, key = parts
            return pool, ns, key
        if len(parts) == 2:
            return parts[0], '', parts[1]
        raise ValueError(f'invalid rados uri: {uri!r}')
    if uri.startswith('rados:'):
        # A rados: scheme without the '//' separator.
        raise ValueError(f'not a supported rados uri: {uri!r}')
    raise ValueError(f'not a rados uri: {uri!r}')