diff options
author | David Lamparter <equinox@opensourcerouting.org> | 2017-08-24 18:13:31 +0200 |
---|---|---|
committer | David Lamparter <equinox@opensourcerouting.org> | 2017-08-28 18:00:13 +0200 |
commit | a0b974def7e6c76d4e4c31ee93bc33f67c57c611 (patch) | |
tree | f22ff65c0e6073748e3315263ba30609b53ff634 /tests/lib/test_zmq.c | |
parent | lib: ZeroMQ read handler, v2 (diff) | |
download | frr-a0b974def7e6c76d4e4c31ee93bc33f67c57c611.tar.xz frr-a0b974def7e6c76d4e4c31ee93bc33f67c57c611.zip |
tests: add ZeroMQ test
Signed-off-by: David Lamparter <equinox@opensourcerouting.org>
Diffstat (limited to '')
-rw-r--r-- | tests/lib/test_zmq.c | 212 |
1 files changed, 212 insertions, 0 deletions
diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c new file mode 100644 index 000000000..c270ec3d1 --- /dev/null +++ b/tests/lib/test_zmq.c @@ -0,0 +1,212 @@ +/* + * ZeroMQ event test + * Copyright (C) 2017 David Lamparter, for NetDEF, Inc. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the License, or (at your option) + * any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; see the file COPYING; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include <zebra.h> +#include "memory.h" +#include "sigevent.h" +#include "frr_zmq.h" + +DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") + +static struct thread_master *master; + +static void msg_buf_free(void *data, void *hint) +{ + XFREE(MTYPE_TESTBUF, data); +} + +static void run_client(int syncfd) +{ + int i, j; + char buf[32]; + char dummy; + void *zmqctx = NULL; + void *zmqsock; + + read(syncfd, &dummy, 1); + + zmqctx = zmq_ctx_new(); + zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); + + zmqsock = zmq_socket(zmqctx, ZMQ_REQ); + if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { + perror("zmq_connect"); + exit(1); + } + + /* single-part */ + for (i = 0; i < 8; i++) { + snprintf(buf, sizeof(buf), "msg #%d %c%c%c", + i, 'a' + i, 'b' + i, 'c' + i); + printf("client send: %s\n", buf); + fflush(stdout); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + } + + /* multipart */ + for (i = 2; i < 5; i++) { + int more; + + printf("---\n"); + for (j = 1; j <= i; j++) { + zmq_msg_t part; + char *dyn = XMALLOC(MTYPE_TESTBUF, 32); + + snprintf(dyn, 32, "part %d/%d", j, i); + printf("client send: %s\n", dyn); + fflush(stdout); + + zmq_msg_init_data(&part, dyn, strlen(dyn) + 1, + msg_buf_free, NULL); + zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); + } + + zmq_msg_t part; + do { + char *data; + + zmq_msg_recv(&part, zmqsock, 0); + data = zmq_msg_data(&part); + more = zmq_msg_more(&part); + printf("client recv (more: %d): %s\n", more, data); + } while (more); + zmq_msg_close(&part); + } + zmq_close(zmqsock); + zmq_ctx_term(zmqctx); +} + +static struct frrzmq_cb *cb; + +static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, + unsigned partnum) +{ + int more = zmq_msg_more(msg); + char *in = zmq_msg_data(msg); + size_t i; + zmq_msg_t reply; + char *out; + + printf("server recv part %u (more: %d): %s\n", partnum, more, in); + fflush(stdout); + /* REQ-REP doesn't allow sending a reply here */ + if (more) + return; + + out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); + for (i = 0; i < strlen(in); i++) + out[i] = toupper(in[i]); + out[i] = '\0'; + zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); + zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); + + out = XMALLOC(MTYPE_TESTBUF, 32); + snprintf(out, 32, "msg# was %u", partnum); + zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); + zmq_msg_send(&reply, zmqsock, 0); +} + +static void serverfn(void *arg, void *zmqsock) +{ + static int num = 0; + + char buf[32]; + size_t i; + zmq_recv(zmqsock, buf, sizeof(buf), 0); + + printf("server recv: %s\n", buf); + fflush(stdout); + for (i = 0; i < strlen(buf); i++) + buf[i] = toupper(buf[i]); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + + if (++num < 4) + return; + + /* change to multipart callback */ + frrzmq_thread_cancel(cb); + + cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); +} + +static void sigchld(void) +{ + printf("child exited.\n"); + frrzmq_thread_cancel(cb); +} + +static struct quagga_signal_t sigs[] = { + { + .signal = SIGCHLD, + .handler = sigchld, + }, +}; + +static void run_server(int syncfd) +{ + void *zmqsock; + char dummy = 0; + struct thread t; + + master = thread_master_create(NULL); + signal_init(master, array_size(sigs), sigs); + frrzmq_init(); + + zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); + if (zmq_bind(zmqsock, "tcp://*:17171")) { + perror("zmq_bind"); + exit(1); + } + + cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); + + write(syncfd, &dummy, sizeof(dummy)); + while (thread_fetch(master, &t)) + thread_call(&t); + + zmq_close(zmqsock); + frrzmq_finish(); + thread_master_free(master); + log_memstats_stderr("test"); +} + +int main(void) +{ + int syncpipe[2]; + pid_t child; + + if (pipe(syncpipe)) { + perror("pipe"); + exit(1); + } + + child = fork(); + if (child < 0) { + perror("fork"); + exit(1); + } else if (child == 0) { + run_client(syncpipe[0]); + exit(0); + } + + run_server(syncpipe[1]); + exit(0); +} |