summaryrefslogtreecommitdiff
path: root/net/rds/ib_cm.c
diff options
context:
space:
mode:
authorSantosh Shilimkar <santosh.shilimkar@oracle.com>2015-09-06 02:18:51 -0400
committerSantosh Shilimkar <santosh.shilimkar@oracle.com>2015-10-05 11:19:01 -0700
commit0c28c04500cf956c82d542c199f5bddabd590af3 (patch)
treea3cb6c63606e5cf459bfd4f21783c24dc62fd949 /net/rds/ib_cm.c
parentf4f943c958a2869b0601092857c1cf0e485d3ce8 (diff)
RDS: IB: split send completion handling and do batch ack
Similar to what we did with receive CQ completion handling, we split the transmit completion handler so that it lets us implement batched work completion handling. We re-use the cq_poll routine and makes use of RDS_IB_SEND_OP to identify the send vs receive completion event handler invocation. Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org> Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Diffstat (limited to 'net/rds/ib_cm.c')
-rw-r--r--net/rds/ib_cm.c45
1 files changed, 42 insertions, 3 deletions
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 28e0979720b2..8f51d0d26578 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -250,11 +250,34 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
(unsigned long long)wc->wr_id, wc->status,
wc->byte_len, be32_to_cpu(wc->ex.imm_data));
- rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+ if (wc->wr_id & RDS_IB_SEND_OP)
+ rds_ib_send_cqe_handler(ic, wc);
+ else
+ rds_ib_recv_cqe_handler(ic, wc, ack_state);
}
}
}
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+ struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+ struct rds_connection *conn = ic->conn;
+ struct rds_ib_ack_state state;
+
+ rds_ib_stats_inc(s_ib_tasklet_call);
+
+ memset(&state, 0, sizeof(state));
+ poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+ ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+ poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+ if (rds_conn_up(conn) &&
+ (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+ test_bit(0, &conn->c_map_queued)))
+ rds_send_xmit(ic->conn);
+}
+
static void rds_ib_tasklet_fn_recv(unsigned long data)
{
struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
@@ -304,6 +327,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
}
}
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+ struct rds_connection *conn = context;
+ struct rds_ib_connection *ic = conn->c_transport_data;
+
+ rdsdebug("conn %p cq %p\n", conn, cq);
+
+ rds_ib_stats_inc(s_ib_evt_handler_call);
+
+ tasklet_schedule(&ic->i_send_tasklet);
+}
+
/*
* This needs to be very careful to not leave IS_ERR pointers around for
* cleanup to trip over.
@@ -337,7 +372,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
ic->i_pd = rds_ibdev->pd;
cq_attr.cqe = ic->i_send_ring.w_nr + 1;
- ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+ ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
rds_ib_cq_event_handler, conn,
&cq_attr);
if (IS_ERR(ic->i_send_cq)) {
@@ -703,6 +739,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
wait_event(rds_ib_ring_empty_wait,
rds_ib_ring_empty(&ic->i_recv_ring) &&
(atomic_read(&ic->i_signaled_sends) == 0));
+ tasklet_kill(&ic->i_send_tasklet);
tasklet_kill(&ic->i_recv_tasklet);
/* first destroy the ib state that generates callbacks */
@@ -809,8 +846,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
}
INIT_LIST_HEAD(&ic->ib_node);
+ tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+ (unsigned long)ic);
tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
- (unsigned long) ic);
+ (unsigned long)ic);
mutex_init(&ic->i_recv_mutex);
#ifndef KERNEL_HAS_ATOMIC64
spin_lock_init(&ic->i_ack_lock);