diff options
author | Mykola Golub <mgolub@mirantis.com> | 2016-07-13 14:49:40 +0200 |
---|---|---|
committer | Mykola Golub <mgolub@mirantis.com> | 2016-09-05 07:51:54 +0200 |
commit | 0b8b1aaedc10f7f46e91bf6ad809414feb770c8d (patch) | |
tree | 87aa40e23cbc24c6ccd7b5b8102f5754b5792eb9 /src/journal | |
parent | cls/journal: add async client_update_state method (diff) | |
download | ceph-0b8b1aaedc10f7f46e91bf6ad809414feb770c8d.tar.xz ceph-0b8b1aaedc10f7f46e91bf6ad809414feb770c8d.zip |
journal: allow to trim journal for "laggy" clients
Signed-off-by: Mykola Golub <mgolub@mirantis.com>
Diffstat (limited to 'src/journal')
-rw-r--r-- | src/journal/JournalMetadata.cc | 63 | ||||
-rw-r--r-- | src/journal/JournalMetadata.h | 2 | ||||
-rw-r--r-- | src/journal/JournalTrimmer.cc | 5 | ||||
-rw-r--r-- | src/journal/Settings.h | 3 |
4 files changed, 70 insertions, 3 deletions
diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index de46bd7ff0a..08a7e9307aa 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -749,6 +749,10 @@ void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { Client client(m_client_id, bufferlist()); RegisteredClients::iterator it = refresh->registered_clients.find(client); if (it != refresh->registered_clients.end()) { + if (it->state == cls::journal::CLIENT_STATE_DISCONNECTED) { + ldout(m_cct, 0) << "client flagged disconnected: " << m_client_id + << dendl; + } m_minimum_set = MAX(m_minimum_set, refresh->minimum_set); m_active_set = MAX(m_active_set, refresh->active_set); m_registered_clients = refresh->registered_clients; @@ -810,9 +814,11 @@ void JournalMetadata::handle_commit_position_task() { librados::ObjectWriteOperation op; client::client_commit(&op, m_client_id, m_commit_position); - C_NotifyUpdate *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); + Context *ctx = new C_NotifyUpdate(this, m_commit_position_ctx); m_commit_position_ctx = NULL; + ctx = schedule_laggy_clients_disconnect(ctx); + librados::AioCompletion *comp = librados::Rados::aio_create_completion(ctx, NULL, utils::rados_ctx_callback); @@ -839,7 +845,7 @@ void JournalMetadata::handle_watch_reset() { if (r == -ENOENT) { ldout(m_cct, 5) << __func__ << ": journal header not found" << dendl; } else { - lderr(m_cct) << __func__ << ": failed to watch journal" + lderr(m_cct) << __func__ << ": failed to watch journal: " << cpp_strerror(r) << dendl; } schedule_watch_reset(); @@ -1023,6 +1029,59 @@ void JournalMetadata::handle_notified(int r) { ldout(m_cct, 10) << "notified journal header update: r=" << r << dendl; } +Context *JournalMetadata::schedule_laggy_clients_disconnect(Context *on_finish) { + assert(m_lock.is_locked()); + + ldout(m_cct, 20) << __func__ << dendl; + + if (m_settings.max_concurrent_object_sets <= 0) { + return on_finish; + } + + Context *ctx = on_finish; + + for (auto &c : m_registered_clients) { + if (c.state == cls::journal::CLIENT_STATE_DISCONNECTED || + c.id == m_client_id || + m_settings.whitelisted_laggy_clients.count(c.id) > 0) { + continue; + } + const std::string &client_id = c.id; + uint64_t object_set = 0; + if (!c.commit_position.object_positions.empty()) { + auto &position = *(c.commit_position.object_positions.begin()); + object_set = position.object_number / m_splay_width; + } + + if (m_active_set > object_set + m_settings.max_concurrent_object_sets) { + ldout(m_cct, 1) << __func__ << ": " << client_id + << ": scheduling disconnect" << dendl; + + ctx = new FunctionContext([this, client_id, ctx](int r1) { + ldout(m_cct, 10) << __func__ << ": " << client_id + << ": flagging disconnected" << dendl; + + librados::ObjectWriteOperation op; + client::client_update_state(&op, client_id, + cls::journal::CLIENT_STATE_DISCONNECTED); + + librados::AioCompletion *comp = + librados::Rados::aio_create_completion(ctx, nullptr, + utils::rados_ctx_callback); + int r = m_ioctx.aio_operate(m_oid, comp, &op); + assert(r == 0); + comp->release(); + }); + } + } + + if (ctx == on_finish) { + ldout(m_cct, 20) << __func__ << ": no laggy clients to disconnect" << dendl; + } + + return ctx; +} + std::ostream &operator<<(std::ostream &os, const JournalMetadata::RegisteredClients &clients) { os << "["; diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 01116d7d1e0..4055f993704 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -344,6 +344,8 @@ private: void handle_watch_error(int err); void handle_notified(int r); + Context *schedule_laggy_clients_disconnect(Context *on_finish); + friend std::ostream &operator<<(std::ostream &os, const JournalMetadata &journal_metadata); }; diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index 5e68349513c..283fe764ba0 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -141,8 +141,11 @@ void JournalTrimmer::handle_metadata_updated() { uint64_t minimum_commit_set = active_set; std::string minimum_client_id; - // TODO: add support for trimming past "laggy" clients for (auto &client : registered_clients) { + if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) { + continue; + } + if (client.commit_position.object_positions.empty()) { // client hasn't recorded any commits minimum_commit_set = minimum_set; diff --git a/src/journal/Settings.h b/src/journal/Settings.h index 603770cbaf6..ca57125a859 100644 --- a/src/journal/Settings.h +++ b/src/journal/Settings.h @@ -12,6 +12,9 @@ struct Settings { double commit_interval = 5; ///< commit position throttle (in secs) uint64_t max_fetch_bytes = 0; ///< 0 implies no limit uint64_t max_payload_bytes = 0; ///< 0 implies object size limit + int max_concurrent_object_sets = 0; ///< 0 implies no limit + std::set<std::string> whitelisted_laggy_clients; + ///< clients that mustn't be disconnected }; } // namespace journal |