1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MESSAGE_H
#define CEPH_MESSAGE_H
#include <concepts>
#include <cstdlib>
#include <ostream>
#include <string_view>
#include <boost/intrusive/list.hpp>
#if FMT_VERSION >= 90000
#include <fmt/ostream.h>
#endif
#include "include/Context.h"
#include "common/RefCountedObj.h"
#include "common/ThrottleInterface.h"
#include "common/config.h"
#include "common/ref.h"
#include "common/debug.h"
#include "common/zipkin_trace.h"
#include "common/tracer.h"
#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
#include "include/buffer.h"
#include "include/types.h"
#include "msg/Connection.h"
#include "msg/MessageRef.h"
#include "msg_types.h"
// Numeric message type codes: monitor-internal traffic.
// NOTE(review): presumably these are the values stored in the message
// header's type field (see Message::get_type()) — confirm against the senders.
// monitor internal
#define MSG_MON_SCRUB 64
#define MSG_MON_ELECTION 65
#define MSG_MON_PAXOS 66
#define MSG_MON_PROBE 67
#define MSG_MON_JOIN 68
#define MSG_MON_SYNC 69
#define MSG_MON_PING 140
/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND 50
#define MSG_MON_COMMAND_ACK 51
#define MSG_LOG 52
#define MSG_LOGACK 53
#define MSG_GETPOOLSTATS 58
#define MSG_GETPOOLSTATSREPLY 59
#define MSG_MON_GLOBAL_ID 60
#define MSG_MON_USED_PENDING_KEYS 141
#define MSG_ROUTE 47
#define MSG_FORWARD 46
#define MSG_PAXOS 40
#define MSG_CONFIG 62
#define MSG_GET_CONFIG 63
#define MSG_KV_DATA 54
// 76 and 77 are the values formerly assigned to MSG_OSD_SUBOP and
// MSG_OSD_SUBOPREPLY, which survive only as commented-out defines in the
// OSD section of this file.
#define MSG_MON_GET_PURGED_SNAPS 76
#define MSG_MON_GET_PURGED_SNAPS_REPLY 77
// Numeric message type codes: OSD-internal traffic.
// The numbering is not contiguous; several values are unassigned in this file.
// osd internal
#define MSG_OSD_PING 70
#define MSG_OSD_BOOT 71
#define MSG_OSD_FAILURE 72
#define MSG_OSD_ALIVE 73
#define MSG_OSD_MARK_ME_DOWN 74
#define MSG_OSD_FULL 75
#define MSG_OSD_MARK_ME_DEAD 123
// removed right after luminous
// (76 and 77 are now used by MSG_MON_GET_PURGED_SNAPS and
// MSG_MON_GET_PURGED_SNAPS_REPLY in the monitor section of this file)
//#define MSG_OSD_SUBOP 76
//#define MSG_OSD_SUBOPREPLY 77
#define MSG_OSD_PGTEMP 78
#define MSG_OSD_BEACON 79
#define MSG_OSD_PG_NOTIFY 80
#define MSG_OSD_PG_NOTIFY2 130
#define MSG_OSD_PG_QUERY 81
#define MSG_OSD_PG_QUERY2 131
#define MSG_OSD_PG_LOG 83
#define MSG_OSD_PG_REMOVE 84
#define MSG_OSD_PG_INFO 85
#define MSG_OSD_PG_INFO2 132
#define MSG_OSD_PG_TRIM 86
#define MSG_PGSTATS 87
#define MSG_PGSTATSACK 88
#define MSG_OSD_PG_CREATE 89
#define MSG_REMOVE_SNAPS 90
#define MSG_OSD_SCRUB 91
#define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING
#define MSG_OSD_REP_SCRUB 93
#define MSG_OSD_PG_SCAN 94
#define MSG_OSD_PG_BACKFILL 95
#define MSG_OSD_PG_BACKFILL_REMOVE 96
#define MSG_COMMAND 97
#define MSG_COMMAND_REPLY 98
#define MSG_OSD_BACKFILL_RESERVE 99
#define MSG_OSD_RECOVERY_RESERVE 150
#define MSG_OSD_FORCE_RECOVERY 151
#define MSG_OSD_PG_PUSH 105
#define MSG_OSD_PG_PULL 106
#define MSG_OSD_PG_PUSH_REPLY 107
#define MSG_OSD_EC_WRITE 108
#define MSG_OSD_EC_WRITE_REPLY 109
#define MSG_OSD_EC_READ 110
#define MSG_OSD_EC_READ_REPLY 111
#define MSG_OSD_REPOP 112
#define MSG_OSD_REPOPREPLY 113
#define MSG_OSD_PG_UPDATE_LOG_MISSING 114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115
#define MSG_OSD_PG_CREATED 116
#define MSG_OSD_REP_SCRUBMAP 117
#define MSG_OSD_PG_RECOVERY_DELETE 118
#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
#define MSG_OSD_PG_CREATE2 120
#define MSG_OSD_SCRUB2 121
#define MSG_OSD_PG_READY_TO_MERGE 122
#define MSG_OSD_PG_LEASE 133
#define MSG_OSD_PG_LEASE_ACK 134
// *** MDS ***
// Numeric message type codes for the metadata server. The low values are
// decimal; everything from MSG_MDS_RESOLVE onward is hexadecimal and grouped
// by MDS subsystem (0x2xx mdcache, 0x3xx locker, 0x4xx migrator, 0x5xx misc).
#define MSG_MDS_BEACON 100 // to monitor
#define MSG_MDS_PEER_REQUEST 101
#define MSG_MDS_TABLE_REQUEST 102
#define MSG_MDS_SCRUB 135
// 150 already in use (MSG_OSD_RECOVERY_RESERVE)
#define MSG_MDS_RESOLVE 0x200 // 0x2xx are for mdcache of mds
#define MSG_MDS_RESOLVEACK 0x201
#define MSG_MDS_CACHEREJOIN 0x202
#define MSG_MDS_DISCOVER 0x203
#define MSG_MDS_DISCOVERREPLY 0x204
#define MSG_MDS_INODEUPDATE 0x205
#define MSG_MDS_DIRUPDATE 0x206
#define MSG_MDS_CACHEEXPIRE 0x207
#define MSG_MDS_DENTRYUNLINK 0x208
#define MSG_MDS_FRAGMENTNOTIFY 0x209
#define MSG_MDS_OFFLOAD_TARGETS 0x20a
// 0x20b is not assigned in this file
#define MSG_MDS_DENTRYLINK 0x20c
#define MSG_MDS_FINDINO 0x20d
#define MSG_MDS_FINDINOREPLY 0x20e
#define MSG_MDS_OPENINO 0x20f
#define MSG_MDS_OPENINOREPLY 0x210
#define MSG_MDS_SNAPUPDATE 0x211
#define MSG_MDS_FRAGMENTNOTIFYACK 0x212
#define MSG_MDS_LOCK 0x300 // 0x3xx are for locker of mds
#define MSG_MDS_INODEFILECAPS 0x301
// Note: the migrator values below are hexadecimal literals even though they
// step like decimal numbers (0x449, 0x450, ... 0x459, 0x460).
#define MSG_MDS_EXPORTDIRDISCOVER 0x449 // 0x4xx are for migrator of mds
#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
#define MSG_MDS_EXPORTDIRCANCEL 0x451
#define MSG_MDS_EXPORTDIRPREP 0x452
#define MSG_MDS_EXPORTDIRPREPACK 0x453
#define MSG_MDS_EXPORTDIRWARNING 0x454
#define MSG_MDS_EXPORTDIRWARNINGACK 0x455
#define MSG_MDS_EXPORTDIR 0x456
#define MSG_MDS_EXPORTDIRACK 0x457
#define MSG_MDS_EXPORTDIRNOTIFY 0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
#define MSG_MDS_EXPORTDIRFINISH 0x460
#define MSG_MDS_EXPORTCAPS 0x470
#define MSG_MDS_EXPORTCAPSACK 0x471
#define MSG_MDS_GATHERCAPS 0x472
#define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
#define MSG_MDS_METRICS 0x501 // for mds metric aggregator
#define MSG_MDS_PING 0x502 // for mds pinger
#define MSG_MDS_SCRUB_STATS 0x503 // for mds scrub stack
// 0x504 is not assigned in this file
#define MSG_MDS_QUIESCE_DB_LISTING 0x505 // quiesce db replication
#define MSG_MDS_QUIESCE_DB_ACK 0x506 // quiesce agent ack back to the db
// *** generic ***
// Message type codes that are not tied to a single daemon type.
#define MSG_TIMECHECK 0x600
#define MSG_MON_HEALTH 0x601
// *** Message::encode() crcflags bits ***
// These three are bit flags for the crcflags argument of Message::encode()
// and decode_message(); they are not message type codes.
#define MSG_CRC_DATA (1 << 0)
#define MSG_CRC_HEADER (1 << 1)
#define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER)
// Special
// (message type codes again; 0x602-0x606 are not assigned in this file)
#define MSG_NOP 0x607
#define MSG_MON_HEALTH_CHECKS 0x608
#define MSG_TIMECHECK2 0x609
// Message type codes for the manager (0x7xx) and NVMe-oF gateway (0x8xx)
// protocols; each comment names the two endpoints that exchange the message.
// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN 0x700
#define MSG_MGR_CONFIGURE 0x701
#define MSG_MGR_REPORT 0x702
// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON 0x703
// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP 0x704
// *** ceph-mon(MgrMonitor) -> ceph-mgr ***
#define MSG_MGR_DIGEST 0x705
// *** ceph-mgr -> ceph-mon ***
#define MSG_MON_MGR_REPORT 0x706
#define MSG_SERVICE_MAP 0x707
#define MSG_MGR_CLOSE 0x708
#define MSG_MGR_COMMAND 0x709
#define MSG_MGR_COMMAND_REPLY 0x70a
// *** ceph-mgr <-> MON daemons ***
#define MSG_MGR_UPDATE 0x70b
// *** nvmeof mon -> gw daemons ***
#define MSG_MNVMEOF_GW_MAP 0x800
// *** gw daemons -> nvmeof mon ***
#define MSG_MNVMEOF_GW_BEACON 0x801
// ======================================================
/**
 * Abstract base class for every message type.
 *
 * A Message carries the wire header and footer, three buffer lists
 * (payload, middle, data), a set of receive/dispatch timestamps and two
 * optional throttlers. Bytes and message counts are handed back to those
 * throttlers whenever a buffer is replaced through the setters below, and
 * again when the message is destroyed.
 *
 * Concrete message types must implement decode_payload(), encode_payload()
 * and get_type_name().
 */
class Message : public RefCountedObject {
public:
#ifdef WITH_SEASTAR
  // In crimson, conn is independently maintained outside Message.
  using ConnectionRef = void*;
#else
  using ConnectionRef = ::ConnectionRef;
#endif

protected:
  ceph_msg_header header;      // message envelope
  ceph_msg_footer footer;
  ceph::buffer::list payload;  // "front" unaligned blob
  ceph::buffer::list middle;   // "middle" unaligned blob
  ceph::buffer::list data;     // data payload (page-alignment will be preserved where possible)

  /* recv_stamp is set when the Messenger starts reading the
   * Message off the wire */
  utime_t recv_stamp;
  /* dispatch_stamp is set when the Messenger starts calling dispatch() on
   * its endpoints */
  utime_t dispatch_stamp;
  /* throttle_stamp is the point at which we got throttle */
  utime_t throttle_stamp;
  /* time at which message was fully read */
  utime_t recv_complete_stamp;

  ConnectionRef connection;

  uint32_t magic = 0;

  // hook that links this message into a Queue (see below)
  boost::intrusive::list_member_hook<> dispatch_q;

public:
  // zipkin tracing
  ZTracer::Trace trace;
  void encode_trace(ceph::buffer::list &bl, uint64_t features) const;
  void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false);

  // otel tracing
  jspan_context otel_trace{false, false};
  void encode_otel_trace(ceph::buffer::list &bl, uint64_t features) const;
  void decode_otel_trace(ceph::buffer::list::const_iterator &p, bool create = false);

  /// Context that is completed (with 0) from the Message destructor.
  class CompletionHook : public Context {
  protected:
    Message *m;
    friend class Message;
  public:
    explicit CompletionHook(Message *_m) : m(_m) {}
    virtual void set_message(Message *_m) { m = _m; }
  };

  /// Intrusive list of messages, linked through the dispatch_q member hook.
  using Queue = boost::intrusive::list<
    Message,
    boost::intrusive::member_hook<
      Message,
      boost::intrusive::list_member_hook<>,
      &Message::dispatch_q>>;

  ceph::mono_time queue_start;

protected:
  CompletionHook* completion_hook = nullptr; // owned by Messenger

  // release our size in bytes back to this throttler when our payload
  // is adjusted or when we are destroyed.
  ThrottleInterface *byte_throttler = nullptr;

  // release a count back to this throttler when we are destroyed
  ThrottleInterface *msg_throttler = nullptr;

  // keep track of how big this message was when we reserved space in
  // the msgr dispatch_throttler, so that we can properly release it
  // later. this is necessary because messages can enter the dispatch
  // queue locally (not via read_message()), and those are not
  // currently throttled.
  uint64_t dispatch_throttle_size = 0;

  friend class Messenger;

public:
  /// Build a message whose header and footer are all zero bytes.
  Message() {
    memset(&header, 0, sizeof(header));
    memset(&footer, 0, sizeof(footer));
  }

  /// Build a message with zeroed header/footer, then record the type and
  /// the (compat) version in the header.
  Message(int t, int version=1, int compat_version=0) {
    memset(&header, 0, sizeof(header));
    memset(&footer, 0, sizeof(footer));
    header.type = t;
    header.version = version;
    header.compat_version = compat_version;
  }

  /// Forward to RefCountedObject::get() and hand the result back typed as
  /// Message*.
  Message *get() {
    return static_cast<Message *>(RefCountedObject::get());
  }

protected:
  /// Return outstanding throttle credit, emit a trace event and fire the
  /// completion hook, in that order.
  ~Message() override {
    if (byte_throttler) {
      byte_throttler->put(payload.length() + middle.length() + data.length());
    }
    release_message_throttle();
    trace.event("message destructed");
    /* call completion hooks (if any) */
    if (completion_hook) {
      completion_hook->complete(0);
    }
  }

public:
  /// Connection this message is associated with. Aborts when built with
  /// WITH_SEASTAR.
  const ConnectionRef& get_connection() const {
#ifdef WITH_SEASTAR
    ceph_abort("In crimson, conn is independently maintained outside Message");
#endif
    return connection;
  }

  /// Store the connection. With WITH_SEASTAR only a null value is accepted.
  void set_connection(ConnectionRef c) {
#ifdef WITH_SEASTAR
    // In crimson, conn is independently maintained outside Message.
    ceph_assert(c == nullptr);
#endif
    connection = std::move(c);
  }

  CompletionHook* get_completion_hook() {
    return completion_hook;
  }
  void set_completion_hook(CompletionHook *hook) {
    completion_hook = hook;
  }

  void set_byte_throttler(ThrottleInterface *t) {
    byte_throttler = t;
  }
  void set_message_throttler(ThrottleInterface *t) {
    msg_throttler = t;
  }

  void set_dispatch_throttle_size(uint64_t s) {
    dispatch_throttle_size = s;
  }
  uint64_t get_dispatch_throttle_size() const {
    return dispatch_throttle_size;
  }

  // header / footer access
  const ceph_msg_header &get_header() const {
    return header;
  }
  ceph_msg_header &get_header() {
    return header;
  }
  void set_header(const ceph_msg_header &e) {
    header = e;
  }
  void set_footer(const ceph_msg_footer &e) {
    footer = e;
  }
  const ceph_msg_footer &get_footer() const {
    return footer;
  }
  ceph_msg_footer &get_footer() {
    return footer;
  }
  void set_src(const entity_name_t& src) {
    header.src = src;
  }

  uint32_t get_magic() const {
    return magic;
  }
  void set_magic(int _magic) {
    magic = _magic;
  }

  /*
   * If you use get_[data, middle, payload] you shouldn't
   * use it to change those ceph::buffer::lists unless you KNOW
   * there is no throttle being used. The other
   * functions are throttling-aware as appropriate.
   */

  /// Drop payload and middle, returning their combined length to the byte
  /// throttler first.
  void clear_payload() {
    if (byte_throttler) {
      byte_throttler->put(payload.length() + middle.length());
    }
    payload.clear();
    middle.clear();
  }

  /// Extension point invoked from clear_data(); does nothing here.
  virtual void clear_buffers() {}

  /// Drop the data buffer, returning its length to the byte throttler first.
  void clear_data() {
    if (byte_throttler) {
      byte_throttler->put(data.length());
    }
    data.clear();
    clear_buffers(); // let subclass drop buffers as well
  }

  /// Return one count to the message throttler (if any) and forget it, so
  /// that a second call is a no-op.
  void release_message_throttle() {
    if (msg_throttler) {
      msg_throttler->put();
      msg_throttler = nullptr;
    }
  }

  bool empty_payload() const {
    return payload.length() == 0;
  }
  ceph::buffer::list& get_payload() {
    return payload;
  }
  const ceph::buffer::list& get_payload() const {
    return payload;
  }

  /// Replace the payload by moving from bl; the throttler is credited with
  /// the old length and charged with the new one.
  void set_payload(ceph::buffer::list& bl) {
    if (byte_throttler) {
      byte_throttler->put(payload.length());
    }
    payload = std::move(bl);
    if (byte_throttler) {
      byte_throttler->take(payload.length());
    }
  }

  /// Replace the middle section by moving from bl, with the same throttle
  /// accounting as set_payload().
  void set_middle(ceph::buffer::list& bl) {
    if (byte_throttler) {
      byte_throttler->put(middle.length());
    }
    middle = std::move(bl);
    if (byte_throttler) {
      byte_throttler->take(middle.length());
    }
  }
  ceph::buffer::list& get_middle() {
    return middle;
  }

  /// Replace the data section via data.share(bl), with the same throttle
  /// accounting as set_payload().
  void set_data(const ceph::buffer::list &bl) {
    if (byte_throttler) {
      byte_throttler->put(data.length());
    }
    data.share(bl);
    if (byte_throttler) {
      byte_throttler->take(data.length());
    }
  }
  const ceph::buffer::list& get_data() const {
    return data;
  }
  ceph::buffer::list& get_data() {
    return data;
  }

  /// Move the data section out into bl after crediting its length back to
  /// the byte throttler.
  void claim_data(ceph::buffer::list& bl) {
    if (byte_throttler) {
      byte_throttler->put(data.length());
    }
    bl = std::move(data);
  }
  uint32_t get_data_len() const {
    return data.length();
  }

  // timestamps
  void set_recv_stamp(utime_t t) {
    recv_stamp = t;
  }
  const utime_t& get_recv_stamp() const {
    return recv_stamp;
  }
  void set_dispatch_stamp(utime_t t) {
    dispatch_stamp = t;
  }
  const utime_t& get_dispatch_stamp() const {
    return dispatch_stamp;
  }
  void set_throttle_stamp(utime_t t) {
    throttle_stamp = t;
  }
  const utime_t& get_throttle_stamp() const {
    return throttle_stamp;
  }
  void set_recv_complete_stamp(utime_t t) {
    recv_complete_stamp = t;
  }
  const utime_t& get_recv_complete_stamp() const {
    return recv_complete_stamp;
  }

  /// CRC over the header bytes, excluding the trailing crc field itself.
  void calc_header_crc() {
    header.crc = ceph_crc32c(0, reinterpret_cast<unsigned char*>(&header),
			     sizeof(header) - sizeof(header.crc));
  }
  /// Fill in both the front (payload) and middle CRCs of the footer.
  void calc_front_crc() {
    footer.front_crc = payload.crc32c(0);
    footer.middle_crc = middle.crc32c(0);
  }
  /// Fill in the data CRC of the footer.
  void calc_data_crc() {
    footer.data_crc = data.crc32c(0);
  }

  /// Cost of this message; the base implementation reports the data length.
  virtual int get_cost() const {
    return data.length();
  }

  // type
  int get_type() const {
    return header.type;
  }
  void set_type(int t) {
    header.type = t;
  }

  uint64_t get_tid() const {
    return header.tid;
  }
  void set_tid(uint64_t t) {
    header.tid = t;
  }

  uint64_t get_seq() const {
    return header.seq;
  }
  void set_seq(uint64_t s) {
    header.seq = s;
  }

  unsigned get_priority() const {
    return header.priority;
  }
  void set_priority(__s16 p) {
    header.priority = p;
  }

  // source/dest
  entity_inst_t get_source_inst() const {
    return entity_inst_t(get_source(), get_source_addr());
  }
  entity_name_t get_source() const {
    return entity_name_t(header.src);
  }

  /// Peer address of the connection, or a default-constructed address when
  /// there is no connection. Aborts when built with WITH_SEASTAR.
  entity_addr_t get_source_addr() const {
#ifdef WITH_SEASTAR
    ceph_abort("In crimson, conn is independently maintained outside Message");
#else
    if (connection) {
      return connection->get_peer_addr();
    }
#endif
    return entity_addr_t();
  }

  /// Peer address vector of the connection, or a default-constructed one
  /// when there is no connection. Aborts when built with WITH_SEASTAR.
  entity_addrvec_t get_source_addrs() const {
#ifdef WITH_SEASTAR
    ceph_abort("In crimson, conn is independently maintained outside Message");
#else
    if (connection) {
      return connection->get_peer_addrs();
    }
#endif
    return entity_addrvec_t();
  }

  // forwarded?
  // In this base class the "orig" accessors simply forward to the plain
  // source accessors above.
  entity_inst_t get_orig_source_inst() const {
    return get_source_inst();
  }
  entity_name_t get_orig_source() const {
    return get_source();
  }
  entity_addr_t get_orig_source_addr() const {
    return get_source_addr();
  }
  entity_addrvec_t get_orig_source_addrs() const {
    return get_source_addrs();
  }

  // virtual bits
  virtual void decode_payload() = 0;
  virtual void encode_payload(uint64_t features) = 0;
  virtual std::string_view get_type_name() const = 0;

  /// Write the type name followed by the magic value to out.
  virtual void print(std::ostream& out) const {
    out << get_type_name() << " magic: " << magic;
  }

  virtual void dump(ceph::Formatter *f) const;

  void encode(uint64_t features, int crcflags, bool skip_header_crc = false);
};
// Declaration only; the definition lives outside this header.
// Judging by the signature, this builds a Message from wire components that
// have already been split apart: header, footer and the front/middle/data
// buffer lists, plus the connection to associate with the result.
// crcflags takes the MSG_CRC_* bits defined earlier in this file.
// NOTE(review): presumably returns nullptr on failure and may consume the
// buffer lists (they are passed by non-const reference) — confirm against
// the definition.
extern Message *decode_message(CephContext *cct,
int crcflags,
ceph_msg_header& header,
ceph_msg_footer& footer,
ceph::buffer::list& front,
ceph::buffer::list& middle,
ceph::buffer::list& data,
Message::ConnectionRef conn);
/// Stream a message: whatever Message::print() writes, followed by
/// " v<version>" when the header version is non-zero.
inline std::ostream& operator<<(std::ostream& out, const Message& m) {
  m.print(out);
  const auto& hdr = m.get_header();
  if (hdr.version) {
    out << " v" << hdr.version;
  }
  return out;
}
// Declarations only; both functions are defined outside this header.
// NOTE(review): presumably encode_message() serialises a whole message into
// bl and this decode_message() overload is its inverse, reading from the
// iterator — confirm against the definitions.
extern void encode_message(Message *m, uint64_t features, ceph::buffer::list& bl);
extern Message *decode_message(CephContext *cct, int crcflags,
ceph::buffer::list::const_iterator& bl);
/// This is a "safe" version of Message: it does not allow calling the get/put
/// methods on its derived classes, because RefCountedObject::get and
/// RefCountedObject::put are re-declared as private here. This is intended to
/// prevent some accidental reference leaks. Instead, you must either cast the
/// derived class to a RefCountedObject to do the get/put or detach a temporary
/// reference.
class SafeMessage : public Message {
public:
  using Message::Message;

  /// True when the peer on this message's connection is a client.
  /// Aborts when built with WITH_SEASTAR; otherwise the connection is
  /// dereferenced without a null check.
  bool is_a_client() const {
#ifdef WITH_SEASTAR
    ceph_abort("In crimson, conn is independently maintained outside Message");
#else
    const auto peer_type = get_connection()->get_peer_type();
    return peer_type == CEPH_ENTITY_TYPE_CLIENT;
#endif
  }

private:
  // hide manual reference counting from users of derived message types
  using RefCountedObject::get;
  using RefCountedObject::put;
};
namespace ceph {
/// Allocate a T from the forwarded arguments and wrap it in a ceph::ref_t<T>.
/// The `false` flag tells the smart pointer not to add a reference of its
/// own, so it adopts the reference the new object already holds.
template<class T, typename... Args>
ceph::ref_t<T> make_message(Args&&... args) {
  T* const raw = new T(std::forward<Args>(args)...);
  return {raw, false};
}
} // namespace ceph
namespace crimson {
/// Allocate a T from the forwarded arguments and return it as a MURef<T>
/// paired with a UniquePtrDeleter.
template<class T, typename... Args>
MURef<T> make_message(Args&&... args) {
  T* const raw = new T(std::forward<Args>(args)...);
  return {raw, TOPNSPC::common::UniquePtrDeleter{}};
}
} // namespace crimson
namespace fmt {
// placed in the fmt namespace due to an ADL bug in g++ < 12
// (https://gcc.gnu.org/bugzilla/show_bug.cgi?id=92944).
// Specifically - gcc pre-12 can't handle two templated specializations of
// the formatter if in two different namespaces.

/// fmt formatter for Message and anything derived from it. The output is
/// what Message::print() writes, followed by " v<version>" when the header
/// version is non-zero (the same text operator<< produces).
template <std::derived_from<Message> M>
struct formatter<M> {
  // no format-spec options are recognised
  constexpr auto parse(fmt::format_parse_context& ctx) {
    return ctx.begin();
  }

  template <typename FormatContext>
  auto format(const M& m, FormatContext& ctx) const {
    std::ostringstream rendered;
    m.print(rendered);
    const auto version = m.get_header().version;
    if (!version) {
      return fmt::format_to(ctx.out(), "{}", rendered.str());
    }
    return fmt::format_to(ctx.out(), "{} v{}", rendered.str(),
			  static_cast<uint32_t>(version));
  }
};
} // namespace fmt
#endif
|