/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/o2iblnd/o2iblnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "o2iblnd.h"

#define MAX_CONN_RACES_BEFORE_ABORT 20

static void kiblnd_peer_alive(struct kib_peer *peer);
static void kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error);
static void kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx,
                               int type, int body_nob);
static int kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
                            int resid, struct kib_rdma_desc *dstrd,
                            __u64 dstcookie);
static void kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn);
static void kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn);
static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx);
static void kiblnd_check_sends_locked(struct kib_conn *conn);

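/*
 * Return a completed tx descriptor to its pool and finalise the LNet
 * message(s) attached to it.  The tx must be idle: not queued, not
 * sending, and not awaiting a peer response.  Drops the tx's ref on its
 * connection, and delays lnet_finalize() until the descriptor is back
 * in the pool so its buffers are free for reuse.
 */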
static void
kiblnd_tx_done(lnet_ni_t *ni, struct kib_tx *tx)
{
        lnet_msg_t *lntmsg[2];
        struct kib_net *net = ni->ni_data;
        int rc;
        int i;

        LASSERT(net);
        LASSERT(!in_interrupt());
        LASSERT(!tx->tx_queued);        /* mustn't be queued for sending */
        LASSERT(!tx->tx_sending);       /* mustn't be awaiting sent callback */
        LASSERT(!tx->tx_waiting);       /* mustn't be awaiting peer response */
        LASSERT(tx->tx_pool);

        kiblnd_unmap_tx(ni, tx);

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;

        if (tx->tx_conn) {
                LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (!lntmsg[i])
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

void
kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
{
        struct kib_tx *tx;

        while (!list_empty(txlist)) {
                tx = list_entry(txlist->next, struct kib_tx, tx_list);

                list_del(&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

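/* Grab a clean tx descriptor from the tx pool of the target NID's CPT. */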
static struct kib_tx *
kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
{
        struct kib_net *net = (struct kib_net *)ni->ni_data;
        struct list_head *node;
        struct kib_tx *tx;
        struct kib_tx_poolset *tps;

        tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
        node = kiblnd_pool_alloc_node(&tps->tps_poolset);
        if (!node)
                return NULL;
        tx = list_entry(node, struct kib_tx, tx_list);

        LASSERT(!tx->tx_nwrq);
        LASSERT(!tx->tx_queued);
        LASSERT(!tx->tx_sending);
        LASSERT(!tx->tx_waiting);
        LASSERT(!tx->tx_status);
        LASSERT(!tx->tx_conn);
        LASSERT(!tx->tx_lntmsg[0]);
        LASSERT(!tx->tx_lntmsg[1]);
        LASSERT(!tx->tx_nfrags);

        return tx;
}

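/*
 * Release an rx buffer: one fewer rx outstanding on the connection, and
 * drop the rx's ref on the connection.
 */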
static void
kiblnd_drop_rx(struct kib_rx *rx)
{
        struct kib_conn *conn = rx->rx_conn;
        struct kib_sched_info *sched = conn->ibc_sched;
        unsigned long flags;

        spin_lock_irqsave(&sched->ibs_lock, flags);
        LASSERT(conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&sched->ibs_lock, flags);

        kiblnd_conn_decref(conn);
}

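/*
 * Repost a receive buffer on the connection's queue pair and, if
 * requested, make a peer or reserved credit available for return.
 * Takes a temporary ref on the connection across ib_post_recv() since
 * the rx may complete, and drop its own ref, before this function
 * returns (LU-5678).
 */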
int
kiblnd_post_rx(struct kib_rx *rx, int credit)
{
        struct kib_conn *conn = rx->rx_conn;
        struct kib_net *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr *bad_wrq = NULL;
        struct ib_mr *mr = conn->ibc_hdev->ibh_mrs;
        int rc;

        LASSERT(net);
        LASSERT(!in_interrupt());
        LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
                credit == IBLND_POSTRX_PEER_CREDIT ||
                credit == IBLND_POSTRX_RSRVD_CREDIT);
        LASSERT(mr);

        rx->rx_sge.lkey   = mr->lkey;
        rx->rx_sge.addr   = rx->rx_msgaddr;
        rx->rx_sge.length = IBLND_MSG_SIZE;

        rx->rx_wrq.next    = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT(rx->rx_nob >= 0);       /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);     /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                /* flag posted */

        /* NB: need an extra reference after ib_post_recv because we don't
         * own this rx (and rx::rx_conn) anymore, LU-5678.
         */
        kiblnd_conn_addref(conn);
        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
        if (unlikely(rc)) {
                CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
                rx->rx_nob = 0;
        }

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                goto out;

        if (unlikely(rc)) {
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);     /* No more posts for this rx */
                goto out;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                goto out;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        kiblnd_check_sends_locked(conn);
        spin_unlock(&conn->ibc_lock);

out:
        kiblnd_conn_decref(conn);
        return rc;
}

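/*
 * Find the active tx awaiting a completion of the given type and
 * cookie.  Called with ibc_lock held.
 */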
static struct kib_tx *
kiblnd_find_waiting_tx_locked(struct kib_conn *conn, int txtype, __u64 cookie)
{
        struct list_head *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                struct kib_tx *tx = list_entry(tmp, struct kib_tx, tx_list);

                LASSERT(!tx->tx_queued);
                LASSERT(tx->tx_sending || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

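/*
 * Match an incoming completion message against the tx waiting for it.
 * An unmatched completion is a protocol error and closes the
 * connection.  On a successful GET_REQ completion, the status carries
 * the reply length.
 */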
static void
kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, __u64 cookie)
{
        struct kib_tx *tx;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        int idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (!tx) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie %#llx from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (!tx->tx_status) {           /* success so far */
                if (status < 0)         /* failed? */
                        tx->tx_status = status;
                else if (txtype == IBLND_MSG_GET_REQ)
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && !tx->tx_sending;
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(ni, tx);
}

static void
kiblnd_send_completion(struct kib_conn *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        struct kib_tx *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);

        if (!tx) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(struct kib_completion_msg));

        kiblnd_queue_tx(tx, conn);
}

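/*
 * Dispatch a received message: update the connection's send credits,
 * hand immediate payloads and PUT/GET requests to LNet, match
 * ACK/NAK/DONE completions against waiting txs, and finally repost the
 * receive buffer unless LNet has taken ownership of it.
 */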
static void
kiblnd_handle_rx(struct kib_rx *rx)
{
        struct kib_msg *msg = rx->rx_msg;
        struct kib_conn *conn = rx->rx_conn;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        int credits = msg->ibm_credits;
        struct kib_tx *tx;
        int rc = 0;
        int rc2;
        int post_credit;

        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        CDEBUG(D_NET, "Received %x[%d] from %s\n",
               msg->ibm_type, credits,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));

        if (credits) {
                /* Have I received credits that will let me send? */
                spin_lock(&conn->ibc_lock);

                if (conn->ibc_credits + credits >
                    conn->ibc_queue_depth) {
                        rc2 = conn->ibc_credits;
                        spin_unlock(&conn->ibc_lock);

                        CERROR("Bad credits from %s: %d + %d > %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               rc2, credits, conn->ibc_queue_depth);

                        kiblnd_close_conn(conn, -EPROTO);
                        kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
                        return;
                }

                conn->ibc_credits += credits;

                /* This ensures the credit taken by NOOP can be returned */
                if (msg->ibm_type == IBLND_MSG_NOOP &&
                    !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
                        conn->ibc_outstanding_credits++;

                kiblnd_check_sends_locked(conn);
                spin_unlock(&conn->ibc_lock);
        }

        switch (msg->ibm_type) {
        default:
                CERROR("Bad IBLND message type %x from %s\n",
                       msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_NO_CREDIT;
                rc = -EPROTO;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                        break;
                }

                if (credits)    /* credit already posted */
                        post_credit = IBLND_POSTRX_NO_CREDIT;
                else            /* a keepalive NOOP */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_IMMEDIATE:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
                                msg->ibm_srcnid, rx, 0);
                if (rc < 0)     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_PUT_NAK:
                CWARN("PUT_NACK from %s\n",
                      libcfs_nid2str(conn->ibc_peer->ibp_nid));
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_PUT_ACK:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;

                spin_lock(&conn->ibc_lock);
                tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
                                                   msg->ibm_u.putack.ibpam_src_cookie);
                if (tx)
                        list_del(&tx->tx_list);
                spin_unlock(&conn->ibc_lock);

                if (!tx) {
                        CERROR("Unmatched PUT_ACK from %s\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        rc = -EPROTO;
                        break;
                }

                LASSERT(tx->tx_waiting);
                /*
                 * CAVEAT EMPTOR: I could be racing with tx_complete, but...
                 * (a) I can overwrite tx_msg since my peer has received it!
                 * (b) tx_waiting set tells tx_complete() it's not done.
                 */
                tx->tx_nwrq = 0;        /* overwrite PUT_REQ */

                rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
                                       kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
                                       &msg->ibm_u.putack.ibpam_rd,
                                       msg->ibm_u.putack.ibpam_dst_cookie);
                if (rc2 < 0)
                        CERROR("Can't setup rdma for PUT to %s: %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);

                spin_lock(&conn->ibc_lock);
                tx->tx_waiting = 0;     /* clear waiting and queue atomically */
                kiblnd_queue_tx_locked(tx, conn);
                spin_unlock(&conn->ibc_lock);
                break;

        case IBLND_MSG_PUT_DONE:
                post_credit = IBLND_POSTRX_PEER_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;

        case IBLND_MSG_GET_REQ:
                post_credit = IBLND_POSTRX_DONT_POST;
                rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
                                msg->ibm_srcnid, rx, 1);
                if (rc < 0)     /* repost on error */
                        post_credit = IBLND_POSTRX_PEER_CREDIT;
                break;

        case IBLND_MSG_GET_DONE:
                post_credit = IBLND_POSTRX_RSRVD_CREDIT;
                kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
                                         msg->ibm_u.completion.ibcm_status,
                                         msg->ibm_u.completion.ibcm_cookie);
                break;
        }

        if (rc < 0)     /* protocol error */
                kiblnd_close_conn(conn, rc);

        if (post_credit != IBLND_POSTRX_DONT_POST)
                kiblnd_post_rx(rx, post_credit);
}

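/*
 * Completion handler for a posted receive: validate and unpack the
 * message, reject stale senders, and either handle the rx now or, if
 * the connection is still being established, park it on ibc_early_rxs
 * to be replayed once the connection is up.
 */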
static void
kiblnd_rx_complete(struct kib_rx *rx, int status, int nob)
{
        struct kib_msg *msg = rx->rx_msg;
        struct kib_conn *conn = rx->rx_conn;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        struct kib_net *net = ni->ni_data;
        int rc;
        int err = -EIO;

        LASSERT(net);
        LASSERT(rx->rx_nob < 0);        /* was posted */
        rx->rx_nob = 0;                 /* isn't now */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
                goto ignore;

        if (status != IB_WC_SUCCESS) {
                CNETERR("Rx from %s failed: %d\n",
                        libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
                goto failed;
        }

        LASSERT(nob >= 0);
        rx->rx_nob = nob;

        rc = kiblnd_unpack_msg(msg, rx->rx_nob);
        if (rc) {
                CERROR("Error %d unpacking rx from %s\n",
                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                goto failed;
        }

        if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
            msg->ibm_dstnid != ni->ni_nid ||
            msg->ibm_srcstamp != conn->ibc_incarnation ||
            msg->ibm_dststamp != net->ibn_incarnation) {
                CERROR("Stale rx from %s\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                err = -ESTALE;
                goto failed;
        }

        /* set time last known alive */
        kiblnd_peer_alive(conn->ibc_peer);

        /* racing with connection establishment/teardown! */

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
                unsigned long flags;

                write_lock_irqsave(g_lock, flags);
                /* must check holding global lock to eliminate race */
                if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                        list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
                        write_unlock_irqrestore(g_lock, flags);
                        return;
                }
                write_unlock_irqrestore(g_lock, flags);
        }
        kiblnd_handle_rx(rx);
        return;

 failed:
        CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
        kiblnd_close_conn(conn, err);
 ignore:
        kiblnd_drop_rx(rx);     /* Don't re-post rx. */
}

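/*
 * Translate a kernel virtual address (vmalloc or direct-mapped; highmem
 * is disallowed here) to its backing page.
 */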
static struct page *
kiblnd_kvaddr_to_page(unsigned long vaddr)
{
        struct page *page;

        if (is_vmalloc_addr((void *)vaddr)) {
                page = vmalloc_to_page((void *)vaddr);
                LASSERT(page);
                return page;
        }
#ifdef CONFIG_HIGHMEM
        if (vaddr >= PKMAP_BASE &&
            vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
                /* No highmem pages; they're only used for bulk (kiov) I/O */
                CERROR("find page for address in highmem\n");
                LBUG();
        }
#endif
        page = virt_to_page(vaddr);
        LASSERT(page);
        return page;
}

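/*
 * Map the tx's fragments with an FMR from this CPT's pool and collapse
 * them into a single virtually contiguous fragment carrying the FMR
 * key, ready to be advertised to the peer.
 */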
static int
kiblnd_fmr_map_tx(struct kib_net *net, struct kib_tx *tx, struct kib_rdma_desc *rd, __u32 nob)
{
        struct kib_hca_dev *hdev;
        struct kib_fmr_poolset *fps;
        int cpt;
        int rc;

        LASSERT(tx->tx_pool);
        LASSERT(tx->tx_pool->tpo_pool.po_owner);

        hdev = tx->tx_pool->tpo_hdev;
        cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;

        fps = net->ibn_fmr_ps[cpt];
        rc = kiblnd_fmr_pool_map(fps, tx, rd, nob, 0, &tx->fmr);
        if (rc) {
                CERROR("Can't map %u bytes: %d\n", nob, rc);
                return rc;
        }

        /*
         * If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey
         */
        rd->rd_key = tx->fmr.fmr_key;
        rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
        rd->rd_frags[0].rf_nob = nob;
        rd->rd_nfrags = 1;

        return 0;
}

static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx)
{
        struct kib_net *net = ni->ni_data;

        LASSERT(net);

        if (net->ibn_fmr_ps)
                kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);

        if (tx->tx_nfrags) {
                kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
}

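/*
 * DMA-map the tx's scatterlist and fill in the RDMA descriptor.  If the
 * HCA has a pre-mapped global MR covering the fragments, use its key
 * directly; otherwise fall back to FMR mapping.
 */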
static int kiblnd_map_tx(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
                         int nfrags)
{
        struct kib_net *net = ni->ni_data;
        struct kib_hca_dev *hdev = net->ibn_dev->ibd_hdev;
        struct ib_mr *mr = NULL;
        __u32 nob;
        int i;

        /*
         * If rd is not tx_rd, it's going to get sent to a peer and I'm the
         * RDMA sink
         */
        tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
        tx->tx_nfrags = nfrags;

        rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
                                          tx->tx_nfrags, tx->tx_dmadir);

        for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
                rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
                        hdev->ibh_ibdev, &tx->tx_frags[i]);
                nob += rd->rd_frags[i].rf_nob;
        }

        mr = kiblnd_find_rd_dma_mr(ni, rd, tx->tx_conn ?
                                   tx->tx_conn->ibc_max_frags : -1);
        if (mr) {
                /* found pre-mapping MR */
                rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
                return 0;
        }

        if (net->ibn_fmr_ps)
                return kiblnd_fmr_map_tx(net, tx, rd, nob);

        return -EINVAL;
}

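/*
 * Build the tx scatterlist from a kernel iovec, splitting fragments on
 * page boundaries, then map it.
 */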
static int
kiblnd_setup_rd_iov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
                    unsigned int niov, const struct kvec *iov, int offset, int nob)
{
        struct kib_net *net = ni->ni_data;
        struct page *page;
        struct scatterlist *sg;
        unsigned long vaddr;
        int fragnob;
        int page_offset;

        LASSERT(nob > 0);
        LASSERT(niov > 0);
        LASSERT(net);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT(niov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT(niov > 0);

                vaddr = ((unsigned long)iov->iov_base) + offset;
                page_offset = vaddr & (PAGE_SIZE - 1);
                page = kiblnd_kvaddr_to_page(vaddr);
                if (!page) {
                        CERROR("Can't find page\n");
                        return -EFAULT;
                }

                fragnob = min((int)(iov->iov_len - offset), nob);
                fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);

                sg_set_page(sg, page, fragnob, page_offset);
                sg = sg_next(sg);
                if (!sg) {
                        CERROR("lacking enough sg entries to map tx\n");
                        return -EFAULT;
                }

                if (offset + fragnob < iov->iov_len) {
                        offset += fragnob;
                } else {
                        offset = 0;
                        iov++;
                        niov--;
                }
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

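/*
 * Build the tx scatterlist from a page-based (kiov) vector, then map
 * it.
 */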
static int
kiblnd_setup_rd_kiov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
                     int nkiov, const lnet_kiov_t *kiov, int offset, int nob)
{
        struct kib_net *net = ni->ni_data;
        struct scatterlist *sg;
        int fragnob;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT(nob > 0);
        LASSERT(nkiov > 0);
        LASSERT(net);

        while (offset >= kiov->bv_len) {
                offset -= kiov->bv_len;
                nkiov--;
                kiov++;
                LASSERT(nkiov > 0);
        }

        sg = tx->tx_frags;
        do {
                LASSERT(nkiov > 0);

                fragnob = min((int)(kiov->bv_len - offset), nob);

                sg_set_page(sg, kiov->bv_page, fragnob,
                            kiov->bv_offset + offset);
                sg = sg_next(sg);
                if (!sg) {
                        CERROR("lacking enough sg entries to map tx\n");
                        return -EFAULT;
                }

                offset = 0;
                kiov++;
                nkiov--;
                nob -= fragnob;
        } while (nob > 0);

        return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
}

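/*
 * Try to post one queued tx.  Returns -EAGAIN, leaving the tx queued,
 * if the send queue is full or flow-control credits are exhausted;
 * otherwise moves the tx to ibc_active_txs and posts its work requests.
 * Called with ibc_lock held; may drop and retake it.
 */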
static int
kiblnd_post_tx_locked(struct kib_conn *conn, struct kib_tx *tx, int credit)
        __must_hold(&conn->ibc_lock)
{
        struct kib_msg *msg = tx->tx_msg;
        struct kib_peer *peer = conn->ibc_peer;
        struct lnet_ni *ni = peer->ibp_ni;
        int ver = conn->ibc_version;
        int rc;
        int done;

        LASSERT(tx->tx_queued);
        /* We rely on this for QP sizing */
        LASSERT(tx->tx_nwrq > 0);

        LASSERT(!credit || credit == 1);
        LASSERT(conn->ibc_outstanding_credits >= 0);
        LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
        LASSERT(conn->ibc_credits >= 0);
        LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);

        if (conn->ibc_nsends_posted == kiblnd_concurrent_sends(ver, ni)) {
                /* tx completions outstanding... */
                CDEBUG(D_NET, "%s: posted enough\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit && !conn->ibc_credits) {     /* no credits */
                CDEBUG(D_NET, "%s: no credits\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        if (credit && !IBLND_OOB_CAPABLE(ver) &&
            conn->ibc_credits == 1 &&           /* last credit reserved */
            msg->ibm_type != IBLND_MSG_NOOP) {  /* for NOOP */
                CDEBUG(D_NET, "%s: not using last credit\n",
                       libcfs_nid2str(peer->ibp_nid));
                return -EAGAIN;
        }

        /* NB don't drop ibc_lock before bumping tx_sending */
        list_del(&tx->tx_list);
        tx->tx_queued = 0;

        if (msg->ibm_type == IBLND_MSG_NOOP &&
            (!kiblnd_need_noop(conn) ||         /* redundant NOOP */
             (IBLND_OOB_CAPABLE(ver) &&         /* posted enough NOOP */
              conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
                /*
                 * OK to drop when posted enough NOOPs, since
                 * kiblnd_check_sends_locked will queue NOOP again when
                 * posted NOOPs complete
                 */
                spin_unlock(&conn->ibc_lock);
                kiblnd_tx_done(peer->ibp_ni, tx);
                spin_lock(&conn->ibc_lock);
                CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
                       libcfs_nid2str(peer->ibp_nid),
                       conn->ibc_noops_posted);
                return 0;
        }

        kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
                        peer->ibp_nid, conn->ibc_incarnation);

        conn->ibc_credits -= credit;
        conn->ibc_outstanding_credits = 0;
        conn->ibc_nsends_posted++;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted++;

        /*
         * CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
         * PUT.  If so, it was first queued here as a PUT_REQ, sent and
         * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
         * and then re-queued here.  It's (just) possible that
         * tx_sending is non-zero if we've not done the tx_complete()
         * from the first send; hence the ++ rather than = below.
         */
        tx->tx_sending++;
        list_add(&tx->tx_list, &conn->ibc_active_txs);

        /* I'm still holding ibc_lock! */
        if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
                rc = -ECONNABORTED;
        } else if (tx->tx_pool->tpo_pool.po_failed ||
                   conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
                /* close_conn will launch failover */
                rc = -ENETDOWN;
        } else {
                struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
                struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
                struct ib_send_wr *wrq = &tx->tx_wrq[0].wr;

                if (frd) {
                        if (!frd->frd_valid) {
                                wrq = &frd->frd_inv_wr;
                                wrq->next = &frd->frd_fastreg_wr.wr;
                        } else {
                                wrq = &frd->frd_fastreg_wr.wr;
                        }
                        frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
                }

                LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
                         "bad wr_id %llx, opc %d, flags %d, peer: %s\n",
                         bad->wr_id, bad->opcode, bad->send_flags,
                         libcfs_nid2str(conn->ibc_peer->ibp_nid));
                bad = NULL;
                rc = ib_post_send(conn->ibc_cmid->qp, wrq, &bad);
        }

        conn->ibc_last_send = jiffies;

        if (!rc)
                return 0;

        /*
         * NB credits are transferred in the actual
         * message, which can only be the last work item
         */
        conn->ibc_credits += credit;
        conn->ibc_outstanding_credits += msg->ibm_credits;
        conn->ibc_nsends_posted--;
        if (msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        tx->tx_status = rc;
        tx->tx_waiting = 0;
        tx->tx_sending--;

        done = !tx->tx_sending;
        if (done)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                CERROR("Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));
        else
                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                       rc, libcfs_nid2str(peer->ibp_nid));

        kiblnd_close_conn(conn, rc);

        if (done)
                kiblnd_tx_done(peer->ibp_ni, tx);

        spin_lock(&conn->ibc_lock);

        return -EIO;
}

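/*
 * Drain the connection's send queues: promote reserved-credit txs,
 * queue a NOOP if one is needed to return credits, then post txs until
 * we run out of work, credits, or send-queue space.
 */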
static void
kiblnd_check_sends_locked(struct kib_conn *conn)
{
        int ver = conn->ibc_version;
        lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
        struct kib_tx *tx;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
        LASSERT(!IBLND_OOB_CAPABLE(ver) ||
                conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
        LASSERT(conn->ibc_reserved_credits >= 0);

        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                struct kib_tx, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        if (kiblnd_need_noop(conn)) {
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
                if (tx)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);
                if (tx)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        for (;;) {
                int credit;

                if (!list_empty(&conn->ibc_tx_queue_nocred)) {
                        credit = 0;
                        tx = list_entry(conn->ibc_tx_queue_nocred.next,
                                        struct kib_tx, tx_list);
                } else if (!list_empty(&conn->ibc_tx_noops)) {
                        LASSERT(!IBLND_OOB_CAPABLE(ver));
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_noops.next,
                                        struct kib_tx, tx_list);
                } else if (!list_empty(&conn->ibc_tx_queue)) {
                        credit = 1;
                        tx = list_entry(conn->ibc_tx_queue.next,
                                        struct kib_tx, tx_list);
                } else {
                        break;
                }

                if (kiblnd_post_tx_locked(conn, tx, credit))
                        break;
        }
}

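/*
 * Send-completion handler.  A tx is finished only when its last send
 * has completed, it is not awaiting a peer response, and it has not
 * been re-queued (PUT_DONE); only then may it be freed.
 */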
static void
kiblnd_tx_complete(struct kib_tx *tx, int status)
{
        int failed = (status != IB_WC_SUCCESS);
        struct kib_conn *conn = tx->tx_conn;
        int idle;

        LASSERT(tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
                                tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                                status);

                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /*
         * I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'.
         */
        tx->tx_sending--;
        conn->ibc_nsends_posted--;
        if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
                conn->ibc_noops_posted--;

        if (failed) {
                tx->tx_waiting = 0;     /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = !tx->tx_sending &&       /* This is the final callback */
               !tx->tx_waiting &&       /* Not waiting for peer */
               !tx->tx_queued;          /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

        kiblnd_check_sends_locked(conn);
        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
}

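/*
 * Initialise the tx's message header and append the single send work
 * request (and sge) that covers the message buffer.
 */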
static void
kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx, int type, int body_nob)
{
        struct kib_hca_dev *hdev = tx->tx_pool->tpo_hdev;
        struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int nob = offsetof(struct kib_msg, ibm_u) + body_nob;
        struct ib_mr *mr = hdev->ibh_mrs;

        LASSERT(tx->tx_nwrq >= 0);
        LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT(nob <= IBLND_MSG_SIZE);
        LASSERT(mr);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        sge->lkey   = mr->lkey;
        sge->addr   = tx->tx_msgaddr;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->wr.next       = NULL;
        wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->wr.sg_list    = sge;
        wrq->wr.num_sge    = 1;
        wrq->wr.opcode     = IB_WR_SEND;
        wrq->wr.send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

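/*
 * Set up RDMA-write work requests to transfer 'resid' bytes from the
 * local source descriptor to the peer's destination descriptor, then
 * append the PUT_DONE/GET_DONE completion message that signals the
 * transfer.  Fails if the transfer exceeds the negotiated size or
 * fragment limits.
 */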
static int
kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
                 int resid, struct kib_rdma_desc *dstrd, __u64 dstcookie)
{
        struct kib_msg *ibmsg = tx->tx_msg;
        struct kib_rdma_desc *srcrd = tx->tx_rd;
        struct ib_sge *sge = &tx->tx_sge[0];
        struct ib_rdma_wr *wrq, *next;
        int rc = resid;
        int srcidx = 0;
        int dstidx = 0;
        int wrknob;

        LASSERT(!in_interrupt());
        LASSERT(!tx->tx_nwrq);
        LASSERT(type == IBLND_MSG_GET_DONE ||
                type == IBLND_MSG_PUT_DONE);

        if (kiblnd_rd_size(srcrd) > conn->ibc_max_frags << PAGE_SHIFT) {
                CERROR("RDMA is too large for peer %s (%d), src size: %d dst size: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
                       conn->ibc_max_frags << PAGE_SHIFT,
                       kiblnd_rd_size(srcrd), kiblnd_rd_size(dstrd));
                rc = -EMSGSIZE;
                goto too_big;
        }

        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq >= IBLND_MAX_RDMA_FRAGS) {
                        CERROR("RDMA has too many fragments for peer %s (%d), src idx/frags: %d/%d dst idx/frags: %d/%d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               IBLND_MAX_RDMA_FRAGS,
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                wrknob = min(min(kiblnd_rd_frag_size(srcrd, srcidx),
                                 kiblnd_rd_frag_size(dstrd, dstidx)),
                             (__u32)resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
                sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
                sge->length = wrknob;

                wrq = &tx->tx_wrq[tx->tx_nwrq];
                next = wrq + 1;

                wrq->wr.next       = &next->wr;
                wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->wr.sg_list    = sge;
                wrq->wr.num_sge    = 1;
                wrq->wr.opcode     = IB_WR_RDMA_WRITE;
                wrq->wr.send_flags = 0;

                wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
                wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);

                srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
                dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);

                resid -= wrknob;

                tx->tx_nwrq++;
                wrq++;
                sge++;
        }
too_big:
        if (rc < 0)     /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;

        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
                           type, sizeof(struct kib_completion_msg));

        return rc;
}

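/*
 * Stamp the tx with its deadline, bind it to the connection, and add it
 * to the send queue appropriate to its message type.  Called with
 * ibc_lock held.
 */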
static void
kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
{
        struct list_head *q;

        LASSERT(tx->tx_nwrq > 0);       /* work items set up */
        LASSERT(!tx->tx_queued);        /* not queued for sending already */
        LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies +
                          msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
                                           MSEC_PER_SEC);

        if (!tx->tx_conn) {
                kiblnd_conn_addref(conn);
                tx->tx_conn = conn;
                LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT(tx->tx_conn == conn);
                LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
                if (IBLND_OOB_CAPABLE(conn->ibc_version))
                        q = &conn->ibc_tx_queue_nocred;
                else
                        q = &conn->ibc_tx_noops;
                break;

        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

static void
kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        kiblnd_check_sends_locked(conn);
        spin_unlock(&conn->ibc_lock);
}

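/*
 * Bind the cmid to a free privileged source port (below PROT_SOCK) and
 * start address resolution towards the peer.
 */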
static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
                               struct sockaddr_in *srcaddr,
                               struct sockaddr_in *dstaddr,
                               int timeout_ms)
{
        unsigned short port;
        int rc;

        /* allow the port to be reused */
        rc = rdma_set_reuseaddr(cmid, 1);
        if (rc) {
                CERROR("Unable to set reuse on cmid: %d\n", rc);
                return rc;
        }

        /* look for a free privileged port */
        for (port = PROT_SOCK - 1; port > 0; port--) {
                srcaddr->sin_port = htons(port);
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)srcaddr,
                                       (struct sockaddr *)dstaddr,
                                       timeout_ms);
                if (!rc) {
                        CDEBUG(D_NET, "bound to port %hu\n", port);
                        return 0;
                } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
                        CDEBUG(D_NET, "bind to port %hu failed: %d\n",
                               port, rc);
                } else {
                        return rc;
                }
        }

        CERROR("Failed to bind to a free privileged port\n");
        return rc;
}

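/*
 * Create a CM ID for the peer and kick off address resolution; the
 * connection then proceeds asynchronously via kiblnd_cm_callback().
 */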
static void
kiblnd_connect_peer(struct kib_peer *peer)
{
        struct rdma_cm_id *cmid;
        struct kib_dev *dev;
        struct kib_net *net = peer->ibp_ni->ni_data;
        struct sockaddr_in srcaddr;
        struct sockaddr_in dstaddr;
        int rc;

        LASSERT(net);
        LASSERT(peer->ibp_connecting > 0);
        LASSERT(!peer->ibp_reconnecting);

        cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
                                     IB_QPT_RC);

        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        dev = net->ibn_dev;
        memset(&srcaddr, 0, sizeof(srcaddr));
        srcaddr.sin_family = AF_INET;
        srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);

        memset(&dstaddr, 0, sizeof(dstaddr));
        dstaddr.sin_family = AF_INET;
        dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer);       /* cmid's ref */

        if (*kiblnd_tunables.kib_use_priv_port) {
                rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
                                         *kiblnd_tunables.kib_timeout * 1000);
        } else {
                rc = rdma_resolve_addr(cmid,
                                       (struct sockaddr *)&srcaddr,
                                       (struct sockaddr *)&dstaddr,
                                       *kiblnd_tunables.kib_timeout * 1000);
        }
        if (rc) {
                /* Can't initiate address resolution */
                CERROR("Can't resolve addr for %s: %d\n",
                       libcfs_nid2str(peer->ibp_nid), rc);
                goto failed2;
        }

        return;

 failed2:
        kiblnd_peer_connect_failed(peer, 1, rc);
        kiblnd_peer_decref(peer);       /* cmid's ref */
        rdma_destroy_id(cmid);
        return;
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

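/*
 * Retry connecting to a peer whose connection attempt raced.  Returns
 * false, aborting the reconnection and completing any queued txs with
 * -ECONNABORTED, if the peer is already connecting, accepting,
 * connected, or unlinked from the peer table.
 */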
bool
kiblnd_reconnect_peer(struct kib_peer *peer)
{
        rwlock_t *glock = &kiblnd_data.kib_global_lock;
        char *reason = NULL;
        struct list_head txs;
        unsigned long flags;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(glock, flags);
        if (!peer->ibp_reconnecting) {
                if (peer->ibp_accepting)
                        reason = "accepting";
                else if (peer->ibp_connecting)
                        reason = "connecting";
                else if (!list_empty(&peer->ibp_conns))
                        reason = "connected";
                else /* connected then closed */
                        reason = "closed";

                goto no_reconnect;
        }

        LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
                list_empty(&peer->ibp_conns));
        peer->ibp_reconnecting = 0;

        if (!kiblnd_peer_active(peer)) {
                list_splice_init(&peer->ibp_tx_queue, &txs);
                reason = "unlinked";
                goto no_reconnect;
        }

        peer->ibp_connecting++;
        peer->ibp_reconnected++;
        write_unlock_irqrestore(glock, flags);

        kiblnd_connect_peer(peer);
        return true;

no_reconnect:
        write_unlock_irqrestore(glock, flags);

        CWARN("Abort reconnection of %s: %s\n",
              libcfs_nid2str(peer->ibp_nid), reason);
        kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
        return false;
}

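/*
 * Send a tx to the peer with the given NID, creating the peer and
 * initiating a connection if necessary.  While no connection exists,
 * txs accumulate on ibp_tx_queue and are flushed once the connection
 * is established.
 */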
1363 void
1364 kiblnd_launch_tx(lnet_ni_t *ni, struct kib_tx *tx, lnet_nid_t nid)
1365 {
1366         struct kib_peer *peer;
1367         struct kib_peer *peer2;
1368         struct kib_conn *conn;
1369         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1370         unsigned long flags;
1371         int rc;
1372
1373         /*
1374          * If I get here, I've committed to send, so I complete the tx with
1375          * failure on any problems
1376          */
1377         LASSERT(!tx || !tx->tx_conn); /* only set when assigned a conn */
1378         LASSERT(!tx || tx->tx_nwrq > 0);     /* work items have been set up */
1379
1380         /*
1381          * First time, just use a read lock since I expect to find my peer
1382          * connected
1383          */
1384         read_lock_irqsave(g_lock, flags);
1385
1386         peer = kiblnd_find_peer_locked(nid);
1387         if (peer && !list_empty(&peer->ibp_conns)) {
1388                 /* Found a peer with an established connection */
1389                 conn = kiblnd_get_conn_locked(peer);
1390                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1391
1392                 read_unlock_irqrestore(g_lock, flags);
1393
1394                 if (tx)
1395                         kiblnd_queue_tx(tx, conn);
1396                 kiblnd_conn_decref(conn); /* ...to here */
1397                 return;
1398         }
1399
1400         read_unlock(g_lock);
1401         /* Re-try with a write lock */
1402         write_lock(g_lock);
1403
1404         peer = kiblnd_find_peer_locked(nid);
1405         if (peer) {
1406                 if (list_empty(&peer->ibp_conns)) {
1407                         /* found a peer, but it's still connecting... */
1408                         LASSERT(kiblnd_peer_connecting(peer));
1409                         if (tx)
1410                                 list_add_tail(&tx->tx_list,
1411                                               &peer->ibp_tx_queue);
1412                         write_unlock_irqrestore(g_lock, flags);
1413                 } else {
1414                         conn = kiblnd_get_conn_locked(peer);
1415                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1416
1417                         write_unlock_irqrestore(g_lock, flags);
1418
1419                         if (tx)
1420                                 kiblnd_queue_tx(tx, conn);
1421                         kiblnd_conn_decref(conn); /* ...to here */
1422                 }
1423                 return;
1424         }
1425
1426         write_unlock_irqrestore(g_lock, flags);
1427
1428         /* Allocate a peer ready to add to the peer table and retry */
1429         rc = kiblnd_create_peer(ni, &peer, nid);
1430         if (rc) {
1431                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1432                 if (tx) {
1433                         tx->tx_status = -EHOSTUNREACH;
1434                         tx->tx_waiting = 0;
1435                         kiblnd_tx_done(ni, tx);
1436                 }
1437                 return;
1438         }
1439
1440         write_lock_irqsave(g_lock, flags);
1441
1442         peer2 = kiblnd_find_peer_locked(nid);
1443         if (peer2) {
1444                 if (list_empty(&peer2->ibp_conns)) {
1445                         /* found a peer, but it's still connecting... */
1446                         LASSERT(kiblnd_peer_connecting(peer2));
1447                         if (tx)
1448                                 list_add_tail(&tx->tx_list,
1449                                               &peer2->ibp_tx_queue);
1450                         write_unlock_irqrestore(g_lock, flags);
1451                 } else {
1452                         conn = kiblnd_get_conn_locked(peer2);
1453                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1454
1455                         write_unlock_irqrestore(g_lock, flags);
1456
1457                         if (tx)
1458                                 kiblnd_queue_tx(tx, conn);
1459                         kiblnd_conn_decref(conn); /* ...to here */
1460                 }
1461
1462                 kiblnd_peer_decref(peer);
1463                 return;
1464         }
1465
1466         /* Brand new peer */
1467         LASSERT(!peer->ibp_connecting);
1468         peer->ibp_connecting = 1;
1469
1470         /* always called with a ref on ni, which prevents ni being shutdown */
1471         LASSERT(!((struct kib_net *)ni->ni_data)->ibn_shutdown);
1472
1473         if (tx)
1474                 list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1475
1476         kiblnd_peer_addref(peer);
1477         list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1478
1479         write_unlock_irqrestore(g_lock, flags);
1480
1481         kiblnd_connect_peer(peer);
1482         kiblnd_peer_decref(peer);
1483 }
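
/*
 * NB: kiblnd_launch_tx() above follows the usual optimistic locking
 * pattern: look the peer up under the read lock (the common case),
 * retry under the write lock, and drop all locks to allocate.  Since
 * another thread may add the peer while no lock is held, the table is
 * searched once more (peer2) after kiblnd_create_peer() and the fresh
 * peer is released if the race was lost.  A minimal sketch of the
 * pattern, simplified and with error paths omitted:
 *
 *	read_lock(g_lock);
 *	peer = kiblnd_find_peer_locked(nid);     (fast path)
 *	read_unlock(g_lock);
 *	write_lock(g_lock);
 *	peer = kiblnd_find_peer_locked(nid);     (re-check)
 *	write_unlock_irqrestore(g_lock, flags);
 *	kiblnd_create_peer(ni, &peer, nid);      (allocate unlocked)
 *	write_lock_irqsave(g_lock, flags);
 *	peer2 = kiblnd_find_peer_locked(nid);    (lost the race?)
 */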
1484
1485 int
1486 kiblnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1487 {
1488         lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1489         int type = lntmsg->msg_type;
1490         lnet_process_id_t target = lntmsg->msg_target;
1491         int target_is_router = lntmsg->msg_target_is_router;
1492         int routing = lntmsg->msg_routing;
1493         unsigned int payload_niov = lntmsg->msg_niov;
1494         struct kvec *payload_iov = lntmsg->msg_iov;
1495         lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1496         unsigned int payload_offset = lntmsg->msg_offset;
1497         unsigned int payload_nob = lntmsg->msg_len;
1498         struct iov_iter from;
1499         struct kib_msg *ibmsg;
1500         struct kib_rdma_desc  *rd;
1501         struct kib_tx *tx;
1502         int nob;
1503         int rc;
1504
1505         /* NB 'private' is different depending on what we're sending.... */
1506
1507         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1508                payload_nob, payload_niov, libcfs_id2str(target));
1509
1510         LASSERT(!payload_nob || payload_niov > 0);
1511         LASSERT(payload_niov <= LNET_MAX_IOV);
1512
1513         /* Thread context */
1514         LASSERT(!in_interrupt());
1515         /* payload is either all vaddrs or all pages */
1516         LASSERT(!(payload_kiov && payload_iov));
1517
1518         if (payload_kiov)
1519                 iov_iter_bvec(&from, ITER_BVEC | WRITE,
1520                               payload_kiov, payload_niov,
1521                               payload_nob + payload_offset);
1522         else
1523                 iov_iter_kvec(&from, ITER_KVEC | WRITE,
1524                               payload_iov, payload_niov,
1525                               payload_nob + payload_offset);
1526
1527         iov_iter_advance(&from, payload_offset);
1528
1529         switch (type) {
1530         default:
1531                 LBUG();
1532                 return -EIO;
1533
1534         case LNET_MSG_ACK:
1535                 LASSERT(!payload_nob);
1536                 break;
1537
1538         case LNET_MSG_GET:
1539                 if (routing || target_is_router)
1540                         break;            /* send IMMEDIATE */
1541
1542                 /* is the REPLY message too small for RDMA? */
1543                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1544                 if (nob <= IBLND_MSG_SIZE)
1545                         break;            /* send IMMEDIATE */
1546
1547                 tx = kiblnd_get_idle_tx(ni, target.nid);
1548                 if (!tx) {
1549                         CERROR("Can't allocate txd for GET to %s\n",
1550                                libcfs_nid2str(target.nid));
1551                         return -ENOMEM;
1552                 }
1553
1554                 ibmsg = tx->tx_msg;
1555                 rd = &ibmsg->ibm_u.get.ibgm_rd;
1556                 if (!(lntmsg->msg_md->md_options & LNET_MD_KIOV))
1557                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1558                                                  lntmsg->msg_md->md_niov,
1559                                                  lntmsg->msg_md->md_iov.iov,
1560                                                  0, lntmsg->msg_md->md_length);
1561                 else
1562                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1563                                                   lntmsg->msg_md->md_niov,
1564                                                   lntmsg->msg_md->md_iov.kiov,
1565                                                   0, lntmsg->msg_md->md_length);
1566                 if (rc) {
1567                         CERROR("Can't setup GET sink for %s: %d\n",
1568                                libcfs_nid2str(target.nid), rc);
1569                         kiblnd_tx_done(ni, tx);
1570                         return -EIO;
1571                 }
1572
1573                 nob = offsetof(struct kib_get_msg, ibgm_rd.rd_frags[rd->rd_nfrags]);
1574                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1575                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1576
1577                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1578
1579                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1580                 if (!tx->tx_lntmsg[1]) {
1581                         CERROR("Can't create reply for GET -> %s\n",
1582                                libcfs_nid2str(target.nid));
1583                         kiblnd_tx_done(ni, tx);
1584                         return -EIO;
1585                 }
1586
1587                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1588                 tx->tx_waiting = 1;          /* waiting for GET_DONE */
1589                 kiblnd_launch_tx(ni, tx, target.nid);
1590                 return 0;
1591
1592         case LNET_MSG_REPLY:
1593         case LNET_MSG_PUT:
1594                 /* Is the payload small enough not to need RDMA? */
1595                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob]);
1596                 if (nob <= IBLND_MSG_SIZE)
1597                         break;            /* send IMMEDIATE */
1598
1599                 tx = kiblnd_get_idle_tx(ni, target.nid);
1600                 if (!tx) {
1601                         CERROR("Can't allocate %s txd for %s\n",
1602                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1603                                libcfs_nid2str(target.nid));
1604                         return -ENOMEM;
1605                 }
1606
1607                 if (!payload_kiov)
1608                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1609                                                  payload_niov, payload_iov,
1610                                                  payload_offset, payload_nob);
1611                 else
1612                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1613                                                   payload_niov, payload_kiov,
1614                                                   payload_offset, payload_nob);
1615                 if (rc) {
1616                         CERROR("Can't setup PUT src for %s: %d\n",
1617                                libcfs_nid2str(target.nid), rc);
1618                         kiblnd_tx_done(ni, tx);
1619                         return -EIO;
1620                 }
1621
1622                 ibmsg = tx->tx_msg;
1623                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1624                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1625                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(struct kib_putreq_msg));
1626
1627                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1628                 tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1629                 kiblnd_launch_tx(ni, tx, target.nid);
1630                 return 0;
1631         }
1632
1633         /* send IMMEDIATE */
1634
1635         LASSERT(offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob])
1636                  <= IBLND_MSG_SIZE);
1637
1638         tx = kiblnd_get_idle_tx(ni, target.nid);
1639         if (!tx) {
1640                 CERROR("Can't send %d to %s: tx descs exhausted\n",
1641                        type, libcfs_nid2str(target.nid));
1642                 return -ENOMEM;
1643         }
1644
1645         ibmsg = tx->tx_msg;
1646         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1647
1648         rc = copy_from_iter(&ibmsg->ibm_u.immediate.ibim_payload, payload_nob,
1649                             &from);
1650         if (rc != payload_nob) {
1651                 kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
1652                 return -EFAULT;
1653         }
1654
1655         nob = offsetof(struct kib_immediate_msg, ibim_payload[payload_nob]);
1656         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1657
1658         tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1659         kiblnd_launch_tx(ni, tx, target.nid);
1660         return 0;
1661 }
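
/*
 * NB: kiblnd_send() above picks one of three paths.  A GET whose REPLY
 * would not fit in IBLND_MSG_SIZE becomes a GET_REQ carrying an RDMA
 * sink descriptor; a large PUT or REPLY becomes a PUT_REQ with the
 * payload mapped as the RDMA source; anything small enough is copied
 * inline and sent as IBLND_MSG_IMMEDIATE.  Only the RDMA paths set
 * tx_waiting, because they must keep the tx alive until the peer's
 * GET_DONE or PUT_{ACK,NAK} arrives.
 */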
1662
1663 static void
1664 kiblnd_reply(lnet_ni_t *ni, struct kib_rx *rx, lnet_msg_t *lntmsg)
1665 {
1666         lnet_process_id_t target = lntmsg->msg_target;
1667         unsigned int niov = lntmsg->msg_niov;
1668         struct kvec *iov = lntmsg->msg_iov;
1669         lnet_kiov_t *kiov = lntmsg->msg_kiov;
1670         unsigned int offset = lntmsg->msg_offset;
1671         unsigned int nob = lntmsg->msg_len;
1672         struct kib_tx *tx;
1673         int rc;
1674
1675         tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1676         if (!tx) {
1677                 CERROR("Can't get tx for REPLY to %s\n",
1678                        libcfs_nid2str(target.nid));
1679                 goto failed_0;
1680         }
1681
1682         if (!nob)
1683                 rc = 0;
1684         else if (!kiov)
1685                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1686                                          niov, iov, offset, nob);
1687         else
1688                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1689                                           niov, kiov, offset, nob);
1690
1691         if (rc) {
1692                 CERROR("Can't setup GET src for %s: %d\n",
1693                        libcfs_nid2str(target.nid), rc);
1694                 goto failed_1;
1695         }
1696
1697         rc = kiblnd_init_rdma(rx->rx_conn, tx,
1698                               IBLND_MSG_GET_DONE, nob,
1699                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1700                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1701         if (rc < 0) {
1702                 CERROR("Can't setup rdma for GET from %s: %d\n",
1703                        libcfs_nid2str(target.nid), rc);
1704                 goto failed_1;
1705         }
1706
1707         if (!nob) {
1708                 /* No RDMA: local completion may happen now! */
1709                 lnet_finalize(ni, lntmsg, 0);
1710         } else {
1711                 /* RDMA: lnet_finalize(lntmsg) when it completes */
1712                 tx->tx_lntmsg[0] = lntmsg;
1713         }
1714
1715         kiblnd_queue_tx(tx, rx->rx_conn);
1716         return;
1717
1718  failed_1:
1719         kiblnd_tx_done(ni, tx);
1720  failed_0:
1721         lnet_finalize(ni, lntmsg, -EIO);
1722 }
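
/*
 * NB: when the GET carried no data (!nob), kiblnd_init_rdma() above
 * builds no RDMA work at all, only the GET_DONE completion message,
 * so lntmsg can be finalised immediately; otherwise finalisation is
 * deferred via tx_lntmsg[0] until the RDMA completes.
 */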
1723
1724 int
1725 kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1726             struct iov_iter *to, unsigned int rlen)
1727 {
1728         struct kib_rx *rx = private;
1729         struct kib_msg *rxmsg = rx->rx_msg;
1730         struct kib_conn *conn = rx->rx_conn;
1731         struct kib_tx *tx;
1732         int nob;
1733         int post_credit = IBLND_POSTRX_PEER_CREDIT;
1734         int rc = 0;
1735
1736         LASSERT(iov_iter_count(to) <= rlen);
1737         LASSERT(!in_interrupt());
1738         /* Either all pages or all vaddrs */
1739
1740         switch (rxmsg->ibm_type) {
1741         default:
1742                 LBUG();
1743
1744         case IBLND_MSG_IMMEDIATE:
1745                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[rlen]);
1746                 if (nob > rx->rx_nob) {
1747                         CERROR("Immediate message from %s too big: %d(%d)\n",
1748                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1749                                nob, rx->rx_nob);
1750                         rc = -EPROTO;
1751                         break;
1752                 }
1753
1754                 rc = copy_to_iter(&rxmsg->ibm_u.immediate.ibim_payload, rlen,
1755                                   to);
1756                 if (rc != rlen) {
1757                         rc = -EFAULT;
1758                         break;
1759                 }
1760
1761                 rc = 0;
1762                 lnet_finalize(ni, lntmsg, 0);
1763                 break;
1764
1765         case IBLND_MSG_PUT_REQ: {
1766                 struct kib_msg  *txmsg;
1767                 struct kib_rdma_desc *rd;
1768
1769                 if (!iov_iter_count(to)) {
1770                         lnet_finalize(ni, lntmsg, 0);
1771                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1772                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1773                         break;
1774                 }
1775
1776                 tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1777                 if (!tx) {
1778                         CERROR("Can't allocate tx for %s\n",
1779                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1780                         /* Not replying will break the connection */
1781                         rc = -ENOMEM;
1782                         break;
1783                 }
1784
1785                 txmsg = tx->tx_msg;
1786                 rd = &txmsg->ibm_u.putack.ibpam_rd;
1787                 if (!(to->type & ITER_BVEC))
1788                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1789                                                  to->nr_segs, to->kvec,
1790                                                  to->iov_offset,
1791                                                  iov_iter_count(to));
1792                 else
1793                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1794                                                   to->nr_segs, to->bvec,
1795                                                   to->iov_offset,
1796                                                   iov_iter_count(to));
1797                 if (rc) {
1798                         CERROR("Can't setup PUT sink for %s: %d\n",
1799                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1800                         kiblnd_tx_done(ni, tx);
1801                         /* tell peer it's over */
1802                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1803                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1804                         break;
1805                 }
1806
1807                 nob = offsetof(struct kib_putack_msg, ibpam_rd.rd_frags[rd->rd_nfrags]);
1808                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1809                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1810
1811                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1812
1813                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1814                 tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1815                 kiblnd_queue_tx(tx, conn);
1816
1817                 /* reposted buffer reserved for PUT_DONE */
1818                 post_credit = IBLND_POSTRX_NO_CREDIT;
1819                 break;
1820                 }
1821
1822         case IBLND_MSG_GET_REQ:
1823                 if (lntmsg) {
1824                         /* Optimized GET; RDMA lntmsg's payload */
1825                         kiblnd_reply(ni, rx, lntmsg);
1826                 } else {
1827                         /* GET didn't match anything */
1828                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1829                                                -ENODATA,
1830                                                rxmsg->ibm_u.get.ibgm_cookie);
1831                 }
1832                 break;
1833         }
1834
1835         kiblnd_post_rx(rx, post_credit);
1836         return rc;
1837 }
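
/*
 * NB: kiblnd_recv() lets kiblnd_post_rx() return the sender's credit
 * for most messages, but a PUT_REQ sets post_credit to
 * IBLND_POSTRX_NO_CREDIT because the reposted buffer stays reserved
 * for the matching PUT_DONE, as noted above.
 */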
1838
1839 int
1840 kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1841 {
1842         struct task_struct *task = kthread_run(fn, arg, "%s", name);
1843
1844         if (IS_ERR(task))
1845                 return PTR_ERR(task);
1846
1847         atomic_inc(&kiblnd_data.kib_nthreads);
1848         return 0;
1849 }
1850
1851 static void
1852 kiblnd_thread_fini(void)
1853 {
1854         atomic_dec(&kiblnd_data.kib_nthreads);
1855 }
1856
1857 static void
1858 kiblnd_peer_alive(struct kib_peer *peer)
1859 {
1860         /* This is racy, but everyone's only writing cfs_time_current() */
1861         peer->ibp_last_alive = cfs_time_current();
1862         mb();
1863 }
1864
1865 static void
1866 kiblnd_peer_notify(struct kib_peer *peer)
1867 {
1868         int error = 0;
1869         unsigned long last_alive = 0;
1870         unsigned long flags;
1871
1872         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1873
1874         if (kiblnd_peer_idle(peer) && peer->ibp_error) {
1875                 error = peer->ibp_error;
1876                 peer->ibp_error = 0;
1877
1878                 last_alive = peer->ibp_last_alive;
1879         }
1880
1881         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1882
1883         if (error)
1884                 lnet_notify(peer->ibp_ni,
1885                             peer->ibp_nid, 0, last_alive);
1886 }
1887
1888 void
1889 kiblnd_close_conn_locked(struct kib_conn *conn, int error)
1890 {
1891         /*
1892          * This just does the immediate housekeeping. 'error' is zero for a
1893          * normal shutdown which can happen only after the connection has been
1894          * established.  If the connection is established, schedule the
1895          * connection to be finished off by the connd. Otherwise the connd is
1896          * already dealing with it (either to set it up or tear it down).
1897          * Caller holds kib_global_lock exclusively in irq context
1898          */
1899         struct kib_peer *peer = conn->ibc_peer;
1900         struct kib_dev *dev;
1901         unsigned long flags;
1902
1903         LASSERT(error || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1904
1905         if (error && !conn->ibc_comms_error)
1906                 conn->ibc_comms_error = error;
1907
1908         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1909                 return; /* already being handled  */
1910
1911         if (!error &&
1912             list_empty(&conn->ibc_tx_noops) &&
1913             list_empty(&conn->ibc_tx_queue) &&
1914             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1915             list_empty(&conn->ibc_tx_queue_nocred) &&
1916             list_empty(&conn->ibc_active_txs)) {
1917                 CDEBUG(D_NET, "closing conn to %s\n",
1918                        libcfs_nid2str(peer->ibp_nid));
1919         } else {
1920                 CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1921                         libcfs_nid2str(peer->ibp_nid), error,
1922                         list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1923                         list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1924                         list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1925                         list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1926                         list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1927         }
1928
1929         dev = ((struct kib_net *)peer->ibp_ni->ni_data)->ibn_dev;
1930         list_del(&conn->ibc_list);
1931         /* connd (see below) takes over ibc_list's ref */
1932
1933         if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1934             kiblnd_peer_active(peer)) {  /* still in peer table */
1935                 kiblnd_unlink_peer_locked(peer);
1936
1937                 /* set/clear error on last conn */
1938                 peer->ibp_error = conn->ibc_comms_error;
1939         }
1940
1941         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1942
1943         if (error &&
1944             kiblnd_dev_can_failover(dev)) {
1945                 list_add_tail(&dev->ibd_fail_list,
1946                               &kiblnd_data.kib_failed_devs);
1947                 wake_up(&kiblnd_data.kib_failover_waitq);
1948         }
1949
1950         spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1951
1952         list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1953         wake_up(&kiblnd_data.kib_connd_waitq);
1954
1955         spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1956 }
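
/*
 * NB: the conn is unlinked from peer->ibp_conns and queued on
 * kib_connd_conns without its refcount changing: the reference the
 * peer's conn list held is handed straight to the connd, which
 * finishes the teardown.
 */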
1957
1958 void
1959 kiblnd_close_conn(struct kib_conn *conn, int error)
1960 {
1961         unsigned long flags;
1962
1963         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1964
1965         kiblnd_close_conn_locked(conn, error);
1966
1967         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1968 }
1969
1970 static void
1971 kiblnd_handle_early_rxs(struct kib_conn *conn)
1972 {
1973         unsigned long flags;
1974         struct kib_rx *rx;
1975         struct kib_rx *tmp;
1976
1977         LASSERT(!in_interrupt());
1978         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1979
1980         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1981         list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1982                 list_del(&rx->rx_list);
1983                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1984
1985                 kiblnd_handle_rx(rx);
1986
1987                 write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1988         }
1989         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1990 }
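
/*
 * NB: the global lock is dropped around each kiblnd_handle_rx() call
 * above, presumably because handling an rx may need the same lock;
 * every rx is unlinked before the lock is released, so the list walk
 * can be resumed safely afterwards.
 */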
1991
1992 static void
1993 kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
1994 {
1995         LIST_HEAD(zombies);
1996         struct list_head *tmp;
1997         struct list_head *nxt;
1998         struct kib_tx *tx;
1999
2000         spin_lock(&conn->ibc_lock);
2001
2002         list_for_each_safe(tmp, nxt, txs) {
2003                 tx = list_entry(tmp, struct kib_tx, tx_list);
2004
2005                 if (txs == &conn->ibc_active_txs) {
2006                         LASSERT(!tx->tx_queued);
2007                         LASSERT(tx->tx_waiting || tx->tx_sending);
2008                 } else {
2009                         LASSERT(tx->tx_queued);
2010                 }
2011
2012                 tx->tx_status = -ECONNABORTED;
2013                 tx->tx_waiting = 0;
2014
2015                 if (!tx->tx_sending) {
2016                         tx->tx_queued = 0;
2017                         list_del(&tx->tx_list);
2018                         list_add(&tx->tx_list, &zombies);
2019                 }
2020         }
2021
2022         spin_unlock(&conn->ibc_lock);
2023
2024         kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
2025 }
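
/*
 * NB: kiblnd_abort_txs() only moves a tx to the zombie list when
 * tx_sending is zero; a tx with a send still outstanding keeps its
 * -ECONNABORTED status but cannot be finalised until its remaining
 * send completions have drained.
 */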
2026
2027 static void
2028 kiblnd_finalise_conn(struct kib_conn *conn)
2029 {
2030         LASSERT(!in_interrupt());
2031         LASSERT(conn->ibc_state > IBLND_CONN_INIT);
2032
2033         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2034
2035         /*
2036          * abort_receives moves QP state to IB_QPS_ERR.  This is only required
2037          * for connections that didn't get as far as being connected, because
2038          * rdma_disconnect() does this for free.
2039          */
2040         kiblnd_abort_receives(conn);
2041
2042         /*
2043          * Complete all tx descs not waiting for sends to complete.
2044          * NB we should be safe from RDMA now that the QP has changed state
2045          */
2046         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2047         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2048         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2049         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2050         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2051
2052         kiblnd_handle_early_rxs(conn);
2053 }
2054
2055 static void
2056 kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error)
2057 {
2058         LIST_HEAD(zombies);
2059         unsigned long flags;
2060
2061         LASSERT(error);
2062         LASSERT(!in_interrupt());
2063
2064         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2065
2066         if (active) {
2067                 LASSERT(peer->ibp_connecting > 0);
2068                 peer->ibp_connecting--;
2069         } else {
2070                 LASSERT(peer->ibp_accepting > 0);
2071                 peer->ibp_accepting--;
2072         }
2073
2074         if (kiblnd_peer_connecting(peer)) {
2075                 /* another connection attempt under way... */
2076                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2077                                         flags);
2078                 return;
2079         }
2080
2081         peer->ibp_reconnected = 0;
2082         if (list_empty(&peer->ibp_conns)) {
2083                 /* Take peer's blocked transmits to complete with error */
2084                 list_add(&zombies, &peer->ibp_tx_queue);
2085                 list_del_init(&peer->ibp_tx_queue);
2086
2087                 if (kiblnd_peer_active(peer))
2088                         kiblnd_unlink_peer_locked(peer);
2089
2090                 peer->ibp_error = error;
2091         } else {
2092                 /* Can't have blocked transmits if there are connections */
2093                 LASSERT(list_empty(&peer->ibp_tx_queue));
2094         }
2095
2096         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2097
2098         kiblnd_peer_notify(peer);
2099
2100         if (list_empty(&zombies))
2101                 return;
2102
2103         CNETERR("Deleting messages for %s: connection failed\n",
2104                 libcfs_nid2str(peer->ibp_nid));
2105
2106         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2107 }
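
/*
 * NB: kiblnd_peer_connect_failed() only reaps the blocked tx queue
 * once the last connection attempt (active or passive) has failed;
 * while another attempt is still in flight the txs stay queued on the
 * peer in case that attempt succeeds.
 */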
2108
2109 static void
2110 kiblnd_connreq_done(struct kib_conn *conn, int status)
2111 {
2112         struct kib_peer *peer = conn->ibc_peer;
2113         struct kib_tx *tx;
2114         struct kib_tx *tmp;
2115         struct list_head txs;
2116         unsigned long flags;
2117         int active;
2118
2119         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2120
2121         CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2122                libcfs_nid2str(peer->ibp_nid), active,
2123                conn->ibc_version, status);
2124
2125         LASSERT(!in_interrupt());
2126         LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2127                  peer->ibp_connecting > 0) ||
2128                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2129                  peer->ibp_accepting > 0));
2130
2131         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2132         conn->ibc_connvars = NULL;
2133
2134         if (status) {
2135                 /* failed to establish connection */
2136                 kiblnd_peer_connect_failed(peer, active, status);
2137                 kiblnd_finalise_conn(conn);
2138                 return;
2139         }
2140
2141         /* connection established */
2142         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2143
2144         conn->ibc_last_send = jiffies;
2145         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2146         kiblnd_peer_alive(peer);
2147
2148         /*
2149          * Add conn to peer's list and nuke any dangling conns from a different
2150          * peer instance...
2151          */
2152         kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2153         list_add(&conn->ibc_list, &peer->ibp_conns);
2154         peer->ibp_reconnected = 0;
2155         if (active)
2156                 peer->ibp_connecting--;
2157         else
2158                 peer->ibp_accepting--;
2159
2160         if (!peer->ibp_version) {
2161                 peer->ibp_version     = conn->ibc_version;
2162                 peer->ibp_incarnation = conn->ibc_incarnation;
2163         }
2164
2165         if (peer->ibp_version     != conn->ibc_version ||
2166             peer->ibp_incarnation != conn->ibc_incarnation) {
2167                 kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2168                                                 conn->ibc_incarnation);
2169                 peer->ibp_version     = conn->ibc_version;
2170                 peer->ibp_incarnation = conn->ibc_incarnation;
2171         }
2172
2173         /* grab pending txs while I have the lock */
2174         list_add(&txs, &peer->ibp_tx_queue);
2175         list_del_init(&peer->ibp_tx_queue);
2176
2177         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2178             conn->ibc_comms_error) {       /* error has happened already */
2179                 lnet_ni_t *ni = peer->ibp_ni;
2180
2181                 /* start to shut down connection */
2182                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2183                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2184
2185                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2186
2187                 return;
2188         }
2189
2190         /*
2191          * +1 ref for myself, this connection is visible to other threads
2192          * now, refcount of peer:ibp_conns can be released by connection
2193          * close from either a different thread, or the calling of
2194          * kiblnd_check_sends_locked() below. See bz21911 for details.
2195          */
2196         kiblnd_conn_addref(conn);
2197         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2198
2199         /* Schedule blocked txs */
2200         spin_lock(&conn->ibc_lock);
2201         list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2202                 list_del(&tx->tx_list);
2203
2204                 kiblnd_queue_tx_locked(tx, conn);
2205         }
2206         kiblnd_check_sends_locked(conn);
2207         spin_unlock(&conn->ibc_lock);
2208
2209         /* schedule blocked rxs */
2210         kiblnd_handle_early_rxs(conn);
2211
2212         kiblnd_conn_decref(conn);
2213 }
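
/*
 * NB: the extra ref taken just before the lock is dropped in
 * kiblnd_connreq_done() is what keeps the conn alive across
 * kiblnd_check_sends_locked(): once on ibp_conns the conn is visible
 * to other threads, any of which may close it and release the
 * ibc_list reference (see the bz21911 note above).
 */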
2214
2215 static void
2216 kiblnd_reject(struct rdma_cm_id *cmid, struct kib_rej *rej)
2217 {
2218         int rc;
2219
2220         rc = rdma_reject(cmid, rej, sizeof(*rej));
2221
2222         if (rc)
2223                 CWARN("Error %d sending reject\n", rc);
2224 }
2225
2226 static int
2227 kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2228 {
2229         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2230         struct kib_msg *reqmsg = priv;
2231         struct kib_msg *ackmsg;
2232         struct kib_dev *ibdev;
2233         struct kib_peer *peer;
2234         struct kib_peer *peer2;
2235         struct kib_conn *conn;
2236         lnet_ni_t *ni  = NULL;
2237         struct kib_net *net = NULL;
2238         lnet_nid_t nid;
2239         struct rdma_conn_param cp;
2240         struct kib_rej rej;
2241         int version = IBLND_MSG_VERSION;
2242         unsigned long flags;
2243         int max_frags;
2244         int rc;
2245         struct sockaddr_in *peer_addr;
2246
2247         LASSERT(!in_interrupt());
2248
2249         /* cmid inherits 'context' from the corresponding listener id */
2250         ibdev = (struct kib_dev *)cmid->context;
2251         LASSERT(ibdev);
2252
2253         memset(&rej, 0, sizeof(rej));
2254         rej.ibr_magic = IBLND_MSG_MAGIC;
2255         rej.ibr_why = IBLND_REJECT_FATAL;
2256         rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2257
2258         peer_addr = (struct sockaddr_in *)&cmid->route.addr.dst_addr;
2259         if (*kiblnd_tunables.kib_require_priv_port &&
2260             ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2261                 __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2262
2263                 CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2264                        &ip, ntohs(peer_addr->sin_port));
2265                 goto failed;
2266         }
2267
2268         if (priv_nob < offsetof(struct kib_msg, ibm_type)) {
2269                 CERROR("Short connection request\n");
2270                 goto failed;
2271         }
2272
2273         /*
2274          * Future protocol version compatibility support!  If the
2275          * o2iblnd-specific protocol changes, or when LNET unifies
2276          * protocols over all LNDs, the initial connection will
2277          * negotiate a protocol version.  I trap this here to avoid
2278          * console errors; the reject tells the peer which protocol I
2279          * speak.
2280          */
2281         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2282             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2283                 goto failed;
2284         if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2285             reqmsg->ibm_version != IBLND_MSG_VERSION &&
2286             reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2287                 goto failed;
2288         if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2289             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2290             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2291                 goto failed;
2292
2293         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2294         if (rc) {
2295                 CERROR("Can't parse connection request: %d\n", rc);
2296                 goto failed;
2297         }
2298
2299         nid = reqmsg->ibm_srcnid;
2300         ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2301
2302         if (ni) {
2303                 net = (struct kib_net *)ni->ni_data;
2304                 rej.ibr_incarnation = net->ibn_incarnation;
2305         }
2306
2307         if (!ni ||                       /* no matching net */
2308             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2309             net->ibn_dev != ibdev) {          /* wrong device */
2310                 CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2311                        libcfs_nid2str(nid),
2312                        !ni ? "NA" : libcfs_nid2str(ni->ni_nid),
2313                        ibdev->ibd_ifname, ibdev->ibd_nnets,
2314                        &ibdev->ibd_ifip,
2315                        libcfs_nid2str(reqmsg->ibm_dstnid));
2316
2317                 goto failed;
2318         }
2319
2320         /* check time stamp as soon as possible */
2321         if (reqmsg->ibm_dststamp &&
2322             reqmsg->ibm_dststamp != net->ibn_incarnation) {
2323                 CWARN("Stale connection request\n");
2324                 rej.ibr_why = IBLND_REJECT_CONN_STALE;
2325                 goto failed;
2326         }
2327
2328         /* I can accept peer's version */
2329         version = reqmsg->ibm_version;
2330
2331         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2332                 CERROR("Unexpected connreq msg type: %x from %s\n",
2333                        reqmsg->ibm_type, libcfs_nid2str(nid));
2334                 goto failed;
2335         }
2336
2337         if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2338             kiblnd_msg_queue_size(version, ni)) {
2339                 CERROR("Can't accept conn from %s, queue depth too large: %d (<=%d wanted)\n",
2340                        libcfs_nid2str(nid),
2341                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2342                        kiblnd_msg_queue_size(version, ni));
2343
2344                 if (version == IBLND_MSG_VERSION)
2345                         rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2346
2347                 goto failed;
2348         }
2349
2350         max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2351         if (max_frags > kiblnd_rdma_frags(version, ni)) {
2352                 CWARN("Can't accept conn from %s (version %x): max_frags %d too large (%d wanted)\n",
2353                       libcfs_nid2str(nid), version, max_frags,
2354                       kiblnd_rdma_frags(version, ni));
2355
2356                 if (version >= IBLND_MSG_VERSION)
2357                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2358
2359                 goto failed;
2360         } else if (max_frags < kiblnd_rdma_frags(version, ni) &&
2361                    !net->ibn_fmr_ps) {
2362                 CWARN("Can't accept conn from %s (version %x): max_frags %d incompatible without FMR pool (%d wanted)\n",
2363                       libcfs_nid2str(nid), version, max_frags,
2364                       kiblnd_rdma_frags(version, ni));
2365
2366                 if (version == IBLND_MSG_VERSION)
2367                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2368
2369                 goto failed;
2370         }
2371
2372         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2373                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2374                        libcfs_nid2str(nid),
2375                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2376                        IBLND_MSG_SIZE);
2377                 goto failed;
2378         }
2379
2380         /* assume 'nid' is a new peer; create one */
2381         rc = kiblnd_create_peer(ni, &peer, nid);
2382         if (rc) {
2383                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2384                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2385                 goto failed;
2386         }
2387
2388         /* We have validated the peer's parameters so use those */
2389         peer->ibp_max_frags = max_frags;
2390         peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2391
2392         write_lock_irqsave(g_lock, flags);
2393
2394         peer2 = kiblnd_find_peer_locked(nid);
2395         if (peer2) {
2396                 if (!peer2->ibp_version) {
2397                         peer2->ibp_version     = version;
2398                         peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2399                 }
2400
2401                 /* not the guy I've talked with */
2402                 if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2403                     peer2->ibp_version     != version) {
2404                         kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2405
2406                         if (kiblnd_peer_active(peer2)) {
2407                                 peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2408                                 peer2->ibp_version = version;
2409                         }
2410                         write_unlock_irqrestore(g_lock, flags);
2411
2412                         CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2413                               libcfs_nid2str(nid), peer2->ibp_version, version,
2414                               peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2415
2416                         kiblnd_peer_decref(peer);
2417                         rej.ibr_why = IBLND_REJECT_CONN_STALE;
2418                         goto failed;
2419                 }
2420
2421                 /*
2422                  * Tie-break connection race in favour of the higher NID.
2423                  * If we keep running into a race condition multiple times,
2424                  * we have to assume that the connection attempt with the
2425                  * higher NID is stuck in a connecting state and will never
2426                  * recover.  As such, we pass through this if-block and let
2427                  * the lower NID connection win so we can move forward.
2428                  */
2429                 if (peer2->ibp_connecting &&
2430                     nid < ni->ni_nid && peer2->ibp_races <
2431                     MAX_CONN_RACES_BEFORE_ABORT) {
2432                         peer2->ibp_races++;
2433                         write_unlock_irqrestore(g_lock, flags);
2434
2435                         CDEBUG(D_NET, "Conn race %s\n",
2436                                libcfs_nid2str(peer2->ibp_nid));
2437
2438                         kiblnd_peer_decref(peer);
2439                         rej.ibr_why = IBLND_REJECT_CONN_RACE;
2440                         goto failed;
2441                 }
2442                 if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2443                         CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2444                                 libcfs_nid2str(peer2->ibp_nid),
2445                                 MAX_CONN_RACES_BEFORE_ABORT);
2446                 /*
2447                  * A passive connection is allowed even if this peer is
2448                  * waiting for reconnection.
2449                  */
2450                 peer2->ibp_reconnecting = 0;
2451                 peer2->ibp_races = 0;
2452                 peer2->ibp_accepting++;
2453                 kiblnd_peer_addref(peer2);
2454
2455                 /*
2456                  * We raced kiblnd_launch_tx() (active connect) to create
2457                  * this peer, so copy the validated parameters now that we
2458                  * know the peer's limits.
2459                  */
2460                 peer2->ibp_max_frags = peer->ibp_max_frags;
2461                 peer2->ibp_queue_depth = peer->ibp_queue_depth;
2462
2463                 write_unlock_irqrestore(g_lock, flags);
2464                 kiblnd_peer_decref(peer);
2465                 peer = peer2;
2466         } else {
2467                 /* Brand new peer */
2468                 LASSERT(!peer->ibp_accepting);
2469                 LASSERT(!peer->ibp_version &&
2470                         !peer->ibp_incarnation);
2471
2472                 peer->ibp_accepting   = 1;
2473                 peer->ibp_version     = version;
2474                 peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2475
2476                 /* I have a ref on ni that prevents it being shutdown */
2477                 LASSERT(!net->ibn_shutdown);
2478
2479                 kiblnd_peer_addref(peer);
2480                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2481
2482                 write_unlock_irqrestore(g_lock, flags);
2483         }
2484
2485         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT,
2486                                   version);
2487         if (!conn) {
2488                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2489                 kiblnd_peer_decref(peer);
2490                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2491                 goto failed;
2492         }
2493
2494         /*
2495          * conn now "owns" cmid, so I return success from here on to ensure the
2496          * CM callback doesn't destroy cmid.
2497          */
2498         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2499         conn->ibc_credits          = conn->ibc_queue_depth;
2500         conn->ibc_reserved_credits = conn->ibc_queue_depth;
2501         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2502                 IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2503
2504         ackmsg = &conn->ibc_connvars->cv_msg;
2505         memset(ackmsg, 0, sizeof(*ackmsg));
2506
2507         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2508                         sizeof(ackmsg->ibm_u.connparams));
2509         ackmsg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2510         ackmsg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2511         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2512
2513         kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2514
2515         memset(&cp, 0, sizeof(cp));
2516         cp.private_data = ackmsg;
2517         cp.private_data_len = ackmsg->ibm_nob;
2518         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2519         cp.initiator_depth = 0;
2520         cp.flow_control = 1;
2521         cp.retry_count = *kiblnd_tunables.kib_retry_count;
2522         cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2523
2524         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2525
2526         rc = rdma_accept(cmid, &cp);
2527         if (rc) {
2528                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2529                 rej.ibr_version = version;
2530                 rej.ibr_why     = IBLND_REJECT_FATAL;
2531
2532                 kiblnd_reject(cmid, &rej);
2533                 kiblnd_connreq_done(conn, rc);
2534                 kiblnd_conn_decref(conn);
2535         }
2536
2537         lnet_ni_decref(ni);
2538         return 0;
2539
2540  failed:
2541         if (ni) {
2542                 rej.ibr_cp.ibcp_queue_depth = kiblnd_msg_queue_size(version, ni);
2543                 rej.ibr_cp.ibcp_max_frags = kiblnd_rdma_frags(version, ni);
2544                 lnet_ni_decref(ni);
2545         }
2546
2547         rej.ibr_version             = version;
2548         kiblnd_reject(cmid, &rej);
2549
2550         return -ECONNREFUSED;
2551 }
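
/*
 * NB: the tie-break in kiblnd_passive_connect() above means that when
 * two peers dial each other simultaneously, the attempt initiated by
 * the higher NID wins: an incoming request from a lower NID is
 * rejected with IBLND_REJECT_CONN_RACE while our own active connect
 * is in flight.  After MAX_CONN_RACES_BEFORE_ABORT consecutive races
 * the rule is waived and the lower NID is allowed to win, on the
 * assumption that the higher NID's attempt is stuck.
 */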
2552
2553 static void
2554 kiblnd_check_reconnect(struct kib_conn *conn, int version,
2555                        __u64 incarnation, int why, struct kib_connparams *cp)
2556 {
2557         rwlock_t *glock = &kiblnd_data.kib_global_lock;
2558         struct kib_peer *peer = conn->ibc_peer;
2559         char *reason;
2560         int msg_size = IBLND_MSG_SIZE;
2561         int frag_num = -1;
2562         int queue_dep = -1;
2563         bool reconnect;
2564         unsigned long flags;
2565
2566         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2567         LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2568         LASSERT(!peer->ibp_reconnecting);
2569
2570         if (cp) {
2571                 msg_size = cp->ibcp_max_msg_size;
2572                 frag_num        = cp->ibcp_max_frags << IBLND_FRAG_SHIFT;
2573                 queue_dep = cp->ibcp_queue_depth;
2574         }
2575
2576         write_lock_irqsave(glock, flags);
2577         /*
2578          * Retry the connection if it's still needed and no other
2579          * connection attempt (active or passive) is in progress.
2580          * NB: a reconnect is still needed even when ibp_tx_queue is
2581          * empty if ibp_version != version, because the reconnect may
2582          * have been initiated by kiblnd_query().
2583          */
2584         reconnect = (!list_empty(&peer->ibp_tx_queue) ||
2585                      peer->ibp_version != version) &&
2586                     peer->ibp_connecting == 1 &&
2587                     !peer->ibp_accepting;
2588         if (!reconnect) {
2589                 reason = "no need";
2590                 goto out;
2591         }
2592
2593         switch (why) {
2594         default:
2595                 reason = "Unknown";
2596                 break;
2597
2598         case IBLND_REJECT_RDMA_FRAGS: {
2599                 struct lnet_ioctl_config_lnd_tunables *tunables;
2600
2601                 if (!cp) {
2602                         reason = "can't negotiate max frags";
2603                         goto out;
2604                 }
2605                 tunables = peer->ibp_ni->ni_lnd_tunables;
2606                 if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
2607                         reason = "map_on_demand must be enabled";
2608                         goto out;
2609                 }
2610                 if (conn->ibc_max_frags <= frag_num) {
2611                         reason = "unsupported max frags";
2612                         goto out;
2613                 }
2614
2615                 peer->ibp_max_frags = frag_num;
2616                 reason = "rdma fragments";
2617                 break;
2618         }
2619         case IBLND_REJECT_MSG_QUEUE_SIZE:
2620                 if (!cp) {
2621                         reason = "can't negotiate queue depth";
2622                         goto out;
2623                 }
2624                 if (conn->ibc_queue_depth <= queue_dep) {
2625                         reason = "unsupported queue depth";
2626                         goto out;
2627                 }
2628
2629                 peer->ibp_queue_depth = queue_dep;
2630                 reason = "queue depth";
2631                 break;
2632
2633         case IBLND_REJECT_CONN_STALE:
2634                 reason = "stale";
2635                 break;
2636
2637         case IBLND_REJECT_CONN_RACE:
2638                 reason = "conn race";
2639                 break;
2640
2641         case IBLND_REJECT_CONN_UNCOMPAT:
2642                 reason = "version negotiation";
2643                 break;
2644         }
2645
2646         conn->ibc_reconnect = 1;
2647         peer->ibp_reconnecting = 1;
2648         peer->ibp_version = version;
2649         if (incarnation)
2650                 peer->ibp_incarnation = incarnation;
2651 out:
2652         write_unlock_irqrestore(glock, flags);
2653
2654         CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
2655                 libcfs_nid2str(peer->ibp_nid),
2656                 reconnect ? "reconnect" : "don't reconnect",
2657                 reason, IBLND_MSG_VERSION, version, msg_size,
2658                 conn->ibc_queue_depth, queue_dep,
2659                 conn->ibc_max_frags, frag_num);
2660         /*
2661          * If conn->ibc_reconnect is set, the connd will reconnect to
2662          * the peer while destroying this zombie conn.
2663          */
2664 }
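
/*
 * NB: kiblnd_check_reconnect() only ever negotiates downwards: a
 * MSG_QUEUE_SIZE or RDMA_FRAGS reject is honoured by shrinking
 * ibp_queue_depth or ibp_max_frags to the limit the peer advertised,
 * never by raising it, and the actual reconnect is left to the connd
 * (see the note above about conn->ibc_reconnect).
 */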
2665
2666 static void
2667 kiblnd_rejected(struct kib_conn *conn, int reason, void *priv, int priv_nob)
2668 {
2669         struct kib_peer *peer = conn->ibc_peer;
2670
2671         LASSERT(!in_interrupt());
2672         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2673
2674         switch (reason) {
2675         case IB_CM_REJ_STALE_CONN:
2676                 kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
2677                                        IBLND_REJECT_CONN_STALE, NULL);
2678                 break;
2679
2680         case IB_CM_REJ_INVALID_SERVICE_ID:
2681                 CNETERR("%s rejected: no listener at %d\n",
2682                         libcfs_nid2str(peer->ibp_nid),
2683                         *kiblnd_tunables.kib_service);
2684                 break;
2685
2686         case IB_CM_REJ_CONSUMER_DEFINED:
2687                 if (priv_nob >= offsetof(struct kib_rej, ibr_padding)) {
2688                         struct kib_rej *rej = priv;
2689                         struct kib_connparams *cp = NULL;
2690                         int flip = 0;
2691                         __u64 incarnation = -1;
2692
2693                         /* NB. default incarnation is -1 because:
2694                          * a) V1 ignores the dst incarnation in the connreq.
2695                          * b) V2 provides its incarnation when rejecting me,
2696                          *    so the -1 will be overwritten.
2697                          *
2698                          * If I try to connect to a V1 peer with the V2
2699                          * protocol and it rejects me then upgrades to V2,
2700                          * I know nothing about the upgrade and retry with
2701                          * V1; the upgraded peer then sees I'm talking to
2702                          * its old incarnation and rejects me (-1).
2703                          */
2704
2705                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2706                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2707                                 __swab32s(&rej->ibr_magic);
2708                                 __swab16s(&rej->ibr_version);
2709                                 flip = 1;
2710                         }
2711
2712                         if (priv_nob >= sizeof(struct kib_rej) &&
2713                             rej->ibr_version > IBLND_MSG_VERSION_1) {
2714                                 /*
2715                                  * priv_nob is always 148 in the current
2716                                  * OFED (IB_CM_REJ_PRIVATE_DATA_SIZE), so
2717                                  * the version must still be checked.
2718                                  */
2719                                 cp = &rej->ibr_cp;
2720
2721                                 if (flip) {
2722                                         __swab64s(&rej->ibr_incarnation);
2723                                         __swab16s(&cp->ibcp_queue_depth);
2724                                         __swab16s(&cp->ibcp_max_frags);
2725                                         __swab32s(&cp->ibcp_max_msg_size);
2726                                 }
2727
2728                                 incarnation = rej->ibr_incarnation;
2729                         }
2730
2731                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2732                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2733                                 CERROR("%s rejected: consumer defined fatal error\n",
2734                                        libcfs_nid2str(peer->ibp_nid));
2735                                 break;
2736                         }
2737
2738                         if (rej->ibr_version != IBLND_MSG_VERSION &&
2739                             rej->ibr_version != IBLND_MSG_VERSION_1) {
2740                                 CERROR("%s rejected: o2iblnd version %x error\n",
2741                                        libcfs_nid2str(peer->ibp_nid),
2742                                        rej->ibr_version);
2743                                 break;
2744                         }
2745
2746                         if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2747                             rej->ibr_version == IBLND_MSG_VERSION_1) {
2748                                 CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2749                                        libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2750
2751                                 if (conn->ibc_version != IBLND_MSG_VERSION_1)
2752                                         rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2753                         }
2754
2755                         switch (rej->ibr_why) {
2756                         case IBLND_REJECT_CONN_RACE:
2757                         case IBLND_REJECT_CONN_STALE:
2758                         case IBLND_REJECT_CONN_UNCOMPAT:
2759                         case IBLND_REJECT_MSG_QUEUE_SIZE:
2760                         case IBLND_REJECT_RDMA_FRAGS:
2761                                 kiblnd_check_reconnect(conn, rej->ibr_version,
2762                                                        incarnation,
2763                                                        rej->ibr_why, cp);
2764                                 break;
2765
2766                         case IBLND_REJECT_NO_RESOURCES:
2767                                 CERROR("%s rejected: o2iblnd no resources\n",
2768                                        libcfs_nid2str(peer->ibp_nid));
2769                                 break;
2770
2771                         case IBLND_REJECT_FATAL:
2772                                 CERROR("%s rejected: o2iblnd fatal error\n",
2773                                        libcfs_nid2str(peer->ibp_nid));
2774                                 break;
2775
2776                         default:
2777                                 CERROR("%s rejected: o2iblnd reason %d\n",
2778                                        libcfs_nid2str(peer->ibp_nid),
2779                                        rej->ibr_why);
2780                                 break;
2781                         }
2782                         break;
2783                 }
2784                 /* fall through */
2785         default:
2786                 CNETERR("%s rejected: reason %d, size %d\n",
2787                         libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2788                 break;
2789         }
2790
2791         kiblnd_connreq_done(conn, -ECONNREFUSED);
2792 }
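
/*
 * NB: a consumer-defined reject may arrive from a peer of the
 * opposite endianness, so kiblnd_rejected() sniffs for byte-swapped
 * magics and swabs ibr_version (and the connparams, when present) in
 * place before interpreting ibr_why.
 */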
2793
2794 static void
2795 kiblnd_check_connreply(struct kib_conn *conn, void *priv, int priv_nob)
2796 {
2797         struct kib_peer *peer = conn->ibc_peer;
2798         lnet_ni_t *ni = peer->ibp_ni;
2799         struct kib_net *net = ni->ni_data;
2800         struct kib_msg *msg = priv;
2801         int ver = conn->ibc_version;
2802         int rc = kiblnd_unpack_msg(msg, priv_nob);
2803         unsigned long flags;
2804
2805         LASSERT(net);
2806
2807         if (rc) {
2808                 CERROR("Can't unpack connack from %s: %d\n",
2809                        libcfs_nid2str(peer->ibp_nid), rc);
2810                 goto failed;
2811         }
2812
2813         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2814                 CERROR("Unexpected message %d from %s\n",
2815                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2816                 rc = -EPROTO;
2817                 goto failed;
2818         }
2819
2820         if (ver != msg->ibm_version) {
2821                 CERROR("%s replied version %x, which differs from requested version %x\n",
2822                        libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2823                 rc = -EPROTO;
2824                 goto failed;
2825         }
2826
2827         if (msg->ibm_u.connparams.ibcp_queue_depth >
2828             conn->ibc_queue_depth) {
2829                 CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
2830                        libcfs_nid2str(peer->ibp_nid),
2831                        msg->ibm_u.connparams.ibcp_queue_depth,
2832                        conn->ibc_queue_depth);
2833                 rc = -EPROTO;
2834                 goto failed;
2835         }
2836
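             /*
              * NB ibcp_max_frags is carried on the wire shifted up by
              * IBLND_FRAG_SHIFT (see o2iblnd.h); shift it back down to local
              * PAGE_SIZE fragments before comparing.
              */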
2837         if ((msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT) >
2838             conn->ibc_max_frags) {
2839                 CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
2840                        libcfs_nid2str(peer->ibp_nid),
2841                        msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT,
2842                        conn->ibc_max_frags);
2843                 rc = -EPROTO;
2844                 goto failed;
2845         }
2846
2847         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2848                 CERROR("%s max message size %d too big (%d max)\n",
2849                        libcfs_nid2str(peer->ibp_nid),
2850                        msg->ibm_u.connparams.ibcp_max_msg_size,
2851                        IBLND_MSG_SIZE);
2852                 rc = -EPROTO;
2853                 goto failed;
2854         }
2855
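             /*
              * Check that this reply was really meant for this NI in its
              * current incarnation; any mismatch means the reply is stale.
              */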
2856         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2857         if (msg->ibm_dstnid == ni->ni_nid &&
2858             msg->ibm_dststamp == net->ibn_incarnation)
2859                 rc = 0;
2860         else
2861                 rc = -ESTALE;
2862         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2863
2864         if (rc) {
2865                 CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2866                        libcfs_nid2str(peer->ibp_nid), rc,
2867                        msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2868                 goto failed;
2869         }
2870
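             /*
              * Adopt the negotiated parameters: the peer may legitimately
              * have reduced the queue depth and max_frags below what we
              * requested.
              */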
2871         conn->ibc_incarnation = msg->ibm_srcstamp;
2872         conn->ibc_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2873         conn->ibc_reserved_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2874         conn->ibc_queue_depth = msg->ibm_u.connparams.ibcp_queue_depth;
2875         conn->ibc_max_frags = msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2876         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2877                 IBLND_OOB_MSGS(ver) <= IBLND_RX_MSGS(conn));
2878
2879         kiblnd_connreq_done(conn, 0);
2880         return;
2881
2882  failed:
2883         /*
2884          * NB My QP has already established itself, so I handle anything going
2885          * wrong here by setting ibc_comms_error.
2886          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2887          * immediately tears it down.
2888          */
2889         LASSERT(rc);
2890         conn->ibc_comms_error = rc;
2891         kiblnd_connreq_done(conn, 0);
2892 }
2893
2894 static int
2895 kiblnd_active_connect(struct rdma_cm_id *cmid)
2896 {
2897         struct kib_peer *peer = (struct kib_peer *)cmid->context;
2898         struct kib_conn *conn;
2899         struct kib_msg *msg;
2900         struct rdma_conn_param cp;
2901         int version;
2902         __u64 incarnation;
2903         unsigned long flags;
2904         int rc;
2905
2906         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2907
2908         incarnation = peer->ibp_incarnation;
2909         version = !peer->ibp_version ? IBLND_MSG_VERSION :
2910                                        peer->ibp_version;
2911
2912         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2913
2914         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT,
2915                                   version);
2916         if (!conn) {
2917                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2918                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2919                 return -ENOMEM;
2920         }
2921
2922         /*
2923          * conn "owns" cmid now, so I return success from here on to ensure the
2924          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2925          * on peer.
2926          */
2927         msg = &conn->ibc_connvars->cv_msg;
2928
2929         memset(msg, 0, sizeof(*msg));
2930         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2931         msg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2932         msg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2933         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2934
2935         kiblnd_pack_msg(peer->ibp_ni, msg, version,
2936                         0, peer->ibp_nid, incarnation);
2937
2938         memset(&cp, 0, sizeof(cp));
2939         cp.private_data        = msg;
2940         cp.private_data_len    = msg->ibm_nob;
2941         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2942         cp.initiator_depth     = 0;
2943         cp.flow_control        = 1;
2944         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2945         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2946
2947         LASSERT(cmid->context == (void *)conn);
2948         LASSERT(conn->ibc_cmid == cmid);
2949
2950         rc = rdma_connect(cmid, &cp);
2951         if (rc) {
2952                 CERROR("Can't connect to %s: %d\n",
2953                        libcfs_nid2str(peer->ibp_nid), rc);
2954                 kiblnd_connreq_done(conn, rc);
2955                 kiblnd_conn_decref(conn);
2956         }
2957
2958         return 0;
2959 }
2960
2961 int
2962 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2963 {
2964         struct kib_peer *peer;
2965         struct kib_conn *conn;
2966         int rc;
2967
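             /*
              * NB returning non-zero from this callback makes the CM destroy
              * cmid, hence the "rc destroys cmid" notes below.
              */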
2968         switch (event->event) {
2969         default:
2970                 CERROR("Unexpected event: %d, status: %d\n",
2971                        event->event, event->status);
2972                 LBUG();
2973
2974         case RDMA_CM_EVENT_CONNECT_REQUEST:
2975                 /* destroy cmid on failure */
2976                 rc = kiblnd_passive_connect(cmid,
2977                                             (void *)KIBLND_CONN_PARAM(event),
2978                                             KIBLND_CONN_PARAM_LEN(event));
2979                 CDEBUG(D_NET, "connreq: %d\n", rc);
2980                 return rc;
2981
2982         case RDMA_CM_EVENT_ADDR_ERROR:
2983                 peer = (struct kib_peer *)cmid->context;
2984                 CNETERR("%s: ADDR ERROR %d\n",
2985                         libcfs_nid2str(peer->ibp_nid), event->status);
2986                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2987                 kiblnd_peer_decref(peer);
2988                 return -EHOSTUNREACH;      /* rc destroys cmid */
2989
2990         case RDMA_CM_EVENT_ADDR_RESOLVED:
2991                 peer = (struct kib_peer *)cmid->context;
2992
2993                 CDEBUG(D_NET, "%s Addr resolved: %d\n",
2994                        libcfs_nid2str(peer->ibp_nid), event->status);
2995
2996                 if (event->status) {
2997                         CNETERR("Can't resolve address for %s: %d\n",
2998                                 libcfs_nid2str(peer->ibp_nid), event->status);
2999                         rc = event->status;
3000                 } else {
3001                         rc = rdma_resolve_route(
3002                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
3003                         if (!rc) {
3004                                 struct kib_net *net = peer->ibp_ni->ni_data;
3005                                 struct kib_dev *dev = net->ibn_dev;
3006
3007                                 CDEBUG(D_NET, "%s: connection bound to "
3008                                        "%s:%pI4h:%s\n",
3009                                        libcfs_nid2str(peer->ibp_nid),
3010                                        dev->ibd_ifname,
3011                                        &dev->ibd_ifip, cmid->device->name);
3012
3013                                 return 0;
3014                         }
3015
3016                         /* Can't initiate route resolution */
3017                         CERROR("Can't resolve route for %s: %d\n",
3018                                libcfs_nid2str(peer->ibp_nid), rc);
3019                 }
3020                 kiblnd_peer_connect_failed(peer, 1, rc);
3021                 kiblnd_peer_decref(peer);
3022                 return rc;                    /* rc destroys cmid */
3023
3024         case RDMA_CM_EVENT_ROUTE_ERROR:
3025                 peer = (struct kib_peer *)cmid->context;
3026                 CNETERR("%s: ROUTE ERROR %d\n",
3027                         libcfs_nid2str(peer->ibp_nid), event->status);
3028                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
3029                 kiblnd_peer_decref(peer);
3030                 return -EHOSTUNREACH;      /* rc destroys cmid */
3031
3032         case RDMA_CM_EVENT_ROUTE_RESOLVED:
3033                 peer = (struct kib_peer *)cmid->context;
3034                 CDEBUG(D_NET, "%s Route resolved: %d\n",
3035                        libcfs_nid2str(peer->ibp_nid), event->status);
3036
3037                 if (!event->status)
3038                         return kiblnd_active_connect(cmid);
3039
3040                 CNETERR("Can't resolve route for %s: %d\n",
3041                         libcfs_nid2str(peer->ibp_nid), event->status);
3042                 kiblnd_peer_connect_failed(peer, 1, event->status);
3043                 kiblnd_peer_decref(peer);
3044                 return event->status;      /* rc destroys cmid */
3045
3046         case RDMA_CM_EVENT_UNREACHABLE:
3047                 conn = (struct kib_conn *)cmid->context;
3048                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3049                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3050                 CNETERR("%s: UNREACHABLE %d\n",
3051                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3052                 kiblnd_connreq_done(conn, -ENETDOWN);
3053                 kiblnd_conn_decref(conn);
3054                 return 0;
3055
3056         case RDMA_CM_EVENT_CONNECT_ERROR:
3057                 conn = (struct kib_conn *)cmid->context;
3058                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3059                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3060                 CNETERR("%s: CONNECT ERROR %d\n",
3061                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3062                 kiblnd_connreq_done(conn, -ENOTCONN);
3063                 kiblnd_conn_decref(conn);
3064                 return 0;
3065
3066         case RDMA_CM_EVENT_REJECTED:
3067                 conn = (struct kib_conn *)cmid->context;
3068                 switch (conn->ibc_state) {
3069                 default:
3070                         LBUG();
3071
3072                 case IBLND_CONN_PASSIVE_WAIT:
3073                         CERROR("%s: REJECTED %d\n",
3074                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3075                                event->status);
3076                         kiblnd_connreq_done(conn, -ECONNRESET);
3077                         break;
3078
3079                 case IBLND_CONN_ACTIVE_CONNECT:
3080                         kiblnd_rejected(conn, event->status,
3081                                         (void *)KIBLND_CONN_PARAM(event),
3082                                         KIBLND_CONN_PARAM_LEN(event));
3083                         break;
3084                 }
3085                 kiblnd_conn_decref(conn);
3086                 return 0;
3087
3088         case RDMA_CM_EVENT_ESTABLISHED:
3089                 conn = (struct kib_conn *)cmid->context;
3090                 switch (conn->ibc_state) {
3091                 default:
3092                         LBUG();
3093
3094                 case IBLND_CONN_PASSIVE_WAIT:
3095                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3096                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3097                         kiblnd_connreq_done(conn, 0);
3098                         break;
3099
3100                 case IBLND_CONN_ACTIVE_CONNECT:
3101                         CDEBUG(D_NET, "ESTABLISHED (active): %s\n",
3102                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3103                         kiblnd_check_connreply(conn,
3104                                                (void *)KIBLND_CONN_PARAM(event),
3105                                                KIBLND_CONN_PARAM_LEN(event));
3106                         break;
3107                 }
3108                 /* net keeps its ref on conn! */
3109                 return 0;
3110
3111         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3112                 CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3113                 return 0;
3114         case RDMA_CM_EVENT_DISCONNECTED:
3115                 conn = (struct kib_conn *)cmid->context;
3116                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3117                         CERROR("%s DISCONNECTED\n",
3118                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3119                         kiblnd_connreq_done(conn, -ECONNRESET);
3120                 } else {
3121                         kiblnd_close_conn(conn, 0);
3122                 }
3123                 kiblnd_conn_decref(conn);
3124                 cmid->context = NULL;
3125                 return 0;
3126
3127         case RDMA_CM_EVENT_DEVICE_REMOVAL:
3128                 LCONSOLE_ERROR_MSG(0x131,
3129                                    "Received notification of device removal\n"
3130                                    "Please shut down LNET to allow this to proceed\n");
3131                 /*
3132                  * Can't remove network from underneath LNET for now, so I have
3133                  * to ignore this
3134                  */
3135                 return 0;
3136
3137         case RDMA_CM_EVENT_ADDR_CHANGE:
3138                 LCONSOLE_INFO("Physical link changed (e.g. HCA/port)\n");
3139                 return 0;
3140         }
3141 }
3142
3143 static int
3144 kiblnd_check_txs_locked(struct kib_conn *conn, struct list_head *txs)
3145 {
3146         struct kib_tx *tx;
3147         struct list_head *ttmp;
3148
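             /* Scan 'txs' (under conn->ibc_lock) for a tx past its deadline */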
3149         list_for_each(ttmp, txs) {
3150                 tx = list_entry(ttmp, struct kib_tx, tx_list);
3151
3152                 if (txs != &conn->ibc_active_txs) {
3153                         LASSERT(tx->tx_queued);
3154                 } else {
3155                         LASSERT(!tx->tx_queued);
3156                         LASSERT(tx->tx_waiting || tx->tx_sending);
3157                 }
3158
3159                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
3160                         CERROR("Timed out tx: %s, %lu seconds\n",
3161                                kiblnd_queue2str(conn, txs),
3162                                cfs_duration_sec(jiffies - tx->tx_deadline));
3163                         return 1;
3164                 }
3165         }
3166
3167         return 0;
3168 }
3169
3170 static int
3171 kiblnd_conn_timed_out_locked(struct kib_conn *conn)
3172 {
3173         return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3174                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3175                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3176                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3177                 kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3178 }
3179
3180 static void
3181 kiblnd_check_conns(int idx)
3182 {
3183         LIST_HEAD(closes);
3184         LIST_HEAD(checksends);
3185         struct list_head *peers = &kiblnd_data.kib_peers[idx];
3186         struct list_head *ptmp;
3187         struct kib_peer *peer;
3188         struct kib_conn *conn;
3189         struct kib_conn *temp;
3190         struct kib_conn *tmp;
3191         struct list_head *ctmp;
3192         unsigned long flags;
3193
3194         /*
3195          * NB. We expect to have a look at all the peers and not find any
3196          * RDMAs to time out, so we just use a shared lock while we
3197          * take a look...
3198          */
3199         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3200
3201         list_for_each(ptmp, peers) {
3202                 peer = list_entry(ptmp, struct kib_peer, ibp_list);
3203
3204                 list_for_each(ctmp, &peer->ibp_conns) {
3205                         int timedout;
3206                         int sendnoop;
3207
3208                         conn = list_entry(ctmp, struct kib_conn, ibc_list);
3209
3210                         LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3211
3212                         spin_lock(&conn->ibc_lock);
3213
3214                         sendnoop = kiblnd_need_noop(conn);
3215                         timedout = kiblnd_conn_timed_out_locked(conn);
3216                         if (!sendnoop && !timedout) {
3217                                 spin_unlock(&conn->ibc_lock);
3218                                 continue;
3219                         }
3220
3221                         if (timedout) {
3222                                 CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3223                                        libcfs_nid2str(peer->ibp_nid),
3224                                        cfs_duration_sec(cfs_time_current() -
3225                                                         peer->ibp_last_alive),
3226                                        conn->ibc_credits,
3227                                        conn->ibc_outstanding_credits,
3228                                        conn->ibc_reserved_credits);
3229                                 list_add(&conn->ibc_connd_list, &closes);
3230                         } else {
3231                                 list_add(&conn->ibc_connd_list, &checksends);
3232                         }
3233                         /* +ref for 'closes' or 'checksends' */
3234                         kiblnd_conn_addref(conn);
3235
3236                         spin_unlock(&conn->ibc_lock);
3237                 }
3238         }
3239
3240         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3241
3242         /*
3243          * Handle timeout by closing the whole
3244          * connection. We can only be sure RDMA activity
3245          * has ceased once the QP has been modified.
3246          */
3247         list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3248                 list_del(&conn->ibc_connd_list);
3249                 kiblnd_close_conn(conn, -ETIMEDOUT);
3250                 kiblnd_conn_decref(conn);
3251         }
3252
3253         /*
3254          * In case we have enough credits to return via a
3255          * NOOP, but there were no non-blocking tx descs
3256          * free to do it last time...
3257          */
3258         list_for_each_entry_safe(conn, temp, &checksends, ibc_connd_list) {
3259                 list_del(&conn->ibc_connd_list);
3260
3261                 spin_lock(&conn->ibc_lock);
3262                 kiblnd_check_sends_locked(conn);
3263                 spin_unlock(&conn->ibc_lock);
3264
3265                 kiblnd_conn_decref(conn);
3266         }
3267 }
3268
3269 static void
3270 kiblnd_disconnect_conn(struct kib_conn *conn)
3271 {
3272         LASSERT(!in_interrupt());
3273         LASSERT(current == kiblnd_data.kib_connd);
3274         LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3275
3276         rdma_disconnect(conn->ibc_cmid);
3277         kiblnd_finalise_conn(conn);
3278
3279         kiblnd_peer_notify(conn->ibc_peer);
3280 }
3281
3282 /**
3283  * High-water mark for reconnection to the same peer: attempts are
3284  * delayed once a peer has raced more than KIB_RECONN_HIGH_RACE times.
3285  */
3286 #define KIB_RECONN_HIGH_RACE    10
3287 /**
3288  * Allow connd to take a break and handle other things after consecutive
3289  * reconnection attempts.
3290  */
3291 #define KIB_RECONN_BREAK        100
3292
3293 int
3294 kiblnd_connd(void *arg)
3295 {
3296         spinlock_t *lock = &kiblnd_data.kib_connd_lock;
3297         wait_queue_t wait;
3298         unsigned long flags;
3299         struct kib_conn *conn;
3300         int timeout;
3301         int i;
3302         int dropped_lock;
3303         int peer_index = 0;
3304         unsigned long deadline = jiffies;
3305
3306         cfs_block_allsigs();
3307
3308         init_waitqueue_entry(&wait, current);
3309         kiblnd_data.kib_connd = current;
3310
3311         spin_lock_irqsave(lock, flags);
3312
3313         while (!kiblnd_data.kib_shutdown) {
3314                 int reconn = 0;
3315
3316                 dropped_lock = 0;
3317
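                     /*
                      * Reap a zombie connection: destroy it; if its peer is
                      * reconnecting, recycle the emptied conn struct as a
                      * token on the reconnect lists, otherwise free it.
                      */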
3318                 if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3319                         struct kib_peer *peer = NULL;
3320
3321                         conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3322                                           struct kib_conn, ibc_list);
3323                         list_del(&conn->ibc_list);
3324                         if (conn->ibc_reconnect) {
3325                                 peer = conn->ibc_peer;
3326                                 kiblnd_peer_addref(peer);
3327                         }
3328
3329                         spin_unlock_irqrestore(lock, flags);
3330                         dropped_lock = 1;
3331
3332                         kiblnd_destroy_conn(conn);
3333
3334                         spin_lock_irqsave(lock, flags);
3335                         if (!peer) {
3336                                 kfree(conn);
3337                                 continue;
3338                         }
3339
3340                         conn->ibc_peer = peer;
3341                         if (peer->ibp_reconnected < KIB_RECONN_HIGH_RACE)
3342                                 list_add_tail(&conn->ibc_list,
3343                                               &kiblnd_data.kib_reconn_list);
3344                         else
3345                                 list_add_tail(&conn->ibc_list,
3346                                               &kiblnd_data.kib_reconn_wait);
3347                 }
3348
3349                 if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3350                         conn = list_entry(kiblnd_data.kib_connd_conns.next,
3351                                           struct kib_conn, ibc_list);
3352                         list_del(&conn->ibc_list);
3353
3354                         spin_unlock_irqrestore(lock, flags);
3355                         dropped_lock = 1;
3356
3357                         kiblnd_disconnect_conn(conn);
3358                         kiblnd_conn_decref(conn);
3359
3360                         spin_lock_irqsave(lock, flags);
3361                 }
3362
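                     /*
                      * Make at most KIB_RECONN_BREAK reconnection attempts per
                      * pass; peers parked on kib_reconn_wait are spliced back
                      * onto the reconnect list at most once per second.
                      */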
3363                 while (reconn < KIB_RECONN_BREAK) {
3364                         if (kiblnd_data.kib_reconn_sec !=
3365                             ktime_get_real_seconds()) {
3366                                 kiblnd_data.kib_reconn_sec = ktime_get_real_seconds();
3367                                 list_splice_init(&kiblnd_data.kib_reconn_wait,
3368                                                  &kiblnd_data.kib_reconn_list);
3369                         }
3370
3371                         if (list_empty(&kiblnd_data.kib_reconn_list))
3372                                 break;
3373
3374                         conn = list_entry(kiblnd_data.kib_reconn_list.next,
3375                                           struct kib_conn, ibc_list);
3376                         list_del(&conn->ibc_list);
3377
3378                         spin_unlock_irqrestore(lock, flags);
3379                         dropped_lock = 1;
3380
3381                         reconn += kiblnd_reconnect_peer(conn->ibc_peer);
3382                         kiblnd_peer_decref(conn->ibc_peer);
3383                         LIBCFS_FREE(conn, sizeof(*conn));
3384
3385                         spin_lock_irqsave(lock, flags);
3386                 }
3387
3388                 /* careful with the jiffy wrap... */
3389                 timeout = (int)(deadline - jiffies);
3390                 if (timeout <= 0) {
3391                         const int n = 4;
3392                         const int p = 1;
3393                         int chunk = kiblnd_data.kib_peer_hash_size;
3394
3395                         spin_unlock_irqrestore(lock, flags);
3396                         dropped_lock = 1;
3397
3398                         /*
3399                          * Time to check for RDMA timeouts on a few more
3400                          * peers: I do checks every 'p' seconds on a
3401                          * proportion of the peer table and I need to check
3402                          * every connection 'n' times within a timeout
3403                          * interval, to ensure I detect a timeout on any
3404                          * connection within (n+1)/n times the timeout
3405                          * interval.
3406                          */
3407                         if (*kiblnd_tunables.kib_timeout > n * p)
3408                                 chunk = (chunk * n * p) /
3409                                         *kiblnd_tunables.kib_timeout;
3410                         if (!chunk)
3411                                 chunk = 1;
3412
3413                         for (i = 0; i < chunk; i++) {
3414                                 kiblnd_check_conns(peer_index);
3415                                 peer_index = (peer_index + 1) %
3416                                              kiblnd_data.kib_peer_hash_size;
3417                         }
3418
3419                         deadline += msecs_to_jiffies(p * MSEC_PER_SEC);
3420                         spin_lock_irqsave(lock, flags);
3421                 }
3422
3423                 if (dropped_lock)
3424                         continue;
3425
3426                 /* Nothing to do for 'timeout' */
3427                 set_current_state(TASK_INTERRUPTIBLE);
3428                 add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3429                 spin_unlock_irqrestore(lock, flags);
3430
3431                 schedule_timeout(timeout);
3432
3433                 remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3434                 spin_lock_irqsave(lock, flags);
3435         }
3436
3437         spin_unlock_irqrestore(lock, flags);
3438
3439         kiblnd_thread_fini();
3440         return 0;
3441 }
3442
3443 void
3444 kiblnd_qp_event(struct ib_event *event, void *arg)
3445 {
3446         struct kib_conn *conn = arg;
3447
3448         switch (event->event) {
3449         case IB_EVENT_COMM_EST:
3450                 CDEBUG(D_NET, "%s established\n",
3451                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3452                 /*
3453                  * We received a packet but the connection isn't established;
3454                  * the handshake packet was probably lost, so we are free to
3455                  * force the connection into the established state.
3456                  */
3457                 rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
3458                 return;
3459
3460         default:
3461                 CERROR("%s: Async QP event type %d\n",
3462                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3463                 return;
3464         }
3465 }
3466
3467 static void
3468 kiblnd_complete(struct ib_wc *wc)
3469 {
3470         switch (kiblnd_wreqid2type(wc->wr_id)) {
3471         default:
3472                 LBUG();
3473
3474         case IBLND_WID_MR:
3475                 if (wc->status != IB_WC_SUCCESS &&
3476                     wc->status != IB_WC_WR_FLUSH_ERR)
3477                         CNETERR("FastReg failed: %d\n", wc->status);
3478                 break;
3479
3480         case IBLND_WID_RDMA:
3481                 /*
3482                  * We only get RDMA completion notification if it fails.  All
3483                  * subsequent work items, including the final SEND will fail
3484                  * too.  However we can't print out any more info about the
3485                  * failing RDMA because 'tx' might be back on the idle list or
3486                  * even reused already if we didn't manage to post all our work
3487                  * items
3488                  */
3489                 CNETERR("RDMA (tx: %p) failed: %d\n",
3490                         kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3491                 return;
3492
3493         case IBLND_WID_TX:
3494                 kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3495                 return;
3496
3497         case IBLND_WID_RX:
3498                 kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3499                                    wc->byte_len);
3500                 return;
3501         }
3502 }
3503
3504 void
3505 kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3506 {
3507         /*
3508          * NB I'm not allowed to schedule this conn once its refcount has
3509          * reached 0.  Since fundamentally I'm racing with scheduler threads
3510          * consuming my CQ I could be called after all completions have
3511          * occurred.  But in this case, !ibc_nrx && !ibc_nsends_posted
3512          * and this CQ is about to be destroyed so I NOOP.
3513          */
3514         struct kib_conn *conn = arg;
3515         struct kib_sched_info *sched = conn->ibc_sched;
3516         unsigned long flags;
3517
3518         LASSERT(cq == conn->ibc_cq);
3519
3520         spin_lock_irqsave(&sched->ibs_lock, flags);
3521
3522         conn->ibc_ready = 1;
3523
3524         if (!conn->ibc_scheduled &&
3525             (conn->ibc_nrx > 0 ||
3526              conn->ibc_nsends_posted > 0)) {
3527                 kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3528                 conn->ibc_scheduled = 1;
3529                 list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3530
3531                 if (waitqueue_active(&sched->ibs_waitq))
3532                         wake_up(&sched->ibs_waitq);
3533         }
3534
3535         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3536 }
3537
3538 void
3539 kiblnd_cq_event(struct ib_event *event, void *arg)
3540 {
3541         struct kib_conn *conn = arg;
3542
3543         CERROR("%s: async CQ event type %d\n",
3544                libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3545 }
3546
3547 int
3548 kiblnd_scheduler(void *arg)
3549 {
3550         long id = (long)arg;
3551         struct kib_sched_info *sched;
3552         struct kib_conn *conn;
3553         wait_queue_t wait;
3554         unsigned long flags;
3555         struct ib_wc wc;
3556         int did_something;
3557         int busy_loops = 0;
3558         int rc;
3559
3560         cfs_block_allsigs();
3561
3562         init_waitqueue_entry(&wait, current);
3563
3564         sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3565
3566         rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3567         if (rc) {
3568                 CWARN("Failed to bind on CPT %d, please verify whether all CPUs are healthy and reload modules if necessary, otherwise your system might be at risk of low performance\n",
3569                       sched->ibs_cpt);
3570         }
3571
3572         spin_lock_irqsave(&sched->ibs_lock, flags);
3573
3574         while (!kiblnd_data.kib_shutdown) {
3575                 if (busy_loops++ >= IBLND_RESCHED) {
3576                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3577
3578                         cond_resched();
3579                         busy_loops = 0;
3580
3581                         spin_lock_irqsave(&sched->ibs_lock, flags);
3582                 }
3583
3584                 did_something = 0;
3585
3586                 if (!list_empty(&sched->ibs_conns)) {
3587                         conn = list_entry(sched->ibs_conns.next, struct kib_conn,
3588                                           ibc_sched_list);
3589                         /* take over kib_sched_conns' ref on conn... */
3590                         LASSERT(conn->ibc_scheduled);
3591                         list_del(&conn->ibc_sched_list);
3592                         conn->ibc_ready = 0;
3593
3594                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3595
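                             /*
                              * Pre-poison wr_id so a completion that arrives
                              * with no wr_id set (buggy HCA/firmware) is
                              * caught below.
                              */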
3596                         wc.wr_id = IBLND_WID_INVAL;
3597
3598                         rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3599                         if (!rc) {
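                                     /*
                                      * CQ appeared empty: re-arm it, then poll
                                      * once more to close the race with a
                                      * completion arriving between the poll
                                      * and the re-arm.
                                      */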
3600                                 rc = ib_req_notify_cq(conn->ibc_cq,
3601                                                       IB_CQ_NEXT_COMP);
3602                                 if (rc < 0) {
3603                                         CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3604                                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3605                                         kiblnd_close_conn(conn, -EIO);
3606                                         kiblnd_conn_decref(conn);
3607                                         spin_lock_irqsave(&sched->ibs_lock,
3608                                                           flags);
3609                                         continue;
3610                                 }
3611
3612                                 rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3613                         }
3614
3615                         if (unlikely(rc > 0 && wc.wr_id == IBLND_WID_INVAL)) {
3616                                 LCONSOLE_ERROR("ib_poll_cq (rc: %d) returned invalid wr_id, opcode %d, status: %d, vendor_err: %d, conn: %s status: %d\nPlease upgrade firmware and OFED or contact vendor.\n",
3617                                                rc, wc.opcode, wc.status,
3618                                                wc.vendor_err,
3619                                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3620                                                conn->ibc_state);
3621                                 rc = -EINVAL;
3622                         }
3623
3624                         if (rc < 0) {
3625                                 CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3626                                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
3627                                       rc);
3628                                 kiblnd_close_conn(conn, -EIO);
3629                                 kiblnd_conn_decref(conn);
3630                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3631                                 continue;
3632                         }
3633
3634                         spin_lock_irqsave(&sched->ibs_lock, flags);
3635
3636                         if (rc || conn->ibc_ready) {
3637                                 /*
3638                                  * There may be another completion waiting; get
3639                                  * another scheduler to check while I handle
3640                                  * this one...
3641                                  */
3642                                 /* +1 ref for sched_conns */
3643                                 kiblnd_conn_addref(conn);
3644                                 list_add_tail(&conn->ibc_sched_list,
3645                                               &sched->ibs_conns);
3646                                 if (waitqueue_active(&sched->ibs_waitq))
3647                                         wake_up(&sched->ibs_waitq);
3648                         } else {
3649                                 conn->ibc_scheduled = 0;
3650                         }
3651
3652                         if (rc) {
3653                                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3654                                 kiblnd_complete(&wc);
3655
3656                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3657                         }
3658
3659                         kiblnd_conn_decref(conn); /* ...drop my ref from above */
3660                         did_something = 1;
3661                 }
3662
3663                 if (did_something)
3664                         continue;
3665
3666                 set_current_state(TASK_INTERRUPTIBLE);
3667                 add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3668                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3669
3670                 schedule();
3671                 busy_loops = 0;
3672
3673                 remove_wait_queue(&sched->ibs_waitq, &wait);
3674                 spin_lock_irqsave(&sched->ibs_lock, flags);
3675         }
3676
3677         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3678
3679         kiblnd_thread_fini();
3680         return 0;
3681 }
3682
3683 int
3684 kiblnd_failover_thread(void *arg)
3685 {
3686         rwlock_t *glock = &kiblnd_data.kib_global_lock;
3687         struct kib_dev *dev;
3688         wait_queue_t wait;
3689         unsigned long flags;
3690         int rc;
3691
3692         LASSERT(*kiblnd_tunables.kib_dev_failover);
3693
3694         cfs_block_allsigs();
3695
3696         init_waitqueue_entry(&wait, current);
3697         write_lock_irqsave(glock, flags);
3698
3699         while (!kiblnd_data.kib_shutdown) {
3700                 int do_failover = 0;
3701                 int long_sleep;
3702
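                     /* Find a failed device whose failover backoff has expired */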
3703                 list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3704                                     ibd_fail_list) {
3705                         if (time_before(cfs_time_current(),
3706                                         dev->ibd_next_failover))
3707                                 continue;
3708                         do_failover = 1;
3709                         break;
3710                 }
3711
3712                 if (do_failover) {
3713                         list_del_init(&dev->ibd_fail_list);
3714                         dev->ibd_failover = 1;
3715                         write_unlock_irqrestore(glock, flags);
3716
3717                         rc = kiblnd_dev_failover(dev);
3718
3719                         write_lock_irqsave(glock, flags);
3720
3721                         LASSERT(dev->ibd_failover);
3722                         dev->ibd_failover = 0;
3723                         if (rc >= 0) { /* Device is OK or failover succeeded */
3724                                 dev->ibd_next_failover = cfs_time_shift(3);
3725                                 continue;
3726                         }
3727
3728                         /* failed to failover, retry later */
3729                         dev->ibd_next_failover =
3730                                 cfs_time_shift(min(dev->ibd_failed_failover, 10));
3731                         if (kiblnd_dev_can_failover(dev)) {
3732                                 list_add_tail(&dev->ibd_fail_list,
3733                                               &kiblnd_data.kib_failed_devs);
3734                         }
3735
3736                         continue;
3737                 }
3738
3739                 /* long sleep if no more pending failover */
3740                 long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3741
3742                 set_current_state(TASK_INTERRUPTIBLE);
3743                 add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3744                 write_unlock_irqrestore(glock, flags);
3745
3746                 rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3747                                                    cfs_time_seconds(1));
3748                 remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3749                 write_lock_irqsave(glock, flags);
3750
3751                 if (!long_sleep || rc)
3752                         continue;
3753
3754                 /*
3755                  * After a long sleep, routinely check all active devices.
3756                  * We need a check like this because if there is no active
3757                  * connection on a dev and no SEND from the local node, we may
3758                  * listen on the wrong HCA forever after a bonding failover.
3759                  */
3760                 list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3761                         if (kiblnd_dev_can_failover(dev)) {
3762                                 list_add_tail(&dev->ibd_fail_list,
3763                                               &kiblnd_data.kib_failed_devs);
3764                         }
3765                 }
3766         }
3767
3768         write_unlock_irqrestore(glock, flags);
3769
3770         kiblnd_thread_fini();
3771         return 0;
3772 }