summaryrefslogtreecommitdiffstats
path: root/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/msg/async/rdma/RDMAConnectedSocketImpl.cc')
-rw-r--r--src/msg/async/rdma/RDMAConnectedSocketImpl.cc123
1 files changed, 24 insertions, 99 deletions
diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
index 9b4e9cec948..0b7b0bae155 100644
--- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -29,11 +29,7 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, shared_ptr<In
{
if (!cct->_conf->ms_async_rdma_cm) {
qp = ib->create_queue_pair(cct, dispatcher->get_tx_cq(), dispatcher->get_rx_cq(), IBV_QPT_RC, NULL);
- local_cm_meta.local_qpn = qp->get_local_qp_number();
- local_cm_meta.psn = qp->get_initial_psn();
- local_cm_meta.lid = ib->get_lid();
- local_cm_meta.peer_qpn = 0;
- local_cm_meta.gid = ib->get_gid();
+ local_qpn = qp->get_local_qp_number();
notify_fd = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
@@ -46,7 +42,7 @@ RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
ldout(cct, 20) << __func__ << " destruct." << dendl;
cleanup();
worker->remove_pending_conn(this);
- dispatcher->erase_qpn(local_cm_meta.local_qpn);
+ dispatcher->erase_qpn(local_qpn);
for (unsigned i=0; i < wc.size(); ++i) {
dispatcher->post_chunk_to_pool(reinterpret_cast<Chunk*>(wc[i].wr_id));
@@ -83,89 +79,20 @@ void RDMAConnectedSocketImpl::get_wc(std::vector<ibv_wc> &w)
int RDMAConnectedSocketImpl::activate()
{
- ibv_qp_attr qpa;
- int r;
-
- // now connect up the qps and switch to RTR
- memset(&qpa, 0, sizeof(qpa));
- qpa.qp_state = IBV_QPS_RTR;
- qpa.path_mtu = IBV_MTU_1024;
- qpa.dest_qp_num = peer_cm_meta.local_qpn;
- qpa.rq_psn = peer_cm_meta.psn;
- qpa.max_dest_rd_atomic = 1;
- qpa.min_rnr_timer = 12;
- //qpa.ah_attr.is_global = 0;
- qpa.ah_attr.is_global = 1;
- qpa.ah_attr.grh.hop_limit = 6;
- qpa.ah_attr.grh.dgid = peer_cm_meta.gid;
-
- qpa.ah_attr.grh.sgid_index = ib->get_device()->get_gid_idx();
-
- qpa.ah_attr.dlid = peer_cm_meta.lid;
- qpa.ah_attr.sl = cct->_conf->ms_async_rdma_sl;
- qpa.ah_attr.grh.traffic_class = cct->_conf->ms_async_rdma_dscp;
- qpa.ah_attr.src_path_bits = 0;
- qpa.ah_attr.port_num = (uint8_t)(ib->get_ib_physical_port());
-
- ldout(cct, 20) << __func__ << " Choosing gid_index " << (int)qpa.ah_attr.grh.sgid_index << ", sl " << (int)qpa.ah_attr.sl << dendl;
-
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
- IBV_QP_AV |
- IBV_QP_PATH_MTU |
- IBV_QP_DEST_QPN |
- IBV_QP_RQ_PSN |
- IBV_QP_MIN_RNR_TIMER |
- IBV_QP_MAX_DEST_RD_ATOMIC);
- if (r) {
- lderr(cct) << __func__ << " failed to transition to RTR state: "
- << cpp_strerror(errno) << dendl;
+ qp->get_local_cm_meta().peer_qpn = qp->get_peer_cm_meta().local_qpn;
+ if (qp->modify_qp_to_rtr() != 0)
return -1;
- }
- ldout(cct, 20) << __func__ << " transition to RTR state successfully." << dendl;
-
- // now move to RTS
- qpa.qp_state = IBV_QPS_RTS;
-
- // How long to wait before retrying if packet lost or server dead.
- // Supposedly the timeout is 4.096us*2^timeout. However, the actual
- // timeout appears to be 4.096us*2^(timeout+1), so the setting
- // below creates a 135ms timeout.
- qpa.timeout = 14;
-
- // How many times to retry after timeouts before giving up.
- qpa.retry_cnt = 7;
-
- // How many times to retry after RNR (receiver not ready) condition
- // before giving up. Occurs when the remote side has not yet posted
- // a receive request.
- qpa.rnr_retry = 7; // 7 is infinite retry.
- qpa.sq_psn = local_cm_meta.psn;
- qpa.max_rd_atomic = 1;
-
- r = ibv_modify_qp(qp->get_qp(), &qpa, IBV_QP_STATE |
- IBV_QP_TIMEOUT |
- IBV_QP_RETRY_CNT |
- IBV_QP_RNR_RETRY |
- IBV_QP_SQ_PSN |
- IBV_QP_MAX_QP_RD_ATOMIC);
- if (r) {
- lderr(cct) << __func__ << " failed to transition to RTS state: "
- << cpp_strerror(errno) << dendl;
+ if (qp->modify_qp_to_rts() != 0)
return -1;
- }
-
- // the queue pair should be ready to use once the client has finished
- // setting up their end.
- ldout(cct, 20) << __func__ << " transition to RTS state successfully." << dendl;
- ldout(cct, 20) << __func__ << " QueuePair: " << qp << " with qp:" << qp->get_qp() << dendl;
if (!is_server) {
connected = 1; //indicate successfully
- ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_cm_meta.local_qpn << dendl;
+ ldout(cct, 20) << __func__ << " handle fake send, wake it up. QP: " << local_qpn << dendl;
submit(false);
}
active = true;
+ peer_qpn = qp->get_local_cm_meta().peer_qpn;
return 0;
}
@@ -189,8 +116,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
ldout(cct, 20) << __func__ << " tcp_fd: " << tcp_fd << dendl;
net.set_priority(tcp_fd, opts.priority, peer_addr.get_family());
- local_cm_meta.peer_qpn = 0;
- r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
+ qp->get_local_cm_meta().peer_qpn = 0;
+ r = qp->send_cm_meta(cct, tcp_fd);
if (r < 0)
return r;
@@ -199,8 +126,8 @@ int RDMAConnectedSocketImpl::try_connect(const entity_addr_t& peer_addr, const S
}
void RDMAConnectedSocketImpl::handle_connection() {
- ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
- int r = qp->recv_cm_meta(cct, tcp_fd, peer_cm_meta);
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " tcp_fd: " << tcp_fd << " notify_fd: " << notify_fd << dendl;
+ int r = qp->recv_cm_meta(cct, tcp_fd);
if (r <= 0) {
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
@@ -216,37 +143,34 @@ void RDMAConnectedSocketImpl::handle_connection() {
return;
}
- if (!is_server) {// syn + ack from server
- local_cm_meta.peer_qpn = peer_cm_meta.local_qpn;
- ldout(cct, 20) << __func__ << " peer msg : < " << peer_cm_meta.local_qpn << ", " << peer_cm_meta.psn
- << ", " << peer_cm_meta.lid << ", " << peer_cm_meta.peer_qpn << "> " << dendl;
+ if (!is_server) {// first time: cm meta sync + ack from server
if (!connected) {
r = activate();
ceph_assert(!r);
}
notify();
- r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
+ r = qp->send_cm_meta(cct, tcp_fd);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
- if (peer_cm_meta.peer_qpn == 0) {// syn from client
+ if (qp->get_peer_cm_meta().peer_qpn == 0) {// first time: cm meta sync from client
if (active) {
ldout(cct, 10) << __func__ << " server is already active." << dendl;
return ;
}
r = activate();
ceph_assert(!r);
- r = qp->send_cm_meta(cct, tcp_fd, local_cm_meta);
+ r = qp->send_cm_meta(cct, tcp_fd);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
return ;
}
- } else { // ack from client
+ } else { // second time: cm meta ack from client
connected = 1;
ldout(cct, 10) << __func__ << " handshake of rdma is done. server connected: " << connected << dendl;
//cleanup();
@@ -260,13 +184,14 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
{
eventfd_t event_val = 0;
int r = eventfd_read(notify_fd, &event_val);
- ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_cm_meta.local_qpn << " r = " << r << dendl;
-
+ ldout(cct, 20) << __func__ << " notify_fd : " << event_val << " in " << local_qpn
+ << " r = " << r << dendl;
+
if (!active) {
ldout(cct, 1) << __func__ << " when ib not active. len: " << len << dendl;
return -EAGAIN;
}
-
+
if (0 == connected) {
ldout(cct, 1) << __func__ << " when ib not connected. len: " << len <<dendl;
return -EAGAIN;
@@ -275,7 +200,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
read = read_buffers(buf,len);
if (is_server && connected == 0) {
- ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_cm_meta.local_qpn << " peer QP: " << peer_cm_meta.local_qpn << dendl;
+ ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << local_qpn << " peer QP: " << peer_qpn << dendl;
connected = 1; //if so, we don't need the last handshake
cleanup();
submit(false);
@@ -411,11 +336,11 @@ ssize_t RDMAConnectedSocketImpl::send(bufferlist &bl, bool more)
std::lock_guard l{lock};
pending_bl.claim_append(bl);
if (!connected) {
- ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_cm_meta.local_qpn << dendl;
+ ldout(cct, 20) << __func__ << " fake send to upper, QP: " << local_qpn << dendl;
return bytes;
}
}
- ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << dendl;
ssize_t r = submit(more);
if (r < 0 && r != -EAGAIN)
return r;
@@ -522,7 +447,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
{
- ldout(cct, 20) << __func__ << " QP: " << local_cm_meta.local_qpn << " " << tx_buffers[0] << dendl;
+ ldout(cct, 20) << __func__ << " QP: " << local_qpn << " " << tx_buffers[0] << dendl;
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
ibv_sge isge[tx_buffers.size()];
uint32_t current_sge = 0;