summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorYuval Lifshitz <ylifshit@redhat.com>2023-12-12 16:06:01 +0100
committerYuval Lifshitz <ylifshit@redhat.com>2023-12-12 16:06:01 +0100
commitf3cfd02db3eab956f2cb8e69736c30b3f90ec1b4 (patch)
treef419f6fbf435cb3c0927d2724f99819f4bad8b93 /src
parentMerge pull request #54875 from zdover23/wip-doc-2023-12-12-radosgw-role-creat... (diff)
downloadceph-f3cfd02db3eab956f2cb8e69736c30b3f90ec1b4.tar.xz
ceph-f3cfd02db3eab956f2cb8e69736c30b3f90ec1b4.zip
rgw: split RGWDataAccess from rgw_tools.cc
that class is not rados specific, while the rest of the code in rgw_tools.cc is Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
Diffstat (limited to 'src')
-rw-r--r--src/rgw/CMakeLists.txt1
-rw-r--r--src/rgw/driver/rados/rgw_tools.cc170
-rw-r--r--src/rgw/driver/rados/rgw_tools.h155
-rw-r--r--src/rgw/rgw_admin.cc1
-rw-r--r--src/rgw/rgw_data_access.cc222
-rw-r--r--src/rgw/rgw_data_access.h124
6 files changed, 348 insertions, 325 deletions
diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt
index 00a9e4127e1..a308c833ddc 100644
--- a/src/rgw/CMakeLists.txt
+++ b/src/rgw/CMakeLists.txt
@@ -147,6 +147,7 @@ set(librgw_common_srcs
rgw_bucket_encryption.cc
rgw_tracer.cc
rgw_lua_background.cc
+ rgw_data_access.cc
driver/rados/cls_fifo_legacy.cc
driver/rados/rgw_bucket.cc
driver/rados/rgw_bucket_sync.cc
diff --git a/src/rgw/driver/rados/rgw_tools.cc b/src/rgw/driver/rados/rgw_tools.cc
index bf78b9bb22e..20de47d2cf9 100644
--- a/src/rgw/driver/rados/rgw_tools.cc
+++ b/src/rgw/driver/rados/rgw_tools.cc
@@ -280,176 +280,6 @@ void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const strin
}
}
-RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver)
-{
-}
-
-
-int RGWDataAccess::Bucket::finish_init()
-{
- auto iter = attrs.find(RGW_ATTR_ACL);
- if (iter == attrs.end()) {
- return 0;
- }
-
- bufferlist::const_iterator bliter = iter->second.begin();
- try {
- policy.decode(bliter);
- } catch (buffer::error& err) {
- return -EIO;
- }
-
- return 0;
-}
-
-int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
-{
- std::unique_ptr<rgw::sal::Bucket> bucket;
- int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y);
- if (ret < 0) {
- return ret;
- }
-
- bucket_info = bucket->get_info();
- mtime = bucket->get_modification_time();
- attrs = bucket->get_attrs();
-
- return finish_init();
-}
-
-int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
- const map<string, bufferlist>& _attrs)
-{
- bucket_info = _bucket_info;
- attrs = _attrs;
-
- return finish_init();
-}
-
-int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
- ObjectRef *obj) {
- obj->reset(new Object(sd, shared_from_this(), key));
- return 0;
-}
-
-int RGWDataAccess::Object::put(bufferlist& data,
- map<string, bufferlist>& attrs,
- const DoutPrefixProvider *dpp,
- optional_yield y)
-{
- rgw::sal::Driver* driver = sd->driver;
- CephContext *cct = driver->ctx();
-
- string tag;
- append_rand_alpha(cct, tag, tag, 32);
-
- RGWBucketInfo& bucket_info = bucket->bucket_info;
-
- rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size);
-
- std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info);
- std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
-
- auto& owner = bucket->policy.get_owner();
-
- string req_id = driver->zone_unique_id(driver->get_new_req_id());
-
- std::unique_ptr<rgw::sal::Writer> processor;
- processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id,
- nullptr, olh_epoch, req_id);
-
- int ret = processor->prepare(y);
- if (ret < 0)
- return ret;
-
- rgw::sal::DataProcessor *filter = processor.get();
-
- CompressorRef plugin;
- boost::optional<RGWPutObj_Compress> compressor;
-
- const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule);
- if (compression_type != "none") {
- plugin = Compressor::create(driver->ctx(), compression_type);
- if (!plugin) {
- ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
- << compression_type << dendl;
- } else {
- compressor.emplace(driver->ctx(), plugin, filter);
- filter = &*compressor;
- }
- }
-
- off_t ofs = 0;
- auto obj_size = data.length();
-
- RGWMD5Etag etag_calc;
-
- do {
- size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
-
- bufferlist bl;
-
- data.splice(0, read_len, &bl);
- etag_calc.update(bl);
-
- ret = filter->process(std::move(bl), ofs);
- if (ret < 0)
- return ret;
-
- ofs += read_len;
- } while (data.length() > 0);
-
- ret = filter->process({}, ofs);
- if (ret < 0) {
- return ret;
- }
- bool has_etag_attr = false;
- auto iter = attrs.find(RGW_ATTR_ETAG);
- if (iter != attrs.end()) {
- bufferlist& bl = iter->second;
- etag = bl.to_str();
- has_etag_attr = true;
- }
-
- if (!aclbl) {
- RGWAccessControlPolicy policy;
-
- const auto& owner = bucket->policy.get_owner();
- policy.create_default(owner.id, owner.display_name); // default private policy
-
- policy.encode(aclbl.emplace());
- }
-
- if (etag.empty()) {
- etag_calc.finish(&etag);
- }
-
- if (!has_etag_attr) {
- bufferlist etagbl;
- etagbl.append(etag);
- attrs[RGW_ATTR_ETAG] = etagbl;
- }
- attrs[RGW_ATTR_ACL] = *aclbl;
-
- string *puser_data = nullptr;
- if (user_data) {
- puser_data = &(*user_data);
- }
-
- const req_context rctx{dpp, y, nullptr};
- return processor->complete(obj_size, etag,
- &mtime, mtime,
- attrs, delete_at,
- nullptr, nullptr,
- puser_data,
- nullptr, nullptr, rctx);
-}
-
-void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
-{
- policy.encode(aclbl.emplace());
-}
-
void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
auto pc = c->pc;
librados::CB_AioCompleteAndSafe cb(pc);
diff --git a/src/rgw/driver/rados/rgw_tools.h b/src/rgw/driver/rados/rgw_tools.h
index 27a8b424ecc..27bc6f0c4da 100644
--- a/src/rgw/driver/rados/rgw_tools.h
+++ b/src/rgw/driver/rados/rgw_tools.h
@@ -165,161 +165,6 @@ int rgw_get_rados_ref(const DoutPrefixProvider* dpp, librados::Rados* rados,
int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct);
void rgw_tools_cleanup();
-template<class H, size_t S>
-class RGWEtag
-{
- H hash;
-
-public:
- RGWEtag() {
- if constexpr (std::is_same_v<H, MD5>) {
- // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
- hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
- }
- }
-
- void update(const char *buf, size_t len) {
- hash.Update((const unsigned char *)buf, len);
- }
-
- void update(bufferlist& bl) {
- if (bl.length() > 0) {
- update(bl.c_str(), bl.length());
- }
- }
-
- void update(const std::string& s) {
- if (!s.empty()) {
- update(s.c_str(), s.size());
- }
- }
- void finish(std::string *etag) {
- char etag_buf[S];
- char etag_buf_str[S * 2 + 16];
-
- hash.Final((unsigned char *)etag_buf);
- buf_to_hex((const unsigned char *)etag_buf, S,
- etag_buf_str);
-
- *etag = etag_buf_str;
- }
-};
-
-using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;
-
-class RGWDataAccess
-{
- rgw::sal::Driver* driver;
-
-public:
- RGWDataAccess(rgw::sal::Driver* _driver);
-
- class Object;
- class Bucket;
-
- using BucketRef = std::shared_ptr<Bucket>;
- using ObjectRef = std::shared_ptr<Object>;
-
- class Bucket : public std::enable_shared_from_this<Bucket> {
- friend class RGWDataAccess;
- friend class Object;
-
- RGWDataAccess *sd{nullptr};
- RGWBucketInfo bucket_info;
- std::string tenant;
- std::string name;
- std::string bucket_id;
- ceph::real_time mtime;
- std::map<std::string, bufferlist> attrs;
-
- RGWAccessControlPolicy policy;
- int finish_init();
-
- Bucket(RGWDataAccess *_sd,
- const std::string& _tenant,
- const std::string& _name,
- const std::string& _bucket_id) : sd(_sd),
- tenant(_tenant),
- name(_name),
- bucket_id(_bucket_id) {}
- Bucket(RGWDataAccess *_sd) : sd(_sd) {}
- int init(const DoutPrefixProvider *dpp, optional_yield y);
- int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs);
- public:
- int get_object(const rgw_obj_key& key,
- ObjectRef *obj);
-
- };
-
-
- class Object {
- RGWDataAccess *sd{nullptr};
- BucketRef bucket;
- rgw_obj_key key;
-
- ceph::real_time mtime;
- std::string etag;
- uint64_t olh_epoch{0};
- ceph::real_time delete_at;
- std::optional<std::string> user_data;
-
- std::optional<bufferlist> aclbl;
-
- Object(RGWDataAccess *_sd,
- BucketRef&& _bucket,
- const rgw_obj_key& _key) : sd(_sd),
- bucket(_bucket),
- key(_key) {}
- public:
- int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */
-
- void set_mtime(const ceph::real_time& _mtime) {
- mtime = _mtime;
- }
-
- void set_etag(const std::string& _etag) {
- etag = _etag;
- }
-
- void set_olh_epoch(uint64_t epoch) {
- olh_epoch = epoch;
- }
-
- void set_delete_at(ceph::real_time _delete_at) {
- delete_at = _delete_at;
- }
-
- void set_user_data(const std::string& _user_data) {
- user_data = _user_data;
- }
-
- void set_policy(const RGWAccessControlPolicy& policy);
-
- friend class Bucket;
- };
-
- int get_bucket(const DoutPrefixProvider *dpp,
- const std::string& tenant,
- const std::string name,
- const std::string bucket_id,
- BucketRef *bucket,
- optional_yield y) {
- bucket->reset(new Bucket(this, tenant, name, bucket_id));
- return (*bucket)->init(dpp, y);
- }
-
- int get_bucket(const RGWBucketInfo& bucket_info,
- const std::map<std::string, bufferlist>& attrs,
- BucketRef *bucket) {
- bucket->reset(new Bucket(this));
- return (*bucket)->init(bucket_info, attrs);
- }
- friend class Bucket;
- friend class Object;
-};
-
-using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>;
-
/// Complete an AioCompletion. To return error values or otherwise
/// satisfy the caller. Useful for making complicated asynchronous
/// calls and error handling.
diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc
index f4a2d1480bd..2288399d4d8 100644
--- a/src/rgw/rgw_admin.cc
+++ b/src/rgw/rgw_admin.cc
@@ -62,6 +62,7 @@ extern "C" {
#include "rgw_lua.h"
#include "rgw_sal.h"
#include "rgw_sal_config.h"
+#include "rgw_data_access.h"
#include "services/svc_sync_modules.h"
#include "services/svc_cls.h"
diff --git a/src/rgw/rgw_data_access.cc b/src/rgw/rgw_data_access.cc
new file mode 100644
index 00000000000..07bf12e3746
--- /dev/null
+++ b/src/rgw/rgw_data_access.cc
@@ -0,0 +1,222 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_data_access.h"
+#include "rgw_acl_s3.h"
+#include "rgw_aio_throttle.h"
+#include "rgw_compression.h"
+#include "common/BackTrace.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+template<class H, size_t S>
+class RGWEtag
+{
+ H hash;
+
+public:
+ RGWEtag() {
+ if constexpr (std::is_same_v<H, MD5>) {
+ // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
+ hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
+ }
+ }
+
+ void update(const char *buf, size_t len) {
+ hash.Update((const unsigned char *)buf, len);
+ }
+
+ void update(bufferlist& bl) {
+ if (bl.length() > 0) {
+ update(bl.c_str(), bl.length());
+ }
+ }
+
+ void update(const std::string& s) {
+ if (!s.empty()) {
+ update(s.c_str(), s.size());
+ }
+ }
+ void finish(std::string *etag) {
+ char etag_buf[S];
+ char etag_buf_str[S * 2 + 16];
+
+ hash.Final((unsigned char *)etag_buf);
+ buf_to_hex((const unsigned char *)etag_buf, S,
+ etag_buf_str);
+
+ *etag = etag_buf_str;
+ }
+};
+
+using RGWMD5Etag = RGWEtag<MD5, CEPH_CRYPTO_MD5_DIGESTSIZE>;
+
+RGWDataAccess::RGWDataAccess(rgw::sal::Driver* _driver) : driver(_driver)
+{
+}
+
+int RGWDataAccess::Bucket::finish_init()
+{
+ auto iter = attrs.find(RGW_ATTR_ACL);
+ if (iter == attrs.end()) {
+ return 0;
+ }
+
+ bufferlist::const_iterator bliter = iter->second.begin();
+ try {
+ policy.decode(bliter);
+ } catch (buffer::error& err) {
+ return -EIO;
+ }
+
+ return 0;
+}
+
+int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
+{
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ int ret = sd->driver->load_bucket(dpp, rgw_bucket(tenant, name), &bucket, y);
+ if (ret < 0) {
+ return ret;
+ }
+
+ bucket_info = bucket->get_info();
+ mtime = bucket->get_modification_time();
+ attrs = bucket->get_attrs();
+
+ return finish_init();
+}
+
+int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
+ const std::map<std::string, bufferlist>& _attrs)
+{
+ bucket_info = _bucket_info;
+ attrs = _attrs;
+
+ return finish_init();
+}
+
+int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
+ ObjectRef *obj) {
+ obj->reset(new Object(sd, shared_from_this(), key));
+ return 0;
+}
+
+int RGWDataAccess::Object::put(bufferlist& data,
+ std::map<std::string, bufferlist>& attrs,
+ const DoutPrefixProvider *dpp,
+ optional_yield y)
+{
+ rgw::sal::Driver* driver = sd->driver;
+ CephContext *cct = driver->ctx();
+
+ std::string tag;
+ append_rand_alpha(cct, tag, tag, 32);
+
+ RGWBucketInfo& bucket_info = bucket->bucket_info;
+
+ rgw::BlockingAioThrottle aio(driver->ctx()->_conf->rgw_put_obj_min_window_size);
+
+ std::unique_ptr<rgw::sal::Bucket> b = driver->get_bucket(bucket_info);
+ std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
+
+ auto& owner = bucket->policy.get_owner();
+
+ std::string req_id = driver->zone_unique_id(driver->get_new_req_id());
+
+ std::unique_ptr<rgw::sal::Writer> processor;
+ processor = driver->get_atomic_writer(dpp, y, obj.get(), owner.id,
+ nullptr, olh_epoch, req_id);
+
+ int ret = processor->prepare(y);
+ if (ret < 0)
+ return ret;
+
+ rgw::sal::DataProcessor *filter = processor.get();
+
+ CompressorRef plugin;
+ boost::optional<RGWPutObj_Compress> compressor;
+
+ const auto& compression_type = driver->get_compression_type(bucket_info.placement_rule);
+ if (compression_type != "none") {
+ plugin = Compressor::create(driver->ctx(), compression_type);
+ if (!plugin) {
+ ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
+ << compression_type << dendl;
+ } else {
+ compressor.emplace(driver->ctx(), plugin, filter);
+ filter = &*compressor;
+ }
+ }
+
+ off_t ofs = 0;
+ auto obj_size = data.length();
+
+ RGWMD5Etag etag_calc;
+
+ do {
+ size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
+
+ bufferlist bl;
+
+ data.splice(0, read_len, &bl);
+ etag_calc.update(bl);
+
+ ret = filter->process(std::move(bl), ofs);
+ if (ret < 0)
+ return ret;
+
+ ofs += read_len;
+ } while (data.length() > 0);
+
+ ret = filter->process({}, ofs);
+ if (ret < 0) {
+ return ret;
+ }
+ bool has_etag_attr = false;
+ auto iter = attrs.find(RGW_ATTR_ETAG);
+ if (iter != attrs.end()) {
+ bufferlist& bl = iter->second;
+ etag = bl.to_str();
+ has_etag_attr = true;
+ }
+
+ if (!aclbl) {
+ RGWAccessControlPolicy policy;
+
+ const auto& owner = bucket->policy.get_owner();
+ policy.create_default(owner.id, owner.display_name); // default private policy
+
+ policy.encode(aclbl.emplace());
+ }
+
+ if (etag.empty()) {
+ etag_calc.finish(&etag);
+ }
+
+ if (!has_etag_attr) {
+ bufferlist etagbl;
+ etagbl.append(etag);
+ attrs[RGW_ATTR_ETAG] = etagbl;
+ }
+ attrs[RGW_ATTR_ACL] = *aclbl;
+
+ std::string *puser_data = nullptr;
+ if (user_data) {
+ puser_data = &(*user_data);
+ }
+
+ const req_context rctx{dpp, y, nullptr};
+ return processor->complete(obj_size, etag,
+ &mtime, mtime,
+ attrs, delete_at,
+ nullptr, nullptr,
+ puser_data,
+ nullptr, nullptr, rctx);
+}
+
+void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
+{
+ policy.encode(aclbl.emplace());
+}
+
diff --git a/src/rgw/rgw_data_access.h b/src/rgw/rgw_data_access.h
new file mode 100644
index 00000000000..df921a67f4d
--- /dev/null
+++ b/src/rgw/rgw_data_access.h
@@ -0,0 +1,124 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#pragma once
+
+#include <string>
+#include "include/types.h"
+#include "common/ceph_time.h"
+#include "rgw_common.h"
+#include "rgw_sal_fwd.h"
+
+class RGWDataAccess
+{
+ rgw::sal::Driver* driver;
+
+public:
+ RGWDataAccess(rgw::sal::Driver* _driver);
+
+ class Object;
+ class Bucket;
+
+ using BucketRef = std::shared_ptr<Bucket>;
+ using ObjectRef = std::shared_ptr<Object>;
+
+ class Bucket : public std::enable_shared_from_this<Bucket> {
+ friend class RGWDataAccess;
+ friend class Object;
+
+ RGWDataAccess *sd{nullptr};
+ RGWBucketInfo bucket_info;
+ std::string tenant;
+ std::string name;
+ std::string bucket_id;
+ ceph::real_time mtime;
+ std::map<std::string, bufferlist> attrs;
+
+ RGWAccessControlPolicy policy;
+ int finish_init();
+
+ Bucket(RGWDataAccess *_sd,
+ const std::string& _tenant,
+ const std::string& _name,
+ const std::string& _bucket_id) : sd(_sd),
+ tenant(_tenant),
+ name(_name),
+ bucket_id(_bucket_id) {}
+ Bucket(RGWDataAccess *_sd) : sd(_sd) {}
+ int init(const DoutPrefixProvider *dpp, optional_yield y);
+ int init(const RGWBucketInfo& _bucket_info, const std::map<std::string, bufferlist>& _attrs);
+ public:
+ int get_object(const rgw_obj_key& key,
+ ObjectRef *obj);
+
+ };
+
+
+ class Object {
+ RGWDataAccess *sd{nullptr};
+ BucketRef bucket;
+ rgw_obj_key key;
+
+ ceph::real_time mtime;
+ std::string etag;
+ uint64_t olh_epoch{0};
+ ceph::real_time delete_at;
+ std::optional<std::string> user_data;
+
+ std::optional<bufferlist> aclbl;
+
+ Object(RGWDataAccess *_sd,
+ BucketRef&& _bucket,
+ const rgw_obj_key& _key) : sd(_sd),
+ bucket(_bucket),
+ key(_key) {}
+ public:
+ int put(bufferlist& data, std::map<std::string, bufferlist>& attrs, const DoutPrefixProvider *dpp, optional_yield y); /* might modify attrs */
+
+ void set_mtime(const ceph::real_time& _mtime) {
+ mtime = _mtime;
+ }
+
+ void set_etag(const std::string& _etag) {
+ etag = _etag;
+ }
+
+ void set_olh_epoch(uint64_t epoch) {
+ olh_epoch = epoch;
+ }
+
+ void set_delete_at(ceph::real_time _delete_at) {
+ delete_at = _delete_at;
+ }
+
+ void set_user_data(const std::string& _user_data) {
+ user_data = _user_data;
+ }
+
+ void set_policy(const RGWAccessControlPolicy& policy);
+
+ friend class Bucket;
+ };
+
+ int get_bucket(const DoutPrefixProvider *dpp,
+ const std::string& tenant,
+ const std::string name,
+ const std::string bucket_id,
+ BucketRef *bucket,
+ optional_yield y) {
+ bucket->reset(new Bucket(this, tenant, name, bucket_id));
+ return (*bucket)->init(dpp, y);
+ }
+
+ int get_bucket(const RGWBucketInfo& bucket_info,
+ const std::map<std::string, bufferlist>& attrs,
+ BucketRef *bucket) {
+ bucket->reset(new Bucket(this));
+ return (*bucket)->init(bucket_info, attrs);
+ }
+ friend class Bucket;
+ friend class Object;
+};
+
+using RGWDataAccessRef = std::shared_ptr<RGWDataAccess>;
+