1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef MESSAGE_HELPER_H
#define MESSAGE_HELPER_H
#include "msg/msg_types.h"
#include "messages/MDataPing.h"
#if defined(HAVE_XIO)
#include "msg/xio/XioMessenger.h"
#endif
/**
 * Build an MPing whose payload is an encoded JSON document.
 *
 * An object section named @p tag is opened, the fixed string
 * "one giant step for " is dumped into it @p mult times under the key
 * @p tag, and the section is closed.  The formatter output is flushed to
 * a string, encoded into a bufferlist and installed as the message payload.
 *
 * @param tag   name used for both the object section and every dumped key
 * @param mult  number of times the string is dumped; values <= 0 produce
 *              an empty section
 * @return the newly allocated message (allocated with new here; the caller
 *         is presumably responsible for releasing it -- confirm against
 *         the Message ownership rules)
 */
static inline Message* new_ping_monstyle(const char *tag, int mult)
{
  Message *m = new MPing();

  // The formatter is only needed while the payload is being built.  It used
  // to be heap-allocated and never released, leaking on every call; give it
  // automatic storage and keep driving it through the base-class pointer.
  JSONFormatter json(true);
  Formatter *f = &json;

  string str = "one giant step for ";

  f->open_object_section(tag);
  for (int ix = 0; ix < mult; ++ix) {
    f->dump_string(tag, str);
  }
  f->close_section();

  bufferlist bl;
  stringstream ss;

  f->flush(ss);
  ::encode(ss.str(), bl);
  m->set_payload(bl);

  return m;
}
#if defined(HAVE_XIO)
// Memory pool that new_ping_with_data() allocates its payload buffers from;
// the object itself is defined in another translation unit.
extern struct xio_mempool *xio_msgr_mpool;

/**
 * Release hook installed on data pings via set_rdma_hook(): hands the
 * registered-memory descriptor back with xio_mempool_free().
 *
 * Declared static inline like the other helpers in this header.  As a plain
 * external-linkage definition it would be emitted by every translation unit
 * that includes the header and fail at link time with duplicate symbols.
 *
 * @param mp  descriptor of the region to release
 */
static inline void xio_hook_func(struct xio_reg_mem *mp)
{
  xio_mempool_free(mp);
}
/**
 * Build an MDataPing whose data payload is a single buffer taken from
 * xio_msgr_mpool.
 *
 * Buffer layout: the NUL-terminated @p tag at offset 0 (truncated if it
 * does not fit) and a 32-bit stamp at offset size - 32.  The rest of the
 * buffer is left uninitialised.  The stamp is the function-local call
 * counter *after* it has been incremented, i.e. m->counter + 1.
 *
 * @param tag   label stored in m->tag and copied to the start of the buffer
 * @param size  requested payload size in bytes; raised to 32 when smaller,
 *              because the stamp is written 32 bytes before the end
 * @return the new message, upcast to Message*
 *
 * Allocation failure from xio_mempool_alloc() is only caught by assert().
 */
static inline Message* new_ping_with_data(const char *tag, uint32_t size)
{
  static uint32_t counter;

  // With fewer than 32 bytes the stamp address (base + size - 32) would lie
  // before the start of the buffer.  Enforce the same 32-byte floor that
  // new_simple_ping_with_data() applies to its segments.
  if (size < 32)
    size = 32;

  MDataPing *m = new MDataPing();
  m->counter = counter++;
  m->tag = tag;

  struct xio_reg_mem *mp = m->get_mp();
  int e = xio_mempool_alloc(xio_msgr_mpool, size, mp);
  assert(e == 0);
  void *p = mp->addr;
  char *buf = static_cast<char*>(p);
  m->set_rdma_hook(xio_hook_func);

  // Bounded copy of the tag: an unchecked strcpy() would run past the end of
  // the buffer whenever strlen(tag) >= size.
  size_t taglen = strlen(tag);
  if (taglen >= size)
    taglen = size - 1;
  memcpy(buf, tag, taglen);
  buf[taglen] = '\0';

  // size is arbitrary, so base + size - 32 need not be 4-byte aligned;
  // memcpy stores the same bytes without a misaligned uint32_t access.
  memcpy(buf + size - 32, &counter, sizeof(counter));

  bufferlist bl;
  bl.append(buffer::create_static(size, buf));
  m->set_data(bl);

  return static_cast<Message*>(m);
}
#endif
/**
 * Build an MDataPing whose data payload consists of @p nfrags separately
 * allocated segments of equal size.
 *
 * Segment size is ceil(size / nfrags), rounded up to a multiple of 8 and
 * to at least 32 bytes.  Segments of 1024 bytes or more are additionally
 * rounded up to a whole number of pages and allocated page-aligned with
 * posix_memalign(); smaller ones come from malloc().
 *
 * Each segment holds the NUL-terminated @p tag at offset 0 (truncated if it
 * does not fit) and, at offset segsize - 32, two 32-bit words: the
 * function-local call counter after its increment (m->counter + 1) and the
 * segment index.  For 32-byte segments those words start at offset 0 and
 * therefore overwrite the beginning of the tag.  Remaining bytes are left
 * uninitialised.
 *
 * @param tag     label stored in m->tag and copied into every segment
 * @param size    total payload size requested, in bytes
 * @param nfrags  number of segments; 0 is treated as 1
 * @return the new message, upcast to Message*
 * @throws std::bad_alloc if a segment cannot be allocated
 */
static inline Message* new_simple_ping_with_data(const char *tag,
                                                 uint32_t size,
                                                 uint32_t nfrags)
{
  static size_t pagesize = sysconf(_SC_PAGESIZE);
  static uint32_t counter;

  // A fragment count of zero would divide by zero below.
  if (nfrags == 0)
    nfrags = 1;

  // Ceiling division written so that it cannot wrap, unlike
  // (size + nfrags - 1) / nfrags for values near UINT32_MAX.
  uint32_t segsize = size / nfrags + (size % nfrags != 0 ? 1 : 0);
  segsize = (segsize + 7) & ~7u;
  if (segsize < 32)
    segsize = 32;

  const bool do_page_alignment = segsize >= 1024;
  if (do_page_alignment)
    segsize = (segsize + pagesize - 1) & ~(pagesize - 1);

  MDataPing *m = new MDataPing();
  m->counter = counter++;
  m->tag = tag;
  // Presumably tells MDataPing to free the segments when it is destroyed --
  // confirm in MDataPing.h.
  m->free_data = true;

  // Bounded copy of the tag: an unchecked strcpy() would run past the end of
  // a segment whenever strlen(tag) >= segsize.
  size_t taglen = strlen(tag);
  if (taglen >= segsize)
    taglen = segsize - 1;

  bufferlist bl;
  for (uint32_t i = 0; i < nfrags; ++i) {
    void *p = nullptr;
    if (do_page_alignment) {
      if (posix_memalign(&p, pagesize, segsize))
        p = nullptr;
    } else {
      p = malloc(segsize);
    }
    // NOTE(review): on this path the message and any segments allocated by
    // earlier iterations are not released.
    if (!p)
      throw std::bad_alloc();

    char *seg = static_cast<char*>(p);
    memcpy(seg, tag, taglen);
    seg[taglen] = '\0';

    // segsize is a multiple of 8 and the allocation is suitably aligned, so
    // this address is aligned for uint32_t.
    uint32_t *t = reinterpret_cast<uint32_t*>(seg + segsize - 32);
    t[0] = counter;
    t[1] = i;

    bl.append(buffer::create_static(segsize, seg));
  }
  m->set_data(bl);

  return static_cast<Message*>(m);
}
/**
 * Convenience overload: build a data ping whose payload is a single segment.
 *
 * Forwards to the three-argument new_simple_ping_with_data() with a
 * fragment count of 1.
 *
 * @param tag   label passed through unchanged
 * @param size  requested payload size in bytes, passed through unchanged
 * @return whatever the three-argument overload returns
 */
static inline Message* new_simple_ping_with_data(const char *tag,
                                                 uint32_t size)
{
  const uint32_t single_fragment = 1;
  Message *ping = new_simple_ping_with_data(tag, size, single_fragment);
  return ping;
}
#endif /* MESSAGE_HELPER_H */
|