diff options
Diffstat (limited to 'src/librados/RadosClient.cc')
-rw-r--r-- | src/librados/RadosClient.cc | 260 |
1 files changed, 134 insertions, 126 deletions
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 15fef61f7ea..372419816d8 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -28,7 +28,6 @@ #include "common/ceph_json.h" #include "common/errno.h" #include "common/ceph_json.h" -#include "common/async/waiter.h" #include "include/buffer.h" #include "include/stringify.h" #include "include/util.h" @@ -57,13 +56,22 @@ #undef dout_prefix #define dout_prefix *_dout << "librados: " -namespace ca = ceph::async; -namespace cb = ceph::buffer; -namespace bc = boost::container; -namespace bs = boost::system; - librados::RadosClient::RadosClient(CephContext *cct_) - : Dispatcher(cct_->get()) {} + : Dispatcher(cct_->get()), + cct_deleter{cct_, [](CephContext *p) {p->put();}}, + conf(cct_->_conf), + state(DISCONNECTED), + monclient(cct_), + mgrclient(cct_, nullptr, &monclient.monmap), + messenger(NULL), + instance_id(0), + objecter(NULL), + timer(cct, lock), + refcnt(1), + log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL), + finisher(cct, "radosclient", "fn-radosclient") +{ +} int64_t librados::RadosClient::lookup_pool(const char *name) { @@ -225,7 +233,7 @@ int librados::RadosClient::connect() } { - MonClient mc_bootstrap(cct, poolctx); + MonClient mc_bootstrap(cct); err = mc_bootstrap.get_monmap_and_config(); if (err < 0) return err; @@ -233,8 +241,6 @@ int librados::RadosClient::connect() common_init_finish(cct); - poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count")); - // get monmap err = monclient.build_initial_monmap(); if (err < 0) @@ -255,9 +261,9 @@ int librados::RadosClient::connect() ldout(cct, 1) << "starting objecter" << dendl; objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, - poolctx, - cct->_conf->rados_mon_op_timeout, - cct->_conf->rados_osd_op_timeout); + &finisher, + cct->_conf->rados_mon_op_timeout, + cct->_conf->rados_osd_op_timeout); if (!objecter) goto out; objecter->set_balanced_budget(); @@ -312,6 +318,10 @@ int librados::RadosClient::connect() objecter->start(); lock.lock(); + timer.init(); + + finisher.start(); + state = CONNECTED; instance_id = monclient.get_global_id(); @@ -354,9 +364,12 @@ void librados::RadosClient::shutdown() // make sure watch callbacks are flushed watch_flush(); } + finisher.wait_for_empty(); + finisher.stop(); } state = DISCONNECTED; instance_id = 0; + timer.shutdown(); // will drop+retake lock l.unlock(); if (need_objecter) { objecter->shutdown(); @@ -368,40 +381,41 @@ void librados::RadosClient::shutdown() messenger->shutdown(); messenger->wait(); } - poolctx.stop(); ldout(cct, 1) << "shutdown" << dendl; } int librados::RadosClient::watch_flush() { ldout(cct, 10) << __func__ << " enter" << dendl; - ca::waiter<> w; - objecter->linger_callback_flush(w); - - w.wait(); + ceph::mutex mylock = ceph::make_mutex("RadosClient::watch_flush::mylock"); + ceph::condition_variable cond; + bool done; + objecter->linger_callback_flush(new C_SafeCond(mylock, cond, &done)); + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done; }); ldout(cct, 10) << __func__ << " exit" << dendl; return 0; } -struct CB_aio_watch_flush_Complete { +struct C_aio_watch_flush_Complete : public Context { librados::RadosClient *client; librados::AioCompletionImpl *c; - CB_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c) + C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c) : client(_client), c(_c) { c->get(); } - void operator()() { + void finish(int r) override { c->lock.lock(); - c->rval = 0; + c->rval = r; c->complete = true; c->cond.notify_all(); if (c->callback_complete || c->callback_safe) { - boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c)); + client->finisher.queue(new librados::C_AioComplete(c)); } c->put_unlock(); } @@ -410,7 +424,8 @@ struct CB_aio_watch_flush_Complete { int librados::RadosClient::async_watch_flush(AioCompletionImpl *c) { ldout(cct, 10) << __func__ << " enter" << dendl; - objecter->linger_callback_flush(CB_aio_watch_flush_Complete(this, c)); + Context *oncomplete = new C_aio_watch_flush_Complete(this, c); + objecter->linger_callback_flush(oncomplete); ldout(cct, 10) << __func__ << " exit" << dendl; return 0; } @@ -588,10 +603,15 @@ int librados::RadosClient::wait_for_osdmap() int librados::RadosClient::wait_for_latest_osdmap() { - ca::waiter<bs::error_code> w; - objecter->wait_for_latest_osdmap(w); - auto ec = w.wait(); - return ceph::from_error_code(ec); + ceph::mutex mylock = ceph::make_mutex("RadosClient::wait_for_latest_osdmap"); + ceph::condition_variable cond; + bool done; + + objecter->wait_for_latest_osdmap(new C_SafeCond(mylock, cond, &done)); + + std::unique_lock l{mylock}; + cond.wait(l, [&done] {return done;}); + return 0; } int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v) @@ -609,24 +629,20 @@ int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v) int librados::RadosClient::get_pool_stats(std::list<string>& pools, map<string,::pool_stat_t> *result, - bool *pper_pool) + bool *per_pool) { - ca::waiter<bs::error_code, - bc::flat_map<std::string, ::pool_stat_t>, bool> w; - - std::vector<std::string> v(pools.begin(), pools.end()); - objecter->get_pool_stats(v, w); - - auto [ec, res, per_pool] = w.wait(); - if (ec) - return ceph::from_error_code(ec); + ceph::mutex mylock = ceph::make_mutex("RadosClient::get_pool_stats::mylock"); + ceph::condition_variable cond; + bool done; + int ret = 0; - if (per_pool) - *pper_pool = per_pool; - if (result) - result->insert(res.begin(), res.end()); + objecter->get_pool_stats(pools, result, per_pool, + new C_SafeCond(mylock, cond, &done, + &ret)); - return 0; + unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + return ret; } bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode( @@ -688,10 +704,14 @@ int librados::RadosClient::pool_create(string& name, ceph::condition_variable cond; bool done; Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply); - objecter->create_pool(name, onfinish, crush_rule); + reply = objecter->create_pool(name, onfinish, crush_rule); - std::unique_lock l{mylock}; - cond.wait(l, [&done] { return done; }); + if (reply < 0) { + delete onfinish; + } else { + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done; }); + } return reply; } @@ -703,8 +723,11 @@ int librados::RadosClient::pool_create_async(string& name, if (r < 0) return r; - Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c)); - objecter->create_pool(name, onfinish, crush_rule); + Context *onfinish = new C_PoolAsync_Safe(c); + r = objecter->create_pool(name, onfinish, crush_rule); + if (r < 0) { + delete onfinish; + } return r; } @@ -743,10 +766,14 @@ int librados::RadosClient::pool_delete(const char *name) bool done; int ret; Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret); - objecter->delete_pool(name, onfinish); + ret = objecter->delete_pool(name, onfinish); - std::unique_lock l{mylock}; - cond.wait(l, [&done] { return done;}); + if (ret < 0) { + delete onfinish; + } else { + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + } return ret; } @@ -756,8 +783,11 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti if (r < 0) return r; - Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c)); - objecter->delete_pool(name, onfinish); + Context *onfinish = new C_PoolAsync_Safe(c); + r = objecter->delete_pool(name, onfinish); + if (r < 0) { + delete onfinish; + } return r; } @@ -813,20 +843,7 @@ void librados::RadosClient::mon_command_async(const vector<string>& cmd, Context *on_finish) { std::lock_guard l{lock}; - monclient.start_mon_command(cmd, inbl, - [outs, outbl, - on_finish = std::unique_ptr<Context>(on_finish)] - (bs::error_code e, - std::string&& s, - ceph::bufferlist&& b) mutable { - if (outs) - *outs = std::move(s); - if (outbl) - *outbl = std::move(b); - if (on_finish) - on_finish.release()->complete( - ceph::from_error_code(e)); - }); + monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish); } int librados::RadosClient::mgr_command(const vector<string>& cmd, @@ -880,68 +897,80 @@ int librados::RadosClient::mon_command(int rank, const vector<string>& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs) { - ca::waiter<bs::error_code, std::string, ceph::bufferlist> w; - monclient.start_mon_command(rank, cmd, inbl, w); - auto&& [ec, s, bl] = w.wait(); - - if (outs) - *outs = std::move(s); - if (outbl) - *outbl = std::move(bl); - - return ceph::from_error_code(ec); + ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock"); + ceph::condition_variable cond; + bool done; + int rval; + { + std::lock_guard l{mylock}; + monclient.start_mon_command(rank, cmd, inbl, outbl, outs, + new C_SafeCond(mylock, cond, &done, &rval)); + } + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + return rval; } int librados::RadosClient::mon_command(string name, const vector<string>& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs) { - ca::waiter<bs::error_code, std::string, ceph::bufferlist> w; - monclient.start_mon_command(name, cmd, inbl, w); - auto&& [ec, s, bl] = w.wait(); - - if (outs) - *outs = std::move(s); - if (outbl) - *outbl = std::move(bl); - - return ceph::from_error_code(ec); + ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock"); + ceph::condition_variable cond; + bool done; + int rval; + { + std::lock_guard l{mylock}; + monclient.start_mon_command(name, cmd, inbl, outbl, outs, + new C_SafeCond(mylock, cond, &done, &rval)); + } + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + return rval; } int librados::RadosClient::osd_command(int osd, vector<string>& cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs) { + ceph::mutex mylock = ceph::make_mutex("RadosClient::osd_command::mylock"); + ceph::condition_variable cond; + bool done; + int ret; ceph_tid_t tid; if (osd < 0) return -EINVAL; - - ca::waiter<bs::error_code, std::string, cb::list> w; - // XXX do anything with tid? - objecter->osd_command(osd, std::move(cmd), cb::list(inbl), &tid, w); - auto [ec, s, bl] = w.wait(); - if (poutbl) - *poutbl = std::move(bl); - if (prs) - *prs = std::move(s); - return ceph::from_error_code(ec); + { + std::lock_guard l{mylock}; + // XXX do anything with tid? + objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs, + new C_SafeCond(mylock, cond, &done, &ret)); + } + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + return ret; } int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd, const bufferlist& inbl, bufferlist *poutbl, string *prs) { + ceph::mutex mylock = ceph::make_mutex("RadosClient::pg_command::mylock"); + ceph::condition_variable cond; + bool done; + int ret; ceph_tid_t tid; - ca::waiter<bs::error_code, std::string, cb::list> w; - objecter->pg_command(pgid, std::move(cmd), inbl, &tid, w); - auto [ec, s, bl] = w.wait(); - if (poutbl) - *poutbl = std::move(bl); - if (prs) - *prs = std::move(s); - return ceph::from_error_code(ec); + + { + std::lock_guard l{lock}; + objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs, + new C_SafeCond(mylock, cond, &done, &ret)); + } + std::unique_lock l{mylock}; + cond.wait(l, [&done] { return done;}); + return ret; } int librados::RadosClient::monitor_log(const string& level, @@ -1140,24 +1169,3 @@ int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id, } return 0; } - -namespace { -const char *config_keys[] = { - "librados_thread_count", - NULL -}; -} - -const char** librados::RadosClient::get_tracked_conf_keys() const -{ - return config_keys; -} - -void librados::RadosClient::handle_conf_change(const ConfigProxy& conf, - const std::set<std::string> &changed) -{ - if (changed.count("librados_thread_count")) { - poolctx.stop(); - poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count")); - } -} |