summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJason Dillaman <dillaman@redhat.com>2015-10-07 20:56:22 +0200
committerJason Dillaman <dillaman@redhat.com>2015-10-07 22:34:55 +0200
commit65fb1b86cbab65023f6207798c9e189bce55dcf6 (patch)
tree0f58000d22c93e6626881b4c80b9c6d463087e4e
parentrbd-replay: improve error messages (diff)
downloadceph-65fb1b86cbab65023f6207798c9e189bce55dcf6.tar.xz
ceph-65fb1b86cbab65023f6207798c9e189bce55dcf6.zip
rbd-replay-prep: simplify IO dependency calculation
Only track read-after-write and write-after-write IO dependencies via the associated write completions. All IO events after a write completion are considered to be dependent and can be pruned down to at most the number of concurrent IOs. This reduces the prep time from a simple 'rbd bench-write' from over 4 hrs down to seconds. Fixes: #13378, #13384 Backport: hammer Signed-off-by: Jason Dillaman <dillaman@redhat.com>
-rw-r--r--src/rbd_replay/actions.cc10
-rw-r--r--src/rbd_replay/actions.hpp8
-rw-r--r--src/rbd_replay/ios.cc130
-rw-r--r--src/rbd_replay/ios.hpp114
-rw-r--r--src/rbd_replay/rbd-replay-prep.cc173
5 files changed, 93 insertions, 342 deletions
diff --git a/src/rbd_replay/actions.cc b/src/rbd_replay/actions.cc
index 2bcdfb194ec..0c327b653ed 100644
--- a/src/rbd_replay/actions.cc
+++ b/src/rbd_replay/actions.cc
@@ -25,13 +25,9 @@ using namespace std;
Action::Action(action_id_t id,
thread_id_t thread_id,
- int num_successors,
- int num_completion_successors,
std::vector<dependency_d> &predecessors)
: m_id(id),
m_thread_id(thread_id),
- m_num_successors(num_successors),
- m_num_completion_successors(num_completion_successors),
m_predecessors(predecessors) {
}
@@ -45,8 +41,8 @@ Action::ptr Action::read_from(Deser &d) {
}
uint32_t ionum = d.read_uint32_t();
uint64_t thread_id = d.read_uint64_t();
- uint32_t num_successors = d.read_uint32_t();
- uint32_t num_completion_successors = d.read_uint32_t();
+ d.read_uint32_t(); // unused
+ d.read_uint32_t(); // unused
uint32_t num_dependencies = d.read_uint32_t();
vector<dependency_d> deps;
for (unsigned int i = 0; i < num_dependencies; i++) {
@@ -54,7 +50,7 @@ Action::ptr Action::read_from(Deser &d) {
uint64_t time_delta = d.read_uint64_t();
deps.push_back(dependency_d(dep_id, time_delta));
}
- DummyAction dummy(ionum, thread_id, num_successors, num_completion_successors, deps);
+ DummyAction dummy(ionum, thread_id, deps);
switch (type) {
case IO_START_THREAD:
return StartThreadAction::read_from(dummy, d);
diff --git a/src/rbd_replay/actions.hpp b/src/rbd_replay/actions.hpp
index 068e4dc4139..e9522dbf99f 100644
--- a/src/rbd_replay/actions.hpp
+++ b/src/rbd_replay/actions.hpp
@@ -133,8 +133,6 @@ public:
Action(action_id_t id,
thread_id_t thread_id,
- int num_successors,
- int num_completion_successors,
std::vector<dependency_d> &predecessors);
virtual ~Action();
@@ -176,8 +174,6 @@ private:
const action_id_t m_id;
const thread_id_t m_thread_id;
- const int m_num_successors;
- const int m_num_completion_successors;
const std::vector<dependency_d> m_predecessors;
};
@@ -194,10 +190,8 @@ class DummyAction : public Action {
public:
DummyAction(action_id_t id,
thread_id_t thread_id,
- int num_successors,
- int num_completion_successors,
std::vector<dependency_d> &predecessors)
- : Action(id, thread_id, num_successors, num_completion_successors, predecessors) {
+ : Action(id, thread_id, predecessors) {
}
void perform(ActionCtx &ctx) {
diff --git a/src/rbd_replay/ios.cc b/src/rbd_replay/ios.cc
index ccc560f85c8..21a68019ece 100644
--- a/src/rbd_replay/ios.cc
+++ b/src/rbd_replay/ios.cc
@@ -24,33 +24,6 @@ bool rbd_replay::compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2) {
return p1->start_time() < p2->start_time();
}
-static uint64_t min_time(const map<action_id_t, IO::ptr>& s) {
- if (s.empty()) {
- return 0;
- }
- return s.begin()->second->start_time();
-}
-
-static uint64_t max_time(const map<action_id_t, IO::ptr>& s) {
- if (s.empty()) {
- return 0;
- }
- map<action_id_t, IO::ptr>::const_iterator itr(s.end());
- --itr;
- return itr->second->start_time();
-}
-
-void IO::add_dependencies(const io_set_t& deps) {
- io_set_t base(m_dependencies);
- for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
- ptr dep(*itr);
- for (io_set_t::const_iterator itr2 = dep->m_dependencies.begin(); itr2 != dep->m_dependencies.end(); ++itr2) {
- base.insert(*itr2);
- }
- }
- batch_unreachable_from(deps, base, &m_dependencies);
-}
-
void IO::write_debug_base(ostream& out, string type) const {
out << m_ionum << ": " << m_start_time / 1000000.0 << ": " << type << ", thread = " << m_thread_id << ", deps = {";
bool first = true;
@@ -62,16 +35,17 @@ void IO::write_debug_base(ostream& out, string type) const {
}
out << (*itr)->m_ionum << ": " << m_start_time - (*itr)->m_start_time;
}
- out << "}, num_successors = " << m_num_successors << ", numCompletionSuccessors = " << num_completion_successors();
+ out << "}";
}
void IO::write_to(Ser& out, io_type iotype) const {
+ // TODO break compatibility now to add version (and yank unused fields)?
out.write_uint8_t(iotype);
out.write_uint32_t(m_ionum);
out.write_uint64_t(m_thread_id);
- out.write_uint32_t(m_num_successors);
- out.write_uint32_t(num_completion_successors());
+ out.write_uint32_t(0);
+ out.write_uint32_t(0);
out.write_uint32_t(m_dependencies.size());
vector<IO::ptr> deps;
for (io_set_t::const_iterator itr = m_dependencies.begin(), end = m_dependencies.end(); itr != end; ++itr) {
@@ -84,102 +58,6 @@ void IO::write_to(Ser& out, io_type iotype) const {
}
}
-IO::ptr IO::create_completion(uint64_t start_time, thread_id_t thread_id) {
- assert(!m_completion.lock());
- IO::ptr completion(new CompletionIO(m_ionum + 1, start_time, thread_id));
- m_completion = completion;
- completion->m_dependencies.insert(shared_from_this());
- return completion;
-}
-
-
-// TODO: Add unit tests
-// Anything in 'deps' which is not reachable from 'base' is added to 'unreachable'
-void rbd_replay::batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable) {
- if (deps.empty()) {
- return;
- }
-
- map<action_id_t, IO::ptr> searching_for;
- for (io_set_t::const_iterator itr = deps.begin(); itr != deps.end(); ++itr) {
- searching_for[(*itr)->ionum()] = *itr;
- }
-
- map<action_id_t, IO::ptr> boundary;
- for (io_set_t::const_iterator itr = base.begin(); itr != base.end(); ++itr) {
- boundary[(*itr)->ionum()] = *itr;
- }
-
- // The boundary horizon is the maximum timestamp of IOs in the boundary.
- // This monotonically decreases, because dependencies (which are added to the set)
- // have earlier timestamp than the dependent IOs (which were just removed from the set).
- uint64_t boundary_horizon = max_time(boundary);
-
- for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
- if (boundary_horizon >= itr->second->start_time()) {
- break;
- }
- unreachable->insert(itr->second);
- searching_for.erase(itr++);
- }
- if (searching_for.empty()) {
- return;
- }
-
- // The searching horizon is the minimum timestamp of IOs in the searching set.
- // This monotonically increases, because elements are only removed from the set.
- uint64_t searching_horizon = min_time(searching_for);
-
- while (!boundary.empty()) {
- // Take an IO from the end, which has the highest timestamp.
- // This reduces the boundary horizon as early as possible,
- // which means we can short cut as soon as possible.
- map<action_id_t, boost::shared_ptr<IO> >::iterator b_itr(boundary.end());
- --b_itr;
- boost::shared_ptr<IO> io(b_itr->second);
- boundary.erase(b_itr);
-
- for (io_set_t::const_iterator itr = io->dependencies().begin(), end = io->dependencies().end(); itr != end; ++itr) {
- IO::ptr dep(*itr);
- assertf(dep->ionum() < io->ionum(), "IO: %d, dependency: %d", io->ionum(), dep->ionum());
- io_map_t::iterator p = searching_for.find(dep->ionum());
- if (p != searching_for.end()) {
- searching_for.erase(p);
- if (dep->start_time() == searching_horizon) {
- searching_horizon = min_time(searching_for);
- if (searching_horizon == 0) {
- return;
- }
- }
- }
- boundary[dep->ionum()] = dep;
- }
-
- boundary_horizon = max_time(boundary);
- if (boundary_horizon != 0) {
- // Anything we're searching for that has a timestamp greater than the
- // boundary horizon will never be found, since the boundary horizon
- // falls monotonically.
- for (io_map_t::iterator itr = searching_for.begin(); itr != searching_for.end(); ) {
- if (boundary_horizon >= itr->second->start_time()) {
- break;
- }
- unreachable->insert(itr->second);
- searching_for.erase(itr++);
- }
- searching_horizon = min_time(searching_for);
- if (searching_horizon == 0) {
- return;
- }
- }
- }
-
- // Anything we're still searching for has not been found.
- for (io_map_t::iterator itr = searching_for.begin(), end = searching_for.end(); itr != end; ++itr) {
- unreachable->insert(itr->second);
- }
-}
-
ostream& operator<<(ostream& out, IO::ptr io) {
io->write_debug(out);
return out;
diff --git a/src/rbd_replay/ios.hpp b/src/rbd_replay/ios.hpp
index 5bebcd71731..7d4153a82e3 100644
--- a/src/rbd_replay/ios.hpp
+++ b/src/rbd_replay/ios.hpp
@@ -36,23 +36,6 @@ typedef std::set<boost::shared_ptr<IO> > io_set_t;
typedef std::map<action_id_t, boost::shared_ptr<IO> > io_map_t;
/**
- Calculates reachability of IOs in the dependency graph.
- All IOs in \c deps which are not transitive dependencies of anything in \c base
- is added to \c unreachable.
- In other words, for every IO \c x in \c deps: if nothing in \c base depends on \c x,
- and nothing in \c base has dependencies that depend on \c x, etc.,
- then \c x is added to \c unreachable.
- Note that \c unreachable is \em not cleared, so the same set can be used across multiple
- calls to collect dependencies.
- @param[in] deps IOs to search for
- @param[in] base root set of IOs to search from
- @param[out] unreachable collects unreachable IOs
- @related IO
-*/
-void batch_unreachable_from(const io_set_t& deps, const io_set_t& base, io_set_t* unreachable);
-
-
-/**
Used by rbd-replay-prep for processing the raw trace.
Corresponds to the Action class, except that Actions are executed by rbd-replay,
and IOs are used by rbd-replay-prep for processing the raw trace.
@@ -67,19 +50,16 @@ public:
@param ionum ID of this %IO
@param start_time time the %IO started, in nanoseconds
@param thread_id ID of the thread that issued the %IO
- @param prev previously issued %IO on the same thread. NULL for the first %IO on a thread.
*/
IO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- ptr prev)
+ const io_set_t& deps)
: m_ionum(ionum),
m_start_time(start_time),
- m_dependencies(io_set_t()),
- m_completion(weak_ptr()),
- m_num_successors(0),
+ m_dependencies(deps),
m_thread_id(thread_id),
- m_prev(prev) {
+ m_completed(false) {
}
virtual ~IO() {
@@ -97,22 +77,8 @@ public:
return m_dependencies;
}
- void add_dependencies(const io_set_t& deps);
-
- /**
- Returns the completion's number of successors, or 0 if the %IO does not have a completion.
- */
- uint64_t num_completion_successors() const {
- ptr c(m_completion.lock());
- return c ? c->m_num_successors : 0;
- }
-
virtual void write_to(Ser& out) const = 0;
- virtual bool is_completion() const {
- return false;
- }
-
void set_ionum(action_id_t ionum) {
m_ionum = ionum;
}
@@ -121,27 +87,8 @@ public:
return m_ionum;
}
- ptr prev() const {
- return m_prev;
- }
-
- void set_num_successors(uint32_t n) {
- m_num_successors = n;
- }
-
- uint32_t num_successors() const {
- return m_num_successors;
- }
-
virtual void write_debug(std::ostream& out) const = 0;
- /**
- Creates the completion for this IO.
- This may only be called once per IO, and may not be called on completion IOs.
- The completion must be stored, or else m_completion will expire.
- */
- ptr create_completion(uint64_t start_time, thread_id_t thread_id);
-
protected:
void write_to(Ser& out, io_type iotype) const;
@@ -151,10 +98,8 @@ private:
action_id_t m_ionum;
uint64_t m_start_time;
io_set_t m_dependencies;
- boost::weak_ptr<IO> m_completion;
- uint32_t m_num_successors;
thread_id_t m_thread_id;
- ptr m_prev;
+ bool m_completed;
};
/// Used for dumping debug info.
@@ -167,7 +112,7 @@ public:
StartThreadIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
+ : IO(ionum, start_time, thread_id, io_set_t()) {
}
void write_to(Ser& out) const;
@@ -179,8 +124,9 @@ class StopThreadIO : public IO {
public:
StopThreadIO(action_id_t ionum,
uint64_t start_time,
- thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
+ thread_id_t thread_id,
+ const io_set_t& deps)
+ : IO(ionum, start_time, thread_id, deps) {
}
void write_to(Ser& out) const;
@@ -193,11 +139,11 @@ public:
ReadIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx,
uint64_t offset,
uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx),
m_offset(offset),
m_length(length) {
@@ -218,11 +164,11 @@ public:
WriteIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx,
uint64_t offset,
uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx),
m_offset(offset),
m_length(length) {
@@ -243,11 +189,11 @@ public:
AioReadIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx,
uint64_t offset,
uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx),
m_offset(offset),
m_length(length) {
@@ -268,11 +214,11 @@ public:
AioWriteIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx,
uint64_t offset,
uint64_t length)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx),
m_offset(offset),
m_length(length) {
@@ -293,12 +239,12 @@ public:
OpenImageIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx,
const std::string& name,
const std::string& snap_name,
bool readonly)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx),
m_name(name),
m_snap_name(snap_name),
@@ -325,9 +271,9 @@ public:
CloseImageIO(action_id_t ionum,
uint64_t start_time,
thread_id_t thread_id,
- IO::ptr prev,
+ const io_set_t& deps,
imagectx_id_t imagectx)
- : IO(ionum, start_time, thread_id, prev),
+ : IO(ionum, start_time, thread_id, deps),
m_imagectx(imagectx) {
}
@@ -343,26 +289,6 @@ private:
imagectx_id_t m_imagectx;
};
-class CompletionIO : public IO {
-public:
- CompletionIO(action_id_t ionum,
- uint64_t start_time,
- thread_id_t thread_id)
- : IO(ionum, start_time, thread_id, IO::ptr()) {
- }
-
- void write_to(Ser& out) const {
- }
-
- bool is_completion() const {
- return true;
- }
-
- void write_debug(std::ostream& out) const {
- write_debug_base(out, "completion");
- }
-};
-
/// @related IO
bool compare_io_ptrs_by_start_time(IO::ptr p1, IO::ptr p2);
diff --git a/src/rbd_replay/rbd-replay-prep.cc b/src/rbd_replay/rbd-replay-prep.cc
index e0998f5b2f3..0a108759008 100644
--- a/src/rbd_replay/rbd-replay-prep.cc
+++ b/src/rbd_replay/rbd-replay-prep.cc
@@ -22,6 +22,7 @@
#include <string>
#include <assert.h>
#include <fstream>
+#include <set>
#include <boost/thread/thread.hpp>
#include "ios.hpp"
@@ -42,7 +43,6 @@ public:
uint64_t window)
: m_id(id),
m_window(window),
- m_pending_io(IO::ptr()),
m_latest_io(IO::ptr()),
m_max_ts(0) {
}
@@ -57,35 +57,26 @@ public:
return m_max_ts;
}
- void issued_io(IO::ptr io, const map<thread_id_t, ptr>& threads) {
+ void issued_io(IO::ptr io, std::set<IO::ptr> *latest_ios) {
assert(io);
- io_set_t latest_ios;
- for (map<thread_id_t, ptr>::const_iterator itr = threads.begin(), end = threads.end(); itr != end; ++itr) {
- assertf(itr->second, "id = %ld", itr->first);
- ptr thread(itr->second);
- if (thread->m_latest_io) {
- if (thread->m_latest_io->start_time() + m_window > io->start_time()) {
- latest_ios.insert(thread->m_latest_io);
- }
- }
+ if (m_latest_io.get() != NULL) {
+ latest_ios->erase(m_latest_io);
}
- io->add_dependencies(latest_ios);
m_latest_io = io;
- m_pending_io = io;
+ latest_ios->insert(io);
}
thread_id_t id() const {
return m_id;
}
- IO::ptr pending_io() {
- return m_pending_io;
+ IO::ptr latest_io() {
+ return m_latest_io;
}
private:
thread_id_t m_id;
uint64_t m_window;
- IO::ptr m_pending_io;
IO::ptr m_latest_io;
uint64_t m_max_ts;
};
@@ -137,14 +128,8 @@ class Processor {
public:
Processor()
: m_window(1000000000ULL), // 1 billion nanoseconds, i.e., one second
- m_threads(),
m_io_count(0),
- m_recent_completions(io_set_t()),
- m_open_images(set<imagectx_id_t>()),
- m_ios(vector<IO::ptr>()),
- m_pending_ios(map<uint64_t, IO::ptr>()),
- m_anonymize(false),
- m_anonymized_images(map<string, AnonymizedImage>()) {
+ m_anonymize(false) {
}
void run(vector<string> args) {
@@ -160,8 +145,6 @@ public:
}
m_window = (uint64_t)(1e9 * atof(args[++i].c_str()));
} else if (arg.find("--window=") == 0) {
- // TODO: test
- printf("Arg: '%s'\n", arg.c_str() + sizeof("--window="));
m_window = (uint64_t)(1e9 * atof(arg.c_str() + sizeof("--window=")));
} else if (arg == "--anonymize") {
m_anonymize = true;
@@ -231,25 +214,6 @@ public:
insert_thread_stops();
- for (vector<IO::ptr>::const_iterator itr = m_ios.begin(); itr != m_ios.end(); ++itr) {
- IO::ptr io(*itr);
- IO::ptr prev(io->prev());
- if (prev) {
- // TODO: explain when prev is and isn't a dep
- io_set_t::iterator depitr = io->dependencies().find(prev);
- if (depitr != io->dependencies().end()) {
- io->dependencies().erase(depitr);
- }
- }
- if (io->is_completion()) {
- io->dependencies().clear();
- }
- for (io_set_t::const_iterator depitr = io->dependencies().begin(); depitr != io->dependencies().end(); ++depitr) {
- IO::ptr dep(*depitr);
- dep->set_num_successors(dep->num_successors() + 1);
- }
- }
-
ofstream myfile;
myfile.open(output_file_name.c_str(), ios::out | ios::binary);
Ser ser(myfile);
@@ -274,16 +238,10 @@ private:
}
if (io->start_time() > thread->max_ts()) {
ionum = io->ionum();
- if (ionum & 1) {
- ionum++;
- }
break;
}
}
if (ionum == none) {
- if (maxIONum & 1) {
- maxIONum--;
- }
ionum = maxIONum + 2;
}
for (vector<IO::ptr>::const_iterator itr2 = m_ios.begin(); itr2 != m_ios.end(); ++itr2) {
@@ -292,7 +250,9 @@ private:
io->set_ionum(io->ionum() + 2);
}
}
- IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(), thread->id()));
+ IO::ptr stop_thread_io(new StopThreadIO(ionum, thread->max_ts(),
+ thread->id(),
+ m_recent_completions));
vector<IO::ptr>::iterator insertion_point = lower_bound(m_ios.begin(), m_ios.end(), stop_thread_io, compare_io_ptrs_by_start_time);
m_ios.insert(insertion_point, stop_thread_io);
}
@@ -366,55 +326,51 @@ private:
const struct bt_definition *m_scope;
} fields(evt, scope_fields);
- if (strcmp(event_name, "librbd:read_enter") == 0 ||
- strcmp(event_name, "librbd:read2_enter") == 0) {
- string name(fields.string("name"));
- string snap_name(fields.string("snap_name"));
- bool readonly = fields.int64("read_only");
- imagectx_id_t imagectx = fields.uint64("imagectx");
- uint64_t offset = fields.uint64("offset");
- uint64_t length = fields.uint64("length");
- require_image(ts, thread, imagectx, name, snap_name, readonly);
- action_id_t ionum = next_id();
- IO::ptr io(new ReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
- m_ios.push_back(io);
- } else if (strcmp(event_name, "librbd:open_image_enter") == 0) {
+ if (strcmp(event_name, "librbd:open_image_enter") == 0) {
string name(fields.string("name"));
string snap_name(fields.string("snap_name"));
- bool readonly = fields.int64("read_only");
+ bool readonly = fields.uint64("read_only");
imagectx_id_t imagectx = fields.uint64("imagectx");
action_id_t ionum = next_id();
pair<string, string> aname(map_image_snap(name, snap_name));
- IO::ptr io(new OpenImageIO(ionum, ts, threadID, thread->pending_io(), imagectx, aname.first, aname.second, readonly));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
+ IO::ptr io(new OpenImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, aname.first, aname.second,
+ readonly));
+ thread->issued_io(io, &m_latest_ios);
m_ios.push_back(io);
} else if (strcmp(event_name, "librbd:open_image_exit") == 0) {
- IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
- m_ios.push_back(completionIO);
- boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->pending_io()));
+ completed(thread->latest_io());
+ boost::shared_ptr<OpenImageIO> io(boost::dynamic_pointer_cast<OpenImageIO>(thread->latest_io()));
assert(io);
m_open_images.insert(io->imagectx());
} else if (strcmp(event_name, "librbd:close_image_enter") == 0) {
imagectx_id_t imagectx = fields.uint64("imagectx");
action_id_t ionum = next_id();
- IO::ptr io(new CloseImageIO(ionum, ts, threadID, thread->pending_io(), imagectx));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
- m_ios.push_back(thread->pending_io());
+ IO::ptr io(new CloseImageIO(ionum, ts, threadID, m_recent_completions,
+ imagectx));
+ thread->issued_io(io, &m_latest_ios);
+ m_ios.push_back(thread->latest_io());
} else if (strcmp(event_name, "librbd:close_image_exit") == 0) {
- IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
- m_ios.push_back(completionIO);
- completed(completionIO);
- boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->pending_io()));
+ completed(thread->latest_io());
+ boost::shared_ptr<CloseImageIO> io(boost::dynamic_pointer_cast<CloseImageIO>(thread->latest_io()));
assert(io);
m_open_images.erase(io->imagectx());
+ } else if (strcmp(event_name, "librbd:read_enter") == 0 ||
+ strcmp(event_name, "librbd:read2_enter") == 0) {
+ string name(fields.string("name"));
+ string snap_name(fields.string("snap_name"));
+ bool readonly = fields.int64("read_only");
+ imagectx_id_t imagectx = fields.uint64("imagectx");
+ uint64_t offset = fields.uint64("offset");
+ uint64_t length = fields.uint64("length");
+ require_image(ts, thread, imagectx, name, snap_name, readonly);
+ action_id_t ionum = next_id();
+ IO::ptr io(new ReadIO(ionum, ts, threadID, m_recent_completions, imagectx,
+ offset, length));
+ thread->issued_io(io, &m_latest_ios);
+ m_ios.push_back(io);
} else if (strcmp(event_name, "librbd:read_exit") == 0) {
- IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
- m_ios.push_back(completionIO);
- completed(completionIO);
+ completed(thread->latest_io());
} else if (strcmp(event_name, "librbd:write_enter") == 0 ||
strcmp(event_name, "librbd:write2_enter") == 0) {
string name(fields.string("name"));
@@ -425,14 +381,12 @@ private:
imagectx_id_t imagectx = fields.uint64("imagectx");
require_image(ts, thread, imagectx, name, snap_name, readonly);
action_id_t ionum = next_id();
- IO::ptr io(new WriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
+ IO::ptr io(new WriteIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
m_ios.push_back(io);
} else if (strcmp(event_name, "librbd:write_exit") == 0) {
- IO::ptr completionIO(thread->pending_io()->create_completion(ts, threadID));
- m_ios.push_back(completionIO);
- completed(completionIO);
+ completed(thread->latest_io());
} else if (strcmp(event_name, "librbd:aio_read_enter") == 0 ||
strcmp(event_name, "librbd:aio_read2_enter") == 0) {
string name(fields.string("name"));
@@ -444,10 +398,10 @@ private:
uint64_t length = fields.uint64("length");
require_image(ts, thread, imagectx, name, snap_name, readonly);
action_id_t ionum = next_id();
- IO::ptr io(new AioReadIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
- io->add_dependencies(m_recent_completions);
+ IO::ptr io(new AioReadIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
m_ios.push_back(io);
- thread->issued_io(io, m_threads);
+ thread->issued_io(io, &m_latest_ios);
m_pending_ios[completion] = io;
} else if (strcmp(event_name, "librbd:aio_write_enter") == 0 ||
strcmp(event_name, "librbd:aio_write2_enter") == 0) {
@@ -460,9 +414,9 @@ private:
imagectx_id_t imagectx = fields.uint64("imagectx");
require_image(ts, thread, imagectx, name, snap_name, readonly);
action_id_t ionum = next_id();
- IO::ptr io(new AioWriteIO(ionum, ts, threadID, thread->pending_io(), imagectx, offset, length));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
+ IO::ptr io(new AioWriteIO(ionum, ts, threadID, m_recent_completions,
+ imagectx, offset, length));
+ thread->issued_io(io, &m_latest_ios);
m_ios.push_back(io);
m_pending_ios[completion] = io;
} else if (strcmp(event_name, "librbd:aio_complete_enter") == 0) {
@@ -471,9 +425,7 @@ private:
if (itr != m_pending_ios.end()) {
IO::ptr completedIO(itr->second);
m_pending_ios.erase(itr);
- IO::ptr completionIO(completedIO->create_completion(ts, threadID));
- m_ios.push_back(completionIO);
- completed(completionIO);
+ completed(completedIO);
}
}
@@ -487,9 +439,14 @@ private:
}
void completed(IO::ptr io) {
- uint64_t limit = io->start_time() < m_window ? 0 : io->start_time() - m_window;
- for (io_set_t::iterator itr = m_recent_completions.begin(); itr != m_recent_completions.end(); ) {
- if ((*itr)->start_time() < limit) {
+ uint64_t limit = (io->start_time() < m_window ?
+ 0 : io->start_time() - m_window);
+ for (io_set_t::iterator itr = m_recent_completions.begin();
+ itr != m_recent_completions.end(); ) {
+ IO::ptr recent_comp(*itr);
+ if ((recent_comp->start_time() < limit ||
+ io->dependencies().count(recent_comp) != 0) &&
+ m_latest_ios.count(recent_comp) == 0) {
m_recent_completions.erase(itr++);
} else {
++itr;
@@ -521,13 +478,12 @@ private:
}
action_id_t ionum = next_id();
pair<string, string> aname(map_image_snap(name, snap_name));
- IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(), thread->pending_io(), imagectx, aname.first, aname.second, readonly));
- io->add_dependencies(m_recent_completions);
- thread->issued_io(io, m_threads);
+ IO::ptr io(new OpenImageIO(ionum, ts - 2, thread->id(),
+ m_recent_completions, imagectx, aname.first,
+ aname.second, readonly));
+ thread->issued_io(io, &m_latest_ios);
m_ios.push_back(io);
- IO::ptr completionIO(io->create_completion(ts - 1, thread->id()));
- m_ios.push_back(completionIO);
- completed(completionIO);
+ completed(io);
m_open_images.insert(imagectx);
}
@@ -540,6 +496,7 @@ private:
// keyed by completion
map<uint64_t, IO::ptr> m_pending_ios;
+ std::set<IO::ptr> m_latest_ios;
bool m_anonymize;
map<string, AnonymizedImage> m_anonymized_images;