summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Sage Weil <sage@newdream.net>, 2017-02-15 23:20:18 +0100
committer: GitHub <noreply@github.com>, 2017-02-15 23:20:18 +0100
commit: eb491a13dc2faee315bf894fc2043aacfb94d624 (patch)
tree: 35a331693017ca804002be0c64a9a4d88faf5c90
parent: Merge pull request #13445 from liewegas/wip-rgw-thrash (diff)
parent: osd: fix backoff vs reset race (diff)
download: ceph-eb491a13dc2faee315bf894fc2043aacfb94d624.tar.xz
download: ceph-eb491a13dc2faee315bf894fc2043aacfb94d624.zip
Merge pull request #13235 from liewegas/wip-pg-split-interval
osd: have clients resend ops on pg split

Reviewed-by: Greg Farnum <gfarnum@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
Reviewed-by: Samuel Just <sjust@redhat.com>
-rw-r--r--qa/suites/rados/thrash/backoff/aggressive_peering.yaml5
-rw-r--r--qa/suites/rados/thrash/backoff/peering.yaml5
-rw-r--r--qa/suites/rados/thrash/backoff/peering_and_degraded.yaml6
-rw-r--r--qa/suites/rados/thrash/backoff/peering_and_recovery.yaml6
-rw-r--r--src/common/config_opts.h7
-rw-r--r--src/messages/MOSDBackoff.h41
-rw-r--r--src/messages/MOSDECSubOpRead.h15
-rw-r--r--src/messages/MOSDECSubOpReadReply.h15
-rw-r--r--src/messages/MOSDECSubOpWrite.h15
-rw-r--r--src/messages/MOSDECSubOpWriteReply.h15
-rw-r--r--src/messages/MOSDFastDispatchOp.h19
-rwxr-xr-xsrc/messages/MOSDOp.h189
-rw-r--r--src/messages/MOSDOpReply.h2
-rw-r--r--src/messages/MOSDPGBackfill.h20
-rw-r--r--src/messages/MOSDPGMissing.h57
-rw-r--r--src/messages/MOSDPGPull.h18
-rw-r--r--src/messages/MOSDPGPush.h18
-rw-r--r--src/messages/MOSDPGPushReply.h18
-rw-r--r--src/messages/MOSDPGScan.h17
-rw-r--r--src/messages/MOSDPGUpdateLogMissing.h18
-rw-r--r--src/messages/MOSDPGUpdateLogMissingReply.h16
-rw-r--r--src/messages/MOSDRepOp.h16
-rw-r--r--src/messages/MOSDRepOpReply.h16
-rw-r--r--src/messages/MOSDRepScrub.h15
-rw-r--r--src/messages/MOSDSubOp.h16
-rw-r--r--src/messages/MOSDSubOpReply.h34
-rw-r--r--src/mon/OSDMonitor.cc17
-rw-r--r--src/msg/Message.cc4
-rw-r--r--src/msg/Message.h2
-rw-r--r--src/os/filestore/JournalingObjectStore.h1
-rw-r--r--src/osd/OSD.cc76
-rw-r--r--src/osd/OSD.h3
-rw-r--r--src/osd/OSDMap.cc46
-rw-r--r--src/osd/OSDMap.h31
-rw-r--r--src/osd/OpRequest.h28
-rw-r--r--src/osd/PG.cc152
-rw-r--r--src/osd/PG.h3
-rw-r--r--src/osd/PGTransaction.h1
-rw-r--r--src/osd/PrimaryLogPG.cc61
-rw-r--r--src/osd/Session.cc109
-rw-r--r--src/osd/Session.h80
-rw-r--r--src/osd/osd_internal_types.h513
-rw-r--r--src/osd/osd_types.cc30
-rw-r--r--src/osd/osd_types.h542
-rw-r--r--src/osdc/Objecter.cc189
-rw-r--r--src/osdc/Objecter.h15
-rw-r--r--src/test/encoding/types.h2
-rw-r--r--src/test/msgr/perf_msgr_client.cc5
48 files changed, 1340 insertions, 1189 deletions
diff --git a/qa/suites/rados/thrash/backoff/aggressive_peering.yaml b/qa/suites/rados/thrash/backoff/aggressive_peering.yaml
deleted file mode 100644
index 4743d26eb63..00000000000
--- a/qa/suites/rados/thrash/backoff/aggressive_peering.yaml
+++ /dev/null
@@ -1,5 +0,0 @@
-overrides:
-  ceph:
-    conf:
-      osd:
-        osd peering aggressive backoff: true
diff --git a/qa/suites/rados/thrash/backoff/peering.yaml b/qa/suites/rados/thrash/backoff/peering.yaml
new file mode 100644
index 00000000000..66d06117ea2
--- /dev/null
+++ b/qa/suites/rados/thrash/backoff/peering.yaml
@@ -0,0 +1,5 @@
+overrides:
+  ceph:
+    conf:
+      osd:
+        osd backoff on peering: true
diff --git a/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml b/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml
new file mode 100644
index 00000000000..e6109906503
--- /dev/null
+++ b/qa/suites/rados/thrash/backoff/peering_and_degraded.yaml
@@ -0,0 +1,6 @@
+overrides:
+  ceph:
+    conf:
+      osd:
+        osd backoff on peering: true
+        osd backoff on degraded: true
diff --git a/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml b/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml
deleted file mode 100644
index 6521afef6b6..00000000000
--- a/qa/suites/rados/thrash/backoff/peering_and_recovery.yaml
+++ /dev/null
@@ -1,6 +0,0 @@
-overrides:
-  ceph:
-    conf:
-      osd:
-        osd peering aggressive backoff: true
-        osd recovery aggressive backoff: true
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 85bb3c2702c..72fe1e6fa7d 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -843,12 +843,13 @@ OPTION(osd_command_max_records, OPT_INT, 256)
OPTION(osd_max_pg_blocked_by, OPT_U32, 16) // max peer osds to report that are blocking our progress
OPTION(osd_op_log_threshold, OPT_INT, 5) // how many op log messages to show in one go
OPTION(osd_verify_sparse_read_holes, OPT_BOOL, false) // read fiemap-reported holes and verify they are zeros
-OPTION(osd_peering_aggressive_backoff, OPT_BOOL, false) // issue aggressive client backoff during peering
-OPTION(osd_recovery_aggressive_backoff, OPT_BOOL, false) // issue aggressive client backoff during per-object recovery
+OPTION(osd_backoff_on_unfound, OPT_BOOL, true) // object unfound
+OPTION(osd_backoff_on_degraded, OPT_BOOL, false) // [mainly for debug?] object unreadable/writeable
+OPTION(osd_backoff_on_down, OPT_BOOL, true) // pg in down/incomplete state
+OPTION(osd_backoff_on_peering, OPT_BOOL, false) // [debug] pg peering
OPTION(osd_debug_crash_on_ignored_backoff, OPT_BOOL, false) // crash osd if client ignores a backoff; useful for debugging
OPTION(osd_debug_drop_ping_probability, OPT_DOUBLE, 0)
OPTION(osd_debug_drop_ping_duration, OPT_INT, 0)
-OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op
OPTION(osd_debug_op_order, OPT_BOOL, false)
OPTION(osd_debug_verify_missing_on_start, OPT_BOOL, false)
OPTION(osd_debug_scrub_chance_rewrite_digest, OPT_U64, 0)
diff --git a/src/messages/MOSDBackoff.h b/src/messages/MOSDBackoff.h
index e501269b7ef..181aaf181b9 100644
--- a/src/messages/MOSDBackoff.h
+++ b/src/messages/MOSDBackoff.h
@@ -16,50 +16,65 @@
#ifndef CEPH_MOSDBACKOFF_H
#define CEPH_MOSDBACKOFF_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
#include "osd/osd_types.h"
-class MOSDBackoff : public Message {
+class MOSDBackoff : public MOSDFastDispatchOp {
public:
+ static const int HEAD_VERSION = 1; ///< static: read in the ctor's base-class initializer, before any non-static member is initialized
+ static const int COMPAT_VERSION = 1; ///< static for the same reason; forwarded as compat_version
+
+ spg_t pgid; ///< returned by get_spg(); first field of the payload
+ epoch_t map_epoch = 0; ///< returned by get_map_epoch(); second field of the payload
uint8_t op = 0; ///< CEPH_OSD_BACKOFF_OP_*
uint64_t id = 0; ///< unique id within this session
hobject_t begin, end; ///< [) range to block, unless ==, block single obj
- epoch_t osd_epoch = 0;
- MOSDBackoff() : Message(CEPH_MSG_OSD_BACKOFF) {}
- MOSDBackoff(uint8_t op_, uint64_t id_,
- hobject_t begin_, hobject_t end_, epoch_t ep)
- : Message(CEPH_MSG_OSD_BACKOFF),
+ spg_t get_spg() const override {
+ return pgid;
+ }
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+
+ MOSDBackoff()
+ : MOSDFastDispatchOp(CEPH_MSG_OSD_BACKOFF, HEAD_VERSION, COMPAT_VERSION) {}
+ MOSDBackoff(spg_t pgid_, epoch_t ep, uint8_t op_, uint64_t id_,
+ hobject_t begin_, hobject_t end_)
+ : MOSDFastDispatchOp(CEPH_MSG_OSD_BACKOFF, HEAD_VERSION, COMPAT_VERSION),
+ pgid(pgid_),
+ map_epoch(ep),
op(op_),
id(id_),
begin(begin_),
- end(end_),
- osd_epoch(ep) { }
+ end(end_) { }
void encode_payload(uint64_t features) override {
+ ::encode(pgid, payload); // pgid and map_epoch lead the payload; decode_payload() reads them in the same order
+ ::encode(map_epoch, payload);
::encode(op, payload);
::encode(id, payload);
::encode(begin, payload);
::encode(end, payload);
- ::encode(osd_epoch, payload);
}
void decode_payload() override {
auto p = payload.begin();
+ ::decode(pgid, p); // must mirror the field order written by encode_payload()
+ ::decode(map_epoch, p);
::decode(op, p);
::decode(id, p);
::decode(begin, p);
::decode(end, p);
- ::decode(osd_epoch, p);
}
const char *get_type_name() const override { return "osd_backoff"; }
void print(ostream& out) const override {
- out << "osd_backoff(" << ceph_osd_backoff_op_name(op)
+ out << "osd_backoff(" << pgid << " " << ceph_osd_backoff_op_name(op)
<< " id " << id
<< " [" << begin << "," << end << ")"
- << " epoch " << osd_epoch << ")";
+ << " e" << map_epoch << ")";
}
};
diff --git a/src/messages/MOSDECSubOpRead.h b/src/messages/MOSDECSubOpRead.h
index 3e315ef26a8..5199d32a27e 100644
--- a/src/messages/MOSDECSubOpRead.h
+++ b/src/messages/MOSDECSubOpRead.h
@@ -15,11 +15,10 @@
#ifndef MOSDECSUBOPREAD_H
#define MOSDECSUBOPREAD_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "osd/ECMsgTypes.h"
-class MOSDECSubOpRead : public Message {
+class MOSDECSubOpRead : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
@@ -31,9 +30,15 @@ public:
int get_cost() const {
return 0;
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
- MOSDECSubOpRead() :
- Message(MSG_OSD_EC_READ, HEAD_VERSION, COMPAT_VERSION)
+ MOSDECSubOpRead()
+ : MOSDFastDispatchOp(MSG_OSD_EC_READ, HEAD_VERSION, COMPAT_VERSION)
{}
virtual void decode_payload() {
diff --git a/src/messages/MOSDECSubOpReadReply.h b/src/messages/MOSDECSubOpReadReply.h
index 28e2cf7e929..7d6512c8dd5 100644
--- a/src/messages/MOSDECSubOpReadReply.h
+++ b/src/messages/MOSDECSubOpReadReply.h
@@ -15,11 +15,10 @@
#ifndef MOSDECSUBOPREADREPLY_H
#define MOSDECSUBOPREADREPLY_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "osd/ECMsgTypes.h"
-class MOSDECSubOpReadReply : public Message {
+class MOSDECSubOpReadReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -31,9 +30,15 @@ public:
int get_cost() const {
return 0;
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
- MOSDECSubOpReadReply() :
- Message(MSG_OSD_EC_READ_REPLY, HEAD_VERSION, COMPAT_VERSION)
+ MOSDECSubOpReadReply()
+ : MOSDFastDispatchOp(MSG_OSD_EC_READ_REPLY, HEAD_VERSION, COMPAT_VERSION)
{}
virtual void decode_payload() {
diff --git a/src/messages/MOSDECSubOpWrite.h b/src/messages/MOSDECSubOpWrite.h
index b3a8e3cdb1f..24b9a4e4120 100644
--- a/src/messages/MOSDECSubOpWrite.h
+++ b/src/messages/MOSDECSubOpWrite.h
@@ -15,11 +15,10 @@
#ifndef MOSDECSUBOPWRITE_H
#define MOSDECSUBOPWRITE_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "osd/ECMsgTypes.h"
-class MOSDECSubOpWrite : public Message {
+class MOSDECSubOpWrite : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -31,12 +30,18 @@ public:
int get_cost() const {
return 0;
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
MOSDECSubOpWrite()
- : Message(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION)
+ : MOSDFastDispatchOp(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION)
{}
MOSDECSubOpWrite(ECSubWrite &in_op)
- : Message(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) {
+ : MOSDFastDispatchOp(MSG_OSD_EC_WRITE, HEAD_VERSION, COMPAT_VERSION) {
op.claim(in_op);
}
diff --git a/src/messages/MOSDECSubOpWriteReply.h b/src/messages/MOSDECSubOpWriteReply.h
index c2edfb38c3e..4a811cdf7a5 100644
--- a/src/messages/MOSDECSubOpWriteReply.h
+++ b/src/messages/MOSDECSubOpWriteReply.h
@@ -15,11 +15,10 @@
#ifndef MOSDECSUBOPWRITEREPLY_H
#define MOSDECSUBOPWRITEREPLY_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "osd/ECMsgTypes.h"
-class MOSDECSubOpWriteReply : public Message {
+class MOSDECSubOpWriteReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -31,9 +30,15 @@ public:
int get_cost() const {
return 0;
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
- MOSDECSubOpWriteReply() :
- Message(MSG_OSD_EC_WRITE_REPLY, HEAD_VERSION, COMPAT_VERSION)
+ MOSDECSubOpWriteReply()
+ : MOSDFastDispatchOp(MSG_OSD_EC_WRITE_REPLY, HEAD_VERSION, COMPAT_VERSION)
{}
virtual void decode_payload() {
diff --git a/src/messages/MOSDFastDispatchOp.h b/src/messages/MOSDFastDispatchOp.h
new file mode 100644
index 00000000000..6babd16f796
--- /dev/null
+++ b/src/messages/MOSDFastDispatchOp.h
@@ -0,0 +1,19 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_MOSDFASTDISPATCHOP_H
+#define CEPH_MOSDFASTDISPATCHOP_H
+
+#include "msg/Message.h"
+#include "osd/osd_types.h"
+
+class MOSDFastDispatchOp : public Message { // abstract Message subclass: derived messages must report an epoch and an spg_t
+public:
+ virtual epoch_t get_map_epoch() const = 0; ///< epoch carried by the derived message (each subclass chooses the field)
+ virtual spg_t get_spg() const = 0; ///< spg_t carried by the derived message
+
+ MOSDFastDispatchOp(int t, int version, int compat_version)
+ : Message(t, version, compat_version) {} // all three arguments are forwarded unchanged to Message
+};
+
+#endif
diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h
index 0d2a03f271a..5c2db7b011b 100755
--- a/src/messages/MOSDOp.h
+++ b/src/messages/MOSDOp.h
@@ -16,9 +16,9 @@
#ifndef CEPH_MOSDOP_H
#define CEPH_MOSDOP_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "include/ceph_features.h"
+#include "common/hobject.h"
#include <atomic>
/*
@@ -31,9 +31,9 @@
class OSD;
-class MOSDOp : public Message {
+class MOSDOp : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 7;
+ static const int HEAD_VERSION = 8;
static const int COMPAT_VERSION = 3;
private:
@@ -41,12 +41,10 @@ private:
__u32 osdmap_epoch;
__u32 flags;
utime_t mtime;
- eversion_t reassert_version;
int32_t retry_attempt; // 0 is first attempt. -1 if we don't know.
- object_t oid;
- object_locator_t oloc;
- pg_t pgid;
+ hobject_t hobj;
+ spg_t pgid;
bufferlist::iterator p;
// Decoding flags. Decoding is only needed for messages catched by pipe reader.
// Transition from true -> false without locks being held
@@ -57,8 +55,6 @@ private:
public:
vector<OSDOp> ops;
private:
-
- snapid_t snapid;
snapid_t snap_seq;
vector<snapid_t> snaps;
@@ -70,7 +66,9 @@ public:
friend class MOSDOpReply;
ceph_tid_t get_client_tid() { return header.tid; }
- void set_snapid(const snapid_t& s) { snapid = s; }
+ void set_snapid(const snapid_t& s) {
+ hobj.snap = s;
+ }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
@@ -78,13 +76,24 @@ public:
void set_reqid(const osd_reqid_t rid) {
reqid = rid;
}
+ void set_spg(spg_t p) {
+ pgid = p;
+ }
// Fields decoded in partial decoding
- const pg_t& get_pg() const {
+ pg_t get_pg() const {
+ assert(!partial_decode_needed);
+ return pgid.pgid;
+ }
+ spg_t get_spg() const override {
assert(!partial_decode_needed);
return pgid;
}
- epoch_t get_map_epoch() const {
+ pg_t get_raw_pg() const {
+ assert(!partial_decode_needed);
+ return pg_t(hobj.get_hash(), pgid.pgid.pool());
+ }
+ epoch_t get_map_epoch() const override {
assert(!partial_decode_needed);
return osdmap_epoch;
}
@@ -92,10 +101,6 @@ public:
assert(!partial_decode_needed);
return flags;
}
- const eversion_t& get_version() const {
- assert(!partial_decode_needed);
- return reassert_version;
- }
osd_reqid_t get_reqid() const {
assert(!partial_decode_needed);
if (reqid.name != entity_name_t() || reqid.tid != 0) {
@@ -118,17 +123,23 @@ public:
assert(!final_decode_needed);
return mtime;
}
- const object_locator_t& get_object_locator() const {
+ object_locator_t get_object_locator() const {
assert(!final_decode_needed);
- return oloc;
+ if (hobj.oid.name.empty())
+ return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
+ else
+ return object_locator_t(hobj);
}
object_t& get_oid() {
assert(!final_decode_needed);
- return oid;
+ return hobj.oid;
+ }
+ const hobject_t &get_hobj() const {
+ return hobj;
}
- const snapid_t& get_snapid() {
+ snapid_t get_snapid() {
assert(!final_decode_needed);
- return snapid;
+ return hobj.snap;
}
const snapid_t& get_snap_seq() const {
assert(!final_decode_needed);
@@ -156,17 +167,17 @@ public:
}
MOSDOp()
- : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
partial_decode_needed(true),
final_decode_needed(true) { }
- MOSDOp(int inc, long tid,
- object_t& _oid, object_locator_t& _oloc, pg_t& _pgid,
+ MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
epoch_t _osdmap_epoch,
int _flags, uint64_t feat)
- : Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),
osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
- oid(_oid), oloc(_oloc), pgid(_pgid),
+ hobj(ho),
+ pgid(_pgid),
partial_decode_needed(false),
final_decode_needed(false),
features(feat) {
@@ -180,7 +191,6 @@ private:
~MOSDOp() {}
public:
- void set_version(eversion_t v) { reassert_version = v; }
void set_mtime(utime_t mt) { mtime = mt; }
void set_mtime(ceph::real_time mt) {
mtime = ceph::real_clock::to_timespec(mt);
@@ -272,17 +282,17 @@ struct ceph_osd_request_head {
::encode(client_inc, payload);
__u32 su = 0;
- ::encode(pgid, payload);
+ ::encode(get_raw_pg(), payload);
::encode(su, payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
::encode(mtime, payload);
- ::encode(reassert_version, payload);
+ ::encode(eversion_t(), payload); // reassert_version
- __u32 oid_len = oid.name.length();
+ __u32 oid_len = hobj.oid.name.length();
::encode(oid_len, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
__u32 num_snaps = snaps.size();
::encode(num_snaps, payload);
@@ -293,7 +303,7 @@ struct ceph_osd_request_head {
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode_nohead(oid.name, payload);
+ ::encode_nohead(hobj.oid.name, payload);
::encode_nohead(snaps, payload);
} else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
header.version = 6;
@@ -301,18 +311,18 @@ struct ceph_osd_request_head {
::encode(osdmap_epoch, payload);
::encode(flags, payload);
::encode(mtime, payload);
- ::encode(reassert_version, payload);
- ::encode(oloc, payload);
- ::encode(pgid, payload);
+ ::encode(eversion_t(), payload); // reassert_version
+ ::encode(get_object_locator(), payload);
+ ::encode(get_raw_pg(), payload);
- ::encode(oid, payload);
+ ::encode(hobj.oid, payload);
__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
::encode(snaps, payload);
@@ -325,25 +335,50 @@ struct ceph_osd_request_head {
// encoding or else we'll confuse older peers.
::encode(osd_reqid_t(), payload);
}
+ } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
+ // reordered, v7 message encoding
+ header.version = 7;
+ ::encode(get_raw_pg(), payload);
+ ::encode(osdmap_epoch, payload);
+ ::encode(flags, payload);
+ ::encode(eversion_t(), payload); // reassert_version
+ ::encode(reqid, payload);
+ ::encode(client_inc, payload);
+ ::encode(mtime, payload);
+ ::encode(get_object_locator(), payload);
+ ::encode(hobj.oid, payload);
+
+ __u16 num_ops = ops.size();
+ ::encode(num_ops, payload);
+ for (unsigned i = 0; i < ops.size(); i++)
+ ::encode(ops[i].op, payload);
+
+ ::encode(hobj.snap, payload);
+ ::encode(snap_seq, payload);
+ ::encode(snaps, payload);
+
+ ::encode(retry_attempt, payload);
+ ::encode(features, payload);
} else {
- // new, reordered, v7 message encoding
+ // latest v8 encoding with hobject_t hash separate from pgid, no
+ // reassert version
header.version = HEAD_VERSION;
::encode(pgid, payload);
+ ::encode(hobj.get_hash(), payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
- ::encode(reassert_version, payload);
::encode(reqid, payload);
::encode(client_inc, payload);
::encode(mtime, payload);
- ::encode(oloc, payload);
- ::encode(oid, payload);
+ ::encode(get_object_locator(), payload);
+ ::encode(hobj.oid, payload);
__u16 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < ops.size(); i++)
::encode(ops[i].op, payload);
- ::encode(snapid, payload);
+ ::encode(hobj.snap, payload);
::encode(snap_seq, payload);
::encode(snaps, payload);
@@ -358,31 +393,41 @@ struct ceph_osd_request_head {
// Always keep here the newest version of decoding order/rule
if (header.version == HEAD_VERSION) {
- ::decode(pgid, p);
- ::decode(osdmap_epoch, p);
- ::decode(flags, p);
- ::decode(reassert_version, p);
- ::decode(reqid, p);
+ ::decode(pgid, p); // actual pgid
+ uint32_t hash;
+ ::decode(hash, p); // raw hash value
+ hobj.set_hash(hash);
+ ::decode(osdmap_epoch, p);
+ ::decode(flags, p);
+ ::decode(reqid, p);
+ } else if (header.version == 7) {
+ ::decode(pgid.pgid, p); // raw pgid
+ hobj.set_hash(pgid.pgid.ps());
+ ::decode(osdmap_epoch, p);
+ ::decode(flags, p);
+ eversion_t reassert_version;
+ ::decode(reassert_version, p);
+ ::decode(reqid, p);
} else if (header.version < 2) {
// old decode
::decode(client_inc, p);
old_pg_t opgid;
::decode_raw(opgid, p);
- pgid = opgid;
+ pgid.pgid = opgid;
__u32 su;
::decode(su, p);
- oloc.pool = pgid.pool();
::decode(osdmap_epoch, p);
::decode(flags, p);
::decode(mtime, p);
+ eversion_t reassert_version;
::decode(reassert_version, p);
__u32 oid_len;
::decode(oid_len, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
__u32 num_snaps;
::decode(num_snaps, p);
@@ -394,13 +439,15 @@ struct ceph_osd_request_head {
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- decode_nohead(oid_len, oid.name, p);
+ decode_nohead(oid_len, hobj.oid.name, p);
decode_nohead(num_snaps, snaps, p);
// recalculate pgid hash value
- pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
- oid.name.c_str(),
- oid.name.length()));
+ pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
+ hobj.oid.name.c_str(),
+ hobj.oid.name.length()));
+ hobj.pool = pgid.pgid.pool();
+ hobj.set_hash(pgid.pgid.ps());
retry_attempt = -1;
features = 0;
@@ -417,19 +464,21 @@ struct ceph_osd_request_head {
::decode(osdmap_epoch, p);
::decode(flags, p);
::decode(mtime, p);
+ eversion_t reassert_version;
::decode(reassert_version, p);
+ object_locator_t oloc;
::decode(oloc, p);
if (header.version < 3) {
old_pg_t opgid;
::decode_raw(opgid, p);
- pgid = opgid;
+ pgid.pgid = opgid;
} else {
::decode(pgid, p);
}
- ::decode(oid, p);
+ ::decode(hobj.oid, p);
//::decode(ops, p);
__u16 num_ops;
@@ -438,7 +487,7 @@ struct ceph_osd_request_head {
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
::decode(snaps, p);
@@ -457,6 +506,11 @@ struct ceph_osd_request_head {
else
reqid = osd_reqid_t();
+ hobj.pool = pgid.pgid.pool();
+ hobj.set_key(oloc.key);
+ hobj.nspace = oloc.nspace;
+ hobj.set_hash(pgid.pgid.ps());
+
OSDOp::split_osd_op_vector_in_data(ops, data);
// we did the full decode
@@ -478,8 +532,9 @@ struct ceph_osd_request_head {
::decode(client_inc, p);
::decode(mtime, p);
+ object_locator_t oloc;
::decode(oloc, p);
- ::decode(oid, p);
+ ::decode(hobj.oid, p);
__u16 num_ops;
::decode(num_ops, p);
@@ -487,7 +542,7 @@ struct ceph_osd_request_head {
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
- ::decode(snapid, p);
+ ::decode(hobj.snap, p);
::decode(snap_seq, p);
::decode(snaps, p);
@@ -495,6 +550,10 @@ struct ceph_osd_request_head {
::decode(features, p);
+ hobj.pool = pgid.pgid.pool();
+ hobj.set_key(oloc.key);
+ hobj.nspace = oloc.nspace;
+
OSDOp::split_osd_op_vector_in_data(ops, data);
final_decode_needed = false;
@@ -513,21 +572,15 @@ struct ceph_osd_request_head {
out << pgid;
if (!final_decode_needed) {
out << ' ';
- if (!oloc.nspace.empty())
- out << oloc.nspace << "/";
- out << oid
+ out << hobj
<< " " << ops
<< " snapc " << get_snap_seq() << "=" << snaps;
- if (oloc.key.size())
- out << " " << oloc;
if (is_retry_attempt())
out << " RETRY=" << get_retry_attempt();
} else {
- out << " (undecoded)";
+ out << " " << get_raw_pg() << " (undecoded)";
}
out << " " << ceph_osd_flag_string(get_flags());
- if (reassert_version != eversion_t())
- out << " reassert_version=" << reassert_version;
out << " e" << osdmap_epoch;
}
out << ")";
diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h
index 1ea3b92ad9b..3d51d1c2116 100644
--- a/src/messages/MOSDOpReply.h
+++ b/src/messages/MOSDOpReply.h
@@ -130,7 +130,7 @@ public:
}
MOSDOpReply(MOSDOp *req, int r, epoch_t e, int acktype, bool ignore_out_data)
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION),
- oid(req->oid), pgid(req->pgid), ops(req->ops) {
+ oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops) {
set_tid(req->get_tid());
result = r;
diff --git a/src/messages/MOSDPGBackfill.h b/src/messages/MOSDPGBackfill.h
index d4b65e82c29..3b099df5aad 100644
--- a/src/messages/MOSDPGBackfill.h
+++ b/src/messages/MOSDPGBackfill.h
@@ -15,10 +15,9 @@
#ifndef CEPH_MOSDPGBACKFILL_H
#define CEPH_MOSDPGBACKFILL_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGBackfill : public Message {
+class MOSDPGBackfill : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 3;
static const int COMPAT_VERSION = 1;
public:
@@ -43,6 +42,13 @@ public:
bool compat_stat_sum;
pg_stat_t stats;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(op, p);
@@ -85,11 +91,11 @@ public:
::encode(pgid.shard, payload);
}
- MOSDPGBackfill() :
- Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
- compat_stat_sum(false) {}
+ MOSDPGBackfill()
+ : MOSDFastDispatchOp(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
+ compat_stat_sum(false) {}
MOSDPGBackfill(__u32 o, epoch_t e, epoch_t qe, spg_t p)
- : Message(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_PG_BACKFILL, HEAD_VERSION, COMPAT_VERSION),
op(o),
map_epoch(e), query_epoch(e),
pgid(p),
diff --git a/src/messages/MOSDPGMissing.h b/src/messages/MOSDPGMissing.h
deleted file mode 100644
index b931b36353f..00000000000
--- a/src/messages/MOSDPGMissing.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2010 Dreamhost
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-
-#ifndef CEPH_MOSDPGMISSING_H
-#define CEPH_MOSDPGMISSING_H
-
-#include "msg/Message.h"
-
-class MOSDPGMissing : public Message {
- epoch_t epoch;
-
-public:
- pg_info_t info;
- pg_missing_t missing;
-
- epoch_t get_epoch() { return epoch; }
-
- MOSDPGMissing() : Message(MSG_OSD_PG_MISSING) {}
- MOSDPGMissing(version_t mv, const pg_info_t &info_,
- const pg_missing_t &missing_)
- : Message(MSG_OSD_PG_MISSING), epoch(mv), info(info_),
- missing(missing_) { }
-private:
- ~MOSDPGMissing() {}
-
-public:
- const char *get_type_name() const { return "pg_missing"; }
- void print(ostream& out) const {
- out << "pg_missing(" << info.pgid << " e" << epoch << ")";
- }
-
- void encode_payload(uint64_t features) {
- ::encode(epoch, payload);
- ::encode(info, payload);
- ::encode(missing, payload);
- }
- void decode_payload() {
- bufferlist::iterator p = payload.begin();
- ::decode(epoch, p);
- ::decode(info, p);
- missing.decode(p, info.pgid.pool());
- }
-};
-
-#endif
diff --git a/src/messages/MOSDPGPull.h b/src/messages/MOSDPGPull.h
index a6b748c8df6..c274e06ef97 100644
--- a/src/messages/MOSDPGPull.h
+++ b/src/messages/MOSDPGPull.h
@@ -15,10 +15,9 @@
#ifndef MOSDPGPULL_H
#define MOSDPGPULL_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGPull : public Message {
+class MOSDPGPull : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
@@ -30,9 +29,16 @@ public:
vector<PullOp> pulls;
uint64_t cost;
- MOSDPGPull() :
- Message(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION),
- cost(0)
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
+ MOSDPGPull()
+ : MOSDFastDispatchOp(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION),
+ cost(0)
{}
void compute_cost(CephContext *cct) {
diff --git a/src/messages/MOSDPGPush.h b/src/messages/MOSDPGPush.h
index 9bf0470ed6a..b463b8ba00b 100644
--- a/src/messages/MOSDPGPush.h
+++ b/src/messages/MOSDPGPush.h
@@ -15,10 +15,9 @@
#ifndef MOSDPGPUSH_H
#define MOSDPGPUSH_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGPush : public Message {
+class MOSDPGPush : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
@@ -46,13 +45,20 @@ public:
return cost;
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
void set_cost(uint64_t c) {
cost = c;
}
- MOSDPGPush() :
- Message(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION),
- cost(0)
+ MOSDPGPush()
+ : MOSDFastDispatchOp(MSG_OSD_PG_PUSH, HEAD_VERSION, COMPAT_VERSION),
+ cost(0)
{}
virtual void decode_payload() {
diff --git a/src/messages/MOSDPGPushReply.h b/src/messages/MOSDPGPushReply.h
index 793709e1650..52e3ddd8a58 100644
--- a/src/messages/MOSDPGPushReply.h
+++ b/src/messages/MOSDPGPushReply.h
@@ -15,10 +15,9 @@
#ifndef MOSDPGPUSHREPLY_H
#define MOSDPGPUSHREPLY_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGPushReply : public Message {
+class MOSDPGPushReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
@@ -29,9 +28,16 @@ public:
vector<PushReplyOp> replies;
uint64_t cost;
- MOSDPGPushReply() :
- Message(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION),
- cost(0)
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
+ MOSDPGPushReply()
+ : MOSDFastDispatchOp(MSG_OSD_PG_PUSH_REPLY, HEAD_VERSION, COMPAT_VERSION),
+ cost(0)
{}
void compute_cost(CephContext *cct) {
diff --git a/src/messages/MOSDPGScan.h b/src/messages/MOSDPGScan.h
index f5fe18e523b..9f66bd0e480 100644
--- a/src/messages/MOSDPGScan.h
+++ b/src/messages/MOSDPGScan.h
@@ -15,10 +15,9 @@
#ifndef CEPH_MOSDPGSCAN_H
#define CEPH_MOSDPGSCAN_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGScan : public Message {
+class MOSDPGScan : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
@@ -42,6 +41,13 @@ public:
spg_t pgid;
hobject_t begin, end;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(op, p);
@@ -79,10 +85,11 @@ public:
::encode(pgid.shard, payload);
}
- MOSDPGScan() : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION) {}
+ MOSDPGScan()
+ : MOSDFastDispatchOp(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION) {}
MOSDPGScan(__u32 o, pg_shard_t from,
epoch_t e, epoch_t qe, spg_t p, hobject_t be, hobject_t en)
- : Message(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_PG_SCAN, HEAD_VERSION, COMPAT_VERSION),
op(o),
map_epoch(e), query_epoch(e),
from(from),
diff --git a/src/messages/MOSDPGUpdateLogMissing.h b/src/messages/MOSDPGUpdateLogMissing.h
index 55e50d6a462..b6a597705d5 100644
--- a/src/messages/MOSDPGUpdateLogMissing.h
+++ b/src/messages/MOSDPGUpdateLogMissing.h
@@ -16,9 +16,9 @@
#ifndef CEPH_MOSDPGUPDATELOGMISSING_H
#define CEPH_MOSDPGUPDATELOGMISSING_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGUpdateLogMissing : public Message {
+class MOSDPGUpdateLogMissing : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -35,16 +35,24 @@ public:
spg_t get_pgid() const { return pgid; }
epoch_t get_query_epoch() const { return map_epoch; }
ceph_tid_t get_tid() const { return rep_tid; }
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
- MOSDPGUpdateLogMissing() :
- Message(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION) { }
+ MOSDPGUpdateLogMissing()
+ : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION,
+ COMPAT_VERSION) { }
MOSDPGUpdateLogMissing(
const mempool::osd::list<pg_log_entry_t> &entries,
spg_t pgid,
shard_id_t from,
epoch_t epoch,
ceph_tid_t rep_tid)
- : Message(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION,
+ COMPAT_VERSION),
map_epoch(epoch),
pgid(pgid),
from(from),
diff --git a/src/messages/MOSDPGUpdateLogMissingReply.h b/src/messages/MOSDPGUpdateLogMissingReply.h
index d6ff1407147..34e8d9e6c1d 100644
--- a/src/messages/MOSDPGUpdateLogMissingReply.h
+++ b/src/messages/MOSDPGUpdateLogMissingReply.h
@@ -16,9 +16,9 @@
#ifndef CEPH_MOSDPGUPDATELOGMISSINGREPLY_H
#define CEPH_MOSDPGUPDATELOGMISSINGREPLY_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
-class MOSDPGUpdateLogMissingReply : public Message {
+class MOSDPGUpdateLogMissingReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -37,9 +37,15 @@ public:
pg_shard_t get_from() const {
return pg_shard_t(get_source().num(), from);
}
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
- MOSDPGUpdateLogMissingReply() :
- Message(
+ MOSDPGUpdateLogMissingReply()
+ : MOSDFastDispatchOp(
MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY,
HEAD_VERSION,
COMPAT_VERSION)
@@ -49,7 +55,7 @@ public:
shard_id_t from,
epoch_t epoch,
ceph_tid_t rep_tid)
- : Message(
+ : MOSDFastDispatchOp(
MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY,
HEAD_VERSION,
COMPAT_VERSION),
diff --git a/src/messages/MOSDRepOp.h b/src/messages/MOSDRepOp.h
index e6acca66309..de5e4755697 100644
--- a/src/messages/MOSDRepOp.h
+++ b/src/messages/MOSDRepOp.h
@@ -16,14 +16,13 @@
#ifndef CEPH_MOSDREPOP_H
#define CEPH_MOSDREPOP_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
/*
* OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever)
*/
-class MOSDRepOp : public Message {
+class MOSDRepOp : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
@@ -64,6 +63,13 @@ public:
/// non-empty if this transaction involves a hit_set history update
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
int get_cost() const {
return data.length();
}
@@ -116,13 +122,13 @@ public:
}
MOSDRepOp()
- : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(0),
final_decode_needed(true), acks_wanted (0) {}
MOSDRepOp(osd_reqid_t r, pg_shard_t from,
spg_t p, const hobject_t& po, int aw,
epoch_t mape, ceph_tid_t rtid, eversion_t v)
- : Message(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_REPOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(mape),
reqid(r),
pgid(p),
diff --git a/src/messages/MOSDRepOpReply.h b/src/messages/MOSDRepOpReply.h
index 1632ffbf4b3..67c2d76bf73 100644
--- a/src/messages/MOSDRepOpReply.h
+++ b/src/messages/MOSDRepOpReply.h
@@ -16,7 +16,7 @@
#ifndef CEPH_MOSDREPOPREPLY_H
#define CEPH_MOSDREPOPREPLY_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
#include "os/ObjectStore.h"
@@ -28,7 +28,7 @@
*
*/
-class MOSDRepOpReply : public Message {
+class MOSDRepOpReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 1;
static const int COMPAT_VERSION = 1;
public:
@@ -50,6 +50,13 @@ public:
// Decoding flags. Decoding is only needed for messages catched by pipe reader.
bool final_decode_needed;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
virtual void decode_payload() {
p = payload.begin();
::decode(map_epoch, p);
@@ -93,7 +100,7 @@ public:
public:
MOSDRepOpReply(
MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
- Message(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
+ MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
map_epoch(e),
reqid(req->reqid),
from(from),
@@ -104,7 +111,8 @@ public:
set_tid(req->get_tid());
}
MOSDRepOpReply()
- : Message(MSG_OSD_REPOPREPLY), map_epoch(0),
+ : MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
+ map_epoch(0),
ack_type(0), result(0),
final_decode_needed(true) {}
private:
diff --git a/src/messages/MOSDRepScrub.h b/src/messages/MOSDRepScrub.h
index fb4080e7046..07e04543b26 100644
--- a/src/messages/MOSDRepScrub.h
+++ b/src/messages/MOSDRepScrub.h
@@ -16,13 +16,13 @@
#ifndef CEPH_MOSDREPSCRUB_H
#define CEPH_MOSDREPSCRUB_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
/*
* instruct an OSD initiate a replica scrub on a specific PG
*/
-struct MOSDRepScrub : public Message {
+struct MOSDRepScrub : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 6;
static const int COMPAT_VERSION = 2;
@@ -37,15 +37,22 @@ struct MOSDRepScrub : public Message {
bool deep; // true if scrub should be deep
uint32_t seed; // seed value for digest calculation
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
MOSDRepScrub()
- : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
chunky(false),
deep(false),
seed(0) { }
MOSDRepScrub(spg_t pgid, eversion_t scrub_to, epoch_t map_epoch,
hobject_t start, hobject_t end, bool deep, uint32_t seed)
- : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
pgid(pgid),
scrub_to(scrub_to),
map_epoch(map_epoch),
diff --git a/src/messages/MOSDSubOp.h b/src/messages/MOSDSubOp.h
index cfc3afc4af4..bfd8d4dbe31 100644
--- a/src/messages/MOSDSubOp.h
+++ b/src/messages/MOSDSubOp.h
@@ -16,8 +16,7 @@
#ifndef CEPH_MOSDSUBOP_H
#define CEPH_MOSDSUBOP_H
-#include "msg/Message.h"
-#include "osd/osd_types.h"
+#include "MOSDFastDispatchOp.h"
#include "include/ceph_features.h"
@@ -25,7 +24,7 @@
* OSD sub op - for internal ops on pobjects between primary and replicas(/stripes/whatever)
*/
-class MOSDSubOp : public Message {
+class MOSDSubOp : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 12;
static const int COMPAT_VERSION = 7;
@@ -93,6 +92,13 @@ public:
/// non-empty if this transaction involves a hit_set history update
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
int get_cost() const {
if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL)
return ops[0].op.extent.length;
@@ -239,11 +245,11 @@ public:
}
MOSDSubOp()
- : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { }
+ : MOSDFastDispatchOp(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { }
MOSDSubOp(osd_reqid_t r, pg_shard_t from,
spg_t p, const hobject_t& po, int aw,
epoch_t mape, ceph_tid_t rtid, eversion_t v)
- : Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDFastDispatchOp(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION),
map_epoch(mape),
reqid(r),
from(from),
diff --git a/src/messages/MOSDSubOpReply.h b/src/messages/MOSDSubOpReply.h
index 81d1b2836de..aa300efc931 100644
--- a/src/messages/MOSDSubOpReply.h
+++ b/src/messages/MOSDSubOpReply.h
@@ -16,7 +16,7 @@
#ifndef CEPH_MOSDSUBOPREPLY_H
#define CEPH_MOSDSUBOPREPLY_H
-#include "msg/Message.h"
+#include "MOSDFastDispatchOp.h"
#include "MOSDSubOp.h"
#include "os/ObjectStore.h"
@@ -29,7 +29,7 @@
*
*/
-class MOSDSubOpReply : public Message {
+class MOSDSubOpReply : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
public:
@@ -53,6 +53,13 @@ public:
map<string,bufferptr> attrset;
+ epoch_t get_map_epoch() const override {
+ return map_epoch;
+ }
+ spg_t get_spg() const override {
+ return pgid;
+ }
+
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(map_epoch, p);
@@ -129,20 +136,21 @@ public:
public:
MOSDSubOpReply(
- MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
- Message(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION),
- map_epoch(e),
- reqid(req->reqid),
- from(from),
- pgid(req->pgid.pgid, req->from.shard),
- poid(req->poid),
- ops(req->ops),
- ack_type(at),
- result(result_) {
+ MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at)
+ : MOSDFastDispatchOp(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION),
+ map_epoch(e),
+ reqid(req->reqid),
+ from(from),
+ pgid(req->pgid.pgid, req->from.shard),
+ poid(req->poid),
+ ops(req->ops),
+ ack_type(at),
+ result(result_) {
memset(&peer_stat, 0, sizeof(peer_stat));
set_tid(req->get_tid());
}
- MOSDSubOpReply() : Message(MSG_OSD_SUBOPREPLY) {}
+ MOSDSubOpReply()
+ : MOSDFastDispatchOp(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION) {}
private:
~MOSDSubOpReply() {}
diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc
index ee26e24c32d..df21c2d1c40 100644
--- a/src/mon/OSDMonitor.cc
+++ b/src/mon/OSDMonitor.cc
@@ -5222,6 +5222,9 @@ int OSDMonitor::prepare_command_pool_set(map<string,cmd_vartype> &cmdmap,
return -E2BIG;
}
p.set_pg_num(n);
+ // force pre-luminous clients to resend their ops, since they
+ // don't understand that split PGs now form a new interval.
+ p.last_force_op_resend_preluminous = pending_inc.epoch;
} else if (var == "pgp_num") {
if (p.has_flag(pg_pool_t::FLAG_NOPGCHANGE)) {
ss << "pool pgp_num change is disabled; you must unset nopgchange flag for the pool first";
@@ -7520,9 +7523,9 @@ done:
pg_pool_t *np = pending_inc.get_new_pool(pool_id, p);
np->read_tier = overlaypool_id;
np->write_tier = overlaypool_id;
- np->last_force_op_resend = pending_inc.epoch;
+ np->set_last_force_op_resend(pending_inc.epoch);
pg_pool_t *noverlay_p = pending_inc.get_new_pool(overlaypool_id, overlay_p);
- noverlay_p->last_force_op_resend = pending_inc.epoch;
+ noverlay_p->set_last_force_op_resend(pending_inc.epoch);
ss << "overlay for '" << poolstr << "' is now (or already was) '" << overlaypoolstr << "'";
if (overlay_p->cache_mode == pg_pool_t::CACHEMODE_NONE)
ss <<" (WARNING: overlay pool cache_mode is still NONE)";
@@ -7556,16 +7559,16 @@ done:
if (np->has_read_tier()) {
const pg_pool_t *op = osdmap.get_pg_pool(np->read_tier);
pg_pool_t *nop = pending_inc.get_new_pool(np->read_tier,op);
- nop->last_force_op_resend = pending_inc.epoch;
+ nop->set_last_force_op_resend(pending_inc.epoch);
}
if (np->has_write_tier()) {
const pg_pool_t *op = osdmap.get_pg_pool(np->write_tier);
pg_pool_t *nop = pending_inc.get_new_pool(np->write_tier, op);
- nop->last_force_op_resend = pending_inc.epoch;
+ nop->set_last_force_op_resend(pending_inc.epoch);
}
np->clear_read_tier();
np->clear_write_tier();
- np->last_force_op_resend = pending_inc.epoch;
+ np->set_last_force_op_resend(pending_inc.epoch);
ss << "there is now (or already was) no overlay for '" << poolstr << "'";
wait_for_finished_proposal(op, new Monitor::C_Command(mon, op, 0, ss.str(),
get_last_committed() + 1));
@@ -7799,8 +7802,8 @@ done:
np->tiers.insert(tierpool_id);
np->read_tier = np->write_tier = tierpool_id;
np->set_snap_epoch(pending_inc.epoch); // tier will update to our snap info
- np->last_force_op_resend = pending_inc.epoch;
- ntp->last_force_op_resend = pending_inc.epoch;
+ np->set_last_force_op_resend(pending_inc.epoch);
+ ntp->set_last_force_op_resend(pending_inc.epoch);
ntp->tier_of = pool_id;
ntp->cache_mode = mode;
ntp->hit_set_count = g_conf->osd_tier_default_cache_hit_set_count;
diff --git a/src/msg/Message.cc b/src/msg/Message.cc
index a226ecbca72..26c91eef093 100644
--- a/src/msg/Message.cc
+++ b/src/msg/Message.cc
@@ -77,7 +77,6 @@ using namespace std;
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGCreate.h"
#include "messages/MOSDPGTrim.h"
-#include "messages/MOSDPGMissing.h"
#include "messages/MOSDScrub.h"
#include "messages/MOSDRepScrub.h"
#include "messages/MOSDPGScan.h"
@@ -501,9 +500,6 @@ Message *decode_message(CephContext *cct, int crcflags,
case MSG_REMOVE_SNAPS:
m = new MRemoveSnaps;
break;
- case MSG_OSD_PG_MISSING:
- m = new MOSDPGMissing;
- break;
case MSG_OSD_REP_SCRUB:
m = new MOSDRepScrub;
break;
diff --git a/src/msg/Message.h b/src/msg/Message.h
index 680f4a0aeae..e103f793668 100644
--- a/src/msg/Message.h
+++ b/src/msg/Message.h
@@ -85,7 +85,7 @@
#define MSG_REMOVE_SNAPS 90
#define MSG_OSD_SCRUB 91
-#define MSG_OSD_PG_MISSING 92
+//#define MSG_OSD_PG_MISSING 92 // obsolete
#define MSG_OSD_REP_SCRUB 93
#define MSG_OSD_PG_SCAN 94
diff --git a/src/os/filestore/JournalingObjectStore.h b/src/os/filestore/JournalingObjectStore.h
index 8b0d3eb193c..10b66764dd2 100644
--- a/src/os/filestore/JournalingObjectStore.h
+++ b/src/os/filestore/JournalingObjectStore.h
@@ -19,6 +19,7 @@
#include "Journal.h"
#include "FileJournal.h"
#include "common/RWLock.h"
+#include "osd/OpRequest.h"
class JournalingObjectStore : public ObjectStore {
protected:
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 7794f162457..d7812ac36b0 100644
--- a/src/osd/OSD.cc
+++ b/src/osd/OSD.cc
@@ -85,7 +85,6 @@
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
-#include "messages/MOSDPGMissing.h"
#include "messages/MBackfillReserve.h"
#include "messages/MRecoveryReserve.h"
#include "messages/MOSDECSubOpWrite.h"
@@ -1424,7 +1423,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
<< m->get_map_epoch() << ", dropping" << dendl;
return;
}
- pg_t _pgid = m->get_pg();
+ pg_t _pgid = m->get_raw_pg();
spg_t pgid;
if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0)
_pgid = opmap->raw_pg_to_pg(_pgid);
@@ -1438,7 +1437,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
- << " pg " << m->get_pg()
+ << " pg " << m->get_raw_pg()
<< " to osd." << whoami
<< " not " << pg->acting
<< " in e" << m->get_map_epoch() << "/" << osdmap->get_epoch()
@@ -4949,7 +4948,7 @@ bool OSD::ms_handle_reset(Connection *con)
session->con.reset(NULL); // break con <-> session ref cycle
// note that we break session->con *before* the session_handle_reset
// cleanup below. this avoids a race between us and
- // PG::add_backoff, split_backoff, etc.
+ // PG::add_backoff, Session::check_backoff, etc.
session_handle_reset(session);
session->put();
return true;
@@ -6311,7 +6310,7 @@ epoch_t op_required_epoch(OpRequestRef op)
}
case CEPH_MSG_OSD_BACKOFF: {
MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
- return m->osd_epoch;
+ return m->map_epoch;
}
case MSG_OSD_SUBOP:
return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
@@ -6386,11 +6385,6 @@ void OSD::dispatch_op(OpRequestRef op)
case MSG_OSD_PG_TRIM:
handle_pg_trim(op);
break;
- case MSG_OSD_PG_MISSING:
- assert(0 ==
- "received MOSDPGMissing; this message is supposed to be unused!?!");
- break;
-
case MSG_OSD_BACKFILL_RESERVE:
handle_pg_backfill_reserve(op);
break;
@@ -6527,7 +6521,15 @@ void OSD::_dispatch(Message *m)
// -- need OSDMap --
- default:
+ case MSG_OSD_PG_CREATE:
+ case MSG_OSD_PG_NOTIFY:
+ case MSG_OSD_PG_QUERY:
+ case MSG_OSD_PG_LOG:
+ case MSG_OSD_PG_REMOVE:
+ case MSG_OSD_PG_INFO:
+ case MSG_OSD_PG_TRIM:
+ case MSG_OSD_BACKFILL_RESERVE:
+ case MSG_OSD_RECOVERY_RESERVE:
{
OpRequestRef op = op_tracker.create_request<OpRequest, Message*>(m);
// no map? starting up?
@@ -7534,7 +7536,7 @@ void OSD::consume_map()
for (set<spg_t>::iterator p = pgs_to_check.begin();
p != pgs_to_check.end();
++p) {
- if (!(osdmap->is_acting_osd_shard(p->pgid, whoami, p->shard))) {
+ if (!(osdmap->is_acting_osd_shard(spg_t(p->pgid, p->shard), whoami))) {
set<Session*> concerned_sessions;
get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
for (set<Session*>::iterator i = concerned_sessions.begin();
@@ -8781,16 +8783,8 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
client_session->put();
}
- if (cct->_conf->osd_debug_drop_op_probability > 0 &&
- !m->get_source().is_mds()) {
- if ((double)rand() / (double)RAND_MAX < cct->_conf->osd_debug_drop_op_probability) {
- dout(0) << "handle_op DEBUG artificially dropping op " << *m << dendl;
- return;
- }
- }
-
// calc actual pgid
- pg_t _pgid = m->get_pg();
+ pg_t _pgid = m->get_raw_pg();
int64_t pool = _pgid.pool();
if ((m->get_flags() & CEPH_OSD_FLAG_PGOP) == 0 &&
@@ -8828,7 +8822,7 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
if (!send_map->have_pg_pool(pgid.pool())) {
dout(7) << "dropping request; pool did not exist" << dendl;
clog->warn() << m->get_source_inst() << " invalid " << m->get_reqid()
- << " pg " << m->get_pg()
+ << " pg " << m->get_raw_pg()
<< " to osd." << whoami
<< " in e" << osdmap->get_epoch()
<< ", client e" << m->get_map_epoch()
@@ -8839,7 +8833,7 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
if (!send_map->osd_is_valid_op_target(pgid.pgid, whoami)) {
dout(7) << "we are invalid target" << dendl;
clog->warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
- << " pg " << m->get_pg()
+ << " pg " << m->get_raw_pg()
<< " to osd." << whoami
<< " in e" << osdmap->get_epoch()
<< ", client e" << m->get_map_epoch()
@@ -8875,38 +8869,10 @@ void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
}
// map hobject range to PG(s)
- bool queued = false;
- hobject_t pos = m->begin;
- do {
- pg_t _pgid(pos.get_hash(), pos.pool);
- if (osdmap->have_pg_pool(pos.pool)) {
- _pgid = osdmap->raw_pg_to_pg(_pgid);
- }
- if (!osdmap->have_pg_pool(_pgid.pool())) {
- // missing pool -- drop
- return;
- }
- spg_t pgid;
- if (osdmap->get_primary_shard(_pgid, &pgid)) {
- dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl;
- PGRef pg = get_pg_or_queue_for_pg(pgid, op, s);
- if (pg) {
- if (!queued) {
- enqueue_op(pg, op);
- queued = true;
- } else {
- // use a fresh OpRequest
- m->get(); // take a ref for the new OpRequest
- OpRequestRef newop(op_tracker.create_request<OpRequest, Message*>(m));
- newop->mark_event("duplicated original op for another pg");
- enqueue_op(pg, newop);
- }
- }
- }
- // advance
- pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num());
- dout(20) << __func__ << " next pg " << pos << dendl;
- } while (pos < m->end);
+ PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s);
+ if (pg) {
+ enqueue_op(pg, op);
+ }
}
template<typename T, int MSGTYPE>
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index e2d35f01d32..3beb717e419 100644
--- a/src/osd/OSD.h
+++ b/src/osd/OSD.h
@@ -213,7 +213,6 @@ class ObjectStore;
class FuseStore;
class OSDMap;
class MLog;
-class MOSDPGMissing;
class Objecter;
class Watch;
@@ -1406,7 +1405,7 @@ public:
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
return (static_cast<MOSDOp*>(
- op->get_req())->get_pg().m_seed & mask) == match;
+ op->get_req())->get_raw_pg().m_seed & mask) == match;
}
return false;
}
diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc
index f43f1eea574..23d54ddc535 100644
--- a/src/osd/OSDMap.cc
+++ b/src/osd/OSDMap.cc
@@ -1484,26 +1484,37 @@ int OSDMap::apply_incremental(const Incremental &inc)
}
// mapping
-int OSDMap::object_locator_to_pg(
- const object_t& oid,
- const object_locator_t& loc,
- pg_t &pg) const
+int OSDMap::map_to_pg(
+ int64_t poolid,
+ const string& name,
+ const string& key,
+ const string& nspace,
+ pg_t *pg) const
{
// calculate ps (placement seed)
- const pg_pool_t *pool = get_pg_pool(loc.get_pool());
+ const pg_pool_t *pool = get_pg_pool(poolid);
if (!pool)
return -ENOENT;
ps_t ps;
+ if (!key.empty())
+ ps = pool->hash_key(key, nspace);
+ else
+ ps = pool->hash_key(name, nspace);
+ *pg = pg_t(ps, poolid);
+ return 0;
+}
+
+int OSDMap::object_locator_to_pg(
+ const object_t& oid, const object_locator_t& loc, pg_t &pg) const
+{
if (loc.hash >= 0) {
- ps = loc.hash;
- } else {
- if (!loc.key.empty())
- ps = pool->hash_key(loc.key, loc.nspace);
- else
- ps = pool->hash_key(oid.name, loc.nspace);
+ if (!get_pg_pool(loc.get_pool())) {
+ return -ENOENT;
+ }
+ pg = pg_t(loc.hash, loc.get_pool());
+ return 0;
}
- pg = pg_t(ps, loc.get_pool(), -1);
- return 0;
+ return map_to_pg(loc.get_pool(), oid.name, loc.key, loc.nspace, &pg);
}
ceph_object_layout OSDMap::make_object_layout(
@@ -1714,11 +1725,14 @@ void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
_apply_primary_affinity(pps, *pool, up, primary);
}
-void OSDMap::_pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary,
- vector<int> *acting, int *acting_primary) const
+void OSDMap::_pg_to_up_acting_osds(
+ const pg_t& pg, vector<int> *up, int *up_primary,
+ vector<int> *acting, int *acting_primary,
+ bool raw_pg_to_pg) const
{
const pg_pool_t *pool = get_pg_pool(pg.pool());
- if (!pool) {
+ if (!pool ||
+ (!raw_pg_to_pg && pg.ps() >= pool->get_pg_num())) {
if (up)
up->clear();
if (up_primary)
diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h
index 1f9307ee940..7522d7f62f5 100644
--- a/src/osd/OSDMap.h
+++ b/src/osd/OSDMap.h
@@ -587,14 +587,23 @@ public:
/**** mapping facilities ****/
- int object_locator_to_pg(const object_t& oid, const object_locator_t& loc, pg_t &pg) const;
- pg_t object_locator_to_pg(const object_t& oid, const object_locator_t& loc) const {
+ int map_to_pg(
+ int64_t pool,
+ const string& name,
+ const string& key,
+ const string& nspace,
+ pg_t *pg) const;
+ int object_locator_to_pg(const object_t& oid, const object_locator_t& loc,
+ pg_t &pg) const;
+ pg_t object_locator_to_pg(const object_t& oid,
+ const object_locator_t& loc) const {
pg_t pg;
int ret = object_locator_to_pg(oid, loc, pg);
assert(ret == 0);
return pg;
}
+
static object_locator_t file_to_object_locator(const file_layout_t& layout) {
return object_locator_t(layout.pool_id, layout.pool_ns);
}
@@ -647,7 +656,8 @@ private:
* map to up and acting. Fills in whatever fields are non-NULL.
*/
void _pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary,
- vector<int> *acting, int *acting_primary) const;
+ vector<int> *acting, int *acting_primary,
+ bool raw_pg_to_pg = true) const;
public:
/***
@@ -769,14 +779,17 @@ public:
return -1; // we fail!
}
- bool is_acting_osd_shard(pg_t pg, int osd, shard_id_t shard) const {
+ /*
+ * check whether an spg_t maps to a particular osd
+ */
+ bool is_acting_osd_shard(spg_t pg, int osd) const {
vector<int> acting;
- int nrep = pg_to_acting_osds(pg, acting);
- if (shard == shard_id_t::NO_SHARD)
- return calc_pg_role(osd, acting, nrep) >= 0;
- if (shard >= (int)acting.size())
+ _pg_to_up_acting_osds(pg.pgid, NULL, NULL, &acting, NULL, false);
+ if (pg.shard == shard_id_t::NO_SHARD)
+ return calc_pg_role(osd, acting, acting.size()) >= 0;
+ if (pg.shard >= (int)acting.size())
return false;
- return acting[shard] == osd;
+ return acting[pg.shard] == osd;
}
diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h
index 65b8576052c..ef7986d4721 100644
--- a/src/osd/OpRequest.h
+++ b/src/osd/OpRequest.h
@@ -22,36 +22,10 @@
#include "include/xlist.h"
#include "msg/Message.h"
#include "include/memory.h"
+#include "osd/osd_types.h"
#include "common/TrackedOp.h"
/**
- * osd request identifier
- *
- * caller name + incarnation# + tid to unique identify this request.
- */
-struct osd_reqid_t {
- entity_name_t name; // who
- ceph_tid_t tid;
- int32_t inc; // incarnation
-
- osd_reqid_t()
- : tid(0), inc(0) {}
- osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t)
- : name(a), tid(t), inc(i) {}
-
- DENC(osd_reqid_t, v, p) {
- DENC_START(2, 2, p);
- denc(v.name, p);
- denc(v.tid, p);
- denc(v.inc, p);
- DENC_FINISH(p);
- }
- void dump(Formatter *f) const;
- static void generate_test_instances(list<osd_reqid_t*>& o);
-};
-WRITE_CLASS_DENC(osd_reqid_t)
-
-/**
* The OpRequest takes in a Message* and takes over a single reference
* to it, which it puts() when destroyed.
*/
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index f28cb34987c..c47dac0b649 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -1862,18 +1862,18 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
OSDCap& caps = session->caps;
session->put();
- const string &key = req->get_object_locator().key.empty() ?
- req->get_oid().name :
- req->get_object_locator().key;
+ const string &key = req->get_hobj().get_key().empty() ?
+ req->get_oid().name :
+ req->get_hobj().get_key();
- bool cap = caps.is_capable(pool.name, req->get_object_locator().nspace,
+ bool cap = caps.is_capable(pool.name, req->get_hobj().nspace,
pool.auid, key,
op->need_read_cap(),
op->need_write_cap(),
op->classes());
dout(20) << "op_has_sufficient_caps pool=" << pool.id << " (" << pool.name
- << " " << req->get_object_locator().nspace
+ << " " << req->get_hobj().nspace
<< ") owner=" << pool.auid
<< " need_read_cap=" << op->need_read_cap()
<< " need_write_cap=" << op->need_write_cap()
@@ -2210,27 +2210,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
}
}
-void PG::split_ops(PG *child, unsigned split_bits) {
- unsigned match = child->info.pgid.ps();
- assert(waiting_for_all_missing.empty());
- assert(waiting_for_cache_not_full.empty());
- assert(waiting_for_unreadable_object.empty());
- assert(waiting_for_degraded_object.empty());
- assert(waiting_for_ondisk.empty());
- assert(waiting_for_active.empty());
- assert(waiting_for_scrub.empty());
-
- osd->dequeue_pg(this, &waiting_for_peered);
-
- OSD::split_list(
- &waiting_for_peered, &(child->waiting_for_peered), match, split_bits);
- {
- Mutex::Locker l(map_lock); // to avoid a race with the osd dispatch
- OSD::split_list(
- &waiting_for_map, &(child->waiting_for_map), match, split_bits);
- }
-}
-
void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
child->update_snap_mapper_bits(split_bits);
@@ -2304,14 +2283,10 @@ void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
// History
child->past_intervals = past_intervals;
- split_ops(child, split_bits);
_split_into(child_pgid, child, split_bits);
- // release all backoffs so that Objecter doesn't need to handle unblock
- // on split backoffs
+ // release all backoffs for simplicity
release_backoffs(hobject_t(), hobject_t::get_max());
- // split any remaining (deleting) backoffs among child PGs
- split_backoffs(child, split_bits);
child->on_new_interval();
@@ -2326,7 +2301,7 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
ConnectionRef con = s->con;
if (!con) // OSD::ms_handle_reset clears s->con without a lock
return;
- Backoff *b = s->have_backoff(begin);
+ BackoffRef b(s->have_backoff(info.pgid, begin));
if (b) {
derr << __func__ << " already have backoff for " << s << " begin " << begin
<< " " << *b << dendl;
@@ -2334,24 +2309,19 @@ void PG::add_backoff(SessionRef s, const hobject_t& begin, const hobject_t& end)
}
Mutex::Locker l(backoff_lock);
{
- Mutex::Locker l(s->backoff_lock);
- b = new Backoff(this, s, ++s->backoff_seq, begin, end);
- auto& ls = s->backoffs[begin];
- if (ls.empty()) {
- ++s->backoff_count;
- }
- assert(s->backoff_count == (int)s->backoffs.size());
- ls.insert(b);
+ b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end);
backoffs[begin].insert(b);
+ s->add_backoff(b);
dout(10) << __func__ << " session " << s << " added " << *b << dendl;
}
con->send_message(
new MOSDBackoff(
+ info.pgid,
+ get_osdmap()->get_epoch(),
CEPH_OSD_BACKOFF_OP_BLOCK,
b->id,
begin,
- end,
- get_osdmap()->get_epoch()));
+ end));
}
void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
@@ -2398,11 +2368,12 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
if (con) { // OSD::ms_handle_reset clears s->con without a lock
con->send_message(
new MOSDBackoff(
+ info.pgid,
+ get_osdmap()->get_epoch(),
CEPH_OSD_BACKOFF_OP_UNBLOCK,
b->id,
b->begin,
- b->end,
- get_osdmap()->get_epoch()));
+ b->end));
}
if (b->is_new()) {
b->state = Backoff::STATE_DELETING;
@@ -2415,71 +2386,6 @@ void PG::release_backoffs(const hobject_t& begin, const hobject_t& end)
}
}
-void PG::split_backoffs(PG *child, unsigned split_bits)
-{
- dout(10) << __func__ << " split_bits " << split_bits << " child "
- << child->info.pgid.pgid << dendl;
- unsigned mask = ~((~0)<<split_bits);
- vector<BackoffRef> backoffs_to_dup; // pg backoffs
- vector<BackoffRef> backoffs_to_move; // oid backoffs
- {
- Mutex::Locker l(backoff_lock);
- auto p = backoffs.begin();
- while (p != backoffs.end()) {
- if (p->first == info.pgid.pgid.get_hobj_start()) {
- // if it is a full PG we always dup it for the child.
- for (auto& q : p->second) {
- dout(10) << __func__ << " pg backoff " << p->first
- << " " << q << dendl;
- backoffs_to_dup.push_back(q);
- }
- } else {
- // otherwise, we move it to the child only if falls into the
- // childs hash range.
- if ((p->first.get_hash() & mask) == child->info.pgid.pgid.ps()) {
- for (auto& q : p->second) {
- dout(20) << __func__ << " will move " << p->first
- << " " << q << dendl;
- backoffs_to_move.push_back(q);
- }
- p = backoffs.erase(p);
- continue;
- } else {
- dout(20) << __func__ << " will not move " << p->first
- << " " << p->second << dendl;
- }
- }
- ++p;
- }
- }
- for (auto b : backoffs_to_dup) {
- SessionRef s;
- {
- Mutex::Locker l(b->lock);
- b->end = info.pgid.pgid.get_hobj_end(split_bits);
- dout(10) << __func__ << " pg backoff " << *b << dendl;
- s = b->session;
- }
- if (s) {
- child->add_pg_backoff(b->session);
- } else {
- dout(20) << __func__ << " didn't dup pg backoff, session is null"
- << dendl;
- }
- }
- for (auto b : backoffs_to_move) {
- Mutex::Locker l(b->lock);
- if (b->pg == this) {
- dout(10) << __func__ << " move backoff " << *b << " to child" << dendl;
- b->pg = child;
- child->backoffs[b->begin].insert(b);
- } else {
- dout(20) << __func__ << " move backoff " << *b << " nowhere... pg is null"
- << dendl;
- }
- }
-}
-
void PG::clear_backoffs()
{
dout(10) << __func__ << " " << dendl;
@@ -5211,6 +5117,11 @@ void PG::start_peering_interval(
dirty_info = true;
dirty_big_info = true;
info.history.same_interval_since = osdmap->get_epoch();
+ if (info.pgid.pgid.is_split(lastmap->get_pg_num(info.pgid.pgid.pool()),
+ osdmap->get_pg_num(info.pgid.pgid.pool()),
+ nullptr)) {
+ info.history.last_epoch_split = osdmap->get_epoch();
+ }
}
}
@@ -5468,11 +5379,24 @@ bool PG::can_discard_op(OpRequestRef& op)
return true;
}
- if (m->get_map_epoch() < pool.info.last_force_op_resend &&
- m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
- dout(7) << __func__ << " sent before last_force_op_resend "
- << pool.info.last_force_op_resend << ", dropping" << *m << dendl;
- return true;
+ if (m->get_connection()->has_feature(CEPH_FEATURE_RESEND_ON_SPLIT)) {
+ if (m->get_map_epoch() < pool.info.get_last_force_op_resend()) {
+ dout(7) << __func__ << " sent before last_force_op_resend "
+ << pool.info.last_force_op_resend << ", dropping" << *m << dendl;
+ return true;
+ }
+ if (m->get_map_epoch() < info.history.last_epoch_split) {
+ dout(7) << __func__ << " pg split in "
+ << info.history.last_epoch_split << ", dropping" << dendl;
+ return true;
+ }
+ } else if (m->get_connection()->has_feature(CEPH_FEATURE_OSD_POOLRESEND)) {
+ if (m->get_map_epoch() < pool.info.get_last_force_op_resend_preluminous()) {
+ dout(7) << __func__ << " sent before last_force_op_resend_preluminous "
+ << pool.info.last_force_op_resend_preluminous
+ << ", dropping" << *m << dendl;
+ return true;
+ }
}
return false;
diff --git a/src/osd/PG.h b/src/osd/PG.h
index ab81b9a9696..1944bbc1bcc 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -840,8 +840,6 @@ protected:
map<eversion_t,
list<pair<OpRequestRef, version_t> > > waiting_for_ondisk;
- void split_ops(PG *child, unsigned split_bits);
-
void requeue_object_waiters(map<hobject_t, list<OpRequestRef>>& m);
void requeue_op(OpRequestRef op);
void requeue_ops(list<OpRequestRef> &l);
@@ -1076,7 +1074,6 @@ public:
release_backoffs(o, o);
}
void clear_backoffs();
- void split_backoffs(PG *child, unsigned split_bits);
void add_pg_backoff(SessionRef s) {
hobject_t begin = info.pgid.pgid.get_hobj_start();
diff --git a/src/osd/PGTransaction.h b/src/osd/PGTransaction.h
index 2e1013bca86..a77a9dae78f 100644
--- a/src/osd/PGTransaction.h
+++ b/src/osd/PGTransaction.h
@@ -20,6 +20,7 @@
#include "common/hobject.h"
#include "osd/osd_types.h"
+#include "osd/osd_internal_types.h"
#include "common/interval_map.h"
#include "common/inline_variant.h"
diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc
index 2af4976b16f..daf667aa959 100644
--- a/src/osd/PrimaryLogPG.cc
+++ b/src/osd/PrimaryLogPG.cc
@@ -1208,8 +1208,8 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
continue;
// skip wrong namespace
- if (m->get_object_locator().nspace != librados::all_nspaces &&
- candidate.get_namespace() != m->get_object_locator().nspace)
+ if (m->get_hobj().nspace != librados::all_nspaces &&
+ candidate.get_namespace() != m->get_hobj().nspace)
continue;
if (filter && !pgls_filter(filter, candidate, filter_out))
@@ -1379,7 +1379,7 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
}
// skip wrong namespace
- if (candidate.get_namespace() != m->get_object_locator().nspace)
+ if (candidate.get_namespace() != m->get_hobj().nspace)
continue;
if (filter && !pgls_filter(filter, candidate, filter_out))
@@ -1598,7 +1598,7 @@ void PrimaryLogPG::handle_backoff(OpRequestRef& op)
}
dout(10) << __func__ << " backoff ack id " << m->id
<< " [" << begin << "," << end << ")" << dendl;
- session->ack_backoff(cct, m->id, begin, end);
+ session->ack_backoff(cct, m->pgid, m->id, begin, end);
}
void PrimaryLogPG::do_request(
@@ -1619,10 +1619,8 @@ void PrimaryLogPG::do_request(
session->put(); // get_priv takes a ref, and so does the SessionRef
if (op->get_req()->get_type() == CEPH_MSG_OSD_OP) {
- Backoff *b = session->have_backoff(info.pgid.pgid.get_hobj_start());
- if (b) {
- dout(10) << " have backoff " << *b << " " << *m << dendl;
- assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff);
+ if (session->check_backoff(cct, info.pgid,
+ info.pgid.pgid.get_hobj_start(), m)) {
return;
}
@@ -1630,7 +1628,7 @@ void PrimaryLogPG::do_request(
is_down() ||
is_incomplete() ||
(!is_active() && is_peered());
- if (g_conf->osd_peering_aggressive_backoff && !backoff) {
+ if (g_conf->osd_backoff_on_peering && !backoff) {
if (is_peering()) {
backoff = true;
}
@@ -1767,9 +1765,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
dout(20) << __func__ << ": op " << *m << dendl;
- hobject_t head(m->get_oid(), m->get_object_locator().key,
- CEPH_NOSNAP, m->get_pg().ps(),
- info.pgid.pool(), m->get_object_locator().nspace);
+ hobject_t head = m->get_hobj();
+ head.snap = CEPH_NOSNAP;
bool can_backoff =
m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF);
@@ -1782,10 +1779,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
}
session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
- Backoff *b = session->have_backoff(head);
- if (b) {
- dout(10) << __func__ << " have backoff " << *b << " " << *m << dendl;
- assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff);
+ if (session->check_backoff(cct, info.pgid, head, m)) {
return;
}
}
@@ -1843,16 +1837,14 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
osd->reply_op_error(op, -ENAMETOOLONG);
return;
}
- if (m->get_object_locator().key.size() >
- cct->_conf->osd_max_object_name_len) {
+ if (m->get_hobj().get_key().size() > cct->_conf->osd_max_object_name_len) {
dout(4) << "do_op locator is longer than "
<< cct->_conf->osd_max_object_name_len
<< " bytes" << dendl;
osd->reply_op_error(op, -ENAMETOOLONG);
return;
}
- if (m->get_object_locator().nspace.size() >
- cct->_conf->osd_max_object_namespace_len) {
+ if (m->get_hobj().nspace.size() > cct->_conf->osd_max_object_namespace_len) {
dout(4) << "do_op namespace is longer than "
<< cct->_conf->osd_max_object_namespace_len
<< " bytes" << dendl;
@@ -1934,8 +1926,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
// missing object?
if (is_unreadable_object(head)) {
if (can_backoff &&
- (g_conf->osd_recovery_aggressive_backoff ||
- missing_loc.is_unfound(head))) {
+ (g_conf->osd_backoff_on_degraded ||
+ (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
add_backoff(session, head, head);
maybe_kick_recovery(head);
} else {
@@ -1946,7 +1938,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
// degraded object?
if (write_ordered && is_degraded_or_backfilling_object(head)) {
- if (can_backoff && g_conf->osd_recovery_aggressive_backoff) {
+ if (can_backoff && g_conf->osd_backoff_on_degraded) {
add_backoff(session, head, head);
} else {
wait_for_degraded_object(head, op);
@@ -2036,12 +2028,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
ObjectContextRef obc;
bool can_create = op->may_write() || op->may_cache();
hobject_t missing_oid;
- hobject_t oid(m->get_oid(),
- m->get_object_locator().key,
- m->get_snapid(),
- m->get_pg().ps(),
- m->get_object_locator().get_pool(),
- m->get_object_locator().nspace);
+ const hobject_t& oid = m->get_hobj();
// io blocked on obc?
if (!m->has_flag(CEPH_OSD_FLAG_FLUSH) &&
@@ -2368,7 +2355,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
}
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
- const object_locator_t& oloc = m->get_object_locator();
+ const object_locator_t oloc = m->get_object_locator();
if (op->need_skip_handle_cache()) {
return cache_result_t::NOOP;
@@ -2626,12 +2613,7 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op)
object_locator_t oloc(m->get_object_locator());
oloc.pool = pool.info.tier_of;
- hobject_t soid(m->get_oid(),
- m->get_object_locator().key,
- m->get_snapid(),
- m->get_pg().ps(),
- m->get_object_locator().get_pool(),
- m->get_object_locator().nspace);
+ const hobject_t& soid = m->get_hobj();
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
// pass through some original flags that make sense.
@@ -2821,12 +2803,7 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
oloc.pool = pool.info.tier_of;
SnapContext snapc(m->get_snap_seq(), m->get_snaps());
- hobject_t soid(m->get_oid(),
- m->get_object_locator().key,
- missing_oid.snap,
- m->get_pg().ps(),
- m->get_object_locator().get_pool(),
- m->get_object_locator().nspace);
+ const hobject_t& soid = m->get_hobj();
unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
dout(10) << __func__ << " Start proxy write for " << *m << dendl;
diff --git a/src/osd/Session.cc b/src/osd/Session.cc
index 125bd1f1579..2de44271344 100644
--- a/src/osd/Session.cc
+++ b/src/osd/Session.cc
@@ -11,25 +11,27 @@
void Session::clear_backoffs()
{
- map<hobject_t,set<BackoffRef>> ls;
+ map<spg_t,map<hobject_t,set<BackoffRef>>> ls;
{
Mutex::Locker l(backoff_lock);
ls.swap(backoffs);
backoff_count = 0;
}
- for (auto& p : ls) {
- for (auto& b : p.second) {
- Mutex::Locker l(b->lock);
- if (b->pg) {
- assert(b->session == this);
- assert(b->is_new() || b->is_acked());
- b->pg->rm_backoff(b);
- b->pg.reset();
- b->session.reset();
- } else if (b->session) {
- assert(b->session == this);
- assert(b->is_deleting());
- b->session.reset();
+ for (auto& i : ls) {
+ for (auto& p : i.second) {
+ for (auto& b : p.second) {
+ Mutex::Locker l(b->lock);
+ if (b->pg) {
+ assert(b->session == this);
+ assert(b->is_new() || b->is_acked());
+ b->pg->rm_backoff(b);
+ b->pg.reset();
+ b->session.reset();
+ } else if (b->session) {
+ assert(b->session == this);
+ assert(b->is_deleting());
+ b->session.reset();
+ }
}
}
}
@@ -37,42 +39,65 @@ void Session::clear_backoffs()
void Session::ack_backoff(
CephContext *cct,
+ spg_t pgid, // selects the per-pg bin of this session's backoffs
uint64_t id,
const hobject_t& begin,
const hobject_t& end)
{
Mutex::Locker l(backoff_lock);
- // NOTE that ack may be for [a,c] but osd may now have [a,b) and
- // [b,c) due to a PG split.
- auto p = backoffs.lower_bound(begin);
- while (p != backoffs.end()) {
- // note: must still examine begin=end=p->first case
- int r = cmp(p->first, end);
- if (r > 0 || (r == 0 && begin < end)) {
- break;
- }
- auto q = p->second.begin();
- while (q != p->second.end()) {
- Backoff *b = (*q).get();
- if (b->id == id) {
- if (b->is_new()) {
- b->state = Backoff::STATE_ACKED;
- dout(20) << __func__ << " now " << *b << dendl;
- } else if (b->is_deleting()) {
- dout(20) << __func__ << " deleting " << *b << dendl;
- q = p->second.erase(q);
- continue;
- }
+ auto p = backoffs.find(pgid); // p: pgid -> (begin -> set of backoffs)
+ if (p == backoffs.end()) {
+ dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << ","
+ << end << ") pg not found" << dendl;
+ return; // nothing registered for this pg; the ack is ignored
+ }
+ auto q = p->second.find(begin); // exact match on the range start; end is only used for logging here
+ if (q == p->second.end()) {
+ dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << ","
+ << end << ") begin not found" << dendl;
+ return;
+ }
+ for (auto i = q->second.begin(); i != q->second.end(); ++i) {
+ Backoff *b = (*i).get();
+ if (b->id == id) {
+ if (b->is_new()) {
+ b->state = Backoff::STATE_ACKED; // first ack of a new backoff
+ dout(20) << __func__ << " now " << *b << dendl;
+ } else if (b->is_deleting()) {
+ dout(20) << __func__ << " deleting " << *b << dendl;
+ q->second.erase(i); // i is not used again: the break below leaves the loop
+ --backoff_count; // count tracks individual backoffs, not bins
}
- ++q;
+ break; // at most one entry is handled per call: the first whose id matches
}
+ }
+ if (q->second.empty()) { // drop the begin bin once its set is empty
+ dout(20) << __func__ << " clearing begin bin " << q->first << dendl;
+ p->second.erase(q);
if (p->second.empty()) {
- dout(20) << __func__ << " clearing bin " << p->first << dendl;
- p = backoffs.erase(p);
- --backoff_count;
- } else {
- ++p;
+ dout(20) << __func__ << " clearing pg bin " << p->first << dendl;
+ backoffs.erase(p); // and the pg bin once it holds no begin bins
}
}
- assert(backoff_count == (int)backoffs.size());
+ assert(!backoff_count == backoffs.empty()); // count is zero exactly when the map is empty
+}
+
+bool Session::check_backoff( // returns true when the caller should stop handling m
+ CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m)
+{
+ BackoffRef b(have_backoff(pgid, oid)); // null when no backoff in this pg's bin matches oid
+ if (b) {
+ dout(10) << __func__ << " session " << this << " has backoff " << *b
+ << " for " << *m << dendl;
+ assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff); // hitting an acked backoff is fatal only when the debug option is set
+ return true;
+ }
+ // we may race with ms_handle_reset. it clears session->con before removing
+ // backoffs, so if we see con is cleared here we have to abort this
+ // request.
+ if (!con) {
+ dout(10) << __func__ << " session " << this << " disconnected" << dendl;
+ return true;
+ }
+ return false; // no matching backoff and con is still set
}
diff --git a/src/osd/Session.h b/src/osd/Session.h
index 6492c4bd4e4..257bc34a155 100644
--- a/src/osd/Session.h
+++ b/src/osd/Session.h
@@ -67,6 +67,7 @@ struct Backoff : public RefCountedObject {
STATE_DELETING = 3 ///< backoff deleted, but un-acked
};
std::atomic_int state = {STATE_NEW};
+ spg_t pgid; ///< owning pgid
uint64_t id = 0; ///< unique id (within the Session)
bool is_new() const {
@@ -96,10 +97,11 @@ struct Backoff : public RefCountedObject {
SessionRef session; ///< owning session
hobject_t begin, end; ///< [) range to block, unless ==, then single obj
- Backoff(PGRef pg, SessionRef s,
+ Backoff(spg_t pgid, PGRef pg, SessionRef s,
uint64_t i,
const hobject_t& b, const hobject_t& e)
: RefCountedObject(g_ceph_context, 0),
+ pgid(pgid),
id(i),
lock("Backoff::lock"),
pg(pg),
@@ -108,7 +110,7 @@ struct Backoff : public RefCountedObject {
end(e) {}
friend ostream& operator<<(ostream& out, const Backoff& b) {
- return out << "Backoff(" << &b << " " << b.id
+ return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
<< " " << b.get_state_name()
<< " [" << b.begin << "," << b.end << ") "
<< " session " << b.session
@@ -139,7 +141,7 @@ struct Session : public RefCountedObject {
/// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
Mutex backoff_lock;
std::atomic_int backoff_count= {0}; ///< simple count of backoffs
- map<hobject_t,set<BackoffRef>> backoffs;
+ map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
std::atomic<uint64_t> backoff_seq = {0};
@@ -159,26 +161,32 @@ struct Session : public RefCountedObject {
void ack_backoff(
CephContext *cct,
+ spg_t pgid,
uint64_t id,
const hobject_t& start,
const hobject_t& end);
- Backoff *have_backoff(const hobject_t& oid) {
- if (backoff_count.load()) {
- Mutex::Locker l(backoff_lock);
- assert(backoff_count == (int)backoffs.size());
- auto p = backoffs.lower_bound(oid);
- if (p != backoffs.begin() &&
- p->first > oid) {
- --p;
- }
- if (p != backoffs.end()) {
- int r = cmp(oid, p->first);
- if (r == 0 || r > 0) {
- for (auto& q : p->second) {
- if (r == 0 || oid < q->end) {
- return &(*q);
- }
+ BackoffRef have_backoff(spg_t pgid, const hobject_t& oid) {
+ if (!backoff_count.load()) {
+ return nullptr;
+ }
+ Mutex::Locker l(backoff_lock);
+ assert(!backoff_count == backoffs.empty());
+ auto i = backoffs.find(pgid);
+ if (i == backoffs.end()) {
+ return nullptr;
+ }
+ auto p = i->second.lower_bound(oid);
+ if (p != i->second.begin() &&
+ p->first > oid) {
+ --p;
+ }
+ if (p != i->second.end()) {
+ int r = cmp(oid, p->first);
+ if (r == 0 || r > 0) {
+ for (auto& q : p->second) {
+ if (r == 0 || oid < q->end) {
+ return &(*q);
}
}
}
@@ -186,24 +194,40 @@ struct Session : public RefCountedObject {
return nullptr;
}
+ bool check_backoff(
+ CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m);
+
+ void add_backoff(BackoffRef b) { // registers b under backoffs[b->pgid][b->begin]
+ Mutex::Locker l(backoff_lock);
+ assert(!backoff_count == backoffs.empty()); // count is zero exactly when the map is empty
+ backoffs[b->pgid][b->begin].insert(b); // operator[] creates the pg and begin bins on demand
+ ++backoff_count; // incremented unconditionally after the insert
+ }
+
// called by PG::release_*_backoffs and PG::clear_backoffs()
void rm_backoff(BackoffRef b) {
Mutex::Locker l(backoff_lock);
assert(b->lock.is_locked_by_me());
assert(b->session == this);
- auto p = backoffs.find(b->begin);
- // may race with clear_backoffs()
- if (p != backoffs.end()) {
- auto q = p->second.find(b);
- if (q != p->second.end()) {
- p->second.erase(q);
- if (p->second.empty()) {
- backoffs.erase(p);
+ auto i = backoffs.find(b->pgid); // pg bin first, then the begin bin inside it
+ if (i != backoffs.end()) {
+ // may race with clear_backoffs()
+ auto p = i->second.find(b->begin);
+ if (p != i->second.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q); // count is only decremented when b was actually found
--backoff_count;
+ if (p->second.empty()) { // remove emptied bins bottom-up
+ i->second.erase(p);
+ if (i->second.empty()) {
+ backoffs.erase(i);
+ }
+ }
}
}
}
- assert(backoff_count == (int)backoffs.size());
+ assert(!backoff_count == backoffs.empty()); // count is zero exactly when the map is empty
}
void clear_backoffs();
};
diff --git a/src/osd/osd_internal_types.h b/src/osd/osd_internal_types.h
new file mode 100644
index 00000000000..a415a4bf6e2
--- /dev/null
+++ b/src/osd/osd_internal_types.h
@@ -0,0 +1,513 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_OSD_INTERNAL_TYPES_H
+#define CEPH_OSD_INTERNAL_TYPES_H
+
+#include "osd_types.h"
+#include "OpRequest.h"
+
+/*
+ * keep tabs on object modifications that are in flight.
+ * we need to know the projected existence, size, snapset,
+ * etc., because we don't send writes down to disk until after
+ * replicas ack.
+ */
+
+struct ObjectContext;
+
+struct ObjectState { // an object_info_t paired with an existence flag
+ object_info_t oi;
+ bool exists; ///< the stored object exists (i.e., we will remember the object_info_t)
+
+ ObjectState() : exists(false) {} // default: oi default-constructed, exists = false
+
+ ObjectState(const object_info_t &oi_, bool exists_) // copies oi_ and takes the flag as given
+ : oi(oi_), exists(exists_) {}
+};
+
+typedef ceph::shared_ptr<ObjectContext> ObjectContextRef;
+
+struct ObjectContext { // per-object state: ObjectState, rw lock state, watchers, attr cache, ondisk read/write counters
+ ObjectState obs;
+
+ SnapSetContext *ssc; // may be null
+
+ Context *destructor_callback; // if set, complete(0) is called on it from ~ObjectContext
+
+private:
+ Mutex lock; // taken by the ondisk_*_lock/unlock methods around the counters below
+public:
+ Cond cond; // waited on and signalled by the ondisk_* methods
+ int unstable_writes, readers, writers_waiting, readers_waiting;
+
+
+ // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
+ map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
+
+ // attr cache
+ map<string, bufferlist> attr_cache;
+
+ struct RWState { // read / write / exclusive lock state plus a queue of waiting ops
+ enum State {
+ RWNONE,
+ RWREAD,
+ RWWRITE,
+ RWEXCL,
+ };
+ static const char *get_state_name(State s) { // human-readable name, "???" for unknown values
+ switch (s) {
+ case RWNONE: return "none";
+ case RWREAD: return "read";
+ case RWWRITE: return "write";
+ case RWEXCL: return "excl";
+ default: return "???";
+ }
+ }
+ const char *get_state_name() const {
+ return get_state_name(state);
+ }
+
+ list<OpRequestRef> waiters; ///< ops waiting on state change
+ int count; ///< number of readers or writers
+
+ State state:4; ///< rw state
+ /// if set, restart backfill when we can get a read lock
+ bool recovery_read_marker:1;
+ /// if set, requeue snaptrim on lock release
+ bool snaptrimmer_write_marker:1;
+
+ RWState()
+ : count(0),
+ state(RWNONE),
+ recovery_read_marker(false),
+ snaptrimmer_write_marker(false)
+ {}
+ bool get_read(OpRequestRef op) { // try for a read lock; on failure op is appended to waiters
+ if (get_read_lock()) {
+ return true;
+ } // else
+ waiters.push_back(op);
+ return false;
+ }
+ /// this function adjusts the counts if necessary
+ bool get_read_lock() {
+ // don't starve anybody!
+ if (!waiters.empty()) {
+ return false;
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWREAD;
+ // fall through
+ case RWREAD:
+ count++;
+ return true;
+ case RWWRITE:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+
+ bool get_write(OpRequestRef op, bool greedy=false) { // on failure op is queued only when it is non-null
+ if (get_write_lock(greedy)) {
+ return true;
+ } // else
+ if (op)
+ waiters.push_back(op);
+ return false;
+ }
+ bool get_write_lock(bool greedy=false) { // greedy skips the waiters / recovery_read_marker check
+ if (!greedy) {
+ // don't starve anybody!
+ if (!waiters.empty() ||
+ recovery_read_marker) {
+ return false;
+ }
+ }
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWWRITE;
+ // fall through
+ case RWWRITE:
+ count++;
+ return true;
+ case RWREAD:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_excl_lock() { // succeeds only from RWNONE; does not look at waiters
+ switch (state) {
+ case RWNONE:
+ assert(count == 0);
+ state = RWEXCL;
+ count = 1;
+ return true;
+ case RWWRITE:
+ return false;
+ case RWREAD:
+ return false;
+ case RWEXCL:
+ return false;
+ default:
+ assert(0 == "unhandled case");
+ return false;
+ }
+ }
+ bool get_excl(OpRequestRef op) { // on failure op is queued only when it is non-null
+ if (get_excl_lock()) {
+ return true;
+ } // else
+ if (op)
+ waiters.push_back(op);
+ return false;
+ }
+ /// same as get_write_lock, but ignore starvation
+ bool take_write_lock() {
+ if (state == RWWRITE) {
+ count++;
+ return true;
+ }
+ return get_write_lock(); // NOTE(review): this fallback is the non-greedy path, which still checks waiters and recovery_read_marker — confirm intended
+ }
+ void dec(list<OpRequestRef> *requeue) { // drop one holder; the last one resets state and hands all waiters to *requeue
+ assert(count > 0);
+ assert(requeue);
+ count--;
+ if (count == 0) {
+ state = RWNONE;
+ requeue->splice(requeue->end(), waiters);
+ }
+ }
+ void put_read(list<OpRequestRef> *requeue) {
+ assert(state == RWREAD);
+ dec(requeue);
+ }
+ void put_write(list<OpRequestRef> *requeue) {
+ assert(state == RWWRITE);
+ dec(requeue);
+ }
+ void put_excl(list<OpRequestRef> *requeue) {
+ assert(state == RWEXCL);
+ dec(requeue);
+ }
+ bool empty() const { return state == RWNONE; }
+ } rwstate;
+
+ bool get_read(OpRequestRef op) {
+ return rwstate.get_read(op);
+ }
+ bool get_write(OpRequestRef op) { // non-greedy write request
+ return rwstate.get_write(op, false);
+ }
+ bool get_excl(OpRequestRef op) {
+ return rwstate.get_excl(op);
+ }
+ bool get_lock_type(OpRequestRef op, RWState::State type) { // dispatches to get_write / get_read / get_excl by type
+ switch (type) {
+ case RWState::RWWRITE:
+ return get_write(op);
+ case RWState::RWREAD:
+ return get_read(op);
+ case RWState::RWEXCL:
+ return get_excl(op);
+ default:
+ assert(0 == "invalid lock type");
+ return true;
+ }
+ }
+ bool get_write_greedy(OpRequestRef op) { // write request that bypasses the starvation check
+ return rwstate.get_write(op, true);
+ }
+ bool get_snaptrimmer_write(bool mark_if_unsuccessful) { // no op is queued; optionally sets snaptrimmer_write_marker on failure
+ if (rwstate.get_write_lock()) {
+ return true;
+ } else {
+ if (mark_if_unsuccessful)
+ rwstate.snaptrimmer_write_marker = true;
+ return false;
+ }
+ }
+ bool get_recovery_read() { // the marker is set before the attempt and stays set when the lock is not obtained
+ rwstate.recovery_read_marker = true;
+ if (rwstate.get_read_lock()) {
+ return true;
+ }
+ return false;
+ }
+ bool try_get_read_lock() { // read attempt that neither queues an op nor sets a marker
+ return rwstate.get_read_lock();
+ }
+ void drop_recovery_read(list<OpRequestRef> *ls) { // requires the marker to be set; releases the read lock, then clears the marker
+ assert(rwstate.recovery_read_marker);
+ rwstate.put_read(ls);
+ rwstate.recovery_read_marker = false;
+ }
+ void put_read(list<OpRequestRef> *to_wake) { // unlike put_excl / put_write, does not inspect the markers
+ rwstate.put_read(to_wake);
+ }
+ void put_excl(list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ rwstate.put_excl(to_wake);
+ if (rwstate.empty() && rwstate.recovery_read_marker) { // lock fully released with a marker pending: clear it and tell the caller
+ rwstate.recovery_read_marker = false;
+ *requeue_recovery = true;
+ }
+ if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
+ rwstate.snaptrimmer_write_marker = false;
+ *requeue_snaptrimmer = true;
+ }
+ }
+ void put_write(list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ rwstate.put_write(to_wake);
+ if (rwstate.empty() && rwstate.recovery_read_marker) { // same marker handling as put_excl
+ rwstate.recovery_read_marker = false;
+ *requeue_recovery = true;
+ }
+ if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
+ rwstate.snaptrimmer_write_marker = false;
+ *requeue_snaptrimmer = true;
+ }
+ }
+ void put_lock_type( // release counterpart of get_lock_type
+ ObjectContext::RWState::State type,
+ list<OpRequestRef> *to_wake,
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ switch (type) {
+ case ObjectContext::RWState::RWWRITE:
+ return put_write(to_wake, requeue_recovery, requeue_snaptrimmer);
+ case ObjectContext::RWState::RWREAD:
+ return put_read(to_wake); // the read path leaves both requeue flags untouched
+ case ObjectContext::RWState::RWEXCL:
+ return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer);
+ default:
+ assert(0 == "invalid lock type");
+ }
+ }
+ bool is_request_pending() { // true while rwstate.count is positive
+ return (rwstate.count > 0);
+ }
+
+ ObjectContext()
+ : ssc(NULL),
+ destructor_callback(0),
+ lock("PrimaryLogPG::ObjectContext::lock"),
+ unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
+ blocked(false), requeue_scrub_on_unblock(false) {}
+
+ ~ObjectContext() {
+ assert(rwstate.empty()); // must not be destroyed while the rw lock is held
+ if (destructor_callback)
+ destructor_callback->complete(0);
+ }
+
+ void start_block() {
+ assert(!blocked);
+ blocked = true;
+ }
+ void stop_block() {
+ assert(blocked);
+ blocked = false;
+ }
+ bool is_blocked() const {
+ return blocked;
+ }
+
+ // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
+ void ondisk_write_lock() {
+ lock.Lock();
+ writers_waiting++;
+ while (readers_waiting || readers) // writers wait while any reader is active or waiting
+ cond.Wait(lock);
+ writers_waiting--;
+ unstable_writes++;
+ lock.Unlock();
+ }
+ void ondisk_write_unlock() {
+ lock.Lock();
+ assert(unstable_writes > 0);
+ unstable_writes--;
+ if (!unstable_writes && readers_waiting) // signal only when the last write finished and a reader is waiting
+ cond.Signal();
+ lock.Unlock();
+ }
+ void ondisk_read_lock() {
+ lock.Lock();
+ readers_waiting++;
+ while (unstable_writes) // readers wait until there are no unstable writes
+ cond.Wait(lock);
+ readers_waiting--;
+ readers++;
+ lock.Unlock();
+ }
+ void ondisk_read_unlock() {
+ lock.Lock();
+ assert(readers > 0);
+ readers--;
+ if (!readers && writers_waiting) // signal only when the last reader left and a writer is waiting
+ cond.Signal();
+ lock.Unlock();
+ }
+
+ /// in-progress copyfrom ops for this object
+ bool blocked:1;
+ bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock
+
+};
+
+inline ostream& operator<<(ostream& out, const ObjectState& obs) // prints obs.oi.soid, with "(dne)" appended when !exists
+{
+ out << obs.oi.soid;
+ if (!obs.exists)
+ out << "(dne)";
+ return out;
+}
+
+inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw) // prints "rwstate(<state> n=<count> w=<waiters>)"
+{
+ return out << "rwstate(" << rw.get_state_name()
+ << " n=" << rw.count
+ << " w=" << rw.waiters.size()
+ << ")";
+}
+
+inline ostream& operator<<(ostream& out, const ObjectContext& obc) // prints "obc(<obs> <rwstate>)" via the two overloads for those types
+{
+ return out << "obc(" << obc.obs << " " << obc.rwstate << ")";
+}
+
+class ObcLockManager { // records which object locks were taken (hobject_t -> obc + lock type) so put_locks() can release them all
+ struct ObjectLockState { // one held lock: the object context and the kind of lock taken on it
+ ObjectContextRef obc;
+ ObjectContext::RWState::State type;
+ ObjectLockState(
+ ObjectContextRef obc,
+ ObjectContext::RWState::State type)
+ : obc(obc), type(type) {}
+ };
+ map<hobject_t, ObjectLockState> locks; // at most one entry per object; every getter asserts the object is not already present
+public:
+ ObcLockManager() = default;
+ ObcLockManager(ObcLockManager &&) = default;
+ ObcLockManager(const ObcLockManager &) = delete; // movable but not copyable
+ ObcLockManager &operator=(ObcLockManager &&) = default;
+ bool empty() const {
+ return locks.empty();
+ }
+ bool get_lock_type( // take a lock of the given type; recorded only on success
+ ObjectContext::RWState::State type,
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ OpRequestRef op) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_lock_type(op, type)) {
+ locks.insert(make_pair(hoid, ObjectLockState(obc, type)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock, ignore starvation
+ bool take_write_lock(
+ const hobject_t &hoid,
+ ObjectContextRef obc) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->rwstate.take_write_lock()) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock for snap trim
+ bool get_snaptrimmer_write(
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ bool mark_if_unsuccessful) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+ /// Get write lock greedy
+ bool get_write_greedy(
+ const hobject_t &hoid,
+ ObjectContextRef obc,
+ OpRequestRef op) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->get_write_greedy(op)) {
+ locks.insert(
+ make_pair(
+ hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /// try get read lock
+ bool try_get_read_lock(
+ const hobject_t &hoid,
+ ObjectContextRef obc) {
+ assert(locks.find(hoid) == locks.end());
+ if (obc->try_get_read_lock()) {
+ locks.insert(
+ make_pair(
+ hoid,
+ ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ void put_locks( // release every recorded lock, then forget them all
+ list<pair<hobject_t, list<OpRequestRef> > > *to_requeue, // may be null; when set, receives one (soid, woken ops) entry per lock
+ bool *requeue_recovery,
+ bool *requeue_snaptrimmer) {
+ for (auto& p: locks) { // bind by reference: avoids copying the hobject_t key and the ObjectContextRef on every iteration
+ list<OpRequestRef> _to_requeue;
+ p.second.obc->put_lock_type(
+ p.second.type,
+ &_to_requeue,
+ requeue_recovery,
+ requeue_snaptrimmer);
+ if (to_requeue) {
+ to_requeue->push_back(
+ make_pair(
+ p.second.obc->obs.oi.soid,
+ std::move(_to_requeue)));
+ }
+ }
+ locks.clear();
+ }
+ ~ObcLockManager() {
+ assert(locks.empty()); // put_locks() must have been called before destruction
+ }
+};
+
+
+
+#endif
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index ff3c97268b6..d16e2d19aa1 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -1130,6 +1130,8 @@ void pg_pool_t::dump(Formatter *f) const
f->dump_unsigned("crash_replay_interval", get_crash_replay_interval());
f->dump_stream("last_change") << get_last_change();
f->dump_stream("last_force_op_resend") << get_last_force_op_resend();
+ f->dump_stream("last_force_op_resend_preluminous")
+ << get_last_force_op_resend_preluminous();
f->dump_unsigned("auid", get_auid());
f->dump_string("snap_mode", is_pool_snaps_mode() ? "pool" : "selfmanaged");
f->dump_unsigned("snap_seq", get_snap_seq());
@@ -1482,12 +1484,17 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const
return;
}
- uint8_t v = 24;
+ uint8_t v = 25;
if (!(features & CEPH_FEATURE_NEW_OSDOP_ENCODING)) {
// this was the first post-hammer thing we added; if it's missing, encode
// like hammer.
v = 21;
}
+ if ((features &
+ (CEPH_FEATURE_RESEND_ON_SPLIT|CEPH_FEATURE_SERVER_JEWEL)) !=
+ (CEPH_FEATURE_RESEND_ON_SPLIT|CEPH_FEATURE_SERVER_JEWEL)) {
+ v = 24;
+ }
ENCODE_START(v, 5, bl);
::encode(type, bl);
@@ -1528,7 +1535,7 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const
::encode(cache_min_flush_age, bl);
::encode(cache_min_evict_age, bl);
::encode(erasure_code_profile, bl);
- ::encode(last_force_op_resend, bl);
+ ::encode(last_force_op_resend_preluminous, bl);
::encode(min_read_recency_for_promote, bl);
::encode(expected_num_objects, bl);
if (v >= 19) {
@@ -1550,6 +1557,9 @@ void pg_pool_t::encode(bufferlist& bl, uint64_t features) const
if (v >= 24) {
::encode(opts, bl);
}
+ if (v >= 25) {
+ ::encode(last_force_op_resend, bl);
+ }
ENCODE_FINISH(bl);
}
@@ -1653,9 +1663,9 @@ void pg_pool_t::decode(bufferlist::iterator& bl)
::decode(erasure_code_profile, bl);
}
if (struct_v >= 15) {
- ::decode(last_force_op_resend, bl);
+ ::decode(last_force_op_resend_preluminous, bl);
} else {
- last_force_op_resend = 0;
+ last_force_op_resend_preluminous = 0;
}
if (struct_v >= 16) {
::decode(min_read_recency_for_promote, bl);
@@ -1697,6 +1707,11 @@ void pg_pool_t::decode(bufferlist::iterator& bl)
if (struct_v >= 24) {
::decode(opts, bl);
}
+ if (struct_v >= 25) {
+ ::decode(last_force_op_resend, bl);
+ } else {
+ last_force_op_resend = last_force_op_resend_preluminous;
+ }
DECODE_FINISH(bl);
calc_pg_masks();
calc_grade_table();
@@ -1715,6 +1730,7 @@ void pg_pool_t::generate_test_instances(list<pg_pool_t*>& o)
a.pgp_num = 5;
a.last_change = 9;
a.last_force_op_resend = 123823;
+ a.last_force_op_resend_preluminous = 123824;
a.snap_seq = 10;
a.snap_epoch = 11;
a.auid = 12;
@@ -1772,8 +1788,10 @@ ostream& operator<<(ostream& out, const pg_pool_t& p)
<< " pg_num " << p.get_pg_num()
<< " pgp_num " << p.get_pgp_num()
<< " last_change " << p.get_last_change();
- if (p.get_last_force_op_resend())
- out << " lfor " << p.get_last_force_op_resend();
+ if (p.get_last_force_op_resend() ||
+ p.get_last_force_op_resend_preluminous())
+ out << " lfor " << p.get_last_force_op_resend() << "/"
+ << p.get_last_force_op_resend_preluminous();
if (p.get_auid())
out << " owner " << p.get_auid();
if (p.flags)
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 318a902856a..671e0ecc262 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -41,7 +41,6 @@
#include "common/snap_types.h"
#include "HitSet.h"
#include "Watch.h"
-#include "OpRequest.h"
#include "include/cmp.h"
#include "librados/ListObjectImpl.h"
#include "compressor/Compressor.h"
@@ -99,6 +98,36 @@ string ceph_osd_op_flag_string(unsigned flags);
/// convert CEPH_OSD_ALLOC_HINT_FLAG_* op flags to a string
string ceph_osd_alloc_hint_flag_string(unsigned flags);
+
+/**
+ * osd request identifier
+ *
+ * caller name + incarnation# + tid uniquely identify this request.
+ */
+struct osd_reqid_t {
+ entity_name_t name; // who
+ ceph_tid_t tid;
+ int32_t inc; // incarnation
+
+ osd_reqid_t()
+ : tid(0), inc(0) {}
+ osd_reqid_t(const entity_name_t& a, int i, ceph_tid_t t) // note: argument order is (name, inc, tid), member order is (name, tid, inc)
+ : name(a), tid(t), inc(i) {}
+
+ DENC(osd_reqid_t, v, p) { // fields are passed to denc in the order name, tid, inc
+ DENC_START(2, 2, p);
+ denc(v.name, p);
+ denc(v.tid, p);
+ denc(v.inc, p);
+ DENC_FINISH(p);
+ }
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<osd_reqid_t*>& o);
+};
+WRITE_CLASS_DENC(osd_reqid_t)
+
+
+
struct pg_shard_t {
int32_t osd;
shard_id_t shard;
@@ -1220,6 +1249,8 @@ public:
string erasure_code_profile; ///< name of the erasure code profile in OSDMap
epoch_t last_change; ///< most recent epoch changed, exclusing snapshot changes
epoch_t last_force_op_resend; ///< last epoch that forced clients to resend
+ /// last epoch that forced clients to resend (pre-luminous clients only)
+ epoch_t last_force_op_resend_preluminous;
snapid_t snap_seq; ///< seq for per-pool snapshot
epoch_t snap_epoch; ///< osdmap epoch of last snap
uint64_t auid; ///< who owns the pg
@@ -1334,6 +1365,7 @@ public:
pg_num(0), pgp_num(0),
last_change(0),
last_force_op_resend(0),
+ last_force_op_resend_preluminous(0),
snap_seq(0), snap_epoch(0),
auid(0),
crash_replay_interval(0),
@@ -1390,6 +1422,9 @@ public:
}
epoch_t get_last_change() const { return last_change; }
epoch_t get_last_force_op_resend() const { return last_force_op_resend; }
+ epoch_t get_last_force_op_resend_preluminous() const { // plain accessor for the pre-luminous resend epoch
+ return last_force_op_resend_preluminous;
+ }
epoch_t get_snap_epoch() const { return snap_epoch; }
snapid_t get_snap_seq() const { return snap_seq; }
uint64_t get_auid() const { return auid; }
@@ -1462,6 +1497,11 @@ public:
return quota_max_objects;
}
+ void set_last_force_op_resend(uint64_t t) { // sets both resend epochs to the same value; NOTE(review): t is uint64_t but both fields are epoch_t — confirm no truncation is possible
+ last_force_op_resend = t;
+ last_force_op_resend_preluminous = t;
+ }
+
void calc_pg_masks();
/*
@@ -4081,16 +4121,6 @@ struct object_info_t {
};
WRITE_CLASS_ENCODER_FEATURES(object_info_t)
-struct ObjectState {
- object_info_t oi;
- bool exists; ///< the stored object exists (i.e., we will remember the object_info_t)
-
- ObjectState() : exists(false) {}
-
- ObjectState(const object_info_t &oi_, bool exists_)
- : oi(oi_), exists(exists_) {}
-};
-
struct SnapSetContext {
hobject_t oid;
SnapSet snapset;
@@ -4102,500 +4132,10 @@ struct SnapSetContext {
oid(o), ref(0), registered(false), exists(true) { }
};
-/*
- * keep tabs on object modifications that are in flight.
- * we need to know the projected existence, size, snapset,
- * etc., because we don't send writes down to disk until after
- * replicas ack.
- */
-
-struct ObjectContext;
-
-typedef ceph::shared_ptr<ObjectContext> ObjectContextRef;
-
-struct ObjectContext {
- ObjectState obs;
-
- SnapSetContext *ssc; // may be null
-
- Context *destructor_callback;
-
-private:
- Mutex lock;
-public:
- Cond cond;
- int unstable_writes, readers, writers_waiting, readers_waiting;
-
-
- // any entity in obs.oi.watchers MUST be in either watchers or unconnected_watchers.
- map<pair<uint64_t, entity_name_t>, WatchRef> watchers;
-
- // attr cache
- map<string, bufferlist> attr_cache;
-
- struct RWState {
- enum State {
- RWNONE,
- RWREAD,
- RWWRITE,
- RWEXCL,
- };
- static const char *get_state_name(State s) {
- switch (s) {
- case RWNONE: return "none";
- case RWREAD: return "read";
- case RWWRITE: return "write";
- case RWEXCL: return "excl";
- default: return "???";
- }
- }
- const char *get_state_name() const {
- return get_state_name(state);
- }
-
- list<OpRequestRef> waiters; ///< ops waiting on state change
- int count; ///< number of readers or writers
-
- State state:4; ///< rw state
- /// if set, restart backfill when we can get a read lock
- bool recovery_read_marker:1;
- /// if set, requeue snaptrim on lock release
- bool snaptrimmer_write_marker:1;
-
- RWState()
- : count(0),
- state(RWNONE),
- recovery_read_marker(false),
- snaptrimmer_write_marker(false)
- {}
- bool get_read(OpRequestRef op) {
- if (get_read_lock()) {
- return true;
- } // else
- waiters.push_back(op);
- return false;
- }
- /// this function adjusts the counts if necessary
- bool get_read_lock() {
- // don't starve anybody!
- if (!waiters.empty()) {
- return false;
- }
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWREAD;
- // fall through
- case RWREAD:
- count++;
- return true;
- case RWWRITE:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
-
- bool get_write(OpRequestRef op, bool greedy=false) {
- if (get_write_lock(greedy)) {
- return true;
- } // else
- if (op)
- waiters.push_back(op);
- return false;
- }
- bool get_write_lock(bool greedy=false) {
- if (!greedy) {
- // don't starve anybody!
- if (!waiters.empty() ||
- recovery_read_marker) {
- return false;
- }
- }
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWWRITE;
- // fall through
- case RWWRITE:
- count++;
- return true;
- case RWREAD:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
- bool get_excl_lock() {
- switch (state) {
- case RWNONE:
- assert(count == 0);
- state = RWEXCL;
- count = 1;
- return true;
- case RWWRITE:
- return false;
- case RWREAD:
- return false;
- case RWEXCL:
- return false;
- default:
- assert(0 == "unhandled case");
- return false;
- }
- }
- bool get_excl(OpRequestRef op) {
- if (get_excl_lock()) {
- return true;
- } // else
- if (op)
- waiters.push_back(op);
- return false;
- }
- /// same as get_write_lock, but ignore starvation
- bool take_write_lock() {
- if (state == RWWRITE) {
- count++;
- return true;
- }
- return get_write_lock();
- }
- void dec(list<OpRequestRef> *requeue) {
- assert(count > 0);
- assert(requeue);
- count--;
- if (count == 0) {
- state = RWNONE;
- requeue->splice(requeue->end(), waiters);
- }
- }
- void put_read(list<OpRequestRef> *requeue) {
- assert(state == RWREAD);
- dec(requeue);
- }
- void put_write(list<OpRequestRef> *requeue) {
- assert(state == RWWRITE);
- dec(requeue);
- }
- void put_excl(list<OpRequestRef> *requeue) {
- assert(state == RWEXCL);
- dec(requeue);
- }
- bool empty() const { return state == RWNONE; }
- } rwstate;
-
- bool get_read(OpRequestRef op) {
- return rwstate.get_read(op);
- }
- bool get_write(OpRequestRef op) {
- return rwstate.get_write(op, false);
- }
- bool get_excl(OpRequestRef op) {
- return rwstate.get_excl(op);
- }
- bool get_lock_type(OpRequestRef op, RWState::State type) {
- switch (type) {
- case RWState::RWWRITE:
- return get_write(op);
- case RWState::RWREAD:
- return get_read(op);
- case RWState::RWEXCL:
- return get_excl(op);
- default:
- assert(0 == "invalid lock type");
- return true;
- }
- }
- bool get_write_greedy(OpRequestRef op) {
- return rwstate.get_write(op, true);
- }
- bool get_snaptrimmer_write(bool mark_if_unsuccessful) {
- if (rwstate.get_write_lock()) {
- return true;
- } else {
- if (mark_if_unsuccessful)
- rwstate.snaptrimmer_write_marker = true;
- return false;
- }
- }
- bool get_recovery_read() {
- rwstate.recovery_read_marker = true;
- if (rwstate.get_read_lock()) {
- return true;
- }
- return false;
- }
- bool try_get_read_lock() {
- return rwstate.get_read_lock();
- }
- void drop_recovery_read(list<OpRequestRef> *ls) {
- assert(rwstate.recovery_read_marker);
- rwstate.put_read(ls);
- rwstate.recovery_read_marker = false;
- }
- void put_read(list<OpRequestRef> *to_wake) {
- rwstate.put_read(to_wake);
- }
- void put_excl(list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- rwstate.put_excl(to_wake);
- if (rwstate.empty() && rwstate.recovery_read_marker) {
- rwstate.recovery_read_marker = false;
- *requeue_recovery = true;
- }
- if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
- rwstate.snaptrimmer_write_marker = false;
- *requeue_snaptrimmer = true;
- }
- }
- void put_write(list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- rwstate.put_write(to_wake);
- if (rwstate.empty() && rwstate.recovery_read_marker) {
- rwstate.recovery_read_marker = false;
- *requeue_recovery = true;
- }
- if (rwstate.empty() && rwstate.snaptrimmer_write_marker) {
- rwstate.snaptrimmer_write_marker = false;
- *requeue_snaptrimmer = true;
- }
- }
- void put_lock_type(
- ObjectContext::RWState::State type,
- list<OpRequestRef> *to_wake,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- switch (type) {
- case ObjectContext::RWState::RWWRITE:
- return put_write(to_wake, requeue_recovery, requeue_snaptrimmer);
- case ObjectContext::RWState::RWREAD:
- return put_read(to_wake);
- case ObjectContext::RWState::RWEXCL:
- return put_excl(to_wake, requeue_recovery, requeue_snaptrimmer);
- default:
- assert(0 == "invalid lock type");
- }
- }
- bool is_request_pending() {
- return (rwstate.count > 0);
- }
-
- ObjectContext()
- : ssc(NULL),
- destructor_callback(0),
- lock("PrimaryLogPG::ObjectContext::lock"),
- unstable_writes(0), readers(0), writers_waiting(0), readers_waiting(0),
- blocked(false), requeue_scrub_on_unblock(false) {}
-
- ~ObjectContext() {
- assert(rwstate.empty());
- if (destructor_callback)
- destructor_callback->complete(0);
- }
-
- void start_block() {
- assert(!blocked);
- blocked = true;
- }
- void stop_block() {
- assert(blocked);
- blocked = false;
- }
- bool is_blocked() const {
- return blocked;
- }
-
- // do simple synchronous mutual exclusion, for now. no waitqueues or anything fancy.
- void ondisk_write_lock() {
- lock.Lock();
- writers_waiting++;
- while (readers_waiting || readers)
- cond.Wait(lock);
- writers_waiting--;
- unstable_writes++;
- lock.Unlock();
- }
- void ondisk_write_unlock() {
- lock.Lock();
- assert(unstable_writes > 0);
- unstable_writes--;
- if (!unstable_writes && readers_waiting)
- cond.Signal();
- lock.Unlock();
- }
- void ondisk_read_lock() {
- lock.Lock();
- readers_waiting++;
- while (unstable_writes)
- cond.Wait(lock);
- readers_waiting--;
- readers++;
- lock.Unlock();
- }
- void ondisk_read_unlock() {
- lock.Lock();
- assert(readers > 0);
- readers--;
- if (!readers && writers_waiting)
- cond.Signal();
- lock.Unlock();
- }
-
- /// in-progress copyfrom ops for this object
- bool blocked:1;
- bool requeue_scrub_on_unblock:1; // true if we need to requeue scrub on unblock
-
-};
-
-inline ostream& operator<<(ostream& out, const ObjectState& obs)
-{
- out << obs.oi.soid;
- if (!obs.exists)
- out << "(dne)";
- return out;
-}
-
-inline ostream& operator<<(ostream& out, const ObjectContext::RWState& rw)
-{
- return out << "rwstate(" << rw.get_state_name()
- << " n=" << rw.count
- << " w=" << rw.waiters.size()
- << ")";
-}
-
-inline ostream& operator<<(ostream& out, const ObjectContext& obc)
-{
- return out << "obc(" << obc.obs << " " << obc.rwstate << ")";
-}
ostream& operator<<(ostream& out, const object_info_t& oi);
-class ObcLockManager {
- struct ObjectLockState {
- ObjectContextRef obc;
- ObjectContext::RWState::State type;
- ObjectLockState(
- ObjectContextRef obc,
- ObjectContext::RWState::State type)
- : obc(obc), type(type) {}
- };
- map<hobject_t, ObjectLockState> locks;
-public:
- ObcLockManager() = default;
- ObcLockManager(ObcLockManager &&) = default;
- ObcLockManager(const ObcLockManager &) = delete;
- ObcLockManager &operator=(ObcLockManager &&) = default;
- bool empty() const {
- return locks.empty();
- }
- bool get_lock_type(
- ObjectContext::RWState::State type,
- const hobject_t &hoid,
- ObjectContextRef obc,
- OpRequestRef op) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_lock_type(op, type)) {
- locks.insert(make_pair(hoid, ObjectLockState(obc, type)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock, ignore starvation
- bool take_write_lock(
- const hobject_t &hoid,
- ObjectContextRef obc) {
- assert(locks.find(hoid) == locks.end());
- if (obc->rwstate.take_write_lock()) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock for snap trim
- bool get_snaptrimmer_write(
- const hobject_t &hoid,
- ObjectContextRef obc,
- bool mark_if_unsuccessful) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_snaptrimmer_write(mark_if_unsuccessful)) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
- /// Get write lock greedy
- bool get_write_greedy(
- const hobject_t &hoid,
- ObjectContextRef obc,
- OpRequestRef op) {
- assert(locks.find(hoid) == locks.end());
- if (obc->get_write_greedy(op)) {
- locks.insert(
- make_pair(
- hoid, ObjectLockState(obc, ObjectContext::RWState::RWWRITE)));
- return true;
- } else {
- return false;
- }
- }
-
- /// try get read lock
- bool try_get_read_lock(
- const hobject_t &hoid,
- ObjectContextRef obc) {
- assert(locks.find(hoid) == locks.end());
- if (obc->try_get_read_lock()) {
- locks.insert(
- make_pair(
- hoid,
- ObjectLockState(obc, ObjectContext::RWState::RWREAD)));
- return true;
- } else {
- return false;
- }
- }
-
- void put_locks(
- list<pair<hobject_t, list<OpRequestRef> > > *to_requeue,
- bool *requeue_recovery,
- bool *requeue_snaptrimmer) {
- for (auto p: locks) {
- list<OpRequestRef> _to_requeue;
- p.second.obc->put_lock_type(
- p.second.type,
- &_to_requeue,
- requeue_recovery,
- requeue_snaptrimmer);
- if (to_requeue) {
- to_requeue->push_back(
- make_pair(
- p.second.obc->obs.oi.soid,
- std::move(_to_requeue)));
- }
- }
- locks.clear();
- }
- ~ObcLockManager() {
- assert(locks.empty());
- }
-};
-
// Object recovery
diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc
index ad578a81d1b..d83bcf17b50 100644
--- a/src/osdc/Objecter.cc
+++ b/src/osdc/Objecter.cc
@@ -857,7 +857,7 @@ void Objecter::_linger_submit(LingerOp *info, shunique_lock& sul)
// Populate Op::target
OSDSession *s = NULL;
- _calc_target(&info->target);
+ _calc_target(&info->target, nullptr);
// Create LingerOp<->OSDSession relation
int r = _get_session(info->target.osd, &s, sul);
@@ -1069,7 +1069,8 @@ void Objecter::_scan_requests(OSDSession *s,
if (pool_full_map)
force_resend_writes = force_resend_writes ||
(*pool_full_map)[op->target.base_oloc.pool];
- int r = _calc_target(&op->target);
+ int r = _calc_target(&op->target,
+ op->session ? op->session->con.get() : nullptr);
switch (r) {
case RECALC_OP_TARGET_NO_ACTION:
if (!force_resend &&
@@ -1297,7 +1298,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
p != need_resend_linger.end(); ++p) {
LingerOp *op = *p;
if (!op->session) {
- _calc_target(&op->target);
+ _calc_target(&op->target, nullptr);
OSDSession *s = NULL;
int const r = _get_session(op->target.osd, &s, sul);
assert(r == 0);
@@ -2292,7 +2293,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
assert(op->session == NULL);
OSDSession *s = NULL;
- bool check_for_latest_map = _calc_target(&op->target)
+ bool check_for_latest_map = _calc_target(&op->target, nullptr)
== RECALC_OP_TARGET_POOL_DNE;
// Try to get a session, including a retry if we need to take write lock
@@ -2309,7 +2310,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
// map changed; recalculate mapping
ldout(cct, 10) << __func__ << " relock raced with osdmap, recalc target"
<< dendl;
- check_for_latest_map = _calc_target(&op->target)
+ check_for_latest_map = _calc_target(&op->target, nullptr)
== RECALC_OP_TARGET_POOL_DNE;
if (s) {
put_session(s);
@@ -2655,21 +2656,27 @@ int64_t Objecter::get_object_pg_hash_position(int64_t pool, const string& key,
return p->raw_hash_to_pg(p->hash_key(key, ns));
}
-int Objecter::_calc_target(op_target_t *t, bool any_change)
+int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
// rwlock is locked
-
bool is_read = t->flags & CEPH_OSD_FLAG_READ;
bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
+ ldout(cct,20) << __func__ << " base " << t->base_oid << " " << t->base_oloc
+ << " precalc_pgid " << (int)t->precalc_pgid
+ << " pgid " << t->base_pgid
+ << (is_read ? " is_read" : "")
+ << (is_write ? " is_write" : "")
+ << dendl;
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
+ ldout(cct,30) << __func__ << " base pi " << pi
+ << " pg_num " << pi->get_pg_num() << dendl;
bool force_resend = false;
- bool need_check_tiering = false;
if (osdmap->get_epoch() == pi->last_force_op_resend) {
if (t->last_force_resend < pi->last_force_op_resend) {
t->last_force_resend = pi->last_force_op_resend;
@@ -2678,41 +2685,28 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
force_resend = true;
}
}
- if (t->target_oid.name.empty() || force_resend) {
- t->target_oid = t->base_oid;
- need_check_tiering = true;
- }
- if (t->target_oloc.empty() || force_resend) {
- t->target_oloc = t->base_oloc;
- need_check_tiering = true;
- }
- if (need_check_tiering &&
- (t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+ // apply tiering
+ t->target_oid = t->base_oid;
+ t->target_oloc = t->base_oloc;
+ if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (is_read && pi->has_read_tier())
t->target_oloc.pool = pi->read_tier;
if (is_write && pi->has_write_tier())
t->target_oloc.pool = pi->write_tier;
+ pi = osdmap->get_pg_pool(t->target_oloc.pool);
+ if (!pi) {
+ t->osd = -1;
+ return RECALC_OP_TARGET_POOL_DNE;
+ }
}
pg_t pgid;
if (t->precalc_pgid) {
- assert(t->base_oid.name.empty()); // make sure this is a listing op
- ldout(cct, 10) << __func__ << " have " << t->base_pgid << " pool "
- << osdmap->have_pg_pool(t->base_pgid.pool()) << dendl;
- if (!osdmap->have_pg_pool(t->base_pgid.pool())) {
- t->osd = -1;
- return RECALC_OP_TARGET_POOL_DNE;
- }
- if (osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE)) {
- // if the SORTBITWISE flag is set, we know all OSDs are running
- // jewel+.
- pgid = t->base_pgid;
- } else {
- // legacy behavior. pre-jewel OSDs will fail if we send a
- // full-hash pgid value.
- pgid = osdmap->raw_pg_to_pg(t->base_pgid);
- }
+ assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
+ assert(t->base_oid.name.empty()); // make sure this is a pg op
+ assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
+ pgid = t->base_pgid;
} else {
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
@@ -2721,6 +2715,10 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
return RECALC_OP_TARGET_POOL_DNE;
}
}
+ ldout(cct,20) << __func__ << " target " << t->target_oid << " "
+ << t->target_oloc << " -> pgid " << pgid << dendl;
+ ldout(cct,30) << __func__ << " target pi " << pi
+ << " pg_num " << pi->get_pg_num() << dendl;
int size = pi->size;
int min_size = pi->min_size;
@@ -2731,6 +2729,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
&acting, &acting_primary);
bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
+ pg_t prev_pgid(prev_seed, pgid.pool());
if (any_change && pg_interval_t::is_new_interval(
t->acting_primary,
acting_primary,
@@ -2748,7 +2747,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
pg_num,
t->sort_bitwise,
sort_bitwise,
- pg_t(prev_seed, pgid.pool(), pgid.preferred()))) {
+ prev_pgid)) {
force_resend = true;
}
@@ -2759,11 +2758,12 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
t->paused = false;
need_resend = true;
}
-
if (t->pgid != pgid ||
is_pg_changed(
t->acting_primary, t->acting, acting_primary, acting,
t->used_replica || any_change) ||
+ (con && con->has_features(CEPH_FEATUREMASK_RESEND_ON_SPLIT) &&
+ prev_pgid.is_split(t->pg_num, pg_num, nullptr)) ||
force_resend) {
t->pgid = pgid;
t->acting = acting;
@@ -2774,9 +2774,14 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
t->min_size = min_size;
t->pg_num = pg_num;
t->pg_num_mask = pi->get_pg_num_mask();
+ osdmap->get_primary_shard(
+ pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
+ &t->actual_pgid);
t->sort_bitwise = sort_bitwise;
ldout(cct, 10) << __func__ << " "
- << " pgid " << pgid << " acting " << acting << dendl;
+ << " raw pgid " << pgid << " -> actual " << t->actual_pgid
+ << " acting " << acting
+ << " primary " << acting_primary << dendl;
t->used_replica = false;
if (acting_primary == -1) {
t->osd = -1;
@@ -2830,7 +2835,7 @@ int Objecter::_calc_target(op_target_t *t, bool any_change)
int Objecter::_map_session(op_target_t *target, OSDSession **s,
shunique_lock& sul)
{
- _calc_target(target);
+ _calc_target(target, nullptr);
return _get_session(target->osd, s, sul);
}
@@ -2939,8 +2944,7 @@ int Objecter::_recalc_linger_op_target(LingerOp *linger_op,
{
// rwlock is locked unique
- int r = _calc_target(&linger_op->target,
- true);
+ int r = _calc_target(&linger_op->target, nullptr, true);
if (r == RECALC_OP_TARGET_NEED_RESEND) {
ldout(cct, 10) << "recalc_linger_op_target tid " << linger_op->linger_id
<< " pgid " << linger_op->target.pgid
@@ -3033,9 +3037,9 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
op->target.paused = false;
op->stamp = ceph::mono_clock::now();
+ hobject_t hobj = op->target.get_hobj();
MOSDOp *m = new MOSDOp(client_inc.read(), op->tid,
- op->target.target_oid, op->target.target_oloc,
- op->target.pgid,
+ hobj, op->target.actual_pgid,
osdmap->get_epoch(),
flags, op->features);
@@ -3047,9 +3051,6 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
m->set_mtime(op->mtime);
m->set_retry_attempt(op->attempts++);
- if (op->replay_version != eversion_t())
- m->set_version(op->replay_version); // we're replaying this op!
-
if (op->priority)
m->set_priority(op->priority);
else
@@ -3072,21 +3073,25 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
// backoff?
hobject_t hoid = op->target.get_hobj();
- auto q = op->session->backoffs.lower_bound(hoid);
- if (q != op->session->backoffs.begin()) {
- --q;
- if (hoid >= q->second.end) {
- ++q;
+ auto p = op->session->backoffs.find(op->target.actual_pgid);
+ if (p != op->session->backoffs.end()) {
+ auto q = p->second.lower_bound(hoid);
+ if (q != p->second.begin()) {
+ --q;
+ if (hoid >= q->second.end) {
+ ++q;
+ }
}
- }
- if (q != op->session->backoffs.end()) {
- ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
- << "," << q->second.end << ")" << dendl;
- int r = cmp(hoid, q->second.begin);
- if (r == 0 || (r > 0 && hoid < q->second.end)) {
- ldout(cct, 10) << __func__ << " backoff on " << hoid << ", queuing "
- << op << " tid " << op->tid << dendl;
- return;
+ if (q != p->second.end()) {
+ ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
+ << "," << q->second.end << ")" << dendl;
+ int r = cmp(hoid, q->second.begin);
+ if (r == 0 || (r > 0 && hoid < q->second.end)) {
+ ldout(cct, 10) << __func__ << " backoff " << op->target.actual_pgid
+ << " id " << q->second.id << " on " << hoid
+ << ", queuing " << op << " tid " << op->tid << dendl;
+ return;
+ }
}
}
@@ -3095,7 +3100,16 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
m = _prepare_osd_op(op);
}
- ldout(cct, 15) << "_send_op " << op->tid << " to osd." << op->session->osd
+ if (op->target.actual_pgid != m->get_spg()) {
+ ldout(cct, 10) << __func__ << " " << op->tid << " pgid change from "
+ << m->get_spg() << " to " << op->target.actual_pgid
+ << ", updating and reencoding" << dendl;
+ m->set_spg(op->target.actual_pgid);
+ m->clear_payload(); // reencode
+ }
+
+ ldout(cct, 15) << "_send_op " << op->tid << " to "
+ << op->target.actual_pgid << " on osd." << op->session->osd
<< dendl;
ConnectionRef con = op->session->con;
@@ -3223,8 +3237,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
ldout(cct, 7) << "handle_osd_op_reply " << tid
<< (m->is_ondisk() ? " ondisk" :
(m->is_onnvram() ? " onnvram" : " ack"))
- << " v " << m->get_replay_version() << " uv "
- << m->get_user_version()
+ << " uv " << m->get_user_version()
<< " in " << m->get_pg()
<< " attempt " << m->get_retry_attempt()
<< dendl;
@@ -3414,16 +3427,19 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
case CEPH_OSD_BACKOFF_OP_BLOCK:
{
// register
- OSDBackoff& b = s->backoffs[m->begin];
+ OSDBackoff& b = s->backoffs[m->pgid][m->begin];
s->backoffs_by_id.insert(make_pair(m->id, &b));
+ b.pgid = m->pgid;
b.id = m->id;
b.begin = m->begin;
b.end = m->end;
- // ack
- Message *r = new MOSDBackoff(CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
- m->id, m->begin, m->end,
- osdmap->get_epoch());
+ // ack with original backoff's epoch so that the osd can discard this if
+ // there was a pg split.
+ Message *r = new MOSDBackoff(m->pgid,
+ m->map_epoch,
+ CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
+ m->id, m->begin, m->end);
// this priority must match the MOSDOps from _prepare_osd_op
r->set_priority(cct->_conf->osd_client_op_priority);
con->send_message(r);
@@ -3433,31 +3449,43 @@ void Objecter::handle_osd_backoff(MOSDBackoff *m)
case CEPH_OSD_BACKOFF_OP_UNBLOCK:
{
auto p = s->backoffs_by_id.find(m->id);
- while (p != s->backoffs_by_id.end() &&
- p->second->id == m->id) {
+ if (p != s->backoffs_by_id.end()) {
OSDBackoff *b = p->second;
if (b->begin != m->begin &&
b->end != m->end) {
- lderr(cct) << __func__ << " got id " << m->id << " unblock on ["
+ lderr(cct) << __func__ << " got " << m->pgid << " id " << m->id
+ << " unblock on ["
<< m->begin << "," << m->end << ") but backoff is ["
<< b->begin << "," << b->end << ")" << dendl;
// hrmpf, unblock it anyway.
}
- ldout(cct, 10) << __func__ << " unblock backoff " << b->id
+ ldout(cct, 10) << __func__ << " unblock backoff " << b->pgid
+ << " id " << b->id
<< " [" << b->begin << "," << b->end
<< ")" << dendl;
- s->backoffs.erase(b->begin);
- p = s->backoffs_by_id.erase(p);
+ auto spgp = s->backoffs.find(b->pgid);
+ assert(spgp != s->backoffs.end());
+ spgp->second.erase(b->begin);
+ if (spgp->second.empty()) {
+ s->backoffs.erase(spgp);
+ }
+ s->backoffs_by_id.erase(p);
// check for any ops to resend
for (auto& q : s->ops) {
- int r = q.second->target.contained_by(m->begin, m->end);
- ldout(cct, 20) << __func__ << " contained_by " << r << " on "
- << q.second->target.get_hobj() << dendl;
- if (r) {
- _send_op(q.second);
+ if (q.second->target.actual_pgid == m->pgid) {
+ int r = q.second->target.contained_by(m->begin, m->end);
+ ldout(cct, 20) << __func__ << " contained_by " << r << " on "
+ << q.second->target.get_hobj() << dendl;
+ if (r) {
+ _send_op(q.second);
+ }
}
}
+ } else {
+ lderr(cct) << __func__ << " " << m->pgid << " id " << m->id
+ << " unblock on ["
+ << m->begin << "," << m->end << ") but backoff dne" << dendl;
}
}
break;
@@ -4629,6 +4657,9 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
c->map_check_error = 0;
+ // ignore overlays, just like we do with pg ops
+ c->target.flags |= CEPH_OSD_FLAG_IGNORE_OVERLAY;
+
if (c->target_osd >= 0) {
if (!osdmap->exists(c->target_osd)) {
c->map_check_error = -ENOENT;
@@ -4642,7 +4673,7 @@ int Objecter::_calc_command_target(CommandOp *c, shunique_lock& sul)
}
c->target.osd = c->target_osd;
} else {
- int ret = _calc_target(&(c->target), true);
+ int ret = _calc_target(&(c->target), nullptr, true);
if (ret == RECALC_OP_TARGET_POOL_DNE) {
c->map_check_error = -ENOENT;
c->map_check_error_str = "pool dne";
diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h
index 8a0a45c3cd7..fc8dce0304c 100644
--- a/src/osdc/Objecter.h
+++ b/src/osdc/Objecter.h
@@ -1193,7 +1193,8 @@ public:
///< explicit pg target, if any
pg_t base_pgid;
- pg_t pgid; ///< last pg we mapped to
+ pg_t pgid; ///< last (raw) pg we mapped to
+ spg_t actual_pgid; ///< last (actual) spg_t we mapped to
unsigned pg_num = 0; ///< last pg_num we mapped to
unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
vector<int> up; ///< set of up osds for last pg we mapped to
@@ -1268,7 +1269,6 @@ public:
uint64_t ontimeout;
ceph_tid_t tid;
- eversion_t replay_version; // for op replay
int attempts;
version_t *objver;
@@ -1708,6 +1708,7 @@ public:
// -- osd sessions --
struct OSDBackoff {
+ spg_t pgid;
uint64_t id;
hobject_t begin, end;
};
@@ -1725,8 +1726,8 @@ public:
map<ceph_tid_t,CommandOp*> command_ops;
// backoffs
- map<hobject_t,OSDBackoff> backoffs;
- multimap<uint64_t,OSDBackoff*> backoffs_by_id;
+ map<spg_t,map<hobject_t,OSDBackoff>> backoffs;
+ map<uint64_t,OSDBackoff*> backoffs_by_id;
int osd;
int incarnation;
@@ -1810,7 +1811,7 @@ public:
bool _osdmap_has_pool_full() const;
bool target_should_be_paused(op_target_t *op);
- int _calc_target(op_target_t *t,
+ int _calc_target(op_target_t *t, Connection *con,
bool any_change = false);
int _map_session(op_target_t *op, OSDSession **s,
shunique_lock& lc);
@@ -2192,7 +2193,9 @@ public:
Context *onack, epoch_t *reply_epoch,
int *ctx_budget) {
Op *o = new Op(object_t(), oloc,
- op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
+ op.ops,
+ flags | global_op_flags.read() | CEPH_OSD_FLAG_READ |
+ CEPH_OSD_FLAG_IGNORE_OVERLAY,
onack, NULL);
o->target.precalc_pgid = true;
o->target.base_pgid = pg_t(hash, oloc.pool);
diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h
index c0d43b9d46a..3616f88233e 100644
--- a/src/test/encoding/types.h
+++ b/src/test/encoding/types.h
@@ -558,8 +558,6 @@ MESSAGE(MOSDPGCreate)
MESSAGE(MOSDPGInfo)
#include "messages/MOSDPGLog.h"
MESSAGE(MOSDPGLog)
-#include "messages/MOSDPGMissing.h"
-MESSAGE(MOSDPGMissing)
#include "messages/MOSDPGNotify.h"
MESSAGE(MOSDPGNotify)
#include "messages/MOSDPGQuery.h"
diff --git a/src/test/msgr/perf_msgr_client.cc b/src/test/msgr/perf_msgr_client.cc
index f71682ce340..d47238c01a0 100644
--- a/src/test/msgr/perf_msgr_client.cc
+++ b/src/test/msgr/perf_msgr_client.cc
@@ -95,7 +95,10 @@ class MessengerClient {
if (inflight > uint64_t(concurrent)) {
cond.Wait(lock);
}
- MOSDOp *m = new MOSDOp(client_inc.read(), 0, oid, oloc, pgid, 0, 0, 0);
+ hobject_t hobj(oid, oloc.key, CEPH_NOSNAP, pgid.ps(), pgid.pool(),
+ oloc.nspace);
+ spg_t spgid(pgid);
+ MOSDOp *m = new MOSDOp(client_inc.read(), 0, hobj, spgid, 0, 0, 0);
m->write(0, msg_len, data);
inflight++;
conn->send_message(m);