summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMykola Golub <mgolub@suse.com>2019-05-31 14:41:22 +0200
committerMykola Golub <mgolub@suse.com>2019-06-23 11:06:45 +0200
commit21ec96158cb0cf8340ef217b45b7972bec453f34 (patch)
tree30bc75cfb79ad25900e9ed47987e5c75e88455ae /src
parentcls/journal: new append method (diff)
downloadceph-21ec96158cb0cf8340ef217b45b7972bec453f34.tar.xz
ceph-21ec96158cb0cf8340ef217b45b7972bec453f34.zip
journal: add support for aligned appends
Fixes: https://tracker.ceph.com/issues/39968 Signed-off-by: Mykola Golub <mgolub@suse.com>
Diffstat (limited to 'src')
-rw-r--r--src/journal/Entry.cc9
-rw-r--r--src/journal/ObjectPlayer.cc48
-rw-r--r--src/journal/ObjectRecorder.cc28
-rw-r--r--src/journal/ObjectRecorder.h2
4 files changed, 81 insertions, 6 deletions
diff --git a/src/journal/Entry.cc b/src/journal/Entry.cc
index 48648a872df..7899bf1cc5a 100644
--- a/src/journal/Entry.cc
+++ b/src/journal/Entry.cc
@@ -86,7 +86,14 @@ bool Entry::is_readable(bufferlist::const_iterator iter, uint32_t *bytes_needed)
using ceph::decode;
uint32_t start_off = iter.get_off();
if (iter.get_remaining() < HEADER_FIXED_SIZE) {
- *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+ bufferlist sub_bl;
+ sub_bl.substr_of(iter.get_bl(), iter.get_off(), iter.get_remaining());
+ if (sub_bl.length() > 0 && sub_bl.is_zero()) {
+ // pad bytes
+ *bytes_needed = 0;
+ } else {
+ *bytes_needed = HEADER_FIXED_SIZE - iter.get_remaining();
+ }
return false;
}
uint64_t bl_preamble;
diff --git a/src/journal/ObjectPlayer.cc b/src/journal/ObjectPlayer.cc
index d4d9fb750b5..0a85b8853f2 100644
--- a/src/journal/ObjectPlayer.cc
+++ b/src/journal/ObjectPlayer.cc
@@ -12,6 +12,36 @@
namespace journal {
+namespace {
+
+bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
+ uint32_t *pad_len, bool *partial_entry) {
+ const uint32_t MAX_PAD = 8;
+ auto pad_bytes = MAX_PAD - off % MAX_PAD;
+ auto next = *iter;
+
+ ceph_assert(!next.end());
+ if (*next != '\0') {
+ return false;
+ }
+
+ for (auto i = pad_bytes - 1; i > 0; i--) {
+ if ((++next).end()) {
+ *partial_entry = true;
+ return false;
+ }
+ if (*next != '\0') {
+ return false;
+ }
+ }
+
+ *iter = next;
+ *pad_len += pad_bytes;
+ return true;
+}
+
+} // anonymous namespace
+
ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
uint64_t object_num, SafeTimer &timer,
@@ -131,6 +161,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
clear_invalid_range(m_read_bl_off, m_read_bl.length());
bufferlist::const_iterator iter{&m_read_bl, 0};
+ uint32_t pad_len = 0;
while (!iter.end()) {
uint32_t bytes_needed;
uint32_t bl_off = iter.get_off();
@@ -149,9 +180,21 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
break;
}
- if (!invalid) {
+ if (!invalid &&
+ !advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
+ &pad_len, &partial_entry)) {
invalid_start_off = m_read_bl_off + bl_off;
invalid = true;
+ if (partial_entry) {
+ if (full_fetch) {
+ lderr(m_cct) << ": partial pad at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
+ << dendl;
+ }
+ break;
+ }
lderr(m_cct) << ": detected corrupt journal entry at offset "
<< invalid_start_off << dendl;
}
@@ -193,7 +236,8 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
iter = bufferlist::iterator(&m_read_bl, 0);
// advance the decoded entry offset
- m_read_bl_off += entry_len;
+ m_read_bl_off += entry_len + pad_len;
+ pad_len = 0;
}
if (invalid) {
diff --git a/src/journal/ObjectRecorder.cc b/src/journal/ObjectRecorder.cc
index 127731e95c3..2366759cc5c 100644
--- a/src/journal/ObjectRecorder.cc
+++ b/src/journal/ObjectRecorder.cc
@@ -6,6 +6,7 @@
#include "journal/Utils.h"
#include "include/ceph_assert.h"
#include "common/Timer.h"
+#include "common/errno.h"
#include "cls/journal/cls_journal_client.h"
#define dout_subsys ceph_subsys_journaler
@@ -31,6 +32,16 @@ ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
ceph_assert(m_handler != NULL);
+
+ librados::Rados rados(m_ioctx);
+ int8_t require_osd_release = 0;
+ int r = rados.get_min_compatible_osd(&require_osd_release);
+ if (r < 0) {
+ ldout(m_cct, 0) << "failed to retrieve min OSD release: "
+ << cpp_strerror(r) << dendl;
+ }
+ m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS;
+
ldout(m_cct, 20) << dendl;
}
@@ -287,10 +298,13 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
}
librados::ObjectWriteOperation op;
- client::guard_append(&op, m_soft_max_size);
+ if (m_compat_mode) {
+ client::guard_append(&op, m_soft_max_size);
+ }
size_t append_bytes = 0;
AppendBuffers append_buffers;
+ bufferlist append_bl;
for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
auto& future = it->first;
auto& bl = it->second;
@@ -308,8 +322,12 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
ldout(m_cct, 20) << "flushing " << *future << dendl;
future->set_flush_in_progress();
- op.append(bl);
- op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+ if (m_compat_mode) {
+ op.append(bl);
+ op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+ } else {
+ append_bl.append(bl);
+ }
append_bytes += bl.length();
append_buffers.push_back(*it);
@@ -332,6 +350,10 @@ bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
ceph_assert(m_pending_bytes >= append_bytes);
m_pending_bytes -= append_bytes;
+ if (!m_compat_mode) {
+ client::append(&op, m_soft_max_size, append_bl);
+ }
+
auto rados_completion = librados::Rados::aio_create_completion(
new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
diff --git a/src/journal/ObjectRecorder.h b/src/journal/ObjectRecorder.h
index ff00e0a0a1f..1f6014773ef 100644
--- a/src/journal/ObjectRecorder.h
+++ b/src/journal/ObjectRecorder.h
@@ -122,6 +122,8 @@ private:
double m_flush_age = 0;
int32_t m_max_in_flight_appends;
+ bool m_compat_mode;
+
FlushHandler m_flush_handler;
mutable std::shared_ptr<Mutex> m_lock;