diff options
author | Casey Bodley <cbodley@redhat.com> | 2016-03-13 21:38:20 +0100 |
---|---|---|
committer | Casey Bodley <cbodley@redhat.com> | 2017-04-25 16:37:50 +0200 |
commit | c293b5925d9e0ecb28505de09b43f29c7c57fa8f (patch) | |
tree | 7ff84310d9cbdacdf3f16af24e434476a0223935 | |
parent | Merge pull request #14741 from trociny/wip-19405 (diff) | |
download | ceph-c293b5925d9e0ecb28505de09b43f29c7c57fa8f.tar.xz ceph-c293b5925d9e0ecb28505de09b43f29c7c57fa8f.zip |
core: introduce DirectMessenger
DirectMessenger provides an efficient mechanism to support
in-process embedding of Ceph components (e.g., embedding of the
Ceph OSD in storage targets such as NFSv4 or iSCSI targets).
Signed-off-by: Casey Bodley <cbodley@redhat.com>
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
-rw-r--r-- | src/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/msg/DispatchStrategy.h (renamed from src/msg/xio/DispatchStrategy.h) | 0 | ||||
-rw-r--r-- | src/msg/FastStrategy.h (renamed from src/msg/xio/FastStrategy.h) | 0 | ||||
-rw-r--r-- | src/msg/QueueStrategy.cc (renamed from src/msg/xio/QueueStrategy.cc) | 0 | ||||
-rw-r--r-- | src/msg/QueueStrategy.h (renamed from src/msg/xio/QueueStrategy.h) | 0 | ||||
-rw-r--r-- | src/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/test/direct_messenger/CMakeLists.txt | 4 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.cc | 252 | ||||
-rw-r--r-- | src/test/direct_messenger/DirectMessenger.h | 97 | ||||
-rw-r--r-- | src/test/direct_messenger/test_direct_messenger.cc | 437 | ||||
-rw-r--r-- | src/test/messenger/CMakeLists.txt | 1 |
11 files changed, 793 insertions, 3 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f5e90761586..77b647bf6f6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -339,8 +339,7 @@ if(HAVE_XIO) msg/xio/XioMsg.cc msg/xio/XioPool.cc msg/xio/XioMessenger.cc - msg/xio/XioPortal.cc - msg/xio/QueueStrategy.cc) + msg/xio/XioPortal.cc) endif(HAVE_XIO) set(async_rdma_common_srcs) @@ -448,6 +447,7 @@ set(libcommon_files msg/async/Stack.cc msg/async/PosixStack.cc msg/async/net_handler.cc + msg/QueueStrategy.cc ${xio_common_srcs} ${async_rdma_common_srcs} ${dpdk_common_srcs} diff --git a/src/msg/xio/DispatchStrategy.h b/src/msg/DispatchStrategy.h index 44d63d47368..44d63d47368 100644 --- a/src/msg/xio/DispatchStrategy.h +++ b/src/msg/DispatchStrategy.h diff --git a/src/msg/xio/FastStrategy.h b/src/msg/FastStrategy.h index 001ff40045f..001ff40045f 100644 --- a/src/msg/xio/FastStrategy.h +++ b/src/msg/FastStrategy.h diff --git a/src/msg/xio/QueueStrategy.cc b/src/msg/QueueStrategy.cc index 0ce279b31e9..0ce279b31e9 100644 --- a/src/msg/xio/QueueStrategy.cc +++ b/src/msg/QueueStrategy.cc diff --git a/src/msg/xio/QueueStrategy.h b/src/msg/QueueStrategy.h index 41f28bb9e71..41f28bb9e71 100644 --- a/src/msg/xio/QueueStrategy.h +++ b/src/msg/QueueStrategy.h diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 378a1563b05..4da3ef05922 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -24,6 +24,7 @@ add_subdirectory(cls_lua) add_subdirectory(common) add_subdirectory(compressor) add_subdirectory(crush) +add_subdirectory(direct_messenger) add_subdirectory(encoding) add_subdirectory(erasure-code) add_subdirectory(filestore) diff --git a/src/test/direct_messenger/CMakeLists.txt b/src/test/direct_messenger/CMakeLists.txt new file mode 100644 index 00000000000..6776cae2250 --- /dev/null +++ b/src/test/direct_messenger/CMakeLists.txt @@ -0,0 +1,4 @@ +# unittest_direct_messenger +add_executable(unittest_direct_messenger test_direct_messenger.cc DirectMessenger.cc) +add_ceph_unittest(unittest_direct_messenger ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_direct_messenger) +target_link_libraries(unittest_direct_messenger global) diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc new file mode 100644 index 00000000000..ea6439e18d3 --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.cc @@ -0,0 +1,252 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "DirectMessenger.h" +#include "msg/DispatchStrategy.h" + + +class DirectConnection : public Connection { + /// sent messages are dispatched here + DispatchStrategy *const dispatchers; + + /// the connection that will be attached to outgoing messages, so that replies + /// can be dispatched back to the sender. the pointer is atomic for + /// thread-safety between mark_down() and send_message(). no reference is held + /// on this Connection to avoid cyclical refs. we don't need a reference + /// because its owning DirectMessenger will mark both connections down (and + /// clear this pointer) before dropping its own reference + std::atomic<Connection*> reply_connection{nullptr}; + + public: + DirectConnection(CephContext *cct, DirectMessenger *m, + DispatchStrategy *dispatchers) + : Connection(cct, m), + dispatchers(dispatchers) + {} + + /// sets the Connection that will receive replies to outgoing messages + void set_direct_reply_connection(ConnectionRef conn); + + /// return true if a peer connection exists + bool is_connected() override; + + /// pass the given message directly to our dispatchers + int send_message(Message *m) override; + + /// release our pointer to the peer connection. later calls to is_connected() + /// will return false, and send_message() will fail with -ENOTCONN + void mark_down() override; + + /// noop - keepalive messages are not needed within a process + void send_keepalive() override {} + + /// noop - reconnect/recovery semantics are not needed within a process + void mark_disposable() override {} +}; + +void DirectConnection::set_direct_reply_connection(ConnectionRef conn) +{ + reply_connection.store(conn.get()); +} + +bool DirectConnection::is_connected() +{ + // true between calls to set_direct_reply_connection() and mark_down() + return reply_connection.load() != nullptr; +} + +int DirectConnection::send_message(Message *m) +{ + // read reply_connection atomically and take a reference + ConnectionRef conn = reply_connection.load(); + if (!conn) { + m->put(); + return -ENOTCONN; + } + // attach reply_connection to the Message, so that calls to + // m->get_connection()->send_message() can be dispatched back to the sender + m->set_connection(conn); + + dispatchers->ds_dispatch(m); + return 0; +} + +void DirectConnection::mark_down() +{ + Connection *conn = reply_connection.load(); + if (!conn) { + return; // already marked down + } + if (!reply_connection.compare_exchange_weak(conn, nullptr)) { + return; // lost the race to mark down + } + // called only once to avoid loops + conn->mark_down(); +} + + +static ConnectionRef create_loopback(DirectMessenger *m, + entity_name_t name, + DispatchStrategy *dispatchers) +{ + auto loopback = boost::intrusive_ptr<DirectConnection>( + new DirectConnection(m->cct, m, dispatchers)); + // loopback replies go to itself + loopback->set_direct_reply_connection(loopback); + loopback->set_peer_type(name.type()); + loopback->set_features(CEPH_FEATURES_ALL); + return loopback; +} + +DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers) + : SimplePolicyMessenger(cct, name, mname, nonce), + dispatchers(dispatchers), + loopback_connection(create_loopback(this, name, dispatchers)) +{ + dispatchers->set_messenger(this); +} + +DirectMessenger::~DirectMessenger() +{ +} + +int DirectMessenger::set_direct_peer(DirectMessenger *peer) +{ + if (get_myinst() == peer->get_myinst()) { + return -EADDRINUSE; // must have a different entity instance + } + peer_inst = peer->get_myinst(); + + // allocate a Connection that dispatches to the peer messenger + auto direct_connection = boost::intrusive_ptr<DirectConnection>( + new DirectConnection(cct, peer, peer->dispatchers.get())); + + direct_connection->set_peer_addr(peer_inst.addr); + direct_connection->set_peer_type(peer_inst.name.type()); + direct_connection->set_features(CEPH_FEATURES_ALL); + + // if set_direct_peer() was already called on the peer messenger, we can + // finish by attaching their connections. if not, the later call to + // peer->set_direct_peer() will attach their connection to ours + auto connection = peer->get_connection(get_myinst()); + if (connection) { + auto p = static_cast<DirectConnection*>(connection.get()); + + p->set_direct_reply_connection(direct_connection); + direct_connection->set_direct_reply_connection(p); + } + + peer_connection = std::move(direct_connection); + return 0; +} + +int DirectMessenger::bind(const entity_addr_t &bind_addr) +{ + if (peer_connection) { + return -EINVAL; // can't change address after sharing it with the peer + } + set_myaddr(bind_addr); + loopback_connection->set_peer_addr(bind_addr); + return 0; +} + +int DirectMessenger::client_bind(const entity_addr_t &bind_addr) +{ + // same as bind + return bind(bind_addr); +} + +int DirectMessenger::start() +{ + if (!peer_connection) { + return -EINVAL; // did not connect to a peer + } + if (started) { + return -EINVAL; // already started + } + + dispatchers->start(); + return SimplePolicyMessenger::start(); +} + +int DirectMessenger::shutdown() +{ + if (!started) { + return -EINVAL; // not started + } + + mark_down_all(); + peer_connection.reset(); + loopback_connection.reset(); + + dispatchers->shutdown(); + SimplePolicyMessenger::shutdown(); + sem.Put(); // signal wait() + return 0; +} + +void DirectMessenger::wait() +{ + sem.Get(); // wait on signal from shutdown() + dispatchers->wait(); +} + +ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst) +{ + if (dst == peer_inst) { + return peer_connection; + } + if (dst == get_myinst()) { + return loopback_connection; + } + return nullptr; +} + +ConnectionRef DirectMessenger::get_loopback_connection() +{ + return loopback_connection; +} + +int DirectMessenger::send_message(Message *m, const entity_inst_t& dst) +{ + auto conn = get_connection(dst); + if (!conn) { + m->put(); + return -ENOTCONN; + } + return conn->send_message(m); +} + +void DirectMessenger::mark_down(const entity_addr_t& addr) +{ + ConnectionRef conn; + if (addr == peer_inst.addr) { + conn = peer_connection; + } else if (addr == get_myaddr()) { + conn = loopback_connection; + } + if (conn) { + conn->mark_down(); + } +} + +void DirectMessenger::mark_down_all() +{ + if (peer_connection) { + peer_connection->mark_down(); + } + loopback_connection->mark_down(); +} diff --git a/src/test/direct_messenger/DirectMessenger.h b/src/test/direct_messenger/DirectMessenger.h new file mode 100644 index 00000000000..dd9d39ed9d3 --- /dev/null +++ b/src/test/direct_messenger/DirectMessenger.h @@ -0,0 +1,97 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_DIRECTMESSENGER_H +#define CEPH_MSG_DIRECTMESSENGER_H + +#include "msg/SimplePolicyMessenger.h" +#include "common/Semaphore.h" + + +class DispatchStrategy; + +/** + * DirectMessenger provides a direct path between two messengers + * within a process. A pair of DirectMessengers share their + * DispatchStrategy with each other, and calls to send_message() + * forward the message directly to the other. + * + * This is for testing and i/o injection only, and cannot be used + * for normal messengers with ms_type. + */ +class DirectMessenger : public SimplePolicyMessenger { + private: + /// strategy for local dispatch + std::unique_ptr<DispatchStrategy> dispatchers; + /// peer instance for comparison in get_connection() + entity_inst_t peer_inst; + /// connection that sends to the peer's dispatchers + ConnectionRef peer_connection; + /// connection that sends to my own dispatchers + ConnectionRef loopback_connection; + /// semaphore for signalling wait() from shutdown() + Semaphore sem; + + public: + DirectMessenger(CephContext *cct, entity_name_t name, + string mname, uint64_t nonce, + DispatchStrategy *dispatchers); + ~DirectMessenger(); + + /// attach to a peer messenger. must be called before start() + int set_direct_peer(DirectMessenger *peer); + + + // Messenger interface + + /// sets the addr. must not be called after set_direct_peer() or start() + int bind(const entity_addr_t& bind_addr) override; + + /// sets the addr. must not be called after set_direct_peer() or start() + int client_bind(const entity_addr_t& bind_addr) override; + + /// starts dispatchers + int start() override; + + /// breaks connections, stops dispatchers, and unblocks callers of wait() + int shutdown() override; + + /// blocks until shutdown() completes + void wait() override; + + /// returns a connection to the peer instance, a loopback connection to our + /// own instance, or null if not connected + ConnectionRef get_connection(const entity_inst_t& dst) override; + + /// returns a loopback connection that dispatches to this messenger + ConnectionRef get_loopback_connection() override; + + /// dispatches a message to the peer instance if connected + int send_message(Message *m, const entity_inst_t& dst) override; + + /// mark down the connection for the given address + void mark_down(const entity_addr_t& a) override; + + /// mark down all connections + void mark_down_all() override; + + + // unimplemented Messenger interface + void set_addr_unknowns(const entity_addr_t &addr) override {} + int get_dispatch_queue_len() override { return 0; } + double get_dispatch_queue_max_age(utime_t now) override { return 0; } + void set_cluster_protocol(int p) override {} +}; + +#endif diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc new file mode 100644 index 00000000000..0540baa48cd --- /dev/null +++ b/src/test/direct_messenger/test_direct_messenger.cc @@ -0,0 +1,437 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <condition_variable> +#include <mutex> +#include <thread> + +#include <gtest/gtest.h> + +#include "global/global_init.h" +#include "common/ceph_argparse.h" + +#include "DirectMessenger.h" +#include "msg/FastStrategy.h" +#include "msg/QueueStrategy.h" +#include "messages/MPing.h" + + +/// mock dispatcher that calls the given callback +class MockDispatcher : public Dispatcher { + std::function<void(Message*)> callback; + public: + MockDispatcher(CephContext *cct, std::function<void(Message*)> callback) + : Dispatcher(cct), callback(std::move(callback)) {} + bool ms_handle_reset(Connection *con) override { return false; } + void ms_handle_remote_reset(Connection *con) override {} + bool ms_handle_refused(Connection *con) override { return false; } + bool ms_dispatch(Message *m) override { + callback(m); + m->put(); + return true; + } +}; + +/// test synchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, SyncDispatch) +{ + auto cct = g_ceph_context; + + // use FastStrategy for synchronous dispatch + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + bool got_request = false; + bool got_reply = false; + + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + got_reply = true; + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + got_request = true; + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_TRUE(got_request); + ASSERT_TRUE(got_reply); + } + + // test DirectMessenger::send_message() with loopback address + got_request = false; + got_reply = false; + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + + // test DirectConnection::send_message() with loopback address + { + got_request = false; + got_reply = false; + auto conn = client.get_connection(client.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + // test DirectConnection::send_message() with loopback connection + { + got_request = false; + got_reply = false; + auto conn = client.get_loopback_connection(); + ASSERT_EQ(0, conn->send_message(new MPing())); + ASSERT_FALSE(got_request); // server should never see this + ASSERT_TRUE(got_reply); + } + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test asynchronous dispatch of messenger and connection interfaces +TEST(DirectMessenger, AsyncDispatch) +{ + auto cct = g_ceph_context; + + // use QueueStrategy for async replies + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // condition variable to wait on ping reply + std::mutex mutex; + std::condition_variable cond; + bool done = false; + + auto wait_for_reply = [&] { + std::unique_lock<std::mutex> lock(mutex); + while (!done) { + cond.wait(lock); + } + done = false; // clear for reuse + }; + + // client dispatcher signals the condition variable on reply + MockDispatcher client_dispatcher(cct, [&] (Message *m) { + std::lock_guard<std::mutex> lock(mutex); + done = true; + cond.notify_one(); + }); + client.add_dispatcher_head(&client_dispatcher); + + MockDispatcher server_dispatcher(cct, [&] (Message *m) { + // hold the lock over the call to send_message() to prove that the client's + // dispatch is asynchronous. if it isn't, it will deadlock + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, m->get_connection()->send_message(new MPing())); + }); + server.add_dispatcher_head(&server_dispatcher); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + // test DirectMessenger::send_message() + ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst())); + wait_for_reply(); + + // test DirectConnection::send_message() + { + auto conn = client.get_connection(server.get_myinst()); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectMessenger::send_message() with loopback address + { + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback address + { + auto conn = client.get_connection(client.get_myinst()); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + // test DirectConnection::send_message() with loopback connection + { + auto conn = client.get_loopback_connection(); + // hold the lock to test that loopback dispatch is asynchronous + std::lock_guard<std::mutex> lock(mutex); + ASSERT_EQ(0, conn->send_message(new MPing())); + } + wait_for_reply(); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test that wait() blocks until shutdown() +TEST(DirectMessenger, WaitShutdown) +{ + auto cct = g_ceph_context; + + // test wait() with both Queue- and FastStrategy + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new QueueStrategy(1)); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + std::atomic<bool> client_waiting{false}; + std::atomic<bool> server_waiting{false}; + + // spawn threads to wait() on each of the messengers + std::thread client_thread([&] { + client_waiting = true; + client.wait(); + client_waiting = false; + }); + std::thread server_thread([&] { + server_waiting = true; + server.wait(); + server_waiting = false; + }); + + // give them time to start + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + ASSERT_TRUE(client_waiting); + ASSERT_TRUE(server_waiting); + + // call shutdown to unblock the waiting threads + ASSERT_EQ(0, client.shutdown()); + ASSERT_EQ(0, server.shutdown()); + + client_thread.join(); + server_thread.join(); + + ASSERT_FALSE(client_waiting); + ASSERT_FALSE(server_waiting); +} + +/// test connection and messenger interfaces after mark_down() +TEST(DirectMessenger, MarkDown) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // mark_down() breaks the connection on both sides + client_to_server->mark_down(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after shutdown() +TEST(DirectMessenger, SendShutdown) +{ + auto cct = g_ceph_context; + + // put client on the heap so we can free it early + std::unique_ptr<DirectMessenger> client{ + new DirectMessenger(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy())}; + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + ASSERT_EQ(0, client->set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(client.get())); + + ASSERT_EQ(0, client->start()); + ASSERT_EQ(0, server.start()); + + const auto client_inst = client->get_myinst(); + const auto server_inst = server.get_myinst(); + + auto client_to_server = client->get_connection(server_inst); + auto server_to_client = server.get_connection(client_inst); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // shut down the client to break connections + ASSERT_EQ(0, client->shutdown()); + client->wait(); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst)); + + // free the client connection/messenger to test that calls to the server no + // longer try to dereference them + client_to_server.reset(); + client.reset(); + + ASSERT_FALSE(server_to_client->is_connected()); + ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst)); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces after bind() +TEST(DirectMessenger, Bind) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + entity_addr_t client_addr; + client_addr.set_family(AF_INET); + client_addr.set_port(1); + + // client bind succeeds before set_direct_peer() + ASSERT_EQ(0, client.bind(client_addr)); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + ASSERT_EQ(0, server.set_direct_peer(&client)); + + // server bind fails after set_direct_peer() + entity_addr_t empty_addr; + ASSERT_EQ(-EINVAL, server.bind(empty_addr)); + + ASSERT_EQ(0, client.start()); + ASSERT_EQ(0, server.start()); + + auto client_to_server = client.get_connection(server.get_myinst()); + auto server_to_client = server.get_connection(client.get_myinst()); + + ASSERT_TRUE(client_to_server->is_connected()); + ASSERT_TRUE(server_to_client->is_connected()); + + // no address in connection to server + ASSERT_EQ(empty_addr, client_to_server->get_peer_addr()); + // bind address is reflected in connection to client + ASSERT_EQ(client_addr, server_to_client->get_peer_addr()); + + // mark_down() with bind address breaks the connection + server.mark_down(client_addr); + + ASSERT_FALSE(client_to_server->is_connected()); + ASSERT_FALSE(server_to_client->is_connected()); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); + + ASSERT_EQ(0, server.shutdown()); + server.wait(); +} + +/// test connection and messenger interfaces before calls to set_direct_peer() +TEST(DirectMessenger, StartWithoutPeer) +{ + auto cct = g_ceph_context; + + DirectMessenger client(cct, entity_name_t::CLIENT(1), + "client", 0, new FastStrategy()); + DirectMessenger server(cct, entity_name_t::CLIENT(2), + "server", 0, new FastStrategy()); + + // can't start until set_direct_peer() + ASSERT_EQ(-EINVAL, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + ASSERT_EQ(0, client.set_direct_peer(&server)); + + // only client can start + ASSERT_EQ(0, client.start()); + ASSERT_EQ(-EINVAL, server.start()); + + // client has a connection but can't send + auto conn = client.get_connection(server.get_myinst()); + ASSERT_NE(nullptr, conn); + ASSERT_FALSE(conn->is_connected()); + ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing())); + ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst())); + + ASSERT_EQ(0, client.shutdown()); + client.wait(); +} + +int main(int argc, char **argv) +{ + // command-line arguments + vector<const char*> args; + argv_to_vec(argc, (const char **)argv, args); + env_to_vec(args); + + auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY, + CODE_ENVIRONMENT_DAEMON, 0); + common_init_finish(cct.get()); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/test/messenger/CMakeLists.txt b/src/test/messenger/CMakeLists.txt index c1b7c3a3ae1..ebdd00f2081 100644 --- a/src/test/messenger/CMakeLists.txt +++ b/src/test/messenger/CMakeLists.txt @@ -41,4 +41,3 @@ if(HAVE_XIO) ${CMAKE_DL_LIBS} ) endif(HAVE_XIO) - |