diff options
-rw-r--r-- | doc/radosgw/bucket_logging.rst | 13 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_sal_rados.cc | 109 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_sal_rados.h | 23 | ||||
-rw-r--r-- | src/rgw/radosgw-admin/radosgw-admin.cc | 46 | ||||
-rw-r--r-- | src/rgw/rgw_bucket_logging.cc | 299 | ||||
-rw-r--r-- | src/rgw/rgw_bucket_logging.h | 67 | ||||
-rw-r--r-- | src/rgw/rgw_common.h | 1 | ||||
-rw-r--r-- | src/rgw/rgw_rest_bucket_logging.cc | 196 | ||||
-rw-r--r-- | src/rgw/rgw_s3_filter.h | 1 | ||||
-rw-r--r-- | src/rgw/rgw_sal.h | 21 | ||||
-rw-r--r-- | src/rgw/rgw_sal_filter.h | 25 | ||||
-rw-r--r-- | src/rgw/rgw_sal_store.h | 19 | ||||
-rw-r--r-- | src/test/cli/radosgw-admin/help.t | 3 |
13 files changed, 658 insertions, 165 deletions
diff --git a/doc/radosgw/bucket_logging.rst b/doc/radosgw/bucket_logging.rst index cb9f8465d20..f3e790f5705 100644 --- a/doc/radosgw/bucket_logging.rst +++ b/doc/radosgw/bucket_logging.rst @@ -15,6 +15,12 @@ The log bucket can accumulate logs from multiple buckets. It is recommended to c a different "prefix" for each bucket, so that the logs of different buckets will be stored in different objects in the log bucket. +.. note:: + + - The log bucket must be created before enabling logging on a bucket + - The log bucket cannot be the same as the bucket being logged + - The log bucket cannot have logging enabled on it + .. toctree:: :maxdepth: 1 @@ -29,6 +35,7 @@ Adding a log object to the log bucket is done "lazily", meaning, that if no more remain outside of the log bucket even after the configured time has passed. To counter that, you can flush all logging objects on a given source bucket to log them, regardless if enough time passed or if no more records are written to the object. +Flushing will happen automatically when logging is disabled on a bucket, its logging configuration is changed, or the bucket is deleted. Standard ```````` @@ -72,7 +79,7 @@ has the following format: :: - <prefix><bucket owner>/<source region>/<bucket name>/<year>/<month>/<day>/<year-month-day-hour-minute-second>-<16 bytes unique-id> + <prefix><bucket owner>/<source region>/[tenant:]<bucket name>/<year>/<month>/<day>/<year-month-day-hour-minute-second>-<16 bytes unique-id> For example: @@ -90,7 +97,7 @@ Journal minimum amount of data used for journaling bucket changes (this is a Ceph extension). - bucket owner (or dash if empty) - - bucket name (or dash if empty) + - bucket name (or dash if empty). in the format: ``[tenant:]<bucket name>`` - time in the following format: ``[day/month/year:hour:minute:second timezone]`` - object key (or dash if empty) - operation in the following format: ``WEBSITE/REST.<HTTP method>.<resource>`` @@ -111,7 +118,7 @@ Standard based on `AWS Logging Record Format`_. - bucket owner (or dash if empty) - - bucket name (or dash if empty) + - bucket name (or dash if empty). in the format: ``[tenant:]<bucket name>`` - time - remote IP (not supported, always a dash) - user or account (or dash if empty) diff --git a/src/rgw/driver/rados/rgw_sal_rados.cc b/src/rgw/driver/rados/rgw_sal_rados.cc index 4c67d0ee71a..4c05421653b 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.cc +++ b/src/rgw/driver/rados/rgw_sal_rados.cc @@ -429,6 +429,10 @@ int RadosBucket::remove(const DoutPrefixProvider* dpp, ldpp_dout(dpp, -1) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl; } + if (ret = rgw::bucketlogging::bucket_deletion_cleanup(dpp, store, this, y); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: could not cleanup bucket logging configuration and pending objects, ret = " << ret << dendl; + } + ret = store->ctl()->bucket->unlink_bucket(rados, info.owner, info.bucket, y, dpp, false); if (ret < 0) { @@ -1024,15 +1028,15 @@ int RadosBucket::remove_topics(RGWObjVersionTracker* objv_tracker, objv_tracker, y); } -int RadosBucket::get_logging_object_name(std::string& obj_name, - const std::string& prefix, - optional_yield y, +int RadosBucket::get_logging_object_name(std::string& obj_name, + const std::string& prefix, + optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) { rgw_pool data_pool; const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) { - ldpp_dout(dpp, 1) << "failed to get data pool for bucket '" << get_name() << + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << "' when getting logging object name" << dendl; return -EIO; } @@ -1048,23 +1052,23 @@ int RadosBucket::get_logging_object_name(std::string& obj_name, nullptr, nullptr); if (ret < 0) { - ldpp_dout(dpp, 1) << "failed to get logging object name from '" << obj_name_oid << "'. ret = " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to get logging object name from '" << obj_name_oid << "'. ret = " << ret << dendl; return ret; } obj_name = bl.to_str(); return 0; } -int RadosBucket::set_logging_object_name(const std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, +int RadosBucket::set_logging_object_name(const std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, bool new_obj, RGWObjVersionTracker* objv_tracker) { rgw_pool data_pool; const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) { - ldpp_dout(dpp, 1) << "failed to get data pool for bucket '" << get_name() << + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << "' when setting logging object name" << dendl; return -EIO; } @@ -1080,28 +1084,65 @@ int RadosBucket::set_logging_object_name(const std::string& obj_name, y, nullptr); if (ret == -EEXIST) { - ldpp_dout(dpp, 20) << "race detected in initializing '" << obj_name_oid << "' with logging object name:'" << obj_name << "'. ret = " << ret << dendl; + ldpp_dout(dpp, 20) << "INFO: race detected in initializing '" << obj_name_oid << "' with logging object name:'" << obj_name << "'. ret = " << ret << dendl; } else if (ret == -ECANCELED) { - ldpp_dout(dpp, 20) << "race detected in updating logging object name '" << obj_name << "' at '" << obj_name_oid << "'. ret = " << ret << dendl; + ldpp_dout(dpp, 20) << "INFO: race detected in updating logging object name '" << obj_name << "' at '" << obj_name_oid << "'. ret = " << ret << dendl; } else if (ret < 0) { - ldpp_dout(dpp, 1) << "failed to set logging object name '" << obj_name << "' at '" << obj_name_oid << "'. ret = " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to set logging object name '" << obj_name << "' at '" << obj_name_oid << "'. ret = " << ret << dendl; } return ret; } +int RadosBucket::remove_logging_object_name(const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + RGWObjVersionTracker* objv_tracker) { + rgw_pool data_pool; + const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix); + if (!store->getRados()->get_obj_data_pool(get_placement_rule(), rgw_obj{get_key(), obj_name_oid}, &data_pool)) { + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << + "' when setting logging object name" << dendl; + return -EIO; + } + return rgw_delete_system_obj(dpp, store->svc()->sysobj, + data_pool, + obj_name_oid, + objv_tracker, + y); +} + std::string to_temp_object_name(const rgw::sal::Bucket* bucket, const std::string& obj_name) { return fmt::format("{}__shadow_{}0", bucket->get_bucket_id(), obj_name); } +int RadosBucket::remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) { + rgw_pool data_pool; + const rgw_obj head_obj{get_key(), obj_name}; + const auto placement_rule = get_placement_rule(); + + if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) { + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << + "' when deleting logging object" << dendl; + return -EIO; + } + + const auto temp_obj_name = to_temp_object_name(this, obj_name); + return rgw_delete_system_obj(dpp, store->svc()->sysobj, + data_pool, + temp_obj_name, + nullptr, + y); +} + int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) { rgw_pool data_pool; const rgw_obj head_obj{get_key(), obj_name}; const auto placement_rule = get_placement_rule(); if (!store->getRados()->get_obj_data_pool(placement_rule, head_obj, &data_pool)) { - ldpp_dout(dpp, 1) << "failed to get data pool for bucket '" << get_name() << + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << "' when comitting logging object" << dendl; return -EIO; } @@ -1110,7 +1151,6 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie std::map<string, bufferlist> obj_attrs; ceph::real_time mtime; bufferlist bl_data; - // TODO: this is needed only for etag calculation if (const auto ret = rgw_get_system_obj(store->svc()->sysobj, data_pool, temp_obj_name, @@ -1120,10 +1160,13 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie y, dpp, &obj_attrs, - nullptr); ret < 0) { - ldpp_dout(dpp, 1) << "faild to read logging data when comitting to object '" << temp_obj_name + nullptr); ret < 0 && ret != -ENOENT) { + ldpp_dout(dpp, 1) << "ERROR: failed to read logging data when comitting object '" << temp_obj_name << ". error: " << ret << dendl; return ret; + } else if (ret == -ENOENT) { + ldpp_dout(dpp, 1) << "WARNING: temporary logging object '" << temp_obj_name << "' does not exists" << dendl; + return 0; } uint64_t size = bl_data.length(); @@ -1137,13 +1180,13 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie nullptr, // no special placment for tail get_key(), head_obj); ret < 0) { - ldpp_dout(dpp, 1) << "failed to create manifest when comitting logging object. error: " << + ldpp_dout(dpp, 1) << "ERROR: failed to create manifest when comitting logging object. error: " << ret << dendl; return ret; } if (const auto ret = manifest_gen.create_next(size); ret < 0) { - ldpp_dout(dpp, 1) << "failed to add object to manifest when comitting logging object. error: " << + ldpp_dout(dpp, 1) << "ERROR: failed to add object to manifest when comitting logging object. error: " << ret << dendl; return ret; } @@ -1151,7 +1194,7 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie if (const auto expected_temp_obj = manifest_gen.get_cur_obj(store->getRados()); temp_obj_name != expected_temp_obj.oid) { // TODO: cleanup temporary object, commit would never succeed - ldpp_dout(dpp, 1) << "temporary logging object name mismatch: '" << + ldpp_dout(dpp, 1) << "ERROR: temporary logging object name mismatch: '" << temp_obj_name << "' != '" << expected_temp_obj.oid << "'" << dendl; return -EINVAL; } @@ -1182,11 +1225,11 @@ int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yie const req_context rctx{dpp, y, nullptr}; jspan_context trace{false, false}; if (const auto ret = head_obj_wop.write_meta(0, size, obj_attrs, rctx, trace); ret < 0) { - ldpp_dout(dpp, 1) << "failed to commit logging object '" << temp_obj_name << - "' to bucket id '" << get_bucket_id() <<"'. error: " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << temp_obj_name << + "' to bucket id '" << get_info().bucket <<"'. error: " << ret << dendl; return ret; } - ldpp_dout(dpp, 20) << "committed logging object '" << temp_obj_name << + ldpp_dout(dpp, 20) << "INFO: committed logging object '" << temp_obj_name << "' with size of " << size << " bytes, to bucket '" << get_key() << "' as '" << obj_name << "'" << dendl; return 0; @@ -1204,30 +1247,30 @@ void bucket_logging_completion(rados_completion_t completion, void* args) { auto* aio_comp = reinterpret_cast<librados::AioCompletionImpl*>(completion); std::unique_ptr<BucketLoggingCompleteArg> logging_args(reinterpret_cast<BucketLoggingCompleteArg*>(args)); if (aio_comp->get_return_value() < 0) { - ldout(logging_args->cct, 1) << "failed to complete append to logging object '" << logging_args->obj_name << + ldout(logging_args->cct, 1) << "ERROR: failed to complete append to logging object '" << logging_args->obj_name << "'. ret = " << aio_comp->get_return_value() << dendl; } else { - ldout(logging_args->cct, 20) << "wrote " << logging_args->size << " bytes to logging object '" << + ldout(logging_args->cct, 20) << "INFO: wrote " << logging_args->size << " bytes to logging object '" << logging_args->obj_name << "'" << dendl; } } -int RadosBucket::write_logging_object(const std::string& obj_name, - const std::string& record, - optional_yield y, +int RadosBucket::write_logging_object(const std::string& obj_name, + const std::string& record, + optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) { const auto temp_obj_name = to_temp_object_name(this, obj_name); rgw_pool data_pool; rgw_obj obj{get_key(), obj_name}; if (!store->getRados()->get_obj_data_pool(get_placement_rule(), obj, &data_pool)) { - ldpp_dout(dpp, 1) << "failed to get data pool for bucket '" << get_name() << + ldpp_dout(dpp, 1) << "ERROR: failed to get data pool for bucket '" << get_name() << "' when writing logging object" << dendl; return -EIO; } librados::IoCtx io_ctx; if (const auto ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) { - ldpp_dout(dpp, 1) << "failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to get IO context for logging object from data pool:" << data_pool.to_str() << dendl; return -EIO; } bufferlist bl; @@ -1242,7 +1285,7 @@ int RadosBucket::write_logging_object(const std::string& obj_name, auto arg = std::make_unique<BucketLoggingCompleteArg>(temp_obj_name, record.length(), store->ctx()); completion->set_complete_callback(arg.get(), bucket_logging_completion); if (const auto ret = io_ctx.aio_operate(temp_obj_name, completion.get(), &op); ret < 0) { - ldpp_dout(dpp, 1) << "failed to append to logging object '" << temp_obj_name << + ldpp_dout(dpp, 1) << "ERROR: failed to append to logging object '" << temp_obj_name << "'. ret = " << ret << dendl; return ret; } @@ -1251,11 +1294,11 @@ int RadosBucket::write_logging_object(const std::string& obj_name, return 0; } if (const auto ret = rgw_rados_operate(dpp, io_ctx, temp_obj_name, &op, y); ret < 0) { - ldpp_dout(dpp, 1) << "failed to append to logging object '" << temp_obj_name << + ldpp_dout(dpp, 1) << "ERROR: failed to append to logging object '" << temp_obj_name << "'. ret = " << ret << dendl; return ret; } - ldpp_dout(dpp, 20) << "wrote " << record.length() << " bytes to logging object '" << + ldpp_dout(dpp, 20) << "INFO: wrote " << record.length() << " bytes to logging object '" << temp_obj_name << "'" << dendl; return 0; } diff --git a/src/rgw/driver/rados/rgw_sal_rados.h b/src/rgw/driver/rados/rgw_sal_rados.h index 85ea247e345..e65c3c0050e 100644 --- a/src/rgw/driver/rados/rgw_sal_rados.h +++ b/src/rgw/driver/rados/rgw_sal_rados.h @@ -780,18 +780,23 @@ class RadosBucket : public StoreBucket { optional_yield y, const DoutPrefixProvider *dpp) override; int remove_topics(RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override; - int get_logging_object_name(std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, + int get_logging_object_name(std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + RGWObjVersionTracker* objv_tracker) override; + int set_logging_object_name(const std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + bool new_obj, RGWObjVersionTracker* objv_tracker) override; - int set_logging_object_name(const std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, - bool new_obj, + int remove_logging_object_name(const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) override; int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override; + int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override; int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override; private: diff --git a/src/rgw/radosgw-admin/radosgw-admin.cc b/src/rgw/radosgw-admin/radosgw-admin.cc index 182e42b8e31..47b68d3f902 100644 --- a/src/rgw/radosgw-admin/radosgw-admin.cc +++ b/src/rgw/radosgw-admin/radosgw-admin.cc @@ -171,7 +171,8 @@ void usage() cout << " bucket sync disable disable bucket sync\n"; cout << " bucket sync enable enable bucket sync\n"; cout << " bucket radoslist list rados objects backing bucket's objects\n"; - cout << " bucket logging flush flush pending log records object of source bucket to the log bucket to bucket\n"; + cout << " bucket logging flush flush pending log records object of source bucket to the log bucket\n"; + cout << " bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket\n"; cout << " bi get retrieve bucket index object entries\n"; cout << " bi put store bucket index object entries\n"; cout << " bi list list raw bucket index entries\n"; @@ -701,6 +702,7 @@ enum class OPT { BUCKET_OBJECT_SHARD, BUCKET_RESYNC_ENCRYPTED_MULTIPART, BUCKET_LOGGING_FLUSH, + BUCKET_LOGGING_INFO, POLICY, LOG_LIST, LOG_SHOW, @@ -940,6 +942,7 @@ static SimpleCmd::Commands all_cmds = { { "bucket object shard", OPT::BUCKET_OBJECT_SHARD }, { "bucket resync encrypted multipart", OPT::BUCKET_RESYNC_ENCRYPTED_MULTIPART }, { "bucket logging flush", OPT::BUCKET_LOGGING_FLUSH }, + { "bucket logging info", OPT::BUCKET_LOGGING_INFO }, { "policy", OPT::POLICY }, { "log list", OPT::LOG_LIST }, { "log show", OPT::LOG_SHOW }, @@ -7750,6 +7753,47 @@ int main(int argc, const char **argv) return 0; } + if (opt_cmd == OPT::BUCKET_LOGGING_INFO) { + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + int ret = init_bucket(tenant, bucket_name, bucket_id, &bucket); + if (ret < 0) { + return -ret; + } + const auto& bucket_attrs = bucket->get_attrs(); + auto iter = bucket_attrs.find(RGW_ATTR_BUCKET_LOGGING); + if (iter != bucket_attrs.end()) { + rgw::bucketlogging::configuration configuration; + try { + configuration.enabled = true; + decode(configuration, iter->second); + } catch (buffer::error& err) { + cerr << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "'. error: " << err.what() << std::endl; + return EINVAL; + } + encode_json("logging", configuration, formatter.get()); + formatter->flush(cout); + } + iter = bucket_attrs.find(RGW_ATTR_BUCKET_LOGGING_SOURCES); + if (iter != bucket_attrs.end()) { + rgw::bucketlogging::source_buckets sources; + try { + decode(sources, iter->second); + } catch (buffer::error& err) { + cerr << "ERROR: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES + << "'. error: " << err.what() << std::endl; + return EINVAL; + } + encode_json("logging_sources", sources, formatter.get()); + formatter->flush(cout); + } + + return 0; + } + if (opt_cmd == OPT::LOG_LIST) { // filter by date? if (date.size() && date.size() != 10) { diff --git a/src/rgw/rgw_bucket_logging.cc b/src/rgw/rgw_bucket_logging.cc index d24a53024f1..dd407f26e8c 100644 --- a/src/rgw/rgw_bucket_logging.cc +++ b/src/rgw/rgw_bucket_logging.cc @@ -192,7 +192,7 @@ ceph::coarse_real_time time_from_name(const std::string& obj_name, const DoutPre ldpp_dout(dpp, 1) << "ERROR: logging object name too short: " << obj_name << dendl; return extracted_time; } - const auto time_start_pos = obj_name_length - (time_format_length + UniqueStringLength + 1); + const auto time_start_pos = obj_name_length - (time_format_length + UniqueStringLength + 1); // note: +1 is for the dash between the timestamp and the unique string std::string time_str = obj_name.substr(time_start_pos, time_format_length); @@ -206,6 +206,13 @@ ceph::coarse_real_time time_from_name(const std::string& obj_name, const DoutPre return extracted_time; } +std::string full_bucket_name(const std::unique_ptr<rgw::sal::Bucket>& bucket) { + if (bucket->get_tenant().empty()) { + return bucket->get_name(); + } + return fmt::format("{}:{}", bucket->get_tenant(), bucket->get_name()); +} + int new_logging_object(const configuration& conf, const std::unique_ptr<rgw::sal::Bucket>& bucket, std::string& obj_name, @@ -235,23 +242,22 @@ int new_logging_object(const configuration& conf, conf.target_prefix, to_string(bucket->get_owner()), source_region, - bucket->get_name(), + full_bucket_name(bucket), t, t, unique); } break; } - int ret = bucket->set_logging_object_name(obj_name, conf.target_prefix, y, dpp, init_obj, objv_tracker); if (ret == -EEXIST || ret == -ECANCELED) { if (ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" << - conf.target_bucket << "'. ret = " << ret << dendl; + conf.target_bucket << "' and prefix '" << conf.target_prefix << "', ret = " << ret << dendl; return ret; } ldpp_dout(dpp, 20) << "INFO: name already set. got name of logging object '" << obj_name << "' of bucket '" << - conf.target_bucket << "'" << dendl; + conf.target_bucket << "' and prefix '" << conf.target_prefix << "'" << dendl; return -ECANCELED; } else if (ret < 0) { ldpp_dout(dpp, 1) << "ERROR: failed to write name of logging object '" << obj_name << "' of bucket '" << @@ -263,6 +269,44 @@ int new_logging_object(const configuration& conf, return 0; } +int commit_logging_object(const configuration& conf, + const DoutPrefixProvider *dpp, + rgw::sal::Driver* driver, + const std::string& tenant_name, + optional_yield y) { + std::string target_bucket_name; + std::string target_tenant_name; + auto ret = rgw_parse_url_bucket(conf.target_bucket, tenant_name, target_tenant_name, target_bucket_name); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "' when commiting logging object, ret = " + << ret << dendl; + return ret; + } + const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name); + std::unique_ptr<rgw::sal::Bucket> target_bucket; + ret = driver->load_bucket(dpp, target_bucket_id, + &target_bucket, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "' when commiting logging object, ret = " + << ret << dendl; + return ret; + } + return commit_logging_object(conf, target_bucket, dpp, y); +} + +int commit_logging_object(const configuration& conf, + const std::unique_ptr<rgw::sal::Bucket>& target_bucket, + const DoutPrefixProvider *dpp, + optional_yield y) { + std::string obj_name; + if (const auto ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" << + target_bucket->get_info().bucket << "'. ret = " << ret << dendl; + return ret; + } + return target_bucket->commit_logging_object(obj_name, y, dpp); +} + int rollover_logging_object(const configuration& conf, const std::unique_ptr<rgw::sal::Bucket>& bucket, std::string& obj_name, @@ -270,12 +314,16 @@ int rollover_logging_object(const configuration& conf, optional_yield y, bool must_commit, RGWObjVersionTracker* objv_tracker) { - if (conf.target_bucket != bucket->get_name()) { - ldpp_dout(dpp, 1) << "ERROR: bucket name mismatch: '" << conf.target_bucket << "' != '" << bucket->get_name() << "'" << dendl; + std::string target_bucket_name; + std::string target_tenant_name; + std::ignore = rgw_parse_url_bucket(conf.target_bucket, bucket->get_tenant(), target_tenant_name, target_bucket_name); + if (target_bucket_name != bucket->get_name() || target_tenant_name != bucket->get_tenant()) { + ldpp_dout(dpp, 1) << "ERROR: bucket name mismatch. conf= '" << conf.target_bucket << + "', bucket= '" << bucket->get_info().bucket << "'" << dendl; return -EINVAL; } const auto old_obj = obj_name; - const auto ret = new_logging_object(conf, bucket, obj_name, dpp, y, false, objv_tracker); + const auto ret = new_logging_object(conf, bucket, obj_name, dpp, y, false, objv_tracker); if (ret == -ECANCELED) { ldpp_dout(dpp, 20) << "INFO: rollover already performed for '" << old_obj << "' to bucket '" << conf.target_bucket << "'. ret = " << ret << dendl; @@ -342,14 +390,14 @@ S3 bucket short (ceph) log record - eTag };*/ -int log_record(rgw::sal::Driver* driver, +int log_record(rgw::sal::Driver* driver, const sal::Object* obj, - const req_state* s, - const std::string& op_name, - const std::string& etag, + const req_state* s, + const std::string& op_name, + const std::string& etag, size_t size, const configuration& conf, - const DoutPrefixProvider *dpp, + const DoutPrefixProvider *dpp, optional_yield y, bool async_completion, bool log_source_bucket) { @@ -357,11 +405,19 @@ int log_record(rgw::sal::Driver* driver, ldpp_dout(dpp, 1) << "ERROR: only bucket operations are logged" << dendl; return -EINVAL; } + std::string target_bucket_name; + std::string target_tenant_name; + auto ret = rgw_parse_url_bucket(conf.target_bucket, s->bucket_tenant, target_tenant_name, target_bucket_name); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to parse target bucket '" << conf.target_bucket << "', ret = " << ret << dendl; + return ret; + } + const rgw_bucket target_bucket_id(target_tenant_name, target_bucket_name); std::unique_ptr<rgw::sal::Bucket> target_bucket; - auto ret = driver->load_bucket(dpp, rgw_bucket(s->bucket_tenant, conf.target_bucket), + ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << conf.target_bucket << "'. ret = " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to get target logging bucket '" << target_bucket_id << "'. ret = " << ret << dendl; return ret; } std::string obj_name; @@ -382,12 +438,14 @@ int log_record(rgw::sal::Driver* driver, // try to create the temporary log object for the first time ret = new_logging_object(conf, target_bucket, obj_name, dpp, y, true, nullptr); if (ret == 0) { - ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << conf.target_bucket << "'" << dendl; + ldpp_dout(dpp, 20) << "INFO: first time logging for bucket '" << conf.target_bucket << "' and prefix '" << + conf.target_prefix << "'" << dendl; } else if (ret == -ECANCELED) { - ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << conf.target_bucket << "', will be used" << dendl; + ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' already exists for bucket '" << conf.target_bucket << "' and prefix" << + conf.target_prefix << "'" << dendl; } else { ldpp_dout(dpp, 1) << "ERROR: failed to create logging object of bucket '" << - conf.target_bucket << "' for the first time. ret = " << ret << dendl; + conf.target_bucket << "' and prefix '" << conf.target_prefix << "' for the first time. ret = " << ret << dendl; return ret; } } else { @@ -420,7 +478,7 @@ int log_record(rgw::sal::Driver* driver, bucket_name = s->src_bucket_name; } else { bucket_owner = to_string( s->bucket->get_owner()); - bucket_name = s->bucket->get_name(); + bucket_name = full_bucket_name(s->bucket); } switch (conf.logging_type) { @@ -459,7 +517,7 @@ int log_record(rgw::sal::Driver* driver, case LoggingType::Journal: record = fmt::format("{} {} [{:%d/%b/%Y:%H:%M:%S %z}] {} {} {} {} {}", dash_if_empty(to_string(s->bucket->get_owner())), - dash_if_empty(s->bucket->get_name()), + dash_if_empty(full_bucket_name(s->bucket)), t, op_name, dash_if_empty_or_null(obj, obj->get_name()), @@ -512,12 +570,12 @@ std::string object_name_oid(const rgw::sal::Bucket* bucket, const std::string& p int log_record(rgw::sal::Driver* driver, LoggingType type, const sal::Object* obj, - const req_state* s, - const std::string& op_name, - const std::string& etag, - size_t size, - const DoutPrefixProvider *dpp, - optional_yield y, + const req_state* s, + const std::string& op_name, + const std::string& etag, + size_t size, + const DoutPrefixProvider *dpp, + optional_yield y, bool async_completion, bool log_source_bucket) { if (!s->bucket) { @@ -534,7 +592,7 @@ int log_record(rgw::sal::Driver* driver, try { configuration.enabled = true; auto bl_iter = iter->second.cbegin(); - decode(configuration, bl_iter); + decode(configuration, bl_iter); if (type != LoggingType::Any && configuration.logging_type != type) { return 0; } @@ -543,20 +601,199 @@ int log_record(rgw::sal::Driver* driver, return 0; } } - ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_name() << + ldpp_dout(dpp, 20) << "INFO: found matching logging configuration of bucket '" << s->bucket->get_info().bucket << "' configuration: " << configuration.to_json_str() << dendl; - if (auto ret = log_record(driver, obj, s, op_name, etag, size, configuration, dpp, y, async_completion, log_source_bucket); ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_name() << + if (auto ret = log_record(driver, obj, s, op_name, etag, size, configuration, dpp, y, async_completion, log_source_bucket); ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: failed to perform logging for bucket '" << s->bucket->get_info().bucket << "'. ret=" << ret << dendl; return ret; } } catch (buffer::error& err) { - ldpp_dout(dpp, 1) << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING + ldpp_dout(dpp, 1) << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "'. error: " << err.what() << dendl; return -EINVAL; } return 0; } +int get_bucket_id(const std::string& bucket_name, const std::string& tenant_name, rgw_bucket& bucket_id) { + std::string parsed_bucket_name; + std::string parsed_tenant_name; + if (const auto ret = rgw_parse_url_bucket(bucket_name, tenant_name, parsed_tenant_name, parsed_bucket_name); ret < 0) { + return ret; + } + bucket_id = rgw_bucket{parsed_tenant_name, parsed_bucket_name}; + return 0; +} + +int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Driver* driver, const rgw_bucket& target_bucket_id, const rgw_bucket& src_bucket_id, bool add, optional_yield y) { + std::unique_ptr<rgw::sal::Bucket> target_bucket; + const auto ret = driver->load_bucket(dpp, target_bucket_id, &target_bucket, y); + if (ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to get target bucket '" << target_bucket_id << "', ret = " << ret << dendl; + return ret; + } + return update_bucket_logging_sources(dpp, target_bucket, src_bucket_id, add, y); +} + +int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr<rgw::sal::Bucket>& bucket, const rgw_bucket& src_bucket_id, bool add, optional_yield y) { + return retry_raced_bucket_write(dpp, bucket.get(), [dpp, &bucket, &src_bucket_id, add, y] { + auto& attrs = bucket->get_attrs(); + auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING_SOURCES); + if (iter == attrs.end()) { + if (!add) { + ldpp_dout(dpp, 20) << "INFO: no logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES + << "' for bucket '" << bucket->get_info().bucket << "', nothing to remove" << dendl; + return 0; + } + source_buckets sources{src_bucket_id}; + bufferlist bl; + ceph::encode(sources, bl); + attrs.insert(std::make_pair(RGW_ATTR_BUCKET_LOGGING_SOURCES, std::move(bl))); + return bucket->merge_and_store_attrs(dpp, attrs, y); + } + try { + source_buckets sources; + ceph::decode(sources, iter->second); + if ((add && sources.insert(src_bucket_id).second) || + (!add && sources.erase(src_bucket_id) > 0)) { + bufferlist bl; + ceph::encode(sources, bl); + iter->second = std::move(bl); + return bucket->merge_and_store_attrs(dpp, attrs, y); + } + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES + << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl; + } + ldpp_dout(dpp, 20) << "INFO: logging source '" << src_bucket_id << "' already " << + (add ? "added to" : "removed from") << " bucket '" << bucket->get_info().bucket << "'" << dendl; + return 0; + }, y); +} + + +int bucket_deletion_cleanup(const DoutPrefixProvider* dpp, + sal::Driver* driver, + sal::Bucket* bucket, + optional_yield y) { + // if the bucket is used a log bucket, we should delete all pending log objects + // and also delete the object holding the pending object name + auto& attrs = bucket->get_attrs(); + if (const auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING_SOURCES); iter != attrs.end()) { + try { + source_buckets sources; + ceph::decode(sources, iter->second); + for (const auto& source : sources) { + std::unique_ptr<rgw::sal::Bucket> src_bucket; + if (const auto ret = driver->load_bucket(dpp, source, &src_bucket, y); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to get logging source bucket '" << source << "' for log bucket '" << + bucket->get_info().bucket << "', ret = " << ret << dendl; + continue; + } + auto& src_attrs = src_bucket->get_attrs(); + if (const auto iter = src_attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != src_attrs.end()) { + configuration conf; + try { + auto bl_iter = iter->second.cbegin(); + decode(conf, bl_iter); + std::string obj_name; + RGWObjVersionTracker objv; + if (const auto ret = bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, &objv); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to get logging object name for log bucket '" << bucket->get_info().bucket << + "', ret = " << ret << dendl; + continue; + } + if (const auto ret = bucket->remove_logging_object(obj_name, y, dpp); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to delete pending logging object '" << obj_name << "' for log bucket '" << + bucket->get_info().bucket << "', ret = " << ret << dendl; + continue; + } + ldpp_dout(dpp, 20) << "INFO: successfully deleted pending logging object '" << obj_name << "' from deleted log bucket '" << + bucket->get_info().bucket << "'" << dendl; + if (const auto ret = bucket->remove_logging_object_name(conf.target_prefix, y, dpp, &objv); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to delete object holding bucket logging object name for log bucket '" << + bucket->get_info().bucket << "', ret = " << ret << dendl; + continue; + } + ldpp_dout(dpp, 20) << "INFO: successfully deleted object holding bucket logging object name from deleted log bucket '" << + bucket->get_info().bucket << "'" << dendl; + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "' of bucket '" << src_bucket->get_info().bucket << "', error: " << err.what() << dendl; + } + } + } + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "WARNING: failed to decode logging sources attribute '" << RGW_ATTR_BUCKET_LOGGING_SOURCES + << "' for bucket '" << bucket->get_info().bucket << "', error: " << err.what() << dendl; + return -EIO; + } + } + + return source_bucket_cleanup(dpp, driver, bucket, false, y); +} + +int source_bucket_cleanup(const DoutPrefixProvider* dpp, + sal::Driver* driver, + sal::Bucket* bucket, + bool remove_attr, + optional_yield y) { + std::optional<configuration> conf; + const auto& info = bucket->get_info(); + if (const auto ret = retry_raced_bucket_write(dpp, bucket, [dpp, bucket, &conf, &info, remove_attr, y] { + auto& attrs = bucket->get_attrs(); + if (auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != attrs.end()) { + try { + auto bl_iter = iter->second.cbegin(); + configuration tmp_conf; + tmp_conf.enabled = true; + decode(tmp_conf, bl_iter); + conf = std::move(tmp_conf); + } catch (buffer::error& err) { + ldpp_dout(dpp, 1) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "' of bucket '" << info.bucket << "', error: " << err.what() << dendl; + return -EIO; + } + if (remove_attr) { + attrs.erase(iter); + return bucket->merge_and_store_attrs(dpp, attrs, y); + } + } + // nothing to remove or no need to remove + return 0; + }, y); ret < 0) { + if (remove_attr) { + ldpp_dout(dpp, 1) << "ERROR: failed to remove logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' from bucket '" << + info.bucket << "', ret = " << ret << dendl; + } + return ret; + } + if (!conf) { + // no logging attribute found + return 0; + } + if (const auto ret = commit_logging_object(*conf, dpp, driver, info.bucket.tenant, y); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: could not commit pending logging object of bucket '" << + info.bucket << "', ret = " << ret << dendl; + } else { + ldpp_dout(dpp, 20) << "INFO: successfully committed pending logging object of bucket '" << info.bucket << "'" << dendl; + } + rgw_bucket target_bucket_id; + rgw_bucket src_bucket_id{info.bucket.tenant, info.bucket.name}; + if (const auto ret = get_bucket_id(conf->target_bucket, info.bucket.tenant, target_bucket_id); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: failed to parse target bucket '" << conf->target_bucket << "', ret = " << ret << dendl; + return 0; + } + if (const auto ret = update_bucket_logging_sources(dpp, driver, target_bucket_id, src_bucket_id, false, y); ret < 0) { + ldpp_dout(dpp, 1) << "WARNING: could not update bucket logging source '" << + info.bucket << "', ret = " << ret << dendl; + return 0; + } + ldpp_dout(dpp, 20) << "INFO: successfully updated bucket logging source '" << + info.bucket << "'"<< dendl; + return 0; +} + } // namespace rgw::bucketlogging diff --git a/src/rgw/rgw_bucket_logging.h b/src/rgw/rgw_bucket_logging.h index d4877bafb0f..cbdb8b55f88 100644 --- a/src/rgw/rgw_bucket_logging.h +++ b/src/rgw/rgw_bucket_logging.h @@ -4,7 +4,6 @@ #pragma once #include <string> -#include <optional> #include <cstdint> #include "rgw_sal_fwd.h" #include "include/buffer.h" @@ -16,7 +15,7 @@ class XMLObj; namespace ceph { class Formatter; } class DoutPrefixProvider; struct req_state; -class RGWObjVersionTracker; +struct RGWObjVersionTracker; class RGWOp; namespace rgw::bucketlogging { @@ -66,6 +65,17 @@ enum class LoggingType {Standard, Journal, Any}; enum class PartitionDateSource {DeliveryTime, EventTime}; struct configuration { + bool operator==(const configuration& rhs) const { + return enabled == rhs.enabled && + target_bucket == rhs.target_bucket && + obj_key_format == rhs.obj_key_format && + target_prefix == rhs.target_prefix && + obj_roll_time == rhs.obj_roll_time && + logging_type == rhs.logging_type && + records_batch_size == rhs.records_batch_size && + date_source == rhs.date_source && + key_filter == rhs.key_filter; + } uint32_t default_obj_roll_time = 300; bool enabled = false; std::string target_bucket; @@ -129,6 +139,8 @@ struct configuration { }; WRITE_CLASS_ENCODER(configuration) +using source_buckets = std::set<rgw_bucket>; + constexpr unsigned MAX_BUCKET_LOGGING_BUFFER = 1000; using bucket_logging_records = std::array<std::string, MAX_BUCKET_LOGGING_BUFFER>; @@ -155,7 +167,7 @@ int log_record(rgw::sal::Driver* driver, bool async_completion, bool log_source_bucket); -// commit the pending log objec tto the log bucket +// commit the pending log objec to the log bucket // and create a new pending log object // if "must_commit" is "false" the function will return success even if the pending log object was not committed int rollover_logging_object(const configuration& conf, @@ -166,6 +178,23 @@ int rollover_logging_object(const configuration& conf, bool must_commit, RGWObjVersionTracker* objv_tracker); +// commit the pending log object to the log bucket +// use this for cleanup, when new pending object is not needed +// and target bucket is known +int commit_logging_object(const configuration& conf, + const std::unique_ptr<rgw::sal::Bucket>& target_bucket, + const DoutPrefixProvider *dpp, + optional_yield y); + +// commit the pending log object to the log bucket +// use this for cleanup, when new pending object is not needed +// and target bucket shoud be loaded based on the configuration +int commit_logging_object(const configuration& conf, + const DoutPrefixProvider *dpp, + rgw::sal::Driver* driver, + const std::string& tenant_name, + optional_yield y); + // return the oid of the object holding the name of the temporary logging object // bucket - log bucket // prefix - logging prefix from configuration. should be used when multiple buckets log into the same log bucket @@ -185,5 +214,37 @@ int log_record(rgw::sal::Driver* driver, optional_yield y, bool async_completion, bool log_source_bucket); + +// return (by ref) an rgw_bucket object with the bucket name and tenant name +// fails if the bucket name is not in the format: [tenant name:]<bucket name> +int get_bucket_id(const std::string& bucket_name, const std::string& tenant_name, rgw_bucket& bucket_id); + +// update (add or remove) a source bucket from the list of source buckets in the target bucket +// use this function when the target bucket is already loaded +int update_bucket_logging_sources(const DoutPrefixProvider* dpp, std::unique_ptr<rgw::sal::Bucket>& bucket, + const rgw_bucket& src_bucket, bool add, optional_yield y); + +// update (add or remove) a source bucket from the list of source buckets in the target bucket +// use this function when the target bucket is not known and needs to be loaded +int update_bucket_logging_sources(const DoutPrefixProvider* dpp, rgw::sal::Driver* driver, const rgw_bucket& target_bucket_id, + const rgw_bucket& src_bucket_id, bool add, optional_yield y); + +// when source bucket is deleted, all pending log objects should be comitted to the log bucket +// when the target bucket is deleted, all pending log objects should be deleted, as well as the object holding the pending log object name +int bucket_deletion_cleanup(const DoutPrefixProvider* dpp, + sal::Driver* driver, + sal::Bucket* bucket, + optional_yield y); + +// if bucket has bucket logging configuration associated with it then: +// if "remove_attr" is true, the bucket logging configuration should be removed from the bucket +// in addition: +// any pending log objects should be comitted to the log bucket +// and the log bucket should be updated to remove the bucket as a source +int source_bucket_cleanup(const DoutPrefixProvider* dpp, + sal::Driver* driver, + sal::Bucket* bucket, + bool remove_attr, + optional_yield y); } // namespace rgw::bucketlogging diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index d2917838f36..99f7db4f569 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -108,6 +108,7 @@ using ceph::crypto::MD5; #define RGW_ATTR_X_ROBOTS_TAG RGW_ATTR_PREFIX "x-robots-tag" #define RGW_ATTR_STORAGE_CLASS RGW_ATTR_PREFIX "storage_class" #define RGW_ATTR_BUCKET_LOGGING RGW_ATTR_PREFIX "logging" +#define RGW_ATTR_BUCKET_LOGGING_SOURCES RGW_ATTR_PREFIX "logging-sources" /* S3 Object Lock*/ #define RGW_ATTR_OBJECT_LOCK RGW_ATTR_PREFIX "object-lock" diff --git a/src/rgw/rgw_rest_bucket_logging.cc b/src/rgw/rgw_rest_bucket_logging.cc index ed12ce855a9..afd79b0a548 100644 --- a/src/rgw/rgw_rest_bucket_logging.cc +++ b/src/rgw/rgw_rest_bucket_logging.cc @@ -58,30 +58,29 @@ public: return; } - std::unique_ptr<rgw::sal::Bucket> bucket; - op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), - &bucket, y); + const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name); + std::unique_ptr<rgw::sal::Bucket> src_bucket; + op_ret = driver->load_bucket(this, src_bucket_id, + &src_bucket, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << - (s->bucket_tenant.empty() ? s->bucket_name : s->bucket_tenant + ":" + s->bucket_name) << - "' info, ret = " << op_ret << dendl; + ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl; return; } - if (auto iter = bucket->get_attrs().find(RGW_ATTR_BUCKET_LOGGING); iter != bucket->get_attrs().end()) { + if (auto iter = src_bucket->get_attrs().find(RGW_ATTR_BUCKET_LOGGING); iter != src_bucket->get_attrs().end()) { try { configuration.enabled = true; decode(configuration, iter->second); } catch (buffer::error& err) { - ldpp_dout(this, 1) << "ERROR: failed to decode attribute '" << RGW_ATTR_BUCKET_LOGGING - << "'. error: " << err.what() << dendl; + ldpp_dout(this, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "' for bucket '" << src_bucket_id << "', error: " << err.what() << dendl; op_ret = -EIO; return; } } else { - ldpp_dout(this, 5) << "WARNING: no logging configuration on bucket '" << bucket->get_name() << "'" << dendl; + ldpp_dout(this, 5) << "WARNING: no logging configuration on bucket '" << src_bucket_id << "'" << dendl; return; } - ldpp_dout(this, 20) << "INFO: found logging configuration on bucket '" << bucket->get_name() << "'" + ldpp_dout(this, 20) << "INFO: found logging configuration on bucket '" << src_bucket_id << "'" << "'. configuration: " << configuration.to_json_str() << dendl; } @@ -159,58 +158,125 @@ class RGWPutBucketLoggingOp : public RGWDefaultResponseOp { return; } - std::unique_ptr<rgw::sal::Bucket> bucket; - op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), - &bucket, y); + const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name); + std::unique_ptr<rgw::sal::Bucket> src_bucket; + op_ret = driver->load_bucket(this, src_bucket_id, + &src_bucket, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << s->bucket_name << "', ret = " << op_ret << dendl; + ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl; return; } - - auto& attrs = bucket->get_attrs(); if (!configuration.enabled) { - if (auto iter = attrs.find(RGW_ATTR_BUCKET_LOGGING); iter != attrs.end()) { - attrs.erase(iter); - } - } else { - std::unique_ptr<rgw::sal::Bucket> target_bucket; - op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, configuration.target_bucket), - &target_bucket, y); - if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl; - return; - } - const auto& target_attrs = target_bucket->get_attrs(); - if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) { - // target bucket must not have logging set on it - ldpp_dout(this, 1) << "ERROR: logging target bucket '" << configuration.target_bucket << "', is configured with bucket logging" << dendl; - op_ret = -EINVAL; - return; - } - // TODO: verify target bucket does not have encryption - bufferlist conf_bl; - encode(configuration, conf_bl); - attrs[RGW_ATTR_BUCKET_LOGGING] = conf_bl; - // TODO: should we add attribute to target bucket indicating it is target to bucket logging? - // if we do, how do we maintain it when bucket logging changes? + op_ret = rgw::bucketlogging::source_bucket_cleanup(this, driver, src_bucket.get(), true, y); + return; + } + + // set logging configuration + rgw_bucket target_bucket_id; + if (op_ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl; + return; + } + + if (target_bucket_id == src_bucket_id) { + ldpp_dout(this, 1) << "ERROR: target bucket '" << target_bucket_id << "' must be different from source bucket" << dendl; + op_ret = -EINVAL; + return; + } + std::unique_ptr<rgw::sal::Bucket> target_bucket; + op_ret = driver->load_bucket(this, target_bucket_id, + &target_bucket, y); + if (op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << op_ret << dendl; + return; + } + auto& target_attrs = target_bucket->get_attrs(); + if (target_attrs.find(RGW_ATTR_BUCKET_LOGGING) != target_attrs.end()) { + // target bucket must not have logging set on it + ldpp_dout(this, 1) << "ERROR: logging target bucket '" << target_bucket_id << "', is configured with bucket logging" << dendl; + op_ret = -EINVAL; + return; } - // TODO: use retry_raced_bucket_write from rgw_op.cc - op_ret = bucket->merge_and_store_attrs(this, attrs, y); + // verify target bucket does not have encryption + if (target_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_POLICY) != target_attrs.end()) { + ldpp_dout(this, 1) << "ERROR: logging target bucket '" << target_bucket_id << "', is configured with encryption" << dendl; + op_ret = -EINVAL; + return; + } + std::optional<rgw::bucketlogging::configuration> old_conf; + bufferlist conf_bl; + encode(configuration, conf_bl); + op_ret = retry_raced_bucket_write(this, src_bucket.get(), [this, &conf_bl, &src_bucket, &old_conf, &configuration, y] { + auto& attrs = src_bucket->get_attrs(); + auto it = attrs.find(RGW_ATTR_BUCKET_LOGGING); + if (it != attrs.end()) { + try { + rgw::bucketlogging::configuration tmp_conf; + tmp_conf.enabled = true; + decode(tmp_conf, it->second); + old_conf = std::move(tmp_conf); + } catch (buffer::error& err) { + ldpp_dout(this, 1) << "WARNING: failed to decode existing logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "' for bucket '" << src_bucket->get_info().bucket << "', error: " << err.what() << dendl; + } + if (!old_conf || (old_conf && *old_conf != configuration)) { + // conf changed (or was unknown) - update + it->second = conf_bl; + return src_bucket->merge_and_store_attrs(this, attrs, y); + } + // nothing to update + return 0; + } + // conf was added + attrs.insert(std::make_pair(RGW_ATTR_BUCKET_LOGGING, conf_bl)); + return src_bucket->merge_and_store_attrs(this, attrs, y); + }, y); if (op_ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to set logging attribute '" << RGW_ATTR_BUCKET_LOGGING << "' to bucket '" << - bucket->get_name() << "', ret = " << op_ret << dendl; + src_bucket_id << "', ret = " << op_ret << dendl; return; } - - ldpp_dout(this, 20) << "INFO: " << (configuration.enabled ? "wrote" : "removed") - << " logging configuration. bucket '" << bucket->get_name() << "'. configuration: " << - configuration.to_json_str() << dendl; + if (!old_conf) { + ldpp_dout(this, 20) << "INFO: new logging configuration added to bucket '" << src_bucket_id << "'. configuration: " << + configuration.to_json_str() << dendl; + if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, target_bucket, src_bucket_id, true, y); ret < 0) { + ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of target bucket '" << + target_bucket_id << "', ret = " << ret << dendl; + } + } else if (*old_conf != configuration) { + // conf changed - do cleanup + if (const auto ret = commit_logging_object(*old_conf, target_bucket, this, y); ret < 0) { + ldpp_dout(this, 1) << "WARNING: could not commit pending logging object when updating logging configuration of bucket '" << + src_bucket->get_info().bucket << "', ret = " << ret << dendl; + } else { + ldpp_dout(this, 20) << "INFO: committed pending logging object when updating logging configuration of bucket '" << + src_bucket->get_info().bucket << "'" << dendl; + } + if (old_conf->target_bucket != configuration.target_bucket) { + rgw_bucket old_target_bucket_id; + if (const auto ret = rgw::bucketlogging::get_bucket_id(old_conf->target_bucket, s->bucket_tenant, old_target_bucket_id); ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << old_conf->target_bucket << "', ret = " << ret << dendl; + return; + } + if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, driver, old_target_bucket_id, src_bucket_id, false, y); ret < 0) { + ldpp_dout(this, 1) << "WARNING: failed to remove source bucket '" << src_bucket_id << "' from logging sources of original target bucket '" << + old_target_bucket_id << "', ret = " << ret << dendl; + } + if (const auto ret = rgw::bucketlogging::update_bucket_logging_sources(this, target_bucket, src_bucket_id, true, y); ret < 0) { + ldpp_dout(this, 1) << "WARNING: failed to add source bucket '" << src_bucket_id << "' to logging sources of target bucket '" << + target_bucket_id << "', ret = " << ret << dendl; + } + } + ldpp_dout(this, 20) << "INFO: wrote logging configuration to bucket '" << src_bucket_id << "'. configuration: " << + configuration.to_json_str() << dendl; + } else { + ldpp_dout(this, 20) << "INFO: logging configuration of bucket '" << src_bucket_id << "' did not change" << dendl; + } } }; // Post /<bucket name>/?logging -// actual configuration is XML encoded in the body of the message class RGWPostBucketLoggingOp : public RGWDefaultResponseOp { int verify_permission(optional_yield y) override { auto [has_s3_existing_tag, has_s3_resource_tag] = rgw_check_policy_condition(this, s, false); @@ -234,17 +300,18 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp { return; } - std::unique_ptr<rgw::sal::Bucket> bucket; - op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name), - &bucket, y); + const rgw_bucket src_bucket_id(s->bucket_tenant, s->bucket_name); + std::unique_ptr<rgw::sal::Bucket> src_bucket; + op_ret = driver->load_bucket(this, src_bucket_id, + &src_bucket, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << s->bucket_name << "', ret = " << op_ret << dendl; + ldpp_dout(this, 1) << "ERROR: failed to get bucket '" << src_bucket_id << "', ret = " << op_ret << dendl; return; } - const auto& bucket_attrs = bucket->get_attrs(); + const auto& bucket_attrs = src_bucket->get_attrs(); auto iter = bucket_attrs.find(RGW_ATTR_BUCKET_LOGGING); if (iter == bucket_attrs.end()) { - ldpp_dout(this, 1) << "WARNING: no logging configured on bucket" << dendl; + ldpp_dout(this, 1) << "WARNING: no logging configured on bucket '" << src_bucket_id << "'" << dendl; return; } rgw::bucketlogging::configuration configuration; @@ -252,33 +319,38 @@ class RGWPostBucketLoggingOp : public RGWDefaultResponseOp { configuration.enabled = true; decode(configuration, iter->second); } catch (buffer::error& err) { - ldpp_dout(this, 1) << "ERROR: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING - << "'. error: " << err.what() << dendl; + ldpp_dout(this, 1) << "WARNING: failed to decode logging attribute '" << RGW_ATTR_BUCKET_LOGGING + << "' for bucket '" << src_bucket_id << "', error: " << err.what() << dendl; op_ret = -EINVAL; return; } + rgw_bucket target_bucket_id; + if (op_ret = rgw::bucketlogging::get_bucket_id(configuration.target_bucket, s->bucket_tenant, target_bucket_id); op_ret < 0) { + ldpp_dout(this, 1) << "ERROR: failed to parse target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl; + return; + } std::unique_ptr<rgw::sal::Bucket> target_bucket; - op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, configuration.target_bucket), + op_ret = driver->load_bucket(this, target_bucket_id, &target_bucket, y); if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << configuration.target_bucket << "', ret = " << op_ret << dendl; + ldpp_dout(this, 1) << "ERROR: failed to get target bucket '" << target_bucket_id << "', ret = " << op_ret << dendl; return; } std::string obj_name; RGWObjVersionTracker objv_tracker; op_ret = target_bucket->get_logging_object_name(obj_name, configuration.target_prefix, null_yield, this, &objv_tracker); if (op_ret < 0) { - ldpp_dout(this, 1) << "ERROR: failed to get pending logging object name from target bucket '" << configuration.target_bucket << "'" << dendl; + ldpp_dout(this, 1) << "ERROR: failed to get pending logging object name from target bucket '" << target_bucket_id << "'" << dendl; return; } op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, null_yield, true, &objv_tracker); if (op_ret < 0) { ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << obj_name - << "' to target bucket '" << configuration.target_bucket << "'" << dendl; + << "' to target bucket '" << target_bucket_id << "'" << dendl; return; } - ldpp_dout(this, 20) << "flushed pending logging object '" << obj_name + ldpp_dout(this, 20) << "INFO: flushed pending logging object '" << obj_name << "' to target bucket '" << configuration.target_bucket << "'" << dendl; } }; diff --git a/src/rgw/rgw_s3_filter.h b/src/rgw/rgw_s3_filter.h index 9bbc4ef0088..0273da9a364 100644 --- a/src/rgw/rgw_s3_filter.h +++ b/src/rgw/rgw_s3_filter.h @@ -9,6 +9,7 @@ class XMLObj; struct rgw_s3_key_filter { + bool operator==(const rgw_s3_key_filter& rhs) const = default; std::string prefix_rule; std::string suffix_rule; std::string regex_rule; diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 4b94f74b851..97e25179fc9 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -1006,20 +1006,27 @@ class Bucket { optional_yield y, const DoutPrefixProvider *dpp) = 0; /** Read the name of the pending bucket logging object name */ - virtual int get_logging_object_name(std::string& obj_name, - const std::string& prefix, - optional_yield y, + virtual int get_logging_object_name(std::string& obj_name, + const std::string& prefix, + optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) = 0; /** Update the name of the pending bucket logging object name */ - virtual int set_logging_object_name(const std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, + virtual int set_logging_object_name(const std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, bool new_obj, RGWObjVersionTracker* objv_tracker) = 0; + /** Remove the object holding the name of the pending bucket logging object */ + virtual int remove_logging_object_name(const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + RGWObjVersionTracker* objv_tracker) = 0; /** Move the pending bucket logging object into the bucket */ virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0; + //** Remove the pending bucket logging object */ + virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0; /** Write a record to the pending bucket logging object */ virtual int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) = 0; diff --git a/src/rgw/rgw_sal_filter.h b/src/rgw/rgw_sal_filter.h index 947ce9d4bf5..b6b6ed42b8f 100644 --- a/src/rgw/rgw_sal_filter.h +++ b/src/rgw/rgw_sal_filter.h @@ -666,24 +666,33 @@ public: optional_yield y, const DoutPrefixProvider *dpp) override { return next->remove_topics(objv_tracker, y, dpp); } - int get_logging_object_name(std::string& obj_name, - const std::string& prefix, - optional_yield y, + int get_logging_object_name(std::string& obj_name, + const std::string& prefix, + optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) override { return next->get_logging_object_name(obj_name, prefix, y, dpp, objv_tracker); } - int set_logging_object_name(const std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, + int set_logging_object_name(const std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, bool new_obj, RGWObjVersionTracker* objv_track) override { - return next->set_logging_object_name(obj_name, prefix, y, dpp, new_obj, objv_track); + return next->set_logging_object_name(obj_name, prefix, y, dpp, new_obj, objv_track); + } + int remove_logging_object_name(const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + RGWObjVersionTracker* objv_tracker) override { + return next->remove_logging_object_name(prefix, y, dpp, objv_tracker); } int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp)override { return next->commit_logging_object(obj_name, y, dpp); } + int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { + return next->remove_logging_object(obj_name, y, dpp); + } int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { return next->write_logging_object(obj_name, record, y, dpp, async_completion); } diff --git a/src/rgw/rgw_sal_store.h b/src/rgw/rgw_sal_store.h index 5cb98d23158..99b90564997 100644 --- a/src/rgw/rgw_sal_store.h +++ b/src/rgw/rgw_sal_store.h @@ -253,18 +253,23 @@ class StoreBucket : public Bucket { optional_yield y, const DoutPrefixProvider *dpp) override {return 0;} int remove_topics(RGWObjVersionTracker* objv_tracker, optional_yield y, const DoutPrefixProvider *dpp) override {return 0;} - int get_logging_object_name(std::string& obj_name, - const std::string& prefix, - optional_yield y, + int get_logging_object_name(std::string& obj_name, + const std::string& prefix, + optional_yield y, const DoutPrefixProvider *dpp, RGWObjVersionTracker* objv_tracker) override { return 0; } - int set_logging_object_name(const std::string& obj_name, - const std::string& prefix, - optional_yield y, - const DoutPrefixProvider *dpp, + int set_logging_object_name(const std::string& obj_name, + const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, bool new_obj, RGWObjVersionTracker* objv_tracker) override { return 0; } + int remove_logging_object_name(const std::string& prefix, + optional_yield y, + const DoutPrefixProvider *dpp, + RGWObjVersionTracker* objv_tracker) override { return 0; } int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; } + int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; } int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override { return 0; } diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index 8b7c6d6e3db..c1675d11a80 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -43,7 +43,8 @@ bucket sync disable disable bucket sync bucket sync enable enable bucket sync bucket radoslist list rados objects backing bucket's objects - bucket logging flush flush pending log records object of source bucket to the log bucket to bucket + bucket logging flush flush pending log records object of source bucket to the log bucket + bucket logging info get info on bucket logging configuration on source bucket or list of sources in log bucket bi get retrieve bucket index object entries bi put store bucket index object entries bi list list raw bucket index entries |