diff options
-rw-r--r-- | src/osd/OSD.cc | 10 | ||||
-rw-r--r-- | src/osd/OSD.h | 4 | ||||
-rw-r--r-- | src/osd/PG.cc | 6 | ||||
-rw-r--r-- | src/osd/PG.h | 14 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.cc | 28 | ||||
-rw-r--r-- | src/osd/ReplicatedPG.h | 16 |
6 files changed, 56 insertions, 22 deletions
diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 1d51e02ad43..987f8354c80 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -6664,7 +6664,7 @@ void OSD::do_recovery(PG *pg, ThreadPool::TPHandle &handle) #endif PG::RecoveryCtx rctx = create_context(); - int started = pg->start_recovery_ops(max, &rctx); + int started = pg->start_recovery_ops(max, &rctx, handle); dout(10) << "do_recovery started " << started << "/" << max << " on " << *pg << dendl; /* @@ -7052,7 +7052,7 @@ void OSD::OpWQ::_process(PGRef pg, ThreadPool::TPHandle &handle) if (!(pg_for_processing[&*pg].size())) pg_for_processing.erase(&*pg); } - osd->dequeue_op(pg, op); + osd->dequeue_op(pg, op, handle); pg->unlock(); } @@ -7065,7 +7065,9 @@ void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued) /* * NOTE: dequeue called in worker thread, with pg lock */ -void OSD::dequeue_op(PGRef pg, OpRequestRef op) +void OSD::dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle) { utime_t latency = ceph_clock_now(g_ceph_context) - op->request->get_recv_stamp(); dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() @@ -7078,7 +7080,7 @@ void OSD::dequeue_op(PGRef pg, OpRequestRef op) op->mark_reached_pg(); - pg->do_request(op); + pg->do_request(op, handle); // finish dout(10) << "dequeue_op " << op << " finish" << dendl; diff --git a/src/osd/OSD.h b/src/osd/OSD.h index ae77644eeeb..82a251d9a80 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -915,7 +915,9 @@ private: } op_wq; void enqueue_op(PG *pg, OpRequestRef op); - void dequeue_op(PGRef pg, OpRequestRef op); + void dequeue_op( + PGRef pg, OpRequestRef op, + ThreadPool::TPHandle &handle); // -- peering queue -- struct PeeringWQ : public ThreadPool::BatchWorkQueue<PG> { diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 63e760e3b21..8e78eaa7a16 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1397,7 +1397,9 @@ void PG::queue_op(OpRequestRef op) osd->op_wq.queue(make_pair(PGRef(this), op)); } -void PG::do_request(OpRequestRef op) +void PG::do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle) { // do any pending flush do_pending_flush(); @@ -1435,7 +1437,7 @@ void PG::do_request(OpRequestRef op) break; case MSG_OSD_PG_SCAN: - do_scan(op); + do_scan(op, handle); break; case MSG_OSD_PG_BACKFILL: diff --git a/src/osd/PG.h b/src/osd/PG.h index 8f572c75e19..d4679ce4fd8 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -645,7 +645,9 @@ public: virtual void check_local() = 0; - virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0; + virtual int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) = 0; void purge_strays(); @@ -1804,12 +1806,18 @@ public: // abstract bits - void do_request(OpRequestRef op); + void do_request( + OpRequestRef op, + ThreadPool::TPHandle &handle + ); virtual void do_op(OpRequestRef op) = 0; virtual void do_sub_op(OpRequestRef op) = 0; virtual void do_sub_op_reply(OpRequestRef op) = 0; - virtual void do_scan(OpRequestRef op) = 0; + virtual void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle + ) = 0; virtual void do_backfill(OpRequestRef op) = 0; virtual void do_push(OpRequestRef op) = 0; virtual void do_pull(OpRequestRef op) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 658ea7cb746..73ac2807404 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1252,7 +1252,9 @@ void ReplicatedPG::do_sub_op_reply(OpRequestRef op) sub_op_modify_reply(op); } -void ReplicatedPG::do_scan(OpRequestRef op) +void ReplicatedPG::do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle) { MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request); assert(m->get_header().type == MSG_OSD_PG_SCAN); @@ -1278,7 +1280,9 @@ void ReplicatedPG::do_scan(OpRequestRef op) BackfillInterval bi; osr->flush(); - scan_range(m->begin, g_conf->osd_backfill_scan_min, g_conf->osd_backfill_scan_max, &bi); + scan_range( + m->begin, g_conf->osd_backfill_scan_min, + g_conf->osd_backfill_scan_max, &bi, handle); MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST, get_osdmap()->get_epoch(), m->query_epoch, info.pgid, bi.begin, bi.end); @@ -6875,7 +6879,9 @@ void ReplicatedPG::check_recovery_sources(const OSDMapRef osdmap) } -int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) +int ReplicatedPG::start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle) { int started = 0; assert(is_primary()); @@ -6931,7 +6937,7 @@ int ReplicatedPG::start_recovery_ops(int max, RecoveryCtx *prctx) } deferred_backfill = true; } else { - started += recover_backfill(max - started); + started += recover_backfill(max - started, handle); } } @@ -7275,7 +7281,9 @@ int ReplicatedPG::recover_replicas(int max) * peer_info[backfill_target].last_backfill = MIN(peer_backfill_info.begin, * backfill_info.begin, backfills_in_flight) */ -int ReplicatedPG::recover_backfill(int max) +int ReplicatedPG::recover_backfill( + int max, + ThreadPool::TPHandle &handle) { dout(10) << "recover_backfill (" << max << ")" << dendl; assert(backfill_target >= 0); @@ -7305,7 +7313,7 @@ int ReplicatedPG::recover_backfill(int max) dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl; backfill_info.clear(); osr->flush(); - scan_range(backfill_pos, local_min, local_max, &backfill_info); + scan_range(backfill_pos, local_min, local_max, &backfill_info, handle); int ops = 0; map<hobject_t, pair<eversion_t, eversion_t> > to_push; @@ -7319,7 +7327,8 @@ int ReplicatedPG::recover_backfill(int max) if (backfill_info.begin <= pbi.begin && !backfill_info.extends_to_end() && backfill_info.empty()) { osr->flush(); - scan_range(backfill_info.end, local_min, local_max, &backfill_info); + scan_range(backfill_info.end, local_min, local_max, &backfill_info, + handle); backfill_info.trim(); } backfill_pos = backfill_info.begin > pbi.begin ? pbi.begin : backfill_info.begin; @@ -7480,7 +7489,9 @@ void ReplicatedPG::prep_backfill_object_push( put_object_context(obc); } -void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterval *bi) +void ReplicatedPG::scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle) { assert(is_locked()); dout(10) << "scan_range from " << begin << dendl; @@ -7496,6 +7507,7 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva dout(20) << ls << dendl; for (vector<hobject_t>::iterator p = ls.begin(); p != ls.end(); ++p) { + handle.reset_tp_timeout(); ObjectContext *obc = NULL; if (is_primary()) obc = _lookup_object_context(*p); diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 7b70b4381ea..ac6694555f5 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -759,10 +759,13 @@ protected: void _clear_recovery_state(); void queue_for_recovery(); - int start_recovery_ops(int max, RecoveryCtx *prctx); + int start_recovery_ops( + int max, RecoveryCtx *prctx, + ThreadPool::TPHandle &handle); + int recover_primary(int max); int recover_replicas(int max); - int recover_backfill(int max); + int recover_backfill(int max, ThreadPool::TPHandle &handle); /** * scan a (hash) range of objects in the current pg @@ -772,7 +775,10 @@ protected: * @max return no more than this many items * @bi [out] resulting map of objects to eversion_t's */ - void scan_range(hobject_t begin, int min, int max, BackfillInterval *bi); + void scan_range( + hobject_t begin, int min, int max, BackfillInterval *bi, + ThreadPool::TPHandle &handle + ); void prep_backfill_object_push( hobject_t oid, eversion_t v, eversion_t have, int peer, @@ -939,7 +945,9 @@ public: void do_pg_op(OpRequestRef op); void do_sub_op(OpRequestRef op); void do_sub_op_reply(OpRequestRef op); - void do_scan(OpRequestRef op); + void do_scan( + OpRequestRef op, + ThreadPool::TPHandle &handle); void do_backfill(OpRequestRef op); void _do_push(OpRequestRef op); void _do_pull_response(OpRequestRef op); |