summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJohn Spray <john.spray@redhat.com>2016-07-15 01:39:22 +0200
committerJohn Spray <john.spray@redhat.com>2016-09-29 18:26:58 +0200
commit7845f8d757adf63390d6cb0ff56afbc088bc508b (patch)
tree72043a3958d542e202a4b945e67c2a4b79f07807
parentpybind/mgr: implement shutdown() in rest.py (diff)
downloadceph-7845f8d757adf63390d6cb0ff56afbc088bc508b.tar.xz
ceph-7845f8d757adf63390d6cb0ff56afbc088bc508b.zip
mgr: flesh out standby/HA
Signed-off-by: John Spray <john.spray@redhat.com>
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/ceph_mgr.cc14
-rw-r--r--src/common/config_opts.h2
-rw-r--r--src/messages/MMgrBeacon.h21
-rw-r--r--src/mgr/Mgr.cc146
-rw-r--r--src/mgr/Mgr.h30
-rw-r--r--src/mgr/MgrClient.cc83
-rw-r--r--src/mgr/MgrClient.h3
-rw-r--r--src/mgr/MgrStandby.cc230
-rw-r--r--src/mgr/MgrStandby.h60
-rw-r--r--src/mgr/PyModules.cc12
-rw-r--r--src/mon/MgrMap.h97
-rw-r--r--src/mon/MgrMonitor.cc282
-rw-r--r--src/mon/MgrMonitor.h32
-rw-r--r--src/mon/MonCommands.h7
-rw-r--r--src/mon/Monitor.cc14
16 files changed, 840 insertions, 194 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5a73b26053f..92a19e6295d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -524,6 +524,7 @@ if (WITH_MGR)
mgr/PyFormatter.cc
mgr/PyState.cc
mgr/MgrPyModule.cc
+ mgr/MgrStandby.cc
mgr/Mgr.cc)
add_executable(ceph-mgr ${mgr_srcs}
$<TARGET_OBJECTS:heap_profiler_objs>)
diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc
index 4b4c62a61b4..c3d032c1326 100644
--- a/src/ceph_mgr.cc
+++ b/src/ceph_mgr.cc
@@ -14,7 +14,6 @@
*
*/
-#include "mgr/Mgr.h"
#include "include/types.h"
#include "common/config.h"
@@ -22,9 +21,13 @@
#include "common/errno.h"
#include "global/global_init.h"
+#include "mgr/MgrStandby.h"
-
+/**
+ * A short main() which just instantiates a MgrStandby and
+ * hands over control to that.
+ */
int main(int argc, const char **argv)
{
vector<const char*> args;
@@ -33,13 +36,12 @@ int main(int argc, const char **argv)
global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0,
"mgr_data");
- common_init_finish(g_ceph_context);
// For consumption by KeyRing::from_ceph_context in MonClient
g_conf->set_val("keyring", "$mgr_data/keyring", false);
- Mgr mgr;
+ MgrStandby mgr;
- // Handle --help before calling init() so we don't depend on network.
+ // Handle --help
if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) {
mgr.usage();
return 0;
@@ -49,14 +51,12 @@ int main(int argc, const char **argv)
global_init_chdir(g_ceph_context);
common_init_finish(g_ceph_context);
- // Connect to mon cluster, download MDS map etc
int rc = mgr.init();
if (rc != 0) {
std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl;
return rc;
}
- // Finally, execute the user's commands
return mgr.main(args);
}
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 4f625db8174..ed77cb09374 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -1504,6 +1504,8 @@ OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object ve
OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from
OPTION(mgr_modules, OPT_STR, "rest") // Which modules to load
OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc
+OPTION(mgr_beacon_period, OPT_INT, 5) // How frequently to send beacon
+OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover
OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing
diff --git a/src/messages/MMgrBeacon.h b/src/messages/MMgrBeacon.h
index e38227e2421..b452a9dda77 100644
--- a/src/messages/MMgrBeacon.h
+++ b/src/messages/MMgrBeacon.h
@@ -28,21 +28,27 @@ class MMgrBeacon : public PaxosServiceMessage {
protected:
uint64_t gid;
entity_addr_t server_addr;
+ bool available;
+ std::string name;
public:
MMgrBeacon()
- : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION)
+ : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION),
+ gid(0), available(false)
{
}
- MMgrBeacon(uint64_t gid_, entity_addr_t server_addr_)
+ MMgrBeacon(uint64_t gid_, const std::string &name_,
+ entity_addr_t server_addr_, bool available_)
: PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION),
- gid(gid_), server_addr(server_addr_)
+ gid(gid_), server_addr(server_addr_), available(available_), name(name_)
{
}
uint64_t get_gid() const { return gid; }
entity_addr_t get_server_addr() const { return server_addr; }
+ bool get_available() const { return available; }
+ const std::string& get_name() const { return name; }
private:
~MMgrBeacon() {}
@@ -52,17 +58,24 @@ public:
const char *get_type_name() const { return "mgrbeacon"; }
void print(ostream& out) const {
- out << get_type_name() << "(" << gid << ", " << server_addr << ")";
+ out << get_type_name() << " mgr." << name << "(" << gid << ", "
+ << server_addr << ", " << available << ")";
}
void encode_payload(uint64_t features) {
paxos_encode();
::encode(server_addr, payload, features);
+ ::encode(gid, payload);
+ ::encode(available, payload);
+ ::encode(name, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
paxos_decode(p);
::decode(server_addr, p);
+ ::decode(gid, p);
+ ::decode(available, p);
+ ::decode(name, p);
}
};
diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc
index 75197999368..bcc27855296 100644
--- a/src/mgr/Mgr.cc
+++ b/src/mgr/Mgr.cc
@@ -33,23 +33,21 @@
#define dout_prefix *_dout << "mgr " << __func__ << " "
-Mgr::Mgr() :
- Dispatcher(g_ceph_context),
+Mgr::Mgr(MonClient *monc_, Messenger *clientm_) :
+ monc(monc_),
objecter(NULL),
- monc(new MonClient(g_ceph_context)),
+ client_messenger(clientm_),
lock("Mgr::lock"),
timer(g_ceph_context, lock),
finisher(g_ceph_context, "Mgr", "mgr-fin"),
waiting_for_fs_map(NULL),
py_modules(daemon_state, cluster_state, *monc, finisher),
cluster_state(monc, nullptr),
- server(monc, daemon_state, py_modules)
+ server(monc, daemon_state, py_modules),
+ initialized(false),
+ initializing(false)
{
- client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds");
-
- // FIXME: using objecter as convenience to handle incremental
- // OSD maps, but that's overkill. We don't really need an objecter.
- // Could we separate out the part of Objecter that we really need?
+ // Using Objecter to handle incremental decode of OSDMap
objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0);
cluster_state.set_objecter(objecter);
@@ -59,9 +57,7 @@ Mgr::Mgr() :
Mgr::~Mgr()
{
delete objecter;
- delete monc;
- delete client_messenger;
- assert(waiting_for_fs_map == NULL);
+ assert(waiting_for_fs_map == nullptr);
}
@@ -123,58 +119,35 @@ public:
};
-
-int Mgr::init()
+void Mgr::background_init()
{
Mutex::Locker l(lock);
+ assert(!initializing);
+ assert(!initialized);
+ initializing = true;
- // Initialize Messenger
- int r = client_messenger->bind(g_conf->public_addr);
- if (r < 0)
- return r;
+ finisher.start();
- client_messenger->start();
+ finisher.queue(new C_StdFunction([this](){
+ init();
+ }));
+}
+
+void Mgr::init()
+{
+ Mutex::Locker l(lock);
+ assert(initializing);
+ assert(!initialized);
objecter->set_client_incarnation(0);
objecter->init();
- // Connect dispatchers before starting objecter
- client_messenger->add_dispatcher_tail(objecter);
- client_messenger->add_dispatcher_tail(this);
-
- // Initialize MonClient
- if (monc->build_initial_monmap() < 0) {
- objecter->shutdown();
- client_messenger->shutdown();
- client_messenger->wait();
- return -1;
- }
-
- monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
- |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
- monc->set_messenger(client_messenger);
- monc->init();
- r = monc->authenticate();
- if (r < 0) {
- derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
- monc->shutdown();
- objecter->shutdown();
- client_messenger->shutdown();
- client_messenger->wait();
- return r;
- }
-
- client_t whoami = monc->get_global_id();
- client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+ // Dispatcher before starting objecter
+ client_messenger->add_dispatcher_head(objecter);
// Start communicating with daemons to learn statistics etc
server.init(monc->get_global_id(), client_messenger->get_myaddr());
-
dout(4) << "Initialized server at " << server.get_myaddr() << dendl;
- // TODO: send the beacon periodically
- MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
- server.get_myaddr());
- monc->send_mon_message(m);
// Preload all daemon metadata (will subsequently keep this
// up to date by watching maps, so do the initial load before
@@ -189,8 +162,9 @@ int Mgr::init()
// Start Objecter and wait for OSD map
objecter->start();
+ lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch
objecter->wait_for_osd_map();
- timer.init();
+ lock.Lock();
monc->sub_want("mgrdigest", 0, 0);
@@ -212,12 +186,14 @@ int Mgr::init()
// Wait for MgrDigest...?
// TODO
- finisher.start();
+ // assume finisher already initialized in background_init
py_modules.init();
+ py_modules.start();
dout(4) << "Complete." << dendl;
- return 0;
+ initializing = false;
+ initialized = true;
}
void Mgr::load_all_metadata()
@@ -342,15 +318,12 @@ void Mgr::load_config()
py_modules.insert_config(loaded);
}
-void Mgr::handle_signal(int signum)
+void Mgr::shutdown()
{
+ // FIXME: pre-empt init() if it is currently running, so that it will
+ // give up the lock for us.
Mutex::Locker l(lock);
- assert(signum == SIGINT || signum == SIGTERM);
- shutdown();
-}
-void Mgr::shutdown()
-{
// First stop the server so that we're not taking any more incoming requests
server.shutdown();
@@ -358,13 +331,7 @@ void Mgr::shutdown()
// to touch references to the things we're about to tear down
finisher.stop();
- //lock.Lock();
- timer.shutdown();
objecter->shutdown();
- //lock.Unlock();
-
- monc->shutdown();
- client_messenger->shutdown();
}
void Mgr::handle_osd_map()
@@ -545,51 +512,6 @@ void Mgr::handle_fs_map(MFSMap* m)
}
-bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
- bool force_new)
-{
- if (dest_type == CEPH_ENTITY_TYPE_MON)
- return true;
-
- if (force_new) {
- if (monc->wait_auth_rotating(10) < 0)
- return false;
- }
-
- *authorizer = monc->auth->build_authorizer(dest_type);
- return *authorizer != NULL;
-}
-
-// A reference for use by the signal handler
-Mgr *signal_mgr = nullptr;
-
-static void handle_mgr_signal(int signum)
-{
- if (signal_mgr) {
- signal_mgr->handle_signal(signum);
- }
-}
-
-int Mgr::main(vector<const char *> args)
-{
- py_modules.start();
-
- // Enable signal handlers
- signal_mgr = this;
- init_async_signal_handler();
- register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
- register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
-
- client_messenger->wait();
-
- // Disable signal handlers
- unregister_async_signal_handler(SIGINT, handle_mgr_signal);
- unregister_async_signal_handler(SIGTERM, handle_mgr_signal);
- shutdown_async_signal_handler();
- signal_mgr = nullptr;
-}
-
-
void Mgr::handle_mgr_digest(MMgrDigest* m)
{
dout(10) << m->mon_status_json.length() << dendl;
diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h
index e85358f06b7..825edf15333 100644
--- a/src/mgr/Mgr.h
+++ b/src/mgr/Mgr.h
@@ -11,8 +11,8 @@
* Foundation. See file COPYING.
*/
-#ifndef CEPH_PYFOO_H_
-#define CEPH_PYFOO_H_
+#ifndef CEPH_MGR_H_
+#define CEPH_MGR_H_
// Python.h comes first because otherwise it clobbers ceph's assert
#include "Python.h"
@@ -26,7 +26,6 @@
#include "osdc/Objecter.h"
#include "mds/FSMap.h"
#include "messages/MFSMap.h"
-#include "msg/Dispatcher.h"
#include "msg/Messenger.h"
#include "auth/Auth.h"
#include "common/Finisher.h"
@@ -44,10 +43,10 @@ class MMgrDigest;
class MgrPyModule;
-class Mgr : public Dispatcher {
+class Mgr {
protected:
- Objecter *objecter;
MonClient *monc;
+ Objecter *objecter;
Messenger *client_messenger;
Mutex lock;
@@ -64,24 +63,27 @@ protected:
void load_config();
void load_all_metadata();
+ void init();
+
+ bool initialized;
+ bool initializing;
public:
- Mgr();
+ Mgr(MonClient *monc_, Messenger *clientm_);
~Mgr();
+ bool is_initialized() const {return initialized;}
+ entity_addr_t get_server_addr() const { return server.get_myaddr(); }
+
void handle_mgr_digest(MMgrDigest* m);
void handle_fs_map(MFSMap* m);
void handle_osd_map();
+
bool ms_dispatch(Message *m);
- bool ms_handle_reset(Connection *con) { return false; }
- void ms_handle_remote_reset(Connection *con) {}
- bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
- bool force_new);
- int init();
+
+ void background_init();
void shutdown();
- void usage() {}
int main(vector<const char *> args);
- void handle_signal(int signum);
};
-#endif /* MDS_UTILITY_H_ */
+#endif
diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc
index 2fbbc8048d1..8af29a52222 100644
--- a/src/mgr/MgrClient.cc
+++ b/src/mgr/MgrClient.cc
@@ -44,14 +44,6 @@ void MgrClient::init()
assert(msgr != nullptr);
timer.init();
-
-#if 0
- if (map.epoch == 0) {
- ldout(cct, 4) << "no map yet, waiting..." << dendl;
- wait_on_list(waiting_for_map);
- }
- ldout(cct, 4) << "proceeding with map " << map.epoch << dendl;
-#endif
}
void MgrClient::shutdown()
@@ -98,38 +90,50 @@ bool MgrClient::handle_mgr_map(MMgrMap *m)
if (session == nullptr ||
session->con->get_peer_addr() != map.get_active_addr()) {
- entity_inst_t inst;
- inst.addr = map.get_active_addr();
- inst.name = entity_name_t::MGR(map.get_active_gid());
-
- delete session;
- session = new MgrSessionState();
- session->con = msgr->get_connection(inst);
-
- // Don't send an open if we're just a client (i.e. doing
- // command-sending, not stats etc)
- if (g_conf && !g_conf->name.is_client()) {
- auto open = new MMgrOpen();
- open->daemon_name = g_conf->name.get_id();
- session->con->send_message(open);
+ if (session) {
+ ldout(cct, 4) << "Terminating session with "
+ << session->con->get_peer_addr() << dendl;
+ delete session;
+ session = nullptr;
+
+ std::vector<ceph_tid_t> erase_cmds;
+ auto commands = command_table.get_commands();
+ for (const auto &i : commands) {
+ // FIXME be nicer, retarget command on new mgr?
+ if (i.second->on_finish != nullptr) {
+ i.second->on_finish->complete(-ETIMEDOUT);
+ }
+ erase_cmds.push_back(i.first);
+ }
+ for (const auto &tid : erase_cmds) {
+ command_table.erase(tid);
+ }
}
- std::vector<ceph_tid_t> erase_cmds;
- auto commands = command_table.get_commands();
- for (const auto &i : commands) {
- // FIXME be nicer, retarget command on new mgr?
- if (i.second->on_finish != nullptr) {
- i.second->on_finish->complete(-ETIMEDOUT);
+ if (map.get_available()) {
+ ldout(cct, 4) << "Starting new session with " << map.get_active_addr()
+ << dendl;
+ entity_inst_t inst;
+ inst.addr = map.get_active_addr();
+ inst.name = entity_name_t::MGR(map.get_active_gid());
+
+ session = new MgrSessionState();
+ session->con = msgr->get_connection(inst);
+
+ // Don't send an open if we're just a client (i.e. doing
+ // command-sending, not stats etc)
+ if (g_conf && !g_conf->name.is_client()) {
+ auto open = new MMgrOpen();
+ open->daemon_name = g_conf->name.get_id();
+ session->con->send_message(open);
}
- erase_cmds.push_back(i.first);
- }
- for (const auto &tid : erase_cmds) {
- command_table.erase(tid);
+
+ signal_cond_list(waiting_for_session);
+ } else {
+ ldout(cct, 4) << "No active mgr available yet" << dendl;
}
}
- signal_cond_list(waiting_for_map);
-
return true;
}
@@ -262,16 +266,13 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl,
ldout(cct, 20) << "cmd: " << cmd << dendl;
- assert(map.epoch > 0);
-
-
-
if (session == nullptr) {
- derr << "no session" << dendl;
- // FIXME: be nicer: maybe block until a mgr is available?
- return -ENOENT;
+ derr << "no session, waiting" << dendl;
+ wait_on_list(waiting_for_session);
}
+ assert(map.epoch > 0);
+
MgrCommand *op = command_table.start_command();
op->cmd = cmd;
op->inbl = inbl;
diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h
index bcaf3de5ea5..92775d62e3b 100644
--- a/src/mgr/MgrClient.h
+++ b/src/mgr/MgrClient.h
@@ -63,7 +63,8 @@ protected:
void wait_on_list(list<Cond*>& ls);
void signal_cond_list(list<Cond*>& ls);
- list<Cond*> waiting_for_map;
+
+ list<Cond*> waiting_for_session;
public:
MgrClient(CephContext *cct_, Messenger *msgr_);
diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc
new file mode 100644
index 00000000000..896fb166f74
--- /dev/null
+++ b/src/mgr/MgrStandby.cc
@@ -0,0 +1,230 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+#include "common/errno.h"
+#include "mon/MonClient.h"
+#include "include/stringify.h"
+#include "global/global_context.h"
+#include "global/signal_handler.h"
+
+#include "mgr/MgrContext.h"
+
+#include "messages/MMgrBeacon.h"
+#include "messages/MMgrMap.h"
+#include "Mgr.h"
+
+#include "MgrStandby.h"
+
+#define dout_subsys ceph_subsys_mgr
+#undef dout_prefix
+#define dout_prefix *_dout << "mgr " << __func__ << " "
+
+
+MgrStandby::MgrStandby() :
+ Dispatcher(g_ceph_context),
+ monc(new MonClient(g_ceph_context)),
+ lock("MgrStandby::lock"),
+ timer(g_ceph_context, lock),
+ active_mgr(nullptr)
+{
+ client_messenger = Messenger::create_client_messenger(g_ceph_context, "mgr");
+}
+
+
+MgrStandby::~MgrStandby()
+{
+ delete monc;
+ delete client_messenger;
+ delete active_mgr;
+}
+
+
+int MgrStandby::init()
+{
+ Mutex::Locker l(lock);
+
+ // Initialize Messenger
+ client_messenger->add_dispatcher_tail(this);
+ client_messenger->start();
+
+ // Initialize MonClient
+ if (monc->build_initial_monmap() < 0) {
+ client_messenger->shutdown();
+ client_messenger->wait();
+ return -1;
+ }
+
+ monc->sub_want("mgrmap", 0, 0);
+
+ monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD
+ |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR);
+ monc->set_messenger(client_messenger);
+ monc->init();
+ int r = monc->authenticate();
+ if (r < 0) {
+ derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl;
+ monc->shutdown();
+ client_messenger->shutdown();
+ client_messenger->wait();
+ return r;
+ }
+
+ client_t whoami = monc->get_global_id();
+ client_messenger->set_myname(entity_name_t::CLIENT(whoami.v));
+
+ timer.init();
+ send_beacon();
+
+ dout(4) << "Complete." << dendl;
+ return 0;
+}
+
+void MgrStandby::send_beacon()
+{
+ assert(lock.is_locked_by_me());
+ dout(1) << state_str() << dendl;
+ dout(10) << "sending beacon as gid " << monc->get_global_id() << dendl;
+
+ bool available = active_mgr != nullptr && active_mgr->is_initialized();
+ auto addr = available ? active_mgr->get_server_addr() : entity_addr_t();
+ MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(),
+ g_conf->name.get_id(),
+ addr,
+ available);
+
+ monc->send_mon_message(m);
+ // TODO configure period
+ timer.add_event_after(5, new C_StdFunction(
+ [this](){
+ send_beacon();
+ }
+ ));
+}
+
+void MgrStandby::handle_signal(int signum)
+{
+ Mutex::Locker l(lock);
+ assert(signum == SIGINT || signum == SIGTERM);
+ shutdown();
+}
+
+void MgrStandby::shutdown()
+{
+ // Expect already to be locked as we're called from signal handler
+ assert(lock.is_locked_by_me());
+
+ if (active_mgr) {
+ active_mgr->shutdown();
+ }
+
+ timer.shutdown();
+
+ monc->shutdown();
+ client_messenger->shutdown();
+}
+
+bool MgrStandby::ms_dispatch(Message *m)
+{
+ Mutex::Locker l(lock);
+ dout(4) << state_str() << " " << *m << dendl;
+
+ switch (m->get_type()) {
+ case MSG_MGR_MAP:
+ {
+ auto mmap = static_cast<MMgrMap*>(m);
+ auto map = mmap->get_map();
+ dout(4) << "received map epoch " << map.get_epoch() << dendl;
+ const bool active_in_map = map.active_gid == monc->get_global_id();
+ dout(4) << "active in map: " << active_in_map
+ << " active is " << map.active_gid << dendl;
+ if (active_in_map) {
+ if (active_mgr == nullptr) {
+ dout(1) << "Activating!" << dendl;
+ active_mgr = new Mgr(monc, client_messenger);
+ active_mgr->background_init();
+ dout(1) << "I am now active" << dendl;
+ } else {
+ dout(10) << "I was already active" << dendl;
+ }
+ } else {
+ if (active_mgr != nullptr) {
+ derr << "I was active but no longer am" << dendl;
+ shutdown();
+ // FIXME: should politely go back to standby (or respawn
+ // process) instead of stopping entirely.
+ }
+ }
+ }
+ break;
+
+ default:
+ if (active_mgr) {
+ return active_mgr->ms_dispatch(m);
+ } else {
+ return false;
+ }
+ }
+ return true;
+}
+
+
+bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new)
+{
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+
+ if (force_new) {
+ if (monc->wait_auth_rotating(10) < 0)
+ return false;
+ }
+
+ *authorizer = monc->auth->build_authorizer(dest_type);
+ return *authorizer != NULL;
+}
+
+// A reference for use by the signal handler
+MgrStandby *signal_mgr = nullptr;
+
+static void handle_mgr_signal(int signum)
+{
+ if (signal_mgr) {
+ signal_mgr->handle_signal(signum);
+ }
+}
+
+int MgrStandby::main(vector<const char *> args)
+{
+ // Enable signal handlers
+ signal_mgr = this;
+ init_async_signal_handler();
+ register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal);
+ register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal);
+
+ client_messenger->wait();
+
+ // Disable signal handlers
+ unregister_async_signal_handler(SIGINT, handle_mgr_signal);
+ unregister_async_signal_handler(SIGTERM, handle_mgr_signal);
+ shutdown_async_signal_handler();
+ signal_mgr = nullptr;
+
+ return 0;
+}
+
+
+std::string MgrStandby::state_str()
+{
+ return active_mgr == nullptr ? "standby" : "active";
+}
+
diff --git a/src/mgr/MgrStandby.h b/src/mgr/MgrStandby.h
new file mode 100644
index 00000000000..ccb44d74b71
--- /dev/null
+++ b/src/mgr/MgrStandby.h
@@ -0,0 +1,60 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 John Spray <john.spray@redhat.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ */
+
+
+#ifndef MGR_STANDBY_H_
+#define MGR_STANDBY_H_
+
+#include "auth/Auth.h"
+#include "common/Finisher.h"
+#include "common/Timer.h"
+
+#include "DaemonServer.h"
+#include "PyModules.h"
+
+#include "DaemonState.h"
+#include "ClusterState.h"
+
+class Mgr;
+
+class MgrStandby : public Dispatcher {
+protected:
+ MonClient *monc;
+ Messenger *client_messenger;
+
+ Mutex lock;
+ SafeTimer timer;
+
+ Mgr *active_mgr;
+
+ std::string state_str();
+
+public:
+ MgrStandby();
+ ~MgrStandby();
+
+ bool ms_dispatch(Message *m);
+ bool ms_handle_reset(Connection *con) { return false; }
+ void ms_handle_remote_reset(Connection *con) {}
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
+ bool force_new);
+ int init();
+ void shutdown();
+ void usage() {}
+ int main(vector<const char *> args);
+ void handle_signal(int signum);
+ void send_beacon();
+};
+
+#endif
+
diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc
index 40bc68f9cee..e20940c54a8 100644
--- a/src/mgr/PyModules.cc
+++ b/src/mgr/PyModules.cc
@@ -312,12 +312,12 @@ public:
void PyModules::start()
{
- {
- Mutex::Locker l(lock);
- for (auto &i : modules) {
- auto thread = new ServeThread(i.second);
- serve_threads[i.first] = thread;
- }
+ Mutex::Locker l(lock);
+
+ dout(1) << "Creating threads for " << modules.size() << " modules" << dendl;
+ for (auto &i : modules) {
+ auto thread = new ServeThread(i.second);
+ serve_threads[i.first] = thread;
}
for (auto &i : serve_threads) {
diff --git a/src/mon/MgrMap.h b/src/mon/MgrMap.h
index 9606de5cd54..909f40aa0df 100644
--- a/src/mon/MgrMap.h
+++ b/src/mon/MgrMap.h
@@ -14,19 +14,65 @@
#ifndef MGR_MAP_H_
#define MGR_MAP_H_
+#include <sstream>
+
#include "msg/msg_types.h"
+#include "common/Formatter.h"
#include "include/encoding.h"
+class StandbyInfo
+{
+public:
+ uint64_t gid;
+ std::string name;
+
+ StandbyInfo(uint64_t gid_, const std::string &name_)
+ : gid(gid_), name(name_)
+ {}
+
+ StandbyInfo()
+ : gid(0)
+ {}
+
+ void encode(bufferlist& bl) const
+ {
+ ENCODE_START(1, 1, bl);
+ ::encode(gid, bl);
+ ::encode(name, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& p)
+ {
+ DECODE_START(1, p);
+ ::decode(gid, p);
+ ::decode(name, p);
+ DECODE_FINISH(p);
+ }
+};
+WRITE_CLASS_ENCODER(StandbyInfo)
+
class MgrMap
{
public:
+ epoch_t epoch;
+
+ // global_id of the ceph-mgr instance selected as a leader
uint64_t active_gid;
+ // server address reported by the leader once it is active
entity_addr_t active_addr;
- epoch_t epoch;
+ // whether the nominated leader is active (i.e. has initialized its server)
+ bool available;
+ // the name (foo in mgr.<foo>) of the active daemon
+ std::string active_name;
+
+ std::map<uint64_t, StandbyInfo> standbys;
epoch_t get_epoch() const { return epoch; }
entity_addr_t get_active_addr() const { return active_addr; }
uint64_t get_active_gid() const { return active_gid; }
+ bool get_available() const { return available; }
+ const std::string &get_active_name() const { return active_name; }
void encode(bufferlist& bl, uint64_t features) const
{
@@ -34,6 +80,9 @@ public:
::encode(epoch, bl);
::encode(active_addr, bl, features);
::encode(active_gid, bl);
+ ::encode(available, bl);
+ ::encode(active_name, bl);
+ ::encode(standbys, bl);
ENCODE_FINISH(bl);
}
@@ -43,11 +92,55 @@ public:
::decode(epoch, p);
::decode(active_addr, p);
::decode(active_gid, p);
+ ::decode(available, p);
+ ::decode(active_name, p);
+ ::decode(standbys, p);
DECODE_FINISH(p);
}
+ void print_summary(Formatter *f, std::ostream *ss) const
+ {
+ // One or the other, not both
+ assert((ss != nullptr) != (f != nullptr));
+
+ if (f) {
+ f->dump_int("active_gid", get_active_gid());
+ f->dump_string("active_name", get_active_name());
+ } else {
+ if (get_active_gid() != 0) {
+ *ss << "active: " << get_active_name() << " ";
+ } else {
+ *ss << "no daemons active ";
+ }
+ }
+
+
+ if (f) {
+ f->open_array_section("standbys");
+ for (const auto &i : standbys) {
+ f->open_object_section("standby");
+ f->dump_int("gid", i.second.gid);
+ f->dump_string("name", i.second.name);
+ f->close_section();
+ }
+ f->close_section();
+ } else {
+ if (standbys.size()) {
+ *ss << "standbys: ";
+ bool first = true;
+ for (const auto &i : standbys) {
+ if (!first) {
+ *ss << ", ";
+ }
+ *ss << i.second.name;
+ first = false;
+ }
+ }
+ }
+ }
+
MgrMap()
- : epoch(0)
+ : epoch(0), available(false)
{}
};
diff --git a/src/mon/MgrMonitor.cc b/src/mon/MgrMonitor.cc
index 66e2dcbacae..4e34edfdce9 100644
--- a/src/mon/MgrMonitor.cc
+++ b/src/mon/MgrMonitor.cc
@@ -18,6 +18,7 @@
#include "PGMap.h"
#include "PGMonitor.h"
#include "include/stringify.h"
+#include "mgr/MgrContext.h"
#include "MgrMonitor.h"
@@ -71,6 +72,8 @@ bool MgrMonitor::preprocess_query(MonOpRequestRef op)
switch (m->get_type()) {
case MSG_MGR_BEACON:
return preprocess_beacon(op);
+ case MSG_MON_COMMAND:
+ return preprocess_command(op);
default:
mon->no_reply(op);
derr << "Unhandled message type " << m->get_type() << dendl;
@@ -84,6 +87,10 @@ bool MgrMonitor::prepare_update(MonOpRequestRef op)
switch (m->get_type()) {
case MSG_MGR_BEACON:
return prepare_beacon(op);
+
+ case MSG_MON_COMMAND:
+ return prepare_command(op);
+
default:
mon->no_reply(op);
derr << "Unhandled message type " << m->get_type() << dendl;
@@ -118,12 +125,70 @@ public:
bool MgrMonitor::prepare_beacon(MonOpRequestRef op)
{
MMgrBeacon *m = static_cast<MMgrBeacon*>(op->get_req());
+ dout(4) << "beacon from " << m->get_gid() << dendl;
+
+ // See if we are seeing same name, new GID for the active daemon
+ if (m->get_name() == pending_map.active_name
+ && m->get_gid() != pending_map.active_gid)
+ {
+ dout(4) << "Active daemon restart (mgr." << m->get_name() << ")" << dendl;
+ drop_active();
+ }
+
+ // See if we are seeing same name, new GID for any standbys
+ for (const auto &i : pending_map.standbys) {
+ const StandbyInfo &s = i.second;
+ if (s.name == m->get_name() && s.gid != m->get_gid()) {
+ dout(4) << "Standby daemon restart (mgr." << m->get_name() << ")" << dendl;
+ drop_standby(i.first);
+ break;
+ }
+ }
+
+ last_beacon[m->get_gid()] = ceph_clock_now(g_ceph_context);
+
+ // Track whether we modified pending_map
+ bool updated = false;
+
+ if (pending_map.active_gid == m->get_gid()) {
+ // A beacon from the currently active daemon
+ if (pending_map.active_addr != m->get_server_addr()) {
+ dout(4) << "learned address " << m->get_server_addr() << dendl;
+ pending_map.active_addr = m->get_server_addr();
+ updated = true;
+ }
+
+ if (pending_map.get_available() != m->get_available()) {
+ dout(4) << "available " << m->get_gid() << dendl;
+ pending_map.available = m->get_available();
+ updated = true;
+ }
+ } else if (pending_map.active_gid == 0) {
+ // There is no currently active daemon, select this one.
+ if (pending_map.standbys.count(m->get_gid())) {
+ drop_standby(m->get_gid());
+ }
+ pending_map.active_gid = m->get_gid();
+ pending_map.active_name = m->get_name();
- pending_map.active_gid = m->get_gid();
- pending_map.active_addr = m->get_server_addr();
+ dout(4) << "selecting new active in epoch " << pending_map.epoch << dendl;
+ wait_for_finished_proposal(op, new C_Updated(this, op));
+ } else {
+ if (pending_map.standbys.count(m->get_gid()) > 0) {
+ dout(10) << "from existing standby " << m->get_gid() << dendl;
+ } else {
+ dout(10) << "new standby " << m->get_gid() << dendl;
+ pending_map.standbys[m->get_gid()] = {m->get_gid(), m->get_name()};
+ updated = true;
+ }
+ }
- dout(4) << "proposing epoch " << pending_map.epoch << dendl;
- wait_for_finished_proposal(op, new C_Updated(this, op));
+ if (updated) {
+ dout(4) << "updating map" << dendl;
+ wait_for_finished_proposal(op, new C_Updated(this, op));
+ } else {
+ dout(10) << "no change" << dendl;
+ }
return true;
}
@@ -162,6 +227,8 @@ void MgrMonitor::check_sub(Subscription *sub)
*/
void MgrMonitor::send_digests()
{
+ digest_callback = nullptr;
+
const std::string type = "mgrdigest";
if (mon->session_map.subs.count(type) == 0)
return;
@@ -182,11 +249,214 @@ void MgrMonitor::send_digests()
sub->session->con->send_message(mdigest);
}
+
+ digest_callback = new C_StdFunction([this](){
+ send_digests();
+ });
+ mon->timer.add_event_after(5, digest_callback);
}
void MgrMonitor::tick()
{
- // TODO control frequency independently of the global tick frequency
- send_digests();
+ const utime_t now = ceph_clock_now(g_ceph_context);
+ utime_t cutoff = now;
+ cutoff -= g_conf->mon_mgr_beacon_grace;
+
+ // Populate any missing beacons (i.e. no beacon since MgrMonitor
+ // instantiation) with the current time, so that they will
+ // eventually look laggy if they fail to give us a beacon.
+ if (pending_map.active_gid != 0
+ && last_beacon.count(pending_map.active_gid) == 0) {
+ last_beacon[pending_map.active_gid] = now;
+ }
+ for (auto s : pending_map.standbys) {
+ if (last_beacon.count(s.first) == 0) {
+ last_beacon[s.first] = now;
+ }
+ }
+
+ // Cull standbys first so that any remaining standbys
+ // will be eligible to take over from the active if we cull him.
+ std::list<uint64_t> dead_standbys;
+ for (const auto &i : pending_map.standbys) {
+ auto last_beacon_time = last_beacon.at(i.first);
+ if (last_beacon_time < cutoff) {
+ dead_standbys.push_back(i.first);
+ }
+ }
+
+ bool propose = false;
+
+ for (auto i : dead_standbys) {
+ dout(4) << "Dropping laggy standby " << i << dendl;
+ drop_standby(i);
+ propose = true;
+ }
+
+ if (pending_map.active_gid != 0
+ && last_beacon.at(pending_map.active_gid) < cutoff) {
+
+ drop_active();
+ dout(4) << "Dropping active" << pending_map.active_gid << dendl;
+ if (promote_standby()) {
+ dout(4) << "Promoted standby " << pending_map.active_gid << dendl;
+ propose = true;
+ } else {
+ dout(4) << "Active is laggy but have no standbys to replace it" << dendl;
+ }
+ } else if (pending_map.active_gid == 0) {
+ if (promote_standby()) {
+ dout(4) << "Promoted standby " << pending_map.active_gid << dendl;
+ propose = true;
+ }
+ }
+
+ if (propose) {
+ propose_pending();
+ }
+}
+
+bool MgrMonitor::promote_standby()
+{
+ assert(pending_map.active_gid == 0);
+ if (pending_map.standbys.size()) {
+ // Promote a replacement (arbitrary choice of standby)
+ auto replacement_gid = pending_map.standbys.begin()->first;
+ pending_map.active_gid = replacement_gid;
+ pending_map.active_name = pending_map.standbys.at(replacement_gid).name;
+ pending_map.available = false;
+ pending_map.active_addr = entity_addr_t();
+
+ drop_standby(replacement_gid);
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void MgrMonitor::drop_active()
+{
+ if (last_beacon.count(pending_map.active_gid) > 0) {
+ last_beacon.erase(pending_map.active_gid);
+ }
+
+ pending_map.active_name = "";
+ pending_map.active_gid = 0;
+ pending_map.available = false;
+ pending_map.active_addr = entity_addr_t();
+}
+
+void MgrMonitor::drop_standby(uint64_t gid)
+{
+ pending_map.standbys.erase(gid);
+ if (last_beacon.count(gid) > 0) {
+ last_beacon.erase(gid);
+ }
+
+}
+
+bool MgrMonitor::preprocess_command(MonOpRequestRef op)
+{
+ return false;
+
+}
+
+bool MgrMonitor::prepare_command(MonOpRequestRef op)
+{
+ MMonCommand *m = static_cast<MMonCommand*>(op->get_req());
+
+ std::stringstream ss;
+ bufferlist rdata;
+
+ std::map<std::string, cmd_vartype> cmdmap;
+ if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
+ string rs = ss.str();
+ mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed());
+ return true;
+ }
+
+ MonSession *session = m->get_session();
+ if (!session) {
+ mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed());
+ return true;
+ }
+
+ string prefix;
+ cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);
+
+ int r = 0;
+
+ if (prefix == "mgr fail") {
+ string who;
+ cmd_getval(g_ceph_context, cmdmap, "who", who);
+
+ std::string err;
+ uint64_t gid = strict_strtol(who.c_str(), 10, &err);
+ bool changed = false;
+ if (!err.empty()) {
+ // Does not parse as a gid, treat it as a name
+ if (pending_map.active_name == who) {
+ drop_active();
+ changed = true;
+ } else {
+ gid = 0;
+ for (const auto &i : pending_map.standbys) {
+ if (i.second.name == who) {
+ gid = i.first;
+ }
+ }
+ if (gid != 0) {
+ drop_standby(gid);
+ changed = true;
+ } else {
+ ss << "Daemon not found '" << who << "', already failed?";
+ }
+ }
+ } else {
+ if (pending_map.active_gid == gid) {
+ drop_active();
+ changed = true;
+ } else if (pending_map.standbys.count(gid) > 0) {
+ drop_standby(gid);
+ changed = true;
+ } else {
+ ss << "Daemon not found '" << gid << "', already failed?";
+ }
+ }
+
+ if (changed) {
+ tick();
+ }
+ } else {
+ r = -ENOSYS;
+ }
+
+ dout(4) << __func__ << " done, r=" << r << dendl;
+ /* Compose response */
+ string rs;
+ getline(ss, rs);
+
+ if (r >= 0) {
+ // success.. delay reply
+ wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, r, rs,
+ get_last_committed() + 1));
+ return true;
+ } else {
+ // reply immediately
+ mon->reply_command(op, r, rs, rdata, get_last_committed());
+ return false;
+ }
+}
+
+void MgrMonitor::init()
+{
+ send_digests(); // To get it to schedule its own event
+}
+
+void MgrMonitor::on_shutdown()
+{
+ if (digest_callback) {
+ mon->timer.cancel_event(digest_callback);
+ }
}
diff --git a/src/mon/MgrMonitor.h b/src/mon/MgrMonitor.h
index 51850893ed1..bd4ac4599bf 100644
--- a/src/mon/MgrMonitor.h
+++ b/src/mon/MgrMonitor.h
@@ -12,25 +12,53 @@
*/
+#include "include/Context.h"
#include "MgrMap.h"
#include "PaxosService.h"
+
class MgrMonitor : public PaxosService
{
MgrMap map;
MgrMap pending_map;
+ std::map<uint64_t, utime_t> last_beacon;
+
+ /**
+ * If a standby is available, make it active, given that
+ * there is currently no active daemon.
+ *
+ * @return true if a standby was promoted
+ */
+ bool promote_standby();
+ void drop_active();
+ void drop_standby(uint64_t gid);
+
+ Context *digest_callback;
+
public:
MgrMonitor(Monitor *mn, Paxos *p, const string& service_name)
- : PaxosService(mn, p, service_name)
+ : PaxosService(mn, p, service_name), digest_callback(nullptr)
{}
+ void init();
+ void on_shutdown();
+
+ const MgrMap &get_map() const { return map; }
+
+ bool in_use() const { return map.epoch > 0; }
+
void create_initial();
void update_from_paxos(bool *need_bootstrap);
void create_pending();
void encode_pending(MonitorDBStore::TransactionRef t);
+
bool preprocess_query(MonOpRequestRef op);
bool prepare_update(MonOpRequestRef op);
+
+ bool preprocess_command(MonOpRequestRef op);
+ bool prepare_command(MonOpRequestRef op);
+
void encode_full(MonitorDBStore::TransactionRef t) { }
bool preprocess_beacon(MonOpRequestRef op);
@@ -42,6 +70,8 @@ public:
void tick();
+ void print_summary(Formatter *f, std::ostream *ss) const;
+
friend class C_Updated;
};
diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h
index 5dc1eed785c..035fe5d4f43 100644
--- a/src/mon/MonCommands.h
+++ b/src/mon/MonCommands.h
@@ -842,3 +842,10 @@ COMMAND("config-key exists " \
"name=key,type=CephString", \
"check for <key>'s existence", "config-key", "r", "cli,rest")
COMMAND("config-key list ", "list keys", "config-key", "r", "cli,rest")
+
+
+/*
+ * mon/MgrMonitor.cc
+ */
+COMMAND("mgr fail name=who,type=CephString", \
+ "treat the named manager daemon as failed", "mgr", "rw", "cli,rest")
diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc
index 9ff47ce86ed..67cf14304a8 100644
--- a/src/mon/Monitor.cc
+++ b/src/mon/Monitor.cc
@@ -2466,6 +2466,10 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
f->open_object_section("fsmap");
mdsmon()->fsmap.print_summary(f, NULL);
f->close_section();
+
+ f->open_object_section("mgrmap");
+ mgrmon()->get_map().print_summary(f, nullptr);
+ f->close_section();
f->close_section();
} else {
ss << " cluster " << monmap->get_fsid() << "\n";
@@ -2477,6 +2481,11 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f)
if (mdsmon()->fsmap.any_filesystems()) {
ss << " fsmap " << mdsmon()->fsmap << "\n";
}
+ if (mgrmon()->in_use()) {
+ ss << " mgr ";
+ mgrmon()->get_map().print_summary(nullptr, &ss);
+ ss << "\n";
+ }
osdmon()->osdmap.print_summary(NULL, ss);
pgmon()->pg_map.print_summary(NULL, &ss);
@@ -2801,6 +2810,11 @@ void Monitor::handle_command(MonOpRequestRef op)
return;
}
+ if (module == "mgr") {
+ mgrmon()->dispatch(op);
+ return;
+ }
+
if (prefix == "fsid") {
if (f) {
f->open_object_section("fsid");