diff options
-rw-r--r-- | src/client/ObjecterWriteback.h | 12 | ||||
-rw-r--r-- | src/librbd/LibrbdWriteback.cc | 69 | ||||
-rw-r--r-- | src/librbd/LibrbdWriteback.h | 28 | ||||
-rw-r--r-- | src/osdc/ObjectCacher.cc | 37 | ||||
-rw-r--r-- | src/osdc/ObjectCacher.h | 1 | ||||
-rw-r--r-- | src/osdc/WritebackHandler.h | 8 | ||||
-rw-r--r-- | src/test/osdc/FakeWriteback.cc | 11 | ||||
-rw-r--r-- | src/test/osdc/FakeWriteback.h | 8 |
8 files changed, 121 insertions, 53 deletions
diff --git a/src/client/ObjecterWriteback.h b/src/client/ObjecterWriteback.h index 1aa5e4932df..9a10fb48a06 100644 --- a/src/client/ObjecterWriteback.h +++ b/src/client/ObjecterWriteback.h @@ -11,12 +11,12 @@ class ObjecterWriteback : public WritebackHandler { ObjecterWriteback(Objecter *o) : m_objecter(o) {} virtual ~ObjecterWriteback() {} - virtual tid_t read(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish) { - return m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0, - trunc_size, trunc_seq, onfinish); + virtual void read(const object_t& oid, const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, + Context *onfinish) { + m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0, + trunc_size, trunc_seq, onfinish); } virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) { diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc index 1689ad91860..237901dc61f 100644 --- a/src/librbd/LibrbdWriteback.cc +++ b/src/librbd/LibrbdWriteback.cc @@ -48,7 +48,6 @@ namespace librbd { C_Request(CephContext *cct, Context *c, Mutex *l) : m_cct(cct), m_ctx(c), m_lock(l) {} virtual ~C_Request() {} - void set_req(AioRequest *req); virtual void finish(int r) { ldout(m_cct, 20) << "aio_cb completing " << dendl; { @@ -63,16 +62,39 @@ namespace librbd { Mutex *m_lock; }; + class C_OrderedWrite : public Context { + public: + C_OrderedWrite(CephContext *cct, LibrbdWriteback::write_result_d *result, + LibrbdWriteback *wb) + : m_cct(cct), m_result(result), m_wb_handler(wb) {} + virtual ~C_OrderedWrite() {} + virtual void finish(int r) { + ldout(m_cct, 20) << "C_OrderedWrite completing " << m_result << dendl; + { + Mutex::Locker l(m_wb_handler->m_lock); + assert(!m_result->done); + m_result->done = true; + m_result->ret = r; + m_wb_handler->complete_writes(m_result->oid); + } + ldout(m_cct, 20) << "C_OrderedWrite finished " << m_result << dendl; + } + private: + CephContext *m_cct; + LibrbdWriteback::write_result_d *m_result; + LibrbdWriteback *m_wb_handler; + }; + LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock) : m_tid(0), m_lock(lock), m_ictx(ictx) { } - tid_t LibrbdWriteback::read(const object_t& oid, - const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, - __u32 trunc_seq, Context *onfinish) + void LibrbdWriteback::read(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, + __u32 trunc_seq, Context *onfinish) { // on completion, take the mutex and then call onfinish. Context *req = new C_Request(m_ictx->cct, onfinish, &m_lock); @@ -82,7 +104,6 @@ namespace librbd { len, off); rados_completion->release(); assert(r >= 0); - return ++m_tid; } bool LibrbdWriteback::may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) @@ -132,8 +153,10 @@ namespace librbd { object_no, 0, m_ictx->layout.fl_object_size, objectx); uint64_t object_overlap = m_ictx->prune_parent_extents(objectx, overlap); - - C_Request *req_comp = new C_Request(m_ictx->cct, oncommit, &m_lock); + write_result_d *result = new write_result_d(oid.name, oncommit); + m_writes[oid.name].push(result); + ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl; + C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, this); AioWrite *req = new AioWrite(m_ictx, oid.name, object_no, off, objectx, object_overlap, bl, snapc, snap_id, @@ -141,4 +164,32 @@ namespace librbd { req->send(); return ++m_tid; } + + void LibrbdWriteback::complete_writes(const std::string& oid) + { + assert(m_lock.is_locked()); + std::queue<write_result_d*>& results = m_writes[oid]; + ldout(m_ictx->cct, 20) << "complete_writes() oid " << oid << dendl; + std::list<write_result_d*> finished; + + while (!results.empty()) { + write_result_d *result = results.front(); + if (!result->done) + break; + finished.push_back(result); + results.pop(); + } + + if (results.empty()) + m_writes.erase(oid); + + for (std::list<write_result_d*>::iterator it = finished.begin(); + it != finished.end(); ++it) { + write_result_d *result = *it; + ldout(m_ictx->cct, 20) << "complete_writes() completing " << result + << dendl; + result->oncommit->complete(result->ret); + delete result; + } + } } diff --git a/src/librbd/LibrbdWriteback.h b/src/librbd/LibrbdWriteback.h index b054dbc5950..ba8ff1f114d 100644 --- a/src/librbd/LibrbdWriteback.h +++ b/src/librbd/LibrbdWriteback.h @@ -3,6 +3,8 @@ #ifndef CEPH_LIBRBD_LIBRBDWRITEBACKHANDLER_H #define CEPH_LIBRBD_LIBRBDWRITEBACKHANDLER_H +#include <queue> + #include "include/Context.h" #include "include/types.h" #include "include/rados/librados.hpp" @@ -21,10 +23,10 @@ namespace librbd { virtual ~LibrbdWriteback() {} // Note that oloc, trunc_size, and trunc_seq are ignored - virtual tid_t read(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish); + virtual void read(const object_t& oid, const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, + Context *onfinish); // Determine whether a read to this extent could be affected by a write-triggered copy-on-write virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid); @@ -35,10 +37,26 @@ namespace librbd { const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, Context *oncommit); + struct write_result_d { + bool done; + int ret; + std::string oid; + Context *oncommit; + write_result_d(const std::string& oid, Context *oncommit) : + done(false), ret(0), oid(oid), oncommit(oncommit) {} + private: + write_result_d(const write_result_d& rhs); + const write_result_d& operator=(const write_result_d& rhs); + }; + private: - int m_tid; + void complete_writes(const std::string& oid); + + tid_t m_tid; Mutex& m_lock; librbd::ImageCtx *m_ictx; + hash_map<std::string, std::queue<write_result_d*> > m_writes; + friend class C_OrderedWrite; }; } diff --git a/src/osdc/ObjectCacher.cc b/src/osdc/ObjectCacher.cc index 18a85c0b866..92f0d502746 100644 --- a/src/osdc/ObjectCacher.cc +++ b/src/osdc/ObjectCacher.cc @@ -1495,6 +1495,20 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length) return clean; } +bool ObjectCacher::_flush_set_finish(C_GatherBuilder *gather, Context *onfinish) +{ + assert(lock.is_locked()); + if (gather->has_subs()) { + gather->set_finisher(onfinish); + gather->activate(); + return false; + } + + ldout(cct, 10) << "flush_set has no dirty|tx bhs" << dendl; + onfinish->complete(0); + return true; +} + // flush. non-blocking, takes callback. // returns true if already flushed bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) @@ -1526,15 +1540,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish) } } - if (gather.has_subs()) { - gather.set_finisher(onfinish); - gather.activate(); - return false; - } else { - ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl; - onfinish->complete(0); - return true; - } + return _flush_set_finish(&gather, onfinish); } // flush. non-blocking, takes callback. @@ -1549,7 +1555,8 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context return true; } - ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() << " ObjectExtents" << dendl; + ldout(cct, 10) << "flush_set " << oset << " on " << exv.size() + << " ObjectExtents" << dendl; // we'll need to wait for all objects to flush! C_GatherBuilder gather(cct); @@ -1573,15 +1580,7 @@ bool ObjectCacher::flush_set(ObjectSet *oset, vector<ObjectExtent>& exv, Context } } - if (gather.has_subs()) { - gather.set_finisher(onfinish); - gather.activate(); - return false; - } else { - ldout(cct, 10) << "flush_set " << oset << " has no dirty|tx bhs" << dendl; - onfinish->complete(0); - return true; - } + return _flush_set_finish(&gather, onfinish); } void ObjectCacher::purge_set(ObjectSet *oset) diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 681b02406fa..a17046f9126 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -573,6 +573,7 @@ private: int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace); void maybe_wait_for_writeback(uint64_t len); + bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish); public: bool set_is_cached(ObjectSet *oset); diff --git a/src/osdc/WritebackHandler.h b/src/osdc/WritebackHandler.h index a9a035ca52f..17e1f683bec 100644 --- a/src/osdc/WritebackHandler.h +++ b/src/osdc/WritebackHandler.h @@ -12,10 +12,10 @@ class WritebackHandler { WritebackHandler() {} virtual ~WritebackHandler() {} - virtual tid_t read(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish) = 0; + virtual void read(const object_t& oid, const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, + Context *onfinish) = 0; /** * check if a given extent read result may change due to a write * diff --git a/src/test/osdc/FakeWriteback.cc b/src/test/osdc/FakeWriteback.cc index 4445140a6f5..b4cd35ea979 100644 --- a/src/test/osdc/FakeWriteback.cc +++ b/src/test/osdc/FakeWriteback.cc @@ -58,15 +58,14 @@ FakeWriteback::~FakeWriteback() delete m_finisher; } -tid_t FakeWriteback::read(const object_t& oid, - const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, - __u32 trunc_seq, Context *onfinish) +void FakeWriteback::read(const object_t& oid, + const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, + __u32 trunc_seq, Context *onfinish) { C_Delay *wrapper = new C_Delay(m_cct, onfinish, m_lock, off, pbl, m_delay_ns); m_finisher->queue(wrapper, len); - return m_tid.inc(); } tid_t FakeWriteback::write(const object_t& oid, diff --git a/src/test/osdc/FakeWriteback.h b/src/test/osdc/FakeWriteback.h index ff48592d728..e7d6dc16bb4 100644 --- a/src/test/osdc/FakeWriteback.h +++ b/src/test/osdc/FakeWriteback.h @@ -17,10 +17,10 @@ public: FakeWriteback(CephContext *cct, Mutex *lock, uint64_t delay_ns); virtual ~FakeWriteback(); - virtual tid_t read(const object_t& oid, const object_locator_t& oloc, - uint64_t off, uint64_t len, snapid_t snapid, - bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, - Context *onfinish); + virtual void read(const object_t& oid, const object_locator_t& oloc, + uint64_t off, uint64_t len, snapid_t snapid, + bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, + Context *onfinish); virtual tid_t write(const object_t& oid, const object_locator_t& oloc, uint64_t off, uint64_t len, const SnapContext& snapc, |