summaryrefslogtreecommitdiffstats
path: root/src/libcephfs_proxy/proxy_link.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libcephfs_proxy/proxy_link.c')
-rw-r--r--src/libcephfs_proxy/proxy_link.c421
1 files changed, 421 insertions, 0 deletions
diff --git a/src/libcephfs_proxy/proxy_link.c b/src/libcephfs_proxy/proxy_link.c
new file mode 100644
index 00000000000..20d9086ffa9
--- /dev/null
+++ b/src/libcephfs_proxy/proxy_link.c
@@ -0,0 +1,421 @@
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/uio.h>
+
+#include "proxy_link.h"
+#include "proxy_manager.h"
+#include "proxy_helpers.h"
+#include "proxy_log.h"
+
+static int32_t iov_length(struct iovec *iov, int32_t count)
+{
+ int32_t len;
+
+ len = 0;
+ while (count > 0) {
+ len += iov->iov_len;
+ iov++;
+ count--;
+ }
+
+ return len;
+}
+
+static int32_t proxy_link_prepare(struct sockaddr_un *addr, const char *path)
+{
+ struct sigaction action;
+ int32_t sd, len, err;
+
+ memset(&action, 0, sizeof(action));
+ action.sa_handler = SIG_IGN;
+ err = proxy_signal_set(SIGPIPE, &action, NULL);
+ if (err < 0) {
+ return err;
+ }
+
+ memset(addr, 0, sizeof(*addr));
+ addr->sun_family = AF_UNIX;
+ len = snprintf(addr->sun_path, sizeof(addr->sun_path), "%s", path);
+ if (len < 0) {
+ return proxy_log(LOG_ERR, EINVAL,
+ "Failed to copy Unix socket path");
+ }
+ if (len >= sizeof(addr->sun_path)) {
+ return proxy_log(LOG_ERR, ENAMETOOLONG,
+ "Unix socket path too long");
+ }
+
+ sd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sd < 0) {
+ return proxy_log(LOG_ERR, errno,
+ "Failed to create a Unix socket");
+ }
+
+ return sd;
+}
+
+int32_t proxy_link_client(proxy_link_t *link, const char *path,
+ proxy_link_stop_t stop)
+{
+ struct sockaddr_un addr;
+ int32_t sd, err;
+
+ link->stop = stop;
+ link->sd = -1;
+
+ sd = proxy_link_prepare(&addr, path);
+ if (sd < 0) {
+ return sd;
+ }
+
+ err = 0;
+ while (err >= 0) {
+ if (connect(sd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ err = proxy_log(LOG_ERR, errno,
+ "Failed to connect to libcephfsd");
+ } else {
+ link->sd = sd;
+ return sd;
+ }
+ }
+
+ close(sd);
+
+ return err;
+}
+
+void proxy_link_close(proxy_link_t *link)
+{
+ close(link->sd);
+ link->sd = -1;
+}
+
+int32_t proxy_link_server(proxy_link_t *link, const char *path,
+ proxy_link_start_t start, proxy_link_stop_t stop)
+{
+ struct sockaddr_un addr;
+ socklen_t len;
+ int32_t cd, err;
+
+ link->stop = stop;
+ link->sd = -1;
+
+ err = proxy_link_prepare(&addr, path);
+ if (err < 0) {
+ return err;
+ }
+ link->sd = err;
+
+ if ((unlink(path) < 0) && (errno != ENOENT) && (errno != ENOTDIR)) {
+ err = proxy_log(LOG_ERR, errno,
+ "Failed to remove existing socket");
+ goto done;
+ }
+
+ if (bind(link->sd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ err = proxy_log(LOG_ERR, errno, "Failed to bind Unix socket");
+ goto done;
+ }
+
+ if (listen(link->sd, SOMAXCONN) < 0) {
+ err = proxy_log(LOG_ERR, errno,
+ "Failed to listen from Unix socket");
+ goto done;
+ }
+
+ while (!stop(link)) {
+ len = sizeof(addr);
+ cd = accept(link->sd, (struct sockaddr *)&addr, &len);
+ if (cd < 0) {
+ if (errno != EINTR) {
+ proxy_log(LOG_ERR, errno,
+ "Failed to accept a connection");
+ }
+ } else {
+ start(link, cd);
+ }
+ }
+
+ err = 0;
+
+done:
+ close(link->sd);
+
+ return err;
+}
+
+int32_t proxy_link_read(proxy_link_t *link, int32_t sd, void *buffer,
+ int32_t size)
+{
+ ssize_t len;
+
+ do {
+ len = read(sd, buffer, size);
+ if (len < 0) {
+ if (errno == EINTR) {
+ if (link->stop(link)) {
+ return -EINTR;
+ }
+ continue;
+ }
+ return proxy_log(LOG_ERR, errno,
+ "Failed to read from socket");
+ }
+ } while (len < 0);
+
+ return len;
+}
+
+int32_t proxy_link_write(proxy_link_t *link, int32_t sd, void *buffer,
+ int32_t size)
+{
+ ssize_t len;
+ int32_t total;
+
+ total = size;
+ while (total > 0) {
+ len = write(sd, buffer, total);
+ if (len < 0) {
+ if (errno == EINTR) {
+ if (link->stop(link)) {
+ return -EINTR;
+ }
+ continue;
+ }
+ return proxy_log(LOG_ERR, errno,
+ "Failed to write to socket");
+ }
+ if (len == 0) {
+ return proxy_log(LOG_ERR, ENOBUFS,
+ "No data written to socket");
+ }
+
+ buffer += len;
+ total -= len;
+ }
+
+ return size;
+}
+
+int32_t proxy_link_send(int32_t sd, struct iovec *iov, int32_t count)
+{
+ struct iovec iov_copy[count];
+ ssize_t len;
+ int32_t total;
+
+ memcpy(iov_copy, iov, sizeof(struct iovec) * count);
+ iov = iov_copy;
+
+ total = 0;
+ while (count > 0) {
+ len = writev(sd, iov, count);
+ if (len < 0) {
+ return proxy_log(LOG_ERR, errno, "Failed to send data");
+ }
+ if (len == 0) {
+ return proxy_log(LOG_ERR, ENOBUFS, "Partial write");
+ }
+ total += len;
+
+ while ((count > 0) && (iov->iov_len <= len)) {
+ len -= iov->iov_len;
+ iov++;
+ count--;
+ }
+
+ if (count > 0) {
+ iov->iov_base += len;
+ iov->iov_len -= len;
+ }
+ }
+
+ return total;
+}
+
+int32_t proxy_link_recv(int32_t sd, struct iovec *iov, int32_t count)
+{
+ struct iovec iov_copy[count];
+ ssize_t len;
+ int32_t total;
+
+ memcpy(iov_copy, iov, sizeof(struct iovec) * count);
+ iov = iov_copy;
+
+ total = 0;
+ while (count > 0) {
+ len = readv(sd, iov, count);
+ if (len < 0) {
+ return proxy_log(LOG_ERR, errno,
+ "Failed to receive data");
+ }
+ if (len == 0) {
+ return proxy_log(LOG_ERR, ENODATA, "Partial read");
+ }
+ total += len;
+
+ while ((count > 0) && (iov->iov_len <= len)) {
+ len -= iov->iov_len;
+ iov++;
+ count--;
+ }
+
+ if (count > 0) {
+ iov->iov_base += len;
+ iov->iov_len -= len;
+ }
+ }
+
+ return total;
+}
+
+int32_t proxy_link_req_send(int32_t sd, int32_t op, struct iovec *iov,
+ int32_t count)
+{
+ proxy_link_req_t *req;
+
+ req = iov[0].iov_base;
+
+ req->header_len = iov[0].iov_len;
+ req->op = op;
+ req->data_len = iov_length(iov + 1, count - 1);
+
+ return proxy_link_send(sd, iov, count);
+}
+
+int32_t proxy_link_req_recv(int32_t sd, struct iovec *iov, int32_t count)
+{
+ proxy_link_req_t *req;
+ void *buffer;
+ int32_t err, len, total;
+
+ len = iov->iov_len;
+ iov->iov_len = sizeof(proxy_link_req_t);
+ err = proxy_link_recv(sd, iov, 1);
+ if (err < 0) {
+ return err;
+ }
+ total = err;
+
+ req = iov->iov_base;
+
+ if (req->data_len > 0) {
+ if (count == 1) {
+ return proxy_log(LOG_ERR, ENOBUFS,
+ "Request data is too long");
+ }
+ if (iov[1].iov_len < req->data_len) {
+ buffer = proxy_malloc(req->data_len);
+ if (buffer == NULL) {
+ return -ENOMEM;
+ }
+ iov[1].iov_base = buffer;
+ }
+ iov[1].iov_len = req->data_len;
+ } else {
+ count = 1;
+ }
+
+ if (req->header_len > sizeof(proxy_link_req_t)) {
+ if (len < req->header_len) {
+ return proxy_log(LOG_ERR, ENOBUFS,
+ "Request is too long");
+ }
+ iov->iov_base += sizeof(proxy_link_req_t);
+ iov->iov_len = req->header_len - sizeof(proxy_link_req_t);
+ } else {
+ iov++;
+ count--;
+ if (count == 0) {
+ return total;
+ }
+ }
+
+ err = proxy_link_recv(sd, iov, count);
+ if (err < 0) {
+ return err;
+ }
+
+ return total + err;
+}
+
+int32_t proxy_link_ans_send(int32_t sd, int32_t result, struct iovec *iov,
+ int32_t count)
+{
+ proxy_link_ans_t *ans;
+
+ ans = iov->iov_base;
+
+ ans->header_len = iov->iov_len;
+ ans->flags = 0;
+ ans->result = result;
+ ans->data_len = iov_length(iov + 1, count - 1);
+
+ return proxy_link_send(sd, iov, count);
+}
+
+int32_t proxy_link_ans_recv(int32_t sd, struct iovec *iov, int32_t count)
+{
+ proxy_link_ans_t *ans;
+ int32_t err, len, total;
+
+ len = iov->iov_len;
+ iov->iov_len = sizeof(proxy_link_ans_t);
+ err = proxy_link_recv(sd, iov, 1);
+ if (err < 0) {
+ return err;
+ }
+ total = err;
+
+ ans = iov->iov_base;
+
+ if (ans->data_len > 0) {
+ if ((count == 1) || (iov[1].iov_len < ans->data_len)) {
+ return proxy_log(LOG_ERR, ENOBUFS,
+ "Answer data is too long");
+ }
+ iov[1].iov_len = ans->data_len;
+ } else {
+ count = 1;
+ }
+
+ if (ans->header_len > sizeof(proxy_link_ans_t)) {
+ if (len < ans->header_len) {
+ return proxy_log(LOG_ERR, ENOBUFS,
+ "Answer is too long");
+ }
+ iov->iov_base += sizeof(proxy_link_ans_t);
+ iov->iov_len = ans->header_len - sizeof(proxy_link_ans_t);
+ } else {
+ iov++;
+ count--;
+ if (count == 0) {
+ return total;
+ }
+ }
+
+ err = proxy_link_recv(sd, iov, count);
+ if (err < 0) {
+ return err;
+ }
+
+ return total + err;
+}
+
+int32_t proxy_link_request(int32_t sd, int32_t op, struct iovec *req_iov,
+ int32_t req_count, struct iovec *ans_iov,
+ int32_t ans_count)
+{
+ int32_t err;
+
+ err = proxy_link_req_send(sd, op, req_iov, req_count);
+ if (err < 0) {
+ return err;
+ }
+
+ return proxy_link_ans_recv(sd, ans_iov, ans_count);
+}