diff options
author | Jason Dillaman <dillaman@redhat.com> | 2015-10-07 20:56:22 +0200 |
---|---|---|
committer | Jason Dillaman <dillaman@redhat.com> | 2015-10-07 22:34:55 +0200 |
commit | 65fb1b86cbab65023f6207798c9e189bce55dcf6 (patch) | |
tree | 0f58000d22c93e6626881b4c80b9c6d463087e4e | |
parent | rbd-replay: improve error messages (diff) | |
download | ceph-65fb1b86cbab65023f6207798c9e189bce55dcf6.tar.xz ceph-65fb1b86cbab65023f6207798c9e189bce55dcf6.zip |
rbd-replay-prep: simplify IO dependency calculation
Only track read-after-write and write-after-write IO dependencies
via the associated write completions. All IO events after a write
completion are considered to be dependent and can be pruned down
to at most the number of concurrent IOs. This reduces the prep
time for a simple 'rbd bench-write' from over 4 hrs down to seconds.
Fixes: #13378, #13384
Backport: hammer
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r-- | src/rbd_replay/actions.cc | 10 | ||||
-rw-r--r-- | src/rbd_replay/actions.hpp | 8 | ||||
-rw-r--r-- | src/rbd_replay/ios.cc | 130 | ||||
-rw-r--r-- | src/rbd_replay/ios.hpp | 114 | ||||
-rw-r--r-- | src/rbd_replay/rbd-replay-prep.cc | 173 |
5 files changed, 93 insertions, 342 deletions
diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc index 2bcdfb194ec..0c327b653ed 100644 --- a/src/rbd_replay/actions.cc +++ b/src/rbd_replay/actions.cc @@ -25,13 +25,9 @@ using namespace std; Action::Action(action_id_t id, thread_id_t thread_id, - int num_successors, - int num_completion_successors, std::vector<dependency_d> &predecessors) : m_id(id), m_thread_id(thread_id), - m_num_successors(num_successors), - m_num_completion_successors(num_completion_successors), m_predecessors(predecessors) { } @@ -45,8 +41,8 @@ Action::ptr Action::read_from(Deser &d) { } uint32_t ionum = d.read_uint32_t(); uint64_t thread_id = d.read_uint64_t(); - uint32_t num_successors = d.read_uint32_t(); - uint32_t num_completion_successors = d.read_uint32_t(); + d.read_uint32_t(); // unused + d.read_uint32_t(); // unused uint32_t num_dependencies = d.read_uint32_t(); vector<dependency_d> deps; for (unsigned int i = 0; i < num_dependencies; i++) { @@ -54,7 +50,7 @@ Action::ptr Action::read_from(Deser &d) { uint64_t time_delta = d.read_uint64_t(); deps.push_back(dependency_d(dep_id, time_delta)); } - DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps); + DummyAction dummy(ionum, thread_id, deps); switch (type) { case IO_START_THREAD: return StartThreadAction::read_from(dummy, d); diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp index 068e4dc4139..e9522dbf99f 100644 --- a/src/rbd_replay/actions.hpp +++ b/src/rbd_replay/actions.hpp @@ -133,8 +133,6 @@ public: Action(action_id_t id, thread_id_t thread_id, - int num_successors, - int num_completion_successors, std::vector<dependency_d> &predecessors); virtual ~Action(); @@ -176,8 +174,6 @@ private: const action_id_t m_id; const thread_id_t m_thread_id; - const int m_num_successors; - const int m_num_completion_successors; const std::vector<dependency_d> m_predecessors; }; @@ -194,10 +190,8 @@ class DummyAction : public Action { public: DummyAction(action_id_t id, thread_id_t 
thread_id, - int num_successors, - int num_completion_successors, std::vector<dependency_d> &predecessors) - : Action(id, thread_id, num_successors, num_completion_successors, predecessors) { + : Action(id, thread_id, predecessors) { } void perform(ActionCtx &ctx) { diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc index ccc560f85c8..21a68019ece 100644 --- a/src/rbd_replay/ios.cc +++ b/src/rbd_replay/ios.cc @@ -24,33 +24,6 @@ bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) { return p1->start_time() < p2->start_time(); } -static uint64_t min_time(const map<action_id_t, IO::ptr>& s) { - if (s.empty()) { - return 0; - } - return s.begin()->second->start_time(); -} - -static uint64_t max_time(const map<action_id_t, IO::ptr>& s) { - if (s.empty()) { - return 0; - } - map<action_id_t, IO::ptr>::const_iterator itr(s.end()); - --itr; - return itr->second->start_time(); -} - -void IO::add_dependencies(const io_set_t& deps) { - io_set_t base(m_dependencies); - for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { - ptr dep(*itr); - for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) { - base.insert(*itr2); - } - } - batch_unreachable_from(deps, base, &m_dependencies); -} - void IO::write_debug_base(ostream& out, string type) const { out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {"; bool first = true; @@ -62,16 +35,17 @@ void IO::write_debug_base(ostream& out, string type) const { } out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time; } - out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors(); + out << "}"; } void IO::write_to(Ser& out, io_type iotype) const { + // TODO break compatibility now to add version (and yank unused fields)? 
out.write_uint8_t(iotype); out.write_uint32_t(m_ionum); out.write_uint64_t(m_thread_id); - out.write_uint32_t(m_num_successors); - out.write_uint32_t(num_completion_successors()); + out.write_uint32_t(0); + out.write_uint32_t(0); out.write_uint32_t(m_dependencies.size()); vector<IO::ptr> deps; for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) { @@ -84,102 +58,6 @@ void IO::write_to(Ser& out, io_type iotype) const { } } -IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) { - assert(!m_completion.lock()); - IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id)); - m_completion = completion; - completion->m_dependencies.insert(shared_from_this()); - return completion; -} - - -// TODO: Add unit tests -// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable' -void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) { - if (deps.empty()) { - return; - } - - map<action_id_t, IO::ptr> searching_for; - for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) { - searching_for[(*itr)->ionum()] = *itr; - } - - map<action_id_t, IO::ptr> boundary; - for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) { - boundary[(*itr)->ionum()] = *itr; - } - - // The boundary horizon is the maximum timestamp of IOs in the boundary. - // This monotonically decreases, because dependencies (which are added to the set) - // have earlier timestamp than the dependent IOs (which were just removed from the set). 
- uint64_t boundary_horizon = max_time(boundary); - - for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { - if (boundary_horizon >= itr->second->start_time()) { - break; - } - unreachable->insert(itr->second); - searching_for.erase(itr++); - } - if (searching_for.empty()) { - return; - } - - // The searching horizon is the minimum timestamp of IOs in the searching set. - // This monotonically increases, because elements are only removed from the set. - uint64_t searching_horizon = min_time(searching_for); - - while (!boundary.empty()) { - // Take an IO from the end, which has the highest timestamp. - // This reduces the boundary horizon as early as possible, - // which means we can short cut as soon as possible. - map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end()); - --b_itr; - boost::shared_ptr<IO> io(b_itr->second); - boundary.erase(b_itr); - - for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) { - IO::ptr dep(*itr); - assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum()); - io_map_t::iterator p = searching_for.find(dep->ionum()); - if (p != searching_for.end()) { - searching_for.erase(p); - if (dep->start_time() == searching_horizon) { - searching_horizon = min_time(searching_for); - if (searching_horizon == 0) { - return; - } - } - } - boundary[dep->ionum()] = dep; - } - - boundary_horizon = max_time(boundary); - if (boundary_horizon != 0) { - // Anything we're searching for that has a timestamp greater than the - // boundary horizon will never be found, since the boundary horizon - // falls monotonically. 
- for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) { - if (boundary_horizon >= itr->second->start_time()) { - break; - } - unreachable->insert(itr->second); - searching_for.erase(itr++); - } - searching_horizon = min_time(searching_for); - if (searching_horizon == 0) { - return; - } - } - } - - // Anything we're still searching for has not been found. - for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) { - unreachable->insert(itr->second); - } -} - ostream& operator<<(ostream& out, IO::ptr io) { io->write_debug(out); return out; diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp index 5bebcd71731..7d4153a82e3 100644 --- a/src/rbd_replay/ios.hpp +++ b/src/rbd_replay/ios.hpp @@ -36,23 +36,6 @@ typedef std::set<boost::shared_ptr<IO> > io_set_t; typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t; /** - Calculates reachability of IOs in the dependency graph. - All IOs in \c deps which are not transitive dependencies of anything in \c base - is added to \c unreachable. - In other words, for every IO \c x in \c deps: if nothing in \c base depends on \c x, - and nothing in \c base has dependencies that depend on \c x, etc., - then \c x is added to \c unreachable. - Note that \c unreachable is \em not cleared, so the same set can be used across multiple - calls to collect dependencies. - @param[in] deps IOs to search for - @param[in] base root set of IOs to search from - @param[out] unreachable collects unreachable IOs - @related IO -*/ -void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable); - - -/** Used by rbd-replay-prep for processing the raw trace. Corresponds to the Action class, except that Actions are executed by rbd-replay, and IOs are used by rbd-replay-prep for processing the raw trace. 
@@ -67,19 +50,16 @@ public: @param ionum ID of this %IO @param start_time time the %IO started, in nanoseconds @param thread_id ID of the thread that issued the %IO - @param prev previously issued %IO on the same thread. NULL for the first %IO on a thread. */ IO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - ptr prev) + const io_set_t& deps) : m_ionum(ionum), m_start_time(start_time), - m_dependencies(io_set_t()), - m_completion(weak_ptr()), - m_num_successors(0), + m_dependencies(deps), m_thread_id(thread_id), - m_prev(prev) { + m_completed(false) { } virtual ~IO() { @@ -97,22 +77,8 @@ public: return m_dependencies; } - void add_dependencies(const io_set_t& deps); - - /** - Returns the completion's number of successors, or 0 if the %IO does not have a completion. - */ - uint64_t num_completion_successors() const { - ptr c(m_completion.lock()); - return c ? c->m_num_successors : 0; - } - virtual void write_to(Ser& out) const = 0; - virtual bool is_completion() const { - return false; - } - void set_ionum(action_id_t ionum) { m_ionum = ionum; } @@ -121,27 +87,8 @@ public: return m_ionum; } - ptr prev() const { - return m_prev; - } - - void set_num_successors(uint32_t n) { - m_num_successors = n; - } - - uint32_t num_successors() const { - return m_num_successors; - } - virtual void write_debug(std::ostream& out) const = 0; - /** - Creates the completion for this IO. - This may only be called once per IO, and may not be called on completion IOs. - The completion must be stored, or else m_completion will expire. - */ - ptr create_completion(uint64_t start_time, thread_id_t thread_id); - protected: void write_to(Ser& out, io_type iotype) const; @@ -151,10 +98,8 @@ private: action_id_t m_ionum; uint64_t m_start_time; io_set_t m_dependencies; - boost::weak_ptr<IO> m_completion; - uint32_t m_num_successors; thread_id_t m_thread_id; - ptr m_prev; + bool m_completed; }; /// Used for dumping debug info. 
@@ -167,7 +112,7 @@ public: StartThreadIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { + : IO(ionum, start_time, thread_id, io_set_t()) { } void write_to(Ser& out) const; @@ -179,8 +124,9 @@ class StopThreadIO : public IO { public: StopThreadIO(action_id_t ionum, uint64_t start_time, - thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { + thread_id_t thread_id, + const io_set_t& deps) + : IO(ionum, start_time, thread_id, deps) { } void write_to(Ser& out) const; @@ -193,11 +139,11 @@ public: ReadIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx, uint64_t offset, uint64_t length) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx), m_offset(offset), m_length(length) { @@ -218,11 +164,11 @@ public: WriteIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx, uint64_t offset, uint64_t length) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx), m_offset(offset), m_length(length) { @@ -243,11 +189,11 @@ public: AioReadIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx, uint64_t offset, uint64_t length) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx), m_offset(offset), m_length(length) { @@ -268,11 +214,11 @@ public: AioWriteIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx, uint64_t offset, uint64_t length) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx), m_offset(offset), m_length(length) { @@ -293,12 +239,12 @@ public: OpenImageIO(action_id_t 
ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx, const std::string& name, const std::string& snap_name, bool readonly) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx), m_name(name), m_snap_name(snap_name), @@ -325,9 +271,9 @@ public: CloseImageIO(action_id_t ionum, uint64_t start_time, thread_id_t thread_id, - IO::ptr prev, + const io_set_t& deps, imagectx_id_t imagectx) - : IO(ionum, start_time, thread_id, prev), + : IO(ionum, start_time, thread_id, deps), m_imagectx(imagectx) { } @@ -343,26 +289,6 @@ private: imagectx_id_t m_imagectx; }; -class CompletionIO : public IO { -public: - CompletionIO(action_id_t ionum, - uint64_t start_time, - thread_id_t thread_id) - : IO(ionum, start_time, thread_id, IO::ptr()) { - } - - void write_to(Ser& out) const { - } - - bool is_completion() const { - return true; - } - - void write_debug(std::ostream& out) const { - write_debug_base(out, "completion"); - } -}; - /// @related IO bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2); diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc index e0998f5b2f3..0a108759008 100644 --- a/src/rbd_replay/rbd-replay-prep.cc +++ b/src/rbd_replay/rbd-replay-prep.cc @@ -22,6 +22,7 @@ #include <string> #include <assert.h> #include <fstream> +#include <set> #include <boost/thread/thread.hpp> #include "ios.hpp" @@ -42,7 +43,6 @@ public: uint64_t window) : m_id(id), m_window(window), - m_pending_io(IO::ptr()), m_latest_io(IO::ptr()), m_max_ts(0) { } @@ -57,35 +57,26 @@ public: return m_max_ts; } - void issued_io(IO::ptr io, const map<thread_id_t, ptr>& threads) { + void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) { assert(io); - io_set_t latest_ios; - for (map<thread_id_t, ptr>::const_iterator itr = threads.begin(), end = threads.end(); itr != end; ++itr) { - assertf(itr->second, "id = %ld", itr->first); - ptr 
thread(itr->second); - if (thread->m_latest_io) { - if (thread->m_latest_io->start_time() + m_window > io->start_time()) { - latest_ios.insert(thread->m_latest_io); - } - } + if (m_latest_io.get() != NULL) { + latest_ios->erase(m_latest_io); } - io->add_dependencies(latest_ios); m_latest_io = io; - m_pending_io = io; + latest_ios->insert(io); } thread_id_t id() const { return m_id; } - IO::ptr pending_io() { - return m_pending_io; + IO::ptr latest_io() { + return m_latest_io; } private: thread_id_t m_id; uint64_t m_window; - IO::ptr m_pending_io; IO::ptr m_latest_io; uint64_t m_max_ts; }; @@ -137,14 +128,8 @@ class Processor { public: Processor() : m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second - m_threads(), m_io_count(0), - m_recent_completions(io_set_t()), - m_open_images(set<imagectx_id_t>()), - m_ios(vector<IO::ptr>()), - m_pending_ios(map<uint64_t, IO::ptr>()), - m_anonymize(false), - m_anonymized_images(map<string, AnonymizedImage>()) { + m_anonymize(false) { } void run(vector<string> args) { @@ -160,8 +145,6 @@ public: } m_window = (uint64_t)(1e9 * atof(args[++i].c_str())); } else if (arg.find("--window=") == 0) { - // TODO: test - printf("Arg: '%s'\n", arg.c_str() + sizeof("--window=")); m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window="))); } else if (arg == "--anonymize") { m_anonymize = true; @@ -231,25 +214,6 @@ public: insert_thread_stops(); - for (vector<IO::ptr>::const_iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) { - IO::ptr io(*itr); - IO::ptr prev(io->prev()); - if (prev) { - // TODO: explain when prev is and isn't a dep - io_set_t::iterator depitr = io->dependencies().find(prev); - if (depitr != io->dependencies().end()) { - io->dependencies().erase(depitr); - } - } - if (io->is_completion()) { - io->dependencies().clear(); - } - for (io_set_t::const_iterator depitr = io->dependencies().begin(); depitr != io->dependencies().end(); ++depitr) { - IO::ptr dep(*depitr); - 
dep->set_num_successors(dep->num_successors() + 1); - } - } - ofstream myfile; myfile.open(output_file_name.c_str(), ios::out | ios::binary); Ser ser(myfile); @@ -274,16 +238,10 @@ private: } if (io->start_time() > thread->max_ts()) { ionum = io->ionum(); - if (ionum & 1) { - ionum++; - } break; } } if (ionum == none) { - if (maxIONum & 1) { - maxIONum--; - } ionum = maxIONum + 2; } for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) { @@ -292,7 +250,9 @@ private: io->set_ionum(io->ionum() + 2); } } - IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), thread->id())); + IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), + thread->id(), + m_recent_completions)); vector<IO::ptr>::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time); m_ios.insert(insertion_point, stop_thread_io); } @@ -366,55 +326,51 @@ private: const struct bt_definition *m_scope; } fields(evt, scope_fields); - if (strcmp(event_name, "librbd:read_enter") == 0 || - strcmp(event_name, "librbd:read2_enter") == 0) { - string name(fields.string("name")); - string snap_name(fields.string("snap_name")); - bool readonly = fields.int64("read_only"); - imagectx_id_t imagectx = fields.uint64("imagectx"); - uint64_t offset = fields.uint64("offset"); - uint64_t length = fields.uint64("length"); - require_image(ts, thread, imagectx, name, snap_name, readonly); - action_id_t ionum = next_id(); - IO::ptr io(new ReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); - m_ios.push_back(io); - } else if (strcmp(event_name, "librbd:open_image_enter") == 0) { + if (strcmp(event_name, "librbd:open_image_enter") == 0) { string name(fields.string("name")); string snap_name(fields.string("snap_name")); - bool readonly = fields.int64("read_only"); + bool readonly = fields.uint64("read_only"); 
imagectx_id_t imagectx = fields.uint64("imagectx"); action_id_t ionum = next_id(); pair<string, string> aname(map_image_snap(name, snap_name)); - IO::ptr io(new OpenImageIO(ionum, ts, threadID, thread->pending_io(), imagectx, aname.first, aname.second, readonly)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); + IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions, + imagectx, aname.first, aname.second, + readonly)); + thread->issued_io(io, &m_latest_ios); m_ios.push_back(io); } else if (strcmp(event_name, "librbd:open_image_exit") == 0) { - IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); - m_ios.push_back(completionIO); - boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->pending_io())); + completed(thread->latest_io()); + boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io())); assert(io); m_open_images.insert(io->imagectx()); } else if (strcmp(event_name, "librbd:close_image_enter") == 0) { imagectx_id_t imagectx = fields.uint64("imagectx"); action_id_t ionum = next_id(); - IO::ptr io(new CloseImageIO(ionum, ts, threadID, thread->pending_io(), imagectx)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); - m_ios.push_back(thread->pending_io()); + IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions, + imagectx)); + thread->issued_io(io, &m_latest_ios); + m_ios.push_back(thread->latest_io()); } else if (strcmp(event_name, "librbd:close_image_exit") == 0) { - IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); - m_ios.push_back(completionIO); - completed(completionIO); - boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->pending_io())); + completed(thread->latest_io()); + boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io())); assert(io); 
m_open_images.erase(io->imagectx()); + } else if (strcmp(event_name, "librbd:read_enter") == 0 || + strcmp(event_name, "librbd:read2_enter") == 0) { + string name(fields.string("name")); + string snap_name(fields.string("snap_name")); + bool readonly = fields.int64("read_only"); + imagectx_id_t imagectx = fields.uint64("imagectx"); + uint64_t offset = fields.uint64("offset"); + uint64_t length = fields.uint64("length"); + require_image(ts, thread, imagectx, name, snap_name, readonly); + action_id_t ionum = next_id(); + IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx, + offset, length)); + thread->issued_io(io, &m_latest_ios); + m_ios.push_back(io); } else if (strcmp(event_name, "librbd:read_exit") == 0) { - IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); - m_ios.push_back(completionIO); - completed(completionIO); + completed(thread->latest_io()); } else if (strcmp(event_name, "librbd:write_enter") == 0 || strcmp(event_name, "librbd:write2_enter") == 0) { string name(fields.string("name")); @@ -425,14 +381,12 @@ private: imagectx_id_t imagectx = fields.uint64("imagectx"); require_image(ts, thread, imagectx, name, snap_name, readonly); action_id_t ionum = next_id(); - IO::ptr io(new WriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); + IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); m_ios.push_back(io); } else if (strcmp(event_name, "librbd:write_exit") == 0) { - IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID)); - m_ios.push_back(completionIO); - completed(completionIO); + completed(thread->latest_io()); } else if (strcmp(event_name, "librbd:aio_read_enter") == 0 || strcmp(event_name, "librbd:aio_read2_enter") == 0) { string name(fields.string("name")); @@ -444,10 +398,10 @@ private: uint64_t 
length = fields.uint64("length"); require_image(ts, thread, imagectx, name, snap_name, readonly); action_id_t ionum = next_id(); - IO::ptr io(new AioReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length)); - io->add_dependencies(m_recent_completions); + IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); m_ios.push_back(io); - thread->issued_io(io, m_threads); + thread->issued_io(io, &m_latest_ios); m_pending_ios[completion] = io; } else if (strcmp(event_name, "librbd:aio_write_enter") == 0 || strcmp(event_name, "librbd:aio_write2_enter") == 0) { @@ -460,9 +414,9 @@ private: imagectx_id_t imagectx = fields.uint64("imagectx"); require_image(ts, thread, imagectx, name, snap_name, readonly); action_id_t ionum = next_id(); - IO::ptr io(new AioWriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); + IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions, + imagectx, offset, length)); + thread->issued_io(io, &m_latest_ios); m_ios.push_back(io); m_pending_ios[completion] = io; } else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) { @@ -471,9 +425,7 @@ private: if (itr != m_pending_ios.end()) { IO::ptr completedIO(itr->second); m_pending_ios.erase(itr); - IO::ptr completionIO(completedIO->create_completion(ts, threadID)); - m_ios.push_back(completionIO); - completed(completionIO); + completed(completedIO); } } @@ -487,9 +439,14 @@ private: } void completed(IO::ptr io) { - uint64_t limit = io->start_time() < m_window ? 0 : io->start_time() - m_window; - for (io_set_t::iterator itr = m_recent_completions.begin(); itr != m_recent_completions.end(); ) { - if ((*itr)->start_time() < limit) { + uint64_t limit = (io->start_time() < m_window ? 
+ 0 : io->start_time() - m_window); + for (io_set_t::iterator itr = m_recent_completions.begin(); + itr != m_recent_completions.end(); ) { + IO::ptr recent_comp(*itr); + if ((recent_comp->start_time() < limit || + io->dependencies().count(recent_comp) != 0) && + m_latest_ios.count(recent_comp) == 0) { m_recent_completions.erase(itr++); } else { ++itr; @@ -521,13 +478,12 @@ private: } action_id_t ionum = next_id(); pair<string, string> aname(map_image_snap(name, snap_name)); - IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), thread->pending_io(), imagectx, aname.first, aname.second, readonly)); - io->add_dependencies(m_recent_completions); - thread->issued_io(io, m_threads); + IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), + m_recent_completions, imagectx, aname.first, + aname.second, readonly)); + thread->issued_io(io, &m_latest_ios); m_ios.push_back(io); - IO::ptr completionIO(io->create_completion(ts - 1, thread->id())); - m_ios.push_back(completionIO); - completed(completionIO); + completed(io); m_open_images.insert(imagectx); } @@ -540,6 +496,7 @@ private: // keyed by completion map<uint64_t, IO::ptr> m_pending_ios; + std::set<IO::ptr> m_latest_ios; bool m_anonymize; map<string, AnonymizedImage> m_anonymized_images; |