1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_JOURNAL_OBJECT_PLAYER_H
#define CEPH_JOURNAL_OBJECT_PLAYER_H
#include "include/Context.h"
#include "include/interval_set.h"
#include "include/rados/librados.hpp"
#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RefCountedObj.h"
#include "journal/Entry.h"
#include <list>
#include <string>
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/unordered_map.hpp>
#include "include/assert.h"
class SafeTimer;
namespace journal {
class ObjectPlayer;
typedef boost::intrusive_ptr<ObjectPlayer> ObjectPlayerPtr;
class ObjectPlayer : public RefCountedObject {
public:
typedef std::list<Entry> Entries;
typedef interval_set<uint64_t> InvalidRanges;
enum RefetchState {
REFETCH_STATE_NONE,
REFETCH_STATE_REQUIRED,
REFETCH_STATE_IMMEDIATE
};
ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
uint64_t object_num, SafeTimer &timer, Mutex &timer_lock,
uint8_t order, uint64_t max_fetch_bytes);
~ObjectPlayer() override;
inline const std::string &get_oid() const {
return m_oid;
}
inline uint64_t get_object_number() const {
return m_object_num;
}
void fetch(Context *on_finish);
void watch(Context *on_fetch, double interval);
void unwatch();
void front(Entry *entry) const;
void pop_front();
inline bool empty() const {
Mutex::Locker locker(m_lock);
return m_entries.empty();
}
inline void get_entries(Entries *entries) {
Mutex::Locker locker(m_lock);
*entries = m_entries;
}
inline void get_invalid_ranges(InvalidRanges *invalid_ranges) {
Mutex::Locker locker(m_lock);
*invalid_ranges = m_invalid_ranges;
}
inline bool refetch_required() const {
return (get_refetch_state() != REFETCH_STATE_NONE);
}
inline RefetchState get_refetch_state() const {
return m_refetch_state;
}
inline void set_refetch_state(RefetchState refetch_state) {
m_refetch_state = refetch_state;
}
private:
typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
struct C_Fetch : public Context {
ObjectPlayerPtr object_player;
Context *on_finish;
bufferlist read_bl;
C_Fetch(ObjectPlayer *o, Context *ctx) : object_player(o), on_finish(ctx) {
}
void finish(int r) override;
};
struct C_WatchFetch : public Context {
ObjectPlayerPtr object_player;
C_WatchFetch(ObjectPlayer *o) : object_player(o) {
}
void finish(int r) override;
};
librados::IoCtx m_ioctx;
uint64_t m_object_num;
std::string m_oid;
CephContext *m_cct;
SafeTimer &m_timer;
Mutex &m_timer_lock;
uint8_t m_order;
uint64_t m_max_fetch_bytes;
double m_watch_interval;
Context *m_watch_task;
mutable Mutex m_lock;
bool m_fetch_in_progress;
bufferlist m_read_bl;
uint32_t m_read_off = 0;
uint32_t m_read_bl_off = 0;
Entries m_entries;
EntryKeys m_entry_keys;
InvalidRanges m_invalid_ranges;
Context *m_watch_ctx = nullptr;
bool m_unwatched = false;
RefetchState m_refetch_state = REFETCH_STATE_IMMEDIATE;
int handle_fetch_complete(int r, const bufferlist &bl, bool *refetch);
void clear_invalid_range(uint32_t off, uint32_t len);
void schedule_watch();
bool cancel_watch();
void handle_watch_task();
void handle_watch_fetched(int r);
};
} // namespace journal
#endif // CEPH_JOURNAL_OBJECT_PLAYER_H
|