// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab /* * Ceph - scalable distributed file system * * Copyright (C) 2004-2006 Sage Weil * * This is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License version 2.1, as published by the Free Software * Foundation. See file COPYING. * */ #ifndef CEPH_MESSAGE_H #define CEPH_MESSAGE_H #include #include #include #include #include #if FMT_VERSION >= 90000 #include #endif #include "include/Context.h" #include "common/RefCountedObj.h" #include "common/ThrottleInterface.h" #include "common/config.h" #include "common/ref.h" #include "common/debug.h" #include "common/zipkin_trace.h" #include "common/tracer.h" #include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert... #include "include/buffer.h" #include "include/types.h" #include "msg/Connection.h" #include "msg/MessageRef.h" #include "msg_types.h" // monitor internal #define MSG_MON_SCRUB 64 #define MSG_MON_ELECTION 65 #define MSG_MON_PAXOS 66 #define MSG_MON_PROBE 67 #define MSG_MON_JOIN 68 #define MSG_MON_SYNC 69 #define MSG_MON_PING 140 /* monitor <-> mon admin tool */ #define MSG_MON_COMMAND 50 #define MSG_MON_COMMAND_ACK 51 #define MSG_LOG 52 #define MSG_LOGACK 53 #define MSG_GETPOOLSTATS 58 #define MSG_GETPOOLSTATSREPLY 59 #define MSG_MON_GLOBAL_ID 60 #define MSG_MON_USED_PENDING_KEYS 141 #define MSG_ROUTE 47 #define MSG_FORWARD 46 #define MSG_PAXOS 40 #define MSG_CONFIG 62 #define MSG_GET_CONFIG 63 #define MSG_KV_DATA 54 #define MSG_MON_GET_PURGED_SNAPS 76 #define MSG_MON_GET_PURGED_SNAPS_REPLY 77 // osd internal #define MSG_OSD_PING 70 #define MSG_OSD_BOOT 71 #define MSG_OSD_FAILURE 72 #define MSG_OSD_ALIVE 73 #define MSG_OSD_MARK_ME_DOWN 74 #define MSG_OSD_FULL 75 #define MSG_OSD_MARK_ME_DEAD 123 // removed right after luminous //#define MSG_OSD_SUBOP 76 //#define MSG_OSD_SUBOPREPLY 77 #define MSG_OSD_PGTEMP 78 #define MSG_OSD_BEACON 79 #define MSG_OSD_PG_NOTIFY 80 #define MSG_OSD_PG_NOTIFY2 130 #define MSG_OSD_PG_QUERY 81 #define MSG_OSD_PG_QUERY2 131 #define MSG_OSD_PG_LOG 83 #define MSG_OSD_PG_REMOVE 84 #define MSG_OSD_PG_INFO 85 #define MSG_OSD_PG_INFO2 132 #define MSG_OSD_PG_TRIM 86 #define MSG_PGSTATS 87 #define MSG_PGSTATSACK 88 #define MSG_OSD_PG_CREATE 89 #define MSG_REMOVE_SNAPS 90 #define MSG_OSD_SCRUB 91 #define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING #define MSG_OSD_REP_SCRUB 93 #define MSG_OSD_PG_SCAN 94 #define MSG_OSD_PG_BACKFILL 95 #define MSG_OSD_PG_BACKFILL_REMOVE 96 #define MSG_COMMAND 97 #define MSG_COMMAND_REPLY 98 #define MSG_OSD_BACKFILL_RESERVE 99 #define MSG_OSD_RECOVERY_RESERVE 150 #define MSG_OSD_FORCE_RECOVERY 151 #define MSG_OSD_PG_PUSH 105 #define MSG_OSD_PG_PULL 106 #define MSG_OSD_PG_PUSH_REPLY 107 #define MSG_OSD_EC_WRITE 108 #define MSG_OSD_EC_WRITE_REPLY 109 #define MSG_OSD_EC_READ 110 #define MSG_OSD_EC_READ_REPLY 111 #define MSG_OSD_REPOP 112 #define MSG_OSD_REPOPREPLY 113 #define MSG_OSD_PG_UPDATE_LOG_MISSING 114 #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115 #define MSG_OSD_PG_CREATED 116 #define MSG_OSD_REP_SCRUBMAP 117 #define MSG_OSD_PG_RECOVERY_DELETE 118 #define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119 #define MSG_OSD_PG_CREATE2 120 #define MSG_OSD_SCRUB2 121 #define MSG_OSD_PG_READY_TO_MERGE 122 #define MSG_OSD_PG_LEASE 133 #define MSG_OSD_PG_LEASE_ACK 134 // *** MDS *** #define MSG_MDS_BEACON 100 // to monitor #define MSG_MDS_PEER_REQUEST 101 #define MSG_MDS_TABLE_REQUEST 102 #define MSG_MDS_SCRUB 135 // 150 already in use (MSG_OSD_RECOVERY_RESERVE) #define MSG_MDS_RESOLVE 0x200 // 0x2xx are for mdcache of mds #define MSG_MDS_RESOLVEACK 0x201 #define MSG_MDS_CACHEREJOIN 0x202 #define MSG_MDS_DISCOVER 0x203 #define MSG_MDS_DISCOVERREPLY 0x204 #define MSG_MDS_INODEUPDATE 0x205 #define MSG_MDS_DIRUPDATE 0x206 #define MSG_MDS_CACHEEXPIRE 0x207 #define MSG_MDS_DENTRYUNLINK 0x208 #define MSG_MDS_FRAGMENTNOTIFY 0x209 #define MSG_MDS_OFFLOAD_TARGETS 0x20a #define MSG_MDS_DENTRYLINK 0x20c #define MSG_MDS_FINDINO 0x20d #define MSG_MDS_FINDINOREPLY 0x20e #define MSG_MDS_OPENINO 0x20f #define MSG_MDS_OPENINOREPLY 0x210 #define MSG_MDS_SNAPUPDATE 0x211 #define MSG_MDS_FRAGMENTNOTIFYACK 0x212 #define MSG_MDS_LOCK 0x300 // 0x3xx are for locker of mds #define MSG_MDS_INODEFILECAPS 0x301 #define MSG_MDS_EXPORTDIRDISCOVER 0x449 // 0x4xx are for migrator of mds #define MSG_MDS_EXPORTDIRDISCOVERACK 0x450 #define MSG_MDS_EXPORTDIRCANCEL 0x451 #define MSG_MDS_EXPORTDIRPREP 0x452 #define MSG_MDS_EXPORTDIRPREPACK 0x453 #define MSG_MDS_EXPORTDIRWARNING 0x454 #define MSG_MDS_EXPORTDIRWARNINGACK 0x455 #define MSG_MDS_EXPORTDIR 0x456 #define MSG_MDS_EXPORTDIRACK 0x457 #define MSG_MDS_EXPORTDIRNOTIFY 0x458 #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 #define MSG_MDS_EXPORTDIRFINISH 0x460 #define MSG_MDS_EXPORTCAPS 0x470 #define MSG_MDS_EXPORTCAPSACK 0x471 #define MSG_MDS_GATHERCAPS 0x472 #define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer #define MSG_MDS_METRICS 0x501 // for mds metric aggregator #define MSG_MDS_PING 0x502 // for mds pinger #define MSG_MDS_SCRUB_STATS 0x503 // for mds scrub stack #define MSG_MDS_QUIESCE_DB_LISTING 0x505 // quiesce db replication #define MSG_MDS_QUIESCE_DB_ACK 0x506 // quiesce agent ack back to the db // *** generic *** #define MSG_TIMECHECK 0x600 #define MSG_MON_HEALTH 0x601 // *** Message::encode() crcflags bits *** #define MSG_CRC_DATA (1 << 0) #define MSG_CRC_HEADER (1 << 1) #define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER) // Special #define MSG_NOP 0x607 #define MSG_MON_HEALTH_CHECKS 0x608 #define MSG_TIMECHECK2 0x609 // *** ceph-mgr <-> OSD/MDS daemons *** #define MSG_MGR_OPEN 0x700 #define MSG_MGR_CONFIGURE 0x701 #define MSG_MGR_REPORT 0x702 // *** ceph-mgr <-> ceph-mon *** #define MSG_MGR_BEACON 0x703 // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons *** #define MSG_MGR_MAP 0x704 // *** ceph-mon(MgrMonitor) -> ceph-mgr #define MSG_MGR_DIGEST 0x705 // *** cephmgr -> ceph-mon #define MSG_MON_MGR_REPORT 0x706 #define MSG_SERVICE_MAP 0x707 #define MSG_MGR_CLOSE 0x708 #define MSG_MGR_COMMAND 0x709 #define MSG_MGR_COMMAND_REPLY 0x70a // *** ceph-mgr <-> MON daemons *** #define MSG_MGR_UPDATE 0x70b // *** nvmeof mon -> gw daemons *** #define MSG_MNVMEOF_GW_MAP 0x800 // *** gw daemons -> nvmeof mon *** #define MSG_MNVMEOF_GW_BEACON 0x801 // ====================================================== // abstract Message class class Message : public RefCountedObject { public: #ifdef WITH_SEASTAR // In crimson, conn is independently maintained outside Message. using ConnectionRef = void*; #else using ConnectionRef = ::ConnectionRef; #endif protected: ceph_msg_header header; // headerelope ceph_msg_footer footer; ceph::buffer::list payload; // "front" unaligned blob ceph::buffer::list middle; // "middle" unaligned blob ceph::buffer::list data; // data payload (page-alignment will be preserved where possible) /* recv_stamp is set when the Messenger starts reading the * Message off the wire */ utime_t recv_stamp; /* dispatch_stamp is set when the Messenger starts calling dispatch() on * its endpoints */ utime_t dispatch_stamp; /* throttle_stamp is the point at which we got throttle */ utime_t throttle_stamp; /* time at which message was fully read */ utime_t recv_complete_stamp; ConnectionRef connection; uint32_t magic = 0; boost::intrusive::list_member_hook<> dispatch_q; public: // zipkin tracing ZTracer::Trace trace; void encode_trace(ceph::buffer::list &bl, uint64_t features) const; void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false); // otel tracing jspan_context otel_trace{false, false}; void encode_otel_trace(ceph::buffer::list &bl, uint64_t features) const; void decode_otel_trace(ceph::buffer::list::const_iterator &p, bool create = false); class CompletionHook : public Context { protected: Message *m; friend class Message; public: explicit CompletionHook(Message *_m) : m(_m) {} virtual void set_message(Message *_m) { m = _m; } }; typedef boost::intrusive::list, &Message::dispatch_q>> Queue; ceph::mono_time queue_start; protected: CompletionHook* completion_hook = nullptr; // owned by Messenger // release our size in bytes back to this throttler when our payload // is adjusted or when we are destroyed. ThrottleInterface *byte_throttler = nullptr; // release a count back to this throttler when we are destroyed ThrottleInterface *msg_throttler = nullptr; // keep track of how big this message was when we reserved space in // the msgr dispatch_throttler, so that we can properly release it // later. this is necessary because messages can enter the dispatch // queue locally (not via read_message()), and those are not // currently throttled. uint64_t dispatch_throttle_size = 0; friend class Messenger; public: Message() { memset(&header, 0, sizeof(header)); memset(&footer, 0, sizeof(footer)); } Message(int t, int version=1, int compat_version=0) { memset(&header, 0, sizeof(header)); header.type = t; header.version = version; header.compat_version = compat_version; memset(&footer, 0, sizeof(footer)); } Message *get() { return static_cast(RefCountedObject::get()); } protected: ~Message() override { if (byte_throttler) byte_throttler->put(payload.length() + middle.length() + data.length()); release_message_throttle(); trace.event("message destructed"); /* call completion hooks (if any) */ if (completion_hook) completion_hook->complete(0); } public: const ConnectionRef& get_connection() const { #ifdef WITH_SEASTAR ceph_abort("In crimson, conn is independently maintained outside Message"); #endif return connection; } void set_connection(ConnectionRef c) { #ifdef WITH_SEASTAR // In crimson, conn is independently maintained outside Message. ceph_assert(c == nullptr); #endif connection = std::move(c); } CompletionHook* get_completion_hook() { return completion_hook; } void set_completion_hook(CompletionHook *hook) { completion_hook = hook; } void set_byte_throttler(ThrottleInterface *t) { byte_throttler = t; } void set_message_throttler(ThrottleInterface *t) { msg_throttler = t; } void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; } const ceph_msg_header &get_header() const { return header; } ceph_msg_header &get_header() { return header; } void set_header(const ceph_msg_header &e) { header = e; } void set_footer(const ceph_msg_footer &e) { footer = e; } const ceph_msg_footer &get_footer() const { return footer; } ceph_msg_footer &get_footer() { return footer; } void set_src(const entity_name_t& src) { header.src = src; } uint32_t get_magic() const { return magic; } void set_magic(int _magic) { magic = _magic; } /* * If you use get_[data, middle, payload] you shouldn't * use it to change those ceph::buffer::lists unless you KNOW * there is no throttle being used. The other * functions are throttling-aware as appropriate. */ void clear_payload() { if (byte_throttler) { byte_throttler->put(payload.length() + middle.length()); } payload.clear(); middle.clear(); } virtual void clear_buffers() {} void clear_data() { if (byte_throttler) byte_throttler->put(data.length()); data.clear(); clear_buffers(); // let subclass drop buffers as well } void release_message_throttle() { if (msg_throttler) msg_throttler->put(); msg_throttler = nullptr; } bool empty_payload() const { return payload.length() == 0; } ceph::buffer::list& get_payload() { return payload; } const ceph::buffer::list& get_payload() const { return payload; } void set_payload(ceph::buffer::list& bl) { if (byte_throttler) byte_throttler->put(payload.length()); payload = std::move(bl); if (byte_throttler) byte_throttler->take(payload.length()); } void set_middle(ceph::buffer::list& bl) { if (byte_throttler) byte_throttler->put(middle.length()); middle = std::move(bl); if (byte_throttler) byte_throttler->take(middle.length()); } ceph::buffer::list& get_middle() { return middle; } void set_data(const ceph::buffer::list &bl) { if (byte_throttler) byte_throttler->put(data.length()); data.share(bl); if (byte_throttler) byte_throttler->take(data.length()); } const ceph::buffer::list& get_data() const { return data; } ceph::buffer::list& get_data() { return data; } void claim_data(ceph::buffer::list& bl) { if (byte_throttler) byte_throttler->put(data.length()); bl = std::move(data); } uint32_t get_data_len() const { return data.length(); } void set_recv_stamp(utime_t t) { recv_stamp = t; } const utime_t& get_recv_stamp() const { return recv_stamp; } void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; } const utime_t& get_dispatch_stamp() const { return dispatch_stamp; } void set_throttle_stamp(utime_t t) { throttle_stamp = t; } const utime_t& get_throttle_stamp() const { return throttle_stamp; } void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; } const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; } void calc_header_crc() { header.crc = ceph_crc32c(0, (unsigned char*)&header, sizeof(header) - sizeof(header.crc)); } void calc_front_crc() { footer.front_crc = payload.crc32c(0); footer.middle_crc = middle.crc32c(0); } void calc_data_crc() { footer.data_crc = data.crc32c(0); } virtual int get_cost() const { return data.length(); } // type int get_type() const { return header.type; } void set_type(int t) { header.type = t; } uint64_t get_tid() const { return header.tid; } void set_tid(uint64_t t) { header.tid = t; } uint64_t get_seq() const { return header.seq; } void set_seq(uint64_t s) { header.seq = s; } unsigned get_priority() const { return header.priority; } void set_priority(__s16 p) { header.priority = p; } // source/dest entity_inst_t get_source_inst() const { return entity_inst_t(get_source(), get_source_addr()); } entity_name_t get_source() const { return entity_name_t(header.src); } entity_addr_t get_source_addr() const { #ifdef WITH_SEASTAR ceph_abort("In crimson, conn is independently maintained outside Message"); #else if (connection) return connection->get_peer_addr(); #endif return entity_addr_t(); } entity_addrvec_t get_source_addrs() const { #ifdef WITH_SEASTAR ceph_abort("In crimson, conn is independently maintained outside Message"); #else if (connection) return connection->get_peer_addrs(); #endif return entity_addrvec_t(); } // forwarded? entity_inst_t get_orig_source_inst() const { return get_source_inst(); } entity_name_t get_orig_source() const { return get_source(); } entity_addr_t get_orig_source_addr() const { return get_source_addr(); } entity_addrvec_t get_orig_source_addrs() const { return get_source_addrs(); } // virtual bits virtual void decode_payload() = 0; virtual void encode_payload(uint64_t features) = 0; virtual std::string_view get_type_name() const = 0; virtual void print(std::ostream& out) const { out << get_type_name() << " magic: " << magic; } virtual void dump(ceph::Formatter *f) const; void encode(uint64_t features, int crcflags, bool skip_header_crc = false); }; extern Message *decode_message(CephContext *cct, int crcflags, ceph_msg_header& header, ceph_msg_footer& footer, ceph::buffer::list& front, ceph::buffer::list& middle, ceph::buffer::list& data, Message::ConnectionRef conn); inline std::ostream& operator<<(std::ostream& out, const Message& m) { m.print(out); if (m.get_header().version) out << " v" << m.get_header().version; return out; } extern void encode_message(Message *m, uint64_t features, ceph::buffer::list& bl); extern Message *decode_message(CephContext *cct, int crcflags, ceph::buffer::list::const_iterator& bl); /// this is a "safe" version of Message. it does not allow calling get/put /// methods on its derived classes. This is intended to prevent some accidental /// reference leaks by forcing . Instead, you must either cast the derived class to a /// RefCountedObject to do the get/put or detach a temporary reference. class SafeMessage : public Message { public: using Message::Message; bool is_a_client() const { #ifdef WITH_SEASTAR ceph_abort("In crimson, conn is independently maintained outside Message"); #else return get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT; #endif } private: using RefCountedObject::get; using RefCountedObject::put; }; namespace ceph { template ceph::ref_t make_message(Args&&... args) { return {new T(std::forward(args)...), false}; } } namespace crimson { template MURef make_message(Args&&... args) { return {new T(std::forward(args)...), TOPNSPC::common::UniquePtrDeleter{}}; } } namespace fmt { // placed in the fmt namespace due to an ADL bug in g++ < 12 // (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=92944). // Specifically - gcc pre-12 can't handle two templated specializations of // the formatter if in two different namespaces. template M> struct formatter { constexpr auto parse(fmt::format_parse_context& ctx) { return ctx.begin(); } template auto format(const M& m, FormatContext& ctx) const { std::ostringstream oss; m.print(oss); if (auto ver = m.get_header().version; ver) { return fmt::format_to(ctx.out(), "{} v{}", oss.str(), (uint32_t)ver); } else { return fmt::format_to(ctx.out(), "{}", oss.str()); } } }; } // namespace fmt #endif