diff options
author | John Spray <john.spray@redhat.com> | 2016-07-15 01:39:22 +0200 |
---|---|---|
committer | John Spray <john.spray@redhat.com> | 2016-09-29 18:26:58 +0200 |
commit | 7845f8d757adf63390d6cb0ff56afbc088bc508b (patch) | |
tree | 72043a3958d542e202a4b945e67c2a4b79f07807 | |
parent | pybind/mgr: implement shutdown() in rest.py (diff) | |
download | ceph-7845f8d757adf63390d6cb0ff56afbc088bc508b.tar.xz ceph-7845f8d757adf63390d6cb0ff56afbc088bc508b.zip |
mgr: flesh out standby/HA
Signed-off-by: John Spray <john.spray@redhat.com>
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/ceph_mgr.cc | 14 | ||||
-rw-r--r-- | src/common/config_opts.h | 2 | ||||
-rw-r--r-- | src/messages/MMgrBeacon.h | 21 | ||||
-rw-r--r-- | src/mgr/Mgr.cc | 146 | ||||
-rw-r--r-- | src/mgr/Mgr.h | 30 | ||||
-rw-r--r-- | src/mgr/MgrClient.cc | 83 | ||||
-rw-r--r-- | src/mgr/MgrClient.h | 3 | ||||
-rw-r--r-- | src/mgr/MgrStandby.cc | 230 | ||||
-rw-r--r-- | src/mgr/MgrStandby.h | 60 | ||||
-rw-r--r-- | src/mgr/PyModules.cc | 12 | ||||
-rw-r--r-- | src/mon/MgrMap.h | 97 | ||||
-rw-r--r-- | src/mon/MgrMonitor.cc | 282 | ||||
-rw-r--r-- | src/mon/MgrMonitor.h | 32 | ||||
-rw-r--r-- | src/mon/MonCommands.h | 7 | ||||
-rw-r--r-- | src/mon/Monitor.cc | 14 |
16 files changed, 840 insertions, 194 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5a73b26053f..92a19e6295d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -524,6 +524,7 @@ if (WITH_MGR) mgr/PyFormatter.cc mgr/PyState.cc mgr/MgrPyModule.cc + mgr/MgrStandby.cc mgr/Mgr.cc) add_executable(ceph-mgr ${mgr_srcs} $<TARGET_OBJECTS:heap_profiler_objs>) diff --git a/src/ceph_mgr.cc b/src/ceph_mgr.cc index 4b4c62a61b4..c3d032c1326 100644 --- a/src/ceph_mgr.cc +++ b/src/ceph_mgr.cc @@ -14,7 +14,6 @@ * */ -#include "mgr/Mgr.h" #include "include/types.h" #include "common/config.h" @@ -22,9 +21,13 @@ #include "common/errno.h" #include "global/global_init.h" +#include "mgr/MgrStandby.h" - +/** + * A short main() which just instantiates a MgrStandby and + * hands over control to that. + */ int main(int argc, const char **argv) { vector<const char*> args; @@ -33,13 +36,12 @@ int main(int argc, const char **argv) global_init(NULL, args, CEPH_ENTITY_TYPE_MGR, CODE_ENVIRONMENT_DAEMON, 0, "mgr_data"); - common_init_finish(g_ceph_context); // For consumption by KeyRing::from_ceph_context in MonClient g_conf->set_val("keyring", "$mgr_data/keyring", false); - Mgr mgr; + MgrStandby mgr; - // Handle --help before calling init() so we don't depend on network. + // Handle --help if ((args.size() == 1 && (std::string(args[0]) == "--help" || std::string(args[0]) == "-h"))) { mgr.usage(); return 0; @@ -49,14 +51,12 @@ int main(int argc, const char **argv) global_init_chdir(g_ceph_context); common_init_finish(g_ceph_context); - // Connect to mon cluster, download MDS map etc int rc = mgr.init(); if (rc != 0) { std::cerr << "Error in initialization: " << cpp_strerror(rc) << std::endl; return rc; } - // Finally, execute the user's commands return mgr.main(args); } diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 4f625db8174..ed77cb09374 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1504,6 +1504,8 @@ OPTION(rgw_swift_versioning_enabled, OPT_BOOL, false) // whether swift object ve OPTION(mgr_module_path, OPT_STR, CEPH_PKGLIBDIR "/mgr") // where to load python modules from OPTION(mgr_modules, OPT_STR, "rest") // Which modules to load OPTION(mgr_data, OPT_STR, "/var/lib/ceph/mgr/$cluster-$id") // where to find keyring etc +OPTION(mgr_beacon_period, OPT_INT, 5) // How frequently to send beacon +OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing diff --git a/src/messages/MMgrBeacon.h b/src/messages/MMgrBeacon.h index e38227e2421..b452a9dda77 100644 --- a/src/messages/MMgrBeacon.h +++ b/src/messages/MMgrBeacon.h @@ -28,21 +28,27 @@ class MMgrBeacon : public PaxosServiceMessage { protected: uint64_t gid; entity_addr_t server_addr; + bool available; + std::string name; public: MMgrBeacon() - : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION) + : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION), + gid(0), available(false) { } - MMgrBeacon(uint64_t gid_, entity_addr_t server_addr_) + MMgrBeacon(uint64_t gid_, const std::string &name_, + entity_addr_t server_addr_, bool available_) : PaxosServiceMessage(MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION), - gid(gid_), server_addr(server_addr_) + gid(gid_), server_addr(server_addr_), available(available_), name(name_) { } uint64_t get_gid() const { return gid; } entity_addr_t get_server_addr() const { return server_addr; } + bool get_available() const { return available; } + const std::string& get_name() const { return name; } private: ~MMgrBeacon() {} @@ -52,17 +58,24 @@ public: const char *get_type_name() const { return "mgrbeacon"; } void print(ostream& out) const { - out << get_type_name() << "(" << gid << ", " << server_addr << ")"; + out << get_type_name() << " mgr." << name << "(" << gid << ", " + << server_addr << ", " << available << ")"; } void encode_payload(uint64_t features) { paxos_encode(); ::encode(server_addr, payload, features); + ::encode(gid, payload); + ::encode(available, payload); + ::encode(name, payload); } void decode_payload() { bufferlist::iterator p = payload.begin(); paxos_decode(p); ::decode(server_addr, p); + ::decode(gid, p); + ::decode(available, p); + ::decode(name, p); } }; diff --git a/src/mgr/Mgr.cc b/src/mgr/Mgr.cc index 75197999368..bcc27855296 100644 --- a/src/mgr/Mgr.cc +++ b/src/mgr/Mgr.cc @@ -33,23 +33,21 @@ #define dout_prefix *_dout << "mgr " << __func__ << " " -Mgr::Mgr() : - Dispatcher(g_ceph_context), +Mgr::Mgr(MonClient *monc_, Messenger *clientm_) : + monc(monc_), objecter(NULL), - monc(new MonClient(g_ceph_context)), + client_messenger(clientm_), lock("Mgr::lock"), timer(g_ceph_context, lock), finisher(g_ceph_context, "Mgr", "mgr-fin"), waiting_for_fs_map(NULL), py_modules(daemon_state, cluster_state, *monc, finisher), cluster_state(monc, nullptr), - server(monc, daemon_state, py_modules) + server(monc, daemon_state, py_modules), + initialized(false), + initializing(false) { - client_messenger = Messenger::create_client_messenger(g_ceph_context, "mds"); - - // FIXME: using objecter as convenience to handle incremental - // OSD maps, but that's overkill. We don't really need an objecter. - // Could we separate out the part of Objecter that we really need? + // Using Objecter to handle incremental decode of OSDMap objecter = new Objecter(g_ceph_context, client_messenger, monc, NULL, 0, 0); cluster_state.set_objecter(objecter); @@ -59,9 +57,7 @@ Mgr::Mgr() : Mgr::~Mgr() { delete objecter; - delete monc; - delete client_messenger; - assert(waiting_for_fs_map == NULL); + assert(waiting_for_fs_map == nullptr); } @@ -123,58 +119,35 @@ public: }; - -int Mgr::init() +void Mgr::background_init() { Mutex::Locker l(lock); + assert(!initializing); + assert(!initialized); + initializing = true; - // Initialize Messenger - int r = client_messenger->bind(g_conf->public_addr); - if (r < 0) - return r; + finisher.start(); - client_messenger->start(); + finisher.queue(new C_StdFunction([this](){ + init(); + })); +} + +void Mgr::init() +{ + Mutex::Locker l(lock); + assert(initializing); + assert(!initialized); objecter->set_client_incarnation(0); objecter->init(); - // Connect dispatchers before starting objecter - client_messenger->add_dispatcher_tail(objecter); - client_messenger->add_dispatcher_tail(this); - - // Initialize MonClient - if (monc->build_initial_monmap() < 0) { - objecter->shutdown(); - client_messenger->shutdown(); - client_messenger->wait(); - return -1; - } - - monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD - |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); - monc->set_messenger(client_messenger); - monc->init(); - r = monc->authenticate(); - if (r < 0) { - derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl; - monc->shutdown(); - objecter->shutdown(); - client_messenger->shutdown(); - client_messenger->wait(); - return r; - } - - client_t whoami = monc->get_global_id(); - client_messenger->set_myname(entity_name_t::CLIENT(whoami.v)); + // Dispatcher before starting objecter + client_messenger->add_dispatcher_head(objecter); // Start communicating with daemons to learn statistics etc server.init(monc->get_global_id(), client_messenger->get_myaddr()); - dout(4) << "Initialized server at " << server.get_myaddr() << dendl; - // TODO: send the beacon periodically - MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(), - server.get_myaddr()); - monc->send_mon_message(m); // Preload all daemon metadata (will subsequently keep this // up to date by watching maps, so do the initial load before @@ -189,8 +162,9 @@ int Mgr::init() // Start Objecter and wait for OSD map objecter->start(); + lock.Unlock(); // Drop lock because OSDMap dispatch calls into my ms_dispatch objecter->wait_for_osd_map(); - timer.init(); + lock.Lock(); monc->sub_want("mgrdigest", 0, 0); @@ -212,12 +186,14 @@ int Mgr::init() // Wait for MgrDigest...? // TODO - finisher.start(); + // assume finisher already initialized in background_init py_modules.init(); + py_modules.start(); dout(4) << "Complete." << dendl; - return 0; + initializing = false; + initialized = true; } void Mgr::load_all_metadata() @@ -342,15 +318,12 @@ void Mgr::load_config() py_modules.insert_config(loaded); } -void Mgr::handle_signal(int signum) +void Mgr::shutdown() { + // FIXME: pre-empt init() if it is currently running, so that it will + // give up the lock for us. Mutex::Locker l(lock); - assert(signum == SIGINT || signum == SIGTERM); - shutdown(); -} -void Mgr::shutdown() -{ // First stop the server so that we're not taking any more incoming requests server.shutdown(); @@ -358,13 +331,7 @@ void Mgr::shutdown() // to touch references to the things we're about to tear down finisher.stop(); - //lock.Lock(); - timer.shutdown(); objecter->shutdown(); - //lock.Unlock(); - - monc->shutdown(); - client_messenger->shutdown(); } void Mgr::handle_osd_map() @@ -545,51 +512,6 @@ void Mgr::handle_fs_map(MFSMap* m) } -bool Mgr::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, - bool force_new) -{ - if (dest_type == CEPH_ENTITY_TYPE_MON) - return true; - - if (force_new) { - if (monc->wait_auth_rotating(10) < 0) - return false; - } - - *authorizer = monc->auth->build_authorizer(dest_type); - return *authorizer != NULL; -} - -// A reference for use by the signal handler -Mgr *signal_mgr = nullptr; - -static void handle_mgr_signal(int signum) -{ - if (signal_mgr) { - signal_mgr->handle_signal(signum); - } -} - -int Mgr::main(vector<const char *> args) -{ - py_modules.start(); - - // Enable signal handlers - signal_mgr = this; - init_async_signal_handler(); - register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal); - register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal); - - client_messenger->wait(); - - // Disable signal handlers - unregister_async_signal_handler(SIGINT, handle_mgr_signal); - unregister_async_signal_handler(SIGTERM, handle_mgr_signal); - shutdown_async_signal_handler(); - signal_mgr = nullptr; -} - - void Mgr::handle_mgr_digest(MMgrDigest* m) { dout(10) << m->mon_status_json.length() << dendl; diff --git a/src/mgr/Mgr.h b/src/mgr/Mgr.h index e85358f06b7..825edf15333 100644 --- a/src/mgr/Mgr.h +++ b/src/mgr/Mgr.h @@ -11,8 +11,8 @@ * Foundation. See file COPYING. */ -#ifndef CEPH_PYFOO_H_ -#define CEPH_PYFOO_H_ +#ifndef CEPH_MGR_H_ +#define CEPH_MGR_H_ // Python.h comes first because otherwise it clobbers ceph's assert #include "Python.h" @@ -26,7 +26,6 @@ #include "osdc/Objecter.h" #include "mds/FSMap.h" #include "messages/MFSMap.h" -#include "msg/Dispatcher.h" #include "msg/Messenger.h" #include "auth/Auth.h" #include "common/Finisher.h" @@ -44,10 +43,10 @@ class MMgrDigest; class MgrPyModule; -class Mgr : public Dispatcher { +class Mgr { protected: - Objecter *objecter; MonClient *monc; + Objecter *objecter; Messenger *client_messenger; Mutex lock; @@ -64,24 +63,27 @@ protected: void load_config(); void load_all_metadata(); + void init(); + + bool initialized; + bool initializing; public: - Mgr(); + Mgr(MonClient *monc_, Messenger *clientm_); ~Mgr(); + bool is_initialized() const {return initialized;} + entity_addr_t get_server_addr() const { return server.get_myaddr(); } + void handle_mgr_digest(MMgrDigest* m); void handle_fs_map(MFSMap* m); void handle_osd_map(); + bool ms_dispatch(Message *m); - bool ms_handle_reset(Connection *con) { return false; } - void ms_handle_remote_reset(Connection *con) {} - bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, - bool force_new); - int init(); + + void background_init(); void shutdown(); - void usage() {} int main(vector<const char *> args); - void handle_signal(int signum); }; -#endif /* MDS_UTILITY_H_ */ +#endif diff --git a/src/mgr/MgrClient.cc b/src/mgr/MgrClient.cc index 2fbbc8048d1..8af29a52222 100644 --- a/src/mgr/MgrClient.cc +++ b/src/mgr/MgrClient.cc @@ -44,14 +44,6 @@ void MgrClient::init() assert(msgr != nullptr); timer.init(); - -#if 0 - if (map.epoch == 0) { - ldout(cct, 4) << "no map yet, waiting..." << dendl; - wait_on_list(waiting_for_map); - } - ldout(cct, 4) << "proceeding with map " << map.epoch << dendl; -#endif } void MgrClient::shutdown() @@ -98,38 +90,50 @@ bool MgrClient::handle_mgr_map(MMgrMap *m) if (session == nullptr || session->con->get_peer_addr() != map.get_active_addr()) { - entity_inst_t inst; - inst.addr = map.get_active_addr(); - inst.name = entity_name_t::MGR(map.get_active_gid()); - - delete session; - session = new MgrSessionState(); - session->con = msgr->get_connection(inst); - - // Don't send an open if we're just a client (i.e. doing - // command-sending, not stats etc) - if (g_conf && !g_conf->name.is_client()) { - auto open = new MMgrOpen(); - open->daemon_name = g_conf->name.get_id(); - session->con->send_message(open); + if (session) { + ldout(cct, 4) << "Terminating session with " + << session->con->get_peer_addr() << dendl; + delete session; + session = nullptr; + + std::vector<ceph_tid_t> erase_cmds; + auto commands = command_table.get_commands(); + for (const auto &i : commands) { + // FIXME be nicer, retarget command on new mgr? + if (i.second->on_finish != nullptr) { + i.second->on_finish->complete(-ETIMEDOUT); + } + erase_cmds.push_back(i.first); + } + for (const auto &tid : erase_cmds) { + command_table.erase(tid); + } } - std::vector<ceph_tid_t> erase_cmds; - auto commands = command_table.get_commands(); - for (const auto &i : commands) { - // FIXME be nicer, retarget command on new mgr? - if (i.second->on_finish != nullptr) { - i.second->on_finish->complete(-ETIMEDOUT); + if (map.get_available()) { + ldout(cct, 4) << "Starting new session with " << map.get_active_addr() + << dendl; + entity_inst_t inst; + inst.addr = map.get_active_addr(); + inst.name = entity_name_t::MGR(map.get_active_gid()); + + session = new MgrSessionState(); + session->con = msgr->get_connection(inst); + + // Don't send an open if we're just a client (i.e. doing + // command-sending, not stats etc) + if (g_conf && !g_conf->name.is_client()) { + auto open = new MMgrOpen(); + open->daemon_name = g_conf->name.get_id(); + session->con->send_message(open); } - erase_cmds.push_back(i.first); - } - for (const auto &tid : erase_cmds) { - command_table.erase(tid); + + signal_cond_list(waiting_for_session); + } else { + ldout(cct, 4) << "No active mgr available yet" << dendl; } } - signal_cond_list(waiting_for_map); - return true; } @@ -262,16 +266,13 @@ int MgrClient::start_command(const vector<string>& cmd, const bufferlist& inbl, ldout(cct, 20) << "cmd: " << cmd << dendl; - assert(map.epoch > 0); - - - if (session == nullptr) { - derr << "no session" << dendl; - // FIXME: be nicer: maybe block until a mgr is available? - return -ENOENT; + derr << "no session, waiting" << dendl; + wait_on_list(waiting_for_session); } + assert(map.epoch > 0); + MgrCommand *op = command_table.start_command(); op->cmd = cmd; op->inbl = inbl; diff --git a/src/mgr/MgrClient.h b/src/mgr/MgrClient.h index bcaf3de5ea5..92775d62e3b 100644 --- a/src/mgr/MgrClient.h +++ b/src/mgr/MgrClient.h @@ -63,7 +63,8 @@ protected: void wait_on_list(list<Cond*>& ls); void signal_cond_list(list<Cond*>& ls); - list<Cond*> waiting_for_map; + + list<Cond*> waiting_for_session; public: MgrClient(CephContext *cct_, Messenger *msgr_); diff --git a/src/mgr/MgrStandby.cc b/src/mgr/MgrStandby.cc new file mode 100644 index 00000000000..896fb166f74 --- /dev/null +++ b/src/mgr/MgrStandby.cc @@ -0,0 +1,230 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray <john.spray@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + +#include "common/errno.h" +#include "mon/MonClient.h" +#include "include/stringify.h" +#include "global/global_context.h" +#include "global/signal_handler.h" + +#include "mgr/MgrContext.h" + +#include "messages/MMgrBeacon.h" +#include "messages/MMgrMap.h" +#include "Mgr.h" + +#include "MgrStandby.h" + +#define dout_subsys ceph_subsys_mgr +#undef dout_prefix +#define dout_prefix *_dout << "mgr " << __func__ << " " + + +MgrStandby::MgrStandby() : + Dispatcher(g_ceph_context), + monc(new MonClient(g_ceph_context)), + lock("MgrStandby::lock"), + timer(g_ceph_context, lock), + active_mgr(nullptr) +{ + client_messenger = Messenger::create_client_messenger(g_ceph_context, "mgr"); +} + + +MgrStandby::~MgrStandby() +{ + delete monc; + delete client_messenger; + delete active_mgr; +} + + +int MgrStandby::init() +{ + Mutex::Locker l(lock); + + // Initialize Messenger + client_messenger->add_dispatcher_tail(this); + client_messenger->start(); + + // Initialize MonClient + if (monc->build_initial_monmap() < 0) { + client_messenger->shutdown(); + client_messenger->wait(); + return -1; + } + + monc->sub_want("mgrmap", 0, 0); + + monc->set_want_keys(CEPH_ENTITY_TYPE_MON|CEPH_ENTITY_TYPE_OSD + |CEPH_ENTITY_TYPE_MDS|CEPH_ENTITY_TYPE_MGR); + monc->set_messenger(client_messenger); + monc->init(); + int r = monc->authenticate(); + if (r < 0) { + derr << "Authentication failed, did you specify a mgr ID with a valid keyring?" << dendl; + monc->shutdown(); + client_messenger->shutdown(); + client_messenger->wait(); + return r; + } + + client_t whoami = monc->get_global_id(); + client_messenger->set_myname(entity_name_t::CLIENT(whoami.v)); + + timer.init(); + send_beacon(); + + dout(4) << "Complete." << dendl; + return 0; +} + +void MgrStandby::send_beacon() +{ + assert(lock.is_locked_by_me()); + dout(1) << state_str() << dendl; + dout(10) << "sending beacon as gid " << monc->get_global_id() << dendl; + + bool available = active_mgr != nullptr && active_mgr->is_initialized(); + auto addr = available ? active_mgr->get_server_addr() : entity_addr_t(); + MMgrBeacon *m = new MMgrBeacon(monc->get_global_id(), + g_conf->name.get_id(), + addr, + available); + + monc->send_mon_message(m); + // TODO configure period + timer.add_event_after(5, new C_StdFunction( + [this](){ + send_beacon(); + } + )); +} + +void MgrStandby::handle_signal(int signum) +{ + Mutex::Locker l(lock); + assert(signum == SIGINT || signum == SIGTERM); + shutdown(); +} + +void MgrStandby::shutdown() +{ + // Expect already to be locked as we're called from signal handler + assert(lock.is_locked_by_me()); + + if (active_mgr) { + active_mgr->shutdown(); + } + + timer.shutdown(); + + monc->shutdown(); + client_messenger->shutdown(); +} + +bool MgrStandby::ms_dispatch(Message *m) +{ + Mutex::Locker l(lock); + dout(4) << state_str() << " " << *m << dendl; + + switch (m->get_type()) { + case MSG_MGR_MAP: + { + auto mmap = static_cast<MMgrMap*>(m); + auto map = mmap->get_map(); + dout(4) << "received map epoch " << map.get_epoch() << dendl; + const bool active_in_map = map.active_gid == monc->get_global_id(); + dout(4) << "active in map: " << active_in_map + << " active is " << map.active_gid << dendl; + if (active_in_map) { + if (active_mgr == nullptr) { + dout(1) << "Activating!" << dendl; + active_mgr = new Mgr(monc, client_messenger); + active_mgr->background_init(); + dout(1) << "I am now active" << dendl; + } else { + dout(10) << "I was already active" << dendl; + } + } else { + if (active_mgr != nullptr) { + derr << "I was active but no longer am" << dendl; + shutdown(); + // FIXME: should politely go back to standby (or respawn + // process) instead of stopping entirely. + } + } + } + break; + + default: + if (active_mgr) { + return active_mgr->ms_dispatch(m); + } else { + return false; + } + } + return true; +} + + +bool MgrStandby::ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, + bool force_new) +{ + if (dest_type == CEPH_ENTITY_TYPE_MON) + return true; + + if (force_new) { + if (monc->wait_auth_rotating(10) < 0) + return false; + } + + *authorizer = monc->auth->build_authorizer(dest_type); + return *authorizer != NULL; +} + +// A reference for use by the signal handler +MgrStandby *signal_mgr = nullptr; + +static void handle_mgr_signal(int signum) +{ + if (signal_mgr) { + signal_mgr->handle_signal(signum); + } +} + +int MgrStandby::main(vector<const char *> args) +{ + // Enable signal handlers + signal_mgr = this; + init_async_signal_handler(); + register_async_signal_handler_oneshot(SIGINT, handle_mgr_signal); + register_async_signal_handler_oneshot(SIGTERM, handle_mgr_signal); + + client_messenger->wait(); + + // Disable signal handlers + unregister_async_signal_handler(SIGINT, handle_mgr_signal); + unregister_async_signal_handler(SIGTERM, handle_mgr_signal); + shutdown_async_signal_handler(); + signal_mgr = nullptr; + + return 0; +} + + +std::string MgrStandby::state_str() +{ + return active_mgr == nullptr ? "standby" : "active"; +} + diff --git a/src/mgr/MgrStandby.h b/src/mgr/MgrStandby.h new file mode 100644 index 00000000000..ccb44d74b71 --- /dev/null +++ b/src/mgr/MgrStandby.h @@ -0,0 +1,60 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 John Spray <john.spray@redhat.com> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + */ + + +#ifndef MGR_STANDBY_H_ +#define MGR_STANDBY_H_ + +#include "auth/Auth.h" +#include "common/Finisher.h" +#include "common/Timer.h" + +#include "DaemonServer.h" +#include "PyModules.h" + +#include "DaemonState.h" +#include "ClusterState.h" + +class Mgr; + +class MgrStandby : public Dispatcher { +protected: + MonClient *monc; + Messenger *client_messenger; + + Mutex lock; + SafeTimer timer; + + Mgr *active_mgr; + + std::string state_str(); + +public: + MgrStandby(); + ~MgrStandby(); + + bool ms_dispatch(Message *m); + bool ms_handle_reset(Connection *con) { return false; } + void ms_handle_remote_reset(Connection *con) {} + bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, + bool force_new); + int init(); + void shutdown(); + void usage() {} + int main(vector<const char *> args); + void handle_signal(int signum); + void send_beacon(); +}; + +#endif + diff --git a/src/mgr/PyModules.cc b/src/mgr/PyModules.cc index 40bc68f9cee..e20940c54a8 100644 --- a/src/mgr/PyModules.cc +++ b/src/mgr/PyModules.cc @@ -312,12 +312,12 @@ public: void PyModules::start() { - { - Mutex::Locker l(lock); - for (auto &i : modules) { - auto thread = new ServeThread(i.second); - serve_threads[i.first] = thread; - } + Mutex::Locker l(lock); + + dout(1) << "Creating threads for " << modules.size() << " modules" << dendl; + for (auto &i : modules) { + auto thread = new ServeThread(i.second); + serve_threads[i.first] = thread; } for (auto &i : serve_threads) { diff --git a/src/mon/MgrMap.h b/src/mon/MgrMap.h index 9606de5cd54..909f40aa0df 100644 --- a/src/mon/MgrMap.h +++ b/src/mon/MgrMap.h @@ -14,19 +14,65 @@ #ifndef MGR_MAP_H_ #define MGR_MAP_H_ +#include <sstream> + #include "msg/msg_types.h" +#include "common/Formatter.h" #include "include/encoding.h" +class StandbyInfo +{ +public: + uint64_t gid; + std::string name; + + StandbyInfo(uint64_t gid_, const std::string &name_) + : gid(gid_), name(name_) + {} + + StandbyInfo() + : gid(0) + {} + + void encode(bufferlist& bl) const + { + ENCODE_START(1, 1, bl); + ::encode(gid, bl); + ::encode(name, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& p) + { + DECODE_START(1, p); + ::decode(gid, p); + ::decode(name, p); + DECODE_FINISH(p); + } +}; +WRITE_CLASS_ENCODER(StandbyInfo) + class MgrMap { public: + epoch_t epoch; + + // global_id of the ceph-mgr instance selected as a leader uint64_t active_gid; + // server address reported by the leader once it is active entity_addr_t active_addr; - epoch_t epoch; + // whether the nominated leader is active (i.e. has initialized its server) + bool available; + // the name (foo in mgr.<foo>) of the active daemon + std::string active_name; + + std::map<uint64_t, StandbyInfo> standbys; epoch_t get_epoch() const { return epoch; } entity_addr_t get_active_addr() const { return active_addr; } uint64_t get_active_gid() const { return active_gid; } + bool get_available() const { return available; } + const std::string &get_active_name() const { return active_name; } void encode(bufferlist& bl, uint64_t features) const { @@ -34,6 +80,9 @@ public: ::encode(epoch, bl); ::encode(active_addr, bl, features); ::encode(active_gid, bl); + ::encode(available, bl); + ::encode(active_name, bl); + ::encode(standbys, bl); ENCODE_FINISH(bl); } @@ -43,11 +92,55 @@ public: ::decode(epoch, p); ::decode(active_addr, p); ::decode(active_gid, p); + ::decode(available, p); + ::decode(active_name, p); + ::decode(standbys, p); DECODE_FINISH(p); } + void print_summary(Formatter *f, std::ostream *ss) const + { + // One or the other, not both + assert((ss != nullptr) != (f != nullptr)); + + if (f) { + f->dump_int("active_gid", get_active_gid()); + f->dump_string("active_name", get_active_name()); + } else { + if (get_active_gid() != 0) { + *ss << "active: " << get_active_name() << " "; + } else { + *ss << "no daemons active "; + } + } + + + if (f) { + f->open_array_section("standbys"); + for (const auto &i : standbys) { + f->open_object_section("standby"); + f->dump_int("gid", i.second.gid); + f->dump_string("name", i.second.name); + f->close_section(); + } + f->close_section(); + } else { + if (standbys.size()) { + *ss << "standbys: "; + bool first = true; + for (const auto &i : standbys) { + if (!first) { + *ss << ", "; + } + *ss << i.second.name; + first = false; + } + } + } + } + MgrMap() - : epoch(0) + : epoch(0), available(false) {} }; diff --git a/src/mon/MgrMonitor.cc b/src/mon/MgrMonitor.cc index 66e2dcbacae..4e34edfdce9 100644 --- a/src/mon/MgrMonitor.cc +++ b/src/mon/MgrMonitor.cc @@ -18,6 +18,7 @@ #include "PGMap.h" #include "PGMonitor.h" #include "include/stringify.h" +#include "mgr/MgrContext.h" #include "MgrMonitor.h" @@ -71,6 +72,8 @@ bool MgrMonitor::preprocess_query(MonOpRequestRef op) switch (m->get_type()) { case MSG_MGR_BEACON: return preprocess_beacon(op); + case MSG_MON_COMMAND: + return preprocess_command(op); default: mon->no_reply(op); derr << "Unhandled message type " << m->get_type() << dendl; @@ -84,6 +87,10 @@ bool MgrMonitor::prepare_update(MonOpRequestRef op) switch (m->get_type()) { case MSG_MGR_BEACON: return prepare_beacon(op); + + case MSG_MON_COMMAND: + return prepare_command(op); + default: mon->no_reply(op); derr << "Unhandled message type " << m->get_type() << dendl; @@ -118,12 +125,70 @@ public: bool MgrMonitor::prepare_beacon(MonOpRequestRef op) { MMgrBeacon *m = static_cast<MMgrBeacon*>(op->get_req()); + dout(4) << "beacon from " << m->get_gid() << dendl; + + // See if we are seeing same name, new GID for the active daemon + if (m->get_name() == pending_map.active_name + && m->get_gid() != pending_map.active_gid) + { + dout(4) << "Active daemon restart (mgr." << m->get_name() << ")" << dendl; + drop_active(); + } + + // See if we are seeing same name, new GID for any standbys + for (const auto &i : pending_map.standbys) { + const StandbyInfo &s = i.second; + if (s.name == m->get_name() && s.gid != m->get_gid()) { + dout(4) << "Standby daemon restart (mgr." << m->get_name() << ")" << dendl; + drop_standby(i.first); + break; + } + } + + last_beacon[m->get_gid()] = ceph_clock_now(g_ceph_context); + + // Track whether we modified pending_map + bool updated = false; + + if (pending_map.active_gid == m->get_gid()) { + // A beacon from the currently active daemon + if (pending_map.active_addr != m->get_server_addr()) { + dout(4) << "learned address " << m->get_server_addr() << dendl; + pending_map.active_addr = m->get_server_addr(); + updated = true; + } + + if (pending_map.get_available() != m->get_available()) { + dout(4) << "available " << m->get_gid() << dendl; + pending_map.available = m->get_available(); + updated = true; + } + } else if (pending_map.active_gid == 0) { + // There is no currently active daemon, select this one. + if (pending_map.standbys.count(m->get_gid())) { + drop_standby(m->get_gid()); + } + pending_map.active_gid = m->get_gid(); + pending_map.active_name = m->get_name(); - pending_map.active_gid = m->get_gid(); - pending_map.active_addr = m->get_server_addr(); + dout(4) << "selecting new active in epoch " << pending_map.epoch << dendl; + wait_for_finished_proposal(op, new C_Updated(this, op)); + } else { + if (pending_map.standbys.count(m->get_gid()) > 0) { + dout(10) << "from existing standby " << m->get_gid() << dendl; + } else { + dout(10) << "new standby " << m->get_gid() << dendl; + pending_map.standbys[m->get_gid()] = {m->get_gid(), m->get_name()}; + updated = true; + } + } - dout(4) << "proposing epoch " << pending_map.epoch << dendl; - wait_for_finished_proposal(op, new C_Updated(this, op)); + if (updated) { + dout(4) << "updating map" << dendl; + wait_for_finished_proposal(op, new C_Updated(this, op)); + } else { + dout(10) << "no change" << dendl; + } return true; } @@ -162,6 +227,8 @@ void MgrMonitor::check_sub(Subscription *sub) */ void MgrMonitor::send_digests() { + digest_callback = nullptr; + const std::string type = "mgrdigest"; if (mon->session_map.subs.count(type) == 0) return; @@ -182,11 +249,214 @@ void MgrMonitor::send_digests() sub->session->con->send_message(mdigest); } + + digest_callback = new C_StdFunction([this](){ + send_digests(); + }); + mon->timer.add_event_after(5, digest_callback); } void MgrMonitor::tick() { - // TODO control frequency independently of the global tick frequency - send_digests(); + const utime_t now = ceph_clock_now(g_ceph_context); + utime_t cutoff = now; + cutoff -= g_conf->mon_mgr_beacon_grace; + + // Populate any missing beacons (i.e. no beacon since MgrMonitor + // instantiation) with the current time, so that they will + // eventually look laggy if they fail to give us a beacon. + if (pending_map.active_gid != 0 + && last_beacon.count(pending_map.active_gid) == 0) { + last_beacon[pending_map.active_gid] = now; + } + for (auto s : pending_map.standbys) { + if (last_beacon.count(s.first) == 0) { + last_beacon[s.first] = now; + } + } + + // Cull standbys first so that any remaining standbys + // will be eligible to take over from the active if we cull him. + std::list<uint64_t> dead_standbys; + for (const auto &i : pending_map.standbys) { + auto last_beacon_time = last_beacon.at(i.first); + if (last_beacon_time < cutoff) { + dead_standbys.push_back(i.first); + } + } + + bool propose = false; + + for (auto i : dead_standbys) { + dout(4) << "Dropping laggy standby " << i << dendl; + drop_standby(i); + propose = true; + } + + if (pending_map.active_gid != 0 + && last_beacon.at(pending_map.active_gid) < cutoff) { + + drop_active(); + dout(4) << "Dropping active" << pending_map.active_gid << dendl; + if (promote_standby()) { + dout(4) << "Promoted standby " << pending_map.active_gid << dendl; + propose = true; + } else { + dout(4) << "Active is laggy but have no standbys to replace it" << dendl; + } + } else if (pending_map.active_gid == 0) { + if (promote_standby()) { + dout(4) << "Promoted standby " << pending_map.active_gid << dendl; + propose = true; + } + } + + if (propose) { + propose_pending(); + } +} + +bool MgrMonitor::promote_standby() +{ + assert(pending_map.active_gid == 0); + if (pending_map.standbys.size()) { + // Promote a replacement (arbitrary choice of standby) + auto replacement_gid = pending_map.standbys.begin()->first; + pending_map.active_gid = replacement_gid; + pending_map.active_name = pending_map.standbys.at(replacement_gid).name; + pending_map.available = false; + pending_map.active_addr = entity_addr_t(); + + drop_standby(replacement_gid); + return true; + } else { + return false; + } +} + +void MgrMonitor::drop_active() +{ + if (last_beacon.count(pending_map.active_gid) > 0) { + last_beacon.erase(pending_map.active_gid); + } + + pending_map.active_name = ""; + pending_map.active_gid = 0; + pending_map.available = false; + pending_map.active_addr = entity_addr_t(); +} + +void MgrMonitor::drop_standby(uint64_t gid) +{ + pending_map.standbys.erase(gid); + if (last_beacon.count(gid) > 0) { + last_beacon.erase(gid); + } + +} + +bool MgrMonitor::preprocess_command(MonOpRequestRef op) +{ + return false; + +} + +bool MgrMonitor::prepare_command(MonOpRequestRef op) +{ + MMonCommand *m = static_cast<MMonCommand*>(op->get_req()); + + std::stringstream ss; + bufferlist rdata; + + std::map<std::string, cmd_vartype> cmdmap; + if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) { + string rs = ss.str(); + mon->reply_command(op, -EINVAL, rs, rdata, get_last_committed()); + return true; + } + + MonSession *session = m->get_session(); + if (!session) { + mon->reply_command(op, -EACCES, "access denied", rdata, get_last_committed()); + return true; + } + + string prefix; + cmd_getval(g_ceph_context, cmdmap, "prefix", prefix); + + int r = 0; + + if (prefix == "mgr fail") { + string who; + cmd_getval(g_ceph_context, cmdmap, "who", who); + + std::string err; + uint64_t gid = strict_strtol(who.c_str(), 10, &err); + bool changed = false; + if (!err.empty()) { + // Does not parse as a gid, treat it as a name + if (pending_map.active_name == who) { + drop_active(); + changed = true; + } else { + gid = 0; + for (const auto &i : pending_map.standbys) { + if (i.second.name == who) { + gid = i.first; + } + } + if (gid != 0) { + drop_standby(gid); + changed = true; + } else { + ss << "Daemon not found '" << who << "', already failed?"; + } + } + } else { + if (pending_map.active_gid == gid) { + drop_active(); + changed = true; + } else if (pending_map.standbys.count(gid) > 0) { + drop_standby(gid); + changed = true; + } else { + ss << "Daemon not found '" << gid << "', already failed?"; + } + } + + if (changed) { + tick(); + } + } else { + r = -ENOSYS; + } + + dout(4) << __func__ << " done, r=" << r << dendl; + /* Compose response */ + string rs; + getline(ss, rs); + + if (r >= 0) { + // success.. delay reply + wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, r, rs, + get_last_committed() + 1)); + return true; + } else { + // reply immediately + mon->reply_command(op, r, rs, rdata, get_last_committed()); + return false; + } +} + +void MgrMonitor::init() +{ + send_digests(); // To get it to schedule its own event +} + +void MgrMonitor::on_shutdown() +{ + if (digest_callback) { + mon->timer.cancel_event(digest_callback); + } } diff --git a/src/mon/MgrMonitor.h b/src/mon/MgrMonitor.h index 51850893ed1..bd4ac4599bf 100644 --- a/src/mon/MgrMonitor.h +++ b/src/mon/MgrMonitor.h @@ -12,25 +12,53 @@ */ +#include "include/Context.h" #include "MgrMap.h" #include "PaxosService.h" + class MgrMonitor : public PaxosService { MgrMap map; MgrMap pending_map; + std::map<uint64_t, utime_t> last_beacon; + + /** + * If a standby is available, make it active, given that + * there is currently no active daemon. + * + * @return true if a standby was promoted + */ + bool promote_standby(); + void drop_active(); + void drop_standby(uint64_t gid); + + Context *digest_callback; + public: MgrMonitor(Monitor *mn, Paxos *p, const string& service_name) - : PaxosService(mn, p, service_name) + : PaxosService(mn, p, service_name), digest_callback(nullptr) {} + void init(); + void on_shutdown(); + + const MgrMap &get_map() const { return map; } + + bool in_use() const { return map.epoch > 0; } + void create_initial(); void update_from_paxos(bool *need_bootstrap); void create_pending(); void encode_pending(MonitorDBStore::TransactionRef t); + bool preprocess_query(MonOpRequestRef op); bool prepare_update(MonOpRequestRef op); + + bool preprocess_command(MonOpRequestRef op); + bool prepare_command(MonOpRequestRef op); + void encode_full(MonitorDBStore::TransactionRef t) { } bool preprocess_beacon(MonOpRequestRef op); @@ -42,6 +70,8 @@ public: void tick(); + void print_summary(Formatter *f, std::ostream *ss) const; + friend class C_Updated; }; diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 5dc1eed785c..035fe5d4f43 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -842,3 +842,10 @@ COMMAND("config-key exists " \ "name=key,type=CephString", \ "check for <key>'s existence", "config-key", "r", "cli,rest") COMMAND("config-key list ", "list keys", "config-key", "r", "cli,rest") + + +/* + * mon/MgrMonitor.cc + */ +COMMAND("mgr fail name=who,type=CephString", \ + "treat the named manager daemon as failed", "mgr", "rw", "cli,rest") diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 9ff47ce86ed..67cf14304a8 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -2466,6 +2466,10 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f) f->open_object_section("fsmap"); mdsmon()->fsmap.print_summary(f, NULL); f->close_section(); + + f->open_object_section("mgrmap"); + mgrmon()->get_map().print_summary(f, nullptr); + f->close_section(); f->close_section(); } else { ss << " cluster " << monmap->get_fsid() << "\n"; @@ -2477,6 +2481,11 @@ void Monitor::get_cluster_status(stringstream &ss, Formatter *f) if (mdsmon()->fsmap.any_filesystems()) { ss << " fsmap " << mdsmon()->fsmap << "\n"; } + if (mgrmon()->in_use()) { + ss << " mgr "; + mgrmon()->get_map().print_summary(nullptr, &ss); + ss << "\n"; + } osdmon()->osdmap.print_summary(NULL, ss); pgmon()->pg_map.print_summary(NULL, &ss); @@ -2801,6 +2810,11 @@ void Monitor::handle_command(MonOpRequestRef op) return; } + if (module == "mgr") { + mgrmon()->dispatch(op); + return; + } + if (prefix == "fsid") { if (f) { f->open_object_section("fsid"); |