diff options
author | Matt Benjamin <mbenjamin@redhat.com> | 2015-10-29 23:03:16 +0100 |
---|---|---|
committer | Matt Benjamin <mbenjamin@redhat.com> | 2016-02-12 18:05:27 +0100 |
commit | 5139c312f9ef4ba19d9fc46379c0455ac54e0a43 (patch) | |
tree | 78773354d57358af0f7ec4549aa700af74be0714 | |
parent | librgw: improve rgw_write and add WRITE_READ_VERIFY (diff) | |
download | ceph-5139c312f9ef4ba19d9fc46379c0455ac54e0a43.tar.xz ceph-5139c312f9ef4ba19d9fc46379c0455ac54e0a43.zip |
librgw: try-implement rgw_readv/rgw_writev
The model for the rgw_readv call is that the caller owns the struct rgw_uio
and uses it to pass uio_offset.
The call returns with the rgw_uio filled out, and indirectly provides
rgw_uio->uio_vio (the full private data is at uio->uio_p1 on return;
the caller must not touch it -- p == private). The caller hands the
private data back through the release function uio->uio_rele (rgw_readv_rele) when finished.
Meanwhile rgw_writev is atomic.
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
-rw-r--r-- | src/include/rados/rgw_file.h | 41 | ||||
-rw-r--r-- | src/rgw/rgw_file.cc | 103 | ||||
-rw-r--r-- | src/test/librgw_file_gp.cc | 97 |
3 files changed, 220 insertions, 21 deletions
diff --git a/src/include/rados/rgw_file.h b/src/include/rados/rgw_file.h index 5689d7469cc..a5f4b1199b8 100644 --- a/src/include/rados/rgw_file.h +++ b/src/include/rados/rgw_file.h @@ -194,11 +194,6 @@ int rgw_read(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, uint64_t offset, size_t length, size_t *bytes_read, void *buffer); -/* XXX add release fn and UIO type */ -int rgw_readv(struct rgw_fs *rgw_fs, - struct rgw_file_handle *fh, uint64_t offset, - size_t length, void *buffer); - /* write data to file */ @@ -206,12 +201,40 @@ int rgw_write(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, uint64_t offset, size_t length, size_t *bytes_written, void *buffer); -/* XXX add release fn and UIO type */ +#define RGW_UIO_NONE 0x0000 +#define RGW_UIO_GIFT 0x0001 +#define RGW_UIO_FREE 0x0002 +#define RGW_UIO_BUFQ 0x0004 -int rgw_writev(struct rgw_fs *rgw_fs, - const struct rgw_file_handle *fh, uint64_t offset, - size_t length, void *buffer); +struct rgw_uio; +typedef void (*rgw_uio_release)(struct rgw_uio *, uint32_t); + +/* buffer vector descriptors */ +struct rgw_vio { + void *vio_p1; + void *vio_u1; + void *vio_base; + int32_t vio_len; +}; + +struct rgw_uio { + rgw_uio_release uio_rele; + void *uio_p1; + void *uio_u1; + uint64_t uio_offset; + uint64_t uio_resid; + uint32_t uio_cnt; + uint32_t uio_flags; + struct rgw_vio *uio_vio; /* appended vectors */ +}; + +typedef struct rgw_uio rgw_uio; +int rgw_readv(struct rgw_fs *rgw_fs, + rgw_file_handle *fh, rgw_uio *uio); + +int rgw_writev(struct rgw_fs *rgw_fs, + struct rgw_file_handle *fh, rgw_uio *uio); /* sync written data diff --git a/src/rgw/rgw_file.cc b/src/rgw/rgw_file.cc index 6136f7848bd..a3b6dc409af 100644 --- a/src/rgw/rgw_file.cc +++ b/src/rgw/rgw_file.cc @@ -373,6 +373,109 @@ int rgw_write(struct rgw_fs *rgw_fs, } /* + read data from file (vector) +*/ +class RGWReadV +{ + buffer::list bl; + struct rgw_vio* vio; + +public: + RGWReadV(buffer::list& _bl, rgw_vio* _vio) : vio(_vio) { + bl.claim(_bl); + } + + 
struct rgw_vio* get_vio() { return vio; } + + const std::list<buffer::ptr>& buffers() { return bl.buffers(); } + + unsigned /* XXX */ length() { return bl.length(); } + +}; + +void rgw_readv_rele(struct rgw_uio *uio, uint32_t flags) +{ + RGWReadV* rdv = static_cast<RGWReadV*>(uio->uio_p1); + rdv->~RGWReadV(); + ::operator delete(rdv); +} + +int rgw_readv(struct rgw_fs *rgw_fs, + struct rgw_file_handle *fh, rgw_uio *uio) +{ + CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw); + RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private); + RGWFileHandle* rgw_fh = get_rgwfh(fh); + + if (! rgw_fh->is_object()) + return EINVAL; + + buffer::list bl; + RGWGetObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(), + rgw_fh->object_name(), uio->uio_offset, uio->uio_resid, + bl); + + int rc = librgw.get_fe()->execute_req(&req); + + if (! rc) { + RGWReadV* rdv = static_cast<RGWReadV*>( + ::operator new(sizeof(RGWReadV) + + (bl.buffers().size() * sizeof(struct rgw_vio)))); + + (void) new (rdv) + RGWReadV(bl, reinterpret_cast<rgw_vio*>(rdv+sizeof(RGWReadV))); + + uio->uio_p1 = rdv; + uio->uio_cnt = rdv->buffers().size(); + uio->uio_resid = rdv->length(); + uio->uio_vio = rdv->get_vio(); + uio->uio_rele = rgw_readv_rele; + + int ix = 0; + auto& buffers = rdv->buffers(); + for (auto& bp : buffers) { + rgw_vio *vio = &(uio->uio_vio[ix]); + vio->vio_base = const_cast<char*>(bp.c_str()); + vio->vio_len = bp.length(); + vio->vio_u1 = nullptr; + vio->vio_p1 = nullptr; + ++ix; + } + } + + return rc; +} + +/* + write data to file (vector) +*/ + int rgw_writev(struct rgw_fs *rgw_fs, struct rgw_file_handle *fh, + rgw_uio *uio) +{ + CephContext* cct = static_cast<CephContext*>(rgw_fs->rgw); + RGWLibFS *fs = static_cast<RGWLibFS*>(rgw_fs->fs_private); + RGWFileHandle* rgw_fh = get_rgwfh(fh); + + if (! 
rgw_fh->is_object()) + return EINVAL; + + buffer::list bl; + for (unsigned int ix = 0; ix < uio->uio_cnt; ++ix) { + rgw_vio *vio = &(uio->uio_vio[ix]); + bl.push_back( + buffer::create_static(vio->vio_len, + static_cast<char*>(vio->vio_base))); + } + + RGWPutObjRequest req(cct, fs->get_user(), rgw_fh->bucket_name(), + rgw_fh->object_name(), bl); + + int rc = librgw.get_fe()->execute_req(&req); + + return rc; +} + +/* sync written data */ int rgw_fsync(struct rgw_fs *rgw_fs, struct rgw_file_handle *handle) diff --git a/src/test/librgw_file_gp.cc b/src/test/librgw_file_gp.cc index 4b9fa9b28cf..6de26aaa36e 100644 --- a/src/test/librgw_file_gp.cc +++ b/src/test/librgw_file_gp.cc @@ -40,6 +40,8 @@ namespace { bool do_pre_list = false; bool do_put = false; bool do_bulk = false; + bool do_writev = false; + bool do_readv = false; bool do_get = false; bool do_delete = false; @@ -54,10 +56,13 @@ namespace { std::uniform_int_distribution<uint8_t> uint_dist; std::mt19937 rng; - + + constexpr int iovcnt = 16; + constexpr int page_size = 65536; + struct ZPage { - char data[65536]; + char data[page_size]; uint64_t cksum; }; /* ZPage */ @@ -71,15 +76,15 @@ namespace { iovs = (struct iovec*) calloc(n, sizeof(struct iovec)); for (int page_ix = 0; page_ix < n; ++page_ix) { ZPage* p = new ZPage(); - for (int data_ix = 0; data_ix < 65536; ++data_ix) { + for (int data_ix = 0; data_ix < page_size; ++data_ix) { p->data[data_ix] = uint_dist(rng); } // data_ix - p->cksum = XXH64(p->data, 65536, 8675309); + p->cksum = XXH64(p->data, page_size, 8675309); pages.emplace_back(p); // and iovs struct iovec* iov = &iovs[page_ix]; iov->iov_base = p->data; - iov->iov_len = 65536; + iov->iov_len = page_size; } // page_ix } @@ -102,7 +107,7 @@ namespace { int n = size(); for (int page_ix = 0; page_ix < n; ++page_ix) { ZPage* p = pages[page_ix]; - p->cksum = XXH64(p->data, 65536, 8675309); + p->cksum = XXH64(p->data, page_size, 8675309); } } @@ -112,7 +117,7 @@ namespace { ZPage* p = pages[page_ix]; 
struct iovec* iov = &iovs[page_ix]; iov->iov_base = p->data; - iov->iov_len = 65536; + iov->iov_len = page_size; } } @@ -216,23 +221,85 @@ TEST(LibRGW, GET_OBJECT) { TEST(LibRGW, WRITE_READ_VERIFY) { - if (do_bulk) { - const int iovcnt = 16; + if (do_bulk && do_put) { ZPageSet zp_set1{iovcnt}; // 1M random data in 16 64K pages struct iovec *iovs = zp_set1.get_iovs(); /* read after write POSIX-style */ size_t nbytes, off = 0; - for (int ix = 0; ix < 16; ++ix, off += 65536) { + for (int ix = 0; ix < 16; ++ix, off += page_size) { struct iovec *iov = &iovs[ix]; - int ret = rgw_write(fs, object_fh, off, 65536, &nbytes, iov->iov_base); + int ret = rgw_write(fs, object_fh, off, page_size, &nbytes, + iov->iov_base); ASSERT_EQ(ret, 0); - ASSERT_EQ(nbytes, size_t(65536)); + ASSERT_EQ(nbytes, size_t(page_size)); } zp_set1.reset_iovs(); } } +/* "functions that call alloca are not inlined" + * --alexandre oliva + * http://gcc.gnu.org/ml/gcc-help/2004-04/msg00158.html + */ +#define alloca_uio() \ + do {\ + int uiosz = sizeof(rgw_uio) + iovcnt*sizeof(rgw_vio); \ + uio = static_cast<rgw_uio*>(alloca(uiosz)); \ + memset(uio, 0, uiosz); \ + uio->uio_vio = reinterpret_cast<rgw_vio*>(uio+sizeof(rgw_uio)); \ + } while (0); \ + +TEST(LibRGW, WRITEV) +{ + if (do_writev && do_put) { + rgw_uio* uio; + ZPageSet zp_set1{iovcnt}; // 1M random data in 16 64K pages + struct iovec *iovs = zp_set1.get_iovs(); + alloca_uio(); + ASSERT_NE(uio, nullptr); + + for (int ix = 0; ix < iovcnt; ++ix) { + struct iovec *iov = &iovs[ix]; + rgw_vio *vio = &(uio->uio_vio[ix]); + vio->vio_base = iov->iov_base; + vio->vio_len = iov->iov_len; + vio->vio_u1 = iov; // private data + } + uio->uio_cnt = iovcnt; + uio->uio_offset = iovcnt * page_size; + + int ret = rgw_writev(fs, object_fh, uio); + ASSERT_EQ(ret, 0); + //zp_set1.reset_iovs(); + } +} + +TEST(LibRGW, READV) +{ + if (do_readv && do_get) { + rgw_uio uio[1]; + memset(uio, 0, sizeof(rgw_uio)); + uio->uio_offset = 0; // ok, it was already 0 + int ret = 
rgw_readv(fs, object_fh, uio); + ASSERT_EQ(ret, 0); + //buffer::list bl; + buffer::list& bl = *(new buffer::list()); + for (unsigned int ix = 0; ix < uio->uio_cnt; ++ix) { + rgw_vio *vio = &(uio->uio_vio[ix]); + bl.push_back( + buffer::create_static(vio->vio_len, + static_cast<char*>(vio->vio_base))); + } + bl.hexdump(std::cout); + // release resources + ASSERT_NE(uio->uio_rele, nullptr); + if (uio->uio_rele) { + uio->uio_rele(uio, RGW_UIO_NONE); + } + } +} + TEST(LibRGW, DELETE_OBJECT) { if (do_delete) { int ret = rgw_unlink(fs, bucket_fh, object_name.c_str()); @@ -302,6 +369,12 @@ int main(int argc, char *argv[]) } else if (ceph_argparse_flag(args, arg_iter, "--bulk", (char*) nullptr)) { do_bulk = true; + } else if (ceph_argparse_flag(args, arg_iter, "--writev", + (char*) nullptr)) { + do_writev = true; + } else if (ceph_argparse_flag(args, arg_iter, "--readv", + (char*) nullptr)) { + do_readv = true; } else if (ceph_argparse_flag(args, arg_iter, "--delete", (char*) nullptr)) { do_delete = true; |