// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_MONCLIENT_H #define CEPH_MONCLIENT_H #include #include "msg/Messenger.h" #include "MonMap.h" #include "common/Timer.h" #include "common/Finisher.h" #include "common/config.h" class MMonMap; class MMonGetVersionReply; struct MMonSubscribeAck; class MMonCommandAck; struct MAuthReply; class MAuthRotating; class LogClient; struct AuthAuthorizer; class AuthMethodList; class AuthClientHandler; class KeyRing; class RotatingKeyRing; struct MonClientPinger : public Dispatcher { Mutex lock; Cond ping_recvd_cond; string *result; bool done; MonClientPinger(CephContext *cct_, string *res_) : Dispatcher(cct_), lock("MonClientPinger::lock"), result(res_), done(false) { } int wait_for_reply(double timeout = 0.0) { utime_t until = ceph_clock_now(); until += (timeout > 0 ? timeout : cct->_conf->client_mount_timeout); done = false; int ret = 0; while (!done) { ret = ping_recvd_cond.WaitUntil(lock, until); if (ret == ETIMEDOUT) break; } return ret; } bool ms_dispatch(Message *m) override { Mutex::Locker l(lock); if (m->get_type() != CEPH_MSG_PING) return false; bufferlist &payload = m->get_payload(); if (result && payload.length() > 0) { bufferlist::iterator p = payload.begin(); ::decode(*result, p); } done = true; ping_recvd_cond.SignalAll(); m->put(); return true; } bool ms_handle_reset(Connection *con) override { Mutex::Locker l(lock); done = true; ping_recvd_cond.SignalAll(); return true; } void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override { return false; } }; class MonConnection { public: MonConnection(CephContext *cct, ConnectionRef conn, uint64_t global_id); ~MonConnection(); MonConnection(MonConnection&& rhs) = default; MonConnection& operator=(MonConnection&&) = default; MonConnection(const MonConnection& rhs) = delete; MonConnection& operator=(const MonConnection&) = delete; int handle_auth(MAuthReply *m, const EntityName& entity_name, uint32_t want_keys, RotatingKeyRing* keyring); int authenticate(MAuthReply *m); void start(epoch_t epoch, const EntityName& entity_name, const AuthMethodList& auth_supported); bool have_session() const; uint64_t get_global_id() const { return global_id; } ConnectionRef get_con() { return con; } std::unique_ptr& get_auth() { return auth; } private: int _negotiate(MAuthReply *m, const EntityName& entity_name, uint32_t want_keys, RotatingKeyRing* keyring); private: CephContext *cct; enum class State { NONE, NEGOTIATING, AUTHENTICATING, HAVE_SESSION, }; State state = State::NONE; ConnectionRef con; std::unique_ptr auth; uint64_t global_id; }; class MonClient : public Dispatcher { public: MonMap monmap; private: Messenger *messenger; std::unique_ptr active_con; std::map pending_cons; EntityName entity_name; entity_addr_t my_addr; mutable Mutex monc_lock; SafeTimer timer; Finisher finisher; bool initialized; bool no_keyring_disabled_cephx; LogClient *log_client; bool more_log_pending; void send_log(bool flush = false); std::unique_ptr auth_supported; bool ms_dispatch(Message *m) override; bool ms_handle_reset(Connection *con) override; void ms_handle_remote_reset(Connection *con) override {} bool ms_handle_refused(Connection *con) override { return false; } void handle_monmap(MMonMap *m); void handle_auth(MAuthReply *m); // monitor session void tick(); void schedule_tick(); // monclient bool want_monmap; Cond map_cond; private: // authenticate std::unique_ptr auth; uint32_t want_keys = 0; uint64_t global_id = 0; Cond auth_cond; int authenticate_err = 0; list waiting_for_session; utime_t last_rotating_renew_sent; std::unique_ptr session_established_context; bool had_a_connection; double reopen_interval_multiplier; bool _opened() const; bool _hunting() const; void _start_hunting(); void _finish_hunting(); void _finish_auth(int auth_err); void _reopen_session(int rank = -1); MonConnection& _add_conn(unsigned rank, uint64_t global_id); void _add_conns(uint64_t global_id); void _send_mon_message(Message *m); public: void set_entity_name(EntityName name) { entity_name = name; } int _check_auth_tickets(); int _check_auth_rotating(); int wait_auth_rotating(double timeout); int authenticate(double timeout=0.0); /** * Try to flush as many log messages as we can in a single * message. Use this before shutting down to transmit your * last message. */ void flush_log(); // mon subscriptions private: map sub_sent; // my subs, and current versions map sub_new; // unsent new subs utime_t sub_renew_sent, sub_renew_after; void _renew_subs(); void handle_subscribe_ack(MMonSubscribeAck* m); bool _sub_want(const string &what, version_t start, unsigned flags) { auto sub = sub_new.find(what); if (sub != sub_new.end() && sub->second.start == start && sub->second.flags == flags) { return false; } else { sub = sub_sent.find(what); if (sub != sub_sent.end() && sub->second.start == start && sub->second.flags == flags) return false; } sub_new[what].start = start; sub_new[what].flags = flags; return true; } void _sub_got(const string &what, version_t got) { if (sub_new.count(what)) { if (sub_new[what].start <= got) { if (sub_new[what].flags & CEPH_SUBSCRIBE_ONETIME) sub_new.erase(what); else sub_new[what].start = got + 1; } } else if (sub_sent.count(what)) { if (sub_sent[what].start <= got) { if (sub_sent[what].flags & CEPH_SUBSCRIBE_ONETIME) sub_sent.erase(what); else sub_sent[what].start = got + 1; } } } void _sub_unwant(const string &what) { sub_sent.erase(what); sub_new.erase(what); } public: void renew_subs() { Mutex::Locker l(monc_lock); _renew_subs(); } bool sub_want(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); return _sub_want(what, start, flags); } void sub_got(string what, version_t have) { Mutex::Locker l(monc_lock); _sub_got(what, have); } void sub_unwant(string what) { Mutex::Locker l(monc_lock); _sub_unwant(what); } /** * Increase the requested subscription start point. If you do increase * the value, apply the passed-in flags as well; otherwise do nothing. */ bool sub_want_increment(string what, version_t start, unsigned flags) { Mutex::Locker l(monc_lock); map::iterator i = sub_new.find(what); if (i != sub_new.end()) { if (i->second.start >= start) return false; i->second.start = start; i->second.flags = flags; return true; } i = sub_sent.find(what); if (i == sub_sent.end() || i->second.start < start) { ceph_mon_subscribe_item& item = sub_new[what]; item.start = start; item.flags = flags; return true; } return false; } std::unique_ptr keyring; std::unique_ptr rotating_secrets; public: explicit MonClient(CephContext *cct_); MonClient(const MonClient &) = delete; MonClient& operator=(const MonClient &) = delete; ~MonClient() override; int init(); void shutdown(); void set_log_client(LogClient *clog) { log_client = clog; } int build_initial_monmap(); int get_monmap(); int get_monmap_privately(); /** * Ping monitor with ID @p mon_id and record the resulting * reply in @p result_reply. * * @param[in] mon_id Target monitor's ID * @param[out] result_reply reply from mon.ID, if param != NULL * @returns 0 in case of success; < 0 in case of error, * -ETIMEDOUT if monitor didn't reply before timeout * expired (default: conf->client_mount_timeout). */ int ping_monitor(const string &mon_id, string *result_reply); void send_mon_message(Message *m) { Mutex::Locker l(monc_lock); _send_mon_message(m); } /** * If you specify a callback, you should not call * reopen_session() again until it has been triggered. The MonClient * will behave, but the first callback could be triggered after * the session has been killed and the MonClient has started trying * to reconnect to another monitor. */ void reopen_session(Context *cb=NULL) { Mutex::Locker l(monc_lock); if (cb) { session_established_context.reset(cb); } _reopen_session(); } entity_addr_t get_my_addr() const { return my_addr; } const uuid_d& get_fsid() const { return monmap.fsid; } entity_addr_t get_mon_addr(unsigned i) const { Mutex::Locker l(monc_lock); if (i < monmap.size()) return monmap.get_addr(i); return entity_addr_t(); } entity_inst_t get_mon_inst(unsigned i) const { Mutex::Locker l(monc_lock); if (i < monmap.size()) return monmap.get_inst(i); return entity_inst_t(); } int get_num_mon() const { Mutex::Locker l(monc_lock); return monmap.size(); } uint64_t get_global_id() const { Mutex::Locker l(monc_lock); return global_id; } void set_messenger(Messenger *m) { messenger = m; } entity_addr_t get_myaddr() const { return messenger->get_myaddr(); } AuthAuthorizer* build_authorizer(int service_id) const; void set_want_keys(uint32_t want) { want_keys = want; } // admin commands private: uint64_t last_mon_command_tid; struct MonCommand { string target_name; int target_rank; uint64_t tid; vector cmd; bufferlist inbl; bufferlist *poutbl; string *prs; int *prval; Context *onfinish, *ontimeout; explicit MonCommand(uint64_t t) : target_rank(-1), tid(t), poutbl(NULL), prs(NULL), prval(NULL), onfinish(NULL), ontimeout(NULL) {} }; map mon_commands; void _send_command(MonCommand *r); void _resend_mon_commands(); int _cancel_mon_command(uint64_t tid, int r); void _finish_command(MonCommand *r, int ret, string rs); void _finish_auth(); void handle_mon_command_ack(MMonCommandAck *ack); public: void start_mon_command(const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish); void start_mon_command(int mon_rank, const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish); void start_mon_command(const string &mon_name, ///< mon name, with mon. prefix const vector& cmd, const bufferlist& inbl, bufferlist *outbl, string *outs, Context *onfinish); // version requests public: /** * get latest known version(s) of cluster map * * @param map string name of map (e.g., 'osdmap') * @param newest pointer where newest map version will be stored * @param oldest pointer where oldest map version will be stored * @param onfinish context that will be triggered on completion * @return (via context) 0 on success, -EAGAIN if we need to resubmit our request */ void get_version(string map, version_t *newest, version_t *oldest, Context *onfinish); /** * Run a callback within our lock, with a reference * to the MonMap */ template auto with_monmap(Callback&& cb, Args&&...args) -> typename std::enable_if< std::is_void< decltype(cb(const_cast(monmap), std::forward(args)...))>::value, void>::type { Mutex::Locker l(monc_lock); std::forward(cb)(const_cast(monmap), std::forward(args)...); } private: struct version_req_d { Context *context; version_t *newest, *oldest; version_req_d(Context *con, version_t *n, version_t *o) : context(con),newest(n), oldest(o) {} }; map version_requests; ceph_tid_t version_req_id; void handle_get_version_reply(MMonGetVersionReply* m); }; #endif