diff options
author | Sage Weil <sage@newdream.net> | 2017-02-15 23:20:18 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-15 23:20:18 +0100 |
commit | eb491a13dc2faee315bf894fc2043aacfb94d624 (patch) | |
tree | 35a331693017ca804002be0c64a9a4d88faf5c90 | |
parent | Merge pull request #13445 from liewegas/wip-rgw-thrash (diff) | |
parent | osd: fix backoff vs reset race (diff) | |
download | ceph-eb491a13dc2faee315bf894fc2043aacfb94d624.tar.xz ceph-eb491a13dc2faee315bf894fc2043aacfb94d624.zip |
Merge pull request #13235 from liewegas/wip-pg-split-interval
osd: have clients resend ops on pg split
Reviewed-by: Greg Farnum <gfarnum@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
Reviewed-by: Samuel Just <sjust@redhat.com>
48 files changed, 1340 insertions, 1189 deletions
diff --git a/qa/suites/rados/thrash/backoff/aggressive_peering.yaml b/qa/suites/rados/thrash/backoff/aggressive_peering.yaml deleted file mode 100644 index 4743d26eb63..00000000000 --- a/qa/suites/rados/thrash/backoff/aggressive_peering.yaml +++ /dev/null @@ -1,5 +0,0 @@ -overrides: - ceph: - conf: - osd: - osd peering aggressive backoff: true diff --git a/qa/suites/rados/thrash/backoff/peering.yaml b/qa/suites/rados/thrash/backoff/peering.yaml new file mode 100644 index 00000000000..66d06117ea2 --- /dev/null +++ b/qa/suites/rados/thrash/backoff/peering.yaml @@ -0,0 +1,5 @@ +overrides: + ceph: + conf: + osd: + osd backoff on peering: true diff --git a/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml b/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml new file mode 100644 index 00000000000..e6109906503 --- /dev/null +++ b/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml @@ -0,0 +1,6 @@ +overrides: + ceph: + conf: + osd: + osd backoff on peering: true + osd backoff on degraded: true diff --git a/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml b/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml deleted file mode 100644 index 6521afef6b6..00000000000 --- a/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml +++ /dev/null @@ -1,6 +0,0 @@ -overrides: - ceph: - conf: - osd: - osd peering aggressive backoff: true - osd recovery aggressive backoff: true diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 85bb3c2702c..72fe1e6fa7d 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -843,12 +843,13 @@ OPTION(osd_command_max_records, OPT_INT, 256) OPTION(osd_max_pg_blocked_by, OPT_U32, 16) // max peer osds to report that are blocking our progress OPTION(osd_op_log_threshold, OPT_INT, 5) // how many op log messages to show in one go OPTION(osd_verify_sparse_read_holes, OPT_BOOL, false) // read fiemap-reported holes and verify they are zeros -OPTION(osd_peering_aggressive_backoff, OPT_BOOL, false) // issue aggressive client backoff during peering -OPTION(osd_recovery_aggressive_backoff, OPT_BOOL, false) // issue aggressive client backoff during per-object recovery +OPTION(osd_backoff_on_unfound, OPT_BOOL, true) // object unfound +OPTION(osd_backoff_on_degraded, OPT_BOOL, false) // [mainly for debug?] object unreadable/writeable +OPTION(osd_backoff_on_down, OPT_BOOL, true) // pg in down/incomplete state +OPTION(osd_backoff_on_peering, OPT_BOOL, false) // [debug] pg peering OPTION(osd_debug_crash_on_ignored_backoff, OPT_BOOL, false) // crash osd if client ignores a backoff; useful for debugging OPTION(osd_debug_drop_ping_probability, OPT_DOUBLE, 0) OPTION(osd_debug_drop_ping_duration, OPT_INT, 0) -OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op OPTION(osd_debug_op_order, OPT_BOOL, false) OPTION(osd_debug_verify_missing_on_start, OPT_BOOL, false) OPTION(osd_debug_scrub_chance_rewrite_digest, OPT_U64, 0) diff --git a/src/messages/MOSDBackoff.h b/src/messages/MOSDBackoff.h index e501269b7ef..181aaf181b9 100644 --- a/src/messages/MOSDBackoff.h +++ b/src/messages/MOSDBackoff.h @@ -16,50 +16,65 @@ #ifndef CEPH_MOSDBACKOFF_H #define CEPH_MOSDBACKOFF_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" #include "osd/osd_types.h" -class MOSDBackoff : public Message { +class MOSDBackoff : public MOSDFastDispatchOp { public: + const int HEAD_VERSION = 1; + const int COMPAT_VERSION = 1; + + spg_t pgid; + epoch_t map_epoch = 0; uint8_t op = 0; ///< CEPH_OSD_BACKOFF_OP_* uint64_t id = 0; ///< unique id within this session hobject_t begin, end; ///< [) range to block, unless ==, block single obj - epoch_t osd_epoch = 0; - MOSDBackoff() : Message(CEPH_MSG_OSD_BACKOFF) {} - MOSDBackoff(uint8_t op_, uint64_t id_, - hobject_t begin_, hobject_t end_, epoch_t ep) - : Message(CEPH_MSG_OSD_BACKOFF), + spg_t get_spg() const override { + return pgid; + } + epoch_t get_map_epoch() const override { + return map_epoch; + } + + MOSDBackoff() + : MOSDFastDispatchOp(CEPH_MSG_OSD_BACKOFF, HEAD_VERSION, COMPAT_VERSION) {} + MOSDBackoff(spg_t pgid_, epoch_t ep, uint8_t op_, uint64_t id_, + hobject_t begin_, hobject_t end_) + : MOSDFastDispatchOp(CEPH_MSG_OSD_BACKOFF, HEAD_VERSION, COMPAT_VERSION), + pgid(pgid_), + map_epoch(ep), op(op_), id(id_), begin(begin_), - end(end_), - osd_epoch(ep) { } + end(end_) { } void encode_payload(uint64_t features) override { + ::encode(pgid, payload); + ::encode(map_epoch, payload); ::encode(op, payload); ::encode(id, payload); ::encode(begin, payload); ::encode(end, payload); - ::encode(osd_epoch, payload); } void decode_payload() override { auto p = payload.begin(); + ::decode(pgid, p); + ::decode(map_epoch, p); ::decode(op, p); ::decode(id, p); ::decode(begin, p); ::decode(end, p); - ::decode(osd_epoch, p); } const char *get_type_name() const override { return "osd_backoff"; } void print(ostream& out) const override { - out << "osd_backoff(" << ceph_osd_backoff_op_name(op) + out << "osd_backoff(" << pgid << " " << ceph_osd_backoff_op_name(op) << " id " << id << " [" << begin << "," << end << ")" - << " epoch " << osd_epoch << ")"; + << " e" << map_epoch << ")"; } }; diff --git a/src/messages/MOSDECSubOpRead.h b/src/messages/MOSDECSubOpRead.h index 3e315ef26a8..5199d32a27e 100644 --- a/src/messages/MOSDECSubOpRead.h +++ b/src/messages/MOSDECSubOpRead.h @@ -15,11 +15,10 @@ #ifndef MOSDECSUBOPREAD_H #define MOSDECSUBOPREAD_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "osd/ECMsgTypes.h" -class MOSDECSubOpRead : public Message { +class MOSDECSubOpRead : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; @@ -31,9 +30,15 @@ public: int get_cost() const { return 0; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } - MOSDECSubOpRead() : - Message(MSG_OSD_EC_READ, HEAD_VERSION, COMPAT_VERSION) + MOSDECSubOpRead() + : MOSDFastDispatchOp(MSG_OSD_EC_READ, HEAD_VERSION, COMPAT_VERSION) {} virtual void decode_payload() { diff --git a/src/messages/MOSDECSubOpReadReply.h b/src/messages/MOSDECSubOpReadReply.h index 28e2cf7e929..7d6512c8dd5 100644 --- a/src/messages/MOSDECSubOpReadReply.h +++ b/src/messages/MOSDECSubOpReadReply.h @@ -15,11 +15,10 @@ #ifndef MOSDECSUBOPREADREPLY_H #define MOSDECSUBOPREADREPLY_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "osd/ECMsgTypes.h" -class MOSDECSubOpReadReply : public Message { +class MOSDECSubOpReadReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -31,9 +30,15 @@ public: int get_cost() const { return 0; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } - MOSDECSubOpReadReply() : - Message(MSG_OSD_EC_READ_REPLY, HEAD_VERSION, COMPAT_VERSION) + MOSDECSubOpReadReply() + : MOSDFastDispatchOp(MSG_OSD_EC_READ_REPLY, HEAD_VERSION, COMPAT_VERSION) {} virtual void decode_payload() { diff --git a/src/messages/MOSDECSubOpWrite.h b/src/messages/MOSDECSubOpWrite.h index b3a8e3cdb1f..24b9a4e4120 100644 --- a/src/messages/MOSDECSubOpWrite.h +++ b/src/messages/MOSDECSubOpWrite.h @@ -15,11 +15,10 @@ #ifndef MOSDECSUBOPWRITE_H #define MOSDECSUBOPWRITE_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "osd/ECMsgTypes.h" -class MOSDECSubOpWrite : public Message { +class MOSDECSubOpWrite : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -31,12 +30,18 @@ public: int get_cost() const { return 0; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } MOSDECSubOpWrite() - : Message(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) + : MOSDFastDispatchOp(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) {} MOSDECSubOpWrite(ECSubWrite &in_op) - : Message(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) { + : MOSDFastDispatchOp(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) { op.claim(in_op); } diff --git a/src/messages/MOSDECSubOpWriteReply.h b/src/messages/MOSDECSubOpWriteReply.h index c2edfb38c3e..4a811cdf7a5 100644 --- a/src/messages/MOSDECSubOpWriteReply.h +++ b/src/messages/MOSDECSubOpWriteReply.h @@ -15,11 +15,10 @@ #ifndef MOSDECSUBOPWRITEREPLY_H #define MOSDECSUBOPWRITEREPLY_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "osd/ECMsgTypes.h" -class MOSDECSubOpWriteReply : public Message { +class MOSDECSubOpWriteReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -31,9 +30,15 @@ public: int get_cost() const { return 0; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } - MOSDECSubOpWriteReply() : - Message(MSG_OSD_EC_WRITE_REPLY, HEAD_VERSION, COMPAT_VERSION) + MOSDECSubOpWriteReply() + : MOSDFastDispatchOp(MSG_OSD_EC_WRITE_REPLY, HEAD_VERSION, COMPAT_VERSION) {} virtual void decode_payload() { diff --git a/src/messages/MOSDFastDispatchOp.h b/src/messages/MOSDFastDispatchOp.h new file mode 100644 index 00000000000..6babd16f796 --- /dev/null +++ b/src/messages/MOSDFastDispatchOp.h @@ -0,0 +1,19 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_MOSDFASTDISPATCHOP_H +#define CEPH_MOSDFASTDISPATCHOP_H + +#include "msg/Message.h" +#include "osd/osd_types.h" + +class MOSDFastDispatchOp : public Message { +public: + virtual epoch_t get_map_epoch() const = 0; + virtual spg_t get_spg() const = 0; + + MOSDFastDispatchOp(int t, int version, int compat_version) + : Message(t, version, compat_version) {} +}; + +#endif diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 0d2a03f271a..5c2db7b011b 100755 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -16,9 +16,9 @@ #ifndef CEPH_MOSDOP_H #define CEPH_MOSDOP_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "include/ceph_features.h" +#include "common/hobject.h" #include <atomic> /* @@ -31,9 +31,9 @@ class OSD; -class MOSDOp : public Message { +class MOSDOp : public MOSDFastDispatchOp { - static const int HEAD_VERSION = 7; + static const int HEAD_VERSION = 8; static const int COMPAT_VERSION = 3; private: @@ -41,12 +41,10 @@ private: __u32 osdmap_epoch; __u32 flags; utime_t mtime; - eversion_t reassert_version; int32_t retry_attempt; // 0 is first attempt. -1 if we don't know. - object_t oid; - object_locator_t oloc; - pg_t pgid; + hobject_t hobj; + spg_t pgid; bufferlist::iterator p; // Decoding flags. Decoding is only needed for messages catched by pipe reader. // Transition from true -> false without locks being held @@ -57,8 +55,6 @@ private: public: vector<OSDOp> ops; private: - - snapid_t snapid; snapid_t snap_seq; vector<snapid_t> snaps; @@ -70,7 +66,9 @@ public: friend class MOSDOpReply; ceph_tid_t get_client_tid() { return header.tid; } - void set_snapid(const snapid_t& s) { snapid = s; } + void set_snapid(const snapid_t& s) { + hobj.snap = s; + } void set_snaps(const vector<snapid_t>& i) { snaps = i; } @@ -78,13 +76,24 @@ public: void set_reqid(const osd_reqid_t rid) { reqid = rid; } + void set_spg(spg_t p) { + pgid = p; + } // Fields decoded in partial decoding - const pg_t& get_pg() const { + pg_t get_pg() const { + assert(!partial_decode_needed); + return pgid.pgid; + } + spg_t get_spg() const override { assert(!partial_decode_needed); return pgid; } - epoch_t get_map_epoch() const { + pg_t get_raw_pg() const { + assert(!partial_decode_needed); + return pg_t(hobj.get_hash(), pgid.pgid.pool()); + } + epoch_t get_map_epoch() const override { assert(!partial_decode_needed); return osdmap_epoch; } @@ -92,10 +101,6 @@ public: assert(!partial_decode_needed); return flags; } - const eversion_t& get_version() const { - assert(!partial_decode_needed); - return reassert_version; - } osd_reqid_t get_reqid() const { assert(!partial_decode_needed); if (reqid.name != entity_name_t() || reqid.tid != 0) { @@ -118,17 +123,23 @@ public: assert(!final_decode_needed); return mtime; } - const object_locator_t& get_object_locator() const { + object_locator_t get_object_locator() const { assert(!final_decode_needed); - return oloc; + if (hobj.oid.name.empty()) + return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash()); + else + return object_locator_t(hobj); } object_t& get_oid() { assert(!final_decode_needed); - return oid; + return hobj.oid; + } + const hobject_t &get_hobj() const { + return hobj; } - const snapid_t& get_snapid() { + snapid_t get_snapid() { assert(!final_decode_needed); - return snapid; + return hobj.snap; } const snapid_t& get_snap_seq() const { assert(!final_decode_needed); @@ -156,17 +167,17 @@ public: } MOSDOp() - : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), partial_decode_needed(true), final_decode_needed(true) { } - MOSDOp(int inc, long tid, - object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, + MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid, epoch_t _osdmap_epoch, int _flags, uint64_t feat) - : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION), client_inc(inc), osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1), - oid(_oid), oloc(_oloc), pgid(_pgid), + hobj(ho), + pgid(_pgid), partial_decode_needed(false), final_decode_needed(false), features(feat) { @@ -180,7 +191,6 @@ private: ~MOSDOp() {} public: - void set_version(eversion_t v) { reassert_version = v; } void set_mtime(utime_t mt) { mtime = mt; } void set_mtime(ceph::real_time mt) { mtime = ceph::real_clock::to_timespec(mt); @@ -272,17 +282,17 @@ struct ceph_osd_request_head { ::encode(client_inc, payload); __u32 su = 0; - ::encode(pgid, payload); + ::encode(get_raw_pg(), payload); ::encode(su, payload); ::encode(osdmap_epoch, payload); ::encode(flags, payload); ::encode(mtime, payload); - ::encode(reassert_version, payload); + ::encode(eversion_t(), payload); // reassert_version - __u32 oid_len = oid.name.length(); + __u32 oid_len = hobj.oid.name.length(); ::encode(oid_len, payload); - ::encode(snapid, payload); + ::encode(hobj.snap, payload); ::encode(snap_seq, payload); __u32 num_snaps = snaps.size(); ::encode(num_snaps, payload); @@ -293,7 +303,7 @@ struct ceph_osd_request_head { for (unsigned i = 0; i < ops.size(); i++) ::encode(ops[i].op, payload); - ::encode_nohead(oid.name, payload); + ::encode_nohead(hobj.oid.name, payload); ::encode_nohead(snaps, payload); } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) { header.version = 6; @@ -301,18 +311,18 @@ struct ceph_osd_request_head { ::encode(osdmap_epoch, payload); ::encode(flags, payload); ::encode(mtime, payload); - ::encode(reassert_version, payload); - ::encode(oloc, payload); - ::encode(pgid, payload); + ::encode(eversion_t(), payload); // reassert_version + ::encode(get_object_locator(), payload); + ::encode(get_raw_pg(), payload); - ::encode(oid, payload); + ::encode(hobj.oid, payload); __u16 num_ops = ops.size(); ::encode(num_ops, payload); for (unsigned i = 0; i < ops.size(); i++) ::encode(ops[i].op, payload); - ::encode(snapid, payload); + ::encode(hobj.snap, payload); ::encode(snap_seq, payload); ::encode(snaps, payload); @@ -325,25 +335,50 @@ struct ceph_osd_request_head { // encoding or else we'll confuse older peers. ::encode(osd_reqid_t(), payload); } + } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) { + // reordered, v7 message encoding + header.version = 7; + ::encode(get_raw_pg(), payload); + ::encode(osdmap_epoch, payload); + ::encode(flags, payload); + ::encode(eversion_t(), payload); // reassert_version + ::encode(reqid, payload); + ::encode(client_inc, payload); + ::encode(mtime, payload); + ::encode(get_object_locator(), payload); + ::encode(hobj.oid, payload); + + __u16 num_ops = ops.size(); + ::encode(num_ops, payload); + for (unsigned i = 0; i < ops.size(); i++) + ::encode(ops[i].op, payload); + + ::encode(hobj.snap, payload); + ::encode(snap_seq, payload); + ::encode(snaps, payload); + + ::encode(retry_attempt, payload); + ::encode(features, payload); } else { - // new, reordered, v7 message encoding + // latest v8 encoding with hobject_t hash separate from pgid, no + // reassert version header.version = HEAD_VERSION; ::encode(pgid, payload); + ::encode(hobj.get_hash(), payload); ::encode(osdmap_epoch, payload); ::encode(flags, payload); - ::encode(reassert_version, payload); ::encode(reqid, payload); ::encode(client_inc, payload); ::encode(mtime, payload); - ::encode(oloc, payload); - ::encode(oid, payload); + ::encode(get_object_locator(), payload); + ::encode(hobj.oid, payload); __u16 num_ops = ops.size(); ::encode(num_ops, payload); for (unsigned i = 0; i < ops.size(); i++) ::encode(ops[i].op, payload); - ::encode(snapid, payload); + ::encode(hobj.snap, payload); ::encode(snap_seq, payload); ::encode(snaps, payload); @@ -358,31 +393,41 @@ struct ceph_osd_request_head { // Always keep here the newest version of decoding order/rule if (header.version == HEAD_VERSION) { - ::decode(pgid, p); - ::decode(osdmap_epoch, p); - ::decode(flags, p); - ::decode(reassert_version, p); - ::decode(reqid, p); + ::decode(pgid, p); // actual pgid + uint32_t hash; + ::decode(hash, p); // raw hash value + hobj.set_hash(hash); + ::decode(osdmap_epoch, p); + ::decode(flags, p); + ::decode(reqid, p); + } else if (header.version == 7) { + ::decode(pgid.pgid, p); // raw pgid + hobj.set_hash(pgid.pgid.ps()); + ::decode(osdmap_epoch, p); + ::decode(flags, p); + eversion_t reassert_version; + ::decode(reassert_version, p); + ::decode(reqid, p); } else if (header.version < 2) { // old decode ::decode(client_inc, p); old_pg_t opgid; ::decode_raw(opgid, p); - pgid = opgid; + pgid.pgid = opgid; __u32 su; ::decode(su, p); - oloc.pool = pgid.pool(); ::decode(osdmap_epoch, p); ::decode(flags, p); ::decode(mtime, p); + eversion_t reassert_version; ::decode(reassert_version, p); __u32 oid_len; ::decode(oid_len, p); - ::decode(snapid, p); + ::decode(hobj.snap, p); ::decode(snap_seq, p); __u32 num_snaps; ::decode(num_snaps, p); @@ -394,13 +439,15 @@ struct ceph_osd_request_head { for (unsigned i = 0; i < num_ops; i++) ::decode(ops[i].op, p); - decode_nohead(oid_len, oid.name, p); + decode_nohead(oid_len, hobj.oid.name, p); decode_nohead(num_snaps, snaps, p); // recalculate pgid hash value - pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS, - oid.name.c_str(), - oid.name.length())); + pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS, + hobj.oid.name.c_str(), + hobj.oid.name.length())); + hobj.pool = pgid.pgid.pool(); + hobj.set_hash(pgid.pgid.ps()); retry_attempt = -1; features = 0; @@ -417,19 +464,21 @@ struct ceph_osd_request_head { ::decode(osdmap_epoch, p); ::decode(flags, p); ::decode(mtime, p); + eversion_t reassert_version; ::decode(reassert_version, p); + object_locator_t oloc; ::decode(oloc, p); if (header.version < 3) { old_pg_t opgid; ::decode_raw(opgid, p); - pgid = opgid; + pgid.pgid = opgid; } else { ::decode(pgid, p); } - ::decode(oid, p); + ::decode(hobj.oid, p); //::decode(ops, p); __u16 num_ops; @@ -438,7 +487,7 @@ struct ceph_osd_request_head { for (unsigned i = 0; i < num_ops; i++) ::decode(ops[i].op, p); - ::decode(snapid, p); + ::decode(hobj.snap, p); ::decode(snap_seq, p); ::decode(snaps, p); @@ -457,6 +506,11 @@ struct ceph_osd_request_head { else reqid = osd_reqid_t(); + hobj.pool = pgid.pgid.pool(); + hobj.set_key(oloc.key); + hobj.nspace = oloc.nspace; + hobj.set_hash(pgid.pgid.ps()); + OSDOp::split_osd_op_vector_in_data(ops, data); // we did the full decode @@ -478,8 +532,9 @@ struct ceph_osd_request_head { ::decode(client_inc, p); ::decode(mtime, p); + object_locator_t oloc; ::decode(oloc, p); - ::decode(oid, p); + ::decode(hobj.oid, p); __u16 num_ops; ::decode(num_ops, p); @@ -487,7 +542,7 @@ struct ceph_osd_request_head { for (unsigned i = 0; i < num_ops; i++) ::decode(ops[i].op, p); - ::decode(snapid, p); + ::decode(hobj.snap, p); ::decode(snap_seq, p); ::decode(snaps, p); @@ -495,6 +550,10 @@ struct ceph_osd_request_head { ::decode(features, p); + hobj.pool = pgid.pgid.pool(); + hobj.set_key(oloc.key); + hobj.nspace = oloc.nspace; + OSDOp::split_osd_op_vector_in_data(ops, data); final_decode_needed = false; @@ -513,21 +572,15 @@ struct ceph_osd_request_head { out << pgid; if (!final_decode_needed) { out << ' '; - if (!oloc.nspace.empty()) - out << oloc.nspace << "/"; - out << oid + out << hobj << " " << ops << " snapc " << get_snap_seq() << "=" << snaps; - if (oloc.key.size()) - out << " " << oloc; if (is_retry_attempt()) out << " RETRY=" << get_retry_attempt(); } else { - out << " (undecoded)"; + out << " " << get_raw_pg() << " (undecoded)"; } out << " " << ceph_osd_flag_string(get_flags()); - if (reassert_version != eversion_t()) - out << " reassert_version=" << reassert_version; out << " e" << osdmap_epoch; } out << ")"; diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 1ea3b92ad9b..3d51d1c2116 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -130,7 +130,7 @@ public: } MOSDOpReply(MOSDOp *req, int r, epoch_t e, int acktype, bool ignore_out_data) : Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION), - oid(req->oid), pgid(req->pgid), ops(req->ops) { + oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops) { set_tid(req->get_tid()); result = r; diff --git a/src/messages/MOSDPGBackfill.h b/src/messages/MOSDPGBackfill.h index d4b65e82c29..3b099df5aad 100644 --- a/src/messages/MOSDPGBackfill.h +++ b/src/messages/MOSDPGBackfill.h @@ -15,10 +15,9 @@ #ifndef CEPH_MOSDPGBACKFILL_H #define CEPH_MOSDPGBACKFILL_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGBackfill : public Message { +class MOSDPGBackfill : public MOSDFastDispatchOp { static const int HEAD_VERSION = 3; static const int COMPAT_VERSION = 1; public: @@ -43,6 +42,13 @@ public: bool compat_stat_sum; pg_stat_t stats; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(op, p); @@ -85,11 +91,11 @@ public: ::encode(pgid.shard, payload); } - MOSDPGBackfill() : - Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION), - compat_stat_sum(false) {} + MOSDPGBackfill() + : MOSDFastDispatchOp(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION), + compat_stat_sum(false) {} MOSDPGBackfill(__u32 o, epoch_t e, epoch_t qe, spg_t p) - : Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION), op(o), map_epoch(e), query_epoch(e), pgid(p), diff --git a/src/messages/MOSDPGMissing.h b/src/messages/MOSDPGMissing.h deleted file mode 100644 index b931b36353f..00000000000 --- a/src/messages/MOSDPGMissing.h +++ /dev/null @@ -1,57 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2010 Dreamhost - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - - -#ifndef CEPH_MOSDPGMISSING_H -#define CEPH_MOSDPGMISSING_H - -#include "msg/Message.h" - -class MOSDPGMissing : public Message { - epoch_t epoch; - -public: - pg_info_t info; - pg_missing_t missing; - - epoch_t get_epoch() { return epoch; } - - MOSDPGMissing() : Message(MSG_OSD_PG_MISSING) {} - MOSDPGMissing(version_t mv, const pg_info_t &info_, - const pg_missing_t &missing_) - : Message(MSG_OSD_PG_MISSING), epoch(mv), info(info_), - missing(missing_) { } -private: - ~MOSDPGMissing() {} - -public: - const char *get_type_name() const { return "pg_missing"; } - void print(ostream& out) const { - out << "pg_missing(" << info.pgid << " e" << epoch << ")"; - } - - void encode_payload(uint64_t features) { - ::encode(epoch, payload); - ::encode(info, payload); - ::encode(missing, payload); - } - void decode_payload() { - bufferlist::iterator p = payload.begin(); - ::decode(epoch, p); - ::decode(info, p); - missing.decode(p, info.pgid.pool()); - } -}; - -#endif diff --git a/src/messages/MOSDPGPull.h b/src/messages/MOSDPGPull.h index a6b748c8df6..c274e06ef97 100644 --- a/src/messages/MOSDPGPull.h +++ b/src/messages/MOSDPGPull.h @@ -15,10 +15,9 @@ #ifndef MOSDPGPULL_H #define MOSDPGPULL_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGPull : public Message { +class MOSDPGPull : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; @@ -30,9 +29,16 @@ public: vector<PullOp> pulls; uint64_t cost; - MOSDPGPull() : - Message(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION), - cost(0) + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + + MOSDPGPull() + : MOSDFastDispatchOp(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION), + cost(0) {} void compute_cost(CephContext *cct) { diff --git a/src/messages/MOSDPGPush.h b/src/messages/MOSDPGPush.h index 9bf0470ed6a..b463b8ba00b 100644 --- a/src/messages/MOSDPGPush.h +++ b/src/messages/MOSDPGPush.h @@ -15,10 +15,9 @@ #ifndef MOSDPGPUSH_H #define MOSDPGPUSH_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGPush : public Message { +class MOSDPGPush : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; @@ -46,13 +45,20 @@ public: return cost; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + void set_cost(uint64_t c) { cost = c; } - MOSDPGPush() : - Message(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION), - cost(0) + MOSDPGPush() + : MOSDFastDispatchOp(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION), + cost(0) {} virtual void decode_payload() { diff --git a/src/messages/MOSDPGPushReply.h b/src/messages/MOSDPGPushReply.h index 793709e1650..52e3ddd8a58 100644 --- a/src/messages/MOSDPGPushReply.h +++ b/src/messages/MOSDPGPushReply.h @@ -15,10 +15,9 @@ #ifndef MOSDPGPUSHREPLY_H #define MOSDPGPUSHREPLY_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGPushReply : public Message { +class MOSDPGPushReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; @@ -29,9 +28,16 @@ public: vector<PushReplyOp> replies; uint64_t cost; - MOSDPGPushReply() : - Message(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION), - cost(0) + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + + MOSDPGPushReply() + : MOSDFastDispatchOp(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION), + cost(0) {} void compute_cost(CephContext *cct) { diff --git a/src/messages/MOSDPGScan.h b/src/messages/MOSDPGScan.h index f5fe18e523b..9f66bd0e480 100644 --- a/src/messages/MOSDPGScan.h +++ b/src/messages/MOSDPGScan.h @@ -15,10 +15,9 @@ #ifndef CEPH_MOSDPGSCAN_H #define CEPH_MOSDPGSCAN_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGScan : public Message { +class MOSDPGScan : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; @@ -42,6 +41,13 @@ public: spg_t pgid; hobject_t begin, end; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(op, p); @@ -79,10 +85,11 @@ public: ::encode(pgid.shard, payload); } - MOSDPGScan() : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION) {} + MOSDPGScan() + : MOSDFastDispatchOp(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION) {} MOSDPGScan(__u32 o, pg_shard_t from, epoch_t e, epoch_t qe, spg_t p, hobject_t be, hobject_t en) - : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION), op(o), map_epoch(e), query_epoch(e), from(from), diff --git a/src/messages/MOSDPGUpdateLogMissing.h b/src/messages/MOSDPGUpdateLogMissing.h index 55e50d6a462..b6a597705d5 100644 --- a/src/messages/MOSDPGUpdateLogMissing.h +++ b/src/messages/MOSDPGUpdateLogMissing.h @@ -16,9 +16,9 @@ #ifndef CEPH_MOSDPGUPDATELOGMISSING_H #define CEPH_MOSDPGUPDATELOGMISSING_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGUpdateLogMissing : public Message { +class MOSDPGUpdateLogMissing : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -35,16 +35,24 @@ public: spg_t get_pgid() const { return pgid; } epoch_t get_query_epoch() const { return map_epoch; } ceph_tid_t get_tid() const { return rep_tid; } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } - MOSDPGUpdateLogMissing() : - Message(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION) { } + MOSDPGUpdateLogMissing() + : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, + COMPAT_VERSION) { } MOSDPGUpdateLogMissing( const mempool::osd::list<pg_log_entry_t> &entries, spg_t pgid, shard_id_t from, epoch_t epoch, ceph_tid_t rep_tid) - : Message(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, + COMPAT_VERSION), map_epoch(epoch), pgid(pgid), from(from), diff --git a/src/messages/MOSDPGUpdateLogMissingReply.h b/src/messages/MOSDPGUpdateLogMissingReply.h index d6ff1407147..34e8d9e6c1d 100644 --- a/src/messages/MOSDPGUpdateLogMissingReply.h +++ b/src/messages/MOSDPGUpdateLogMissingReply.h @@ -16,9 +16,9 @@ #ifndef CEPH_MOSDPGUPDATELOGMISSINGREPLY_H #define CEPH_MOSDPGUPDATELOGMISSINGREPLY_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" -class MOSDPGUpdateLogMissingReply : public Message { +class MOSDPGUpdateLogMissingReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -37,9 +37,15 @@ public: pg_shard_t get_from() const { return pg_shard_t(get_source().num(), from); } + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } - MOSDPGUpdateLogMissingReply() : - Message( + MOSDPGUpdateLogMissingReply() + : MOSDFastDispatchOp( MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY, HEAD_VERSION, COMPAT_VERSION) @@ -49,7 +55,7 @@ public: shard_id_t from, epoch_t epoch, ceph_tid_t rep_tid) - : Message( + : MOSDFastDispatchOp( MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY, HEAD_VERSION, COMPAT_VERSION), diff --git a/src/messages/MOSDRepOp.h b/src/messages/MOSDRepOp.h index e6acca66309..de5e4755697 100644 --- a/src/messages/MOSDRepOp.h +++ b/src/messages/MOSDRepOp.h @@ -16,14 +16,13 @@ #ifndef CEPH_MOSDREPOP_H #define CEPH_MOSDREPOP_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" /* * OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever) */ -class MOSDRepOp : public Message { +class MOSDRepOp : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; @@ -64,6 +63,13 @@ public: /// non-empty if this transaction involves a hit_set history update boost::optional<pg_hit_set_history_t> updated_hit_set_history; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + int get_cost() const { return data.length(); } @@ -116,13 +122,13 @@ public: } MOSDRepOp() - : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION), map_epoch(0), final_decode_needed(true), acks_wanted (0) {} MOSDRepOp(osd_reqid_t r, pg_shard_t from, spg_t p, const hobject_t& po, int aw, epoch_t mape, ceph_tid_t rtid, eversion_t v) - : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION), map_epoch(mape), reqid(r), pgid(p), diff --git a/src/messages/MOSDRepOpReply.h b/src/messages/MOSDRepOpReply.h index 1632ffbf4b3..67c2d76bf73 100644 --- a/src/messages/MOSDRepOpReply.h +++ b/src/messages/MOSDRepOpReply.h @@ -16,7 +16,7 @@ #ifndef CEPH_MOSDREPOPREPLY_H #define CEPH_MOSDREPOPREPLY_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" #include "os/ObjectStore.h" @@ -28,7 +28,7 @@ * */ -class MOSDRepOpReply : public Message { +class MOSDRepOpReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 1; static const int COMPAT_VERSION = 1; public: @@ -50,6 +50,13 @@ public: // Decoding flags. Decoding is only needed for messages catched by pipe reader. bool final_decode_needed; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + virtual void decode_payload() { p = payload.begin(); ::decode(map_epoch, p); @@ -93,7 +100,7 @@ public: public: MOSDRepOpReply( MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) : - Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION), + MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION), map_epoch(e), reqid(req->reqid), from(from), @@ -104,7 +111,8 @@ public: set_tid(req->get_tid()); } MOSDRepOpReply() - : Message(MSG_OSD_REPOPREPLY), map_epoch(0), + : MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION), + map_epoch(0), ack_type(0), result(0), final_decode_needed(true) {} private: diff --git a/src/messages/MOSDRepScrub.h b/src/messages/MOSDRepScrub.h index fb4080e7046..07e04543b26 100644 --- a/src/messages/MOSDRepScrub.h +++ b/src/messages/MOSDRepScrub.h @@ -16,13 +16,13 @@ #ifndef CEPH_MOSDREPSCRUB_H #define CEPH_MOSDREPSCRUB_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" /* * instruct an OSD initiate a replica scrub on a specific PG */ -struct MOSDRepScrub : public Message { +struct MOSDRepScrub : public MOSDFastDispatchOp { static const int HEAD_VERSION = 6; static const int COMPAT_VERSION = 2; @@ -37,15 +37,22 @@ struct MOSDRepScrub : public Message { bool deep; // true if scrub should be deep uint32_t seed; // seed value for digest calculation + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + MOSDRepScrub() - : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION), chunky(false), deep(false), seed(0) { } MOSDRepScrub(spg_t pgid, eversion_t scrub_to, epoch_t map_epoch, hobject_t start, hobject_t end, bool deep, uint32_t seed) - : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION), pgid(pgid), scrub_to(scrub_to), map_epoch(map_epoch), diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h index cfc3afc4af4..bfd8d4dbe31 100644 --- a/src/messages/MOSDSubOp.h +++ b/src/messages/MOSDSubOp.h @@ -16,8 +16,7 @@ #ifndef CEPH_MOSDSUBOP_H #define CEPH_MOSDSUBOP_H -#include "msg/Message.h" -#include "osd/osd_types.h" +#include "MOSDFastDispatchOp.h" #include "include/ceph_features.h" @@ -25,7 +24,7 @@ * OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever) */ -class MOSDSubOp : public Message { +class MOSDSubOp : public MOSDFastDispatchOp { static const int HEAD_VERSION = 12; static const int COMPAT_VERSION = 7; @@ -93,6 +92,13 @@ public: /// non-empty if this transaction involves a hit_set history update boost::optional<pg_hit_set_history_t> updated_hit_set_history; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + int get_cost() const { if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL) return ops[0].op.extent.length; @@ -239,11 +245,11 @@ public: } MOSDSubOp() - : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { } + : MOSDFastDispatchOp(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { } MOSDSubOp(osd_reqid_t r, pg_shard_t from, spg_t p, const hobject_t& po, int aw, epoch_t mape, ceph_tid_t rtid, eversion_t v) - : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION), + : MOSDFastDispatchOp(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION), map_epoch(mape), reqid(r), from(from), diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h index 81d1b2836de..aa300efc931 100644 --- a/src/messages/MOSDSubOpReply.h +++ b/src/messages/MOSDSubOpReply.h @@ -16,7 +16,7 @@ #ifndef CEPH_MOSDSUBOPREPLY_H #define CEPH_MOSDSUBOPREPLY_H -#include "msg/Message.h" +#include "MOSDFastDispatchOp.h" #include "MOSDSubOp.h" #include "os/ObjectStore.h" @@ -29,7 +29,7 @@ * */ -class MOSDSubOpReply : public Message { +class MOSDSubOpReply : public MOSDFastDispatchOp { static const int HEAD_VERSION = 2; static const int COMPAT_VERSION = 1; public: @@ -53,6 +53,13 @@ public: map<string,bufferptr> attrset; + epoch_t get_map_epoch() const override { + return map_epoch; + } + spg_t get_spg() const override { + return pgid; + } + virtual void decode_payload() { bufferlist::iterator p = payload.begin(); ::decode(map_epoch, p); @@ -129,20 +136,21 @@ public: public: MOSDSubOpReply( - MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at) : - Message(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION), - map_epoch(e), - reqid(req->reqid), - from(from), - pgid(req->pgid.pgid, req->from.shard), - poid(req->poid), - ops(req->ops), - ack_type(at), - result(result_) { + MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at) + : MOSDFastDispatchOp(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION), + map_epoch(e), + reqid(req->reqid), + from(from), + pgid(req->pgid.pgid, req->from.shard), + poid(req->poid), + ops(req->ops), + ack_type(at), + result(result_) { memset(&peer_stat, 0, sizeof(peer_stat)); set_tid(req->get_tid()); } - MOSDSubOpReply() : Message(MSG_OSD_SUBOPREPLY) {} + MOSDSubOpReply() + : MOSDFastDispatchOp(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION) {} private: ~MOSDSubOpReply() {} diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index ee26e24c32d..df21c2d1c40 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -5222,6 +5222,9 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap, return -E2BIG; } p.set_pg_num(n); + // force pre-luminous clients to resend their ops, since they + // don't understand that split PGs now form a new interval. + p.last_force_op_resend_preluminous = pending_inc.epoch; } else if (var == "pgp_num") { if (p.has_flag(pg_pool_t::FLAG_NOPGCHANGE)) { ss << "pool pgp_num change is disabled; you must unset nopgchange flag for the pool first"; @@ -7520,9 +7523,9 @@ done: pg_pool_t *np = pending_inc.get_new_pool(pool_id, p); np->read_tier = overlaypool_id; np->write_tier = overlaypool_id; - np->last_force_op_resend = pending_inc.epoch; + np->set_last_force_op_resend(pending_inc.epoch); pg_pool_t *noverlay_p = pending_inc.get_new_pool(overlaypool_id, overlay_p); - noverlay_p->last_force_op_resend = pending_inc.epoch; + noverlay_p->set_last_force_op_resend(pending_inc.epoch); ss << "overlay for '" << poolstr << "' is now (or already was) '" << overlaypoolstr << "'"; if (overlay_p->cache_mode == pg_pool_t::CACHEMODE_NONE) ss <<" (WARNING: overlay pool cache_mode is still NONE)"; @@ -7556,16 +7559,16 @@ done: if (np->has_read_tier()) { const pg_pool_t *op = osdmap.get_pg_pool(np->read_tier); pg_pool_t *nop = pending_inc.get_new_pool(np->read_tier,op); - nop->last_force_op_resend = pending_inc.epoch; + nop->set_last_force_op_resend(pending_inc.epoch); } if (np->has_write_tier()) { const pg_pool_t *op = osdmap.get_pg_pool(np->write_tier); pg_pool_t *nop = pending_inc.get_new_pool(np->write_tier, op); - nop->last_force_op_resend = pending_inc.epoch; + nop->set_last_force_op_resend(pending_inc.epoch); } np->clear_read_tier(); np->clear_write_tier(); - np->last_force_op_resend = pending_inc.epoch; + np->set_last_force_op_resend(pending_inc.epoch); ss << "there is now (or already was) no overlay for '" << poolstr << "'"; wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, ss.str(), get_last_committed() + 1)); @@ -7799,8 +7802,8 @@ done: np->tiers.insert(tierpool_id); np->read_tier = np->write_tier = tierpool_id; np->set_snap_epoch(pending_inc.epoch); // tier will update to our snap info - np->last_force_op_resend = pending_inc.epoch; - ntp->last_force_op_resend = pending_inc.epoch; + np->set_last_force_op_resend(pending_inc.epoch); + ntp->set_last_force_op_resend(pending_inc.epoch); ntp->tier_of = pool_id; ntp->cache_mode = mode; ntp->hit_set_count = g_conf->osd_tier_default_cache_hit_set_count; diff --git a/src/msg/Message.cc b/src/msg/Message.cc index a226ecbca72..26c91eef093 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -77,7 +77,6 @@ using namespace std; #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGCreate.h" #include "messages/MOSDPGTrim.h" -#include "messages/MOSDPGMissing.h" #include "messages/MOSDScrub.h" #include "messages/MOSDRepScrub.h" #include "messages/MOSDPGScan.h" @@ -501,9 +500,6 @@ Message *decode_message(CephContext *cct, int crcflags, case MSG_REMOVE_SNAPS: m = new MRemoveSnaps; break; - case MSG_OSD_PG_MISSING: - m = new MOSDPGMissing; - break; case MSG_OSD_REP_SCRUB: m = new MOSDRepScrub; break; diff --git a/src/msg/Message.h b/src/msg/Message.h index 680f4a0aeae..e103f793668 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -85,7 +85,7 @@ #define MSG_REMOVE_SNAPS 90 #define MSG_OSD_SCRUB 91 -#define MSG_OSD_PG_MISSING 92 +//#define MSG_OSD_PG_MISSING 92 // obsolete #define MSG_OSD_REP_SCRUB 93 #define MSG_OSD_PG_SCAN 94 diff --git a/src/os/filestore/JournalingObjectStore.h b/src/os/filestore/JournalingObjectStore.h index 8b0d3eb193c..10b66764dd2 100644 --- a/src/os/filestore/JournalingObjectStore.h +++ b/src/os/filestore/JournalingObjectStore.h @@ -19,6 +19,7 @@ #include "Journal.h" #include "FileJournal.h" #include "common/RWLock.h" +#include "osd/OpRequest.h" class JournalingObjectStore : public ObjectStore { protected: diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 7794f162457..d7812ac36b0 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -85,7 +85,6 @@ #include "messages/MOSDPGTrim.h" #include "messages/MOSDPGScan.h" #include "messages/MOSDPGBackfill.h" -#include "messages/MOSDPGMissing.h" #include "messages/MBackfillReserve.h" #include "messages/MRecoveryReserve.h" #include "messages/MOSDECSubOpWrite.h" @@ -1424,7 +1423,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) << m->get_map_epoch() << ", dropping" << dendl; return; } - pg_t _pgid = m->get_pg(); + pg_t _pgid = m->get_raw_pg(); spg_t pgid; if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) _pgid = opmap->raw_pg_to_pg(_pgid); @@ -1438,7 +1437,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl; clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid() - << " pg " << m->get_pg() + << " pg " << m->get_raw_pg() << " to osd." << whoami << " not " << pg->acting << " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch() @@ -4949,7 +4948,7 @@ bool OSD::ms_handle_reset(Connection *con) session->con.reset(NULL); // break con <-> session ref cycle // note that we break session->con *before* the session_handle_reset // cleanup below. this avoids a race between us and - // PG::add_backoff, split_backoff, etc. + // PG::add_backoff, Session::check_backoff, etc. session_handle_reset(session); session->put(); return true; @@ -6311,7 +6310,7 @@ epoch_t op_required_epoch(OpRequestRef op) } case CEPH_MSG_OSD_BACKOFF: { MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req()); - return m->osd_epoch; + return m->map_epoch; } case MSG_OSD_SUBOP: return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op); @@ -6386,11 +6385,6 @@ void OSD::dispatch_op(OpRequestRef op) case MSG_OSD_PG_TRIM: handle_pg_trim(op); break; - case MSG_OSD_PG_MISSING: - assert(0 == - "received MOSDPGMissing; this message is supposed to be unused!?!"); - break; - case MSG_OSD_BACKFILL_RESERVE: handle_pg_backfill_reserve(op); break; @@ -6527,7 +6521,15 @@ void OSD::_dispatch(Message *m) // -- need OSDMap -- - default: + case MSG_OSD_PG_CREATE: + case MSG_OSD_PG_NOTIFY: + case MSG_OSD_PG_QUERY: + case MSG_OSD_PG_LOG: + case MSG_OSD_PG_REMOVE: + case MSG_OSD_PG_INFO: + case MSG_OSD_PG_TRIM: + case MSG_OSD_BACKFILL_RESERVE: + case MSG_OSD_RECOVERY_RESERVE: { OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m); // no map? starting up? @@ -7534,7 +7536,7 @@ void OSD::consume_map() for (set<spg_t>::iterator p = pgs_to_check.begin(); p != pgs_to_check.end(); ++p) { - if (!(osdmap->is_acting_osd_shard(p->pgid, whoami, p->shard))) { + if (!(osdmap->is_acting_osd_shard(spg_t(p->pgid, p->shard), whoami))) { set<Session*> concerned_sessions; get_sessions_possibly_interested_in_pg(*p, &concerned_sessions); for (set<Session*>::iterator i = concerned_sessions.begin(); @@ -8781,16 +8783,8 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) client_session->put(); } - if (cct->_conf->osd_debug_drop_op_probability > 0 && - !m->get_source().is_mds()) { - if ((double)rand() / (double)RAND_MAX < cct->_conf->osd_debug_drop_op_probability) { - dout(0) << "handle_op DEBUG artificially dropping op " << *m << dendl; - return; - } - } - // calc actual pgid - pg_t _pgid = m->get_pg(); + pg_t _pgid = m->get_raw_pg(); int64_t pool = _pgid.pool(); if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 && @@ -8828,7 +8822,7 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) if (!send_map->have_pg_pool(pgid.pool())) { dout(7) << "dropping request; pool did not exist" << dendl; clog->warn() << m->get_source_inst() << " invalid " << m->get_reqid() - << " pg " << m->get_pg() + << " pg " << m->get_raw_pg() << " to osd." << whoami << " in e" << osdmap->get_epoch() << ", client e" << m->get_map_epoch() @@ -8839,7 +8833,7 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap) if (!send_map->osd_is_valid_op_target(pgid.pgid, whoami)) { dout(7) << "we are invalid target" << dendl; clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid() - << " pg " << m->get_pg() + << " pg " << m->get_raw_pg() << " to osd." << whoami << " in e" << osdmap->get_epoch() << ", client e" << m->get_map_epoch() @@ -8875,38 +8869,10 @@ void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap) } // map hobject range to PG(s) - bool queued = false; - hobject_t pos = m->begin; - do { - pg_t _pgid(pos.get_hash(), pos.pool); - if (osdmap->have_pg_pool(pos.pool)) { - _pgid = osdmap->raw_pg_to_pg(_pgid); - } - if (!osdmap->have_pg_pool(_pgid.pool())) { - // missing pool -- drop - return; - } - spg_t pgid; - if (osdmap->get_primary_shard(_pgid, &pgid)) { - dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl; - PGRef pg = get_pg_or_queue_for_pg(pgid, op, s); - if (pg) { - if (!queued) { - enqueue_op(pg, op); - queued = true; - } else { - // use a fresh OpRequest - m->get(); // take a ref for the new OpRequest - OpRequestRef newop(op_tracker.create_request<OpRequest, Message*>(m)); - newop->mark_event("duplicated original op for another pg"); - enqueue_op(pg, newop); - } - } - } - // advance - pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num()); - dout(20) << __func__ << " next pg " << pos << dendl; - } while (pos < m->end); + PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s); + if (pg) { + enqueue_op(pg, op); + } } template<typename T, int MSGTYPE> diff --git a/src/osd/OSD.h b/src/osd/OSD.h index e2d35f01d32..3beb717e419 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -213,7 +213,6 @@ class ObjectStore; class FuseStore; class OSDMap; class MLog; -class MOSDPGMissing; class Objecter; class Watch; @@ -1406,7 +1405,7 @@ public: switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: return (static_cast<MOSDOp*>( - op->get_req())->get_pg().m_seed & mask) == match; + op->get_req())->get_raw_pg().m_seed & mask) == match; } return false; } diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc index f43f1eea574..23d54ddc535 100644 --- a/src/osd/OSDMap.cc +++ b/src/osd/OSDMap.cc @@ -1484,26 +1484,37 @@ int OSDMap::apply_incremental(const Incremental &inc) } // mapping -int OSDMap::object_locator_to_pg( - const object_t& oid, - const object_locator_t& loc, - pg_t &pg) const +int OSDMap::map_to_pg( + int64_t poolid, + const string& name, + const string& key, + const string& nspace, + pg_t *pg) const { // calculate ps (placement seed) - const pg_pool_t *pool = get_pg_pool(loc.get_pool()); + const pg_pool_t *pool = get_pg_pool(poolid); if (!pool) return -ENOENT; ps_t ps; + if (!key.empty()) + ps = pool->hash_key(key, nspace); + else + ps = pool->hash_key(name, nspace); + *pg = pg_t(ps, poolid); + return 0; +} + +int OSDMap::object_locator_to_pg( + const object_t& oid, const object_locator_t& loc, pg_t &pg) const +{ if (loc.hash >= 0) { - ps = loc.hash; - } else { - if (!loc.key.empty()) - ps = pool->hash_key(loc.key, loc.nspace); - else - ps = pool->hash_key(oid.name, loc.nspace); + if (!get_pg_pool(loc.get_pool())) { + return -ENOENT; + } + pg = pg_t(loc.hash, loc.get_pool()); + return 0; } - pg = pg_t(ps, loc.get_pool(), -1); - return 0; + return map_to_pg(loc.get_pool(), oid.name, loc.key, loc.nspace, &pg); } ceph_object_layout OSDMap::make_object_layout( @@ -1714,11 +1725,14 @@ void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const _apply_primary_affinity(pps, *pool, up, primary); } -void OSDMap::_pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary, - vector<int> *acting, int *acting_primary) const +void OSDMap::_pg_to_up_acting_osds( + const pg_t& pg, vector<int> *up, int *up_primary, + vector<int> *acting, int *acting_primary, + bool raw_pg_to_pg) const { const pg_pool_t *pool = get_pg_pool(pg.pool()); - if (!pool) { + if (!pool || + (!raw_pg_to_pg && pg.ps() >= pool->get_pg_num())) { if (up) up->clear(); if (up_primary) diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index 1f9307ee940..7522d7f62f5 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -587,14 +587,23 @@ public: /**** mapping facilities ****/ - int object_locator_to_pg(const object_t& oid, const object_locator_t& loc, pg_t &pg) const; - pg_t object_locator_to_pg(const object_t& oid, const object_locator_t& loc) const { + int map_to_pg( + int64_t pool, + const string& name, + const string& key, + const string& nspace, + pg_t *pg) const; + int object_locator_to_pg(const object_t& oid, const object_locator_t& loc, + pg_t &pg) const; + pg_t object_locator_to_pg(const object_t& oid, + const object_locator_t& loc) const { pg_t pg; int ret = object_locator_to_pg(oid, loc, pg); assert(ret == 0); return pg; } + static object_locator_t file_to_object_locator(const file_layout_t& layout) { return object_locator_t(layout.pool_id, layout.pool_ns); } @@ -647,7 +656,8 @@ private: * map to up and acting. Fills in whatever fields are non-NULL. */ void _pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary, - vector<int> *acting, int *acting_primary) const; + vector<int> *acting, int *acting_primary, + bool raw_pg_to_pg = true) const; public: /*** @@ -769,14 +779,17 @@ public: return -1; // we fail! } - bool is_acting_osd_shard(pg_t pg, int osd, shard_id_t shard) const { + /* + * check whether an spg_t maps to a particular osd + */ + bool is_acting_osd_shard(spg_t pg, int osd) const { vector<int> acting; - int nrep = pg_to_acting_osds(pg, acting); - if (shard == shard_id_t::NO_SHARD) - return calc_pg_role(osd, acting, nrep) >= 0; - if (shard >= (int)acting.size()) + _pg_to_up_acting_osds(pg.pgid, NULL, NULL, &acting, NULL, false); + if (pg.shard == shard_id_t::NO_SHARD) + return calc_pg_role(osd, acting, acting.size()) >= 0; + if (pg.shard >= (int)acting.size()) return false; - return acting[shard] == osd; + return acting[pg.shard] == osd; } diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 65b8576052c..ef7986d4721 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -22,36 +22,10 @@ #include "include/xlist.h" #include "msg/Message.h" #include "include/memory.h" +#include "osd/osd_types.h" #include "common/TrackedOp.h" /** - * osd request identifier - * - * caller name + incarnation# + tid to unique identify this request. - */ -struct osd_reqid_t { - entity_name_t name; // who - ceph_tid_t tid; - int32_t inc; // incarnation - - osd_reqid_t() - : tid(0), inc(0) {} - osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t) - : name(a), tid(t), inc(i) {} - - DENC(osd_reqid_t, v, p) { - DENC_START(2, 2, p); - denc(v.name, p); - denc(v.tid, p); - denc(v.inc, p); - DENC_FINISH(p); - } - void dump(Formatter *f) const; - static void generate_test_instances(list<osd_reqid_t*>& o); -}; -WRITE_CLASS_DENC(osd_reqid_t) - -/** * The OpRequest takes in a Message* and takes over a single reference * to it, which it puts() when destroyed. */ diff --git a/src/osd/PG.cc b/src/osd/PG.cc index f28cb34987c..c47dac0b649 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1862,18 +1862,18 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op) OSDCap& caps = session->caps; session->put(); - const string &key = req->get_object_locator().key.empty() ? - req->get_oid().name : - req->get_object_locator().key; + const string &key = req->get_hobj().get_key().empty() ? + req->get_oid().name : + req->get_hobj().get_key(); - bool cap = caps.is_capable(pool.name, req->get_object_locator().nspace, + bool cap = caps.is_capable(pool.name, req->get_hobj().nspace, pool.auid, key, op->need_read_cap(), op->need_write_cap(), op->classes()); dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name - << " " << req->get_object_locator().nspace + << " " << req->get_hobj().nspace << ") owner=" << pool.auid << " need_read_cap=" << op->need_read_cap() << " need_write_cap=" << op->need_write_cap() @@ -2210,27 +2210,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue) } } -void PG::split_ops(PG *child, unsigned split_bits) { - unsigned match = child->info.pgid.ps(); - assert(waiting_for_all_missing.empty()); - assert(waiting_for_cache_not_full.empty()); - assert(waiting_for_unreadable_object.empty()); - assert(waiting_for_degraded_object.empty()); - assert(waiting_for_ondisk.empty()); - assert(waiting_for_active.empty()); - assert(waiting_for_scrub.empty()); - - osd->dequeue_pg(this, &waiting_for_peered); - - OSD::split_list( - &waiting_for_peered, &(child->waiting_for_peered), match, split_bits); - { - Mutex::Locker l(map_lock); // to avoid a race with the osd dispatch - OSD::split_list( - &waiting_for_map, &(child->waiting_for_map), match, split_bits); - } -} - void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) { child->update_snap_mapper_bits(split_bits); @@ -2304,14 +2283,10 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits) // History child->past_intervals = past_intervals; - split_ops(child, split_bits); _split_into(child_pgid, child, split_bits); - // release all backoffs so that Objecter doesn't need to handle unblock - // on split backoffs + // release all backoffs for simplicity release_backoffs(hobject_t(), hobject_t::get_max()); - // split any remaining (deleting) backoffs among child PGs - split_backoffs(child, split_bits); child->on_new_interval(); @@ -2326,7 +2301,7 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end) ConnectionRef con = s->con; if (!con) // OSD::ms_handle_reset clears s->con without a lock return; - Backoff *b = s->have_backoff(begin); + BackoffRef b(s->have_backoff(info.pgid, begin)); if (b) { derr << __func__ << " already have backoff for " << s << " begin " << begin << " " << *b << dendl; @@ -2334,24 +2309,19 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end) } Mutex::Locker l(backoff_lock); { - Mutex::Locker l(s->backoff_lock); - b = new Backoff(this, s, ++s->backoff_seq, begin, end); - auto& ls = s->backoffs[begin]; - if (ls.empty()) { - ++s->backoff_count; - } - assert(s->backoff_count == (int)s->backoffs.size()); - ls.insert(b); + b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end); backoffs[begin].insert(b); + s->add_backoff(b); dout(10) << __func__ << " session " << s << " added " << *b << dendl; } con->send_message( new MOSDBackoff( + info.pgid, + get_osdmap()->get_epoch(), CEPH_OSD_BACKOFF_OP_BLOCK, b->id, begin, - end, - get_osdmap()->get_epoch())); + end)); } void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) @@ -2398,11 +2368,12 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) if (con) { // OSD::ms_handle_reset clears s->con without a lock con->send_message( new MOSDBackoff( + info.pgid, + get_osdmap()->get_epoch(), CEPH_OSD_BACKOFF_OP_UNBLOCK, b->id, b->begin, - b->end, - get_osdmap()->get_epoch())); + b->end)); } if (b->is_new()) { b->state = Backoff::STATE_DELETING; @@ -2415,71 +2386,6 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end) } } -void PG::split_backoffs(PG *child, unsigned split_bits) -{ - dout(10) << __func__ << " split_bits " << split_bits << " child " - << child->info.pgid.pgid << dendl; - unsigned mask = ~((~0)<<split_bits); - vector<BackoffRef> backoffs_to_dup; // pg backoffs - vector<BackoffRef> backoffs_to_move; // oid backoffs - { - Mutex::Locker l(backoff_lock); - auto p = backoffs.begin(); - while (p != backoffs.end()) { - if (p->first == info.pgid.pgid.get_hobj_start()) { - // if it is a full PG we always dup it for the child. - for (auto& q : p->second) { - dout(10) << __func__ << " pg backoff " << p->first - << " " << q << dendl; - backoffs_to_dup.push_back(q); - } - } else { - // otherwise, we move it to the child only if falls into the - // childs hash range. - if ((p->first.get_hash() & mask) == child->info.pgid.pgid.ps()) { - for (auto& q : p->second) { - dout(20) << __func__ << " will move " << p->first - << " " << q << dendl; - backoffs_to_move.push_back(q); - } - p = backoffs.erase(p); - continue; - } else { - dout(20) << __func__ << " will not move " << p->first - << " " << p->second << dendl; - } - } - ++p; - } - } - for (auto b : backoffs_to_dup) { - SessionRef s; - { - Mutex::Locker l(b->lock); - b->end = info.pgid.pgid.get_hobj_end(split_bits); - dout(10) << __func__ << " pg backoff " << *b << dendl; - s = b->session; - } - if (s) { - child->add_pg_backoff(b->session); - } else { - dout(20) << __func__ << " didn't dup pg backoff, session is null" - << dendl; - } - } - for (auto b : backoffs_to_move) { - Mutex::Locker l(b->lock); - if (b->pg == this) { - dout(10) << __func__ << " move backoff " << *b << " to child" << dendl; - b->pg = child; - child->backoffs[b->begin].insert(b); - } else { - dout(20) << __func__ << " move backoff " << *b << " nowhere... pg is null" - << dendl; - } - } -} - void PG::clear_backoffs() { dout(10) << __func__ << " " << dendl; @@ -5211,6 +5117,11 @@ void PG::start_peering_interval( dirty_info = true; dirty_big_info = true; info.history.same_interval_since = osdmap->get_epoch(); + if (info.pgid.pgid.is_split(lastmap->get_pg_num(info.pgid.pgid.pool()), + osdmap->get_pg_num(info.pgid.pgid.pool()), + nullptr)) { + info.history.last_epoch_split = osdmap->get_epoch(); + } } } @@ -5468,11 +5379,24 @@ bool PG::can_discard_op(OpRequestRef& op) return true; } - if (m->get_map_epoch() < pool.info.last_force_op_resend && - m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) { - dout(7) << __func__ << " sent before last_force_op_resend " - << pool.info.last_force_op_resend << ", dropping" << *m << dendl; - return true; + if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) { + if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) { + dout(7) << __func__ << " sent before last_force_op_resend " + << pool.info.last_force_op_resend << ", dropping" << *m << dendl; + return true; + } + if (m->get_map_epoch() < info.history.last_epoch_split) { + dout(7) << __func__ << " pg split in " + << info.history.last_epoch_split << ", dropping" << dendl; + return true; + } + } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) { + if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) { + dout(7) << __func__ << " sent before last_force_op_resend_preluminous " + << pool.info.last_force_op_resend_preluminous + << ", dropping" << *m << dendl; + return true; + } } return false; diff --git a/src/osd/PG.h b/src/osd/PG.h index ab81b9a9696..1944bbc1bcc 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -840,8 +840,6 @@ protected: map<eversion_t, list<pair<OpRequestRef, version_t> > > waiting_for_ondisk; - void split_ops(PG *child, unsigned split_bits); - void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m); void requeue_op(OpRequestRef op); void requeue_ops(list<OpRequestRef> &l); @@ -1076,7 +1074,6 @@ public: release_backoffs(o, o); } void clear_backoffs(); - void split_backoffs(PG *child, unsigned split_bits); void add_pg_backoff(SessionRef s) { hobject_t begin = info.pgid.pgid.get_hobj_start(); diff --git a/src/osd/PGTransaction.h b/src/osd/PGTransaction.h index 2e1013bca86..a77a9dae78f 100644 --- a/src/osd/PGTransaction.h +++ b/src/osd/PGTransaction.h @@ -20,6 +20,7 @@ #include "common/hobject.h" #include "osd/osd_types.h" +#include "osd/osd_internal_types.h" #include "common/interval_map.h" #include "common/inline_variant.h" diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 2af4976b16f..daf667aa959 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -1208,8 +1208,8 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op) continue; // skip wrong namespace - if (m->get_object_locator().nspace != librados::all_nspaces && - candidate.get_namespace() != m->get_object_locator().nspace) + if (m->get_hobj().nspace != librados::all_nspaces && + candidate.get_namespace() != m->get_hobj().nspace) continue; if (filter && !pgls_filter(filter, candidate, filter_out)) @@ -1379,7 +1379,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op) } // skip wrong namespace - if (candidate.get_namespace() != m->get_object_locator().nspace) + if (candidate.get_namespace() != m->get_hobj().nspace) continue; if (filter && !pgls_filter(filter, candidate, filter_out)) @@ -1598,7 +1598,7 @@ void PrimaryLogPG::handle_backoff(OpRequestRef& op) } dout(10) << __func__ << " backoff ack id " << m->id << " [" << begin << "," << end << ")" << dendl; - session->ack_backoff(cct, m->id, begin, end); + session->ack_backoff(cct, m->pgid, m->id, begin, end); } void PrimaryLogPG::do_request( @@ -1619,10 +1619,8 @@ void PrimaryLogPG::do_request( session->put(); // get_priv takes a ref, and so does the SessionRef if (op->get_req()->get_type() == CEPH_MSG_OSD_OP) { - Backoff *b = session->have_backoff(info.pgid.pgid.get_hobj_start()); - if (b) { - dout(10) << " have backoff " << *b << " " << *m << dendl; - assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff); + if (session->check_backoff(cct, info.pgid, + info.pgid.pgid.get_hobj_start(), m)) { return; } @@ -1630,7 +1628,7 @@ void PrimaryLogPG::do_request( is_down() || is_incomplete() || (!is_active() && is_peered()); - if (g_conf->osd_peering_aggressive_backoff && !backoff) { + if (g_conf->osd_backoff_on_peering && !backoff) { if (is_peering()) { backoff = true; } @@ -1767,9 +1765,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op) dout(20) << __func__ << ": op " << *m << dendl; - hobject_t head(m->get_oid(), m->get_object_locator().key, - CEPH_NOSNAP, m->get_pg().ps(), - info.pgid.pool(), m->get_object_locator().nspace); + hobject_t head = m->get_hobj(); + head.snap = CEPH_NOSNAP; bool can_backoff = m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF); @@ -1782,10 +1779,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op) } session->put(); // get_priv() takes a ref, and so does the intrusive_ptr - Backoff *b = session->have_backoff(head); - if (b) { - dout(10) << __func__ << " have backoff " << *b << " " << *m << dendl; - assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff); + if (session->check_backoff(cct, info.pgid, head, m)) { return; } } @@ -1843,16 +1837,14 @@ void PrimaryLogPG::do_op(OpRequestRef& op) osd->reply_op_error(op, -ENAMETOOLONG); return; } - if (m->get_object_locator().key.size() > - cct->_conf->osd_max_object_name_len) { + if (m->get_hobj().get_key().size() > cct->_conf->osd_max_object_name_len) { dout(4) << "do_op locator is longer than " << cct->_conf->osd_max_object_name_len << " bytes" << dendl; osd->reply_op_error(op, -ENAMETOOLONG); return; } - if (m->get_object_locator().nspace.size() > - cct->_conf->osd_max_object_namespace_len) { + if (m->get_hobj().nspace.size() > cct->_conf->osd_max_object_namespace_len) { dout(4) << "do_op namespace is longer than " << cct->_conf->osd_max_object_namespace_len << " bytes" << dendl; @@ -1934,8 +1926,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op) // missing object? if (is_unreadable_object(head)) { if (can_backoff && - (g_conf->osd_recovery_aggressive_backoff || - missing_loc.is_unfound(head))) { + (g_conf->osd_backoff_on_degraded || + (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) { add_backoff(session, head, head); maybe_kick_recovery(head); } else { @@ -1946,7 +1938,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op) // degraded object? if (write_ordered && is_degraded_or_backfilling_object(head)) { - if (can_backoff && g_conf->osd_recovery_aggressive_backoff) { + if (can_backoff && g_conf->osd_backoff_on_degraded) { add_backoff(session, head, head); } else { wait_for_degraded_object(head, op); @@ -2036,12 +2028,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op) ObjectContextRef obc; bool can_create = op->may_write() || op->may_cache(); hobject_t missing_oid; - hobject_t oid(m->get_oid(), - m->get_object_locator().key, - m->get_snapid(), - m->get_pg().ps(), - m->get_object_locator().get_pool(), - m->get_object_locator().nspace); + const hobject_t& oid = m->get_hobj(); // io blocked on obc? if (!m->has_flag(CEPH_OSD_FLAG_FLUSH) && @@ -2368,7 +2355,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail( } MOSDOp *m = static_cast<MOSDOp*>(op->get_req()); - const object_locator_t& oloc = m->get_object_locator(); + const object_locator_t oloc = m->get_object_locator(); if (op->need_skip_handle_cache()) { return cache_result_t::NOOP; @@ -2626,12 +2613,7 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op) object_locator_t oloc(m->get_object_locator()); oloc.pool = pool.info.tier_of; - hobject_t soid(m->get_oid(), - m->get_object_locator().key, - m->get_snapid(), - m->get_pg().ps(), - m->get_object_locator().get_pool(), - m->get_object_locator().nspace); + const hobject_t& soid = m->get_hobj(); unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY; // pass through some original flags that make sense. @@ -2821,12 +2803,7 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid) oloc.pool = pool.info.tier_of; SnapContext snapc(m->get_snap_seq(), m->get_snaps()); - hobject_t soid(m->get_oid(), - m->get_object_locator().key, - missing_oid.snap, - m->get_pg().ps(), - m->get_object_locator().get_pool(), - m->get_object_locator().nspace); + const hobject_t& soid = m->get_hobj(); unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY; dout(10) << __func__ << " Start proxy write for " << *m << dendl; diff --git a/src/osd/Session.cc b/src/osd/Session.cc index 125bd1f1579..2de44271344 100644 --- a/src/osd/Session.cc +++ b/src/osd/Session.cc @@ -11,25 +11,27 @@ void Session::clear_backoffs() { - map<hobject_t,set<BackoffRef>> ls; + map<spg_t,map<hobject_t,set<BackoffRef>>> ls; { Mutex::Locker l(backoff_lock); ls.swap(backoffs); backoff_count = 0; } - for (auto& p : ls) { - for (auto& b : p.second) { - Mutex::Locker l(b->lock); - if (b->pg) { - assert(b->session == this); - assert(b->is_new() || b->is_acked()); - b->pg->rm_backoff(b); - b->pg.reset(); - b->session.reset(); - } else if (b->session) { - assert(b->session == this); - assert(b->is_deleting()); - b->session.reset(); + for (auto& i : ls) { + for (auto& p : i.second) { + for (auto& b : p.second) { + Mutex::Locker l(b->lock); + if (b->pg) { + assert(b->session == this); + assert(b->is_new() || b->is_acked()); + b->pg->rm_backoff(b); + b->pg.reset(); + b->session.reset(); + } else if (b->session) { + assert(b->session == this); + assert(b->is_deleting()); + b->session.reset(); + } } } } @@ -37,42 +39,65 @@ void Session::clear_backoffs() void Session::ack_backoff( CephContext *cct, + spg_t pgid, uint64_t id, const hobject_t& begin, const hobject_t& end) { Mutex::Locker l(backoff_lock); - // NOTE that ack may be for [a,c] but osd may now have [a,b) and - // [b,c) due to a PG split. - auto p = backoffs.lower_bound(begin); - while (p != backoffs.end()) { - // note: must still examine begin=end=p->first case - int r = cmp(p->first, end); - if (r > 0 || (r == 0 && begin < end)) { - break; - } - auto q = p->second.begin(); - while (q != p->second.end()) { - Backoff *b = (*q).get(); - if (b->id == id) { - if (b->is_new()) { - b->state = Backoff::STATE_ACKED; - dout(20) << __func__ << " now " << *b << dendl; - } else if (b->is_deleting()) { - dout(20) << __func__ << " deleting " << *b << dendl; - q = p->second.erase(q); - continue; - } + auto p = backoffs.find(pgid); + if (p == backoffs.end()) { + dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << "," + << end << ") pg not found" << dendl; + return; + } + auto q = p->second.find(begin); + if (q == p->second.end()) { + dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << "," + << end << ") begin not found" << dendl; + return; + } + for (auto i = q->second.begin(); i != q->second.end(); ++i) { + Backoff *b = (*i).get(); + if (b->id == id) { + if (b->is_new()) { + b->state = Backoff::STATE_ACKED; + dout(20) << __func__ << " now " << *b << dendl; + } else if (b->is_deleting()) { + dout(20) << __func__ << " deleting " << *b << dendl; + q->second.erase(i); + --backoff_count; } - ++q; + break; } + } + if (q->second.empty()) { + dout(20) << __func__ << " clearing begin bin " << q->first << dendl; + p->second.erase(q); if (p->second.empty()) { - dout(20) << __func__ << " clearing bin " << p->first << dendl; - p = backoffs.erase(p); - --backoff_count; - } else { - ++p; + dout(20) << __func__ << " clearing pg bin " << p->first << dendl; + backoffs.erase(p); } } - assert(backoff_count == (int)backoffs.size()); + assert(!backoff_count == backoffs.empty()); +} + +bool Session::check_backoff( + CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m) +{ + BackoffRef b(have_backoff(pgid, oid)); + if (b) { + dout(10) << __func__ << " session " << this << " has backoff " << *b + << " for " << *m << dendl; + assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff); + return true; + } + // we may race with ms_handle_reset. it clears session->con before removing + // backoffs, so if we see con is cleared here we have to abort this + // request. + if (!con) { + dout(10) << __func__ << " session " << this << " disconnected" << dendl; + return true; + } + return false; } diff --git a/src/osd/Session.h b/src/osd/Session.h index 6492c4bd4e4..257bc34a155 100644 --- a/src/osd/Session.h +++ b/src/osd/Session.h @@ -67,6 +67,7 @@ struct Backoff : public RefCountedObject { STATE_DELETING = 3 ///< backoff deleted, but un-acked }; std::atomic_int state = {STATE_NEW}; + spg_t pgid; ///< owning pgid uint64_t id = 0; ///< unique id (within the Session) bool is_new() const { @@ -96,10 +97,11 @@ struct Backoff : public RefCountedObject { SessionRef session; ///< owning session hobject_t begin, end; ///< [) range to block, unless ==, then single obj - Backoff(PGRef pg, SessionRef s, + Backoff(spg_t pgid, PGRef pg, SessionRef s, uint64_t i, const hobject_t& b, const hobject_t& e) : RefCountedObject(g_ceph_context, 0), + pgid(pgid), id(i), lock("Backoff::lock"), pg(pg), @@ -108,7 +110,7 @@ struct Backoff : public RefCountedObject { end(e) {} friend ostream& operator<<(ostream& out, const Backoff& b) { - return out << "Backoff(" << &b << " " << b.id + return out << "Backoff(" << &b << " " << b.pgid << " " << b.id << " " << b.get_state_name() << " [" << b.begin << "," << b.end << ") " << " session " << b.session @@ -139,7 +141,7 @@ struct Session : public RefCountedObject { /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock Mutex backoff_lock; std::atomic_int backoff_count= {0}; ///< simple count of backoffs - map<hobject_t,set<BackoffRef>> backoffs; + map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs; std::atomic<uint64_t> backoff_seq = {0}; @@ -159,26 +161,32 @@ struct Session : public RefCountedObject { void ack_backoff( CephContext *cct, + spg_t pgid, uint64_t id, const hobject_t& start, const hobject_t& end); - Backoff *have_backoff(const hobject_t& oid) { - if (backoff_count.load()) { - Mutex::Locker l(backoff_lock); - assert(backoff_count == (int)backoffs.size()); - auto p = backoffs.lower_bound(oid); - if (p != backoffs.begin() && - p->first > oid) { - --p; - } - if (p != backoffs.end()) { - int r = cmp(oid, p->first); - if (r == 0 || r > 0) { - for (auto& q : p->second) { - if (r == 0 || oid < q->end) { - return &(*q); - } + BackoffRef have_backoff(spg_t pgid, const hobject_t& oid) { + if (!backoff_count.load()) { + return nullptr; + } + Mutex::Locker l(backoff_lock); + assert(!backoff_count == backoffs.empty()); + auto i = backoffs.find(pgid); + if (i == backoffs.end()) { + return nullptr; + } + auto p = i->second.lower_bound(oid); + if (p != i->second.begin() && + p->first > oid) { + --p; + } + if (p != i->second.end()) { + int r = cmp(oid, p->first); + if (r == 0 || r > 0) { + for (auto& q : p->second) { + if (r == 0 || oid < q->end) { + return &(*q); } } } @@ -186,24 +194,40 @@ struct Session : public RefCountedObject { return nullptr; } + bool check_backoff( + CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m); + + void add_backoff(BackoffRef b) { + Mutex::Locker l(backoff_lock); + assert(!backoff_count == backoffs.empty()); + backoffs[b->pgid][b->begin].insert(b); + ++backoff_count; + } + // called by PG::release_*_backoffs and PG::clear_backoffs() void rm_backoff(BackoffRef b) { Mutex::Locker l(backoff_lock); assert(b->lock.is_locked_by_me()); assert(b->session == this); - auto p = backoffs.find(b->begin); - // may race with clear_backoffs() - if (p != backoffs.end()) { - auto q = p->second.find(b); - if (q != p->second.end()) { - p->second.erase(q); - if (p->second.empty()) { - backoffs.erase(p); + auto i = backoffs.find(b->pgid); + if (i != backoffs.end()) { + // may race with clear_backoffs() + auto p = i->second.find(b->begin); + if (p != i->second.end()) { + auto q = p->second.find(b); + if (q != p->second.end()) { + p->second.erase(q); --backoff_count; + if (p->second.empty()) { + i->second.erase(p); + if (i->second.empty()) { + backoffs.erase(i); + } + } } } } - assert(backoff_count == (int)backoffs.size()); + assert(!backoff_count == backoffs.empty()); } void clear_backoffs(); }; diff --git a/src/osd/osd_internal_types.h b/src/osd/osd_internal_types.h new file mode 100644 index 00000000000..a415a4bf6e2 --- /dev/null +++ b/src/osd/osd_internal_types.h @@ -0,0 +1,513 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_OSD_INTERNAL_TYPES_H +#define CEPH_OSD_INTERNAL_TYPES_H + +#include "osd_types.h" +#include "OpRequest.h" + +/* + * keep tabs on object modifications that are in flight. + * we need to know the projected existence, size, snapset, + * etc., because we don't send writes down to disk until after + * replicas ack. + */ + +struct ObjectContext; + +struct ObjectState { + object_info_t oi; + bool exists; ///< the stored object exists (i.e., we will remember the object_info_t) + + ObjectState() : exists(false) {} + + ObjectState(const object_info_t &oi_, bool exists_) + : oi(oi_), exists(exists_) {} +}; + +typedef ceph::shared_ptr<ObjectContext> ObjectContextRef; + +struct ObjectContext { + ObjectState obs; + + SnapSetContext *ssc; // may be null + + Context *destructor_callback; + +private: + Mutex lock; +public: + Cond cond; + int unstable_writes, readers, writers_waiting, readers_waiting; + + + // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. + map<pair<uint64_t, entity_name_t>, WatchRef> watchers; + + // attr cache + map<string, bufferlist> attr_cache; + + struct RWState { + enum State { + RWNONE, + RWREAD, + RWWRITE, + RWEXCL, + }; + static const char *get_state_name(State s) { + switch (s) { + case RWNONE: return "none"; + case RWREAD: return "read"; + case RWWRITE: return "write"; + case RWEXCL: return "excl"; + default: return "???"; + } + } + const char *get_state_name() const { + return get_state_name(state); + } + + list<OpRequestRef> waiters; ///< ops waiting on state change + int count; ///< number of readers or writers + + State state:4; ///< rw state + /// if set, restart backfill when we can get a read lock + bool recovery_read_marker:1; + /// if set, requeue snaptrim on lock release + bool snaptrimmer_write_marker:1; + + RWState() + : count(0), + state(RWNONE), + recovery_read_marker(false), + snaptrimmer_write_marker(false) + {} + bool get_read(OpRequestRef op) { + if (get_read_lock()) { + return true; + } // else + waiters.push_back(op); + return false; + } + /// this function adjusts the counts if necessary + bool get_read_lock() { + // don't starve anybody! + if (!waiters.empty()) { + return false; + } + switch (state) { + case RWNONE: + assert(count == 0); + state = RWREAD; + // fall through + case RWREAD: + count++; + return true; + case RWWRITE: + return false; + case RWEXCL: + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + + bool get_write(OpRequestRef op, bool greedy=false) { + if (get_write_lock(greedy)) { + return true; + } // else + if (op) + waiters.push_back(op); + return false; + } + bool get_write_lock(bool greedy=false) { + if (!greedy) { + // don't starve anybody! + if (!waiters.empty() || + recovery_read_marker) { + return false; + } + } + switch (state) { + case RWNONE: + assert(count == 0); + state = RWWRITE; + // fall through + case RWWRITE: + count++; + return true; + case RWREAD: + return false; + case RWEXCL: + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + bool get_excl_lock() { + switch (state) { + case RWNONE: + assert(count == 0); + state = RWEXCL; + count = 1; + return true; + case RWWRITE: + return false; + case RWREAD: + return false; + case RWEXCL: + return false; + default: + assert(0 == "unhandled case"); + return false; + } + } + bool get_excl(OpRequestRef op) { + if (get_excl_lock()) { + return true; + } // else + if (op) + waiters.push_back(op); + return false; + } + /// same as get_write_lock, but ignore starvation + bool take_write_lock() { + if (state == RWWRITE) { + count++; + return true; + } + return get_write_lock(); + } + void dec(list<OpRequestRef> *requeue) { + assert(count > 0); + assert(requeue); + count--; + if (count == 0) { + state = RWNONE; + requeue->splice(requeue->end(), waiters); + } + } + void put_read(list<OpRequestRef> *requeue) { + assert(state == RWREAD); + dec(requeue); + } + void put_write(list<OpRequestRef> *requeue) { + assert(state == RWWRITE); + dec(requeue); + } + void put_excl(list<OpRequestRef> *requeue) { + assert(state == RWEXCL); + dec(requeue); + } + bool empty() const { return state == RWNONE; } + } rwstate; + + bool get_read(OpRequestRef op) { + return rwstate.get_read(op); + } + bool get_write(OpRequestRef op) { + return rwstate.get_write(op, false); + } + bool get_excl(OpRequestRef op) { + return rwstate.get_excl(op); + } + bool get_lock_type(OpRequestRef op, RWState::State type) { + switch (type) { + case RWState::RWWRITE: + return get_write(op); + case RWState::RWREAD: + return get_read(op); + case RWState::RWEXCL: + return get_excl(op); + default: + assert(0 == "invalid lock type"); + return true; + } + } + bool get_write_greedy(OpRequestRef op) { + return rwstate.get_write(op, true); + } + bool get_snaptrimmer_write(bool mark_if_unsuccessful) { + if (rwstate.get_write_lock()) { + return true; + } else { + if (mark_if_unsuccessful) + rwstate.snaptrimmer_write_marker = true; + return false; + } + } + bool get_recovery_read() { + rwstate.recovery_read_marker = true; + if (rwstate.get_read_lock()) { + return true; + } + return false; + } + bool try_get_read_lock() { + return rwstate.get_read_lock(); + } + void drop_recovery_read(list<OpRequestRef> *ls) { + assert(rwstate.recovery_read_marker); + rwstate.put_read(ls); + rwstate.recovery_read_marker = false; + } + void put_read(list<OpRequestRef> *to_wake) { + rwstate.put_read(to_wake); + } + void put_excl(list<OpRequestRef> *to_wake, + bool *requeue_recovery, + bool *requeue_snaptrimmer) { + rwstate.put_excl(to_wake); + if (rwstate.empty() && rwstate.recovery_read_marker) { + rwstate.recovery_read_marker = false; + *requeue_recovery = true; + } + if (rwstate.empty() && rwstate.snaptrimmer_write_marker) { + rwstate.snaptrimmer_write_marker = false; + *requeue_snaptrimmer = true; + } + } + void put_write(list<OpRequestRef> *to_wake, + bool *requeue_recovery, + bool *requeue_snaptrimmer) { + rwstate.put_write(to_wake); + if (rwstate.empty() && rwstate.recovery_read_marker) { + rwstate.recovery_read_marker = false; + *requeue_recovery = true; + } + if (rwstate.empty() && rwstate.snaptrimmer_write_marker) { + rwstate.snaptrimmer_write_marker = false; + *requeue_snaptrimmer = true; + } + } + void put_lock_type( + ObjectContext::RWState::State type, + list<OpRequestRef> *to_wake, + bool *requeue_recovery, + bool *requeue_snaptrimmer) { + switch (type) { + case ObjectContext::RWState::RWWRITE: + return put_write(to_wake, requeue_recovery, requeue_snaptrimmer); + case ObjectContext::RWState::RWREAD: + return put_read(to_wake); + case ObjectContext::RWState::RWEXCL: + return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer); + default: + assert(0 == "invalid lock type"); + } + } + bool is_request_pending() { + return (rwstate.count > 0); + } + + ObjectContext() + : ssc(NULL), + destructor_callback(0), + lock("PrimaryLogPG::ObjectContext::lock"), + unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), + blocked(false), requeue_scrub_on_unblock(false) {} + + ~ObjectContext() { + assert(rwstate.empty()); + if (destructor_callback) + destructor_callback->complete(0); + } + + void start_block() { + assert(!blocked); + blocked = true; + } + void stop_block() { + assert(blocked); + blocked = false; + } + bool is_blocked() const { + return blocked; + } + + // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy. + void ondisk_write_lock() { + lock.Lock(); + writers_waiting++; + while (readers_waiting || readers) + cond.Wait(lock); + writers_waiting--; + unstable_writes++; + lock.Unlock(); + } + void ondisk_write_unlock() { + lock.Lock(); + assert(unstable_writes > 0); + unstable_writes--; + if (!unstable_writes && readers_waiting) + cond.Signal(); + lock.Unlock(); + } + void ondisk_read_lock() { + lock.Lock(); + readers_waiting++; + while (unstable_writes) + cond.Wait(lock); + readers_waiting--; + readers++; + lock.Unlock(); + } + void ondisk_read_unlock() { + lock.Lock(); + assert(readers > 0); + readers--; + if (!readers && writers_waiting) + cond.Signal(); + lock.Unlock(); + } + + /// in-progress copyfrom ops for this object + bool blocked:1; + bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock + +}; + +inline ostream& operator<<(ostream& out, const ObjectState& obs) +{ + out << obs.oi.soid; + if (!obs.exists) + out << "(dne)"; + return out; +} + +inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw) +{ + return out << "rwstate(" << rw.get_state_name() + << " n=" << rw.count + << " w=" << rw.waiters.size() + << ")"; +} + +inline ostream& operator<<(ostream& out, const ObjectContext& obc) +{ + return out << "obc(" << obc.obs << " " << obc.rwstate << ")"; +} + +class ObcLockManager { + struct ObjectLockState { + ObjectContextRef obc; + ObjectContext::RWState::State type; + ObjectLockState( + ObjectContextRef obc, + ObjectContext::RWState::State type) + : obc(obc), type(type) {} + }; + map<hobject_t, ObjectLockState> locks; +public: + ObcLockManager() = default; + ObcLockManager(ObcLockManager &&) = default; + ObcLockManager(const ObcLockManager &) = delete; + ObcLockManager &operator=(ObcLockManager &&) = default; + bool empty() const { + return locks.empty(); + } + bool get_lock_type( + ObjectContext::RWState::State type, + const hobject_t &hoid, + ObjectContextRef obc, + OpRequestRef op) { + assert(locks.find(hoid) == locks.end()); + if (obc->get_lock_type(op, type)) { + locks.insert(make_pair(hoid, ObjectLockState(obc, type))); + return true; + } else { + return false; + } + } + /// Get write lock, ignore starvation + bool take_write_lock( + const hobject_t &hoid, + ObjectContextRef obc) { + assert(locks.find(hoid) == locks.end()); + if (obc->rwstate.take_write_lock()) { + locks.insert( + make_pair( + hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); + return true; + } else { + return false; + } + } + /// Get write lock for snap trim + bool get_snaptrimmer_write( + const hobject_t &hoid, + ObjectContextRef obc, + bool mark_if_unsuccessful) { + assert(locks.find(hoid) == locks.end()); + if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) { + locks.insert( + make_pair( + hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); + return true; + } else { + return false; + } + } + /// Get write lock greedy + bool get_write_greedy( + const hobject_t &hoid, + ObjectContextRef obc, + OpRequestRef op) { + assert(locks.find(hoid) == locks.end()); + if (obc->get_write_greedy(op)) { + locks.insert( + make_pair( + hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); + return true; + } else { + return false; + } + } + + /// try get read lock + bool try_get_read_lock( + const hobject_t &hoid, + ObjectContextRef obc) { + assert(locks.find(hoid) == locks.end()); + if (obc->try_get_read_lock()) { + locks.insert( + make_pair( + hoid, + ObjectLockState(obc, ObjectContext::RWState::RWREAD))); + return true; + } else { + return false; + } + } + + void put_locks( + list<pair<hobject_t, list<OpRequestRef> > > *to_requeue, + bool *requeue_recovery, + bool *requeue_snaptrimmer) { + for (auto p: locks) { + list<OpRequestRef> _to_requeue; + p.second.obc->put_lock_type( + p.second.type, + &_to_requeue, + requeue_recovery, + requeue_snaptrimmer); + if (to_requeue) { + to_requeue->push_back( + make_pair( + p.second.obc->obs.oi.soid, + std::move(_to_requeue))); + } + } + locks.clear(); + } + ~ObcLockManager() { + assert(locks.empty()); + } +}; + + + +#endif diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index ff3c97268b6..d16e2d19aa1 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -1130,6 +1130,8 @@ void pg_pool_t::dump(Formatter *f) const f->dump_unsigned("crash_replay_interval", get_crash_replay_interval()); f->dump_stream("last_change") << get_last_change(); f->dump_stream("last_force_op_resend") << get_last_force_op_resend(); + f->dump_stream("last_force_op_resend_preluminous") + << get_last_force_op_resend_preluminous(); f->dump_unsigned("auid", get_auid()); f->dump_string("snap_mode", is_pool_snaps_mode() ? "pool" : "selfmanaged"); f->dump_unsigned("snap_seq", get_snap_seq()); @@ -1482,12 +1484,17 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const return; } - uint8_t v = 24; + uint8_t v = 25; if (!(features & CEPH_FEATURE_NEW_OSDOP_ENCODING)) { // this was the first post-hammer thing we added; if it's missing, encode // like hammer. v = 21; } + if ((features & + (CEPH_FEATURE_RESEND_ON_SPLIT|CEPH_FEATURE_SERVER_JEWEL)) != + (CEPH_FEATURE_RESEND_ON_SPLIT|CEPH_FEATURE_SERVER_JEWEL)) { + v = 24; + } ENCODE_START(v, 5, bl); ::encode(type, bl); @@ -1528,7 +1535,7 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const ::encode(cache_min_flush_age, bl); ::encode(cache_min_evict_age, bl); ::encode(erasure_code_profile, bl); - ::encode(last_force_op_resend, bl); + ::encode(last_force_op_resend_preluminous, bl); ::encode(min_read_recency_for_promote, bl); ::encode(expected_num_objects, bl); if (v >= 19) { @@ -1550,6 +1557,9 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const if (v >= 24) { ::encode(opts, bl); } + if (v >= 25) { + ::encode(last_force_op_resend, bl); + } ENCODE_FINISH(bl); } @@ -1653,9 +1663,9 @@ void pg_pool_t::decode(bufferlist::iterator& bl) ::decode(erasure_code_profile, bl); } if (struct_v >= 15) { - ::decode(last_force_op_resend, bl); + ::decode(last_force_op_resend_preluminous, bl); } else { - last_force_op_resend = 0; + last_force_op_resend_preluminous = 0; } if (struct_v >= 16) { ::decode(min_read_recency_for_promote, bl); @@ -1697,6 +1707,11 @@ void pg_pool_t::decode(bufferlist::iterator& bl) if (struct_v >= 24) { ::decode(opts, bl); } + if (struct_v >= 25) { + ::decode(last_force_op_resend, bl); + } else { + last_force_op_resend = last_force_op_resend_preluminous; + } DECODE_FINISH(bl); calc_pg_masks(); calc_grade_table(); @@ -1715,6 +1730,7 @@ void pg_pool_t::generate_test_instances(list<pg_pool_t*>& o) a.pgp_num = 5; a.last_change = 9; a.last_force_op_resend = 123823; + a.last_force_op_resend_preluminous = 123824; a.snap_seq = 10; a.snap_epoch = 11; a.auid = 12; @@ -1772,8 +1788,10 @@ ostream& operator<<(ostream& out, const pg_pool_t& p) << " pg_num " << p.get_pg_num() << " pgp_num " << p.get_pgp_num() << " last_change " << p.get_last_change(); - if (p.get_last_force_op_resend()) - out << " lfor " << p.get_last_force_op_resend(); + if (p.get_last_force_op_resend() || + p.get_last_force_op_resend_preluminous()) + out << " lfor " << p.get_last_force_op_resend() << "/" + << p.get_last_force_op_resend_preluminous(); if (p.get_auid()) out << " owner " << p.get_auid(); if (p.flags) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 318a902856a..671e0ecc262 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -41,7 +41,6 @@ #include "common/snap_types.h" #include "HitSet.h" #include "Watch.h" -#include "OpRequest.h" #include "include/cmp.h" #include "librados/ListObjectImpl.h" #include "compressor/Compressor.h" @@ -99,6 +98,36 @@ string ceph_osd_op_flag_string(unsigned flags); /// conver CEPH_OSD_ALLOC_HINT_FLAG_* op flags to a string string ceph_osd_alloc_hint_flag_string(unsigned flags); + +/** + * osd request identifier + * + * caller name + incarnation# + tid to unique identify this request. + */ +struct osd_reqid_t { + entity_name_t name; // who + ceph_tid_t tid; + int32_t inc; // incarnation + + osd_reqid_t() + : tid(0), inc(0) {} + osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t) + : name(a), tid(t), inc(i) {} + + DENC(osd_reqid_t, v, p) { + DENC_START(2, 2, p); + denc(v.name, p); + denc(v.tid, p); + denc(v.inc, p); + DENC_FINISH(p); + } + void dump(Formatter *f) const; + static void generate_test_instances(list<osd_reqid_t*>& o); +}; +WRITE_CLASS_DENC(osd_reqid_t) + + + struct pg_shard_t { int32_t osd; shard_id_t shard; @@ -1220,6 +1249,8 @@ public: string erasure_code_profile; ///< name of the erasure code profile in OSDMap epoch_t last_change; ///< most recent epoch changed, exclusing snapshot changes epoch_t last_force_op_resend; ///< last epoch that forced clients to resend + /// last epoch that forced clients to resend (pre-luminous clients only) + epoch_t last_force_op_resend_preluminous; snapid_t snap_seq; ///< seq for per-pool snapshot epoch_t snap_epoch; ///< osdmap epoch of last snap uint64_t auid; ///< who owns the pg @@ -1334,6 +1365,7 @@ public: pg_num(0), pgp_num(0), last_change(0), last_force_op_resend(0), + last_force_op_resend_preluminous(0), snap_seq(0), snap_epoch(0), auid(0), crash_replay_interval(0), @@ -1390,6 +1422,9 @@ public: } epoch_t get_last_change() const { return last_change; } epoch_t get_last_force_op_resend() const { return last_force_op_resend; } + epoch_t get_last_force_op_resend_preluminous() const { + return last_force_op_resend_preluminous; + } epoch_t get_snap_epoch() const { return snap_epoch; } snapid_t get_snap_seq() const { return snap_seq; } uint64_t get_auid() const { return auid; } @@ -1462,6 +1497,11 @@ public: return quota_max_objects; } + void set_last_force_op_resend(uint64_t t) { + last_force_op_resend = t; + last_force_op_resend_preluminous = t; + } + void calc_pg_masks(); /* @@ -4081,16 +4121,6 @@ struct object_info_t { }; WRITE_CLASS_ENCODER_FEATURES(object_info_t) -struct ObjectState { - object_info_t oi; - bool exists; ///< the stored object exists (i.e., we will remember the object_info_t) - - ObjectState() : exists(false) {} - - ObjectState(const object_info_t &oi_, bool exists_) - : oi(oi_), exists(exists_) {} -}; - struct SnapSetContext { hobject_t oid; SnapSet snapset; @@ -4102,500 +4132,10 @@ struct SnapSetContext { oid(o), ref(0), registered(false), exists(true) { } }; -/* - * keep tabs on object modifications that are in flight. - * we need to know the projected existence, size, snapset, - * etc., because we don't send writes down to disk until after - * replicas ack. - */ - -struct ObjectContext; - -typedef ceph::shared_ptr<ObjectContext> ObjectContextRef; - -struct ObjectContext { - ObjectState obs; - - SnapSetContext *ssc; // may be null - - Context *destructor_callback; - -private: - Mutex lock; -public: - Cond cond; - int unstable_writes, readers, writers_waiting, readers_waiting; - - - // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers. - map<pair<uint64_t, entity_name_t>, WatchRef> watchers; - - // attr cache - map<string, bufferlist> attr_cache; - - struct RWState { - enum State { - RWNONE, - RWREAD, - RWWRITE, - RWEXCL, - }; - static const char *get_state_name(State s) { - switch (s) { - case RWNONE: return "none"; - case RWREAD: return "read"; - case RWWRITE: return "write"; - case RWEXCL: return "excl"; - default: return "???"; - } - } - const char *get_state_name() const { - return get_state_name(state); - } - - list<OpRequestRef> waiters; ///< ops waiting on state change - int count; ///< number of readers or writers - - State state:4; ///< rw state - /// if set, restart backfill when we can get a read lock - bool recovery_read_marker:1; - /// if set, requeue snaptrim on lock release - bool snaptrimmer_write_marker:1; - - RWState() - : count(0), - state(RWNONE), - recovery_read_marker(false), - snaptrimmer_write_marker(false) - {} - bool get_read(OpRequestRef op) { - if (get_read_lock()) { - return true; - } // else - waiters.push_back(op); - return false; - } - /// this function adjusts the counts if necessary - bool get_read_lock() { - // don't starve anybody! - if (!waiters.empty()) { - return false; - } - switch (state) { - case RWNONE: - assert(count == 0); - state = RWREAD; - // fall through - case RWREAD: - count++; - return true; - case RWWRITE: - return false; - case RWEXCL: - return false; - default: - assert(0 == "unhandled case"); - return false; - } - } - - bool get_write(OpRequestRef op, bool greedy=false) { - if (get_write_lock(greedy)) { - return true; - } // else - if (op) - waiters.push_back(op); - return false; - } - bool get_write_lock(bool greedy=false) { - if (!greedy) { - // don't starve anybody! - if (!waiters.empty() || - recovery_read_marker) { - return false; - } - } - switch (state) { - case RWNONE: - assert(count == 0); - state = RWWRITE; - // fall through - case RWWRITE: - count++; - return true; - case RWREAD: - return false; - case RWEXCL: - return false; - default: - assert(0 == "unhandled case"); - return false; - } - } - bool get_excl_lock() { - switch (state) { - case RWNONE: - assert(count == 0); - state = RWEXCL; - count = 1; - return true; - case RWWRITE: - return false; - case RWREAD: - return false; - case RWEXCL: - return false; - default: - assert(0 == "unhandled case"); - return false; - } - } - bool get_excl(OpRequestRef op) { - if (get_excl_lock()) { - return true; - } // else - if (op) - waiters.push_back(op); - return false; - } - /// same as get_write_lock, but ignore starvation - bool take_write_lock() { - if (state == RWWRITE) { - count++; - return true; - } - return get_write_lock(); - } - void dec(list<OpRequestRef> *requeue) { - assert(count > 0); - assert(requeue); - count--; - if (count == 0) { - state = RWNONE; - requeue->splice(requeue->end(), waiters); - } - } - void put_read(list<OpRequestRef> *requeue) { - assert(state == RWREAD); - dec(requeue); - } - void put_write(list<OpRequestRef> *requeue) { - assert(state == RWWRITE); - dec(requeue); - } - void put_excl(list<OpRequestRef> *requeue) { - assert(state == RWEXCL); - dec(requeue); - } - bool empty() const { return state == RWNONE; } - } rwstate; - - bool get_read(OpRequestRef op) { - return rwstate.get_read(op); - } - bool get_write(OpRequestRef op) { - return rwstate.get_write(op, false); - } - bool get_excl(OpRequestRef op) { - return rwstate.get_excl(op); - } - bool get_lock_type(OpRequestRef op, RWState::State type) { - switch (type) { - case RWState::RWWRITE: - return get_write(op); - case RWState::RWREAD: - return get_read(op); - case RWState::RWEXCL: - return get_excl(op); - default: - assert(0 == "invalid lock type"); - return true; - } - } - bool get_write_greedy(OpRequestRef op) { - return rwstate.get_write(op, true); - } - bool get_snaptrimmer_write(bool mark_if_unsuccessful) { - if (rwstate.get_write_lock()) { - return true; - } else { - if (mark_if_unsuccessful) - rwstate.snaptrimmer_write_marker = true; - return false; - } - } - bool get_recovery_read() { - rwstate.recovery_read_marker = true; - if (rwstate.get_read_lock()) { - return true; - } - return false; - } - bool try_get_read_lock() { - return rwstate.get_read_lock(); - } - void drop_recovery_read(list<OpRequestRef> *ls) { - assert(rwstate.recovery_read_marker); - rwstate.put_read(ls); - rwstate.recovery_read_marker = false; - } - void put_read(list<OpRequestRef> *to_wake) { - rwstate.put_read(to_wake); - } - void put_excl(list<OpRequestRef> *to_wake, - bool *requeue_recovery, - bool *requeue_snaptrimmer) { - rwstate.put_excl(to_wake); - if (rwstate.empty() && rwstate.recovery_read_marker) { - rwstate.recovery_read_marker = false; - *requeue_recovery = true; - } - if (rwstate.empty() && rwstate.snaptrimmer_write_marker) { - rwstate.snaptrimmer_write_marker = false; - *requeue_snaptrimmer = true; - } - } - void put_write(list<OpRequestRef> *to_wake, - bool *requeue_recovery, - bool *requeue_snaptrimmer) { - rwstate.put_write(to_wake); - if (rwstate.empty() && rwstate.recovery_read_marker) { - rwstate.recovery_read_marker = false; - *requeue_recovery = true; - } - if (rwstate.empty() && rwstate.snaptrimmer_write_marker) { - rwstate.snaptrimmer_write_marker = false; - *requeue_snaptrimmer = true; - } - } - void put_lock_type( - ObjectContext::RWState::State type, - list<OpRequestRef> *to_wake, - bool *requeue_recovery, - bool *requeue_snaptrimmer) { - switch (type) { - case ObjectContext::RWState::RWWRITE: - return put_write(to_wake, requeue_recovery, requeue_snaptrimmer); - case ObjectContext::RWState::RWREAD: - return put_read(to_wake); - case ObjectContext::RWState::RWEXCL: - return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer); - default: - assert(0 == "invalid lock type"); - } - } - bool is_request_pending() { - return (rwstate.count > 0); - } - - ObjectContext() - : ssc(NULL), - destructor_callback(0), - lock("PrimaryLogPG::ObjectContext::lock"), - unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0), - blocked(false), requeue_scrub_on_unblock(false) {} - - ~ObjectContext() { - assert(rwstate.empty()); - if (destructor_callback) - destructor_callback->complete(0); - } - - void start_block() { - assert(!blocked); - blocked = true; - } - void stop_block() { - assert(blocked); - blocked = false; - } - bool is_blocked() const { - return blocked; - } - - // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy. - void ondisk_write_lock() { - lock.Lock(); - writers_waiting++; - while (readers_waiting || readers) - cond.Wait(lock); - writers_waiting--; - unstable_writes++; - lock.Unlock(); - } - void ondisk_write_unlock() { - lock.Lock(); - assert(unstable_writes > 0); - unstable_writes--; - if (!unstable_writes && readers_waiting) - cond.Signal(); - lock.Unlock(); - } - void ondisk_read_lock() { - lock.Lock(); - readers_waiting++; - while (unstable_writes) - cond.Wait(lock); - readers_waiting--; - readers++; - lock.Unlock(); - } - void ondisk_read_unlock() { - lock.Lock(); - assert(readers > 0); - readers--; - if (!readers && writers_waiting) - cond.Signal(); - lock.Unlock(); - } - - /// in-progress copyfrom ops for this object - bool blocked:1; - bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock - -}; - -inline ostream& operator<<(ostream& out, const ObjectState& obs) -{ - out << obs.oi.soid; - if (!obs.exists) - out << "(dne)"; - return out; -} - -inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw) -{ - return out << "rwstate(" << rw.get_state_name() - << " n=" << rw.count - << " w=" << rw.waiters.size() - << ")"; -} - -inline ostream& operator<<(ostream& out, const ObjectContext& obc) -{ - return out << "obc(" << obc.obs << " " << obc.rwstate << ")"; -} ostream& operator<<(ostream& out, const object_info_t& oi); -class ObcLockManager { - struct ObjectLockState { - ObjectContextRef obc; - ObjectContext::RWState::State type; - ObjectLockState( - ObjectContextRef obc, - ObjectContext::RWState::State type) - : obc(obc), type(type) {} - }; - map<hobject_t, ObjectLockState> locks; -public: - ObcLockManager() = default; - ObcLockManager(ObcLockManager &&) = default; - ObcLockManager(const ObcLockManager &) = delete; - ObcLockManager &operator=(ObcLockManager &&) = default; - bool empty() const { - return locks.empty(); - } - bool get_lock_type( - ObjectContext::RWState::State type, - const hobject_t &hoid, - ObjectContextRef obc, - OpRequestRef op) { - assert(locks.find(hoid) == locks.end()); - if (obc->get_lock_type(op, type)) { - locks.insert(make_pair(hoid, ObjectLockState(obc, type))); - return true; - } else { - return false; - } - } - /// Get write lock, ignore starvation - bool take_write_lock( - const hobject_t &hoid, - ObjectContextRef obc) { - assert(locks.find(hoid) == locks.end()); - if (obc->rwstate.take_write_lock()) { - locks.insert( - make_pair( - hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); - return true; - } else { - return false; - } - } - /// Get write lock for snap trim - bool get_snaptrimmer_write( - const hobject_t &hoid, - ObjectContextRef obc, - bool mark_if_unsuccessful) { - assert(locks.find(hoid) == locks.end()); - if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) { - locks.insert( - make_pair( - hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); - return true; - } else { - return false; - } - } - /// Get write lock greedy - bool get_write_greedy( - const hobject_t &hoid, - ObjectContextRef obc, - OpRequestRef op) { - assert(locks.find(hoid) == locks.end()); - if (obc->get_write_greedy(op)) { - locks.insert( - make_pair( - hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE))); - return true; - } else { - return false; - } - } - - /// try get read lock - bool try_get_read_lock( - const hobject_t &hoid, - ObjectContextRef obc) { - assert(locks.find(hoid) == locks.end()); - if (obc->try_get_read_lock()) { - locks.insert( - make_pair( - hoid, - ObjectLockState(obc, ObjectContext::RWState::RWREAD))); - return true; - } else { - return false; - } - } - - void put_locks( - list<pair<hobject_t, list<OpRequestRef> > > *to_requeue, - bool *requeue_recovery, - bool *requeue_snaptrimmer) { - for (auto p: locks) { - list<OpRequestRef> _to_requeue; - p.second.obc->put_lock_type( - p.second.type, - &_to_requeue, - requeue_recovery, - requeue_snaptrimmer); - if (to_requeue) { - to_requeue->push_back( - make_pair( - p.second.obc->obs.oi.soid, - std::move(_to_requeue))); - } - } - locks.clear(); - } - ~ObcLockManager() { - assert(locks.empty()); - } -}; - // Object recovery diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index ad578a81d1b..d83bcf17b50 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -857,7 +857,7 @@ void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul) // Populate Op::target OSDSession *s = NULL; - _calc_target(&info->target); + _calc_target(&info->target, nullptr); // Create LingerOp<->OSDSession relation int r = _get_session(info->target.osd, &s, sul); @@ -1069,7 +1069,8 @@ void Objecter::_scan_requests(OSDSession *s, if (pool_full_map) force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool]; - int r = _calc_target(&op->target); + int r = _calc_target(&op->target, + op->session ? op->session->con.get() : nullptr); switch (r) { case RECALC_OP_TARGET_NO_ACTION: if (!force_resend && @@ -1297,7 +1298,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p != need_resend_linger.end(); ++p) { LingerOp *op = *p; if (!op->session) { - _calc_target(&op->target); + _calc_target(&op->target, nullptr); OSDSession *s = NULL; int const r = _get_session(op->target.osd, &s, sul); assert(r == 0); @@ -2292,7 +2293,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid) assert(op->session == NULL); OSDSession *s = NULL; - bool check_for_latest_map = _calc_target(&op->target) + bool check_for_latest_map = _calc_target(&op->target, nullptr) == RECALC_OP_TARGET_POOL_DNE; // Try to get a session, including a retry if we need to take write lock @@ -2309,7 +2310,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid) // map changed; recalculate mapping ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target" << dendl; - check_for_latest_map = _calc_target(&op->target) + check_for_latest_map = _calc_target(&op->target, nullptr) == RECALC_OP_TARGET_POOL_DNE; if (s) { put_session(s); @@ -2655,21 +2656,27 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key, return p->raw_hash_to_pg(p->hash_key(key, ns)); } -int Objecter::_calc_target(op_target_t *t, bool any_change) +int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change) { // rwlock is locked - bool is_read = t->flags & CEPH_OSD_FLAG_READ; bool is_write = t->flags & CEPH_OSD_FLAG_WRITE; + ldout(cct,20) << __func__ << " base " << t->base_oid << " " << t->base_oloc + << " precalc_pgid " << (int)t->precalc_pgid + << " pgid " << t->base_pgid + << (is_read ? " is_read" : "") + << (is_write ? " is_write" : "") + << dendl; const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool); if (!pi) { t->osd = -1; return RECALC_OP_TARGET_POOL_DNE; } + ldout(cct,30) << __func__ << " base pi " << pi + << " pg_num " << pi->get_pg_num() << dendl; bool force_resend = false; - bool need_check_tiering = false; if (osdmap->get_epoch() == pi->last_force_op_resend) { if (t->last_force_resend < pi->last_force_op_resend) { t->last_force_resend = pi->last_force_op_resend; @@ -2678,41 +2685,28 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) force_resend = true; } } - if (t->target_oid.name.empty() || force_resend) { - t->target_oid = t->base_oid; - need_check_tiering = true; - } - if (t->target_oloc.empty() || force_resend) { - t->target_oloc = t->base_oloc; - need_check_tiering = true; - } - if (need_check_tiering && - (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { + // apply tiering + t->target_oid = t->base_oid; + t->target_oloc = t->base_oloc; + if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) { if (is_read && pi->has_read_tier()) t->target_oloc.pool = pi->read_tier; if (is_write && pi->has_write_tier()) t->target_oloc.pool = pi->write_tier; + pi = osdmap->get_pg_pool(t->target_oloc.pool); + if (!pi) { + t->osd = -1; + return RECALC_OP_TARGET_POOL_DNE; + } } pg_t pgid; if (t->precalc_pgid) { - assert(t->base_oid.name.empty()); // make sure this is a listing op - ldout(cct, 10) << __func__ << " have " << t->base_pgid << " pool " - << osdmap->have_pg_pool(t->base_pgid.pool()) << dendl; - if (!osdmap->have_pg_pool(t->base_pgid.pool())) { - t->osd = -1; - return RECALC_OP_TARGET_POOL_DNE; - } - if (osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) { - // if the SORTBITWISE flag is set, we know all OSDs are running - // jewel+. - pgid = t->base_pgid; - } else { - // legacy behavior. pre-jewel OSDs will fail if we send a - // full-hash pgid value. - pgid = osdmap->raw_pg_to_pg(t->base_pgid); - } + assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY); + assert(t->base_oid.name.empty()); // make sure this is a pg op + assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool()); + pgid = t->base_pgid; } else { int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc, pgid); @@ -2721,6 +2715,10 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) return RECALC_OP_TARGET_POOL_DNE; } } + ldout(cct,20) << __func__ << " target " << t->target_oid << " " + << t->target_oloc << " -> pgid " << pgid << dendl; + ldout(cct,30) << __func__ << " target pi " << pi + << " pg_num " << pi->get_pg_num() << dendl; int size = pi->size; int min_size = pi->min_size; @@ -2731,6 +2729,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) &acting, &acting_primary); bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE); unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask); + pg_t prev_pgid(prev_seed, pgid.pool()); if (any_change && pg_interval_t::is_new_interval( t->acting_primary, acting_primary, @@ -2748,7 +2747,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) pg_num, t->sort_bitwise, sort_bitwise, - pg_t(prev_seed, pgid.pool(), pgid.preferred()))) { + prev_pgid)) { force_resend = true; } @@ -2759,11 +2758,12 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) t->paused = false; need_resend = true; } - if (t->pgid != pgid || is_pg_changed( t->acting_primary, t->acting, acting_primary, acting, t->used_replica || any_change) || + (con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) && + prev_pgid.is_split(t->pg_num, pg_num, nullptr)) || force_resend) { t->pgid = pgid; t->acting = acting; @@ -2774,9 +2774,14 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) t->min_size = min_size; t->pg_num = pg_num; t->pg_num_mask = pi->get_pg_num_mask(); + osdmap->get_primary_shard( + pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()), + &t->actual_pgid); t->sort_bitwise = sort_bitwise; ldout(cct, 10) << __func__ << " " - << " pgid " << pgid << " acting " << acting << dendl; + << " raw pgid " << pgid << " -> actual " << t->actual_pgid + << " acting " << acting + << " primary " << acting_primary << dendl; t->used_replica = false; if (acting_primary == -1) { t->osd = -1; @@ -2830,7 +2835,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change) int Objecter::_map_session(op_target_t *target, OSDSession **s, shunique_lock& sul) { - _calc_target(target); + _calc_target(target, nullptr); return _get_session(target->osd, s, sul); } @@ -2939,8 +2944,7 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op, { // rwlock is locked unique - int r = _calc_target(&linger_op->target, - true); + int r = _calc_target(&linger_op->target, nullptr, true); if (r == RECALC_OP_TARGET_NEED_RESEND) { ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id << " pgid " << linger_op->target.pgid @@ -3033,9 +3037,9 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op) op->target.paused = false; op->stamp = ceph::mono_clock::now(); + hobject_t hobj = op->target.get_hobj(); MOSDOp *m = new MOSDOp(client_inc.read(), op->tid, - op->target.target_oid, op->target.target_oloc, - op->target.pgid, + hobj, op->target.actual_pgid, osdmap->get_epoch(), flags, op->features); @@ -3047,9 +3051,6 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op) m->set_mtime(op->mtime); m->set_retry_attempt(op->attempts++); - if (op->replay_version != eversion_t()) - m->set_version(op->replay_version); // we're replaying this op! - if (op->priority) m->set_priority(op->priority); else @@ -3072,21 +3073,25 @@ void Objecter::_send_op(Op *op, MOSDOp *m) // backoff? hobject_t hoid = op->target.get_hobj(); - auto q = op->session->backoffs.lower_bound(hoid); - if (q != op->session->backoffs.begin()) { - --q; - if (hoid >= q->second.end) { - ++q; + auto p = op->session->backoffs.find(op->target.actual_pgid); + if (p != op->session->backoffs.end()) { + auto q = p->second.lower_bound(hoid); + if (q != p->second.begin()) { + --q; + if (hoid >= q->second.end) { + ++q; + } } - } - if (q != op->session->backoffs.end()) { - ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin - << "," << q->second.end << ")" << dendl; - int r = cmp(hoid, q->second.begin); - if (r == 0 || (r > 0 && hoid < q->second.end)) { - ldout(cct, 10) << __func__ << " backoff on " << hoid << ", queuing " - << op << " tid " << op->tid << dendl; - return; + if (q != p->second.end()) { + ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin + << "," << q->second.end << ")" << dendl; + int r = cmp(hoid, q->second.begin); + if (r == 0 || (r > 0 && hoid < q->second.end)) { + ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid + << " id " << q->second.id << " on " << hoid + << ", queuing " << op << " tid " << op->tid << dendl; + return; + } } } @@ -3095,7 +3100,16 @@ void Objecter::_send_op(Op *op, MOSDOp *m) m = _prepare_osd_op(op); } - ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd + if (op->target.actual_pgid != m->get_spg()) { + ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from " + << m->get_spg() << " to " << op->target.actual_pgid + << ", updating and reencoding" << dendl; + m->set_spg(op->target.actual_pgid); + m->clear_payload(); // reencode + } + + ldout(cct, 15) << "_send_op " << op->tid << " to " + << op->target.actual_pgid << " on osd." << op->session->osd << dendl; ConnectionRef con = op->session->con; @@ -3223,8 +3237,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) ldout(cct, 7) << "handle_osd_op_reply " << tid << (m->is_ondisk() ? " ondisk" : (m->is_onnvram() ? " onnvram" : " ack")) - << " v " << m->get_replay_version() << " uv " - << m->get_user_version() + << " uv " << m->get_user_version() << " in " << m->get_pg() << " attempt " << m->get_retry_attempt() << dendl; @@ -3414,16 +3427,19 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m) case CEPH_OSD_BACKOFF_OP_BLOCK: { // register - OSDBackoff& b = s->backoffs[m->begin]; + OSDBackoff& b = s->backoffs[m->pgid][m->begin]; s->backoffs_by_id.insert(make_pair(m->id, &b)); + b.pgid = m->pgid; b.id = m->id; b.begin = m->begin; b.end = m->end; - // ack - Message *r = new MOSDBackoff(CEPH_OSD_BACKOFF_OP_ACK_BLOCK, - m->id, m->begin, m->end, - osdmap->get_epoch()); + // ack with original backoff's epoch so that the osd can discard this if + // there was a pg split. + Message *r = new MOSDBackoff(m->pgid, + m->map_epoch, + CEPH_OSD_BACKOFF_OP_ACK_BLOCK, + m->id, m->begin, m->end); // this priority must match the MOSDOps from _prepare_osd_op r->set_priority(cct->_conf->osd_client_op_priority); con->send_message(r); @@ -3433,31 +3449,43 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m) case CEPH_OSD_BACKOFF_OP_UNBLOCK: { auto p = s->backoffs_by_id.find(m->id); - while (p != s->backoffs_by_id.end() && - p->second->id == m->id) { + if (p != s->backoffs_by_id.end()) { OSDBackoff *b = p->second; if (b->begin != m->begin && b->end != m->end) { - lderr(cct) << __func__ << " got id " << m->id << " unblock on [" + lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id + << " unblock on [" << m->begin << "," << m->end << ") but backoff is [" << b->begin << "," << b->end << ")" << dendl; // hrmpf, unblock it anyway. } - ldout(cct, 10) << __func__ << " unblock backoff " << b->id + ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid + << " id " << b->id << " [" << b->begin << "," << b->end << ")" << dendl; - s->backoffs.erase(b->begin); - p = s->backoffs_by_id.erase(p); + auto spgp = s->backoffs.find(b->pgid); + assert(spgp != s->backoffs.end()); + spgp->second.erase(b->begin); + if (spgp->second.empty()) { + s->backoffs.erase(spgp); + } + s->backoffs_by_id.erase(p); // check for any ops to resend for (auto& q : s->ops) { - int r = q.second->target.contained_by(m->begin, m->end); - ldout(cct, 20) << __func__ << " contained_by " << r << " on " - << q.second->target.get_hobj() << dendl; - if (r) { - _send_op(q.second); + if (q.second->target.actual_pgid == m->pgid) { + int r = q.second->target.contained_by(m->begin, m->end); + ldout(cct, 20) << __func__ << " contained_by " << r << " on " + << q.second->target.get_hobj() << dendl; + if (r) { + _send_op(q.second); + } } } + } else { + lderr(cct) << __func__ << " " << m->pgid << " id " << m->id + << " unblock on [" + << m->begin << "," << m->end << ") but backoff dne" << dendl; } } break; @@ -4629,6 +4657,9 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul) c->map_check_error = 0; + // ignore overlays, just like we do with pg ops + c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY; + if (c->target_osd >= 0) { if (!osdmap->exists(c->target_osd)) { c->map_check_error = -ENOENT; @@ -4642,7 +4673,7 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul) } c->target.osd = c->target_osd; } else { - int ret = _calc_target(&(c->target), true); + int ret = _calc_target(&(c->target), nullptr, true); if (ret == RECALC_OP_TARGET_POOL_DNE) { c->map_check_error = -ENOENT; c->map_check_error_str = "pool dne"; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 8a0a45c3cd7..fc8dce0304c 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1193,7 +1193,8 @@ public: ///< explcit pg target, if any pg_t base_pgid; - pg_t pgid; ///< last pg we mapped to + pg_t pgid; ///< last (raw) pg we mapped to + spg_t actual_pgid; ///< last (actual) spg_t we mapped to unsigned pg_num = 0; ///< last pg_num we mapped to unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to vector<int> up; ///< set of up osds for last pg we mapped to @@ -1268,7 +1269,6 @@ public: uint64_t ontimeout; ceph_tid_t tid; - eversion_t replay_version; // for op replay int attempts; version_t *objver; @@ -1708,6 +1708,7 @@ public: // -- osd sessions -- struct OSDBackoff { + spg_t pgid; uint64_t id; hobject_t begin, end; }; @@ -1725,8 +1726,8 @@ public: map<ceph_tid_t,CommandOp*> command_ops; // backoffs - map<hobject_t,OSDBackoff> backoffs; - multimap<uint64_t,OSDBackoff*> backoffs_by_id; + map<spg_t,map<hobject_t,OSDBackoff>> backoffs; + map<uint64_t,OSDBackoff*> backoffs_by_id; int osd; int incarnation; @@ -1810,7 +1811,7 @@ public: bool _osdmap_has_pool_full() const; bool target_should_be_paused(op_target_t *op); - int _calc_target(op_target_t *t, + int _calc_target(op_target_t *t, Connection *con, bool any_change = false); int _map_session(op_target_t *op, OSDSession **s, shunique_lock& lc); @@ -2192,7 +2193,9 @@ public: Context *onack, epoch_t *reply_epoch, int *ctx_budget) { Op *o = new Op(object_t(), oloc, - op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, + op.ops, + flags | global_op_flags.read() | CEPH_OSD_FLAG_READ | + CEPH_OSD_FLAG_IGNORE_OVERLAY, onack, NULL); o->target.precalc_pgid = true; o->target.base_pgid = pg_t(hash, oloc.pool); diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index c0d43b9d46a..3616f88233e 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -558,8 +558,6 @@ MESSAGE(MOSDPGCreate) MESSAGE(MOSDPGInfo) #include "messages/MOSDPGLog.h" MESSAGE(MOSDPGLog) -#include "messages/MOSDPGMissing.h" -MESSAGE(MOSDPGMissing) #include "messages/MOSDPGNotify.h" MESSAGE(MOSDPGNotify) #include "messages/MOSDPGQuery.h" diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc index f71682ce340..d47238c01a0 100644 --- a/src/test/msgr/perf_msgr_client.cc +++ b/src/test/msgr/perf_msgr_client.cc @@ -95,7 +95,10 @@ class MessengerClient { if (inflight > uint64_t(concurrent)) { cond.Wait(lock); } - MOSDOp *m = new MOSDOp(client_inc.read(), 0, oid, oloc, pgid, 0, 0, 0); + hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(), + oloc.nspace); + spg_t spgid(pgid); + MOSDOp *m = new MOSDOp(client_inc.read(), 0, hobj, spgid, 0, 0, 0); m->write(0, msg_len, data); inflight++; conn->send_message(m); |