1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
#pragma once
#include <string>
#include <cstdint>
#include "rgw_sal_fwd.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "common/async/yield_context.h"
#include "rgw_s3_filter.h"
class XMLObj;
namespace ceph { class Formatter; }
class DoutPrefixProvider;
struct req_state;
struct RGWObjVersionTracker;
class RGWOp;
namespace rgw::bucketlogging {
/* S3 bucket logging configuration
* based on: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLogging.html
* with ceph extensions
<BucketLoggingStatus xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<LoggingEnabled>
<TargetBucket>string</TargetBucket>
<TargetGrants>
<Grant>
<Grantee>
<DisplayName>string</DisplayName>
<EmailAddress>string</EmailAddress>
<ID>string</ID>
<xsi:type>string</xsi:type>
<URI>string</URI>
</Grantee>
<Permission>string</Permission>
</Grant>
</TargetGrants>
<TargetObjectKeyFormat>
<PartitionedPrefix>
<PartitionDateSource>DeliveryTime|EventTime</PartitionDateSource>
</PartitionedPrefix>
<SimplePrefix>
</SimplePrefix>
</TargetObjectKeyFormat>
<TargetPrefix>string</TargetPrefix>
<LoggingType>Standard|Journal</LoggingType> <!-- Ceph extension -->
<ObjectRollTime>integer</ObjectRollTime> <!-- Ceph extension -->
<RecordsBatchSize>integer</RecordsBatchSize> <!-- Ceph extension -->
<Filter>
<S3Key>
<FilterRule>
<Name>suffix/prefix/regex</Name>
<Value></Value>
</FilterRule>
</S3Key>
</Filter>
</LoggingEnabled>
</BucketLoggingStatus>
*/
// format of the log object key (see the comments on configuration::obj_key_format)
// the numeric values are written as "int" by configuration::encode(), hence spelled out explicitly
enum class KeyFormat {
  Partitioned = 0,
  Simple = 1
};
// type of bucket logging (see the comments on configuration::logging_type)
// the numeric values are written as "int" by configuration::encode(), hence spelled out explicitly
// NOTE(review): "Any" is not one of the values listed for the <LoggingType> XML element,
// presumably it is only used for matching by callers of log_record() - TODO confirm
enum class LoggingType {
  Standard = 0,
  Journal = 1,
  Any = 2
};
// source of the date used in the log object key (see the comments on configuration::date_source)
// the numeric values are written as "int" by configuration::encode(), hence spelled out explicitly
enum class PartitionDateSource {
  DeliveryTime = 0,
  EventTime = 1
};
// bucket logging configuration of a bucket
// holds the values of the <LoggingEnabled> section of the BucketLoggingStatus XML document
// described at the top of this namespace (including the ceph extensions)
struct configuration {
  // two configurations are equal when all of the fields below match
  // note that "default_obj_roll_time" does not take part in the comparison
  bool operator==(const configuration& rhs) const {
    return enabled == rhs.enabled &&
      target_bucket == rhs.target_bucket &&
      obj_key_format == rhs.obj_key_format &&
      target_prefix == rhs.target_prefix &&
      obj_roll_time == rhs.obj_roll_time &&
      logging_type == rhs.logging_type &&
      records_batch_size == rhs.records_batch_size &&
      date_source == rhs.date_source &&
      key_filter == rhs.key_filter;
  }
  // initial value of "obj_roll_time" (in seconds)
  // must be declared before "obj_roll_time", as members are initialized in declaration order
  uint32_t default_obj_roll_time = 300;
  bool enabled = false; // not part of the encoded representation
  std::string target_bucket;
  KeyFormat obj_key_format = KeyFormat::Simple;
  // target object key formats:
  // Partitioned: [DestinationPrefix][SourceAccountId]/[SourceRegion]/[SourceBucket]/[YYYY]/[MM]/[DD]/[YYYY]-[MM]-[DD]-[hh]-[mm]-[ss]-[UniqueString]
  // Simple: [DestinationPrefix][YYYY]-[MM]-[DD]-[hh]-[mm]-[ss]-[UniqueString]
  std::string target_prefix; // a prefix for all log object keys.
  // useful when multiple buckets log to the same target
  // or when the target bucket is used for other things than logs
  // time in seconds to move object to bucket and start another object
  // explicitly initialized, so that a default constructed configuration
  // never exposes an indeterminate value to operator==() or encode()
  uint32_t obj_roll_time = default_obj_roll_time;
  LoggingType logging_type = LoggingType::Standard;
  // in case of "Standard" logging type, all bucket operations are logged
  // in case of "Journal" logging type only the following operations are logged: PUT, COPY, MULTI/DELETE, Complete MPU
  uint32_t records_batch_size = 0; // how many records to batch in memory before writing to the object
  // if set to zero, records are written synchronously to the object.
  // if obj_roll_time is reached, the batch of records will be written to the object
  // regardless of the number of records
  PartitionDateSource date_source = PartitionDateSource::DeliveryTime;
  // EventTime: use only year, month, and day. The hour, minutes and seconds are set to 00 in the key
  // DeliveryTime: the time the log object was created
  rgw_s3_key_filter key_filter; // encoded/decoded only for the "Journal" logging type

  // the following functions are only declared here, their definitions are not part of this header
  bool decode_xml(XMLObj *obj);
  void dump_xml(Formatter *f) const;
  void dump(Formatter *f) const; // json
  std::string to_json_str() const;

  // append the encoded configuration to "bl"
  // "enabled" and "default_obj_roll_time" are not encoded
  // enum values are encoded as "int"
  // the order of the fields must match the order used in decode()
  void encode(ceph::bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(target_bucket, bl);
    encode(static_cast<int>(obj_key_format), bl);
    encode(target_prefix, bl);
    encode(obj_roll_time, bl);
    encode(static_cast<int>(logging_type), bl);
    encode(records_batch_size, bl);
    encode(static_cast<int>(date_source), bl);
    // the key filter is part of the encoding only for "Journal" logging
    if (logging_type == LoggingType::Journal) {
      encode(key_filter, bl);
    }
    ENCODE_FINISH(bl);
  }

  // read the configuration from "bl", in the same order used in encode()
  // "enabled" and "default_obj_roll_time" are left untouched
  // "key_filter" is read only if the decoded logging type is "Journal", otherwise it is left untouched
  // note that the decoded integers are cast to the enum types without validation
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(target_bucket, bl);
    int type; // scratch variable, reused for all enums that were encoded as "int"
    decode(type, bl);
    obj_key_format = static_cast<KeyFormat>(type);
    decode(target_prefix, bl);
    decode(obj_roll_time, bl);
    decode(type, bl);
    logging_type = static_cast<LoggingType>(type);
    decode(records_batch_size, bl);
    decode(type, bl);
    date_source = static_cast<PartitionDateSource>(type);
    if (logging_type == LoggingType::Journal) {
      decode(key_filter, bl);
    }
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(configuration)
// a set of source buckets
// presumably the set of buckets logging into a log bucket, as maintained by
// update_bucket_logging_sources() - TODO confirm against the implementation
using source_buckets = std::set<rgw_bucket>;
// number of entries in a "bucket_logging_records" buffer
// declared "inline" so that all translation units including this header share a single definition
inline constexpr unsigned MAX_BUCKET_LOGGING_BUFFER = 1000;
// fixed size buffer of records, each record is held as a string
using bucket_logging_records = std::array<std::string, MAX_BUCKET_LOGGING_BUFFER>;
// convert a collection of records into a single string
// every record is converted using the to_string() overload that matches its type
// and is followed by a newline. an empty collection gives an empty string
template <typename Records>
inline std::string to_string(const Records& records) {
  std::string joined;
  for (const auto& entry : records) {
    joined += to_string(entry);
    joined += "\n";
  }
  return joined;
}
// log a bucket logging record according to the configuration
// use this function when the configuration ("conf") is already known to the caller
// NOTE(review): the definition is not part of this header. judging by their names:
// "op_name", "etag" and "size" are presumably the values written into the record,
// "async_completion" presumably selects whether the write is waited for,
// "log_source_bucket" presumably selects whether the source bucket is part of the record - TODO confirm
// returns an int status, presumably zero on success as in the overload taking a LoggingType - TODO confirm
int log_record(rgw::sal::Driver* driver,
const sal::Object* obj,
const req_state* s,
const std::string& op_name,
const std::string& etag,
size_t size,
const configuration& conf,
const DoutPrefixProvider *dpp,
optional_yield y,
bool async_completion,
bool log_source_bucket);
// commit the pending log object to the log bucket
// and create a new pending log object
// if "must_commit" is "false" the function will return success even if the pending log object was not committed
// NOTE(review): "obj_name" is passed by non-const reference, presumably it holds the name of the
// pending log object and is updated with the name of the new one - TODO confirm against the implementation
int rollover_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& bucket,
std::string& obj_name,
const DoutPrefixProvider *dpp,
optional_yield y,
bool must_commit,
RGWObjVersionTracker* objv_tracker);
// commit the pending log object to the log bucket
// use this for cleanup, when a new pending object is not needed
// and the target bucket is known (already loaded and passed as "target_bucket")
int commit_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
const DoutPrefixProvider *dpp,
optional_yield y);
// commit the pending log object to the log bucket
// use this for cleanup, when a new pending object is not needed
// and the target bucket should be loaded based on the configuration
// NOTE(review): "tenant_name" is presumably used when the target bucket name in the
// configuration does not include a tenant (see get_bucket_id()) - TODO confirm
int commit_logging_object(const configuration& conf,
const DoutPrefixProvider *dpp,
rgw::sal::Driver* driver,
const std::string& tenant_name,
optional_yield y);
// return the oid of the object holding the name of the temporary logging object
// parameters:
// bucket - the log bucket
// prefix - the logging prefix from the configuration.
//          should be used when multiple buckets log into the same log bucket
std::string object_name_oid(const rgw::sal::Bucket* bucket, const std::string& prefix);
// log a bucket logging record according to type
// the configuration is fetched from the bucket attributes
// if no configuration exists, or if the type does not match, the function returns zero (success)
// NOTE(review): the definition is not part of this header, the remaining parameters
// presumably have the same meaning as in the overload taking a configuration - TODO confirm
int log_record(rgw::sal::Driver* driver,
LoggingType type,
const sal::Object* obj,
const req_state* s,
const std::string& op_name,
const std::string& etag,
size_t size,
const DoutPrefixProvider *dpp,
optional_yield y,
bool async_completion,
bool log_source_bucket);
// return (by ref, in "bucket_id") an rgw_bucket object with the bucket name and tenant name
// fails if the bucket name is not in the format: [tenant name:]<bucket name>
// NOTE(review): "tenant_name" is presumably used when "bucket_name" does not
// include the optional tenant part - TODO confirm against the implementation
int get_bucket_id(const std::string& bucket_name, const std::string& tenant_name, rgw_bucket& bucket_id);
// update (add or remove) a source bucket from the list of source buckets in the target bucket
// use this function when the target bucket is already loaded
// parameters:
// bucket - the (loaded) target bucket
// src_bucket - the source bucket to add or remove
// add - selects between adding and removing (presumably "true" adds - TODO confirm)
int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr<rgw::sal::Bucket>& bucket,
const rgw_bucket& src_bucket, bool add, optional_yield y);
// update (add or remove) a source bucket from the list of source buckets in the target bucket
// use this function when the target bucket is not known and needs to be loaded
// parameters:
// target_bucket_id - identifies the target bucket that needs to be loaded
// src_bucket_id - the source bucket to add or remove
// add - selects between adding and removing (presumably "true" adds - TODO confirm)
int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Driver* driver, const rgw_bucket& target_bucket_id,
const rgw_bucket& src_bucket_id, bool add, optional_yield y);
// cleanup of bucket logging state when a bucket is deleted:
// when a source bucket is deleted, all pending log objects should be committed to the log bucket
// when the target bucket is deleted, all pending log objects should be deleted,
// as well as the object holding the pending log object name
// "bucket" is the bucket being deleted
int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
sal::Driver* driver,
sal::Bucket* bucket,
optional_yield y);
// cleanup of a source bucket
// if the bucket has a bucket logging configuration associated with it then:
// if "remove_attr" is true, the bucket logging configuration should be removed from the bucket
// in addition:
// any pending log objects should be committed to the log bucket
// and the log bucket should be updated to remove the bucket as a source
int source_bucket_cleanup(const DoutPrefixProvider* dpp,
sal::Driver* driver,
sal::Bucket* bucket,
bool remove_attr,
optional_yield y);
} // namespace rgw::bucketlogging
|