#ifndef CEPH_RGW_CR_RADOS_H #define CEPH_RGW_CR_RADOS_H #include #include "include/assert.h" #include "rgw_coroutine.h" #include "rgw_rados.h" #include "common/WorkQueue.h" #include "common/Throttle.h" #include class RGWAsyncRadosRequest : public RefCountedObject { RGWCoroutine *caller; RGWAioCompletionNotifier *notifier; int retcode; Mutex lock; protected: virtual int _send_request() = 0; public: RGWAsyncRadosRequest(RGWCoroutine *_caller, RGWAioCompletionNotifier *_cn) : caller(_caller), notifier(_cn), retcode(0), lock("RGWAsyncRadosRequest::lock") { } ~RGWAsyncRadosRequest() override { if (notifier) { notifier->put(); } } void send_request() { get(); retcode = _send_request(); { Mutex::Locker l(lock); if (notifier) { notifier->cb(); // drops its own ref notifier = nullptr; } } put(); } int get_ret_status() { return retcode; } void finish() { { Mutex::Locker l(lock); if (notifier) { // we won't call notifier->cb() to drop its ref, so drop it here notifier->put(); notifier = nullptr; } } put(); } }; class RGWAsyncRadosProcessor { deque m_req_queue; std::atomic going_down = { false }; protected: RGWRados *store; ThreadPool m_tp; Throttle req_throttle; struct RGWWQ : public ThreadPool::WorkQueue { RGWAsyncRadosProcessor *processor; RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp) : ThreadPool::WorkQueue("RGWWQ", timeout, suicide_timeout, tp), processor(p) {} bool _enqueue(RGWAsyncRadosRequest *req) override; void _dequeue(RGWAsyncRadosRequest *req) override { ceph_abort(); } bool _empty() override; RGWAsyncRadosRequest *_dequeue() override; using ThreadPool::WorkQueue::_process; void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override; void _dump_queue(); void _clear() override { assert(processor->m_req_queue.empty()); } } req_wq; public: RGWAsyncRadosProcessor(RGWRados *_store, int num_threads); ~RGWAsyncRadosProcessor() {} void start(); void stop(); void handle_request(RGWAsyncRadosRequest *req); void queue(RGWAsyncRadosRequest *req); bool is_going_down() { return going_down; } }; class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { RGWRados *store; RGWObjectCtx *obj_ctx; RGWRados::SystemObject::Read::GetObjState read_state; RGWObjVersionTracker *objv_tracker; rgw_raw_obj obj; bufferlist *pbl; map *pattrs; off_t ofs; off_t end; protected: int _send_request() override; public: RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx, RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, bufferlist *_pbl, off_t _ofs, off_t _end); void set_read_attrs(map *_pattrs) { pattrs = _pattrs; } }; class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest { RGWRados *store; RGWObjVersionTracker *objv_tracker; rgw_raw_obj obj; bool exclusive; bufferlist bl; protected: int _send_request() override; public: RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj, bool _exclusive, bufferlist& _bl); }; class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest { RGWRados *store; RGWObjVersionTracker *objv_tracker; rgw_raw_obj obj; map *attrs; protected: int _send_request() override; public: RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, map *_attrs); }; class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest { RGWRados *store; rgw_raw_obj obj; string lock_name; string cookie; uint32_t duration_secs; protected: int _send_request() override; public: RGWAsyncLockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, const string& _name, const string& _cookie, uint32_t _duration_secs); }; class RGWAsyncUnlockSystemObj : public RGWAsyncRadosRequest { RGWRados *store; rgw_raw_obj obj; string lock_name; string cookie; protected: int _send_request() override; public: RGWAsyncUnlockSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj, const string& _name, const string& _cookie); }; template class RGWSimpleRadosReadCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWObjectCtx obj_ctx; bufferlist bl; rgw_raw_obj obj; map *pattrs{nullptr}; T *result; /// on ENOENT, call handle_data() with an empty object instead of failing const bool empty_on_enoent; RGWObjVersionTracker *objv_tracker; RGWAsyncGetSystemObj *req{nullptr}; public: RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, T *_result, bool empty_on_enoent = true, RGWObjVersionTracker *objv_tracker = nullptr) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj_ctx(store), obj(_obj), result(_result), empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {} ~RGWSimpleRadosReadCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override; int request_complete() override; virtual int handle_data(T& data) { return 0; } }; template int RGWSimpleRadosReadCR::send_request() { req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), store, &obj_ctx, objv_tracker, obj, &bl, 0, -1); if (pattrs) { req->set_read_attrs(pattrs); } async_rados->queue(req); return 0; } template int RGWSimpleRadosReadCR::request_complete() { int ret = req->get_ret_status(); retcode = ret; if (ret == -ENOENT && empty_on_enoent) { *result = T(); } else { if (ret < 0) { return ret; } try { bufferlist::iterator iter = bl.begin(); if (iter.end()) { // allow successful reads with empty buffers. ReadSyncStatus coroutines // depend on this to be able to read without locking, because the // cls lock from InitSyncStatus will create an empty object if it didnt // exist *result = T(); } else { ::decode(*result, iter); } } catch (buffer::error& err) { return -EIO; } } return handle_data(*result); } class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWObjectCtx obj_ctx; bufferlist bl; rgw_raw_obj obj; map *pattrs; RGWAsyncGetSystemObj *req; public: RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, map *_pattrs) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj_ctx(store), obj(_obj), pattrs(_pattrs), req(NULL) { } ~RGWSimpleRadosReadAttrsCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override; int request_complete() override; }; template class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; bufferlist bl; rgw_raw_obj obj; RGWObjVersionTracker *objv_tracker; RGWAsyncPutSystemObj *req{nullptr}; public: RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, const T& _data, RGWObjVersionTracker *objv_tracker = nullptr) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj(_obj), objv_tracker(objv_tracker) { ::encode(_data, bl); } ~RGWSimpleRadosWriteCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(), store, objv_tracker, obj, false, bl); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; rgw_raw_obj obj; map attrs; RGWAsyncPutSystemObjAttrs *req; public: RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, map& _attrs) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj(_obj), attrs(_attrs), req(NULL) { } ~RGWSimpleRadosWriteAttrsCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(), store, NULL, obj, &attrs); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWRadosSetOmapKeysCR : public RGWSimpleCoroutine { RGWRados *store; map entries; rgw_rados_ref ref; rgw_raw_obj obj; boost::intrusive_ptr cn; public: RGWRadosSetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj, map& _entries); int send_request() override; int request_complete() override; }; class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine { RGWRados *store; string marker; map *entries; int max_entries; int rval; rgw_rados_ref ref; rgw_raw_obj obj; boost::intrusive_ptr cn; public: RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj, const string& _marker, map *_entries, int _max_entries); int send_request() override; int request_complete() override { return rval; } }; class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine { RGWRados *store; rgw_rados_ref ref; set keys; rgw_raw_obj obj; boost::intrusive_ptr cn; public: RGWRadosRemoveOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj, const set& _keys); int send_request() override; int request_complete() override; }; class RGWRadosRemoveCR : public RGWSimpleCoroutine { RGWRados *store; librados::IoCtx ioctx; const rgw_raw_obj obj; boost::intrusive_ptr cn; public: RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj); int send_request(); int request_complete(); }; class RGWSimpleRadosLockCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; string lock_name; string cookie; uint32_t duration; rgw_raw_obj obj; RGWAsyncLockSystemObj *req; public: RGWSimpleRadosLockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, const string& _lock_name, const string& _cookie, uint32_t _duration); ~RGWSimpleRadosLockCR() override { request_cleanup(); } void request_cleanup() override; int send_request() override; int request_complete() override; static std::string gen_random_cookie(CephContext* cct) { #define COOKIE_LEN 16 char buf[COOKIE_LEN + 1]; gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); return buf; } }; class RGWSimpleRadosUnlockCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; string lock_name; string cookie; rgw_raw_obj obj; RGWAsyncUnlockSystemObj *req; public: RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, const string& _lock_name, const string& _cookie); ~RGWSimpleRadosUnlockCR() override { request_cleanup(); } void request_cleanup() override; int send_request() override; int request_complete() override; }; #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100 class RGWOmapAppend : public RGWConsumerCR { RGWAsyncRadosProcessor *async_rados; RGWRados *store; rgw_raw_obj obj; bool going_down; int num_pending_entries; list pending_entries; map entries; uint64_t window_size; uint64_t total_entries; public: RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, uint64_t _window_size = OMAP_APPEND_MAX_ENTRIES_DEFAULT); int operate() override; void flush_pending(); bool append(const string& s); bool finish(); uint64_t get_total_entries() { return total_entries; } const rgw_raw_obj& get_obj() { return obj; } }; class RGWAsyncWait : public RGWAsyncRadosRequest { CephContext *cct; Mutex *lock; Cond *cond; utime_t interval; protected: int _send_request() override { Mutex::Locker l(*lock); return cond->WaitInterval(*lock, interval); } public: RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct, Mutex *_lock, Cond *_cond, int _secs) : RGWAsyncRadosRequest(caller, cn), cct(_cct), lock(_lock), cond(_cond), interval(_secs, 0) {} void wakeup() { Mutex::Locker l(*lock); cond->Signal(); } }; class RGWWaitCR : public RGWSimpleCoroutine { CephContext *cct; RGWAsyncRadosProcessor *async_rados; Mutex *lock; Cond *cond; int secs; RGWAsyncWait *req; public: RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct, Mutex *_lock, Cond *_cond, int _secs) : RGWSimpleCoroutine(_cct), cct(_cct), async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) { } ~RGWWaitCR() override { request_cleanup(); } void request_cleanup() override { if (req) { wakeup(); req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } void wakeup() { req->wakeup(); } }; class RGWShardedOmapCRManager { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWCoroutine *op; int num_shards; vector shards; public: RGWShardedOmapCRManager(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutine *_op, int _num_shards, const rgw_pool& pool, const string& oid_prefix) : async_rados(_async_rados), store(_store), op(_op), num_shards(_num_shards) { shards.reserve(num_shards); for (int i = 0; i < num_shards; ++i) { char buf[oid_prefix.size() + 16]; snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, rgw_raw_obj(pool, buf)); shard->get(); shards.push_back(shard); op->spawn(shard, false); } } ~RGWShardedOmapCRManager() { for (auto shard : shards) { shard->put(); } } bool append(const string& entry, int shard_id) { return shards[shard_id]->append(entry); } bool finish() { bool success = true; for (vector::iterator iter = shards.begin(); iter != shards.end(); ++iter) { success &= ((*iter)->finish() && (!(*iter)->is_error())); } return success; } uint64_t get_total_entries(int shard_id) { return shards[shard_id]->get_total_entries(); } }; class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest { RGWRados *store; const std::string oid; RGWBucketInfo *bucket_info; protected: int _send_request() override; public: RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, const std::string& oid, RGWBucketInfo *_bucket_info) : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid), bucket_info(_bucket_info) {} }; class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; const std::string oid; RGWBucketInfo *bucket_info; RGWAsyncGetBucketInstanceInfo *req{nullptr}; public: // metadata key constructor RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const std::string& meta_key, RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), oid(RGW_BUCKET_INSTANCE_MD_PREFIX + meta_key), bucket_info(_bucket_info) {} // rgw_bucket constructor RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_bucket& bucket, RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), oid(RGW_BUCKET_INSTANCE_MD_PREFIX + bucket.get_key(':')), bucket_info(_bucket_info) {} ~RGWGetBucketInstanceInfoCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid, bucket_info); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWRadosBILogTrimCR : public RGWSimpleCoroutine { RGWRados::BucketShard bs; std::string start_marker; std::string end_marker; boost::intrusive_ptr cn; public: RGWRadosBILogTrimCR(RGWRados *store, const RGWBucketInfo& bucket_info, int shard_id, const std::string& start_marker, const std::string& end_marker); int send_request() override; int request_complete() override; }; class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; uint64_t versioned_epoch; real_time src_mtime; bool copy_if_newer; rgw_zone_set *zones_trace; protected: int _send_request() override; public: RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, uint64_t _versioned_epoch, bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), versioned_epoch(_versioned_epoch), copy_if_newer(_if_newer), zones_trace(_zones_trace) {} }; class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { CephContext *cct; RGWAsyncRadosProcessor *async_rados; RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; uint64_t versioned_epoch; real_time src_mtime; bool copy_if_newer; RGWAsyncFetchRemoteObj *req; rgw_zone_set *zones_trace; public: RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, uint64_t _versioned_epoch, bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), versioned_epoch(_versioned_epoch), copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {} ~RGWFetchRemoteObjCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info, key, versioned_epoch, copy_if_newer, zones_trace); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest { RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time *pmtime; uint64_t *psize; map *pattrs; protected: int _send_request() override; public: RGWAsyncStatRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, map *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), pmtime(_pmtime), psize(_psize), pattrs(_pattrs) {} }; class RGWStatRemoteObjCR : public RGWSimpleCoroutine { CephContext *cct; RGWAsyncRadosProcessor *async_rados; RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time *pmtime; uint64_t *psize; map *pattrs; RGWAsyncStatRemoteObj *req; public: RGWStatRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, ceph::real_time *_pmtime, uint64_t *_psize, map *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), pmtime(_pmtime), psize(_psize), pattrs(_pattrs), req(NULL) {} ~RGWStatRemoteObjCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info, key, pmtime, psize, pattrs); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWAsyncRemoveObj : public RGWAsyncRadosRequest { RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; string owner; string owner_display_name; bool versioned; uint64_t versioned_epoch; string marker_version_id; bool del_if_older; ceph::real_time timestamp; rgw_zone_set *zones_trace; protected: int _send_request() override; public: RGWAsyncRemoveObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, const string& _owner, const string& _owner_display_name, bool _versioned, uint64_t _versioned_epoch, bool _delete_marker, bool _if_older, real_time& _timestamp, rgw_zone_set* _zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), owner(_owner), owner_display_name(_owner_display_name), versioned(_versioned), versioned_epoch(_versioned_epoch), del_if_older(_if_older), timestamp(_timestamp), zones_trace(_zones_trace) { if (_delete_marker) { marker_version_id = key.instance; } } }; class RGWRemoveObjCR : public RGWSimpleCoroutine { CephContext *cct; RGWAsyncRadosProcessor *async_rados; RGWRados *store; string source_zone; RGWBucketInfo bucket_info; rgw_obj_key key; bool versioned; uint64_t versioned_epoch; bool delete_marker; string owner; string owner_display_name; bool del_if_older; real_time timestamp; RGWAsyncRemoveObj *req; rgw_zone_set *zones_trace; public: RGWRemoveObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const string& _source_zone, RGWBucketInfo& _bucket_info, const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch, string *_owner, string *_owner_display_name, bool _delete_marker, real_time *_timestamp, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), bucket_info(_bucket_info), key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), delete_marker(_delete_marker), req(NULL), zones_trace(_zones_trace) { del_if_older = (_timestamp != NULL); if (_timestamp) { timestamp = *_timestamp; } if (_owner) { owner = *_owner; } if (_owner_display_name) { owner_display_name = *_owner_display_name; } } ~RGWRemoveObjCR() override { request_cleanup(); } void request_cleanup() override { if (req) { req->finish(); req = NULL; } } int send_request() override { req = new RGWAsyncRemoveObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info, key, owner, owner_display_name, versioned, versioned_epoch, delete_marker, del_if_older, timestamp, zones_trace); async_rados->queue(req); return 0; } int request_complete() override { return req->get_ret_status(); } }; class RGWContinuousLeaseCR : public RGWCoroutine { RGWAsyncRadosProcessor *async_rados; RGWRados *store; const rgw_raw_obj obj; const string lock_name; const string cookie; int interval; Mutex lock; std::atomic going_down = { false }; bool locked{false}; RGWCoroutine *caller; bool aborted{false}; public: RGWContinuousLeaseCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, const rgw_raw_obj& _obj, const string& _lock_name, int _interval, RGWCoroutine *_caller) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), obj(_obj), lock_name(_lock_name), cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)), interval(_interval), lock("RGWContinuousLeaseCR"), caller(_caller) {} int operate() override; bool is_locked() { Mutex::Locker l(lock); return locked; } void set_locked(bool status) { Mutex::Locker l(lock); locked = status; } void go_down() { going_down = true; wakeup(); } void abort() { aborted = true; } }; class RGWRadosTimelogAddCR : public RGWSimpleCoroutine { RGWRados *store; list entries; string oid; boost::intrusive_ptr cn; public: RGWRadosTimelogAddCR(RGWRados *_store, const string& _oid, const cls_log_entry& entry); int send_request() override; int request_complete() override; }; class RGWRadosTimelogTrimCR : public RGWSimpleCoroutine { RGWRados *store; boost::intrusive_ptr cn; protected: std::string oid; real_time start_time; real_time end_time; std::string from_marker; std::string to_marker; public: RGWRadosTimelogTrimCR(RGWRados *store, const std::string& oid, const real_time& start_time, const real_time& end_time, const std::string& from_marker, const std::string& to_marker); int send_request() override; int request_complete() override; }; // wrapper to update last_trim_marker on success class RGWSyncLogTrimCR : public RGWRadosTimelogTrimCR { CephContext *cct; std::string *last_trim_marker; public: RGWSyncLogTrimCR(RGWRados *store, const std::string& oid, const std::string& to_marker, std::string *last_trim_marker); int request_complete() override; }; class RGWAsyncStatObj : public RGWAsyncRadosRequest { RGWRados *store; RGWBucketInfo bucket_info; rgw_obj obj; uint64_t *psize; real_time *pmtime; uint64_t *pepoch; RGWObjVersionTracker *objv_tracker; protected: int _send_request() override; public: RGWAsyncStatObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *store, const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, real_time *pmtime = nullptr, uint64_t *pepoch = nullptr, RGWObjVersionTracker *objv_tracker = nullptr) : RGWAsyncRadosRequest(caller, cn), store(store), obj(obj), psize(psize), pmtime(pmtime), pepoch(pepoch), objv_tracker(objv_tracker) {} }; class RGWStatObjCR : public RGWSimpleCoroutine { RGWRados *store; RGWAsyncRadosProcessor *async_rados; RGWBucketInfo bucket_info; rgw_obj obj; uint64_t *psize; real_time *pmtime; uint64_t *pepoch; RGWObjVersionTracker *objv_tracker; RGWAsyncStatObj *req = nullptr; public: RGWStatObjCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store, const RGWBucketInfo& _bucket_info, const rgw_obj& obj, uint64_t *psize = nullptr, real_time* pmtime = nullptr, uint64_t *pepoch = nullptr, RGWObjVersionTracker *objv_tracker = nullptr); ~RGWStatObjCR() override { request_cleanup(); } void request_cleanup() override; int send_request() override; int request_complete() override; }; /// coroutine wrapper for IoCtx::aio_notify() class RGWRadosNotifyCR : public RGWSimpleCoroutine { RGWRados *const store; const rgw_raw_obj obj; bufferlist request; const uint64_t timeout_ms; bufferlist *response; rgw_rados_ref ref; boost::intrusive_ptr cn; public: RGWRadosNotifyCR(RGWRados *store, const rgw_raw_obj& obj, bufferlist& request, uint64_t timeout_ms, bufferlist *response); int send_request() override; int request_complete() override; }; #endif