summaryrefslogtreecommitdiffstats
path: root/src/osd/OSDMapMapping.cc
diff options
context:
space:
mode:
authorSage Weil <sage@redhat.com>2017-01-30 22:51:01 +0100
committerSage Weil <sage@redhat.com>2017-02-16 18:04:07 +0100
commite7bfc65b91bc096aafb5fdcc134cf27bc58e756e (patch)
tree81ed7abc4c8f99ca42e2ec61f29688775887b202 /src/osd/OSDMapMapping.cc
parentmon/OSDMonitor: prime_pg_temp based on OSDMapMapping (diff)
downloadceph-e7bfc65b91bc096aafb5fdcc134cf27bc58e756e.tar.xz
ceph-e7bfc65b91bc096aafb5fdcc134cf27bc58e756e.zip
osd/OSDMapMapping: add ParallelMapper
Calculate a mapping in parallel over a workqueue + threadpool. Signed-off-by: Sage Weil <sage@redhat.com>
Diffstat (limited to 'src/osd/OSDMapMapping.cc')
-rw-r--r--src/osd/OSDMapMapping.cc71
1 files changed, 69 insertions, 2 deletions
diff --git a/src/osd/OSDMapMapping.cc b/src/osd/OSDMapMapping.cc
index 7c022419d1f..7ba53b2e77f 100644
--- a/src/osd/OSDMapMapping.cc
+++ b/src/osd/OSDMapMapping.cc
@@ -4,6 +4,10 @@
#include "OSDMapMapping.h"
#include "OSDMap.h"
+#define dout_subsys ceph_subsys_mon
+
+#include "common/debug.h"
+
// ensure that we have a PoolMappings for each pool and that
// the dimensions (pg_num and size) match up.
void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
@@ -34,14 +38,19 @@ void OSDMapMapping::_init_mappings(const OSDMap& osdmap)
void OSDMapMapping::update(const OSDMap& osdmap)
{
- _init_mappings(osdmap);
+ _start(osdmap);
for (auto& p : osdmap.get_pools()) {
_update_range(osdmap, p.first, 0, p.second.get_pg_num());
}
- _build_rmap(osdmap);
+ _finish(osdmap);
//_dump(); // for debugging
}
+void OSDMapMapping::update(const OSDMap& osdmap, pg_t pgid)
+{
+ _update_range(osdmap, pgid.pool(), pgid.ps(), pgid.ps() + 1);
+}
+
void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
{
acting_rmap.resize(osdmap.get_max_osd());
@@ -69,6 +78,12 @@ void OSDMapMapping::_build_rmap(const OSDMap& osdmap)
}
}
+void OSDMapMapping::_finish(const OSDMap& osdmap)
+{
+ _build_rmap(osdmap);
+ epoch = osdmap.get_epoch();
+}
+
void OSDMapMapping::_dump()
{
for (auto& p : pools) {
@@ -101,3 +116,55 @@ void OSDMapMapping::_update_range(
std::move(acting), acting_primary);
}
}
+
+// ---------------------------
+
+void ParallelOSDMapper::Job::finish_one()
+{
+ Context *fin = nullptr;
+ {
+ Mutex::Locker l(lock);
+ if (--shards == 0) {
+ if (!aborted) {
+ mapping->_finish(*osdmap);
+ }
+ cond.Signal();
+ fin = onfinish;
+ onfinish = nullptr;
+ }
+ }
+ if (fin) {
+ fin->complete(0);
+ }
+}
+
+void ParallelOSDMapper::WQ::_process(
+ item *i,
+ ThreadPool::TPHandle &h)
+{
+ ldout(m->cct, 10) << __func__ << " " << i->osdmap << " " << i->pool << " ["
+ << i->begin << "," << i->end << ")" << dendl;
+ i->mapping->_update_range(*i->osdmap, i->pool, i->begin, i->end);
+ i->job->finish_one();
+ delete i;
+}
+
+std::unique_ptr<ParallelOSDMapper::Job> ParallelOSDMapper::queue(
+ const OSDMap& osdmap,
+ OSDMapMapping *mapping,
+ unsigned pgs_per_item)
+{
+ std::unique_ptr<Job> job(new Job(&osdmap, mapping));
+ mapping->_start(osdmap);
+ for (auto& p : osdmap.get_pools()) {
+ for (unsigned ps = 0; ps < p.second.get_pg_num(); ps += pgs_per_item) {
+ unsigned ps_end = MIN(ps + pgs_per_item, p.second.get_pg_num());
+ job->start_one();
+ wq.queue(new item(job.get(), &osdmap, mapping, p.first, ps, ps_end));
+ ldout(cct, 20) << __func__ << " queue " << &osdmap << " "
+ << p.first << " [" << ps
+ << "," << ps_end << ")" << dendl;
+ }
+ }
+ return job;
+}