diff options
author | Yuval Lifshitz <yuvalif@yahoo.com> | 2020-04-02 15:32:06 +0200 |
---|---|---|
committer | Yuval Lifshitz <yuvalif@yahoo.com> | 2020-04-19 09:25:56 +0200 |
commit | 73db460e52717ede87e9c0041d097853fde27721 (patch) | |
tree | aaf147599490c2c034291b3fd26b520d420bb568 /src/rgw/rgw_amqp.cc | |
parent | Merge pull request #34604 from tchaikov/wip-cmake-fuse (diff) | |
download | ceph-73db460e52717ede87e9c0041d097853fde27721.tar.xz ceph-73db460e52717ede87e9c0041d097853fde27721.zip |
rgw/amqp: fix the "routable" delivery mode
This option was not exposed to the configuration API;
however, it was still set, as a hardcoded value in the code.
(details:
https://www.rabbitmq.com/confirms.html#publisher-confirms)
Fixes: https://tracker.ceph.com/issues/44915
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
Diffstat (limited to 'src/rgw/rgw_amqp.cc')
-rw-r--r-- | src/rgw/rgw_amqp.cc | 109 |
1 files changed, 70 insertions, 39 deletions
diff --git a/src/rgw/rgw_amqp.cc b/src/rgw/rgw_amqp.cc index a5f6ca00d84..78446e88f68 100644 --- a/src/rgw/rgw_amqp.cc +++ b/src/rgw/rgw_amqp.cc @@ -72,7 +72,7 @@ struct connection_id_t { }; std::string to_string(const connection_id_t& id) { - return id.host+":"+std::to_string(id.port)+"/"+id.vhost; + return id.host+":"+std::to_string(id.port)+id.vhost; } // connection_t state cleaner @@ -124,6 +124,8 @@ struct connection_t { mutable std::atomic<int> ref_count; CephContext* cct; CallbackList callbacks; + ceph::coarse_real_clock::time_point next_reconnect; + bool mandatory; // default ctor connection_t() : @@ -135,7 +137,10 @@ struct connection_t { reply_type(AMQP_RESPONSE_NORMAL), reply_code(RGW_AMQP_NO_REPLY_CODE), ref_count(0), - cct(nullptr) {} + cct(nullptr), + next_reconnect(ceph::coarse_real_clock::now()), + mandatory(false) + {} // cleanup of all internal connection resource // the object can still remain, and internal connection @@ -489,12 +494,13 @@ connection_ptr_t& create_connection(connection_ptr_t& conn, const amqp_connectio // utility function to create a new connection connection_ptr_t create_new_connection(const amqp_connection_info& info, - const std::string& exchange, CephContext* cct) { + const std::string& exchange, bool mandatory_delivery, CephContext* cct) { // create connection state connection_ptr_t conn = new connection_t; conn->exchange = exchange; conn->user.assign(info.user); conn->password.assign(info.password); + conn->mandatory = mandatory_delivery; conn->cct = cct; return create_connection(conn, info); } @@ -542,6 +548,8 @@ private: CephContext* const cct; mutable std::mutex connections_lock; std::thread runner; + const ceph::coarse_real_clock::duration idle_time; + const ceph::coarse_real_clock::duration reconnect_time; void publish_internal(message_wrapper_t* message) { const std::unique_ptr<message_wrapper_t> msg_owner(message); @@ -563,9 +571,9 @@ private: CHANNEL_ID, amqp_cstring_bytes(conn->exchange.c_str()), 
amqp_cstring_bytes(message->topic.c_str()), - 1, // mandatory, TODO: take from conf + 0, // does not have to be routable 0, // not immediate - nullptr, + nullptr, // no properties needed amqp_cstring_bytes(message->message.c_str())); if (rc == AMQP_STATUS_OK) { ldout(conn->cct, 20) << "AMQP publish (no callback): OK" << dendl; @@ -589,7 +597,7 @@ private: CONFIRMING_CHANNEL_ID, amqp_cstring_bytes(conn->exchange.c_str()), amqp_cstring_bytes(message->topic.c_str()), - 1, // mandatory, TODO: take from conf + conn->mandatory, 0, // not immediate &props, amqp_cstring_bytes(message->message.c_str())); @@ -653,21 +661,26 @@ private: // try to reconnect the connection if it has an error if (!conn->is_ok()) { - // pointers are used temporarily inside the amqp_connection_info object - // as read-only values, hence the assignment, and const_cast are safe here - amqp_connection_info info; - info.host = const_cast<char*>(conn_it->first.host.c_str()); - info.port = conn_it->first.port; - info.vhost = const_cast<char*>(conn_it->first.vhost.c_str()); - info.user = const_cast<char*>(conn->user.c_str()); - info.password = const_cast<char*>(conn->password.c_str()); - ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; - if (create_connection(conn, info)->is_ok() == false) { - ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed" << dendl; - // TODO: add error counter for failed retries - // TODO: add exponential backoff for retries - } else { - ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl; + const auto now = ceph::coarse_real_clock::now(); + if (now >= conn->next_reconnect) { + // pointers are used temporarily inside the amqp_connection_info object + // as read-only values, hence the assignment, and const_cast are safe here + amqp_connection_info info; + info.host = const_cast<char*>(conn_it->first.host.c_str()); + info.port = conn_it->first.port; + info.vhost = 
const_cast<char*>(conn_it->first.vhost.c_str()); + info.user = const_cast<char*>(conn->user.c_str()); + info.password = const_cast<char*>(conn->password.c_str()); + ldout(conn->cct, 20) << "AMQP run: retry connection" << dendl; + if (create_connection(conn, info)->is_ok() == false) { + ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry failed. error: " << + status_to_string(conn->status) << " (" << conn->reply_code << ")" << dendl; + // TODO: add error counter for failed retries + // TODO: add exponential backoff for retries + conn->next_reconnect = now + reconnect_time; + } else { + ldout(conn->cct, 10) << "AMQP run: connection (" << to_string(conn_it->first) << ") retry successfull" << dendl; + } } INCREMENT_AND_CONTINUE(conn_it); } @@ -693,9 +706,9 @@ private: } if (frame.frame_type != AMQP_FRAME_METHOD) { - ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages" << dendl; + ldout(conn->cct, 10) << "AMQP run: ignoring non n/ack messages. 
frame type: " + << unsigned(frame.frame_type) << dendl; // handler is for publish confirmation only - handle only method frames - // TODO: add a counter INCREMENT_AND_CONTINUE(conn_it); } @@ -722,6 +735,14 @@ private: multiple = nack->multiple; break; } + case AMQP_BASIC_REJECT_METHOD: + { + result = RGW_AMQP_STATUS_BROKER_NACK; + const auto reject = (amqp_basic_reject_t*)frame.payload.method.decoded; + tag = reject->delivery_tag; + multiple = false; + break; + } case AMQP_CONNECTION_CLOSE_METHOD: // TODO on channel close, no need to reopen the connection case AMQP_CHANNEL_CLOSE_METHOD: @@ -733,13 +754,11 @@ private: } case AMQP_BASIC_RETURN_METHOD: // message was not delivered, returned to sender - // TODO: add a counter - ldout(conn->cct, 10) << "AMQP run: message delivery error" << dendl; + ldout(conn->cct, 10) << "AMQP run: message was not routable" << dendl; INCREMENT_AND_CONTINUE(conn_it); break; default: // unexpected method - // TODO: add a counter ldout(conn->cct, 10) << "AMQP run: unexpected message" << dendl; INCREMENT_AND_CONTINUE(conn_it); } @@ -764,7 +783,6 @@ private: conn->callbacks.erase(tag_it); } } else { - // TODO add counter for acks with no callback ldout(conn->cct, 10) << "AMQP run: unsolicited n/ack received with tag=" << tag << dendl; } // just increment the iterator @@ -772,7 +790,7 @@ private: } // if no messages were received or published, sleep for 100ms if (count == 0 && !incoming_message) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::this_thread::sleep_for(idle_time); } } } @@ -787,6 +805,8 @@ public: size_t _max_inflight, size_t _max_queue, long _usec_timeout, + unsigned reconnect_time_ms, + unsigned idle_time_ms, CephContext* _cct) : max_connections(_max_connections), max_inflight(_max_inflight), @@ -799,7 +819,9 @@ public: queued(0), dequeued(0), cct(_cct), - runner(&Manager::run, this) { + runner(&Manager::run, this), + idle_time(std::chrono::milliseconds(idle_time_ms)), + 
reconnect_time(std::chrono::milliseconds(reconnect_time_ms)) { // The hashmap has "max connections" as the initial number of buckets, // and allows for 10 collisions per bucket before rehash. // This is to prevent rehashing so that iterators are not invalidated @@ -829,9 +851,8 @@ public: } // connect to a broker, or reuse an existing connection if already connected - connection_ptr_t connect(const std::string& url, const std::string& exchange) { + connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { if (stopped) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: manager is stopped" << dendl; return nullptr; } @@ -840,7 +861,6 @@ public: // cache the URL so that parsing could happen in-place std::vector<char> url_cache(url.c_str(), url.c_str()+url.size()+1); if (AMQP_STATUS_OK != amqp_parse_url(url_cache.data(), &info)) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: URL parsing failed" << dendl; return nullptr; } @@ -850,11 +870,9 @@ public: const auto it = connections.find(id); if (it != connections.end()) { if (it->second->marked_for_deletion) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: endpoint marked for deletion" << dendl; return nullptr; } else if (it->second->exchange != exchange) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: exchange mismatch" << dendl; return nullptr; } @@ -865,11 +883,14 @@ public: // connection not found, creating a new one if (connection_count >= max_connections) { - // TODO: increment counter ldout(cct, 1) << "AMQP connect: max connections exceeded" << dendl; return nullptr; } - const auto conn = create_new_connection(info, exchange, cct); + const auto conn = create_new_connection(info, exchange, mandatory_delivery, cct); + if (!conn->is_ok()) { + ldout(cct, 10) << "AMQP connect: connection (" << to_string(id) << ") creation failed. 
error:" << + status_to_string(conn->status) << "(" << conn->reply_code << ")" << dendl; + } // create_new_connection must always return a connection object // even if error occurred during creation. // in such a case the creation will be retried in the main thread @@ -885,15 +906,18 @@ public: const std::string& topic, const std::string& message) { if (stopped) { + ldout(cct, 1) << "AMQP publish: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } if (!conn || !conn->is_ok()) { + ldout(cct, 1) << "AMQP publish: no connection" << dendl; return RGW_AMQP_STATUS_CONNECTION_CLOSED; } if (messages.push(new message_wrapper_t(conn, topic, message, nullptr))) { ++queued; return AMQP_STATUS_OK; } + ldout(cct, 1) << "AMQP publish: queue is full" << dendl; return RGW_AMQP_STATUS_QUEUE_FULL; } @@ -902,15 +926,18 @@ public: const std::string& message, reply_callback_t cb) { if (stopped) { + ldout(cct, 1) << "AMQP publish_with_confirm: manager is not running" << dendl; return RGW_AMQP_STATUS_MANAGER_STOPPED; } if (!conn || !conn->is_ok()) { + ldout(cct, 1) << "AMQP publish_with_confirm: no connection" << dendl; return RGW_AMQP_STATUS_CONNECTION_CLOSED; } if (messages.push(new message_wrapper_t(conn, topic, message, cb))) { ++queued; return AMQP_STATUS_OK; } + ldout(cct, 1) << "AMQP publish_with_confirm: queue is full" << dendl; return RGW_AMQP_STATUS_QUEUE_FULL; } @@ -956,13 +983,17 @@ static Manager* s_manager = nullptr; static const size_t MAX_CONNECTIONS_DEFAULT = 256; static const size_t MAX_INFLIGHT_DEFAULT = 8192; static const size_t MAX_QUEUE_DEFAULT = 8192; +static const long READ_TIMEOUT_USEC = 100; +static const unsigned IDLE_TIME_MS = 100; +static const unsigned RECONNECT_TIME_MS = 100; bool init(CephContext* cct) { if (s_manager) { return false; } // TODO: take conf from CephContext - s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, 100, cct); + s_manager = new Manager(MAX_CONNECTIONS_DEFAULT, 
MAX_INFLIGHT_DEFAULT, MAX_QUEUE_DEFAULT, + READ_TIMEOUT_USEC, IDLE_TIME_MS, RECONNECT_TIME_MS, cct); return true; } @@ -971,9 +1002,9 @@ void shutdown() { s_manager = nullptr; } -connection_ptr_t connect(const std::string& url, const std::string& exchange) { +connection_ptr_t connect(const std::string& url, const std::string& exchange, bool mandatory_delivery) { if (!s_manager) return nullptr; - return s_manager->connect(url, exchange); + return s_manager->connect(url, exchange, mandatory_delivery); } int publish(connection_ptr_t& conn, |