1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef __CEPH_MONCLIENT_H
#define __CEPH_MONCLIENT_H
#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
#include "MonMap.h"
#include "auth/ClientTicket.h"
#include "common/Timer.h"
#include "messages/MMonSubscribe.h"
// Forward declarations for types named in the MonClient declaration below.
// The three message classes are only used as pointer parameters of the
// handle_*() methods, so an incomplete type is sufficient for them here.
// MonMap is held by value as a member, so its complete definition is also
// required (presumably supplied by "MonMap.h" above -- confirm).
class MonMap;
class MMonMap;
class MClientMountAck;
class MMonSubscribeAck;
class MonClient : public Dispatcher {
public:
MonMap monmap;
private:
Messenger *messenger;
entity_addr_t my_addr;
Context *mount_timeout_event;
Mutex monc_lock;
SafeTimer timer;
bool mounted;
int mounters;
Cond mount_cond, map_cond;
bool ms_dispatch(Message *m);
void handle_monmap(MMonMap *m);
void ms_handle_remote_reset(const entity_addr_t& peer);
protected:
class C_MountTimeout : public Context {
MonClient *client;
double timeout;
public:
C_MountTimeout(MonClient *c, double to) : client(c), timeout(to) { }
void finish(int r) {
if (r >= 0) client->_mount_timeout(timeout);
}
};
void _try_mount(double timeout);
void _mount_timeout(double timeout);
void handle_mount_ack(MClientMountAck* m);
// mon subscriptions
private:
map<nstring,MMonSubscribe::sub_rec> sub_have; // my subs, and current versions
utime_t sub_renew_sent, sub_renew_after;
public:
void renew_subs();
void sub_want(nstring what, version_t have) {
sub_have[what].have = have;
sub_have[what].onetime = false;
}
void sub_want_onetime(nstring what, version_t have) {
sub_have[what].have = have;
sub_have[what].onetime = true;
}
void sub_got(nstring what, version_t have) {
if (sub_have.count(what)) {
if (sub_have[what].onetime)
sub_have.erase(what);
else
sub_have[what].have = have;
}
}
void handle_subscribe_ack(MMonSubscribeAck* m);
public:
MonClient() : messenger(NULL),
mount_timeout_event(NULL),
monc_lock("MonClient::monc_lock"),
timer(monc_lock) {
mounted = false;
mounters = 0;
}
int build_initial_monmap();
int get_monmap();
int mount(double mount_timeout);
void tick();
void send_mon_message(Message *m, bool new_mon=false);
void note_mon_leader(int m) {
monmap.last_mon = m;
}
void pick_new_mon();
entity_addr_t get_my_addr() { return my_addr; }
const ceph_fsid_t& get_fsid() {
return monmap.fsid;
}
entity_addr_t get_mon_addr(unsigned i) {
Mutex::Locker l(monc_lock);
if (i < monmap.size())
return monmap.mon_inst[i].addr;
return entity_addr_t();
}
entity_inst_t get_mon_inst(unsigned i) {
Mutex::Locker l(monc_lock);
if (i < monmap.size())
return monmap.mon_inst[i];
return entity_inst_t();
}
int get_num_mon() {
Mutex::Locker l(monc_lock);
return monmap.size();
}
void set_messenger(Messenger *m) { messenger = m; }
};
#endif
|