1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <optional>
#include <utility>
#include "rgw_putobj.h"
#include "services/svc_tier_rados.h"
#include "rgw_sal.h"
#include "rgw_obj_manifest.h"
namespace rgw {
// forward declarations; Aio is only used through pointers in this header, so
// its full definition is not needed here
namespace sal {
class RadosStore;
}
class Aio;
namespace putobj {
// An object processor with special handling for the first chunk of the head
// object. The first chunk is captured and handed to the virtual
// process_first_chunk(), which returns the processor that handles the rest of
// the object.
class HeadObjectProcessor : public rgw::sal::ObjectProcessor {
  // NOTE(review): presumably the number of bytes captured as the "first
  // chunk" before process_first_chunk() is called -- confirm in process()
  uint64_t head_chunk_size;
  // buffer to capture the first chunk of the head object
  bufferlist head_data;
  // initialized after process_first_chunk() to process everything else;
  // nullptr until then
  rgw::sal::DataProcessor *processor = nullptr;
  uint64_t data_offset = 0; // maximum offset of data written (ie compressed)

 protected:
  // returns data_offset, i.e. the maximum offset of data written
  uint64_t get_actual_size() const { return data_offset; }

  // process the first chunk of data and return a processor for the rest
  //   data:      the captured first chunk
  //   processor: out-parameter receiving the processor for the remaining data
  // NOTE(review): the int result is presumably 0 on success and a negative
  // error code otherwise -- confirm against the implementations
  virtual int process_first_chunk(bufferlist&& data,
                                  rgw::sal::DataProcessor **processor) = 0;

 public:
  // explicit, so that a bare integer is never implicitly treated as a
  // processor; derived classes direct-initialize this base and are unaffected
  explicit HeadObjectProcessor(uint64_t head_chunk_size)
    : head_chunk_size(head_chunk_size)
  {}

  // replace the head chunk size given at construction
  void set_head_chunk_size(uint64_t size) { head_chunk_size = size; }

  // cache first chunk for process_first_chunk(), then forward everything else
  // to the returned processor. final: derived classes customize behavior
  // through process_first_chunk() instead of overriding this
  int process(bufferlist&& data, uint64_t logical_offset) final override;
};
// ordered set of raw objects; RadosWriter uses it to track the objects it has
// written
using RawObjSet = std::set<rgw_raw_obj>;
// a data sink that writes to rados objects and deletes them on cancelation
class RadosWriter : public rgw::sal::DataProcessor {
Aio *const aio;
RGWRados *const store;
const RGWBucketInfo& bucket_info;
RGWObjectCtx& obj_ctx;
rgw_obj head_obj;
rgw_rados_ref stripe_obj; // current stripe object
RawObjSet written; // set of written objects for deletion
const DoutPrefixProvider *dpp;
optional_yield y;
jspan_context& trace;
public:
RadosWriter(Aio *aio, RGWRados *store,
const RGWBucketInfo& bucket_info,
RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
const DoutPrefixProvider *dpp, optional_yield y, jspan_context& _trace)
: aio(aio), store(store), bucket_info(bucket_info),
obj_ctx(obj_ctx), head_obj(_head_obj), dpp(dpp), y(y), trace(_trace)
{}
~RadosWriter();
// add alloc hint to osd
void add_write_hint(librados::ObjectWriteOperation& op);
// change the head object
void set_head_obj(const rgw_obj& head);
// change the current stripe object
int set_stripe_obj(const rgw_raw_obj& obj);
// write the data at the given offset of the current stripe object
int process(bufferlist&& data, uint64_t stripe_offset) override;
// write the data as an exclusive create and wait for it to complete
int write_exclusive(const bufferlist& data);
int drain();
// when the operation completes successfully, clear the set of written objects
// so they aren't deleted on destruction
void clear_written() { written.clear(); }
jspan_context& get_trace() { return trace; }
};
// a rados object processor that stripes according to RGWObjManifest
class ManifestObjectProcessor : public HeadObjectProcessor,
public StripeGenerator {
protected:
RGWRados* const store;
RGWBucketInfo& bucket_info;
rgw_placement_rule tail_placement_rule;
rgw_user owner;
RGWObjectCtx& obj_ctx;
rgw_obj head_obj;
RadosWriter writer;
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;
ChunkProcessor chunk;
StripeProcessor stripe;
const DoutPrefixProvider *dpp;
// implements StripeGenerator
int next(uint64_t offset, uint64_t *stripe_size) override;
public:
ManifestObjectProcessor(Aio *aio, RGWRados* store,
RGWBucketInfo& bucket_info,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& _obj_ctx,
const rgw_obj& _head_obj,
const DoutPrefixProvider* dpp,
optional_yield y,
jspan_context& trace)
: HeadObjectProcessor(0),
store(store), bucket_info(bucket_info),
owner(owner),
obj_ctx(_obj_ctx), head_obj(_head_obj),
writer(aio, store, bucket_info, obj_ctx, head_obj, dpp, y, trace),
chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
if (ptail_placement_rule) {
tail_placement_rule = *ptail_placement_rule;
}
}
void set_owner(const rgw_user& _owner) {
owner = _owner;
}
void set_tail_placement(const rgw_placement_rule& tpr) {
tail_placement_rule = tpr;
}
void set_tail_placement(const rgw_placement_rule&& tpr) {
tail_placement_rule = tpr;
}
};
// A processor whose completion is an atomic write to the head object, carried
// out as part of a bucket index transaction.
class AtomicObjectProcessor : public ManifestObjectProcessor {
 private:
  const std::optional<uint64_t> olh_epoch;
  const std::string unique_tag;
  bufferlist first_chunk; // held back and written with the head in complete()

  // HeadObjectProcessor hook for the first chunk of the head object
  int process_first_chunk(bufferlist&& data,
                          rgw::sal::DataProcessor **processor) override;

 public:
  // Forwards the common arguments to ManifestObjectProcessor and keeps copies
  // of olh_epoch and unique_tag.
  AtomicObjectProcessor(Aio *aio, RGWRados* store,
                        RGWBucketInfo& bucket_info,
                        const rgw_placement_rule *ptail_placement_rule,
                        const rgw_user& owner,
                        RGWObjectCtx& obj_ctx, const rgw_obj& _head_obj,
                        std::optional<uint64_t> olh_epoch,
                        const std::string& unique_tag,
                        const DoutPrefixProvider *dpp, optional_yield y,
                        jspan_context& trace)
    : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
                              owner, obj_ctx, _head_obj, dpp, y, trace),
      olh_epoch(olh_epoch),
      unique_tag(unique_tag)
  {}

  // prepare a trivial manifest
  int prepare(optional_yield y) override;

  // write the head object atomically in a bucket index transaction; the
  // parameter list is dictated by the complete() being overridden
  int complete(size_t accounted_size,
               const std::string& etag,
               ceph::real_time *mtime,
               ceph::real_time set_mtime,
               std::map<std::string, bufferlist>& attrs,
               ceph::real_time delete_at,
               const char *if_match,
               const char *if_nomatch,
               const std::string *user_data,
               rgw_zone_set *zones_trace,
               bool *canceled,
               const req_context& rctx,
               uint32_t flags) override;
};
// A processor for multipart parts. Parts don't require atomic completion;
// instead the part's head is written with an exclusive create so that racing
// uploads of the same part/upload id are detected. Such uploads are restarted
// with a random oid prefix.
class MultipartObjectProcessor : public ManifestObjectProcessor {
 private:
  const rgw_obj target_obj; // target multipart object
  const std::string upload_id;
  const int part_num;
  const std::string part_num_str;
  RGWMPObj mp;

  // write the first chunk and wait on aio->drain() for its completion.
  // on EEXIST, retry with random prefix
  int process_first_chunk(bufferlist&& data,
                          rgw::sal::DataProcessor **processor) override;

  // prepare the head stripe and manifest
  int prepare_head();

 public:
  // Forwards the common arguments to ManifestObjectProcessor. target_obj and
  // mp are derived from the base class' head_obj, which has already been
  // initialized by the time these members are constructed.
  MultipartObjectProcessor(Aio *aio, RGWRados* store,
                           RGWBucketInfo& bucket_info,
                           const rgw_placement_rule *ptail_placement_rule,
                           const rgw_user& owner, RGWObjectCtx& obj_ctx,
                           const rgw_obj& _head_obj,
                           const std::string& upload_id, uint64_t part_num,
                           const std::string& part_num_str,
                           const DoutPrefixProvider *dpp, optional_yield y,
                           jspan_context& trace)
    : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
                              owner, obj_ctx, _head_obj, dpp, y, trace),
      target_obj(head_obj),
      upload_id(upload_id),
      // the member is an int while the argument is a uint64_t; the cast only
      // spells out the conversion this initializer performs
      part_num(static_cast<int>(part_num)),
      part_num_str(part_num_str),
      mp(head_obj.key.name, upload_id)
  {}

  // prepare a multipart manifest
  int prepare(optional_yield y) override;

  // write the head object attributes in a bucket index transaction, then
  // register the completed part with the multipart meta object; the parameter
  // list is dictated by the complete() being overridden
  int complete(size_t accounted_size,
               const std::string& etag,
               ceph::real_time *mtime,
               ceph::real_time set_mtime,
               std::map<std::string, bufferlist>& attrs,
               ceph::real_time delete_at,
               const char *if_match,
               const char *if_nomatch,
               const std::string *user_data,
               rgw_zone_set *zones_trace,
               bool *canceled,
               const req_context& rctx,
               uint32_t flags) override;
};
// A ManifestObjectProcessor variant for append writes.
// NOTE(review): judging by the names, `position` is presumably the offset at
// which the new data is appended and the cur_* members describe the object's
// existing state -- confirm against prepare()/complete() in the .cc file.
class AppendObjectProcessor : public ManifestObjectProcessor {
  // the constructor does not set this member, so it gets a defined initial
  // value here rather than starting out indeterminate
  uint64_t cur_part_num = 0;
  uint64_t position;
  uint64_t cur_size;
  // caller-supplied pointer, stored as given by the constructor
  uint64_t *cur_accounted_size;
  std::string cur_etag;
  const std::string unique_tag;
  // starts out null
  RGWObjManifest *cur_manifest;

  // HeadObjectProcessor hook for the first chunk of data
  int process_first_chunk(bufferlist&& data,
                          rgw::sal::DataProcessor **processor) override;

 public:
  // Forwards the common arguments to ManifestObjectProcessor, records
  // position, cur_accounted_size and unique_tag, and starts with cur_size == 0
  // and no current manifest.
  AppendObjectProcessor(Aio *aio, RGWRados* store,
                        RGWBucketInfo& bucket_info,
                        const rgw_placement_rule *ptail_placement_rule,
                        const rgw_user& owner, RGWObjectCtx& obj_ctx,
                        const rgw_obj& _head_obj,
                        const std::string& unique_tag, uint64_t position,
                        uint64_t *cur_accounted_size,
                        const DoutPrefixProvider *dpp, optional_yield y,
                        jspan_context& trace)
    : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
                              owner, obj_ctx, _head_obj, dpp, y, trace),
      position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
      unique_tag(unique_tag), cur_manifest(nullptr)
  {}

  int prepare(optional_yield y) override;

  // the parameter list is dictated by the complete() being overridden
  int complete(size_t accounted_size, const std::string& etag,
               ceph::real_time *mtime, ceph::real_time set_mtime,
               std::map<std::string, bufferlist>& attrs,
               ceph::real_time delete_at,
               const char *if_match, const char *if_nomatch,
               const std::string *user_data,
               rgw_zone_set *zones_trace, bool *canceled,
               const req_context& rctx,
               uint32_t flags) override;
};
} // namespace putobj
} // namespace rgw
|