summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatt Benjamin <mbenjamin@redhat.com>2015-10-29 23:03:16 +0100
committerMatt Benjamin <mbenjamin@redhat.com>2016-02-12 18:05:27 +0100
commit5139c312f9ef4ba19d9fc46379c0455ac54e0a43 (patch)
tree78773354d57358af0f7ec4549aa700af74be0714
parentlibrgw: improve rgw_write and add WRITE_READ_VERIFY (diff)
downloadceph-5139c312f9ef4ba19d9fc46379c0455ac54e0a43.tar.xz
ceph-5139c312f9ef4ba19d9fc46379c0455ac54e0a43.zip
librgw: try-implement rgw_readv/rgw_writev
The model in the rgw_readv call is that the caller owns the struct rgw_uio and uses it to pass uio_offset. The call returns with the rgw_uio filled out, and indirectly provides rgw_uio->uio_vio (the full private data is at uio->uio_p1 on return; the caller must not touch it -- p == private). The caller hands the private data back via rgw_readv_release() when finished. Meanwhile, rgw_writev is atomic. Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
-rw-r--r--src/include/rados/rgw_file.h41
-rw-r--r--src/rgw/rgw_file.cc103
-rw-r--r--src/test/librgw_file_gp.cc97
3 files changed, 220 insertions, 21 deletions
diff --git a/src/include/rados/rgw_file.h b/src/include/rados/rgw_file.h
index 5689d7469cc..a5f4b1199b8 100644
--- a/src/include/rados/rgw_file.h
+++ b/src/include/rados/rgw_file.h
@@ -194,11 +194,6 @@ int rgw_read(struct rgw_fs *rgw_fs,
struct rgw_file_handle *fh, uint64_t offset,
size_t length, size_t *bytes_read, void *buffer);
-/* XXX add release fn and UIO type */
-int rgw_readv(struct rgw_fs *rgw_fs,
- struct rgw_file_handle *fh, uint64_t offset,
- size_t length, void *buffer);
-
/*
write data to file
*/
@@ -206,12 +201,40 @@ int rgw_write(struct rgw_fs *rgw_fs,
struct rgw_file_handle *fh, uint64_t offset,
size_t length, size_t *bytes_written, void *buffer);
-/* XXX add release fn and UIO type */
+#define RGW_UIO_NONE 0x0000
+#define RGW_UIO_GIFT 0x0001
+#define RGW_UIO_FREE 0x0002
+#define RGW_UIO_BUFQ 0x0004
-int rgw_writev(struct rgw_fs *rgw_fs,
- const struct rgw_file_handle *fh, uint64_t offset,
- size_t length, void *buffer);
+struct rgw_uio;
+typedef void (*rgw_uio_release)(struct rgw_uio *, uint32_t);
+
+/* buffer vector descriptors */
+struct rgw_vio { /* describes one contiguous data segment referenced from an rgw_uio */
+ void *vio_p1; /* rgw_readv() sets this to nullptr; presumably reserved for library-private data -- TODO confirm */
+ void *vio_u1; /* rgw_readv() sets this to nullptr; presumably free for caller-private data -- TODO confirm */
+ void *vio_base; /* address of the first byte of the segment */
+ int32_t vio_len; /* length of the segment in bytes */
+};
+
+struct rgw_uio { /* vectored I/O descriptor handed to rgw_readv() and rgw_writev() */
+ rgw_uio_release uio_rele; /* release callback; rgw_readv() installs one that frees the state kept in uio_p1 */
+ void *uio_p1; /* library-private: rgw_readv() stores its per-read state here for uio_rele to reclaim */
+ void *uio_u1; /* not written by rgw_readv()/rgw_writev(); presumably caller-private -- TODO confirm */
+ uint64_t uio_offset; /* rgw_readv(): object offset passed to the read request; rgw_writev() does not consult it */
+ uint64_t uio_resid; /* rgw_readv(): passed to the read request with uio_offset (presumably the length wanted); set to the byte count returned */
+ uint32_t uio_cnt; /* number of valid entries in uio_vio */
+ uint32_t uio_flags; /* not referenced by the visible code; presumably RGW_UIO_* bits -- TODO confirm */
+ struct rgw_vio *uio_vio; /* appended vectors */
+};
+
+typedef struct rgw_uio rgw_uio;
+int rgw_readv(struct rgw_fs *rgw_fs,
+ rgw_file_handle *fh, rgw_uio *uio);
+
+int rgw_writev(struct rgw_fs *rgw_fs,
+ struct rgw_file_handle *fh, rgw_uio *uio);
/*
sync written data
diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc
index 6136f7848bd..a3b6dc409af 100644
--- a/src/rgw/rgw_file.cc
+++ b/src/rgw/rgw_file.cc
@@ -373,6 +373,109 @@ int rgw_write(struct rgw_fs *rgw_fs,
}
/*
+ read data from file (vector)
+*/
+/*
+ * Per-read state kept alive between rgw_readv() and the release callback.
+ * Takes over the contents of the buffer list produced by the read and
+ * remembers where the caller-visible rgw_vio array lives.
+ */
+class RGWReadV
+{
+  buffer::list bl;
+  struct rgw_vio* vio;
+
+public:
+  /* Claims the contents of `src` and records `vec` as the descriptor array. */
+  RGWReadV(buffer::list& src, rgw_vio* vec)
+    : vio(vec)
+  {
+    bl.claim(src);
+  }
+
+  /* Descriptor array supplied at construction. */
+  struct rgw_vio* get_vio()
+  {
+    return vio;
+  }
+
+  /* Buffer segments currently held by this object. */
+  const std::list<buffer::ptr>& buffers()
+  {
+    return bl.buffers();
+  }
+
+  /* Total number of bytes held. */
+  unsigned /* XXX */ length()
+  {
+    return bl.length();
+  }
+
+};
+
+/*
+ * Release callback installed by rgw_readv(): destroys the RGWReadV stored
+ * in uio->uio_p1 and frees the raw storage it was constructed in.
+ * `flags` is accepted for the rgw_uio_release signature but not consulted.
+ */
+void rgw_readv_rele(struct rgw_uio *uio, uint32_t flags)
+{
+  void* storage = uio->uio_p1;
+  /* the object was placement-constructed, so tear it down by hand */
+  static_cast<RGWReadV*>(storage)->~RGWReadV();
+  ::operator delete(storage);
+}
+
+/*
+ * Read object data and expose it to the caller as a vector of segments.
+ *
+ * On entry uio->uio_offset and uio->uio_resid are forwarded to the read
+ * request.  On success the uio is filled in: uio_vio/uio_cnt describe the
+ * returned segments, uio_resid is the number of bytes held, uio_p1 holds
+ * the library-private state and uio_rele is the callback the caller must
+ * invoke to free it.  The uio is left untouched on failure.
+ *
+ * Returns EINVAL if the handle is not an object, otherwise the result of
+ * executing the read request (0 on success).
+ */
+int rgw_readv(struct rgw_fs *rgw_fs,
+              struct rgw_file_handle *fh, rgw_uio *uio)
+{
+  CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+
+  if (! rgw_fh->is_object())
+    return EINVAL;
+
+  buffer::list bl;
+  RGWGetObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(),
+                       rgw_fh->object_name(), uio->uio_offset, uio->uio_resid,
+                       bl);
+
+  int rc = librgw.get_fe()->execute_req(&req);
+  if (rc)
+    return rc;
+
+  /* one allocation: the RGWReadV object followed by one rgw_vio per segment */
+  const auto nvio = bl.buffers().size();
+  void* mem = ::operator new(sizeof(RGWReadV) +
+                             (nvio * sizeof(struct rgw_vio)));
+
+  /* The vio array starts sizeof(RGWReadV) BYTES into the allocation.  The
+   * offset must be applied to a char*; adding it to an RGWReadV* would
+   * scale it by sizeof(RGWReadV) again and point past the allocation. */
+  rgw_vio* vios =
+    reinterpret_cast<rgw_vio*>(static_cast<char*>(mem) + sizeof(RGWReadV));
+
+  RGWReadV* rdv = new (mem) RGWReadV(bl, vios);
+
+  uio->uio_p1 = rdv;
+  uio->uio_cnt = rdv->buffers().size();
+  uio->uio_resid = rdv->length();
+  uio->uio_vio = rdv->get_vio();
+  uio->uio_rele = rgw_readv_rele;
+
+  /* describe each held segment in the caller-visible array */
+  int ix = 0;
+  for (auto& bp : rdv->buffers()) {
+    rgw_vio *vio = &(uio->uio_vio[ix]);
+    vio->vio_base = const_cast<char*>(bp.c_str());
+    vio->vio_len = bp.length();
+    vio->vio_u1 = nullptr;
+    vio->vio_p1 = nullptr;
+    ++ix;
+  }
+
+  return rc;
+}
+
+/*
+ write data to file (vector)
+*/
+/*
+ * Write the segments described by uio->uio_vio[0 .. uio_cnt) to the object
+ * as a single put request.  The segment memory is referenced in place, not
+ * copied.  Returns EINVAL if the handle is not an object, otherwise the
+ * result of executing the put request.
+ */
+int rgw_writev(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh,
+               rgw_uio *uio)
+{
+  RGWFileHandle* rgw_fh = get_rgwfh(fh);
+  if (! rgw_fh->is_object())
+    return EINVAL;
+
+  CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw);
+  RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
+
+  /* gather every caller segment into one buffer list */
+  buffer::list data;
+  unsigned int seg = 0;
+  while (seg < uio->uio_cnt) {
+    rgw_vio& v = uio->uio_vio[seg];
+    char* base = static_cast<char*>(v.vio_base);
+    data.push_back(buffer::create_static(v.vio_len, base));
+    ++seg;
+  }
+
+  RGWPutObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(),
+                       rgw_fh->object_name(), data);
+
+  return librgw.get_fe()->execute_req(&req);
+}
+
+/*
sync written data
*/
int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *handle)
diff --git a/src/test/librgw_file_gp.cc b/src/test/librgw_file_gp.cc
index 4b9fa9b28cf..6de26aaa36e 100644
--- a/src/test/librgw_file_gp.cc
+++ b/src/test/librgw_file_gp.cc
@@ -40,6 +40,8 @@ namespace {
bool do_pre_list = false;
bool do_put = false;
bool do_bulk = false;
+ bool do_writev = false;
+ bool do_readv = false;
bool do_get = false;
bool do_delete = false;
@@ -54,10 +56,13 @@ namespace {
std::uniform_int_distribution<uint8_t> uint_dist;
std::mt19937 rng;
-
+
+ constexpr int iovcnt = 16;
+ constexpr int page_size = 65536;
+
struct ZPage
{
- char data[65536];
+ char data[page_size];
uint64_t cksum;
}; /* ZPage */
@@ -71,15 +76,15 @@ namespace {
iovs = (struct iovec*) calloc(n, sizeof(struct iovec));
for (int page_ix = 0; page_ix < n; ++page_ix) {
ZPage* p = new ZPage();
- for (int data_ix = 0; data_ix < 65536; ++data_ix) {
+ for (int data_ix = 0; data_ix < page_size; ++data_ix) {
p->data[data_ix] = uint_dist(rng);
} // data_ix
- p->cksum = XXH64(p->data, 65536, 8675309);
+ p->cksum = XXH64(p->data, page_size, 8675309);
pages.emplace_back(p);
// and iovs
struct iovec* iov = &iovs[page_ix];
iov->iov_base = p->data;
- iov->iov_len = 65536;
+ iov->iov_len = page_size;
} // page_ix
}
@@ -102,7 +107,7 @@ namespace {
int n = size();
for (int page_ix = 0; page_ix < n; ++page_ix) {
ZPage* p = pages[page_ix];
- p->cksum = XXH64(p->data, 65536, 8675309);
+ p->cksum = XXH64(p->data, page_size, 8675309);
}
}
@@ -112,7 +117,7 @@ namespace {
ZPage* p = pages[page_ix];
struct iovec* iov = &iovs[page_ix];
iov->iov_base = p->data;
- iov->iov_len = 65536;
+ iov->iov_len = page_size;
}
}
@@ -216,23 +221,85 @@ TEST(LibRGW, GET_OBJECT) {
TEST(LibRGW, WRITE_READ_VERIFY)
{
- if (do_bulk) {
- const int iovcnt = 16;
+ if (do_bulk && do_put) {
ZPageSet zp_set1{iovcnt}; // 1M random data in 16 64K pages
struct iovec *iovs = zp_set1.get_iovs();
/* read after write POSIX-style */
size_t nbytes, off = 0;
- for (int ix = 0; ix < 16; ++ix, off += 65536) {
+ for (int ix = 0; ix < 16; ++ix, off += page_size) {
struct iovec *iov = &iovs[ix];
- int ret = rgw_write(fs, object_fh, off, 65536, &nbytes, iov->iov_base);
+ int ret = rgw_write(fs, object_fh, off, page_size, &nbytes,
+ iov->iov_base);
ASSERT_EQ(ret, 0);
- ASSERT_EQ(nbytes, size_t(65536));
+ ASSERT_EQ(nbytes, size_t(page_size));
}
zp_set1.reset_iovs();
}
}
+/* "functions that call alloca are not inlined"
+ * --alexandre oliva
+ * http://gcc.gnu.org/ml/gcc-help/2004-04/msg00158.html
+ */
+/* Stack-allocates a zeroed rgw_uio followed by iovcnt rgw_vio entries and
+ * assigns it to the local variable `uio`, which the invoking scope must
+ * declare.  uio->uio_vio is pointed at the entries, which start directly
+ * after the rgw_uio header: `uio + 1`.  (Adding sizeof(rgw_uio) to the
+ * typed pointer would scale the offset twice and point outside the
+ * allocation.)  Invoke as a statement: alloca_uio();
+ */
+#define alloca_uio() \
+  do { \
+    int uiosz = sizeof(rgw_uio) + iovcnt*sizeof(rgw_vio); \
+    uio = static_cast<rgw_uio*>(alloca(uiosz)); \
+    memset(uio, 0, uiosz); \
+    uio->uio_vio = reinterpret_cast<rgw_vio*>(uio + 1); \
+  } while (0)
+
+/* Writes iovcnt random pages to the test object with a single rgw_writev()
+ * call; runs only when both --writev and --put were requested. */
+TEST(LibRGW, WRITEV)
+{
+  if (! (do_writev && do_put))
+    return;
+
+  rgw_uio* uio;
+  ZPageSet zp_set1{iovcnt}; // 1M random data in 16 64K pages
+  struct iovec *iovs = zp_set1.get_iovs();
+  alloca_uio();
+  ASSERT_NE(uio, nullptr);
+
+  // mirror each iovec into the matching rgw_vio slot
+  int slot = 0;
+  while (slot < iovcnt) {
+    struct iovec& src = iovs[slot];
+    rgw_vio& dst = uio->uio_vio[slot];
+    dst.vio_base = src.iov_base;
+    dst.vio_len = src.iov_len;
+    dst.vio_u1 = &src; // private data
+    ++slot;
+  }
+  uio->uio_cnt = iovcnt;
+  uio->uio_offset = iovcnt * page_size;
+
+  int ret = rgw_writev(fs, object_fh, uio);
+  ASSERT_EQ(ret, 0);
+  //zp_set1.reset_iovs();
+}
+
+/* Reads the test object back with rgw_readv(), hex-dumps the returned
+ * segments and then releases them; runs only when both --readv and --get
+ * were requested. */
+TEST(LibRGW, READV)
+{
+  if (do_readv && do_get) {
+    rgw_uio uio[1];
+    memset(uio, 0, sizeof(rgw_uio));
+    uio->uio_offset = 0; // ok, it was already 0
+    int ret = rgw_readv(fs, object_fh, uio);
+    ASSERT_EQ(ret, 0);
+    {
+      // Scoped so the list (which only references the returned segments,
+      // it does not own them) is destroyed before the segments are
+      // released below, instead of being heap-allocated and leaked.
+      buffer::list bl;
+      for (unsigned int ix = 0; ix < uio->uio_cnt; ++ix) {
+        rgw_vio *vio = &(uio->uio_vio[ix]);
+        bl.push_back(
+          buffer::create_static(vio->vio_len,
+                                static_cast<char*>(vio->vio_base)));
+      }
+      bl.hexdump(std::cout);
+    }
+    // release resources
+    ASSERT_NE(uio->uio_rele, nullptr);
+    uio->uio_rele(uio, RGW_UIO_NONE);
+  }
+}
+
TEST(LibRGW, DELETE_OBJECT) {
if (do_delete) {
int ret = rgw_unlink(fs, bucket_fh, object_name.c_str());
@@ -302,6 +369,12 @@ int main(int argc, char *argv[])
} else if (ceph_argparse_flag(args, arg_iter, "--bulk",
(char*) nullptr)) {
do_bulk = true;
+ } else if (ceph_argparse_flag(args, arg_iter, "--writev",
+ (char*) nullptr)) {
+ do_writev = true;
+ } else if (ceph_argparse_flag(args, arg_iter, "--readv",
+ (char*) nullptr)) {
+ do_readv = true;
} else if (ceph_argparse_flag(args, arg_iter, "--delete",
(char*) nullptr)) {
do_delete = true;