1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_TEST_WATCH_NOTIFY_H
#define CEPH_TEST_WATCH_NOTIFY_H
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/ceph_mutex.h"
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <cstdint>
#include <list>
#include <map>
#include <set>
#include <string>
#include <tuple>
#include <utility>
// Forward declaration only.
// NOTE(review): no declaration visible in this header refers to Finisher --
// presumably kept for the implementation file or for includers; confirm
// before removing.
class Finisher;
namespace librados {
// Forward declarations: this header only uses these types through pointers
// (TestCluster* / TestRadosClient*), so their full definitions are not
// required here.
class TestCluster;
class TestRadosClient;
class TestWatchNotify : boost::noncopyable {
public:
typedef std::pair<uint64_t, uint64_t> WatcherID;
typedef std::set<WatcherID> WatcherIDs;
typedef std::map<std::pair<uint64_t, uint64_t>, bufferlist> NotifyResponses;
struct NotifyHandle {
TestRadosClient *rados_client = nullptr;
WatcherIDs pending_watcher_ids;
NotifyResponses notify_responses;
bufferlist *pbl = nullptr;
Context *on_notify = nullptr;
};
typedef boost::shared_ptr<NotifyHandle> SharedNotifyHandle;
typedef std::map<uint64_t, SharedNotifyHandle> NotifyHandles;
struct WatchHandle {
TestRadosClient *rados_client = nullptr;
std::string addr;
uint32_t nonce;
uint64_t gid;
uint64_t handle;
librados::WatchCtx* watch_ctx;
librados::WatchCtx2* watch_ctx2;
};
typedef std::map<uint64_t, WatchHandle> WatchHandles;
struct ObjectHandler;
typedef boost::shared_ptr<ObjectHandler> SharedObjectHandler;
struct Watcher {
Watcher(int64_t pool_id, const std::string& nspace, const std::string& oid)
: pool_id(pool_id), nspace(nspace), oid(oid) {
}
int64_t pool_id;
std::string nspace;
std::string oid;
SharedObjectHandler object_handler;
WatchHandles watch_handles;
NotifyHandles notify_handles;
};
typedef boost::shared_ptr<Watcher> SharedWatcher;
TestWatchNotify(TestCluster* test_cluster);
int list_watchers(int64_t pool_id, const std::string& nspace,
const std::string& o, std::list<obj_watch_t> *out_watchers);
void aio_flush(TestRadosClient *rados_client, Context *on_finish);
void aio_watch(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& o, uint64_t gid,
uint64_t *handle, librados::WatchCtx *watch_ctx,
librados::WatchCtx2 *watch_ctx2, Context *on_finish);
void aio_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
void aio_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& oid,
const bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify);
void flush(TestRadosClient *rados_client);
int notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& o, bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl);
void notify_ack(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& o,
uint64_t notify_id, uint64_t handle, uint64_t gid,
bufferlist& bl);
int watch(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& o, uint64_t gid,
uint64_t *handle, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2);
int unwatch(TestRadosClient *rados_client, uint64_t handle);
void blocklist(uint32_t nonce);
private:
typedef std::tuple<int64_t, std::string, std::string> PoolFile;
typedef std::map<PoolFile, SharedWatcher> FileWatchers;
TestCluster *m_test_cluster;
uint64_t m_handle = 0;
uint64_t m_notify_id = 0;
ceph::mutex m_lock =
ceph::make_mutex("librados::TestWatchNotify::m_lock");
AsyncOpTracker m_async_op_tracker;
FileWatchers m_file_watchers;
SharedWatcher get_watcher(int64_t pool_id, const std::string& nspace,
const std::string& oid);
void maybe_remove_watcher(SharedWatcher shared_watcher);
void execute_watch(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string& o,
uint64_t gid, uint64_t *handle,
librados::WatchCtx *watch_ctx,
librados::WatchCtx2 *watch_ctx2,
Context *on_finish);
void execute_unwatch(TestRadosClient *rados_client, uint64_t handle,
Context *on_finish);
void execute_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string &oid,
const bufferlist &bl, bufferlist *pbl,
Context *on_notify);
void ack_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string &oid,
uint64_t notify_id, const WatcherID &watcher_id,
const bufferlist &bl);
void finish_notify(TestRadosClient *rados_client, int64_t pool_id,
const std::string& nspace, const std::string &oid,
uint64_t notify_id);
void handle_object_removed(int64_t pool_id, const std::string& nspace,
const std::string& oid);
};
} // namespace librados
#endif // CEPH_TEST_WATCH_NOTIFY_H
|