summaryrefslogtreecommitdiffstats
path: root/src/journal
diff options
context:
space:
mode:
authorMykola Golub <mgolub@mirantis.com>2016-07-13 14:49:40 +0200
committerMykola Golub <mgolub@mirantis.com>2016-09-05 07:51:54 +0200
commit0b8b1aaedc10f7f46e91bf6ad809414feb770c8d (patch)
tree87aa40e23cbc24c6ccd7b5b8102f5754b5792eb9 /src/journal
parentcls/journal: add async client_update_state method (diff)
downloadceph-0b8b1aaedc10f7f46e91bf6ad809414feb770c8d.tar.xz
ceph-0b8b1aaedc10f7f46e91bf6ad809414feb770c8d.zip
journal: allow to trim journal for "laggy" clients
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/JournalMetadata.cc63
-rw-r--r--src/journal/JournalMetadata.h2
-rw-r--r--src/journal/JournalTrimmer.cc5
-rw-r--r--src/journal/Settings.h3
4 files changed, 70 insertions, 3 deletions
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc
index de46bd7ff0a..08a7e9307aa 100644
--- a/src/journal/JournalMetadata.cc
+++ b/src/journal/JournalMetadata.cc
@@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) {
Client client(m_client_id, bufferlist());
RegisteredClients::iterator it = refresh->registered_clients.find(client);
if (it != refresh->registered_clients.end()) {
+ if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id
+ << dendl;
+ }
m_minimum_set = MAX(m_minimum_set, refresh->minimum_set);
m_active_set = MAX(m_active_set, refresh->active_set);
m_registered_clients = refresh->registered_clients;
@@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() {
librados::ObjectWriteOperation op;
client::client_commit(&op, m_client_id, m_commit_position);
- C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
+ Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx);
m_commit_position_ctx = NULL;
+ ctx = schedule_laggy_clients_disconnect(ctx);
+
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
@@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() {
if (r == -ENOENT) {
ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl;
} else {
- lderr(m_cct) << __func__ << ": failed to watch journal"
+ lderr(m_cct) << __func__ << ": failed to watch journal: "
<< cpp_strerror(r) << dendl;
}
schedule_watch_reset();
@@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) {
ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl;
}
+Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) {
+ assert(m_lock.is_locked());
+
+ ldout(m_cct, 20) << __func__ << dendl;
+
+ if (m_settings.max_concurrent_object_sets <= 0) {
+ return on_finish;
+ }
+
+ Context *ctx = on_finish;
+
+ for (auto &c : m_registered_clients) {
+ if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED ||
+ c.id == m_client_id ||
+ m_settings.whitelisted_laggy_clients.count(c.id) > 0) {
+ continue;
+ }
+ const std::string &client_id = c.id;
+ uint64_t object_set = 0;
+ if (!c.commit_position.object_positions.empty()) {
+ auto &position = *(c.commit_position.object_positions.begin());
+ object_set = position.object_number / m_splay_width;
+ }
+
+ if (m_active_set > object_set + m_settings.max_concurrent_object_sets) {
+ ldout(m_cct, 1) << __func__ << ": " << client_id
+ << ": scheduling disconnect" << dendl;
+
+ ctx = new FunctionContext([this, client_id, ctx](int r1) {
+ ldout(m_cct, 10) << __func__ << ": " << client_id
+ << ": flagging disconnected" << dendl;
+
+ librados::ObjectWriteOperation op;
+ client::client_update_state(&op, client_id,
+ cls::journal::CLIENT_STATE_DISCONNECTED);
+
+ librados::AioCompletion *comp =
+ librados::Rados::aio_create_completion(ctx, nullptr,
+ utils::rados_ctx_callback);
+ int r = m_ioctx.aio_operate(m_oid, comp, &op);
+ assert(r == 0);
+ comp->release();
+ });
+ }
+ }
+
+ if (ctx == on_finish) {
+ ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl;
+ }
+
+ return ctx;
+}
+
std::ostream &operator<<(std::ostream &os,
const JournalMetadata::RegisteredClients &clients) {
os << "[";
diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h
index 01116d7d1e0..4055f993704 100644
--- a/src/journal/JournalMetadata.h
+++ b/src/journal/JournalMetadata.h
@@ -344,6 +344,8 @@ private:
void handle_watch_error(int err);
void handle_notified(int r);
+ Context *schedule_laggy_clients_disconnect(Context *on_finish);
+
friend std::ostream &operator<<(std::ostream &os,
const JournalMetadata &journal_metadata);
};
diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc
index 5e68349513c..283fe764ba0 100644
--- a/src/journal/JournalTrimmer.cc
+++ b/src/journal/JournalTrimmer.cc
@@ -141,8 +141,11 @@ void JournalTrimmer::handle_metadata_updated() {
uint64_t minimum_commit_set = active_set;
std::string minimum_client_id;
- // TODO: add support for trimming past "laggy" clients
for (auto &client : registered_clients) {
+ if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
+ continue;
+ }
+
if (client.commit_position.object_positions.empty()) {
// client hasn't recorded any commits
minimum_commit_set = minimum_set;
diff --git a/src/journal/Settings.h b/src/journal/Settings.h
index 603770cbaf6..ca57125a859 100644
--- a/src/journal/Settings.h
+++ b/src/journal/Settings.h
@@ -12,6 +12,9 @@ struct Settings {
double commit_interval = 5; ///< commit position throttle (in secs)
uint64_t max_fetch_bytes = 0; ///< 0 implies no limit
uint64_t max_payload_bytes = 0; ///< 0 implies object size limit
+ int max_concurrent_object_sets = 0; ///< 0 implies no limit
+ std::set<std::string> whitelisted_laggy_clients;
+ ///< clients that mustn't be disconnected
};
} // namespace journal