diff options
author | Yuval Lifshitz <ylifshit@redhat.com> | 2023-12-12 16:06:01 +0100 |
---|---|---|
committer | Yuval Lifshitz <ylifshit@redhat.com> | 2023-12-12 16:06:01 +0100 |
commit | f3cfd02db3eab956f2cb8e69736c30b3f90ec1b4 (patch) | |
tree | f419f6fbf435cb3c0927d2724f99819f4bad8b93 /src | |
parent | Merge pull request #54875 from zdover23/wip-doc-2023-12-12-radosgw-role-creat... (diff) | |
download | ceph-f3cfd02db3eab956f2cb8e69736c30b3f90ec1b4.tar.xz ceph-f3cfd02db3eab956f2cb8e69736c30b3f90ec1b4.zip |
rgw: split RGWDataAccess from rgw_tools.cc
that class is not rados specific, while the rest of the code
in rgw_tools.cc is
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/rgw/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_tools.cc | 170 | ||||
-rw-r--r-- | src/rgw/driver/rados/rgw_tools.h | 155 | ||||
-rw-r--r-- | src/rgw/rgw_admin.cc | 1 | ||||
-rw-r--r-- | src/rgw/rgw_data_access.cc | 222 | ||||
-rw-r--r-- | src/rgw/rgw_data_access.h | 124 |
6 files changed, 348 insertions, 325 deletions
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 00a9e4127e1..a308c833ddc 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -147,6 +147,7 @@ set(librgw_common_srcs rgw_bucket_encryption.cc rgw_tracer.cc rgw_lua_background.cc + rgw_data_access.cc driver/rados/cls_fifo_legacy.cc driver/rados/rgw_bucket.cc driver/rados/rgw_bucket_sync.cc diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc index bf78b9bb22e..20de47d2cf9 100644 --- a/src/rgw/driver/rados/rgw_tools.cc +++ b/src/rgw/driver/rados/rgw_tools.cc @@ -280,176 +280,6 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin } } -RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver) -{ -} - - -int RGWDataAccess::Bucket::finish_init() -{ - auto iter = attrs.find(RGW_ATTR_ACL); - if (iter == attrs.end()) { - return 0; - } - - bufferlist::const_iterator bliter = iter->second.begin(); - try { - policy.decode(bliter); - } catch (buffer::error& err) { - return -EIO; - } - - return 0; -} - -int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y) -{ - std::unique_ptr<rgw::sal::Bucket> bucket; - int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y); - if (ret < 0) { - return ret; - } - - bucket_info = bucket->get_info(); - mtime = bucket->get_modification_time(); - attrs = bucket->get_attrs(); - - return finish_init(); -} - -int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info, - const map<string, bufferlist>& _attrs) -{ - bucket_info = _bucket_info; - attrs = _attrs; - - return finish_init(); -} - -int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key, - ObjectRef *obj) { - obj->reset(new Object(sd, shared_from_this(), key)); - return 0; -} - -int RGWDataAccess::Object::put(bufferlist& data, - map<string, bufferlist>& attrs, - const DoutPrefixProvider *dpp, - optional_yield y) -{ - rgw::sal::Driver* driver = sd->driver; - CephContext *cct = driver->ctx(); - - string tag; - append_rand_alpha(cct, tag, tag, 32); - - RGWBucketInfo& bucket_info = bucket->bucket_info; - - rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size); - - std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info); - std::unique_ptr<rgw::sal::Object> obj = b->get_object(key); - - auto& owner = bucket->policy.get_owner(); - - string req_id = driver->zone_unique_id(driver->get_new_req_id()); - - std::unique_ptr<rgw::sal::Writer> processor; - processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id, - nullptr, olh_epoch, req_id); - - int ret = processor->prepare(y); - if (ret < 0) - return ret; - - rgw::sal::DataProcessor *filter = processor.get(); - - CompressorRef plugin; - boost::optional<RGWPutObj_Compress> compressor; - - const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule); - if (compression_type != "none") { - plugin = Compressor::create(driver->ctx(), compression_type); - if (!plugin) { - ldpp_dout(dpp, 1) << "Cannot load plugin for compression type " - << compression_type << dendl; - } else { - compressor.emplace(driver->ctx(), plugin, filter); - filter = &*compressor; - } - } - - off_t ofs = 0; - auto obj_size = data.length(); - - RGWMD5Etag etag_calc; - - do { - size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size); - - bufferlist bl; - - data.splice(0, read_len, &bl); - etag_calc.update(bl); - - ret = filter->process(std::move(bl), ofs); - if (ret < 0) - return ret; - - ofs += read_len; - } while (data.length() > 0); - - ret = filter->process({}, ofs); - if (ret < 0) { - return ret; - } - bool has_etag_attr = false; - auto iter = attrs.find(RGW_ATTR_ETAG); - if (iter != attrs.end()) { - bufferlist& bl = iter->second; - etag = bl.to_str(); - has_etag_attr = true; - } - - if (!aclbl) { - RGWAccessControlPolicy policy; - - const auto& owner = bucket->policy.get_owner(); - policy.create_default(owner.id, owner.display_name); // default private policy - - policy.encode(aclbl.emplace()); - } - - if (etag.empty()) { - etag_calc.finish(&etag); - } - - if (!has_etag_attr) { - bufferlist etagbl; - etagbl.append(etag); - attrs[RGW_ATTR_ETAG] = etagbl; - } - attrs[RGW_ATTR_ACL] = *aclbl; - - string *puser_data = nullptr; - if (user_data) { - puser_data = &(*user_data); - } - - const req_context rctx{dpp, y, nullptr}; - return processor->complete(obj_size, etag, - &mtime, mtime, - attrs, delete_at, - nullptr, nullptr, - puser_data, - nullptr, nullptr, rctx); -} - -void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy) -{ - policy.encode(aclbl.emplace()); -} - void rgw_complete_aio_completion(librados::AioCompletion* c, int r) { auto pc = c->pc; librados::CB_AioCompleteAndSafe cb(pc); diff --git a/src/rgw/driver/rados/rgw_tools.h b/src/rgw/driver/rados/rgw_tools.h index 27a8b424ecc..27bc6f0c4da 100644 --- a/src/rgw/driver/rados/rgw_tools.h +++ b/src/rgw/driver/rados/rgw_tools.h @@ -165,161 +165,6 @@ int rgw_get_rados_ref(const DoutPrefixProvider* dpp, librados::Rados* rados, int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct); void rgw_tools_cleanup(); -template<class H, size_t S> -class RGWEtag -{ - H hash; - -public: - RGWEtag() { - if constexpr (std::is_same_v<H, MD5>) { - // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes - hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); - } - } - - void update(const char *buf, size_t len) { - hash.Update((const unsigned char *)buf, len); - } - - void update(bufferlist& bl) { - if (bl.length() > 0) { - update(bl.c_str(), bl.length()); - } - } - - void update(const std::string& s) { - if (!s.empty()) { - update(s.c_str(), s.size()); - } - } - void finish(std::string *etag) { - char etag_buf[S]; - char etag_buf_str[S * 2 + 16]; - - hash.Final((unsigned char *)etag_buf); - buf_to_hex((const unsigned char *)etag_buf, S, - etag_buf_str); - - *etag = etag_buf_str; - } -}; - -using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>; - -class RGWDataAccess -{ - rgw::sal::Driver* driver; - -public: - RGWDataAccess(rgw::sal::Driver* _driver); - - class Object; - class Bucket; - - using BucketRef = std::shared_ptr<Bucket>; - using ObjectRef = std::shared_ptr<Object>; - - class Bucket : public std::enable_shared_from_this<Bucket> { - friend class RGWDataAccess; - friend class Object; - - RGWDataAccess *sd{nullptr}; - RGWBucketInfo bucket_info; - std::string tenant; - std::string name; - std::string bucket_id; - ceph::real_time mtime; - std::map<std::string, bufferlist> attrs; - - RGWAccessControlPolicy policy; - int finish_init(); - - Bucket(RGWDataAccess *_sd, - const std::string& _tenant, - const std::string& _name, - const std::string& _bucket_id) : sd(_sd), - tenant(_tenant), - name(_name), - bucket_id(_bucket_id) {} - Bucket(RGWDataAccess *_sd) : sd(_sd) {} - int init(const DoutPrefixProvider *dpp, optional_yield y); - int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs); - public: - int get_object(const rgw_obj_key& key, - ObjectRef *obj); - - }; - - - class Object { - RGWDataAccess *sd{nullptr}; - BucketRef bucket; - rgw_obj_key key; - - ceph::real_time mtime; - std::string etag; - uint64_t olh_epoch{0}; - ceph::real_time delete_at; - std::optional<std::string> user_data; - - std::optional<bufferlist> aclbl; - - Object(RGWDataAccess *_sd, - BucketRef&& _bucket, - const rgw_obj_key& _key) : sd(_sd), - bucket(_bucket), - key(_key) {} - public: - int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */ - - void set_mtime(const ceph::real_time& _mtime) { - mtime = _mtime; - } - - void set_etag(const std::string& _etag) { - etag = _etag; - } - - void set_olh_epoch(uint64_t epoch) { - olh_epoch = epoch; - } - - void set_delete_at(ceph::real_time _delete_at) { - delete_at = _delete_at; - } - - void set_user_data(const std::string& _user_data) { - user_data = _user_data; - } - - void set_policy(const RGWAccessControlPolicy& policy); - - friend class Bucket; - }; - - int get_bucket(const DoutPrefixProvider *dpp, - const std::string& tenant, - const std::string name, - const std::string bucket_id, - BucketRef *bucket, - optional_yield y) { - bucket->reset(new Bucket(this, tenant, name, bucket_id)); - return (*bucket)->init(dpp, y); - } - - int get_bucket(const RGWBucketInfo& bucket_info, - const std::map<std::string, bufferlist>& attrs, - BucketRef *bucket) { - bucket->reset(new Bucket(this)); - return (*bucket)->init(bucket_info, attrs); - } - friend class Bucket; - friend class Object; -}; - -using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>; - /// Complete an AioCompletion. To return error values or otherwise /// satisfy the caller. Useful for making complicated asynchronous /// calls and error handling. diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index f4a2d1480bd..2288399d4d8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -62,6 +62,7 @@ extern "C" { #include "rgw_lua.h" #include "rgw_sal.h" #include "rgw_sal_config.h" +#include "rgw_data_access.h" #include "services/svc_sync_modules.h" #include "services/svc_cls.h" diff --git a/src/rgw/rgw_data_access.cc b/src/rgw/rgw_data_access.cc new file mode 100644 index 00000000000..07bf12e3746 --- /dev/null +++ b/src/rgw/rgw_data_access.cc @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_data_access.h" +#include "rgw_acl_s3.h" +#include "rgw_aio_throttle.h" +#include "rgw_compression.h" +#include "common/BackTrace.h" + +#define dout_subsys ceph_subsys_rgw + +template<class H, size_t S> +class RGWEtag +{ + H hash; + +public: + RGWEtag() { + if constexpr (std::is_same_v<H, MD5>) { + // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes + hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); + } + } + + void update(const char *buf, size_t len) { + hash.Update((const unsigned char *)buf, len); + } + + void update(bufferlist& bl) { + if (bl.length() > 0) { + update(bl.c_str(), bl.length()); + } + } + + void update(const std::string& s) { + if (!s.empty()) { + update(s.c_str(), s.size()); + } + } + void finish(std::string *etag) { + char etag_buf[S]; + char etag_buf_str[S * 2 + 16]; + + hash.Final((unsigned char *)etag_buf); + buf_to_hex((const unsigned char *)etag_buf, S, + etag_buf_str); + + *etag = etag_buf_str; + } +}; + +using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>; + +RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver) +{ +} + +int RGWDataAccess::Bucket::finish_init() +{ + auto iter = attrs.find(RGW_ATTR_ACL); + if (iter == attrs.end()) { + return 0; + } + + bufferlist::const_iterator bliter = iter->second.begin(); + try { + policy.decode(bliter); + } catch (buffer::error& err) { + return -EIO; + } + + return 0; +} + +int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y) +{ + std::unique_ptr<rgw::sal::Bucket> bucket; + int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y); + if (ret < 0) { + return ret; + } + + bucket_info = bucket->get_info(); + mtime = bucket->get_modification_time(); + attrs = bucket->get_attrs(); + + return finish_init(); +} + +int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info, + const std::map<std::string, bufferlist>& _attrs) +{ + bucket_info = _bucket_info; + attrs = _attrs; + + return finish_init(); +} + +int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key, + ObjectRef *obj) { + obj->reset(new Object(sd, shared_from_this(), key)); + return 0; +} + +int RGWDataAccess::Object::put(bufferlist& data, + std::map<std::string, bufferlist>& attrs, + const DoutPrefixProvider *dpp, + optional_yield y) +{ + rgw::sal::Driver* driver = sd->driver; + CephContext *cct = driver->ctx(); + + std::string tag; + append_rand_alpha(cct, tag, tag, 32); + + RGWBucketInfo& bucket_info = bucket->bucket_info; + + rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size); + + std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info); + std::unique_ptr<rgw::sal::Object> obj = b->get_object(key); + + auto& owner = bucket->policy.get_owner(); + + std::string req_id = driver->zone_unique_id(driver->get_new_req_id()); + + std::unique_ptr<rgw::sal::Writer> processor; + processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id, + nullptr, olh_epoch, req_id); + + int ret = processor->prepare(y); + if (ret < 0) + return ret; + + rgw::sal::DataProcessor *filter = processor.get(); + + CompressorRef plugin; + boost::optional<RGWPutObj_Compress> compressor; + + const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule); + if (compression_type != "none") { + plugin = Compressor::create(driver->ctx(), compression_type); + if (!plugin) { + ldpp_dout(dpp, 1) << "Cannot load plugin for compression type " + << compression_type << dendl; + } else { + compressor.emplace(driver->ctx(), plugin, filter); + filter = &*compressor; + } + } + + off_t ofs = 0; + auto obj_size = data.length(); + + RGWMD5Etag etag_calc; + + do { + size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size); + + bufferlist bl; + + data.splice(0, read_len, &bl); + etag_calc.update(bl); + + ret = filter->process(std::move(bl), ofs); + if (ret < 0) + return ret; + + ofs += read_len; + } while (data.length() > 0); + + ret = filter->process({}, ofs); + if (ret < 0) { + return ret; + } + bool has_etag_attr = false; + auto iter = attrs.find(RGW_ATTR_ETAG); + if (iter != attrs.end()) { + bufferlist& bl = iter->second; + etag = bl.to_str(); + has_etag_attr = true; + } + + if (!aclbl) { + RGWAccessControlPolicy policy; + + const auto& owner = bucket->policy.get_owner(); + policy.create_default(owner.id, owner.display_name); // default private policy + + policy.encode(aclbl.emplace()); + } + + if (etag.empty()) { + etag_calc.finish(&etag); + } + + if (!has_etag_attr) { + bufferlist etagbl; + etagbl.append(etag); + attrs[RGW_ATTR_ETAG] = etagbl; + } + attrs[RGW_ATTR_ACL] = *aclbl; + + std::string *puser_data = nullptr; + if (user_data) { + puser_data = &(*user_data); + } + + const req_context rctx{dpp, y, nullptr}; + return processor->complete(obj_size, etag, + &mtime, mtime, + attrs, delete_at, + nullptr, nullptr, + puser_data, + nullptr, nullptr, rctx); +} + +void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy) +{ + policy.encode(aclbl.emplace()); +} + diff --git a/src/rgw/rgw_data_access.h b/src/rgw/rgw_data_access.h new file mode 100644 index 00000000000..df921a67f4d --- /dev/null +++ b/src/rgw/rgw_data_access.h @@ -0,0 +1,124 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#pragma once + +#include <string> +#include "include/types.h" +#include "common/ceph_time.h" +#include "rgw_common.h" +#include "rgw_sal_fwd.h" + +class RGWDataAccess +{ + rgw::sal::Driver* driver; + +public: + RGWDataAccess(rgw::sal::Driver* _driver); + + class Object; + class Bucket; + + using BucketRef = std::shared_ptr<Bucket>; + using ObjectRef = std::shared_ptr<Object>; + + class Bucket : public std::enable_shared_from_this<Bucket> { + friend class RGWDataAccess; + friend class Object; + + RGWDataAccess *sd{nullptr}; + RGWBucketInfo bucket_info; + std::string tenant; + std::string name; + std::string bucket_id; + ceph::real_time mtime; + std::map<std::string, bufferlist> attrs; + + RGWAccessControlPolicy policy; + int finish_init(); + + Bucket(RGWDataAccess *_sd, + const std::string& _tenant, + const std::string& _name, + const std::string& _bucket_id) : sd(_sd), + tenant(_tenant), + name(_name), + bucket_id(_bucket_id) {} + Bucket(RGWDataAccess *_sd) : sd(_sd) {} + int init(const DoutPrefixProvider *dpp, optional_yield y); + int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs); + public: + int get_object(const rgw_obj_key& key, + ObjectRef *obj); + + }; + + + class Object { + RGWDataAccess *sd{nullptr}; + BucketRef bucket; + rgw_obj_key key; + + ceph::real_time mtime; + std::string etag; + uint64_t olh_epoch{0}; + ceph::real_time delete_at; + std::optional<std::string> user_data; + + std::optional<bufferlist> aclbl; + + Object(RGWDataAccess *_sd, + BucketRef&& _bucket, + const rgw_obj_key& _key) : sd(_sd), + bucket(_bucket), + key(_key) {} + public: + int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */ + + void set_mtime(const ceph::real_time& _mtime) { + mtime = _mtime; + } + + void set_etag(const std::string& _etag) { + etag = _etag; + } + + void set_olh_epoch(uint64_t epoch) { + olh_epoch = epoch; + } + + void set_delete_at(ceph::real_time _delete_at) { + delete_at = _delete_at; + } + + void set_user_data(const std::string& _user_data) { + user_data = _user_data; + } + + void set_policy(const RGWAccessControlPolicy& policy); + + friend class Bucket; + }; + + int get_bucket(const DoutPrefixProvider *dpp, + const std::string& tenant, + const std::string name, + const std::string bucket_id, + BucketRef *bucket, + optional_yield y) { + bucket->reset(new Bucket(this, tenant, name, bucket_id)); + return (*bucket)->init(dpp, y); + } + + int get_bucket(const RGWBucketInfo& bucket_info, + const std::map<std::string, bufferlist>& attrs, + BucketRef *bucket) { + bucket->reset(new Bucket(this)); + return (*bucket)->init(bucket_info, attrs); + } + friend class Bucket; + friend class Object; +}; + +using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>; + |