summaryrefslogtreecommitdiffstats
path: root/src/rgw/rgw_bucket_logging.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rgw/rgw_bucket_logging.cc')
-rw-r--r--src/rgw/rgw_bucket_logging.cc299
1 files changed, 268 insertions, 31 deletions
diff --git a/src/rgw/rgw_bucket_logging.cc b/src/rgw/rgw_bucket_logging.cc
index d24a53024f1..dd407f26e8c 100644
--- a/src/rgw/rgw_bucket_logging.cc
+++ b/src/rgw/rgw_bucket_logging.cc
@@ -192,7 +192,7 @@ ceph::coarse_real_time time_from_name(const std::string& obj_name, const DoutPre
ldpp_dout(dpp, 1) << "ERROR: logging object name too short: " << obj_name << dendl;
return extracted_time;
}
- const auto time_start_pos = obj_name_length - (time_format_length + UniqueStringLength + 1);
+ const auto time_start_pos = obj_name_length - (time_format_length + UniqueStringLength + 1);
// note: +1 is for the dash between the timestamp and the unique string
std::string time_str = obj_name.substr(time_start_pos, time_format_length);
@@ -206,6 +206,13 @@ ceph::coarse_real_time time_from_name(const std::string& obj_name, const DoutPre
return extracted_time;
}
+std::string full_bucket_name(const std::unique_ptr<rgw::sal::Bucket>& bucket) {
+ if (bucket->get_tenant().empty()) {
+ return bucket->get_name();
+ }
+ return fmt::format("{}:{}", bucket->get_tenant(), bucket->get_name());
+}
+
int new_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& bucket,
std::string& obj_name,
@@ -235,23 +242,22 @@ int new_logging_object(const configuration& conf,
conf.target_prefix,
to_string(bucket->get_owner()),
source_region,
- bucket->get_name(),
+ full_bucket_name(bucket),
t,
t,
unique);
}
break;
}
-
int ret = bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, init_obj, objv_tracker);
if (ret == -EEXIST || ret == -ECANCELED) {
if (ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
- conf.target_bucket << "'. ret = " << ret << dendl;
+ conf.target_bucket << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl;
return ret;
}
ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name << "' of bucket '" <<
- conf.target_bucket << "'" << dendl;
+ conf.target_bucket << "' and prefix '" << conf.target_prefix << "'" << dendl;
return -ECANCELED;
} else if (ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of bucket '" <<
@@ -263,6 +269,44 @@ int new_logging_object(const configuration& conf,
return 0;
}
+int commit_logging_object(const configuration& conf,
+ const DoutPrefixProvider *dpp,
+ rgw::sal::Driver* driver,
+ const std::string& tenant_name,
+ optional_yield y) {
+ std::string target_bucket_name;
+ std::string target_tenant_name;
+ auto ret = rgw_parse_url_bucket(conf.target_bucket, tenant_name, target_tenant_name, target_bucket_name);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "' when commiting logging object, ret = "
+ << ret << dendl;
+ return ret;
+ }
+ const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name);
+ std::unique_ptr<rgw::sal::Bucket> target_bucket;
+ ret = driver->load_bucket(dpp, target_bucket_id,
+ &target_bucket, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "' when commiting logging object, ret = "
+ << ret << dendl;
+ return ret;
+ }
+ return commit_logging_object(conf, target_bucket, dpp, y);
+}
+
+int commit_logging_object(const configuration& conf,
+ const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
+ const DoutPrefixProvider *dpp,
+ optional_yield y) {
+ std::string obj_name;
+ if (const auto ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
+ target_bucket->get_info().bucket << "'. ret = " << ret << dendl;
+ return ret;
+ }
+ return target_bucket->commit_logging_object(obj_name, y, dpp);
+}
+
int rollover_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& bucket,
std::string& obj_name,
@@ -270,12 +314,16 @@ int rollover_logging_object(const configuration& conf,
optional_yield y,
bool must_commit,
RGWObjVersionTracker* objv_tracker) {
- if (conf.target_bucket != bucket->get_name()) {
- ldpp_dout(dpp, 1) << "ERROR: bucket name mismatch: '" << conf.target_bucket << "' != '" << bucket->get_name() << "'" << dendl;
+ std::string target_bucket_name;
+ std::string target_tenant_name;
+ std::ignore = rgw_parse_url_bucket(conf.target_bucket, bucket->get_tenant(), target_tenant_name, target_bucket_name);
+ if (target_bucket_name != bucket->get_name() || target_tenant_name != bucket->get_tenant()) {
+ ldpp_dout(dpp, 1) << "ERROR: bucket name mismatch. conf= '" << conf.target_bucket <<
+ "', bucket= '" << bucket->get_info().bucket << "'" << dendl;
return -EINVAL;
}
const auto old_obj = obj_name;
- const auto ret = new_logging_object(conf, bucket, obj_name, dpp, y, false, objv_tracker);
+ const auto ret = new_logging_object(conf, bucket, obj_name, dpp, y, false, objv_tracker);
if (ret == -ECANCELED) {
ldpp_dout(dpp, 20) << "INFO: rollover already performed for '" << old_obj << "' to bucket '" <<
conf.target_bucket << "'. ret = " << ret << dendl;
@@ -342,14 +390,14 @@ S3 bucket short (ceph) log record
- eTag
};*/
-int log_record(rgw::sal::Driver* driver,
+int log_record(rgw::sal::Driver* driver,
const sal::Object* obj,
- const req_state* s,
- const std::string& op_name,
- const std::string& etag,
+ const req_state* s,
+ const std::string& op_name,
+ const std::string& etag,
size_t size,
const configuration& conf,
- const DoutPrefixProvider *dpp,
+ const DoutPrefixProvider *dpp,
optional_yield y,
bool async_completion,
bool log_source_bucket) {
@@ -357,11 +405,19 @@ int log_record(rgw::sal::Driver* driver,
ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged" << dendl;
return -EINVAL;
}
+ std::string target_bucket_name;
+ std::string target_tenant_name;
+ auto ret = rgw_parse_url_bucket(conf.target_bucket, s->bucket_tenant, target_tenant_name, target_bucket_name);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "', ret = " << ret << dendl;
+ return ret;
+ }
+ const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name);
std::unique_ptr<rgw::sal::Bucket> target_bucket;
- auto ret = driver->load_bucket(dpp, rgw_bucket(s->bucket_tenant, conf.target_bucket),
+ ret = driver->load_bucket(dpp, target_bucket_id,
&target_bucket, y);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << conf.target_bucket << "'. ret = " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl;
return ret;
}
std::string obj_name;
@@ -382,12 +438,14 @@ int log_record(rgw::sal::Driver* driver,
// try to create the temporary log object for the first time
ret = new_logging_object(conf, target_bucket, obj_name, dpp, y, true, nullptr);
if (ret == 0) {
- ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << conf.target_bucket << "'" << dendl;
+ ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << conf.target_bucket << "' and prefix '" <<
+ conf.target_prefix << "'" << dendl;
} else if (ret == -ECANCELED) {
- ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << conf.target_bucket << "', will be used" << dendl;
+ ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << conf.target_bucket << "' and prefix" <<
+ conf.target_prefix << "'" << dendl;
} else {
ldpp_dout(dpp, 1) << "ERROR: failed to create logging object of bucket '" <<
- conf.target_bucket << "' for the first time. ret = " << ret << dendl;
+ conf.target_bucket << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl;
return ret;
}
} else {
@@ -420,7 +478,7 @@ int log_record(rgw::sal::Driver* driver,
bucket_name = s->src_bucket_name;
} else {
bucket_owner = to_string( s->bucket->get_owner());
- bucket_name = s->bucket->get_name();
+ bucket_name = full_bucket_name(s->bucket);
}
switch (conf.logging_type) {
@@ -459,7 +517,7 @@ int log_record(rgw::sal::Driver* driver,
case LoggingType::Journal:
record = fmt::format("{} {} [{:%d/%b/%Y:%H:%M:%S %z}] {} {} {} {} {}",
dash_if_empty(to_string(s->bucket->get_owner())),
- dash_if_empty(s->bucket->get_name()),
+ dash_if_empty(full_bucket_name(s->bucket)),
t,
op_name,
dash_if_empty_or_null(obj, obj->get_name()),
@@ -512,12 +570,12 @@ std::string object_name_oid(const rgw::sal::Bucket* bucket, const std::string& p
int log_record(rgw::sal::Driver* driver,
LoggingType type,
const sal::Object* obj,
- const req_state* s,
- const std::string& op_name,
- const std::string& etag,
- size_t size,
- const DoutPrefixProvider *dpp,
- optional_yield y,
+ const req_state* s,
+ const std::string& op_name,
+ const std::string& etag,
+ size_t size,
+ const DoutPrefixProvider *dpp,
+ optional_yield y,
bool async_completion,
bool log_source_bucket) {
if (!s->bucket) {
@@ -534,7 +592,7 @@ int log_record(rgw::sal::Driver* driver,
try {
configuration.enabled = true;
auto bl_iter = iter->second.cbegin();
- decode(configuration, bl_iter);
+ decode(configuration, bl_iter);
if (type != LoggingType::Any && configuration.logging_type != type) {
return 0;
}
@@ -543,20 +601,199 @@ int log_record(rgw::sal::Driver* driver,
return 0;
}
}
- ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_name() <<
+ ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_info().bucket <<
"' configuration: " << configuration.to_json_str() << dendl;
- if (auto ret = log_record(driver, obj, s, op_name, etag, size, configuration, dpp, y, async_completion, log_source_bucket); ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_name() <<
+ if (auto ret = log_record(driver, obj, s, op_name, etag, size, configuration, dpp, y, async_completion, log_source_bucket); ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_info().bucket <<
"'. ret=" << ret << dendl;
return ret;
}
} catch (buffer::error& err) {
- ldpp_dout(dpp, 1) << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
+ ldpp_dout(dpp, 1) << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
<< "'. error: " << err.what() << dendl;
return -EINVAL;
}
return 0;
}
+int get_bucket_id(const std::string& bucket_name, const std::string& tenant_name, rgw_bucket& bucket_id) {
+ std::string parsed_bucket_name;
+ std::string parsed_tenant_name;
+ if (const auto ret = rgw_parse_url_bucket(bucket_name, tenant_name, parsed_tenant_name, parsed_bucket_name); ret < 0) {
+ return ret;
+ }
+ bucket_id = rgw_bucket{parsed_tenant_name, parsed_bucket_name};
+ return 0;
+}
+
+int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Driver* driver, const rgw_bucket& target_bucket_id, const rgw_bucket& src_bucket_id, bool add, optional_yield y) {
+ std::unique_ptr<rgw::sal::Bucket> target_bucket;
+ const auto ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl;
+ return ret;
+ }
+ return update_bucket_logging_sources(dpp, target_bucket, src_bucket_id, add, y);
+}
+
+int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr<rgw::sal::Bucket>& bucket, const rgw_bucket& src_bucket_id, bool add, optional_yield y) {
+ return retry_raced_bucket_write(dpp, bucket.get(), [dpp, &bucket, &src_bucket_id, add, y] {
+ auto& attrs = bucket->get_attrs();
+ auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING_SOURCES);
+ if (iter == attrs.end()) {
+ if (!add) {
+ ldpp_dout(dpp, 20) << "INFO: no logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
+ << "' for bucket '" << bucket->get_info().bucket << "', nothing to remove" << dendl;
+ return 0;
+ }
+ source_buckets sources{src_bucket_id};
+ bufferlist bl;
+ ceph::encode(sources, bl);
+ attrs.insert(std::make_pair(RGW_ATTR_BUCKET_LOGGING_SOURCES, std::move(bl)));
+ return bucket->merge_and_store_attrs(dpp, attrs, y);
+ }
+ try {
+ source_buckets sources;
+ ceph::decode(sources, iter->second);
+ if ((add && sources.insert(src_bucket_id).second) ||
+ (!add && sources.erase(src_bucket_id) > 0)) {
+ bufferlist bl;
+ ceph::encode(sources, bl);
+ iter->second = std::move(bl);
+ return bucket->merge_and_store_attrs(dpp, attrs, y);
+ }
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
+ << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl;
+ }
+ ldpp_dout(dpp, 20) << "INFO: logging source '" << src_bucket_id << "' already " <<
+ (add ? "added to" : "removed from") << " bucket '" << bucket->get_info().bucket << "'" << dendl;
+ return 0;
+ }, y);
+}
+
+
+int bucket_deletion_cleanup(const DoutPrefixProvider* dpp,
+ sal::Driver* driver,
+ sal::Bucket* bucket,
+ optional_yield y) {
+ // if the bucket is used a log bucket, we should delete all pending log objects
+ // and also delete the object holding the pending object name
+ auto& attrs = bucket->get_attrs();
+ if (const auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING_SOURCES); iter != attrs.end()) {
+ try {
+ source_buckets sources;
+ ceph::decode(sources, iter->second);
+ for (const auto& source : sources) {
+ std::unique_ptr<rgw::sal::Bucket> src_bucket;
+ if (const auto ret = driver->load_bucket(dpp, source, &src_bucket, y); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to get logging source bucket '" << source << "' for log bucket '" <<
+ bucket->get_info().bucket << "', ret = " << ret << dendl;
+ continue;
+ }
+ auto& src_attrs = src_bucket->get_attrs();
+ if (const auto iter = src_attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != src_attrs.end()) {
+ configuration conf;
+ try {
+ auto bl_iter = iter->second.cbegin();
+ decode(conf, bl_iter);
+ std::string obj_name;
+ RGWObjVersionTracker objv;
+ if (const auto ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, &objv); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to get logging object name for log bucket '" << bucket->get_info().bucket <<
+ "', ret = " << ret << dendl;
+ continue;
+ }
+ if (const auto ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to delete pending logging object '" << obj_name << "' for log bucket '" <<
+ bucket->get_info().bucket << "', ret = " << ret << dendl;
+ continue;
+ }
+ ldpp_dout(dpp, 20) << "INFO: successfully deleted pending logging object '" << obj_name << "' from deleted log bucket '" <<
+ bucket->get_info().bucket << "'" << dendl;
+ if (const auto ret = bucket->remove_logging_object_name(conf.target_prefix, y, dpp, &objv); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to delete object holding bucket logging object name for log bucket '" <<
+ bucket->get_info().bucket << "', ret = " << ret << dendl;
+ continue;
+ }
+ ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted log bucket '" <<
+ bucket->get_info().bucket << "'" << dendl;
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING
+ << "' of bucket '" << src_bucket->get_info().bucket << "', error: " << err.what() << dendl;
+ }
+ }
+ }
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES
+ << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl;
+ return -EIO;
+ }
+ }
+
+ return source_bucket_cleanup(dpp, driver, bucket, false, y);
+}
+
+int source_bucket_cleanup(const DoutPrefixProvider* dpp,
+ sal::Driver* driver,
+ sal::Bucket* bucket,
+ bool remove_attr,
+ optional_yield y) {
+ std::optional<configuration> conf;
+ const auto& info = bucket->get_info();
+ if (const auto ret = retry_raced_bucket_write(dpp, bucket, [dpp, bucket, &conf, &info, remove_attr, y] {
+ auto& attrs = bucket->get_attrs();
+ if (auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != attrs.end()) {
+ try {
+ auto bl_iter = iter->second.cbegin();
+ configuration tmp_conf;
+ tmp_conf.enabled = true;
+ decode(tmp_conf, bl_iter);
+ conf = std::move(tmp_conf);
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING
+ << "' of bucket '" << info.bucket << "', error: " << err.what() << dendl;
+ return -EIO;
+ }
+ if (remove_attr) {
+ attrs.erase(iter);
+ return bucket->merge_and_store_attrs(dpp, attrs, y);
+ }
+ }
+ // nothing to remove or no need to remove
+ return 0;
+ }, y); ret < 0) {
+ if (remove_attr) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to remove logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' from bucket '" <<
+ info.bucket << "', ret = " << ret << dendl;
+ }
+ return ret;
+ }
+ if (!conf) {
+ // no logging attribute found
+ return 0;
+ }
+ if (const auto ret = commit_logging_object(*conf, dpp, driver, info.bucket.tenant, y); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: could not commit pending logging object of bucket '" <<
+ info.bucket << "', ret = " << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "INFO: successfully committed pending logging object of bucket '" << info.bucket << "'" << dendl;
+ }
+ rgw_bucket target_bucket_id;
+ rgw_bucket src_bucket_id{info.bucket.tenant, info.bucket.name};
+ if (const auto ret = get_bucket_id(conf->target_bucket, info.bucket.tenant, target_bucket_id); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: failed to parse target bucket '" << conf->target_bucket << "', ret = " << ret << dendl;
+ return 0;
+ }
+ if (const auto ret = update_bucket_logging_sources(dpp, driver, target_bucket_id, src_bucket_id, false, y); ret < 0) {
+ ldpp_dout(dpp, 1) << "WARNING: could not update bucket logging source '" <<
+ info.bucket << "', ret = " << ret << dendl;
+ return 0;
+ }
+ ldpp_dout(dpp, 20) << "INFO: successfully updated bucket logging source '" <<
+ info.bucket << "'"<< dendl;
+ return 0;
+}
+
} // namespace rgw::bucketlogging