diff options
author | Venky Shankar <vshankar@redhat.com> | 2024-12-13 08:54:05 +0100 |
---|---|---|
committer | Venky Shankar <vshankar@redhat.com> | 2024-12-13 08:54:05 +0100 |
commit | 955bf28fe4763792e98d9d006c5fa9de48774227 (patch) | |
tree | 0ab04b76b2d25f68b98e2dc70870012cedf63c4d | |
parent | Merge pull request #60770 from Matan-B/wip-matanb-crimson-enable-alienstore-d... (diff) | |
parent | Temporarily change the libcephfs dependencies (diff) | |
download | ceph-955bf28fe4763792e98d9d006c5fa9de48774227.tar.xz ceph-955bf28fe4763792e98d9d006c5fa9de48774227.zip |
Merge PR #58376 into main
* refs/pull/58376/head:
Temporarily change the libcephfs dependencies
proxy: Add the design document
proxy: Add the proxy to the deb builds
proxy: Add the proxy to the rpm builds
Initial version of the libcephfs proxy
Reviewed-by: Sachin Prabhu <sp@spui.uk>
Reviewed-by: Venky Shankar <vshankar@redhat.com>
26 files changed, 6243 insertions, 0 deletions
diff --git a/.githubmap b/.githubmap index 68c711aa587..5265fa59bed 100644 --- a/.githubmap +++ b/.githubmap @@ -188,3 +188,4 @@ robbat2 Robin H. Johnson <robbat2@orbis-terrarum.net> leonid-s-usov Leonid Usov <leonid.usov@ibm.com> ffilz Frank S. Filz <ffilzlnx@mindspring.com> Jayaprakash-ibm Jaya Prakash Madaka <jayaprakash@ibm.com> +spuiuk Sachin Prabhu <sp@spui.uk> diff --git a/ceph.spec.in b/ceph.spec.in index ece1ebf2ec8..aedb6ae38b4 100644 --- a/ceph.spec.in +++ b/ceph.spec.in @@ -1186,18 +1186,41 @@ Obsoletes: libcephfs1 < %{_epoch_prefix}%{version}-%{release} Obsoletes: ceph-libs < %{_epoch_prefix}%{version}-%{release} Obsoletes: ceph-libcephfs < %{_epoch_prefix}%{version}-%{release} %endif +Recommends: libcephfs-proxy2 = %{_epoch_prefix}%{version}-%{release} +Requires: libcephfs-daemon %description -n libcephfs2 Ceph is a distributed network file system designed to provide excellent performance, reliability, and scalability. This is a shared library allowing applications to access a Ceph distributed file system via a POSIX-like interface. +%package -n libcephfs-proxy2 +Summary: Proxy for libcephfs +%if 0%{?suse_version} +Group: System/Libraries +%endif +Recommends: libcephfs-daemon = %{_epoch_prefix}%{version}-%{release} +%description -n libcephfs-proxy2 +This package contains the libcephfs_proxy.so library that allow applications +to share cephfs mounts to reduce resource consumption. + +%package -n libcephfs-daemon +Summary: Deamon for the libcephfs proxy +%if 0%{?suse_version} +Group: System/Filesystems +%endif +Requires: libcephfs2 = %{_epoch_prefix}%{version}-%{release} +%description -n libcephfs-daemon +This package contains the libcephfsd daemon that allows applications to share +cephfs mounts to reduce resource consumption. 
+ %package -n libcephfs-devel Summary: Ceph distributed file system headers %if 0%{?suse_version} Group: Development/Libraries/C and C++ %endif Requires: libcephfs2 = %{_epoch_prefix}%{version}-%{release} +Requires: libcephfs-proxy2 = %{_epoch_prefix}%{version}-%{release} Requires: librados-devel = %{_epoch_prefix}%{version}-%{release} Obsoletes: ceph-devel < %{_epoch_prefix}%{version}-%{release} Provides: libcephfs2-devel = %{_epoch_prefix}%{version}-%{release} @@ -2524,6 +2547,16 @@ fi %postun -n libcephfs2 -p /sbin/ldconfig +%files -n libcephfs-proxy2 +%{_libdir}/libcephfs_proxy.so.* + +%post -n libcephfs-proxy2 -p /sbin/ldconfig + +%postun -n libcephfs-proxy2 -p /sbin/ldconfig + +%files -n libcephfs-daemon +%{_sbindir}/libcephfsd + %files -n libcephfs-devel %dir %{_includedir}/cephfs %{_includedir}/cephfs/libcephfs.h @@ -2532,6 +2565,7 @@ fi %dir %{_includedir}/cephfs/metrics %{_includedir}/cephfs/metrics/Types.h %{_libdir}/libcephfs.so +%{_libdir}/libcephfs_proxy.so %files -n python%{python3_pkgversion}-cephfs %{python3_sitearch}/cephfs.cpython*.so diff --git a/debian/.gitignore b/debian/.gitignore index 32ca866d753..1d6ef3a34b5 100644 --- a/debian/.gitignore +++ b/debian/.gitignore @@ -38,4 +38,6 @@ /python-cephfs /libcephfs-java /libcephfs-jni +/libcephfs-proxy0-dbg +/libcephfs-proxy0 /tmp diff --git a/debian/control b/debian/control index d31a82bbc75..ec04c2599cd 100644 --- a/debian/control +++ b/debian/control @@ -891,6 +891,7 @@ Conflicts: libceph, Replaces: libceph, libceph1, libcephfs, +Recommends: libcephfs-proxy2 (= ${binary:Version}) Architecture: linux-any Section: libs Depends: ${misc:Depends}, @@ -919,10 +920,61 @@ Description: debugging symbols for libcephfs2 . This package contains debugging symbols for libcephfs2. 
+Package: libcephfs-proxy2 +Architecture: linux-any +Section: libs +Depends: ${misc:Depends}, + ${shlibs:Depends}, +Recommends: libcephfs-daemon (= ${binary:Version}) +Description: Libcephfs proxy library + Ceph is a massively scalable, open-source, distributed + storage system that runs on commodity hardware and delivers object, + block and file system storage. This allows applications to share + libcephfs' CephFS mounts to reduce resource consumption. + +Package: libcephfs-proxy2-dbg +Architecture: linux-any +Section: debug +Priority: extra +Depends: libcephfs-proxy2 (= ${binary:Version}), + ${misc:Depends}, +Description: debugging symbols for libcephfs-proxy2 + Ceph is a massively scalable, open-source, distributed + storage system that runs on commodity hardware and delivers object, + block and file system storage. This allows applications to share + libcephfs' CephFS mounts to reduce resource consumption. + . + This package contains debugging symbols for libcephfs-proxy2. + +Package: libcephfs-daemon +Architecture: linux-any +Depends: libcephfs2 (= ${binary:Version}), + ${misc:Depends}, +Description: Libcephfs proxy daemon + Ceph is a massively scalable, open-source, distributed + storage system that runs on commodity hardware and delivers object, + block and file system storage. This allows applications to share + libcephfs' CephFS mounts to reduce resource consumption. + +Package: libcephfs-daemon-dbg +Architecture: linux-any +Section: debug +Priority: extra +Depends: libcephfs-daemon (= ${binary:Version}), + ${misc:Depends}, +Description: debugging symbols for libcephfs-daemon + Ceph is a massively scalable, open-source, distributed + storage system that runs on commodity hardware and delivers object, + block and file system storage. This allows applications to share + libcephfs' CephFS mounts to reduce resource consumption. + . + This package contains debugging symbols for libcephfs-proxy2. 
+ Package: libcephfs-dev Architecture: linux-any Section: libdevel Depends: libcephfs2 (= ${binary:Version}), + libcephfs-proxy2 (= ${binary:Version}), ${misc:Depends}, Conflicts: libceph-dev, libceph1-dev, diff --git a/debian/libcephfs-daemon.install b/debian/libcephfs-daemon.install new file mode 100644 index 00000000000..454de46d2d7 --- /dev/null +++ b/debian/libcephfs-daemon.install @@ -0,0 +1 @@ +usr/sbin/libcephfsd diff --git a/debian/libcephfs-dev.install b/debian/libcephfs-dev.install index cf22dce62d4..40e37414051 100644 --- a/debian/libcephfs-dev.install +++ b/debian/libcephfs-dev.install @@ -3,3 +3,4 @@ usr/include/cephfs/libcephfs.h usr/include/cephfs/types.h usr/include/cephfs/metrics/Types.h usr/lib/libcephfs.so +usr/lib/libcephfs_proxy.so diff --git a/debian/libcephfs-proxy2.install b/debian/libcephfs-proxy2.install new file mode 100644 index 00000000000..fc363125bc2 --- /dev/null +++ b/debian/libcephfs-proxy2.install @@ -0,0 +1 @@ +usr/lib/libcephfs_proxy.so.* diff --git a/debian/rules b/debian/rules index 3fbed3f3a2e..6c0ab5e12c6 100755 --- a/debian/rules +++ b/debian/rules @@ -121,6 +121,8 @@ override_dh_strip: dh_strip -plibradosstriper1 --dbg-package=libradosstriper1-dbg dh_strip -plibrbd1 --dbg-package=librbd1-dbg dh_strip -plibcephfs2 --dbg-package=libcephfs2-dbg + dh_strip -plibcephfs-proxy2 --dbg-package=libcephfs-proxy2-dbg + dh_strip -plibcephfs-daemon --dbg-package=libcephfs-daemon-dbg dh_strip -plibrgw2 --dbg-package=librgw2-dbg dh_strip -pradosgw --dbg-package=radosgw-dbg dh_strip -pceph-test --dbg-package=ceph-test-dbg diff --git a/doc/dev/libcephfs_proxy.rst b/doc/dev/libcephfs_proxy.rst new file mode 100644 index 00000000000..baa96f765c9 --- /dev/null +++ b/doc/dev/libcephfs_proxy.rst @@ -0,0 +1,289 @@ +Design of the libcephfs proxy +============================= + +Description of the problem +-------------------------- + +When an application connects to a Ceph volume through the *libcephfs.so* +library, a cache is created locally 
inside the process. The *libcephfs.so* +implementation already deals with memory usage of the cache and adjusts it so +that it doesn't consume all the available memory. However, if multiple +processes connect to CephFS through different instances of the library, each +one of them will keep a private cache. In this case memory management is not +effective because, even configuring memory limits, the number of libcephfs +instances that can be created is unbounded and they can't work in a coordinated +way to correctly control resource usage. Due to this, it's relatively easy to +consume all memory when all processes are using data cache intensively. This +causes the OOM killer to terminate those processes. + +Proposed solution +----------------- + +High level approach +^^^^^^^^^^^^^^^^^^^ + +The main idea is to create a *libcephfs_proxy.so* library that will provide the +same API as the original *libcephfs.so*, but won't cache any data. This library +can be used by any application currently using *libcephfs.so* in a transparent +way (i.e. no code modification required) just by linking against +*libcephfs_proxy.so* instead of *libcephfs.so*, or even using *LD_PRELOAD*. + +A new *libcephfsd* daemon will also be created. This daemon will link against +the real *libcephfs.so* library and will listen for incoming connections on a +UNIX socket. + +When an application starts and initiates CephFS requests through the +*libcephfs_proxy.so* library, it will connect to the *libcephfsd* daemon +through the UNIX socket and it will forward all CephFS requests to it. The +daemon will use the real *libcephfs.so* to execute those requests and the +answers will be returned back to the application, potentially caching data in +the *libcephfsd* process itself. All this will happen transparently, without +any knowledge from the application. 
+ +The daemon will share low level *libcephfs.so* mounts between different +applications to avoid creating an instance for each application, which would +have the same effect on memory as linking each application directly to the +*libcephfs.so* library. This will be done only if the configuration defined by +the applications is identical. Otherwise new independent instances will still +be created. + +Some *libcephfs.so* functions will need to be implemented in a special way +inside the *libcephfsd* daemon to hide the differences caused by sharing the +same mount instance with more than one client (for example chdir/getcwd cannot +rely directly on the ``ceph_chdir()``/``ceph_getcwd()`` of *libcephfs.so*). + +Initially, only the subset of the low-level interface functions of +*libcephfs.so* that are used by Samba's VFS CephFS module will be provided. + +Design of common components +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Network protocol +"""""""""""""""" + +Since the connection through the UNIX socket is to another process that runs on +the same machine and the data we need to pass is quite simple, we'll avoid all +the overhead of generic XDR encoding/decoding and RPC transmissions by using a +very simple serialization implemented in the code itself. For the future we may +consider using cap'n proto (https://capnproto.org), which claims to have zero +overhead for encoding and decoding, and would provide an easy way to support +backward compatibility if the network protocol needs to be modified in the +future. + +Design of the *libcephfs_proxy.so* library +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This library will basically connect to the UNIX socket where the *libcephfsd* +daemon is listening, wait for requests coming from the application, serialize +all function arguments and send them to the daemon. Once the daemon responds it +will deserialize the answer and return the result to the application. 
+ +Local caching +""""""""""""" + +While the main purpose of this library is to avoid independent caches on each +process, some preliminary testing has shown a big performance drop for +workloads based on metadata operations and/or small files when all requests go +through the proxy daemon. To minimize this, metadata caching should be +implemented. Metadata cache is much smaller than data cache and will provide a +good trade-off between memory usage and performance. + +To implement caching in a safe way, it's required to correctly invalidate data +before it becomes stale. Currently libcephfs.so provides invalidation +notifications that can be used to implement this, but its semantics are not +fully understood yet, so the cache in the libcephfs_proxy.so library will be +designed and implemented in a future version. + + +Design of the *libcephfsd* daemon +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The daemon will be a regular process that will centralize libcephfs requests +coming from other processes on the same machine. + +Process maintenance +""""""""""""""""""" + +Since the process will work as a standalone daemon, a simple systemd unit file +will be provided to manage it as a regular system service. Most probably this +will be integrated inside cephadm in the future. + +In case the *libcephfsd* daemon crashes, we'll rely on systemd to restart it. + + +Special functions +^^^^^^^^^^^^^^^^^ + +Some functions will need to be handled in a special way inside the *libcephfsd* +daemon to provide correct functionality since forwarding them directly to +*libcephfs.so* could return incorrect results due to the sharing of low-level +mounts. + +**Sharing of underlying struct ceph_mount_info** + +The main purpose of the proxy is to avoid creating a new mount for each process +when they are accessing the same data. 
To be able to provide this we need to +"virtualize" the mount points and let the application believe it's using its +own mount when, in fact, it could be using a shared mount. + +The daemon will track the Ceph account used to connect to the volume, the +configuration file and any specific configuration changes done before mounting +the volume. Only if all settings are exactly the same as another already +mounted instance, then the mount will be shared. The daemon won't understand +CephFS settings nor any potential dependencies between settings. For this +reason, a very strict comparison will be done: the configuration file needs to +be identical and any other changes made afterwards need to be set to the exact +same value and in the same order so that two configurations can be considered +identical. + +The check to determine whether two configurations are identical or not will be +done just before mounting the volume (i.e. ``ceph_mount()``). This means that +during the configuration phase, we may have many simultaneous mounts allocated +but not yet mounted. However only one of them will become a real mount. The +others will remain unmounted and will be eventually destroyed once users +unmount and release them. + +The following functions will be affected: + +* **ceph_create** + + This one will allocate a new ceph_mount_info structure, and the provided id + will be recorded for future comparison of potentially matching mounts. + +* **ceph_release** + + This one will release an unmounted ceph_mount_info structure. Unmounted + structures won't be shared with anyone else. + +* **ceph_conf_read_file** + + This one will read the configuration file, compute a checksum and make a + copy. The copy will make sure that there are no changes in the configuration + file since the time the checksum was computed, and the checksum will be + recorded for future comparison of potentially matching mounts. 
+ +* **ceph_conf_get** + + This one will obtain the requested setting, recording it for future + comparison of potentially matching mounts. + + Even though this may seem unnecessary, since the daemon is considering the + configuration as a black box, it could be possible to have some dynamic + setting that could return different values depending on external factors, so + the daemon also requires that any requested setting returns the same value to + consider two configurations identical. + +* **ceph_conf_set** + + This one will record the modified value for future comparison of potentially + matching mounts. + + In normal circumstances, some settings may be set even after having mounted + the volume. The proxy won't allow that to avoid potential interferences with + other clients sharing the same mount. + +* **ceph_init** + + This one will be a no-op. Calling this function triggers the allocation of + several resources and starts some threads. This is just a waste of resources + if this *ceph_mount_info* structure is not finally mounted because it matches + with an already existing mount. + + Only if at the time of mount (i.e. ``ceph_mount()``) there's no match with + already existing mounts, then the mount will be initialized and mounted at + the same time. + +* **ceph_select_filesystem** + + This one will record the selected file system for future comparison of + potentially matching mounts. + +* **ceph_mount** + + This one will try to find an active mount that matches with all the + configurations defined for this *ceph_mount_info* structure. If none is + found, it will be mounted. Otherwise, the already existing mount will be + shared with this client. + + The unmounted *ceph_mount_info* structures will be kept around associated + with the mounted one. + + All "real" mounts will be made against the absolute root of the volume + (i.e. "/") to make sure they can be shared with other clients later, + regardless of whether they use the same mount point or not. 
This means that + just after mounting, the daemon will need to resolve and store the root inode + of the "virtual" mount point. + + The CWD (Current Working Directory) will also be initialized to the same + inode. + +* **ceph_unmount** + + This one will detach the client from the mounted *ceph_mount_info* structure + and reattach it to one of the associated unmounted structures. If this was + the last user of the mount, it's finally unmounted instead. + + After calling this function, the client continues using a private + *ceph_mount_info* structure that is used exclusively by itself, so other + configuration changes and operations can be done safely. + +**Confine accesses to the intended mount point** + +Since the effective mount point may not match the real mount point, some +functions could be able to return inodes outside of the effective mount point +if not handled with care. To avoid it and provide the result that the user +application expects, we will need to simulate some of them inside the +*libcephfsd* daemon. + +There are three special cases to consider: + +1. Handling of paths starting with "/" +2. Handling of paths containing ".." (i.e. parent directory) +3. Handling of paths containing symbolic links + +When these special paths are found, they need to be handled in a special way to +make sure that the returned inodes are what the client expects. + +The following functions will be affected: + +* **ceph_ll_lookup** + + Lookup accepts ".." as the name to resolve. If the parent directory is the + root of the "virtual" mount point (which may not be the same as the real + mount point), we'll need to return the inode corresponding to the "virtual" + mount point stored at the time of mount, instead of the real parent. + +* **ceph_ll_lookup_root** + + This one needs to return the root inode stored at the time of mount. 
+ +* **ceph_ll_walk** + + This one will be completely reimplemented inside the daemon to be able to + correctly parse each path component and symbolic link, and handle "/" and + ".." in the correct way. + +* **ceph_chdir** + + This one will resolve the passed path and store it along the corresponding + inode inside the current "virtual" mount. The real ``ceph_chdir()`` won't be + called. + +* **ceph_getcwd** + + This one will just return the path stored in the "virtual" mount from + previous ``ceph_chdir()`` calls. + +**Handle AT_FDCWD** + +Any function that receives a file descriptor could also receive the special +*AT_FDCWD* value. These functions need to check for that value and use the +"virtual" CWD instead. + +Testing +------- + +The proxy should be transparent to any application already using +*libcephfs.so*. This also applies to testing scripts and applications. So any +existing test against the regular *libcephfs.so* library can also be used to +test the proxy. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 43bab75680d..4a1c5768aa7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -857,6 +857,7 @@ if(WITH_LIBCEPHFS) if(LINUX) add_subdirectory(mount) endif() + add_subdirectory(libcephfs_proxy) endif(WITH_LIBCEPHFS) if(WITH_LIBCEPHSQLITE) diff --git a/src/libcephfs_proxy/CMakeLists.txt b/src/libcephfs_proxy/CMakeLists.txt new file mode 100644 index 00000000000..e19841241e7 --- /dev/null +++ b/src/libcephfs_proxy/CMakeLists.txt @@ -0,0 +1,18 @@ +set(proxy_common_srcs proxy_link.c proxy_log.c) +set(libcephfsd_srcs libcephfsd.c proxy_manager.c proxy_mount.c proxy_helpers.c ${proxy_common_srcs}) +set(libcephfs_proxy_srcs libcephfs_proxy.c ${proxy_common_srcs}) + +add_executable(libcephfsd ${libcephfsd_srcs}) +add_library(cephfs_proxy ${CEPH_SHARED} ${libcephfs_proxy_srcs}) + +target_link_libraries(libcephfsd cephfs ${CRYPTO_LIBS}) + +if(ENABLE_SHARED) + set_target_properties(cephfs_proxy PROPERTIES + OUTPUT_NAME cephfs_proxy + VERSION 2.0.0 
+ SOVERSION 2) +endif(ENABLE_SHARED) + +install(TARGETS libcephfsd DESTINATION ${CMAKE_INSTALL_SBINDIR}) +install(TARGETS cephfs_proxy DESTINATION ${CMAKE_INSTALL_LIBDIR}) diff --git a/src/libcephfs_proxy/libcephfs_proxy.c b/src/libcephfs_proxy/libcephfs_proxy.c new file mode 100644 index 00000000000..149fae123f7 --- /dev/null +++ b/src/libcephfs_proxy/libcephfs_proxy.c @@ -0,0 +1,869 @@ + +#include <stdlib.h> + +#include "include/cephfs/libcephfs.h" + +#include "proxy_log.h" +#include "proxy_helpers.h" +#include "proxy_requests.h" + +/* We override the definition of the ceph_mount_info structure to contain + * internal proxy information. This is already a black box for libcephfs users, + * so this won't be noticed. */ +struct ceph_mount_info { + proxy_link_t link; + uint64_t cmount; +}; + +/* The global_cmount is used to stablish an initial connection to serve requests + * not related to a real cmount, like ceph_version or ceph_userperm_new. */ +static struct ceph_mount_info global_cmount = { PROXY_LINK_DISCONNECTED, 0 }; + +static bool client_stop(proxy_link_t *link) +{ + return false; +} + +static int32_t proxy_connect(proxy_link_t *link) +{ + CEPH_REQ(hello, req, 0, ans, 0); + char *path, *env; + int32_t sd, err; + + path = PROXY_SOCKET; + env = getenv(PROXY_SOCKET_ENV); + if (env != NULL) { + path = env; + } + + sd = proxy_link_client(link, path, client_stop); + if (sd < 0) { + return sd; + } + + req.id = LIBCEPHFS_LIB_CLIENT; + err = proxy_link_send(sd, req_iov, 1); + if (err < 0) { + goto failed; + } + err = proxy_link_recv(sd, ans_iov, 1); + if (err < 0) { + goto failed; + } + + proxy_log(LOG_INFO, 0, "Connected to libcephfsd version %d.%d", + ans.major, ans.minor); + + if ((ans.major != LIBCEPHFSD_MAJOR) || + (ans.minor != LIBCEPHFSD_MINOR)) { + err = proxy_log(LOG_ERR, ENOTSUP, "Version not supported"); + goto failed; + } + + return sd; + +failed: + proxy_link_close(link); + + return err; +} + +static void proxy_disconnect(proxy_link_t *link) +{ + 
proxy_link_close(link); +} + +static int32_t proxy_global_connect(void) +{ + int32_t err; + + err = 0; + + if (!proxy_link_is_connected(&global_cmount.link)) { + err = proxy_connect(&global_cmount.link); + } + + return err; +} + +static int32_t proxy_check(struct ceph_mount_info *cmount, int32_t err, + int32_t result) +{ + if (err < 0) { + proxy_disconnect(&cmount->link); + proxy_log(LOG_ERR, err, "Disconnected from libcephfsd"); + + return err; + } + + return result; +} + +/* Macros to simplify communication with the server. */ +#define CEPH_RUN(_cmount, _op, _req, _ans) \ + ({ \ + int32_t __err = \ + CEPH_CALL((_cmount)->link.sd, _op, _req, _ans); \ + __err = proxy_check(_cmount, __err, (_ans).header.result); \ + __err; \ + }) + +#define CEPH_PROCESS(_cmount, _op, _req, _ans) \ + ({ \ + int32_t __err = -ENOTCONN; \ + if (proxy_link_is_connected(&(_cmount)->link)) { \ + (_req).cmount = (_cmount)->cmount; \ + __err = CEPH_RUN(_cmount, _op, _req, _ans); \ + } \ + __err; \ + }) + +__public int ceph_chdir(struct ceph_mount_info *cmount, const char *path) +{ + CEPH_REQ(ceph_chdir, req, 1, ans, 0); + + CEPH_STR_ADD(req, path, path); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_CHDIR, req, ans); +} + +__public int ceph_conf_get(struct ceph_mount_info *cmount, const char *option, + char *buf, size_t len) +{ + CEPH_REQ(ceph_conf_get, req, 1, ans, 1); + + req.size = len; + + CEPH_STR_ADD(req, option, option); + CEPH_BUFF_ADD(ans, buf, len); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_CONF_GET, req, ans); +} + +__public int ceph_conf_read_file(struct ceph_mount_info *cmount, + const char *path_list) +{ + CEPH_REQ(ceph_conf_read_file, req, 1, ans, 0); + + CEPH_STR_ADD(req, path, path_list); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_CONF_READ_FILE, req, ans); +} + +__public int ceph_conf_set(struct ceph_mount_info *cmount, const char *option, + const char *value) +{ + CEPH_REQ(ceph_conf_set, req, 2, ans, 0); + + CEPH_STR_ADD(req, option, option); + CEPH_STR_ADD(req, 
value, value); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_CONF_SET, req, ans); +} + +__public int ceph_create(struct ceph_mount_info **cmount, const char *const id) +{ + CEPH_REQ(ceph_create, req, 1, ans, 0); + struct ceph_mount_info *ceph_mount; + int32_t sd, err; + + ceph_mount = proxy_malloc(sizeof(struct ceph_mount_info)); + if (ceph_mount == NULL) { + return -ENOMEM; + } + + err = proxy_connect(&ceph_mount->link); + if (err < 0) { + goto failed; + } + sd = err; + + CEPH_STR_ADD(req, id, id); + + err = CEPH_CALL(sd, LIBCEPHFSD_OP_CREATE, req, ans); + if ((err < 0) || ((err = ans.header.result) < 0)) { + goto failed_link; + } + + ceph_mount->cmount = ans.cmount; + + *cmount = ceph_mount; + + return 0; + +failed_link: + proxy_disconnect(&ceph_mount->link); + +failed: + proxy_free(ceph_mount); + + return err; +} + +__public const char *ceph_getcwd(struct ceph_mount_info *cmount) +{ + static char cwd[PATH_MAX]; + int32_t err; + + CEPH_REQ(ceph_getcwd, req, 0, ans, 1); + + CEPH_BUFF_ADD(ans, cwd, sizeof(cwd)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_GETCWD, req, ans); + if (err >= 0) { + return cwd; + } + + errno = -err; + + return NULL; +} + +__public int ceph_init(struct ceph_mount_info *cmount) +{ + CEPH_REQ(ceph_init, req, 0, ans, 0); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_INIT, req, ans); +} + +__public int ceph_ll_close(struct ceph_mount_info *cmount, + struct Fh *filehandle) +{ + CEPH_REQ(ceph_ll_close, req, 0, ans, 0); + + req.fh = ptr_value(filehandle); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_CLOSE, req, ans); +} + +__public int ceph_ll_create(struct ceph_mount_info *cmount, Inode *parent, + const char *name, mode_t mode, int oflags, + Inode **outp, Fh **fhp, struct ceph_statx *stx, + unsigned want, unsigned lflags, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_create, req, 1, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.parent = ptr_value(parent); + req.mode = mode; + req.oflags = oflags; + req.want = want; + 
req.flags = lflags; + + CEPH_STR_ADD(req, name, name); + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_CREATE, req, ans); + if (err >= 0) { + *outp = value_ptr(ans.inode); + *fhp = value_ptr(ans.fh); + } + + return err; +} + +__public int ceph_ll_fallocate(struct ceph_mount_info *cmount, struct Fh *fh, + int mode, int64_t offset, int64_t length) +{ + CEPH_REQ(ceph_ll_fallocate, req, 0, ans, 0); + + req.fh = ptr_value(fh); + req.mode = mode; + req.offset = offset; + req.length = length; + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_FALLOCATE, req, ans); +} + +__public int ceph_ll_fsync(struct ceph_mount_info *cmount, struct Fh *fh, + int syncdataonly) +{ + CEPH_REQ(ceph_ll_fsync, req, 0, ans, 0); + + req.fh = ptr_value(fh); + req.dataonly = syncdataonly; + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_FSYNC, req, ans); +} + +__public int ceph_ll_getattr(struct ceph_mount_info *cmount, struct Inode *in, + struct ceph_statx *stx, unsigned int want, + unsigned int flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_getattr, req, 0, ans, 1); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.want = want; + req.flags = flags; + + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_GETATTR, req, ans); +} + +__public int ceph_ll_getxattr(struct ceph_mount_info *cmount, struct Inode *in, + const char *name, void *value, size_t size, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_getxattr, req, 1, ans, 1); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.size = size; + CEPH_STR_ADD(req, name, name); + + CEPH_BUFF_ADD(ans, value, size); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_GETXATTR, req, ans); +} + +__public int ceph_ll_link(struct ceph_mount_info *cmount, struct Inode *in, + struct Inode *newparent, const char *name, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_link, req, 1, ans, 0); + + req.userperm = ptr_value(perms); + req.inode = 
ptr_value(in); + req.parent = ptr_value(newparent); + CEPH_STR_ADD(req, name, name); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LINK, req, ans); +} + +__public int ceph_ll_listxattr(struct ceph_mount_info *cmount, struct Inode *in, + char *list, size_t buf_size, size_t *list_size, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_listxattr, req, 0, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.size = buf_size; + + CEPH_BUFF_ADD(ans, list, buf_size); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LISTXATTR, req, ans); + if (err >= 0) { + *list_size = ans.size; + } + + return err; +} + +__public int ceph_ll_lookup(struct ceph_mount_info *cmount, Inode *parent, + const char *name, Inode **out, + struct ceph_statx *stx, unsigned want, + unsigned flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_lookup, req, 1, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.parent = ptr_value(parent); + req.want = want; + req.flags = flags; + CEPH_STR_ADD(req, name, name); + + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LOOKUP, req, ans); + if (err >= 0) { + *out = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_lookup_inode(struct ceph_mount_info *cmount, + struct inodeno_t ino, Inode **inode) +{ + CEPH_REQ(ceph_ll_lookup_inode, req, 0, ans, 0); + int32_t err; + + req.ino = ino; + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LOOKUP_INODE, req, ans); + if (err >= 0) { + *inode = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_lookup_root(struct ceph_mount_info *cmount, Inode **parent) +{ + CEPH_REQ(ceph_ll_lookup_root, req, 0, ans, 0); + int32_t err; + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LOOKUP_ROOT, req, ans); + if (err >= 0) { + *parent = value_ptr(ans.inode); + } + + return err; +} + +__public off_t ceph_ll_lseek(struct ceph_mount_info *cmount, + struct Fh *filehandle, off_t offset, int whence) +{ + 
CEPH_REQ(ceph_ll_lseek, req, 0, ans, 0); + int32_t err; + + req.fh = ptr_value(filehandle); + req.offset = offset; + req.whence = whence; + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_LSEEK, req, ans); + if (err >= 0) { + return ans.offset; + } + + return err; +} + +__public int ceph_ll_mkdir(struct ceph_mount_info *cmount, Inode *parent, + const char *name, mode_t mode, Inode **out, + struct ceph_statx *stx, unsigned want, + unsigned flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_mkdir, req, 1, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.parent = ptr_value(parent); + req.mode = mode; + req.want = want; + req.flags = flags; + CEPH_STR_ADD(req, name, name); + + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_MKDIR, req, ans); + if (err >= 0) { + *out = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_mknod(struct ceph_mount_info *cmount, Inode *parent, + const char *name, mode_t mode, dev_t rdev, + Inode **out, struct ceph_statx *stx, unsigned want, + unsigned flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_mknod, req, 1, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.parent = ptr_value(parent); + req.mode = mode; + req.rdev = rdev; + req.want = want; + req.flags = flags; + CEPH_STR_ADD(req, name, name); + + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_MKNOD, req, ans); + if (err >= 0) { + *out = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_open(struct ceph_mount_info *cmount, struct Inode *in, + int flags, struct Fh **fh, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_open, req, 0, ans, 0); + int32_t err; + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.flags = flags; + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_OPEN, req, ans); + if (err >= 0) { + *fh = value_ptr(ans.fh); + } + + return err; +} + +__public int ceph_ll_opendir(struct ceph_mount_info *cmount, struct 
Inode *in, + struct ceph_dir_result **dirpp, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_opendir, req, 0, ans, 0); + int32_t err; + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_OPENDIR, req, ans); + if (err >= 0) { + *dirpp = value_ptr(ans.dir); + } + + return err; +} + +__public int ceph_ll_put(struct ceph_mount_info *cmount, struct Inode *in) +{ + CEPH_REQ(ceph_ll_put, req, 0, ans, 0); + + req.inode = ptr_value(in); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_PUT, req, ans); +} + +__public int ceph_ll_read(struct ceph_mount_info *cmount, struct Fh *filehandle, + int64_t off, uint64_t len, char *buf) +{ + CEPH_REQ(ceph_ll_read, req, 0, ans, 1); + + req.fh = ptr_value(filehandle); + req.offset = off; + req.len = len; + + CEPH_BUFF_ADD(ans, buf, len); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_READ, req, ans); +} + +__public int ceph_ll_readlink(struct ceph_mount_info *cmount, struct Inode *in, + char *buf, size_t bufsize, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_readlink, req, 0, ans, 1); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.size = bufsize; + + CEPH_BUFF_ADD(ans, buf, bufsize); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_READLINK, req, ans); +} + +__public int ceph_ll_releasedir(struct ceph_mount_info *cmount, + struct ceph_dir_result *dir) +{ + CEPH_REQ(ceph_ll_releasedir, req, 0, ans, 0); + + req.dir = ptr_value(dir); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_RELEASEDIR, req, ans); +} + +__public int ceph_ll_removexattr(struct ceph_mount_info *cmount, + struct Inode *in, const char *name, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_removexattr, req, 1, ans, 0); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + CEPH_STR_ADD(req, name, name); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_REMOVEXATTR, req, ans); +} + +__public int ceph_ll_rename(struct ceph_mount_info *cmount, + struct Inode *parent, const char 
*name, + struct Inode *newparent, const char *newname, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_rename, req, 2, ans, 0); + + req.userperm = ptr_value(perms); + req.old_parent = ptr_value(parent); + req.new_parent = ptr_value(newparent); + CEPH_STR_ADD(req, old_name, name); + CEPH_STR_ADD(req, new_name, newname); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_RENAME, req, ans); +} + +__public void ceph_rewinddir(struct ceph_mount_info *cmount, + struct ceph_dir_result *dirp) +{ + CEPH_REQ(ceph_rewinddir, req, 0, ans, 0); + + req.dir = ptr_value(dirp); + + CEPH_PROCESS(cmount, LIBCEPHFSD_OP_REWINDDIR, req, ans); +} + +__public int ceph_ll_rmdir(struct ceph_mount_info *cmount, struct Inode *in, + const char *name, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_rmdir, req, 1, ans, 0); + + req.userperm = ptr_value(perms); + req.parent = ptr_value(in); + CEPH_STR_ADD(req, name, name); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_RMDIR, req, ans); +} + +__public int ceph_ll_setattr(struct ceph_mount_info *cmount, struct Inode *in, + struct ceph_statx *stx, int mask, + const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_setattr, req, 1, ans, 0); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.mask = mask; + CEPH_BUFF_ADD(req, stx, sizeof(*stx)); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_SETATTR, req, ans); +} + +__public int ceph_ll_setxattr(struct ceph_mount_info *cmount, struct Inode *in, + const char *name, const void *value, size_t size, + int flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_setxattr, req, 2, ans, 0); + + req.userperm = ptr_value(perms); + req.inode = ptr_value(in); + req.size = size; + req.flags = flags; + CEPH_STR_ADD(req, name, name); + CEPH_BUFF_ADD(req, value, size); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_SETXATTR, req, ans); +} + +__public int ceph_ll_statfs(struct ceph_mount_info *cmount, struct Inode *in, + struct statvfs *stbuf) +{ + CEPH_REQ(ceph_ll_statfs, req, 0, ans, 1); + + req.inode = 
ptr_value(in); + + CEPH_BUFF_ADD(ans, stbuf, sizeof(*stbuf)); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_STATFS, req, ans); +} + +__public int ceph_ll_symlink(struct ceph_mount_info *cmount, Inode *in, + const char *name, const char *value, Inode **out, + struct ceph_statx *stx, unsigned want, + unsigned flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_symlink, req, 2, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.parent = ptr_value(in); + req.want = want; + req.flags = flags; + CEPH_STR_ADD(req, name, name); + CEPH_STR_ADD(req, target, value); + + CEPH_BUFF_ADD(req, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_SYMLINK, req, ans); + if (err >= 0) { + *out = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_unlink(struct ceph_mount_info *cmount, struct Inode *in, + const char *name, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_unlink, req, 1, ans, 0); + + req.userperm = ptr_value(perms); + req.parent = ptr_value(in); + CEPH_STR_ADD(req, name, name); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_UNLINK, req, ans); +} + +__public int ceph_ll_walk(struct ceph_mount_info *cmount, const char *name, + Inode **i, struct ceph_statx *stx, unsigned int want, + unsigned int flags, const UserPerm *perms) +{ + CEPH_REQ(ceph_ll_walk, req, 1, ans, 1); + int32_t err; + + req.userperm = ptr_value(perms); + req.want = want; + req.flags = flags; + CEPH_STR_ADD(req, path, name); + + CEPH_BUFF_ADD(ans, stx, sizeof(*stx)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_WALK, req, ans); + if (err >= 0) { + *i = value_ptr(ans.inode); + } + + return err; +} + +__public int ceph_ll_write(struct ceph_mount_info *cmount, + struct Fh *filehandle, int64_t off, uint64_t len, + const char *data) +{ + CEPH_REQ(ceph_ll_write, req, 1, ans, 0); + + req.fh = ptr_value(filehandle); + req.offset = off; + req.len = len; + CEPH_BUFF_ADD(req, data, len); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_LL_WRITE, req, ans); +} + 
+__public int ceph_mount(struct ceph_mount_info *cmount, const char *root) +{ + CEPH_REQ(ceph_mount, req, 1, ans, 0); + + CEPH_STR_ADD(req, root, root); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_MOUNT, req, ans); +} + +__public struct dirent *ceph_readdir(struct ceph_mount_info *cmount, + struct ceph_dir_result *dirp) +{ + static struct dirent de; + int32_t err; + + CEPH_REQ(ceph_readdir, req, 0, ans, 1); + + req.dir = ptr_value(dirp); + + CEPH_BUFF_ADD(ans, &de, sizeof(de)); + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_READDIR, req, ans); + if (err < 0) { + errno = -err; + return NULL; + } + if (ans.eod) { + return NULL; + } + + return &de; +} + +__public int ceph_release(struct ceph_mount_info *cmount) +{ + CEPH_REQ(ceph_release, req, 0, ans, 0); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_RELEASE, req, ans); +} + +__public int ceph_select_filesystem(struct ceph_mount_info *cmount, + const char *fs_name) +{ + CEPH_REQ(ceph_select_filesystem, req, 1, ans, 0); + + CEPH_STR_ADD(req, fs, fs_name); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_SELECT_FILESYSTEM, req, ans); +} + +__public int ceph_unmount(struct ceph_mount_info *cmount) +{ + CEPH_REQ(ceph_unmount, req, 0, ans, 0); + + return CEPH_PROCESS(cmount, LIBCEPHFSD_OP_UNMOUNT, req, ans); +} + +__public void ceph_userperm_destroy(UserPerm *perms) +{ + CEPH_REQ(ceph_userperm_destroy, req, 0, ans, 0); + + req.userperm = ptr_value(perms); + + CEPH_RUN(&global_cmount, LIBCEPHFSD_OP_USERPERM_DESTROY, req, ans); +} + +__public UserPerm *ceph_userperm_new(uid_t uid, gid_t gid, int ngids, + gid_t *gidlist) +{ + CEPH_REQ(ceph_userperm_new, req, 1, ans, 0); + int32_t err; + + req.uid = uid; + req.gid = gid; + req.groups = ngids; + CEPH_BUFF_ADD(req, gidlist, sizeof(gid_t) * ngids); + + err = proxy_global_connect(); + if (err >= 0) { + err = CEPH_RUN(&global_cmount, LIBCEPHFSD_OP_USERPERM_NEW, req, + ans); + } + if (err >= 0) { + return value_ptr(ans.userperm); + } + + errno = -err; + + return NULL; +} + +__public 
const char *ceph_version(int *major, int *minor, int *patch) +{ + static char cached_version[128]; + static int32_t cached_major = -1, cached_minor, cached_patch; + + if (cached_major < 0) { + CEPH_REQ(ceph_version, req, 0, ans, 1); + int32_t err; + + CEPH_BUFF_ADD(ans, cached_version, sizeof(cached_version)); + + err = proxy_global_connect(); + if (err >= 0) { + err = CEPH_RUN(&global_cmount, LIBCEPHFSD_OP_VERSION, + req, ans); + } + + if (err < 0) { + *major = 0; + *minor = 0; + *patch = 0; + + return "Unknown"; + } + + cached_major = ans.major; + cached_minor = ans.minor; + cached_patch = ans.patch; + } + + *major = cached_major; + *minor = cached_minor; + *patch = cached_patch; + + return cached_version; +} + +__public UserPerm *ceph_mount_perms(struct ceph_mount_info *cmount) +{ + CEPH_REQ(ceph_mount_perms, req, 0, ans, 0); + int32_t err; + + err = CEPH_PROCESS(cmount, LIBCEPHFSD_OP_MOUNT_PERMS, req, ans); + if (err < 0) { + errno = -err; + return NULL; + } + + return value_ptr(ans.userperm); +} diff --git a/src/libcephfs_proxy/libcephfsd.c b/src/libcephfs_proxy/libcephfsd.c new file mode 100644 index 00000000000..ee2d99a0aae --- /dev/null +++ b/src/libcephfs_proxy/libcephfsd.c @@ -0,0 +1,1823 @@ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <getopt.h> +#include <endian.h> + +#include "include/cephfs/libcephfs.h" + +#include "proxy_manager.h" +#include "proxy_link.h" +#include "proxy_helpers.h" +#include "proxy_log.h" +#include "proxy_requests.h" +#include "proxy_mount.h" + +typedef struct _proxy_server { + proxy_link_t link; + proxy_manager_t *manager; +} proxy_server_t; + +typedef struct _proxy_client { + proxy_worker_t worker; + proxy_link_t *link; + proxy_random_t random; + void *buffer; + uint32_t buffer_size; + int32_t sd; +} proxy_client_t; + +typedef struct _proxy { + proxy_manager_t manager; + proxy_log_handler_t log_handler; + const char *socket_path; +} proxy_t; + +typedef int32_t (*proxy_handler_t)(proxy_client_t *, 
proxy_req_t *, + const void *data, int32_t data_size); + +/* This is used for requests that are not associated with a cmount. */ +static proxy_random_t global_random; + +static int32_t send_error(proxy_client_t *client, int32_t error) +{ + proxy_link_ans_t ans; + struct iovec iov[1]; + + iov[0].iov_base = &ans; + iov[0].iov_len = sizeof(ans); + + return proxy_link_ans_send(client->sd, error, iov, 1); +} + +static uint64_t uint64_checksum(uint64_t value) +{ + value = (value & 0xff00ff00ff00ffULL) + + ((value >> 8) & 0xff00ff00ff00ffULL); + value += value >> 16; + value += value >> 32; + + return value & 0xff; +} + +static uint64_t ptr_checksum(proxy_random_t *rnd, void *ptr) +{ + uint64_t value; + + if (ptr == NULL) { + return 0; + } + + value = (uint64_t)(uintptr_t)ptr; + /* Many current processors don't use the full 64-bits for the virtual + * address space, and Linux assigns the lower 128 TiB (47 bits) for + * user-space applications on most architectures, so the highest 8 bits + * of all valid addressess are always 0. + * + * We use this to encode a checksum in the high byte of the address to + * be able to do a verification before dereferencing the pointer, + * avoiding crashes if the client passes an invalid or corrupted pointer + * value. + * + * Alternatives like using indexes in a table or registering valid + * pointers require access to a shared data structure that will require + * thread synchronization, making it slower. 
*/ + if ((value & 0xff00000000000007ULL) != 0) { + proxy_log(LOG_ERR, EINVAL, + "Unexpected pointer value"); + abort(); + } + + value -= uint64_checksum(value) << 56; + + return random_scramble(rnd, value); +} + +static int32_t ptr_check(proxy_random_t *rnd, uint64_t value, void **pptr) +{ + if (value == 0) { + *pptr = NULL; + return 0; + } + + value = random_unscramble(rnd, value); + + if ((uint64_checksum(value) != 0) || ((value & 7) != 0)) { + proxy_log(LOG_ERR, EFAULT, "Unexpected pointer value"); + return -EFAULT; + } + + *pptr = (void *)(uintptr_t)(value & 0xffffffffffffffULL); + + return 0; +} + +/* Macro to simplify request handling. */ +#define CEPH_COMPLETE(_client, _err, _ans) \ + ({ \ + int32_t __err = (_err); \ + if (__err < 0) { \ + __err = send_error(_client, __err); \ + } else { \ + __err = CEPH_RET(_client->sd, __err, _ans); \ + } \ + __err; \ + }) + +#ifdef PROXY_TRACE +#define TRACE(_fmt, _args...) printf(_fmt "\n", ##_args) +#else +#define TRACE(_fmt, _args...) do { } while (0) +#endif + +static int32_t libcephfsd_version(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_version, ans, 1); + const char *text; + int32_t major, minor, patch; + + text = ceph_version(&major, &minor, &patch); + TRACE("ceph_version(%d, %d, %d) -> %s", major, minor, patch, text); + + ans.major = major; + ans.minor = minor; + ans.patch = patch; + + CEPH_STR_ADD(ans, text, text); + + return CEPH_RET(client->sd, 0, ans); +} + +static int32_t libcephfsd_userperm_new(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_userperm_new, ans, 0); + UserPerm *userperm; + int32_t err; + + userperm = ceph_userperm_new(req->userperm_new.uid, + req->userperm_new.gid, + req->userperm_new.groups, (gid_t *)data); + TRACE("ceph_userperm_new(%u, %u, %u) -> %p", req->userperm_new.uid, + req->userperm_new.gid, req->userperm_new.groups, userperm); + + err = -ENOMEM; + if (userperm != NULL) { + 
ans.userperm = ptr_checksum(&global_random, userperm); + err = 0; + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_userperm_destroy(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_userperm_destroy, ans, 0); + UserPerm *perms; + int32_t err; + + err = ptr_check(&global_random, req->userperm_destroy.userperm, + (void **)&perms); + + if (err >= 0) { + ceph_userperm_destroy(perms); + TRACE("ceph_userperm_destroy(%p)", perms); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_create(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_create, ans, 0); + proxy_mount_t *mount; + const char *id; + int32_t err; + + id = CEPH_STR_GET(req->create, id, data); + + err = proxy_mount_create(&mount, id); + TRACE("ceph_create(%p, '%s') -> %d", mount, id, err); + + if (err >= 0) { + ans.cmount = ptr_checksum(&client->random, mount); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_release(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_release, ans, 0); + proxy_mount_t *mount; + int32_t err; + + err = ptr_check(&client->random, req->release.cmount, (void **)&mount); + if (err >= 0) { + err = proxy_mount_release(mount); + TRACE("ceph_release(%p) -> %d", mount, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_conf_read_file(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_conf_read_file, ans, 0); + proxy_mount_t *mount; + const char *path; + int32_t err; + + err = ptr_check(&client->random, req->conf_read_file.cmount, + (void **)&mount); + if (err >= 0) { + path = CEPH_STR_GET(req->conf_read_file, path, data); + + err = proxy_mount_config(mount, path); + TRACE("ceph_conf_read_file(%p, '%s') ->%d", mount, path, err); + } + + return CEPH_COMPLETE(client, 
err, ans); +} + +static int32_t libcephfsd_conf_get(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_conf_get, ans, 1); + proxy_mount_t *mount; + const char *option; + void *buffer; + uint32_t size; + int32_t err; + + buffer = client->buffer; + size = client->buffer_size; + if (req->conf_get.size < size) { + size = req->conf_get.size; + } + err = ptr_check(&client->random, req->conf_get.cmount, (void **)&mount); + if (err >= 0) { + option = CEPH_STR_GET(req->conf_get, option, data); + + err = proxy_mount_get(mount, option, buffer, size); + TRACE("ceph_conf_get(%p, '%s', '%s') -> %d", mount, option, + (char *)buffer, err); + + if (err >= 0) { + CEPH_DATA_ADD(ans, value, buffer, strlen(buffer) + 1); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_conf_set(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_conf_set, ans, 0); + proxy_mount_t *mount; + const char *option, *value; + int32_t err; + + err = ptr_check(&client->random, req->conf_set.cmount, (void **)&mount); + if (err >= 0) { + option = CEPH_STR_GET(req->conf_set, option, data); + value = CEPH_STR_GET(req->conf_set, value, + data + req->conf_set.option); + + err = proxy_mount_set(mount, option, value); + TRACE("ceph_conf_set(%p, '%s', '%s') -> %d", mount, option, + value, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_init(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_init, ans, 0); + proxy_mount_t *mount; + int32_t err; + + err = ptr_check(&client->random, req->init.cmount, (void **)&mount); + if (err >= 0) { + err = proxy_mount_init(mount); + TRACE("ceph_init(%p) -> %d", mount, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_select_filesystem(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + 
CEPH_DATA(ceph_select_filesystem, ans, 0); + proxy_mount_t *mount; + const char *fs; + int32_t err; + + err = ptr_check(&client->random, req->select_filesystem.cmount, + (void **)&mount); + if (err >= 0) { + fs = CEPH_STR_GET(req->select_filesystem, fs, data); + + err = proxy_mount_select(mount, fs); + TRACE("ceph_select_filesystem(%p, '%s') -> %d", mount, fs, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_mount(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_mount, ans, 0); + proxy_mount_t *mount; + const char *root; + int32_t err; + + err = ptr_check(&client->random, req->mount.cmount, (void **)&mount); + if (err >= 0) { + root = CEPH_STR_GET(req->mount, root, data); + + err = proxy_mount_mount(mount, root); + TRACE("ceph_mount(%p, '%s') -> %d", mount, root, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_unmount(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_unmount, ans, 0); + proxy_mount_t *mount; + int32_t err; + + err = ptr_check(&client->random, req->unmount.cmount, (void **)&mount); + + if (err >= 0) { + err = proxy_mount_unmount(mount); + TRACE("ceph_unmount(%p) -> %d", mount, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_statfs(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_statfs, ans, 1); + struct statvfs st; + proxy_mount_t *mount; + struct Inode *inode; + int32_t err; + + err = ptr_check(&client->random, req->ll_statfs.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_statfs.inode, + (void **)&inode); + } + + if (err >= 0) { + CEPH_BUFF_ADD(ans, &st, sizeof(st)); + + err = ceph_ll_statfs(proxy_cmount(mount), inode, &st); + TRACE("ceph_ll_statfs(%p, %p) -> %d", mount, inode, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + 
+static int32_t libcephfsd_ll_lookup(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_lookup, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *parent, *out; + const char *name; + UserPerm *perms; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_lookup.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_lookup.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_lookup.userperm, + (void **)&perms); + } + if (err >= 0) { + want = req->ll_lookup.want; + flags = req->ll_lookup.flags; + name = CEPH_STR_GET(req->ll_lookup, name, data); + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + if (name == NULL) { + err = proxy_log(LOG_ERR, EINVAL, + "NULL name passed to ceph_ll_lookup()"); + } else { + // Forbid going outside of the root mount point + if ((parent == mount->root) && + (strcmp(name, "..") == 0)) { + name = "."; + } + + err = ceph_ll_lookup(proxy_cmount(mount), parent, name, + &out, &stx, want, flags, perms); + } + + TRACE("ceph_ll_lookup(%p, %p, '%s', %p, %x, %x, %p) -> %d", + mount, parent, name, out, want, flags, perms, err); + + if (err >= 0) { + ans.inode = ptr_checksum(&client->random, out); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_lookup_inode(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_ll_lookup_inode, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + struct inodeno_t ino; + int32_t err; + + err = ptr_check(&client->random, req->ll_lookup_inode.cmount, + (void **)&mount); + if (err >= 0) { + ino = req->ll_lookup_inode.ino; + + err = ceph_ll_lookup_inode(proxy_cmount(mount), ino, &inode); + TRACE("ceph_ll_lookup_inode(%p, %lu, %p) -> %d", mount, ino.val, + inode, err); + + if (err >= 0) { + ans.inode = ptr_checksum(&client->random, inode); + } + } + + return 
CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_lookup_root(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_ll_lookup_root, ans, 0); + proxy_mount_t *mount; + int32_t err; + + err = ptr_check(&client->random, req->ll_lookup_root.cmount, + (void **)&mount); + if (err >= 0) { + /* The libcephfs view of the root of the mount could be + * different than ours, so we can't rely on + * ceph_ll_lookup_root(). We fake it by returning the cached + * root inode at the time of mount. */ + err = proxy_inode_ref(mount, mount->root_ino); + TRACE("ceph_ll_lookup_root(%p, %p) -> %d", mount, mount->root, + err); + + if (err >= 0) { + ans.inode = ptr_checksum(&client->random, mount->root); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_put(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_put, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + int32_t err; + + err = ptr_check(&client->random, req->ll_put.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_put.inode, + (void **)&inode); + } + + if (err >= 0) { + err = ceph_ll_put(proxy_cmount(mount), inode); + TRACE("ceph_ll_put(%p, %p) -> %d", mount, inode, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_walk(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_walk, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *inode; + const char *path; + UserPerm *perms; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_walk.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&global_random, req->ll_walk.userperm, + (void **)&perms); + } + if (err >= 0) { + want = req->ll_walk.want; + flags = req->ll_walk.flags; + path = CEPH_STR_GET(req->ll_walk, path, data); + + 
CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = proxy_path_resolve(mount, path, &inode, &stx, want, flags, + perms, NULL); + TRACE("ceph_ll_walk(%p, '%s', %p, %x, %x, %p) -> %d", mount, + path, inode, want, flags, perms, err); + + if (err >= 0) { + ans.inode = ptr_checksum(&client->random, inode); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_chdir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_chdir, ans, 0); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *inode; + const char *path; + char *realpath; + int32_t err; + + err = ptr_check(&client->random, req->chdir.cmount, (void **)&mount); + if (err >= 0) { + path = CEPH_STR_GET(req->chdir, path, data); + + /* Since the libcephfs mount may be shared, we can't really + * change the current directory to avoid interferences with + * other users, so we just lookup the new directory and keep an + * internal reference. */ + err = proxy_path_resolve(mount, path, &inode, &stx, + CEPH_STATX_INO, 0, mount->perms, + &realpath); + TRACE("ceph_chdir(%p, '%s') -> %d", mount, path, err); + if (err >= 0) { + ceph_ll_put(proxy_cmount(mount), mount->cwd); + mount->cwd = inode; + mount->cwd_ino = stx.stx_ino; + + /* TODO: This path may become outdated if the parent + * directories are moved, however this seems the + * best we can do for now. */ + proxy_free(mount->cwd_path); + mount->cwd_path = realpath; + mount->cwd_path_len = strlen(realpath); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_getcwd(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_getcwd, ans, 1); + proxy_mount_t *mount; + const char *path; + int32_t err; + + err = ptr_check(&client->random, req->getcwd.cmount, (void **)&mount); + + if (err >= 0) { + /* We just return the cached name from the last chdir(). 
*/ + path = mount->cwd_path; + TRACE("ceph_getcwd(%p) -> '%s'", mount, path); + CEPH_STR_ADD(ans, path, path); + err = 0; + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_readdir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_readdir, ans, 1); + struct dirent de; + proxy_mount_t *mount; + struct ceph_dir_result *dirp; + int32_t err; + + err = ptr_check(&client->random, req->readdir.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->readdir.dir, + (void **)&dirp); + } + + if (err >= 0) { + err = ceph_readdir_r(proxy_cmount(mount), dirp, &de); + TRACE("ceph_readdir_r(%p, %p, %p) -> %d", mount, dirp, &de, + err); + ans.eod = true; + if (err > 0) { + ans.eod = false; + CEPH_BUFF_ADD(ans, &de, + offset_of(struct dirent, d_name) + + strlen(de.d_name) + 1); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_rewinddir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_rewinddir, ans, 0); + proxy_mount_t *mount; + struct ceph_dir_result *dirp; + int32_t err; + + err = ptr_check(&client->random, req->rewinddir.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->rewinddir.dir, + (void **)&dirp); + } + + if (err >= 0) { + ceph_rewinddir(proxy_cmount(mount), dirp); + TRACE("ceph_rewinddir(%p, %p)", mount, dirp); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_open(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_open, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + UserPerm *perms; + struct Fh *fh; + int32_t flags, err; + + err = ptr_check(&client->random, req->ll_open.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_open.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, 
req->ll_open.userperm, + (void **)&perms); + } + if (err >= 0) { + flags = req->ll_open.flags; + + err = ceph_ll_open(proxy_cmount(mount), inode, flags, &fh, + perms); + TRACE("ceph_ll_open(%p, %p, %x, %p, %p) -> %d", mount, inode, + flags, fh, perms, err); + + if (err >= 0) { + ans.fh = ptr_checksum(&client->random, fh); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_create(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_create, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *parent, *inode; + struct Fh *fh; + const char *name; + UserPerm *perms; + mode_t mode; + uint32_t want, flags; + int32_t oflags, err; + + err = ptr_check(&client->random, req->ll_create.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_create.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_create.userperm, + (void **)&perms); + } + if (err >= 0) { + mode = req->ll_create.mode; + oflags = req->ll_create.oflags; + want = req->ll_create.want; + flags = req->ll_create.flags; + name = CEPH_STR_GET(req->ll_create, name, data); + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = ceph_ll_create(proxy_cmount(mount), parent, name, mode, + oflags, &inode, &fh, &stx, want, flags, + perms); + TRACE("ceph_ll_create(%p, %p, '%s', %o, %x, %p, %p, %x, %x, " + "%p) -> %d", + mount, parent, name, mode, oflags, inode, fh, want, flags, + perms, err); + + if (err >= 0) { + ans.fh = ptr_checksum(&client->random, fh); + ans.inode = ptr_checksum(&client->random, inode); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_mknod(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_mknod, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *parent, *inode; + const char *name; + UserPerm *perms; + dev_t rdev; + 
mode_t mode; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_mknod.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_mknod.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_mknod.userperm, + (void **)&perms); + } + if (err >= 0) { + mode = req->ll_mknod.mode; + rdev = req->ll_mknod.rdev; + want = req->ll_mknod.want; + flags = req->ll_mknod.flags; + name = CEPH_STR_GET(req->ll_mknod, name, data); + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = ceph_ll_mknod(proxy_cmount(mount), parent, name, mode, + rdev, &inode, &stx, want, flags, perms); + TRACE("ceph_ll_mknod(%p, %p, '%s', %o, %lx, %p, %x, %x, %p) -> " + "%d", + mount, parent, name, mode, rdev, inode, want, flags, + perms, err); + + if (err >= 0) { + ans.inode = ptr_checksum(&client->random, inode); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_close(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_close, ans, 0); + proxy_mount_t *mount; + struct Fh *fh; + int32_t err; + + err = ptr_check(&client->random, req->ll_close.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_close.fh, + (void **)&fh); + } + + if (err >= 0) { + err = ceph_ll_close(proxy_cmount(mount), fh); + TRACE("ceph_ll_close(%p, %p) -> %d", mount, fh, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_rename(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_rename, ans, 0); + proxy_mount_t *mount; + struct Inode *old_parent, *new_parent; + const char *old_name, *new_name; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_rename.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_rename.old_parent, + (void **)&old_parent); + } + if (err >= 0) { + 
err = ptr_check(&client->random, req->ll_rename.new_parent, + (void **)&new_parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_rename.userperm, + (void **)&perms); + } + if (err >= 0) { + old_name = CEPH_STR_GET(req->ll_rename, old_name, data); + new_name = CEPH_STR_GET(req->ll_rename, new_name, + data + req->ll_rename.old_name); + + err = ceph_ll_rename(proxy_cmount(mount), old_parent, old_name, + new_parent, new_name, perms); + TRACE("ceph_ll_rename(%p, %p, '%s', %p, '%s', %p) -> %d", mount, + old_parent, old_name, new_parent, new_name, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_lseek(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_lseek, ans, 0); + proxy_mount_t *mount; + struct Fh *fh; + off_t offset, pos; + int32_t whence, err; + + err = ptr_check(&client->random, req->ll_lseek.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_lseek.fh, + (void **)&fh); + } + if (err >= 0) { + offset = req->ll_lseek.offset; + whence = req->ll_lseek.whence; + + pos = ceph_ll_lseek(proxy_cmount(mount), fh, offset, whence); + err = -errno; + TRACE("ceph_ll_lseek(%p, %p, %ld, %d) -> %ld (%d)", mount, fh, + offset, whence, pos, -err); + + if (pos >= 0) { + ans.offset = pos; + err = 0; + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_read(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_read, ans, 1); + proxy_mount_t *mount; + struct Fh *fh; + void *buffer; + uint64_t len; + int64_t offset; + uint32_t size; + int32_t err; + + buffer = client->buffer; + + err = ptr_check(&client->random, req->ll_read.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_read.fh, (void **)&fh); + } + if (err >= 0) { + offset = req->ll_read.offset; + len = req->ll_read.len; + + size = client->buffer_size; + if 
(len > size) { + buffer = proxy_malloc(len); + if (buffer == NULL) { + err = -ENOMEM; + } + } + if (err >= 0) { + err = ceph_ll_read(proxy_cmount(mount), fh, offset, len, + buffer); + TRACE("ceph_ll_read(%p, %p, %ld, %lu) -> %d", mount, fh, + offset, len, err); + + if (err >= 0) { + CEPH_BUFF_ADD(ans, buffer, err); + } + } + } + + err = CEPH_COMPLETE(client, err, ans); + + if (buffer != client->buffer) { + proxy_free(buffer); + } + + return err; +} + +static int32_t libcephfsd_ll_write(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_write, ans, 0); + proxy_mount_t *mount; + struct Fh *fh; + uint64_t len; + int64_t offset; + int32_t err; + + err = ptr_check(&client->random, req->ll_write.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_write.fh, + (void **)&fh); + } + if (err >= 0) { + offset = req->ll_write.offset; + len = req->ll_write.len; + + err = ceph_ll_write(proxy_cmount(mount), fh, offset, len, data); + TRACE("ceph_ll_write(%p, %p, %ld, %lu) -> %d", mount, fh, + offset, len, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_link(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_link, ans, 0); + proxy_mount_t *mount; + struct Inode *parent, *inode; + const char *name; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_link.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_link.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&client->random, req->ll_link.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_link.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_link, name, data); + + err = ceph_ll_link(proxy_cmount(mount), inode, parent, name, + perms); + TRACE("ceph_ll_link(%p, %p, %p, '%s', %p) -> %d", mount, inode, + 
parent, name, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_unlink(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_unlink, ans, 0); + proxy_mount_t *mount; + struct Inode *parent; + const char *name; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_unlink.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_unlink.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_unlink.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_unlink, name, data); + + err = ceph_ll_unlink(proxy_cmount(mount), parent, name, perms); + TRACE("ceph_ll_unlink(%p, %p, '%s', %p) -> %d", mount, parent, + name, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_getattr(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_getattr, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *inode; + UserPerm *perms; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_getattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_getattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_getattr.userperm, + (void **)&perms); + } + if (err >= 0) { + want = req->ll_getattr.want; + flags = req->ll_getattr.flags; + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = ceph_ll_getattr(proxy_cmount(mount), inode, &stx, want, + flags, perms); + TRACE("ceph_ll_getattr(%p, %p, %x, %x, %p) -> %d", mount, inode, + want, flags, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_setattr(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_setattr, ans, 0); + 
proxy_mount_t *mount; + struct Inode *inode; + UserPerm *perms; + int32_t mask, err; + + err = ptr_check(&client->random, req->ll_setattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_setattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_setattr.userperm, + (void **)&perms); + } + if (err >= 0) { + mask = req->ll_setattr.mask; + + err = ceph_ll_setattr(proxy_cmount(mount), inode, (void *)data, + mask, perms); + TRACE("ceph_ll_setattr(%p, %p, %x, %p) -> %d", mount, inode, + mask, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_fallocate(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_fallocate, ans, 0); + proxy_mount_t *mount; + struct Fh *fh; + int64_t offset, len; + mode_t mode; + int32_t err; + + err = ptr_check(&client->random, req->ll_fallocate.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_fallocate.fh, + (void **)&fh); + } + if (err >= 0) { + mode = req->ll_fallocate.mode; + offset = req->ll_fallocate.offset; + len = req->ll_fallocate.length; + + err = ceph_ll_fallocate(proxy_cmount(mount), fh, mode, offset, + len); + TRACE("ceph_ll_fallocate(%p, %p, %o, %ld, %lu) -> %d", mount, + fh, mode, offset, len, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_fsync(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_fsync, ans, 0); + proxy_mount_t *mount; + struct Fh *fh; + int32_t dataonly, err; + + err = ptr_check(&client->random, req->ll_fsync.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_fsync.fh, + (void **)&fh); + } + if (err >= 0) { + dataonly = req->ll_fsync.dataonly; + + err = ceph_ll_fsync(proxy_cmount(mount), fh, dataonly); + TRACE("ceph_ll_fsync(%p, %p, %d) -> %d", mount, fh, dataonly, + err); + 
} + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_listxattr(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_listxattr, ans, 1); + proxy_mount_t *mount; + struct Inode *inode; + UserPerm *perms; + size_t size; + int32_t err; + + err = ptr_check(&client->random, req->ll_listxattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_listxattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_listxattr.userperm, + (void **)&perms); + } + if (err >= 0) { + size = req->ll_listxattr.size; + if (size > client->buffer_size) { + size = client->buffer_size; + } + err = ceph_ll_listxattr(proxy_cmount(mount), inode, + client->buffer, size, &size, perms); + TRACE("ceph_ll_listxattr(%p, %p, %lu, %p) -> %d", mount, inode, + size, perms, err); + + if (err >= 0) { + ans.size = size; + CEPH_BUFF_ADD(ans, client->buffer, size); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_getxattr(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_getxattr, ans, 1); + proxy_mount_t *mount; + struct Inode *inode; + const char *name; + UserPerm *perms; + size_t size; + int32_t err; + + err = ptr_check(&client->random, req->ll_getxattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_getxattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_getxattr.userperm, + (void **)&perms); + } + if (err >= 0) { + size = req->ll_getxattr.size; + name = CEPH_STR_GET(req->ll_getxattr, name, data); + + if (size > client->buffer_size) { + size = client->buffer_size; + } + err = ceph_ll_getxattr(proxy_cmount(mount), inode, name, + client->buffer, size, perms); + TRACE("ceph_ll_getxattr(%p, %p, '%s', %p) -> %d", mount, inode, + name, perms, err); + + if (err >= 0) { + CEPH_BUFF_ADD(ans, 
client->buffer, err); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_setxattr(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_setxattr, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + const char *name, *value; + UserPerm *perms; + size_t size; + int32_t flags, err; + + err = ptr_check(&client->random, req->ll_setxattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_setxattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_setxattr.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_setxattr, name, data); + value = data + req->ll_setxattr.name; + size = req->ll_setxattr.size; + flags = req->ll_setxattr.flags; + + err = ceph_ll_setxattr(proxy_cmount(mount), inode, name, value, + size, flags, perms); + TRACE("ceph_ll_setxattr(%p, %p, '%s', %p, %x, %p) -> %d", mount, + inode, name, value, flags, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_removexattr(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_ll_removexattr, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + const char *name; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_removexattr.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_removexattr.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_removexattr.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_removexattr, name, data); + + err = ceph_ll_removexattr(proxy_cmount(mount), inode, name, + perms); + TRACE("ceph_ll_removexattr(%p, %p, '%s', %p) -> %d", mount, + inode, name, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_readlink(proxy_client_t 
*client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_readlink, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + UserPerm *perms; + size_t size; + int32_t err; + + err = ptr_check(&client->random, req->ll_readlink.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_readlink.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_readlink.userperm, + (void **)&perms); + } + if (err >= 0) { + size = req->ll_readlink.size; + + if (size > client->buffer_size) { + size = client->buffer_size; + } + err = ceph_ll_readlink(proxy_cmount(mount), inode, + client->buffer, size, perms); + TRACE("ceph_ll_readlink(%p, %p, %p) -> %d", mount, inode, perms, + err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_symlink(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_symlink, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *parent, *inode; + UserPerm *perms; + const char *name, *value; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_symlink.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_symlink.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_symlink.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_symlink, name, data); + value = CEPH_STR_GET(req->ll_symlink, target, + data + req->ll_symlink.name); + want = req->ll_symlink.want; + flags = req->ll_symlink.flags; + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = ceph_ll_symlink(proxy_cmount(mount), parent, name, value, + &inode, &stx, want, flags, perms); + TRACE("ceph_ll_symlink(%p, %p, '%s', '%s', %p, %x, %x, %p) -> " + "%d", + mount, parent, name, value, inode, want, flags, perms, + err); + + if (err >= 0) { + ans.inode = 
ptr_checksum(&client->random, inode); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_opendir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_opendir, ans, 0); + proxy_mount_t *mount; + struct Inode *inode; + struct ceph_dir_result *dirp; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_opendir.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_opendir.inode, + (void **)&inode); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_opendir.userperm, + (void **)&perms); + } + + if (err >= 0) { + err = ceph_ll_opendir(proxy_cmount(mount), inode, &dirp, perms); + TRACE("ceph_ll_opendir(%p, %p, %p, %p) -> %d", mount, inode, + dirp, perms, err); + + if (err >= 0) { + ans.dir = ptr_checksum(&client->random, dirp); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_mkdir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_mkdir, ans, 1); + struct ceph_statx stx; + proxy_mount_t *mount; + struct Inode *parent, *inode; + const char *name; + UserPerm *perms; + mode_t mode; + uint32_t want, flags; + int32_t err; + + err = ptr_check(&client->random, req->ll_mkdir.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_mkdir.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_mkdir.userperm, + (void **)&perms); + } + if (err >= 0) { + mode = req->ll_mkdir.mode; + want = req->ll_mkdir.want; + flags = req->ll_mkdir.flags; + name = CEPH_STR_GET(req->ll_mkdir, name, data); + + CEPH_BUFF_ADD(ans, &stx, sizeof(stx)); + + err = ceph_ll_mkdir(proxy_cmount(mount), parent, name, mode, + &inode, &stx, want, flags, perms); + TRACE("ceph_ll_mkdir(%p, %p, '%s', %o, %p, %x, %x, %p) -> %d", + mount, parent, name, mode, inode, want, flags, perms, + err); + + if (err 
>= 0) { + ans.inode = ptr_checksum(&client->random, inode); + } + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_rmdir(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_ll_rmdir, ans, 0); + proxy_mount_t *mount; + struct Inode *parent; + const char *name; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->ll_rmdir.cmount, (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_rmdir.parent, + (void **)&parent); + } + if (err >= 0) { + err = ptr_check(&global_random, req->ll_rmdir.userperm, + (void **)&perms); + } + if (err >= 0) { + name = CEPH_STR_GET(req->ll_rmdir, name, data); + + err = ceph_ll_rmdir(proxy_cmount(mount), parent, name, perms); + TRACE("ceph_ll_rmdir(%p, %p, '%s', %p) -> %d", mount, parent, + name, perms, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_ll_releasedir(proxy_client_t *client, + proxy_req_t *req, const void *data, + int32_t data_size) +{ + CEPH_DATA(ceph_ll_releasedir, ans, 0); + proxy_mount_t *mount; + struct ceph_dir_result *dirp; + int32_t err; + + err = ptr_check(&client->random, req->ll_releasedir.cmount, + (void **)&mount); + if (err >= 0) { + err = ptr_check(&client->random, req->ll_releasedir.dir, + (void **)&dirp); + } + + if (err >= 0) { + err = ceph_ll_releasedir(proxy_cmount(mount), dirp); + TRACE("ceph_ll_releasedir(%p, %p) -> %d", mount, dirp, err); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static int32_t libcephfsd_mount_perms(proxy_client_t *client, proxy_req_t *req, + const void *data, int32_t data_size) +{ + CEPH_DATA(ceph_mount_perms, ans, 0); + proxy_mount_t *mount; + UserPerm *perms; + int32_t err; + + err = ptr_check(&client->random, req->mount_perms.cmount, + (void **)&mount); + if (err >= 0) { + perms = ceph_mount_perms(proxy_cmount(mount)); + TRACE("ceph_mount_perms(%p) -> %p", mount, perms); + + ans.userperm = 
ptr_checksum(&global_random, perms); + } + + return CEPH_COMPLETE(client, err, ans); +} + +static proxy_handler_t libcephfsd_handlers[LIBCEPHFSD_OP_TOTAL_OPS] = { + [LIBCEPHFSD_OP_VERSION] = libcephfsd_version, + [LIBCEPHFSD_OP_USERPERM_NEW] = libcephfsd_userperm_new, + [LIBCEPHFSD_OP_USERPERM_DESTROY] = libcephfsd_userperm_destroy, + [LIBCEPHFSD_OP_CREATE] = libcephfsd_create, + [LIBCEPHFSD_OP_RELEASE] = libcephfsd_release, + [LIBCEPHFSD_OP_CONF_READ_FILE] = libcephfsd_conf_read_file, + [LIBCEPHFSD_OP_CONF_GET] = libcephfsd_conf_get, + [LIBCEPHFSD_OP_CONF_SET] = libcephfsd_conf_set, + [LIBCEPHFSD_OP_INIT] = libcephfsd_init, + [LIBCEPHFSD_OP_SELECT_FILESYSTEM] = libcephfsd_select_filesystem, + [LIBCEPHFSD_OP_MOUNT] = libcephfsd_mount, + [LIBCEPHFSD_OP_UNMOUNT] = libcephfsd_unmount, + [LIBCEPHFSD_OP_LL_STATFS] = libcephfsd_ll_statfs, + [LIBCEPHFSD_OP_LL_LOOKUP] = libcephfsd_ll_lookup, + [LIBCEPHFSD_OP_LL_LOOKUP_INODE] = libcephfsd_ll_lookup_inode, + [LIBCEPHFSD_OP_LL_LOOKUP_ROOT] = libcephfsd_ll_lookup_root, + [LIBCEPHFSD_OP_LL_PUT] = libcephfsd_ll_put, + [LIBCEPHFSD_OP_LL_WALK] = libcephfsd_ll_walk, + [LIBCEPHFSD_OP_CHDIR] = libcephfsd_chdir, + [LIBCEPHFSD_OP_GETCWD] = libcephfsd_getcwd, + [LIBCEPHFSD_OP_READDIR] = libcephfsd_readdir, + [LIBCEPHFSD_OP_REWINDDIR] = libcephfsd_rewinddir, + [LIBCEPHFSD_OP_LL_OPEN] = libcephfsd_ll_open, + [LIBCEPHFSD_OP_LL_CREATE] = libcephfsd_ll_create, + [LIBCEPHFSD_OP_LL_MKNOD] = libcephfsd_ll_mknod, + [LIBCEPHFSD_OP_LL_CLOSE] = libcephfsd_ll_close, + [LIBCEPHFSD_OP_LL_RENAME] = libcephfsd_ll_rename, + [LIBCEPHFSD_OP_LL_LSEEK] = libcephfsd_ll_lseek, + [LIBCEPHFSD_OP_LL_READ] = libcephfsd_ll_read, + [LIBCEPHFSD_OP_LL_WRITE] = libcephfsd_ll_write, + [LIBCEPHFSD_OP_LL_LINK] = libcephfsd_ll_link, + [LIBCEPHFSD_OP_LL_UNLINK] = libcephfsd_ll_unlink, + [LIBCEPHFSD_OP_LL_GETATTR] = libcephfsd_ll_getattr, + [LIBCEPHFSD_OP_LL_SETATTR] = libcephfsd_ll_setattr, + [LIBCEPHFSD_OP_LL_FALLOCATE] = libcephfsd_ll_fallocate, + [LIBCEPHFSD_OP_LL_FSYNC] 
= libcephfsd_ll_fsync, + [LIBCEPHFSD_OP_LL_LISTXATTR] = libcephfsd_ll_listxattr, + [LIBCEPHFSD_OP_LL_GETXATTR] = libcephfsd_ll_getxattr, + [LIBCEPHFSD_OP_LL_SETXATTR] = libcephfsd_ll_setxattr, + [LIBCEPHFSD_OP_LL_REMOVEXATTR] = libcephfsd_ll_removexattr, + [LIBCEPHFSD_OP_LL_READLINK] = libcephfsd_ll_readlink, + [LIBCEPHFSD_OP_LL_SYMLINK] = libcephfsd_ll_symlink, + [LIBCEPHFSD_OP_LL_OPENDIR] = libcephfsd_ll_opendir, + [LIBCEPHFSD_OP_LL_MKDIR] = libcephfsd_ll_mkdir, + [LIBCEPHFSD_OP_LL_RMDIR] = libcephfsd_ll_rmdir, + [LIBCEPHFSD_OP_LL_RELEASEDIR] = libcephfsd_ll_releasedir, + [LIBCEPHFSD_OP_MOUNT_PERMS] = libcephfsd_mount_perms, +}; + +static void serve_binary(proxy_client_t *client) +{ + proxy_req_t req; + CEPH_DATA(hello, ans, 0); + struct iovec req_iov[2]; + void *buffer; + uint32_t size; + int32_t err; + + /* This buffer will be used by most of the requests. For requests that + * require more space (probably just some writes), a new temporary + * buffer will be allocated by proxy_link_req_recv() code. */ + size = 65536; + buffer = proxy_malloc(size); + if (buffer == NULL) { + return; + } + + ans.major = LIBCEPHFSD_MAJOR; + ans.minor = LIBCEPHFSD_MINOR; + err = proxy_link_send(client->sd, ans_iov, ans_count); + if (err < 0) { + proxy_free(buffer); + return; + } + + while (true) { + req_iov[0].iov_base = &req; + req_iov[0].iov_len = sizeof(req); + req_iov[1].iov_base = buffer; + req_iov[1].iov_len = size; + + err = proxy_link_req_recv(client->sd, req_iov, 2); + if (err > 0) { + if (req.header.op >= LIBCEPHFSD_OP_TOTAL_OPS) { + err = send_error(client, -ENOSYS); + } else if (libcephfsd_handlers[req.header.op] == NULL) { + err = send_error(client, -EOPNOTSUPP); + } else { + err = libcephfsd_handlers[req.header.op]( + client, &req, req_iov[1].iov_base, + req.header.data_len); + } + } + + if (req_iov[1].iov_base != buffer) { + /* Free the buffer if it was temporarily allocated. 
*/ + proxy_free(req_iov[1].iov_base); + } + + if (err < 0) { + break; + } + } + + proxy_free(buffer); +} + +static void serve_connection(proxy_worker_t *worker) +{ + CEPH_DATA(hello, req, 0); + proxy_client_t *client; + int32_t err; + + client = container_of(worker, proxy_client_t, worker); + + err = proxy_link_recv(client->sd, req_iov, req_count); + if (err >= 0) { + if (req.id == LIBCEPHFS_LIB_CLIENT) { + serve_binary(client); + } else { + proxy_log(LOG_ERR, EINVAL, + "Invalid client initial message"); + } + } + + close(client->sd); +} + +static void destroy_connection(proxy_worker_t *worker) +{ + proxy_client_t *client; + + client = container_of(worker, proxy_client_t, worker); + + proxy_free(client->buffer); + proxy_free(client); +} + +static int32_t accept_connection(proxy_link_t *link, int32_t sd) +{ + proxy_server_t *server; + proxy_client_t *client; + int32_t err; + + server = container_of(link, proxy_server_t, link); + + client = proxy_malloc(sizeof(proxy_client_t)); + if (client == NULL) { + err = -ENOMEM; + goto failed_close; + } + + client->buffer_size = 65536; + client->buffer = proxy_malloc(client->buffer_size); + if (client->buffer == NULL) { + err = -ENOMEM; + goto failed_client; + } + + random_init(&client->random); + client->sd = sd; + client->link = link; + + /* TODO: Make request management asynchronous and avoid creating a + * thread for each connection. 
*/ + err = proxy_manager_launch(server->manager, &client->worker, + serve_connection, destroy_connection); + if (err < 0) { + goto failed_buffer; + } + + return 0; + +failed_buffer: + proxy_free(client->buffer); + +failed_client: + proxy_free(client); + +failed_close: + close(sd); + + return err; +} + +static bool check_stop(proxy_link_t *link) +{ + proxy_server_t *server; + + server = container_of(link, proxy_server_t, link); + + return proxy_manager_stop(server->manager); +} + +static int32_t server_start(proxy_manager_t *manager) +{ + proxy_server_t server; + proxy_t *proxy; + + proxy = container_of(manager, proxy_t, manager); + + server.manager = manager; + + return proxy_link_server(&server.link, proxy->socket_path, + accept_connection, check_stop); +} + +static void log_print(proxy_log_handler_t *handler, int32_t level, int32_t err, + const char *msg) +{ + printf("[%d] %s\n", level, msg); +} + +static struct option main_opts[] = { + {"socket", required_argument, NULL, 's'}, + {} +}; + +int32_t main(int32_t argc, char *argv[]) +{ + struct timespec now; + proxy_t proxy; + char *env; + int32_t err, val; + + clock_gettime(CLOCK_MONOTONIC, &now); + srand(now.tv_nsec); + + random_init(&global_random); + + proxy_log_register(&proxy.log_handler, log_print); + + proxy.socket_path = PROXY_SOCKET; + + env = getenv(PROXY_SOCKET_ENV); + if (env != NULL) { + proxy.socket_path = env; + } + + while ((val = getopt_long(argc, argv, ":s:", main_opts, NULL)) >= 0) { + if (val == 's') { + proxy.socket_path = optarg; + } else if (val == ':') { + proxy_log(LOG_ERR, ENODATA, + "Argument missing for '%s'\n", optopt); + return 1; + } else if (val == '?') { + proxy_log(LOG_ERR, EINVAL, + "Unknown option '%s'\n", optopt); + return 1; + } else { + proxy_log(LOG_ERR, EINVAL, + "Unexpected error parsing the options\n"); + return 1; + } + } + if (optind < argc) { + proxy_log(LOG_ERR, EINVAL, + "Unexpected arguments in command line"); + return 1; + } + + err = 
proxy_manager_run(&proxy.manager, server_start); + + proxy_log_deregister(&proxy.log_handler); + + return err < 0 ? 1 : 0; +} diff --git a/src/libcephfs_proxy/proxy.h b/src/libcephfs_proxy/proxy.h new file mode 100644 index 00000000000..cfb69072f19 --- /dev/null +++ b/src/libcephfs_proxy/proxy.h @@ -0,0 +1,67 @@ + +#ifndef __LIBCEPHFSD_PROXY_H__ +#define __LIBCEPHFSD_PROXY_H__ + +#include <string.h> +#include <errno.h> +#include <stdarg.h> +#include <stdint.h> +#include <stdbool.h> + +#define LIBCEPHFSD_MAJOR 0 +#define LIBCEPHFSD_MINOR 2 + +#define LIBCEPHFS_LIB_CLIENT 0xe3e5f0e8 // 'ceph' xor 0x80808080 + +#define PROXY_SOCKET "/run/libcephfsd.sock" +#define PROXY_SOCKET_ENV "LIBCEPHFSD_SOCKET" + +#define offset_of(_type, _field) ((uintptr_t) & ((_type *)0)->_field) + +#define container_of(_ptr, _type, _field) \ + ((_type *)((uintptr_t)(_ptr) - offset_of(_type, _field))) + +struct _list; +typedef struct _list list_t; + +struct _proxy_buffer_ops; +typedef struct _proxy_buffer_ops proxy_buffer_ops_t; + +struct _proxy_buffer; +typedef struct _proxy_buffer proxy_buffer_t; + +struct _proxy_output; +typedef struct _proxy_output proxy_output_t; + +struct _proxy_log_handler; +typedef struct _proxy_log_handler proxy_log_handler_t; + +struct _proxy_worker; +typedef struct _proxy_worker proxy_worker_t; + +struct _proxy_manager; +typedef struct _proxy_manager proxy_manager_t; + +struct _proxy_link; +typedef struct _proxy_link proxy_link_t; + +typedef int32_t (*proxy_output_write_t)(proxy_output_t *); +typedef int32_t (*proxy_output_full_t)(proxy_output_t *); + +typedef void (*proxy_log_callback_t)(proxy_log_handler_t *, int32_t, int32_t, + const char *); + +typedef void (*proxy_worker_start_t)(proxy_worker_t *); +typedef void (*proxy_worker_destroy_t)(proxy_worker_t *); + +typedef int32_t (*proxy_manager_start_t)(proxy_manager_t *); + +typedef int32_t (*proxy_link_start_t)(proxy_link_t *, int32_t); +typedef bool (*proxy_link_stop_t)(proxy_link_t *); + +struct _list { + 
list_t *next; + list_t *prev; +}; + +#endif diff --git a/src/libcephfs_proxy/proxy_helpers.c b/src/libcephfs_proxy/proxy_helpers.c new file mode 100644 index 00000000000..149d84d34bb --- /dev/null +++ b/src/libcephfs_proxy/proxy_helpers.c @@ -0,0 +1,81 @@ + +#include "proxy_helpers.h" + +#include <openssl/evp.h> + +static const char hex_digits[] = "0123456789abcdef"; + +int32_t proxy_hash(uint8_t *hash, size_t size, + int32_t (*feed)(void **, void *, int32_t), void *data) +{ + EVP_MD_CTX *ctx; + void *ptr; + uint32_t bytes; + int32_t i, err, len; + + if (size < 32) { + return proxy_log(LOG_ERR, ENOBUFS, + "Digest buffer is too small"); + } + + ctx = EVP_MD_CTX_new(); + if (ctx == NULL) { + return proxy_log(LOG_ERR, ENOMEM, "EVP_MD_CTX_new() failed"); + } + + if (!EVP_DigestInit_ex2(ctx, EVP_sha256(), NULL)) { + err = proxy_log(LOG_ERR, ENOMEM, "EVP_DigestInit_ex2() failed"); + goto done; + } + + i = 0; + while ((len = feed(&ptr, data, i)) > 0) { + if (!EVP_DigestUpdate(ctx, ptr, len)) { + err = proxy_log(LOG_ERR, ENOMEM, + "EVP_DigestUpdate() failed"); + goto done; + } + i++; + } + if (len < 0) { + err = len; + goto done; + } + + if (!EVP_DigestFinal_ex(ctx, hash, &bytes)) { + err = proxy_log(LOG_ERR, ENOMEM, "EVP_DigestFinal_ex() failed"); + goto done; + } + + err = 0; + +done: + EVP_MD_CTX_free(ctx); + + return err; +} + +int32_t proxy_hash_hex(char *digest, size_t size, + int32_t (*feed)(void **, void *, int32_t), void *data) +{ + uint8_t hash[32]; + int32_t i, err; + + if (size < 65) { + return proxy_log(LOG_ERR, ENOBUFS, + "Digest buffer is too small"); + } + + err = proxy_hash(hash, sizeof(hash), feed, data); + if (err < 0) { + return err; + } + + for (i = 0; i < 32; i++) { + *digest++ = hex_digits[hash[i] >> 4]; + *digest++ = hex_digits[hash[i] & 15]; + } + *digest = 0; + + return 0; +} diff --git a/src/libcephfs_proxy/proxy_helpers.h b/src/libcephfs_proxy/proxy_helpers.h new file mode 100644 index 00000000000..b4f58e7e3b3 --- /dev/null +++ 
b/src/libcephfs_proxy/proxy_helpers.h @@ -0,0 +1,311 @@ + +#ifndef __LIBCEPHFS_PROXY_HELPERS_H__ +#define __LIBCEPHFS_PROXY_HELPERS_H__ + +#include <stdlib.h> +#include <signal.h> +#include <pthread.h> +#include <stdint.h> +#include <stdbool.h> + +#include "proxy_log.h" + +#define __public __attribute__((__visibility__("default"))) + +#define ptr_value(_ptr) ((uint64_t)(uintptr_t)(_ptr)) +#define value_ptr(_val) ((void *)(uintptr_t)(_val)) + +typedef struct _proxy_random { + uint64_t mask; + uint64_t factor; + uint64_t factor_inv; + uint64_t shift; +} proxy_random_t; + +/* Generate a 64-bits random number different than 0. */ +static inline uint64_t random_u64(void) +{ + uint64_t value; + int32_t i; + + do { + value = 0; + for (i = 0; i < 4; i++) { + value <<= 16; + value ^= (random() >> 8) & 0xffff; + } + } while (value == 0); + + return value; +} + +/* Randomly initialize the data used to scramble pointers. */ +static inline void random_init(proxy_random_t *rnd) +{ + uint64_t inv; + + rnd->mask = random_u64(); + + /* Generate an odd multiplicative factor different than 1. */ + do { + rnd->factor = random_u64() | 1; + } while (rnd->factor == 1); + + /* Compute the inverse of 'factor' modulo 2^64. */ + inv = rnd->factor & 0x3; + inv *= 0x000000012 - rnd->factor * inv; + inv *= 0x000000102 - rnd->factor * inv; + inv *= 0x000010002 - rnd->factor * inv; + inv *= 0x100000002 - rnd->factor * inv; + rnd->factor_inv = inv * (2 - rnd->factor * inv); + + rnd->shift = random_u64(); +} + +/* Obfuscate a pointer. */ +static inline uint64_t random_scramble(proxy_random_t *rnd, uint64_t value) +{ + uint32_t bits; + + bits = __builtin_popcountll(value); + + /* rnd->shift is rotated by the amount of bits set to 1 in the original + * value, and the lowest 6 bits are extracted. This generates a + * pseudo-random number that depends on the number of bits of the + * value. 
*/ + bits = ((rnd->shift >> bits) | (rnd->shift << (64 - bits))) & 0x3f; + + /* The value is rotated by the amount just computed. */ + value = (value << bits) | (value >> (64 - bits)); + + /* The final result is masked with a random number. */ + value ^= rnd->mask; + + /* And multiplied by a random factor modulo 2^64. */ + return value * rnd->factor; +} + +/* Recover a pointer. */ +static inline uint64_t random_unscramble(proxy_random_t *rnd, uint64_t value) +{ + uint32_t bits; + + /* Divide by the random factor (i.e. multiply by the inverse of the + * factor). */ + value *= rnd->factor_inv; + + /* Remove the mask. */ + value ^= rnd->mask; + + /* Get the number of bits the pointer was rotated. */ + bits = __builtin_popcountll(value); + bits = ((rnd->shift >> bits) | (rnd->shift << (64 - bits))) & 0x3f; + + /* Undo the rotation to recover the original value. */ + return (value >> bits) | (value << (64 - bits)); +} + +static inline void *proxy_malloc(size_t size) +{ + void *ptr; + + ptr = malloc(size); + if (ptr == NULL) { + proxy_log(LOG_ERR, errno, "Failed to allocate memory"); + } + + return ptr; +} + +static inline int32_t proxy_realloc(void **pptr, size_t size) +{ + void *ptr; + + ptr = realloc(*pptr, size); + if (ptr == NULL) { + return proxy_log(LOG_ERR, errno, "Failed to reallocate memory"); + } + + *pptr = ptr; + + return 0; +} + +static inline void proxy_free(void *ptr) +{ + free(ptr); +} + +static inline char *proxy_strdup(const char *str) +{ + char *ptr; + + ptr = strdup(str); + if (ptr == NULL) { + proxy_log(LOG_ERR, errno, "Failed to copy a string"); + return NULL; + } + + return ptr; +} + +static inline int32_t proxy_mutex_init(pthread_mutex_t *mutex) +{ + int32_t err; + + err = pthread_mutex_init(mutex, NULL); + if (err != 0) { + return proxy_log(LOG_ERR, err, "Failed to initialize a mutex"); + } + + return 0; +} + +static inline void proxy_mutex_lock(pthread_mutex_t *mutex) +{ + int32_t err; + + err = pthread_mutex_lock(mutex); + if (err != 0) { + 
proxy_abort(err, "Mutex cannot be acquired"); + } +} + +static inline void proxy_mutex_unlock(pthread_mutex_t *mutex) +{ + int32_t err; + + err = pthread_mutex_unlock(mutex); + if (err != 0) { + proxy_abort(err, "Mutex cannot be released"); + } +} + +static inline int32_t proxy_rwmutex_init(pthread_rwlock_t *mutex) +{ + int32_t err; + + err = pthread_rwlock_init(mutex, NULL); + if (err != 0) { + return proxy_log(LOG_ERR, err, + "Failed to initialize a rwmutex"); + } + + return 0; +} + +static inline void proxy_rwmutex_rdlock(pthread_rwlock_t *mutex) +{ + int32_t err; + + err = pthread_rwlock_rdlock(mutex); + if (err != 0) { + proxy_abort(err, "RWMutex cannot be acquired for read"); + } +} + +static inline void proxy_rwmutex_wrlock(pthread_rwlock_t *mutex) +{ + int32_t err; + + err = pthread_rwlock_wrlock(mutex); + if (err != 0) { + proxy_abort(err, "RWMutex cannot be acquired for write"); + } +} + +static inline void proxy_rwmutex_unlock(pthread_rwlock_t *mutex) +{ + int32_t err; + + err = pthread_rwlock_unlock(mutex); + if (err != 0) { + proxy_abort(err, "RWMutex cannot be released"); + } +} + +static inline int32_t proxy_condition_init(pthread_cond_t *condition) +{ + int32_t err; + + err = pthread_cond_init(condition, NULL); + if (err != 0) { + return proxy_log(LOG_ERR, err, + "Failed to initialize a condition variable"); + } + + return 0; +} + +static inline void proxy_condition_signal(pthread_cond_t *condition) +{ + int32_t err; + + err = pthread_cond_signal(condition); + if (err != 0) { + proxy_abort(err, "Condition variable cannot be signaled"); + } +} + +static inline void proxy_condition_wait(pthread_cond_t *condition, + pthread_mutex_t *mutex) +{ + int32_t err; + + err = pthread_cond_wait(condition, mutex); + if (err != 0) { + proxy_abort(err, "Condition variable cannot be waited"); + } +} + +static inline int32_t proxy_thread_create(pthread_t *tid, + void *(*start)(void *), void *arg) +{ + int32_t err; + + err = pthread_create(tid, NULL, start, arg); + if 
(err != 0) { + proxy_log(LOG_ERR, err, "Failed to create a thread"); + } + + return err; +} + +static inline void proxy_thread_kill(pthread_t tid, int32_t signum) +{ + int32_t err; + + err = pthread_kill(tid, signum); + if (err != 0) { + proxy_abort(err, "Failed to send a signal to a thread"); + } +} + +static inline void proxy_thread_join(pthread_t tid) +{ + int32_t err; + + err = pthread_join(tid, NULL); + if (err != 0) { + proxy_log(LOG_ERR, err, "Unable to join a thread"); + } +} + +static inline int32_t proxy_signal_set(int32_t signum, struct sigaction *action, + struct sigaction *old) +{ + if (sigaction(signum, action, old) < 0) { + return proxy_log(LOG_ERR, errno, + "Failed to configure a signal"); + } + + return 0; +} + +int32_t proxy_hash(uint8_t *hash, size_t size, + int32_t (*feed)(void **, void *, int32_t), void *data); + +int32_t proxy_hash_hex(char *digest, size_t size, + int32_t (*feed)(void **, void *, int32_t), void *data); + +#endif diff --git a/src/libcephfs_proxy/proxy_link.c b/src/libcephfs_proxy/proxy_link.c new file mode 100644 index 00000000000..20d9086ffa9 --- /dev/null +++ b/src/libcephfs_proxy/proxy_link.c @@ -0,0 +1,421 @@ + +#include <stdio.h> +#include <unistd.h> +#include <sys/uio.h> + +#include "proxy_link.h" +#include "proxy_manager.h" +#include "proxy_helpers.h" +#include "proxy_log.h" + +static int32_t iov_length(struct iovec *iov, int32_t count) +{ + int32_t len; + + len = 0; + while (count > 0) { + len += iov->iov_len; + iov++; + count--; + } + + return len; +} + +static int32_t proxy_link_prepare(struct sockaddr_un *addr, const char *path) +{ + struct sigaction action; + int32_t sd, len, err; + + memset(&action, 0, sizeof(action)); + action.sa_handler = SIG_IGN; + err = proxy_signal_set(SIGPIPE, &action, NULL); + if (err < 0) { + return err; + } + + memset(addr, 0, sizeof(*addr)); + addr->sun_family = AF_UNIX; + len = snprintf(addr->sun_path, sizeof(addr->sun_path), "%s", path); + if (len < 0) { + return proxy_log(LOG_ERR, 
EINVAL, + "Failed to copy Unix socket path"); + } + if (len >= sizeof(addr->sun_path)) { + return proxy_log(LOG_ERR, ENAMETOOLONG, + "Unix socket path too long"); + } + + sd = socket(AF_UNIX, SOCK_STREAM, 0); + if (sd < 0) { + return proxy_log(LOG_ERR, errno, + "Failed to create a Unix socket"); + } + + return sd; +} + +int32_t proxy_link_client(proxy_link_t *link, const char *path, + proxy_link_stop_t stop) +{ + struct sockaddr_un addr; + int32_t sd, err; + + link->stop = stop; + link->sd = -1; + + sd = proxy_link_prepare(&addr, path); + if (sd < 0) { + return sd; + } + + err = 0; + while (err >= 0) { + if (connect(sd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + if (errno == EINTR) { + continue; + } + + err = proxy_log(LOG_ERR, errno, + "Failed to connect to libcephfsd"); + } else { + link->sd = sd; + return sd; + } + } + + close(sd); + + return err; +} + +void proxy_link_close(proxy_link_t *link) +{ + close(link->sd); + link->sd = -1; +} + +int32_t proxy_link_server(proxy_link_t *link, const char *path, + proxy_link_start_t start, proxy_link_stop_t stop) +{ + struct sockaddr_un addr; + socklen_t len; + int32_t cd, err; + + link->stop = stop; + link->sd = -1; + + err = proxy_link_prepare(&addr, path); + if (err < 0) { + return err; + } + link->sd = err; + + if ((unlink(path) < 0) && (errno != ENOENT) && (errno != ENOTDIR)) { + err = proxy_log(LOG_ERR, errno, + "Failed to remove existing socket"); + goto done; + } + + if (bind(link->sd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + err = proxy_log(LOG_ERR, errno, "Failed to bind Unix socket"); + goto done; + } + + if (listen(link->sd, SOMAXCONN) < 0) { + err = proxy_log(LOG_ERR, errno, + "Failed to listen from Unix socket"); + goto done; + } + + while (!stop(link)) { + len = sizeof(addr); + cd = accept(link->sd, (struct sockaddr *)&addr, &len); + if (cd < 0) { + if (errno != EINTR) { + proxy_log(LOG_ERR, errno, + "Failed to accept a connection"); + } + } else { + start(link, cd); + } + } + + err = 0; + 
+done: + close(link->sd); + + return err; +} + +int32_t proxy_link_read(proxy_link_t *link, int32_t sd, void *buffer, + int32_t size) +{ + ssize_t len; + + do { + len = read(sd, buffer, size); + if (len < 0) { + if (errno == EINTR) { + if (link->stop(link)) { + return -EINTR; + } + continue; + } + return proxy_log(LOG_ERR, errno, + "Failed to read from socket"); + } + } while (len < 0); + + return len; +} + +int32_t proxy_link_write(proxy_link_t *link, int32_t sd, void *buffer, + int32_t size) +{ + ssize_t len; + int32_t total; + + total = size; + while (total > 0) { + len = write(sd, buffer, total); + if (len < 0) { + if (errno == EINTR) { + if (link->stop(link)) { + return -EINTR; + } + continue; + } + return proxy_log(LOG_ERR, errno, + "Failed to write to socket"); + } + if (len == 0) { + return proxy_log(LOG_ERR, ENOBUFS, + "No data written to socket"); + } + + buffer += len; + total -= len; + } + + return size; +} + +int32_t proxy_link_send(int32_t sd, struct iovec *iov, int32_t count) +{ + struct iovec iov_copy[count]; + ssize_t len; + int32_t total; + + memcpy(iov_copy, iov, sizeof(struct iovec) * count); + iov = iov_copy; + + total = 0; + while (count > 0) { + len = writev(sd, iov, count); + if (len < 0) { + return proxy_log(LOG_ERR, errno, "Failed to send data"); + } + if (len == 0) { + return proxy_log(LOG_ERR, ENOBUFS, "Partial write"); + } + total += len; + + while ((count > 0) && (iov->iov_len <= len)) { + len -= iov->iov_len; + iov++; + count--; + } + + if (count > 0) { + iov->iov_base += len; + iov->iov_len -= len; + } + } + + return total; +} + +int32_t proxy_link_recv(int32_t sd, struct iovec *iov, int32_t count) +{ + struct iovec iov_copy[count]; + ssize_t len; + int32_t total; + + memcpy(iov_copy, iov, sizeof(struct iovec) * count); + iov = iov_copy; + + total = 0; + while (count > 0) { + len = readv(sd, iov, count); + if (len < 0) { + return proxy_log(LOG_ERR, errno, + "Failed to receive data"); + } + if (len == 0) { + return proxy_log(LOG_ERR, 
ENODATA, "Partial read"); + } + total += len; + + while ((count > 0) && (iov->iov_len <= len)) { + len -= iov->iov_len; + iov++; + count--; + } + + if (count > 0) { + iov->iov_base += len; + iov->iov_len -= len; + } + } + + return total; +} + +int32_t proxy_link_req_send(int32_t sd, int32_t op, struct iovec *iov, + int32_t count) +{ + proxy_link_req_t *req; + + req = iov[0].iov_base; + + req->header_len = iov[0].iov_len; + req->op = op; + req->data_len = iov_length(iov + 1, count - 1); + + return proxy_link_send(sd, iov, count); +} + +int32_t proxy_link_req_recv(int32_t sd, struct iovec *iov, int32_t count) +{ + proxy_link_req_t *req; + void *buffer; + int32_t err, len, total; + + len = iov->iov_len; + iov->iov_len = sizeof(proxy_link_req_t); + err = proxy_link_recv(sd, iov, 1); + if (err < 0) { + return err; + } + total = err; + + req = iov->iov_base; + + if (req->data_len > 0) { + if (count == 1) { + return proxy_log(LOG_ERR, ENOBUFS, + "Request data is too long"); + } + if (iov[1].iov_len < req->data_len) { + buffer = proxy_malloc(req->data_len); + if (buffer == NULL) { + return -ENOMEM; + } + iov[1].iov_base = buffer; + } + iov[1].iov_len = req->data_len; + } else { + count = 1; + } + + if (req->header_len > sizeof(proxy_link_req_t)) { + if (len < req->header_len) { + return proxy_log(LOG_ERR, ENOBUFS, + "Request is too long"); + } + iov->iov_base += sizeof(proxy_link_req_t); + iov->iov_len = req->header_len - sizeof(proxy_link_req_t); + } else { + iov++; + count--; + if (count == 0) { + return total; + } + } + + err = proxy_link_recv(sd, iov, count); + if (err < 0) { + return err; + } + + return total + err; +} + +int32_t proxy_link_ans_send(int32_t sd, int32_t result, struct iovec *iov, + int32_t count) +{ + proxy_link_ans_t *ans; + + ans = iov->iov_base; + + ans->header_len = iov->iov_len; + ans->flags = 0; + ans->result = result; + ans->data_len = iov_length(iov + 1, count - 1); + + return proxy_link_send(sd, iov, count); +} + +int32_t 
proxy_link_ans_recv(int32_t sd, struct iovec *iov, int32_t count) +{ + proxy_link_ans_t *ans; + int32_t err, len, total; + + len = iov->iov_len; + iov->iov_len = sizeof(proxy_link_ans_t); + err = proxy_link_recv(sd, iov, 1); + if (err < 0) { + return err; + } + total = err; + + ans = iov->iov_base; + + if (ans->data_len > 0) { + if ((count == 1) || (iov[1].iov_len < ans->data_len)) { + return proxy_log(LOG_ERR, ENOBUFS, + "Answer data is too long"); + } + iov[1].iov_len = ans->data_len; + } else { + count = 1; + } + + if (ans->header_len > sizeof(proxy_link_ans_t)) { + if (len < ans->header_len) { + return proxy_log(LOG_ERR, ENOBUFS, + "Answer is too long"); + } + iov->iov_base += sizeof(proxy_link_ans_t); + iov->iov_len = ans->header_len - sizeof(proxy_link_ans_t); + } else { + iov++; + count--; + if (count == 0) { + return total; + } + } + + err = proxy_link_recv(sd, iov, count); + if (err < 0) { + return err; + } + + return total + err; +} + +int32_t proxy_link_request(int32_t sd, int32_t op, struct iovec *req_iov, + int32_t req_count, struct iovec *ans_iov, + int32_t ans_count) +{ + int32_t err; + + err = proxy_link_req_send(sd, op, req_iov, req_count); + if (err < 0) { + return err; + } + + return proxy_link_ans_recv(sd, ans_iov, ans_count); +} diff --git a/src/libcephfs_proxy/proxy_link.h b/src/libcephfs_proxy/proxy_link.h new file mode 100644 index 00000000000..01a32d94377 --- /dev/null +++ b/src/libcephfs_proxy/proxy_link.h @@ -0,0 +1,67 @@ + +#ifndef __LIBCEPHFS_PROXY_LINK_H__ +#define __LIBCEPHFS_PROXY_LINK_H__ + +#include <sys/socket.h> +#include <sys/un.h> + +#include "proxy.h" + +#define PROXY_LINK_DISCONNECTED { NULL, -1 } + +struct _proxy_link { + proxy_link_stop_t stop; + int32_t sd; +}; + +typedef struct _proxy_link_req { + uint16_t header_len; + uint16_t op; + uint32_t data_len; +} proxy_link_req_t; + +typedef struct _proxy_link_ans { + uint16_t header_len; + uint16_t flags; + int32_t result; + uint32_t data_len; +} proxy_link_ans_t; + +static 
inline bool proxy_link_is_connected(proxy_link_t *link) +{ + return link->sd >= 0; +} + +int32_t proxy_link_client(proxy_link_t *link, const char *path, + proxy_link_stop_t stop); + +void proxy_link_close(proxy_link_t *link); + +int32_t proxy_link_server(proxy_link_t *link, const char *path, + proxy_link_start_t start, proxy_link_stop_t stop); + +int32_t proxy_link_read(proxy_link_t *link, int32_t sd, void *buffer, + int32_t size); + +int32_t proxy_link_write(proxy_link_t *link, int32_t sd, void *buffer, + int32_t size); + +int32_t proxy_link_send(int32_t sd, struct iovec *iov, int32_t count); + +int32_t proxy_link_recv(int32_t sd, struct iovec *iov, int32_t count); + +int32_t proxy_link_req_send(int32_t sd, int32_t op, struct iovec *iov, + int32_t count); + +int32_t proxy_link_req_recv(int32_t sd, struct iovec *iov, int32_t count); + +int32_t proxy_link_ans_send(int32_t sd, int32_t result, struct iovec *iov, + int32_t count); + +int32_t proxy_link_ans_recv(int32_t sd, struct iovec *iov, int32_t count); + +int32_t proxy_link_request(int32_t sd, int32_t op, struct iovec *req_iov, + int32_t req_count, struct iovec *ans_iov, + int32_t ans_count); + +#endif diff --git a/src/libcephfs_proxy/proxy_list.h b/src/libcephfs_proxy/proxy_list.h new file mode 100644 index 00000000000..3dcb2ff9791 --- /dev/null +++ b/src/libcephfs_proxy/proxy_list.h @@ -0,0 +1,121 @@ + +#ifndef __LIBCEPHFS_PROXY_LIST_H__ +#define __LIBCEPHFS_PROXY_LIST_H__ + +#include "proxy.h" + +#define LIST_INIT(_list) { _list, _list } + +#define list_entry(_ptr, _type, _field) container_of(_ptr, _type, _field) + +#define list_first_entry(_list, _type, _field) \ + list_entry((_list)->next, _type, _field) + +#define list_last_entry(_list, _type, _field) \ + list_entry((_list)->prev, _type, _field) + +#define list_next_entry(_ptr, _field) \ + list_first_entry(&_ptr->_field, __typeof(*_ptr), _field) + +#define list_for_each_entry(_ptr, _list, _field) \ + for (_ptr = list_first_entry(_list, __typeof(*_ptr), 
_field); \ + &_ptr->_field != _list; _ptr = list_next_entry(_ptr, _field)) + +static inline void list_init(list_t *list) +{ + list->next = list; + list->prev = list; +} + +static inline bool list_empty(list_t *list) +{ + return list->next == list; +} + +static inline void list_add_between(list_t *item, list_t *prev, list_t *next) +{ + item->next = next; + item->prev = prev; + prev->next = item; + next->prev = item; +} + +static inline void list_add(list_t *item, list_t *list) +{ + list_add_between(item, list, list->next); +} + +static inline void list_add_tail(list_t *item, list_t *list) +{ + list_add_between(item, list->prev, list); +} + +static inline void list_del(list_t *list) +{ + list->next->prev = list->prev; + list->prev->next = list->next; +} + +static inline void list_del_init(list_t *list) +{ + list_del(list); + list_init(list); +} + +static inline void list_move(list_t *item, list_t *list) +{ + list_del(item); + list_add(item, list); +} + +static inline void list_move_tail(list_t *item, list_t *list) +{ + list_del(item); + list_add_tail(item, list); +} + +static inline void list_splice_between(list_t *src, list_t *prev, list_t *next) +{ + list_t *first, *last; + + first = src->next; + last = src->prev; + + first->prev = prev; + prev->next = first; + + last->next = next; + next->prev = last; +} + +static inline void list_splice(list_t *src, list_t *dst) +{ + if (!list_empty(src)) { + list_splice_between(src, dst, dst->next); + } +} + +static inline void list_splice_tail(list_t *src, list_t *dst) +{ + if (!list_empty(src)) { + list_splice_between(src, dst->prev, dst); + } +} + +static inline void list_splice_init(list_t *src, list_t *dst) +{ + if (!list_empty(src)) { + list_splice_between(src, dst, dst->next); + list_init(src); + } +} + +static inline void list_splice_tail_init(list_t *src, list_t *dst) +{ + if (!list_empty(src)) { + list_splice_between(src, dst->prev, dst); + list_init(src); + } +} + +#endif diff --git a/src/libcephfs_proxy/proxy_log.c 
b/src/libcephfs_proxy/proxy_log.c new file mode 100644 index 00000000000..dc1afed63de --- /dev/null +++ b/src/libcephfs_proxy/proxy_log.c @@ -0,0 +1,110 @@ + +#include <stdio.h> +#include <stdarg.h> + +#include "proxy_log.h" +#include "proxy_helpers.h" +#include "proxy_list.h" + +#define PROXY_LOG_BUFFER_SIZE 4096 + +static __thread char proxy_log_buffer[PROXY_LOG_BUFFER_SIZE]; + +static pthread_rwlock_t proxy_log_mutex = PTHREAD_RWLOCK_INITIALIZER; +static list_t proxy_log_handlers = LIST_INIT(&proxy_log_handlers); + +static void proxy_log_write(int32_t level, int32_t err, const char *msg) +{ + proxy_log_handler_t *handler; + + proxy_rwmutex_rdlock(&proxy_log_mutex); + + list_for_each_entry(handler, &proxy_log_handlers, list) { + handler->callback(handler, level, err, msg); + } + + proxy_rwmutex_unlock(&proxy_log_mutex); +} + +__public void proxy_log_register(proxy_log_handler_t *handler, + proxy_log_callback_t callback) +{ + handler->callback = callback; + + proxy_rwmutex_wrlock(&proxy_log_mutex); + + list_add_tail(&handler->list, &proxy_log_handlers); + + proxy_rwmutex_unlock(&proxy_log_mutex); +} + +__public void proxy_log_deregister(proxy_log_handler_t *handler) +{ + proxy_rwmutex_wrlock(&proxy_log_mutex); + + list_del_init(&handler->list); + + proxy_rwmutex_unlock(&proxy_log_mutex); +} + +static void proxy_log_msg(char *buffer, const char *text) +{ + int32_t len; + + len = strlen(text) + 1; + + memcpy(buffer, text, len); +} + +int32_t proxy_log_args(int32_t level, int32_t err, const char *fmt, + va_list args) +{ + static __thread bool busy = false; + int32_t len; + + if (busy) { + return -err; + } + busy = true; + + len = vsnprintf(proxy_log_buffer, sizeof(proxy_log_buffer), fmt, args); + if (len < 0) { + proxy_log_msg(proxy_log_buffer, + "<log message formatting failed>"); + } else if (len >= sizeof(proxy_log_buffer)) { + proxy_log_msg(proxy_log_buffer + sizeof(proxy_log_buffer) - 6, + "[...]"); + } + + proxy_log_write(level, err, proxy_log_buffer); + + busy 
= false; + + return -err; +} + +int32_t proxy_log(int32_t level, int32_t err, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + err = proxy_log_args(level, err, fmt, args); + va_end(args); + + return err; +} + +void proxy_abort_args(int32_t err, const char *fmt, va_list args) +{ + proxy_log_args(LOG_CRIT, err, fmt, args); + abort(); +} + +void proxy_abort(int32_t err, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + proxy_abort_args(err, fmt, args); + va_end(args); +} diff --git a/src/libcephfs_proxy/proxy_log.h b/src/libcephfs_proxy/proxy_log.h new file mode 100644 index 00000000000..02f45f9b110 --- /dev/null +++ b/src/libcephfs_proxy/proxy_log.h @@ -0,0 +1,28 @@ + +#ifndef __LIBCEPHFSD_PROXY_LOG_H__ +#define __LIBCEPHFSD_PROXY_LOG_H__ + +#include "proxy.h" + +enum { LOG_CRIT, LOG_ERR, LOG_WARN, LOG_INFO, LOG_DBG }; + +struct _proxy_log_handler { + list_t list; + proxy_log_callback_t callback; +}; + +int32_t proxy_log_args(int32_t level, int32_t err, const char *fmt, + va_list args); + +int32_t proxy_log(int32_t level, int32_t err, const char *fmt, ...); + +void proxy_abort_args(int32_t err, const char *fmt, va_list args); + +void proxy_abort(int32_t err, const char *fmt, ...); + +void proxy_log_register(proxy_log_handler_t *handler, + proxy_log_callback_t callback); + +void proxy_log_deregister(proxy_log_handler_t *handler); + +#endif diff --git a/src/libcephfs_proxy/proxy_manager.c b/src/libcephfs_proxy/proxy_manager.c new file mode 100644 index 00000000000..ea57083e700 --- /dev/null +++ b/src/libcephfs_proxy/proxy_manager.c @@ -0,0 +1,247 @@ + +#include <signal.h> + +#include "proxy_manager.h" +#include "proxy_helpers.h" +#include "proxy_list.h" +#include "proxy_log.h" + +static void proxy_manager_signal_handler(int32_t signum, siginfo_t *info, + void *ctx) +{ +} + +static void proxy_worker_register(proxy_worker_t *worker) +{ + proxy_manager_t *manager; + + manager = worker->manager; + + proxy_mutex_lock(&manager->mutex); + 
+ list_add_tail(&worker->list, &manager->workers); + + proxy_mutex_unlock(&manager->mutex); +} + +static void proxy_worker_deregister(proxy_worker_t *worker) +{ + proxy_manager_t *manager; + + manager = worker->manager; + + proxy_mutex_lock(&manager->mutex); + + list_del_init(&worker->list); + if (list_empty(&manager->workers)) { + proxy_condition_signal(&manager->condition); + } + + proxy_mutex_unlock(&manager->mutex); +} + +static void proxy_worker_finished(proxy_worker_t *worker) +{ + proxy_manager_t *manager; + + manager = worker->manager; + + proxy_mutex_lock(&manager->mutex); + + if (list_empty(&manager->finished)) { + proxy_condition_signal(&manager->condition); + } + + list_move_tail(&worker->list, &manager->finished); + + proxy_mutex_unlock(&manager->mutex); +} + +static void *proxy_worker_start(void *arg) +{ + proxy_worker_t *worker; + + worker = arg; + + worker->start(worker); + + proxy_worker_finished(worker); + + return NULL; +} + +static void *proxy_manager_start(void *arg) +{ + proxy_manager_t *manager; + proxy_worker_t *worker; + + manager = arg; + + proxy_mutex_lock(&manager->mutex); + + while (true) { + while (!list_empty(&manager->finished)) { + worker = list_first_entry(&manager->finished, + proxy_worker_t, list); + list_del_init(&worker->list); + + proxy_mutex_unlock(&manager->mutex); + + proxy_thread_join(worker->tid); + + if (worker->destroy != NULL) { + worker->destroy(worker); + } + + proxy_mutex_lock(&manager->mutex); + } + + if (manager->stop && list_empty(&manager->workers)) { + break; + } + + proxy_condition_wait(&manager->condition, &manager->mutex); + } + + manager->done = true; + proxy_condition_signal(&manager->condition); + + proxy_mutex_unlock(&manager->mutex); + + return NULL; +} + +static int32_t proxy_manager_init(proxy_manager_t *manager) +{ + int32_t err; + + list_init(&manager->workers); + list_init(&manager->finished); + + manager->stop = false; + manager->done = false; + + manager->main_tid = pthread_self(); + + err = 
proxy_mutex_init(&manager->mutex); + if (err < 0) { + return err; + } + + err = proxy_condition_init(&manager->condition); + if (err < 0) { + pthread_mutex_destroy(&manager->mutex); + } + + return err; +} + +static void proxy_manager_destroy(proxy_manager_t *manager) +{ + pthread_cond_destroy(&manager->condition); + pthread_mutex_destroy(&manager->mutex); +} + +static int32_t proxy_manager_setup_signals(struct sigaction *old) +{ + struct sigaction action; + + /* The CONT signal will be used to wake threads blocked in I/O. */ + memset(&action, 0, sizeof(action)); + action.sa_flags = SA_SIGINFO; + action.sa_sigaction = proxy_manager_signal_handler; + + return proxy_signal_set(SIGCONT, &action, old); +} + +static void proxy_manager_restore_signals(struct sigaction *action) +{ + proxy_signal_set(SIGCONT, action, NULL); +} + +static void proxy_manager_terminate(proxy_manager_t *manager) +{ + proxy_worker_t *worker; + + proxy_mutex_lock(&manager->mutex); + + list_for_each_entry(worker, &manager->workers, list) { + worker->stop = true; + proxy_thread_kill(worker->tid, SIGCONT); + } + + while (!manager->done) { + proxy_condition_wait(&manager->condition, &manager->mutex); + } + + proxy_mutex_unlock(&manager->mutex); + + proxy_thread_join(manager->tid); +} + +int32_t proxy_manager_run(proxy_manager_t *manager, proxy_manager_start_t start) +{ + struct sigaction old_action; + int32_t err; + + err = proxy_manager_init(manager); + if (err < 0) { + return err; + } + + err = proxy_manager_setup_signals(&old_action); + if (err < 0) { + goto done_destroy; + } + + err = proxy_thread_create(&manager->tid, proxy_manager_start, manager); + if (err < 0) { + goto done_signal; + } + + err = start(manager); + + proxy_manager_terminate(manager); + +done_signal: + proxy_manager_restore_signals(&old_action); + +done_destroy: + proxy_manager_destroy(manager); + + return err; +} + +void proxy_manager_shutdown(proxy_manager_t *manager) +{ + proxy_mutex_lock(&manager->mutex); + + manager->stop = 
true; + proxy_condition_signal(&manager->condition); + + proxy_mutex_unlock(&manager->mutex); + + /* Wake the thread if it was blocked in an I/O operation. */ + proxy_thread_kill(manager->main_tid, SIGCONT); +} + +int32_t proxy_manager_launch(proxy_manager_t *manager, proxy_worker_t *worker, + proxy_worker_start_t start, + proxy_worker_destroy_t destroy) +{ + int32_t err; + + worker->manager = manager; + worker->start = start; + worker->destroy = destroy; + worker->stop = false; + + proxy_worker_register(worker); + + err = proxy_thread_create(&worker->tid, proxy_worker_start, worker); + if (err < 0) { + proxy_worker_deregister(worker); + } + + return err; +} diff --git a/src/libcephfs_proxy/proxy_manager.h b/src/libcephfs_proxy/proxy_manager.h new file mode 100644 index 00000000000..6a539be8d5b --- /dev/null +++ b/src/libcephfs_proxy/proxy_manager.h @@ -0,0 +1,43 @@ + +#ifndef __LIBCEPHFSD_PROXY_MANAGER_H__ +#define __LIBCEPHFSD_PROXY_MANAGER_H__ + +#include <pthread.h> + +#include "proxy.h" + +struct _proxy_worker { + list_t list; + pthread_t tid; + proxy_manager_t *manager; + proxy_worker_start_t start; + proxy_worker_destroy_t destroy; + bool stop; +}; + +struct _proxy_manager { + list_t workers; + list_t finished; + pthread_t main_tid; + pthread_t tid; + pthread_mutex_t mutex; + pthread_cond_t condition; + bool stop; + bool done; +}; + +int32_t proxy_manager_run(proxy_manager_t *manager, + proxy_manager_start_t start); + +void proxy_manager_shutdown(proxy_manager_t *manager); + +int32_t proxy_manager_launch(proxy_manager_t *manager, proxy_worker_t *worker, + proxy_worker_start_t start, + proxy_worker_destroy_t destroy); + +static inline bool proxy_manager_stop(proxy_manager_t *manager) +{ + return manager->stop; +} + +#endif diff --git a/src/libcephfs_proxy/proxy_mount.c b/src/libcephfs_proxy/proxy_mount.c new file mode 100644 index 00000000000..abfef1232c2 --- /dev/null +++ b/src/libcephfs_proxy/proxy_mount.c @@ -0,0 +1,1246 @@ + +#include "proxy_mount.h" 
+#include "proxy_helpers.h" + +#include <stdio.h> +#include <unistd.h> +#include <sys/stat.h> +#include <fcntl.h> + +/* Maximum number of symlinks to visit while resolving a path before returning + * ELOOP. */ +#define PROXY_MAX_SYMLINKS 16 + +struct _proxy_linked_str; +typedef struct _proxy_linked_str proxy_linked_str_t; + +/* This structure is used to handle symlinks found during the walk of a path. + * + * We'll start with an initial string representing a path. If one of the + * components is found to be a symlink, a new proxy_linked_str_t will be + * created with the content of the symlink. Then the new string will point + * to the old string, which may still contain some additional path components. + * The new string will be traversed resolving symlinks as they are found in the + * same way. Once it finished, the old string is recovered and traversal + * continues from the point it was left. */ +struct _proxy_linked_str { + proxy_linked_str_t *next; + char *remaining; + char data[]; +}; + +/* This structure is used to traverse a path while resolving any symlink + * found. At the end, it will contain the realpath of the entry and its + * inode. 
*/ +typedef struct _proxy_path_iterator { + struct ceph_statx stx; + struct ceph_mount_info *cmount; + proxy_linked_str_t *lstr; + UserPerm *perms; + struct Inode *root; + struct Inode *base; + char *realpath; + uint64_t root_ino; + uint64_t base_ino; + uint32_t realpath_size; + uint32_t realpath_len; + uint32_t symlinks; + bool release; + bool follow; +} proxy_path_iterator_t; + +typedef struct _proxy_config { + int32_t src; + int32_t dst; + int32_t size; + int32_t total; + void *buffer; +} proxy_config_t; + +typedef struct _proxy_change { + list_t list; + uint32_t size; + char data[]; +} proxy_change_t; + +typedef struct _proxy_iter { + proxy_instance_t *instance; + list_t *item; +} proxy_iter_t; + +typedef struct _proxy_instance_pool { + pthread_mutex_t mutex; + list_t hash[256]; +} proxy_mount_pool_t; + +static proxy_mount_pool_t instance_pool = { + .mutex = PTHREAD_MUTEX_INITIALIZER, +}; + +/* Ceph client instance sharing + * + * The main purpose of the libcephfs proxy is to avoid the multiple independent + * data caches that are created when libcephfs is used from different processes. + * However the cache is not created per process but per client instance, so each + * call to `ceph_create()` creates its own private data cache instance. Just + * forwarding the libcephfs API calls to a single proxy process is not enough to + * solve the problem. + * + * The proxy will try to reuse existing client instances to reduce the number of + * independent caches. However it's not always possible to map all proxy clients + * to a single libcephfs instance. When different settings are used, separate + * Ceph instances are required to avoid unwanted behaviors. + * + * Even though it's possible that some Ceph options may be compatible even if + * they have different values, the proxy won't try to handle these cases. It + * will consider the configuration as a black box, and only 100% equal + * configurations will share the Ceph client instance. 
+ */ + +/* Ceph configuration file management + * + * We won't try to parse Ceph configuration files. The proxy only wants to know + * if a configuration is equal or not. To do so, when a configuration file is + * passed to the proxy, it will create a private copy and compute an SHA256 + * hash. If the hash doesn't match, the configuration is considered different, + * even if it's not a real difference (like additional empty lines or the order + * of the options). + * + * The private copy is necessary to enforce that the settings are not changed + * concurrently, which could make us believe that two configurations are equal + * when they are not. + * + * Besides a configuration file, the user can also make manual configuration + * changes by using `ceph_conf_set()`. These changes are also tracked and + * compared to be sure that the active configuration matches. Only if the + * configuration file is exactly equal and all the applied changes are the same, + * and in the same order, the Ceph client instance will be shared. + */ + +int32_t proxy_inode_ref(proxy_mount_t *mount, uint64_t inode) +{ + inodeno_t ino; + struct Inode *tmp; + int32_t err; + + /* There's no way to tell libcephfs to increase the reference counter of + * an inode, so we do a full lookup for now. 
*/ + + ino.val = inode; + + err = ceph_ll_lookup_inode(proxy_cmount(mount), ino, &tmp); + if (err < 0) { + proxy_log(LOG_ERR, -err, "ceph_ll_loolkup_inode() failed"); + } + + return err; +} + +static proxy_linked_str_t *proxy_linked_str_create(const char *str, + proxy_linked_str_t *next) +{ + proxy_linked_str_t *lstr; + uint32_t len; + + len = strlen(str) + 1; + lstr = proxy_malloc(sizeof(proxy_linked_str_t) + len); + if (lstr != NULL) { + lstr->next = next; + if (len > 1) { + lstr->remaining = lstr->data; + memcpy(lstr->data, str, len); + } else { + lstr->remaining = NULL; + } + } + + return lstr; +} + +static proxy_linked_str_t *proxy_linked_str_next(proxy_linked_str_t *lstr) +{ + proxy_linked_str_t *next; + + next = lstr->next; + proxy_free(lstr); + + return next; +} + +static void proxy_linked_str_destroy(proxy_linked_str_t *lstr) +{ + while (lstr != NULL) { + lstr = proxy_linked_str_next(lstr); + } +} + +static bool proxy_linked_str_empty(proxy_linked_str_t *lstr) +{ + return lstr->remaining == NULL; +} + +static char *proxy_linked_str_scan(proxy_linked_str_t *lstr, char ch) +{ + char *current; + + current = lstr->remaining; + lstr->remaining = strchr(lstr->remaining, ch); + if (lstr->remaining != NULL) { + *lstr->remaining++ = 0; + } + + return current; +} + +static int32_t proxy_path_iterator_init(proxy_path_iterator_t *iter, + proxy_mount_t *mount, const char *path, + UserPerm *perms, bool realpath, + bool follow) +{ + uint32_t len; + char ch; + + if (path == NULL) { + return proxy_log(LOG_ERR, EINVAL, "NULL path received"); + } + + memset(&iter->stx, 0, sizeof(iter->stx)); + iter->cmount = proxy_cmount(mount); + iter->perms = perms; + iter->root = mount->root; + iter->root_ino = mount->root_ino; + iter->base = mount->cwd; + iter->base_ino = mount->cwd_ino; + iter->symlinks = 0; + iter->release = false; + iter->follow = follow; + + len = strlen(path) + 1; + + ch = *path; + if (ch == '/') { + iter->base = mount->root; + iter->base_ino = mount->root_ino; + 
path++; + } + + iter->realpath = NULL; + iter->realpath_len = 0; + iter->realpath_size = 0; + + if (realpath) { + if (ch != '/') { + len += mount->cwd_path_len; + } + len = (len + 63) & ~63; + iter->realpath_size = len; + + iter->realpath = proxy_malloc(len); + if (iter->realpath == NULL) { + return -ENOMEM; + } + if (ch != '/') { + memcpy(iter->realpath, mount->cwd_path, + mount->cwd_path_len + 1); + iter->realpath_len = mount->cwd_path_len; + } else { + iter->realpath[0] = '/'; + iter->realpath[1] = 0; + iter->realpath_len = 1; + } + } + + iter->lstr = proxy_linked_str_create(path, NULL); + if (iter->lstr == NULL) { + proxy_free(iter->realpath); + return -ENOMEM; + } + + return 0; +} + +static char *proxy_path_iterator_next(proxy_path_iterator_t *iter) +{ + while (proxy_linked_str_empty(iter->lstr)) { + iter->lstr = proxy_linked_str_next(iter->lstr); + if (iter->lstr == NULL) { + return NULL; + } + } + + return proxy_linked_str_scan(iter->lstr, '/'); +} + +static bool proxy_path_iterator_is_last(proxy_path_iterator_t *iter) +{ + proxy_linked_str_t *lstr; + + lstr = iter->lstr; + while (proxy_linked_str_empty(iter->lstr)) { + lstr = lstr->next; + if (lstr == NULL) { + return true; + } + } + + return false; +} + +static void proxy_path_iterator_destroy(proxy_path_iterator_t *iter) +{ + if (iter->release) { + ceph_ll_put(iter->cmount, iter->base); + } + + proxy_free(iter->realpath); + proxy_linked_str_destroy(iter->lstr); +} + +static int32_t proxy_path_iterator_resolve(proxy_path_iterator_t *iter) +{ + static __thread char path[PATH_MAX]; + proxy_linked_str_t *lstr; + char *ptr; + int32_t err; + + if (++iter->symlinks > PROXY_MAX_SYMLINKS) { + return proxy_log(LOG_ERR, ELOOP, "Too many symbolic links"); + } + + err = ceph_ll_readlink(iter->cmount, iter->base, path, sizeof(path), + iter->perms); + if (err < 0) { + return proxy_log(LOG_ERR, -err, "ceph_ll_readlink() failed"); + } + + ptr = path; + if (*ptr == '/') { + if (iter->release) { + ceph_ll_put(iter->cmount, 
iter->base); + } + iter->base = iter->root; + iter->base_ino = iter->root_ino; + iter->release = false; + if (iter->realpath != NULL) { + iter->realpath[1] = 0; + iter->realpath_len = 1; + } + + ptr++; + } + + lstr = proxy_linked_str_create(ptr, iter->lstr); + if (lstr == NULL) { + return -ENOMEM; + } + iter->lstr = lstr; + + return 0; +} + +static int32_t proxy_path_iterator_append(proxy_path_iterator_t *iter, + const char *name) +{ + uint32_t len, size; + int32_t err; + + len = strlen(name) + 1; + size = iter->realpath_size; + if (iter->realpath_len + len >= size) { + do { + size <<= 1; + } while (iter->realpath_len + len >= size); + err = proxy_realloc((void **)&iter->realpath, size); + if (err < 0) { + return err; + } + iter->realpath_size = size; + } + + if (iter->realpath_len > 1) { + iter->realpath[iter->realpath_len++] = '/'; + } + memcpy(iter->realpath + iter->realpath_len, name, len); + iter->realpath_len += len - 1; + + return 0; +} + +static void proxy_path_iterator_remove(proxy_path_iterator_t *iter) +{ + while ((iter->realpath_len > 0) && + (iter->realpath[--iter->realpath_len] != '/')) { + } +} + +static int32_t proxy_path_lookup(struct ceph_mount_info *cmount, + struct Inode *parent, const char *name, + struct Inode **inode, struct ceph_statx *stx, + uint32_t want, uint32_t flags, UserPerm *perms) +{ + int32_t err; + + err = ceph_ll_lookup(cmount, parent, name, inode, stx, want, flags, + perms); + if (err < 0) { + return proxy_log(LOG_ERR, -err, "ceph_ll_lookup() failed"); + } + + return err; +} + +static int32_t proxy_path_iterator_lookup(proxy_path_iterator_t *iter, + const char *name) +{ + struct Inode *inode; + int32_t err; + + if (S_ISLNK(iter->stx.stx_mode)) { + return proxy_path_iterator_resolve(iter); + } + + err = proxy_path_lookup(iter->cmount, iter->base, name, &inode, + &iter->stx, CEPH_STATX_INO | CEPH_STATX_MODE, + AT_SYMLINK_NOFOLLOW, iter->perms); + if (err < 0) { + return err; + } + + if (iter->realpath != NULL) { + if ((name[0] == 
'.') && (name[1] == '.') && (name[2] == 0)) { + proxy_path_iterator_remove(iter); + } else { + err = proxy_path_iterator_append(iter, name); + if (err < 0) { + ceph_ll_put(iter->cmount, inode); + return err; + } + } + } + + if (iter->release) { + ceph_ll_put(iter->cmount, iter->base); + } + iter->base = inode; + iter->base_ino = iter->stx.stx_ino; + iter->release = true; + + if (iter->follow && S_ISLNK(iter->stx.stx_mode) && + proxy_path_iterator_is_last(iter)) { + return proxy_path_iterator_resolve(iter); + } + + return 0; +} + +/* Implements a path walk ensuring that it's not possible to go higher than the + * root mount point used in ceph_mount(). This means that it handles absolute + * paths and ".." entries in a special way, including paths found in symbolic + * links. */ +int32_t proxy_path_resolve(proxy_mount_t *mount, const char *path, + struct Inode **inode, struct ceph_statx *stx, + uint32_t want, uint32_t flags, UserPerm *perms, + char **realpath) +{ + proxy_path_iterator_t iter; + char *name, c; + int32_t err; + + err = proxy_path_iterator_init(&iter, mount, path, perms, + realpath != NULL, + (flags & AT_SYMLINK_NOFOLLOW) == 0); + if (err < 0) { + return err; + } + + while ((err >= 0) && + ((name = proxy_path_iterator_next(&iter)) != NULL)) { + c = *name; + if (c == '.') { + c = name[1]; + if ((c == '.') && (iter.base == mount->root)) { + c = name[2]; + } + } + if (c == 0) { + continue; + } + + err = proxy_path_iterator_lookup(&iter, name); + } + + if (err >= 0) { + err = proxy_path_lookup(proxy_cmount(mount), iter.base, ".", + inode, stx, want, flags, iter.perms); + } + + if ((err >= 0) && (realpath != NULL)) { + *realpath = iter.realpath; + iter.realpath = NULL; + } + + proxy_path_iterator_destroy(&iter); + + return err; +} + +static int32_t proxy_config_source_prepare(const char *config, struct stat *st) +{ + int32_t fd, err; + + fd = open(config, O_RDONLY); + if (fd < 0) { + return proxy_log(LOG_ERR, errno, "open() failed"); + } + + if (fstat(fd, 
st) < 0) { + err = proxy_log(LOG_ERR, errno, "fstat() failed"); + goto failed; + } + + if (!S_ISREG(st->st_mode)) { + err = proxy_log(LOG_ERR, EINVAL, + "Configuration file is not a regular file"); + goto failed; + } + + return fd; + +failed: + close(fd); + + return err; +} + +static void proxy_config_source_close(int32_t fd) +{ + close(fd); +} + +static int32_t proxy_config_source_read(int32_t fd, void *buffer, size_t size) +{ + ssize_t len; + + len = read(fd, buffer, size); + if (len < 0) { + return proxy_log(LOG_ERR, errno, "read() failed"); + } + + return len; +} + +static int32_t proxy_config_source_validate(int32_t fd, struct stat *before, + int32_t size) +{ + struct stat after; + + if (fstat(fd, &after) < 0) { + return proxy_log(LOG_ERR, errno, "fstat() failed"); + } + + if ((before->st_size != size) || (before->st_size != after.st_size) || + (before->st_blocks != after.st_blocks) || + (before->st_ctim.tv_sec != after.st_ctim.tv_sec) || + (before->st_ctim.tv_nsec != after.st_ctim.tv_nsec) || + (before->st_mtim.tv_sec != after.st_mtim.tv_sec) || + (before->st_mtim.tv_nsec != after.st_mtim.tv_nsec)) { + proxy_log(LOG_WARN, 0, + "Configuration file has been modified while " + "reading it"); + + return 0; + } + + return 1; +} + +static int32_t proxy_config_destination_prepare(void) +{ + int32_t fd; + + fd = openat(AT_FDCWD, ".", O_TMPFILE | O_WRONLY, 0600); + if (fd < 0) { + return proxy_log(LOG_ERR, errno, "openat() failed"); + } + + return fd; +} + +static void proxy_config_destination_close(int32_t fd) +{ + close(fd); +} + +static int32_t proxy_config_destination_write(int32_t fd, void *data, + size_t size) +{ + ssize_t len; + + len = write(fd, data, size); + if (len < 0) { + return proxy_log(LOG_ERR, errno, "write() failed"); + } + if (len != size) { + return proxy_log(LOG_ERR, ENOSPC, "Partial write"); + } + + return size; +} + +static int32_t proxy_config_destination_commit(int32_t fd, const char *name) +{ + char path[32]; + + if (fsync(fd) < 0) { + return 
proxy_log(LOG_ERR, errno, "fsync() failed"); + } + + if (linkat(fd, "", AT_FDCWD, name, AT_EMPTY_PATH) < 0) { + if (errno == EEXIST) { + return 0; + } + + /* This may fail if the user doesn't have CAP_DAC_READ_SEARCH. + * In this case we attempt to link it using the /proc + * filesystem. */ + } + + snprintf(path, sizeof(path), "/proc/self/fd/%d", fd); + if (linkat(AT_FDCWD, path, AT_FDCWD, name, AT_SYMLINK_FOLLOW) < 0) { + if (errno != EEXIST) { + return proxy_log(LOG_ERR, errno, "linkat() failed"); + } + } + + return 0; +} + +static int32_t proxy_config_transfer(void **ptr, void *data, int32_t idx) +{ + proxy_config_t *cfg; + int32_t len, err; + + cfg = data; + + len = proxy_config_source_read(cfg->src, cfg->buffer, cfg->size); + if (len <= 0) { + return len; + } + + err = proxy_config_destination_write(cfg->dst, cfg->buffer, len); + if (err < 0) { + return err; + } + + cfg->total += len; + + *ptr = cfg->buffer; + + return len; +} + +/* Copies and checksums a given configuration to a file and makes sure that it + * has not been modified. 
*/ +static int32_t proxy_config_prepare(const char *config, char *path, + int32_t size) +{ + char hash[65]; + proxy_config_t cfg; + struct stat before; + int32_t err; + + cfg.size = 4096; + cfg.buffer = proxy_malloc(cfg.size); + if (cfg.buffer == NULL) { + return -ENOMEM; + } + cfg.total = 0; + + cfg.src = proxy_config_source_prepare(config, &before); + if (cfg.src < 0) { + err = cfg.src; + goto done_mem; + } + + cfg.dst = proxy_config_destination_prepare(); + if (cfg.dst < 0) { + err = cfg.dst; + goto done_src; + } + + err = proxy_hash_hex(hash, sizeof(hash), proxy_config_transfer, &cfg); + if (err < 0) { + goto done_dst; + } + + err = proxy_config_source_validate(cfg.src, &before, cfg.total); + if (err < 0) { + goto done_dst; + } + + err = snprintf(path, size, "ceph-%s.conf", hash); + if (err < 0) { + err = proxy_log(LOG_ERR, errno, "snprintf() failed"); + goto done_dst; + } + if (err >= size) { + err = proxy_log(LOG_ERR, ENOBUFS, + "Insufficient space to store the name"); + goto done_dst; + } + + err = proxy_config_destination_commit(cfg.dst, path); + +done_dst: + proxy_config_destination_close(cfg.dst); + +done_src: + proxy_config_source_close(cfg.src); + +done_mem: + proxy_free(cfg.buffer); + + return err; +} + +/* Record changes to the configuration. 
*/ +static int32_t proxy_instance_change_add(proxy_instance_t *instance, + const char *arg1, const char *arg2, + const char *arg3) +{ + proxy_change_t *change; + int32_t len[3], total; + + len[0] = strlen(arg1) + 1; + if (arg2 == NULL) { + arg2 = "<null>"; + } + len[1] = strlen(arg2) + 1; + len[2] = 0; + if (arg3 != NULL) { + len[2] = strlen(arg3) + 1; + } + + total = len[0] + len[1] + len[2]; + + change = proxy_malloc(sizeof(proxy_change_t) + total); + if (change == NULL) { + return -ENOMEM; + } + change->size = total; + + memcpy(change->data, arg1, len[0]); + memcpy(change->data + len[0], arg2, len[1]); + if (arg3 != NULL) { + memcpy(change->data + len[0] + len[1], arg3, len[2]); + } + + list_add_tail(&change->list, &instance->changes); + + return 0; +} + +static void proxy_instance_change_del(proxy_instance_t *instance) +{ + proxy_change_t *change; + + change = list_last_entry(&instance->changes, proxy_change_t, list); + list_del(&change->list); + + proxy_free(change); +} + +/* Destroy a Ceph client instance */ +static void proxy_instance_destroy(proxy_instance_t *instance) +{ + if (instance->mounted) { + ceph_unmount(instance->cmount); + } + + if (instance->cmount != NULL) { + ceph_release(instance->cmount); + } + + while (!list_empty(&instance->changes)) { + proxy_instance_change_del(instance); + } + + proxy_free(instance); +} + +/* Create a new Ceph client instance with the provided id */ +static int32_t proxy_instance_create(proxy_instance_t **pinstance, + const char *id) +{ + struct ceph_mount_info *cmount; + proxy_instance_t *instance; + int32_t err; + + instance = proxy_malloc(sizeof(proxy_instance_t)); + if (instance == NULL) { + return -ENOMEM; + } + + list_init(&instance->siblings); + list_init(&instance->changes); + instance->cmount = NULL; + instance->inited = false; + instance->mounted = false; + + err = proxy_instance_change_add(instance, "id", id, NULL); + if (err < 0) { + goto failed; + } + + err = ceph_create(&cmount, id); + if (err < 0) { + 
proxy_log(LOG_ERR, -err, "ceph_create() failed"); + goto failed; + } + + instance->cmount = cmount; + + *pinstance = instance; + + return 0; + +failed: + proxy_instance_destroy(instance); + + return err; +} + +static int32_t proxy_instance_release(proxy_instance_t *instance) +{ + if (instance->mounted) { + return proxy_log(LOG_ERR, EISCONN, + "Cannot release an active connection"); + } + + proxy_instance_destroy(instance); + + return 0; +} + +/* Assign a configuration file to the instance. */ +static int32_t proxy_instance_config(proxy_instance_t *instance, + const char *config) +{ + char path[128], *ppath; + int32_t err; + + if (instance->mounted) { + return proxy_log(LOG_ERR, EISCONN, + "Cannot configure a mounted instance"); + } + + ppath = NULL; + if (config != NULL) { + err = proxy_config_prepare(config, path, sizeof(path)); + if (err < 0) { + return err; + } + ppath = path; + } + + err = proxy_instance_change_add(instance, "conf", ppath, NULL); + if (err < 0) { + return err; + } + + err = ceph_conf_read_file(instance->cmount, ppath); + if (err < 0) { + proxy_instance_change_del(instance); + } + + return err; +} + +static int32_t proxy_instance_option_get(proxy_instance_t *instance, + const char *name, char *value, + size_t size) +{ + int32_t err, res; + + if (name == NULL) { + return proxy_log(LOG_ERR, EINVAL, "NULL option name"); + } + + res = ceph_conf_get(instance->cmount, name, value, size); + if (res < 0) { + return proxy_log( + LOG_ERR, -res, + "Failed to get configuration from a client instance"); + } + + err = proxy_instance_change_add(instance, "get", name, value); + if (err < 0) { + return err; + } + + return res; +} + +static int32_t proxy_instance_option_set(proxy_instance_t *instance, + const char *name, const char *value) +{ + int32_t err; + + if ((name == NULL) || (value == NULL)) { + return proxy_log(LOG_ERR, EINVAL, "NULL value or option name"); + } + + if (instance->mounted) { + return proxy_log(LOG_ERR, EISCONN, + "Cannot configure a 
mounted instance"); + } + + err = proxy_instance_change_add(instance, "set", name, value); + if (err < 0) { + return err; + } + + err = ceph_conf_set(instance->cmount, name, value); + if (err < 0) { + proxy_log(LOG_ERR, -err, + "Failed to configure a client instance"); + proxy_instance_change_del(instance); + } + + return err; +} + +static int32_t proxy_instance_select(proxy_instance_t *instance, const char *fs) +{ + int32_t err; + + if (instance->mounted) { + return proxy_log( + LOG_ERR, EISCONN, + "Cannot select a filesystem on a mounted instance"); + } + + err = proxy_instance_change_add(instance, "fs", fs, NULL); + if (err < 0) { + return err; + } + + err = ceph_select_filesystem(instance->cmount, fs); + if (err < 0) { + proxy_log(LOG_ERR, -err, + "Failed to select a filesystem on a client instance"); + proxy_instance_change_del(instance); + } + + return err; +} + +static int32_t proxy_instance_init(proxy_instance_t *instance) +{ + if (instance->mounted || instance->inited) { + return 0; + } + + /* ceph_init() does start several internal threads. However, an instance + * may not end up being mounted if the configuration matches with + * another mounted instance. Since ceph_mount() also calls ceph_init() + * if not already done, we avoid initializing it here to reduce resource + * consumption. */ + + instance->inited = true; + + return 0; +} + +static int32_t proxy_instance_hash(void **ptr, void *data, int32_t idx) +{ + proxy_iter_t *iter; + proxy_change_t *change; + + iter = data; + + if (iter->item == &iter->instance->changes) { + return 0; + } + + change = list_entry(iter->item, proxy_change_t, list); + iter->item = iter->item->next; + + *ptr = change->data; + + return change->size; +} + +/* Check if an existing instance matches the configuration used for the current + * one. If so, share the mount. Otherwise, create a new mount. 
*/ +static int32_t proxy_instance_mount(proxy_instance_t **pinstance) +{ + proxy_instance_t *instance, *existing; + proxy_iter_t iter; + list_t *list; + int32_t err; + + instance = *pinstance; + + if (instance->mounted) { + return proxy_log(LOG_ERR, EISCONN, + "Cannot mount and already mounted instance"); + } + + iter.instance = instance; + iter.item = instance->changes.next; + + /* Create a hash that includes all settings. */ + err = proxy_hash(instance->hash, sizeof(instance->hash), + proxy_instance_hash, &iter); + if (err < 0) { + return err; + } + + list = &instance_pool.hash[instance->hash[0]]; + + proxy_mutex_lock(&instance_pool.mutex); + + if (list->next == NULL) { + list_init(list); + } else { + list_for_each_entry(existing, list, list) { + if (memcmp(existing->hash, instance->hash, 32) == 0) { + /* A match has been found. Instead of destroying + * the current instance, it's stored as a + * sibling of the one found. It will be + * reassigned to an instance when someone + * unmounts. */ + list_add(&instance->list, &existing->siblings); + goto found; + } + } + } + + /* No matching instance has been found. Just create a new one. The root + * is always "/". Each virtual mount point will locally store its root + * path. 
*/ + err = ceph_mount(instance->cmount, "/"); + if (err >= 0) { + err = ceph_ll_lookup_root(instance->cmount, &instance->root); + if (err >= 0) { + instance->inited = true; + instance->mounted = true; + list_add(&instance->list, list); + } else { + ceph_unmount(instance->cmount); + } + } + + existing = NULL; + +found: + proxy_mutex_unlock(&instance_pool.mutex); + + if (err < 0) { + return proxy_log(LOG_ERR, -err, "ceph_mount() failed"); + } + + if (existing != NULL) { + proxy_log(LOG_INFO, 0, "Shared a client instance (%p)", + existing); + *pinstance = existing; + } else { + proxy_log(LOG_INFO, 0, "Created a new client instance (%p)", + instance); + } + + return 0; +} + +static int32_t proxy_instance_unmount(proxy_instance_t **pinstance) +{ + proxy_instance_t *instance, *sibling; + int32_t err; + + instance = *pinstance; + + if (!instance->mounted) { + return proxy_log(LOG_ERR, ENOTCONN, + "Cannot unmount an already unmount instance"); + } + + sibling = NULL; + + proxy_mutex_lock(&instance_pool.mutex); + + if (list_empty(&instance->siblings)) { + /* This is the last mount using this instance. We unmount it. */ + list_del(&instance->list); + instance->mounted = false; + } else { + /* There are other mounts sharing this instance. Take one of the + * saved siblings, which share the exact same configuration but + * are not mounted, to assign it to the current mount. 
*/ + sibling = list_first_entry(&instance->siblings, + proxy_instance_t, list); + list_del_init(&sibling->list); + } + + proxy_mutex_unlock(&instance_pool.mutex); + + if (sibling == NULL) { + ceph_ll_put(instance->cmount, instance->root); + + err = ceph_unmount(instance->cmount); + if (err < 0) { + return proxy_log(LOG_ERR, -err, + "ceph_unmount() failed"); + } + } else { + *pinstance = sibling; + } + + return 0; +} + +int32_t proxy_mount_create(proxy_mount_t **pmount, const char *id) +{ + proxy_mount_t *mount; + int32_t err; + + mount = proxy_malloc(sizeof(proxy_mount_t)); + if (mount == NULL) { + return -ENOMEM; + } + mount->root = NULL; + + err = proxy_instance_create(&mount->instance, id); + if (err < 0) { + proxy_free(mount); + return err; + } + + *pmount = mount; + + return 0; +} + +int32_t proxy_mount_config(proxy_mount_t *mount, const char *config) +{ + return proxy_instance_config(mount->instance, config); +} + +int32_t proxy_mount_set(proxy_mount_t *mount, const char *name, + const char *value) +{ + return proxy_instance_option_set(mount->instance, name, value); +} + +int32_t proxy_mount_get(proxy_mount_t *mount, const char *name, char *value, + size_t size) +{ + return proxy_instance_option_get(mount->instance, name, value, size); +} + +int32_t proxy_mount_select(proxy_mount_t *mount, const char *fs) +{ + return proxy_instance_select(mount->instance, fs); +} + +int32_t proxy_mount_init(proxy_mount_t *mount) +{ + return proxy_instance_init(mount->instance); +} + +int32_t proxy_mount_mount(proxy_mount_t *mount, const char *root) +{ + struct ceph_statx stx; + struct ceph_mount_info *cmount; + int32_t err; + + err = proxy_instance_mount(&mount->instance); + if (err < 0) { + return err; + } + + cmount = proxy_cmount(mount); + + mount->perms = ceph_mount_perms(cmount); + + if (root == NULL) { + root = "/"; + } + + /* Temporarily set the root and cwd inodes to make proxy_path_resolve() + * to work correctly. 
*/ + mount->root = mount->instance->root; + mount->root_ino = CEPH_INO_ROOT; + + mount->cwd = mount->instance->root; + mount->cwd_ino = CEPH_INO_ROOT; + + /* Resolve the desired root directory. */ + err = proxy_path_resolve(mount, root, &mount->root, &stx, + CEPH_STATX_ALL_STATS, 0, mount->perms, NULL); + if (err < 0) { + goto failed; + } + if (!S_ISDIR(stx.stx_mode)) { + err = proxy_log(LOG_ERR, ENOTDIR, + "The root path is not a directory"); + goto failed_root; + } + + mount->cwd_path = proxy_strdup("/"); + if (mount->cwd_path == NULL) { + err = -ENOMEM; + goto failed_root; + } + mount->cwd_path_len = 1; + + mount->root_ino = stx.stx_ino; + + err = proxy_inode_ref(mount, stx.stx_ino); + if (err < 0) { + goto failed_path; + } + + mount->cwd = mount->root; + mount->cwd_ino = stx.stx_ino; + + return 0; + +failed_path: + proxy_free(mount->cwd_path); + +failed_root: + ceph_ll_put(proxy_cmount(mount), mount->root); + +failed: + proxy_instance_unmount(&mount->instance); + + return err; +} + +int32_t proxy_mount_unmount(proxy_mount_t *mount) +{ + ceph_ll_put(proxy_cmount(mount), mount->root); + mount->root = NULL; + mount->root_ino = 0; + + ceph_ll_put(proxy_cmount(mount), mount->cwd); + mount->cwd = NULL; + mount->cwd_ino = 0; + + proxy_free(mount->cwd_path); + + return proxy_instance_unmount(&mount->instance); +} + +int32_t proxy_mount_release(proxy_mount_t *mount) +{ + int32_t err; + + err = proxy_instance_release(mount->instance); + if (err >= 0) { + proxy_free(mount); + } + + return err; +} diff --git a/src/libcephfs_proxy/proxy_mount.h b/src/libcephfs_proxy/proxy_mount.h new file mode 100644 index 00000000000..14bd58fabb2 --- /dev/null +++ b/src/libcephfs_proxy/proxy_mount.h @@ -0,0 +1,64 @@ + +#ifndef __LIBCEPHFSD_PROXY_MOUNT_H__ +#define __LIBCEPHFSD_PROXY_MOUNT_H__ + +#include "proxy.h" +#include "proxy_list.h" + +#include "include/cephfs/libcephfs.h" + +typedef struct _proxy_instance { + uint8_t hash[32]; + list_t list; + list_t siblings; + list_t changes; + 
struct ceph_mount_info *cmount; + struct Inode *root; + bool inited; + bool mounted; +} proxy_instance_t; + +typedef struct _proxy_mount { + proxy_instance_t *instance; + UserPerm *perms; + struct Inode *root; + struct Inode *cwd; + char *cwd_path; + uint64_t root_ino; + uint64_t cwd_ino; + uint32_t cwd_path_len; +} proxy_mount_t; + +static inline struct ceph_mount_info *proxy_cmount(proxy_mount_t *mount) +{ + return mount->instance->cmount; +} + +int32_t proxy_inode_ref(proxy_mount_t *mount, uint64_t inode); + +int32_t proxy_mount_create(proxy_mount_t **pmount, const char *id); + +int32_t proxy_mount_config(proxy_mount_t *mount, const char *config); + +int32_t proxy_mount_set(proxy_mount_t *mount, const char *name, + const char *value); + +int32_t proxy_mount_get(proxy_mount_t *mount, const char *name, char *value, + size_t size); + +int32_t proxy_mount_select(proxy_mount_t *mount, const char *fs); + +int32_t proxy_mount_init(proxy_mount_t *mount); + +int32_t proxy_mount_mount(proxy_mount_t *mount, const char *root); + +int32_t proxy_mount_unmount(proxy_mount_t *mount); + +int32_t proxy_mount_release(proxy_mount_t *mount); + +int32_t proxy_path_resolve(proxy_mount_t *mount, const char *path, + struct Inode **inode, struct ceph_statx *stx, + uint32_t want, uint32_t flags, UserPerm *perms, + char **realpath); + +#endif diff --git a/src/libcephfs_proxy/proxy_requests.h b/src/libcephfs_proxy/proxy_requests.h new file mode 100644 index 00000000000..4e3739276bb --- /dev/null +++ b/src/libcephfs_proxy/proxy_requests.h @@ -0,0 +1,343 @@ + +#ifndef __LIBCEPHFSD_PROXY_REQUESTS_H__ +#define __LIBCEPHFSD_PROXY_REQUESTS_H__ + +#include "proxy.h" +#include "proxy_link.h" + +/* Macros to add and get data from communication buffers. 
*/ + +#define CEPH_BUFF_ADD(_data, _ptr, _size) \ + do { \ + _data##_iov[_data##_count].iov_base = (void *)(_ptr); \ + _data##_iov[_data##_count].iov_len = (_size); \ + _data##_count++; \ + } while (0) + +#define CEPH_DATA_ADD(_data, _field, _ptr, _size) \ + do { \ + (_data)._field = (_size); \ + CEPH_BUFF_ADD(_data, _ptr, (_data)._field); \ + } while (0) + +#define CEPH_STR_ADD(_data, _field, _str) \ + do { \ + if ((_str) != NULL) { \ + CEPH_DATA_ADD(_data, _field, _str, strlen(_str) + 1); \ + } else { \ + (_data)._field = 0; \ + } \ + } while (0) + +#define CEPH_STR_GET(_data, _field, _ptr) \ + ({ \ + const void *__ptr = (_ptr); \ + if ((_data)._field == 0) { \ + __ptr = NULL; \ + } \ + __ptr; \ + }) + +#define CEPH_DATA(_name, _data, _data_count) \ + proxy_##_name##_##_data##_t _data; \ + struct iovec _data##_iov[_data_count + 1]; \ + int32_t _data##_count = 0; \ + CEPH_BUFF_ADD(_data, &_data, sizeof(_data)) + +#define CEPH_REQ(_name, _req, _req_count, _ans, _ans_count) \ + CEPH_DATA(_name, _req, _req_count); \ + CEPH_DATA(_name, _ans, _ans_count) + +#define CEPH_CALL(_sd, _op, _req, _ans) \ + proxy_link_request((_sd), _op, _req##_iov, _req##_count, _ans##_iov, \ + _ans##_count) + +#define CEPH_RET(_sd, _res, _ans) \ + proxy_link_ans_send((_sd), (_res), _ans##_iov, _ans##_count) + +enum { + LIBCEPHFSD_OP_NULL = 0, + + LIBCEPHFSD_OP_VERSION, + LIBCEPHFSD_OP_USERPERM_NEW, + LIBCEPHFSD_OP_USERPERM_DESTROY, + LIBCEPHFSD_OP_CREATE, + LIBCEPHFSD_OP_RELEASE, + LIBCEPHFSD_OP_CONF_READ_FILE, + LIBCEPHFSD_OP_CONF_GET, + LIBCEPHFSD_OP_CONF_SET, + LIBCEPHFSD_OP_INIT, + LIBCEPHFSD_OP_SELECT_FILESYSTEM, + LIBCEPHFSD_OP_MOUNT, + LIBCEPHFSD_OP_UNMOUNT, + LIBCEPHFSD_OP_LL_STATFS, + LIBCEPHFSD_OP_LL_LOOKUP, + LIBCEPHFSD_OP_LL_LOOKUP_INODE, + LIBCEPHFSD_OP_LL_LOOKUP_ROOT, + LIBCEPHFSD_OP_LL_PUT, + LIBCEPHFSD_OP_LL_WALK, + LIBCEPHFSD_OP_CHDIR, + LIBCEPHFSD_OP_GETCWD, + LIBCEPHFSD_OP_READDIR, + LIBCEPHFSD_OP_REWINDDIR, + LIBCEPHFSD_OP_LL_OPEN, + LIBCEPHFSD_OP_LL_CREATE, + 
LIBCEPHFSD_OP_LL_MKNOD, + LIBCEPHFSD_OP_LL_CLOSE, + LIBCEPHFSD_OP_LL_RENAME, + LIBCEPHFSD_OP_LL_LSEEK, + LIBCEPHFSD_OP_LL_READ, + LIBCEPHFSD_OP_LL_WRITE, + LIBCEPHFSD_OP_LL_LINK, + LIBCEPHFSD_OP_LL_UNLINK, + LIBCEPHFSD_OP_LL_GETATTR, + LIBCEPHFSD_OP_LL_SETATTR, + LIBCEPHFSD_OP_LL_FALLOCATE, + LIBCEPHFSD_OP_LL_FSYNC, + LIBCEPHFSD_OP_LL_LISTXATTR, + LIBCEPHFSD_OP_LL_GETXATTR, + LIBCEPHFSD_OP_LL_SETXATTR, + LIBCEPHFSD_OP_LL_REMOVEXATTR, + LIBCEPHFSD_OP_LL_READLINK, + LIBCEPHFSD_OP_LL_SYMLINK, + LIBCEPHFSD_OP_LL_OPENDIR, + LIBCEPHFSD_OP_LL_MKDIR, + LIBCEPHFSD_OP_LL_RMDIR, + LIBCEPHFSD_OP_LL_RELEASEDIR, + LIBCEPHFSD_OP_MOUNT_PERMS, + + LIBCEPHFSD_OP_TOTAL_OPS +}; + +#define CEPH_TYPE_REQ(_name, _fields...) \ + struct _proxy_##_name##_req; \ + typedef struct _proxy_##_name##_req proxy_##_name##_req_t; \ + struct _proxy_##_name##_req { \ + _fields \ + } + +#define CEPH_TYPE_ANS(_name, _fields...) \ + struct _proxy_##_name##_ans; \ + typedef struct _proxy_##_name##_ans proxy_##_name##_ans_t; \ + struct _proxy_##_name##_ans { \ + _fields \ + } + +#define FIELDS(_fields...) _fields +#define REQ(_fields...) FIELDS(proxy_link_req_t header; _fields) +#define REQ_CMOUNT(_fields...) REQ(uint64_t cmount; _fields) +#define ANS(_fields...) FIELDS(proxy_link_ans_t header; _fields) +#define ANS_CMOUNT(_fields...) ANS(uint64_t cmount; _fields) + +#define CEPH_TYPE(_name, _req, _ans) \ + CEPH_TYPE_REQ(_name, _req); \ + CEPH_TYPE_ANS(_name, _ans) + +/* Declaration of types used to transder requests and answers. 
*/ + +CEPH_TYPE(hello, FIELDS(uint32_t id;), FIELDS(int16_t major; int16_t minor;)); + +CEPH_TYPE(ceph_version, REQ(), + ANS(int32_t major; int32_t minor; int32_t patch; int16_t text;)); + +CEPH_TYPE(ceph_userperm_new, REQ(uint32_t uid; uint32_t gid; uint32_t groups;), + ANS(uint64_t userperm;)); + +CEPH_TYPE(ceph_userperm_destroy, REQ(uint64_t userperm;), ANS()); + +CEPH_TYPE(ceph_create, REQ(int16_t id;), ANS_CMOUNT()); + +CEPH_TYPE(ceph_release, REQ_CMOUNT(), ANS()); + +CEPH_TYPE(ceph_conf_read_file, REQ_CMOUNT(uint16_t path;), ANS()); + +CEPH_TYPE(ceph_conf_get, REQ_CMOUNT(uint32_t size; uint16_t option;), + ANS(uint16_t value;)); + +CEPH_TYPE(ceph_conf_set, REQ_CMOUNT(uint16_t option; uint16_t value;), ANS()); + +CEPH_TYPE(ceph_init, REQ_CMOUNT(), ANS()); + +CEPH_TYPE(ceph_select_filesystem, REQ_CMOUNT(uint16_t fs;), ANS()); + +CEPH_TYPE(ceph_mount, REQ_CMOUNT(uint16_t root;), ANS()); + +CEPH_TYPE(ceph_unmount, REQ_CMOUNT(), ANS()); + +CEPH_TYPE(ceph_ll_statfs, REQ_CMOUNT(uint64_t inode;), ANS()); + +CEPH_TYPE(ceph_ll_lookup, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; uint32_t want; + uint32_t flags; uint16_t name;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_lookup_inode, REQ_CMOUNT(struct inodeno_t ino;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_lookup_root, REQ_CMOUNT(), ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_put, REQ_CMOUNT(uint64_t inode;), ANS()); + +CEPH_TYPE(ceph_ll_walk, + REQ_CMOUNT(uint64_t userperm; uint32_t want; uint32_t flags; + uint16_t path;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_chdir, REQ_CMOUNT(uint16_t path;), ANS()); + +CEPH_TYPE(ceph_getcwd, REQ_CMOUNT(), ANS(uint16_t path;)); + +CEPH_TYPE(ceph_readdir, REQ_CMOUNT(uint64_t dir;), ANS(bool eod;)); + +CEPH_TYPE(ceph_rewinddir, REQ_CMOUNT(uint64_t dir;), ANS()); + +CEPH_TYPE(ceph_ll_open, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; int32_t flags;), + ANS(uint64_t fh;)); + +CEPH_TYPE(ceph_ll_create, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; mode_t mode; + 
int32_t oflags; uint32_t want; uint32_t flags; + uint16_t name;), + ANS(uint64_t inode; uint64_t fh;)); + +CEPH_TYPE(ceph_ll_mknod, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; mode_t mode; + dev_t rdev; uint32_t want; uint32_t flags; uint16_t name;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_close, REQ_CMOUNT(uint64_t fh;), ANS()); + +CEPH_TYPE(ceph_ll_rename, + REQ_CMOUNT(uint64_t userperm; uint64_t old_parent; + uint64_t new_parent; uint16_t old_name; + uint16_t new_name;), + ANS()); + +CEPH_TYPE(ceph_ll_lseek, REQ_CMOUNT(uint64_t fh; off_t offset; int32_t whence;), + ANS(off_t offset;)); + +CEPH_TYPE(ceph_ll_read, REQ_CMOUNT(uint64_t fh; int64_t offset; uint64_t len;), + ANS()); + +CEPH_TYPE(ceph_ll_write, REQ_CMOUNT(uint64_t fh; int64_t offset; uint64_t len;), + ANS()); + +CEPH_TYPE(ceph_ll_link, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; uint64_t parent; + uint16_t name;), + ANS()); + +CEPH_TYPE(ceph_ll_unlink, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; uint16_t name;), + ANS()); + +CEPH_TYPE(ceph_ll_getattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; uint32_t want; + uint32_t flags;), + ANS()); + +CEPH_TYPE(ceph_ll_setattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; int32_t mask;), ANS()); + +CEPH_TYPE(ceph_ll_fallocate, + REQ_CMOUNT(uint64_t fh; int64_t offset; int64_t length; + int32_t mode;), + ANS()); + +CEPH_TYPE(ceph_ll_fsync, REQ_CMOUNT(uint64_t fh; int32_t dataonly;), ANS()); + +CEPH_TYPE(ceph_ll_listxattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; size_t size;), + ANS(size_t size;)); + +CEPH_TYPE(ceph_ll_getxattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; size_t size; + uint16_t name;), + ANS()); + +CEPH_TYPE(ceph_ll_setxattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; size_t size; + int32_t flags; uint16_t name;), + ANS()); + +CEPH_TYPE(ceph_ll_removexattr, + REQ_CMOUNT(uint64_t userperm; uint64_t inode; uint16_t name;), ANS()); + +CEPH_TYPE(ceph_ll_readlink, + REQ_CMOUNT(uint64_t userperm; uint64_t 
inode; size_t size;), ANS()); + +CEPH_TYPE(ceph_ll_symlink, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; uint32_t want; + uint32_t flags; uint16_t name; uint16_t target;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_opendir, REQ_CMOUNT(uint64_t userperm; uint64_t inode;), + ANS(uint64_t dir;)); + +CEPH_TYPE(ceph_ll_mkdir, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; mode_t mode; + uint32_t want; uint32_t flags; uint16_t name;), + ANS(uint64_t inode;)); + +CEPH_TYPE(ceph_ll_rmdir, + REQ_CMOUNT(uint64_t userperm; uint64_t parent; uint16_t name;), + ANS()); + +CEPH_TYPE(ceph_ll_releasedir, REQ_CMOUNT(uint64_t dir;), ANS()); + +CEPH_TYPE(ceph_mount_perms, REQ_CMOUNT(), ANS(uint64_t userperm;)); + +typedef union _proxy_req { + proxy_link_req_t header; + + proxy_ceph_version_req_t version; + proxy_ceph_userperm_new_req_t userperm_new; + proxy_ceph_userperm_destroy_req_t userperm_destroy; + proxy_ceph_create_req_t create; + proxy_ceph_release_req_t release; + proxy_ceph_conf_read_file_req_t conf_read_file; + proxy_ceph_conf_get_req_t conf_get; + proxy_ceph_conf_set_req_t conf_set; + proxy_ceph_init_req_t init; + proxy_ceph_select_filesystem_req_t select_filesystem; + proxy_ceph_mount_req_t mount; + proxy_ceph_unmount_req_t unmount; + proxy_ceph_ll_statfs_req_t ll_statfs; + proxy_ceph_ll_lookup_req_t ll_lookup; + proxy_ceph_ll_lookup_inode_req_t ll_lookup_inode; + proxy_ceph_ll_lookup_root_req_t ll_lookup_root; + proxy_ceph_ll_put_req_t ll_put; + proxy_ceph_ll_walk_req_t ll_walk; + proxy_ceph_chdir_req_t chdir; + proxy_ceph_getcwd_req_t getcwd; + proxy_ceph_readdir_req_t readdir; + proxy_ceph_rewinddir_req_t rewinddir; + proxy_ceph_ll_open_req_t ll_open; + proxy_ceph_ll_create_req_t ll_create; + proxy_ceph_ll_mknod_req_t ll_mknod; + proxy_ceph_ll_close_req_t ll_close; + proxy_ceph_ll_rename_req_t ll_rename; + proxy_ceph_ll_lseek_req_t ll_lseek; + proxy_ceph_ll_read_req_t ll_read; + proxy_ceph_ll_write_req_t ll_write; + proxy_ceph_ll_link_req_t ll_link; + 
proxy_ceph_ll_unlink_req_t ll_unlink; + proxy_ceph_ll_getattr_req_t ll_getattr; + proxy_ceph_ll_setattr_req_t ll_setattr; + proxy_ceph_ll_fallocate_req_t ll_fallocate; + proxy_ceph_ll_fsync_req_t ll_fsync; + proxy_ceph_ll_listxattr_req_t ll_listxattr; + proxy_ceph_ll_getxattr_req_t ll_getxattr; + proxy_ceph_ll_setxattr_req_t ll_setxattr; + proxy_ceph_ll_removexattr_req_t ll_removexattr; + proxy_ceph_ll_readlink_req_t ll_readlink; + proxy_ceph_ll_symlink_req_t ll_symlink; + proxy_ceph_ll_opendir_req_t ll_opendir; + proxy_ceph_ll_mkdir_req_t ll_mkdir; + proxy_ceph_ll_rmdir_req_t ll_rmdir; + proxy_ceph_ll_releasedir_req_t ll_releasedir; + proxy_ceph_mount_perms_req_t mount_perms; +} proxy_req_t; + +#endif |