summaryrefslogtreecommitdiffstats
path: root/src/librados/RadosClient.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/librados/RadosClient.cc')
-rw-r--r--src/librados/RadosClient.cc260
1 files changed, 134 insertions, 126 deletions
diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc
index 15fef61f7ea..372419816d8 100644
--- a/src/librados/RadosClient.cc
+++ b/src/librados/RadosClient.cc
@@ -28,7 +28,6 @@
#include "common/ceph_json.h"
#include "common/errno.h"
#include "common/ceph_json.h"
-#include "common/async/waiter.h"
#include "include/buffer.h"
#include "include/stringify.h"
#include "include/util.h"
@@ -57,13 +56,22 @@
#undef dout_prefix
#define dout_prefix *_dout << "librados: "
-namespace ca = ceph::async;
-namespace cb = ceph::buffer;
-namespace bc = boost::container;
-namespace bs = boost::system;
-
librados::RadosClient::RadosClient(CephContext *cct_)
- : Dispatcher(cct_->get()) {}
+ : Dispatcher(cct_->get()),
+ cct_deleter{cct_, [](CephContext *p) {p->put();}},
+ conf(cct_->_conf),
+ state(DISCONNECTED),
+ monclient(cct_),
+ mgrclient(cct_, nullptr, &monclient.monmap),
+ messenger(NULL),
+ instance_id(0),
+ objecter(NULL),
+ timer(cct, lock),
+ refcnt(1),
+ log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
+ finisher(cct, "radosclient", "fn-radosclient")
+{
+}
int64_t librados::RadosClient::lookup_pool(const char *name)
{
@@ -225,7 +233,7 @@ int librados::RadosClient::connect()
}
{
- MonClient mc_bootstrap(cct, poolctx);
+ MonClient mc_bootstrap(cct);
err = mc_bootstrap.get_monmap_and_config();
if (err < 0)
return err;
@@ -233,8 +241,6 @@ int librados::RadosClient::connect()
common_init_finish(cct);
- poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count"));
-
// get monmap
err = monclient.build_initial_monmap();
if (err < 0)
@@ -255,9 +261,9 @@ int librados::RadosClient::connect()
ldout(cct, 1) << "starting objecter" << dendl;
objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
- poolctx,
- cct->_conf->rados_mon_op_timeout,
- cct->_conf->rados_osd_op_timeout);
+ &finisher,
+ cct->_conf->rados_mon_op_timeout,
+ cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
objecter->set_balanced_budget();
@@ -312,6 +318,10 @@ int librados::RadosClient::connect()
objecter->start();
lock.lock();
+ timer.init();
+
+ finisher.start();
+
state = CONNECTED;
instance_id = monclient.get_global_id();
@@ -354,9 +364,12 @@ void librados::RadosClient::shutdown()
// make sure watch callbacks are flushed
watch_flush();
}
+ finisher.wait_for_empty();
+ finisher.stop();
}
state = DISCONNECTED;
instance_id = 0;
+ timer.shutdown(); // will drop+retake lock
l.unlock();
if (need_objecter) {
objecter->shutdown();
@@ -368,40 +381,41 @@ void librados::RadosClient::shutdown()
messenger->shutdown();
messenger->wait();
}
- poolctx.stop();
ldout(cct, 1) << "shutdown" << dendl;
}
int librados::RadosClient::watch_flush()
{
ldout(cct, 10) << __func__ << " enter" << dendl;
- ca::waiter<> w;
- objecter->linger_callback_flush(w);
-
- w.wait();
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::watch_flush::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ objecter->linger_callback_flush(new C_SafeCond(mylock, cond, &done));
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
ldout(cct, 10) << __func__ << " exit" << dendl;
return 0;
}
-struct CB_aio_watch_flush_Complete {
+struct C_aio_watch_flush_Complete : public Context {
librados::RadosClient *client;
librados::AioCompletionImpl *c;
- CB_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
+ C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
: client(_client), c(_c) {
c->get();
}
- void operator()() {
+ void finish(int r) override {
c->lock.lock();
- c->rval = 0;
+ c->rval = r;
c->complete = true;
c->cond.notify_all();
if (c->callback_complete ||
c->callback_safe) {
- boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c));
+ client->finisher.queue(new librados::C_AioComplete(c));
}
c->put_unlock();
}
@@ -410,7 +424,8 @@ struct CB_aio_watch_flush_Complete {
int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
{
ldout(cct, 10) << __func__ << " enter" << dendl;
- objecter->linger_callback_flush(CB_aio_watch_flush_Complete(this, c));
+ Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
+ objecter->linger_callback_flush(oncomplete);
ldout(cct, 10) << __func__ << " exit" << dendl;
return 0;
}
@@ -588,10 +603,15 @@ int librados::RadosClient::wait_for_osdmap()
int librados::RadosClient::wait_for_latest_osdmap()
{
- ca::waiter<bs::error_code> w;
- objecter->wait_for_latest_osdmap(w);
- auto ec = w.wait();
- return ceph::from_error_code(ec);
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
+ ceph::condition_variable cond;
+ bool done;
+
+ objecter->wait_for_latest_osdmap(new C_SafeCond(mylock, cond, &done));
+
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] {return done;});
+ return 0;
}
int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
@@ -609,24 +629,20 @@ int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
int librados::RadosClient::get_pool_stats(std::list<string>& pools,
map<string,::pool_stat_t> *result,
- bool *pper_pool)
+ bool *per_pool)
{
- ca::waiter<bs::error_code,
- bc::flat_map<std::string, ::pool_stat_t>, bool> w;
-
- std::vector<std::string> v(pools.begin(), pools.end());
- objecter->get_pool_stats(v, w);
-
- auto [ec, res, per_pool] = w.wait();
- if (ec)
- return ceph::from_error_code(ec);
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::get_pool_stats::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ int ret = 0;
- if (per_pool)
- *pper_pool = per_pool;
- if (result)
- result->insert(res.begin(), res.end());
+ objecter->get_pool_stats(pools, result, per_pool,
+ new C_SafeCond(mylock, cond, &done,
+ &ret));
- return 0;
+ unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return ret;
}
bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
@@ -688,10 +704,14 @@ int librados::RadosClient::pool_create(string& name,
ceph::condition_variable cond;
bool done;
Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
- objecter->create_pool(name, onfinish, crush_rule);
+ reply = objecter->create_pool(name, onfinish, crush_rule);
- std::unique_lock l{mylock};
- cond.wait(l, [&done] { return done; });
+ if (reply < 0) {
+ delete onfinish;
+ } else {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done; });
+ }
return reply;
}
@@ -703,8 +723,11 @@ int librados::RadosClient::pool_create_async(string& name,
if (r < 0)
return r;
- Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
- objecter->create_pool(name, onfinish, crush_rule);
+ Context *onfinish = new C_PoolAsync_Safe(c);
+ r = objecter->create_pool(name, onfinish, crush_rule);
+ if (r < 0) {
+ delete onfinish;
+ }
return r;
}
@@ -743,10 +766,14 @@ int librados::RadosClient::pool_delete(const char *name)
bool done;
int ret;
Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
- objecter->delete_pool(name, onfinish);
+ ret = objecter->delete_pool(name, onfinish);
- std::unique_lock l{mylock};
- cond.wait(l, [&done] { return done;});
+ if (ret < 0) {
+ delete onfinish;
+ } else {
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ }
return ret;
}
@@ -756,8 +783,11 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti
if (r < 0)
return r;
- Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
- objecter->delete_pool(name, onfinish);
+ Context *onfinish = new C_PoolAsync_Safe(c);
+ r = objecter->delete_pool(name, onfinish);
+ if (r < 0) {
+ delete onfinish;
+ }
return r;
}
@@ -813,20 +843,7 @@ void librados::RadosClient::mon_command_async(const vector<string>& cmd,
Context *on_finish)
{
std::lock_guard l{lock};
- monclient.start_mon_command(cmd, inbl,
- [outs, outbl,
- on_finish = std::unique_ptr<Context>(on_finish)]
- (bs::error_code e,
- std::string&& s,
- ceph::bufferlist&& b) mutable {
- if (outs)
- *outs = std::move(s);
- if (outbl)
- *outbl = std::move(b);
- if (on_finish)
- on_finish.release()->complete(
- ceph::from_error_code(e));
- });
+ monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
}
int librados::RadosClient::mgr_command(const vector<string>& cmd,
@@ -880,68 +897,80 @@ int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- ca::waiter<bs::error_code, std::string, ceph::bufferlist> w;
- monclient.start_mon_command(rank, cmd, inbl, w);
- auto&& [ec, s, bl] = w.wait();
-
- if (outs)
- *outs = std::move(s);
- if (outbl)
- *outbl = std::move(bl);
-
- return ceph::from_error_code(ec);
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ int rval;
+ {
+ std::lock_guard l{mylock};
+ monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
+ new C_SafeCond(mylock, cond, &done, &rval));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return rval;
}
int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
{
- ca::waiter<bs::error_code, std::string, ceph::bufferlist> w;
- monclient.start_mon_command(name, cmd, inbl, w);
- auto&& [ec, s, bl] = w.wait();
-
- if (outs)
- *outs = std::move(s);
- if (outbl)
- *outbl = std::move(bl);
-
- return ceph::from_error_code(ec);
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ int rval;
+ {
+ std::lock_guard l{mylock};
+ monclient.start_mon_command(name, cmd, inbl, outbl, outs,
+ new C_SafeCond(mylock, cond, &done, &rval));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return rval;
}
int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
const bufferlist& inbl,
bufferlist *poutbl, string *prs)
{
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::osd_command::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ int ret;
ceph_tid_t tid;
if (osd < 0)
return -EINVAL;
-
- ca::waiter<bs::error_code, std::string, cb::list> w;
- // XXX do anything with tid?
- objecter->osd_command(osd, std::move(cmd), cb::list(inbl), &tid, w);
- auto [ec, s, bl] = w.wait();
- if (poutbl)
- *poutbl = std::move(bl);
- if (prs)
- *prs = std::move(s);
- return ceph::from_error_code(ec);
+ {
+ std::lock_guard l{mylock};
+ // XXX do anything with tid?
+ objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return ret;
}
int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
const bufferlist& inbl,
bufferlist *poutbl, string *prs)
{
+ ceph::mutex mylock = ceph::make_mutex("RadosClient::pg_command::mylock");
+ ceph::condition_variable cond;
+ bool done;
+ int ret;
ceph_tid_t tid;
- ca::waiter<bs::error_code, std::string, cb::list> w;
- objecter->pg_command(pgid, std::move(cmd), inbl, &tid, w);
- auto [ec, s, bl] = w.wait();
- if (poutbl)
- *poutbl = std::move(bl);
- if (prs)
- *prs = std::move(s);
- return ceph::from_error_code(ec);
+
+ {
+ std::lock_guard l{lock};
+ objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
+ new C_SafeCond(mylock, cond, &done, &ret));
+ }
+ std::unique_lock l{mylock};
+ cond.wait(l, [&done] { return done;});
+ return ret;
}
int librados::RadosClient::monitor_log(const string& level,
@@ -1140,24 +1169,3 @@ int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
}
return 0;
}
-
-namespace {
-const char *config_keys[] = {
- "librados_thread_count",
- NULL
-};
-}
-
-const char** librados::RadosClient::get_tracked_conf_keys() const
-{
- return config_keys;
-}
-
-void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
- const std::set<std::string> &changed)
-{
- if (changed.count("librados_thread_count")) {
- poolctx.stop();
- poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count"));
- }
-}