diff options
author | Mykola Golub <mgolub@suse.com> | 2019-05-31 14:41:22 +0200 |
---|---|---|
committer | Mykola Golub <mgolub@suse.com> | 2019-06-23 11:06:45 +0200 |
commit | 21ec96158cb0cf8340ef217b45b7972bec453f34 (patch) | |
tree | 30bc75cfb79ad25900e9ed47987e5c75e88455ae /src | |
parent | cls/journal: new append method (diff) | |
download | ceph-21ec96158cb0cf8340ef217b45b7972bec453f34.tar.xz ceph-21ec96158cb0cf8340ef217b45b7972bec453f34.zip |
journal: add support for aligned appends
Fixes: https://tracker.ceph.com/issues/39968
Signed-off-by: Mykola Golub <mgolub@suse.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/journal/Entry.cc | 9 | ||||
-rw-r--r-- | src/journal/ObjectPlayer.cc | 48 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.cc | 28 | ||||
-rw-r--r-- | src/journal/ObjectRecorder.h | 2 |
4 files changed, 81 insertions, 6 deletions
diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc index 48648a872df..7899bf1cc5a 100644 --- a/src/journal/Entry.cc +++ b/src/journal/Entry.cc @@ -86,7 +86,14 @@ bool Entry::is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed) using ceph::decode; uint32_t start_off = iter.get_off(); if (iter.get_remaining() < HEADER_FIXED_SIZE) { - *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining(); + bufferlist sub_bl; + sub_bl.substr_of(iter.get_bl(), iter.get_off(), iter.get_remaining()); + if (sub_bl.length() > 0 && sub_bl.is_zero()) { + // pad bytes + *bytes_needed = 0; + } else { + *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining(); + } return false; } uint64_t bl_preamble; diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc index d4d9fb750b5..0a85b8853f2 100644 --- a/src/journal/ObjectPlayer.cc +++ b/src/journal/ObjectPlayer.cc @@ -12,6 +12,36 @@ namespace journal { +namespace { + +bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter, + uint32_t *pad_len, bool *partial_entry) { + const uint32_t MAX_PAD = 8; + auto pad_bytes = MAX_PAD - off % MAX_PAD; + auto next = *iter; + + ceph_assert(!next.end()); + if (*next != '\0') { + return false; + } + + for (auto i = pad_bytes - 1; i > 0; i--) { + if ((++next).end()) { + *partial_entry = true; + return false; + } + if (*next != '\0') { + return false; + } + } + + *iter = next; + *pad_len += pad_bytes; + return true; +} + +} // anonymous namespace + ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, uint64_t object_num, SafeTimer &timer, @@ -131,6 +161,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl, clear_invalid_range(m_read_bl_off, m_read_bl.length()); bufferlist::const_iterator iter{&m_read_bl, 0}; + uint32_t pad_len = 0; while (!iter.end()) { uint32_t bytes_needed; uint32_t bl_off = iter.get_off(); @@ -149,9 +180,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl, break; } - if (!invalid) { + if (!invalid && + !advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter, + &pad_len, &partial_entry)) { invalid_start_off = m_read_bl_off + bl_off; invalid = true; + if (partial_entry) { + if (full_fetch) { + lderr(m_cct) << ": partial pad at offset " << invalid_start_off + << dendl; + } else { + ldout(m_cct, 20) << ": partial pad detected, will re-fetch" + << dendl; + } + break; + } lderr(m_cct) << ": detected corrupt journal entry at offset " << invalid_start_off << dendl; } @@ -193,7 +236,8 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl, iter = bufferlist::iterator(&m_read_bl, 0); // advance the decoded entry offset - m_read_bl_off += entry_len; + m_read_bl_off += entry_len + pad_len; + pad_len = 0; } if (invalid) { diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc index 127731e95c3..2366759cc5c 100644 --- a/src/journal/ObjectRecorder.cc +++ b/src/journal/ObjectRecorder.cc @@ -6,6 +6,7 @@ #include "journal/Utils.h" #include "include/ceph_assert.h" #include "common/Timer.h" +#include "common/errno.h" #include "cls/journal/cls_journal_client.h" #define dout_subsys ceph_subsys_journaler @@ -31,6 +32,16 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, m_ioctx.dup(ioctx); m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); ceph_assert(m_handler != NULL); + + librados::Rados rados(m_ioctx); + int8_t require_osd_release = 0; + int r = rados.get_min_compatible_osd(&require_osd_release); + if (r < 0) { + ldout(m_cct, 0) << "failed to retrieve min OSD release: " + << cpp_strerror(r) << dendl; + } + m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS; + ldout(m_cct, 20) << dendl; } @@ -287,10 +298,13 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { } librados::ObjectWriteOperation op; - client::guard_append(&op, m_soft_max_size); + if (m_compat_mode) { + client::guard_append(&op, m_soft_max_size); + } size_t append_bytes = 0; AppendBuffers append_buffers; + bufferlist append_bl; for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { auto& future = it->first; auto& bl = it->second; @@ -308,8 +322,12 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { ldout(m_cct, 20) << "flushing " << *future << dendl; future->set_flush_in_progress(); - op.append(bl); - op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + if (m_compat_mode) { + op.append(bl); + op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); + } else { + append_bl.append(bl); + } append_bytes += bl.length(); append_buffers.push_back(*it); @@ -332,6 +350,10 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { ceph_assert(m_pending_bytes >= append_bytes); m_pending_bytes -= append_bytes; + if (!m_compat_mode) { + client::append(&op, m_soft_max_size, append_bl); + } + auto rados_completion = librados::Rados::aio_create_completion( new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback); int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h index ff00e0a0a1f..1f6014773ef 100644 --- a/src/journal/ObjectRecorder.h +++ b/src/journal/ObjectRecorder.h @@ -122,6 +122,8 @@ private: double m_flush_age = 0; int32_t m_max_in_flight_appends; + bool m_compat_mode; + FlushHandler m_flush_handler; mutable std::shared_ptr<Mutex> m_lock; |