GNU Linux-libre 4.9-gnu1
[releases.git] / drivers / staging / lustre / lnet / klnds / o2iblnd / o2iblnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/o2iblnd/o2iblnd_cb.c
33  *
34  * Author: Eric Barton <eric@bartonsoftware.com>
35  */
36
37 #include "o2iblnd.h"
38
/*
 * NOTE(review): not referenced in this part of the file; presumably the
 * number of connection races tolerated before giving up - confirm against
 * its users further down.
 */
39 #define MAX_CONN_RACES_BEFORE_ABORT 20
40
/*
 * Forward declarations of file-local (static) helpers so they can be called
 * ahead of their definitions.
 */
41 static void kiblnd_peer_alive(struct kib_peer *peer);
42 static void kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error);
43 static void kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx,
44                                int type, int body_nob);
45 static int kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
46                             int resid, struct kib_rdma_desc *dstrd,
47                             __u64 dstcookie);
48 static void kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn);
49 static void kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn);
50 static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx);
51 static void kiblnd_check_sends_locked(struct kib_conn *conn);
52
53 static void
54 kiblnd_tx_done(lnet_ni_t *ni, struct kib_tx *tx)
55 {
56         lnet_msg_t *lntmsg[2];
57         struct kib_net *net = ni->ni_data;
58         int rc;
59         int i;
60
61         LASSERT(net);
62         LASSERT(!in_interrupt());
63         LASSERT(!tx->tx_queued);               /* mustn't be queued for sending */
64         LASSERT(!tx->tx_sending);         /* mustn't be awaiting sent callback */
65         LASSERT(!tx->tx_waiting);             /* mustn't be awaiting peer response */
66         LASSERT(tx->tx_pool);
67
68         kiblnd_unmap_tx(ni, tx);
69
70         /* tx may have up to 2 lnet msgs to finalise */
71         lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
72         lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
73         rc = tx->tx_status;
74
75         if (tx->tx_conn) {
76                 LASSERT(ni == tx->tx_conn->ibc_peer->ibp_ni);
77
78                 kiblnd_conn_decref(tx->tx_conn);
79                 tx->tx_conn = NULL;
80         }
81
82         tx->tx_nwrq = 0;
83         tx->tx_status = 0;
84
85         kiblnd_pool_free_node(&tx->tx_pool->tpo_pool, &tx->tx_list);
86
87         /* delay finalize until my descs have been freed */
88         for (i = 0; i < 2; i++) {
89                 if (!lntmsg[i])
90                         continue;
91
92                 lnet_finalize(ni, lntmsg[i], rc);
93         }
94 }
95
96 void
97 kiblnd_txlist_done(lnet_ni_t *ni, struct list_head *txlist, int status)
98 {
99         struct kib_tx *tx;
100
101         while (!list_empty(txlist)) {
102                 tx = list_entry(txlist->next, struct kib_tx, tx_list);
103
104                 list_del(&tx->tx_list);
105                 /* complete now */
106                 tx->tx_waiting = 0;
107                 tx->tx_status = status;
108                 kiblnd_tx_done(ni, tx);
109         }
110 }
111
112 static struct kib_tx *
113 kiblnd_get_idle_tx(lnet_ni_t *ni, lnet_nid_t target)
114 {
115         struct kib_net *net = (struct kib_net *)ni->ni_data;
116         struct list_head *node;
117         struct kib_tx *tx;
118         struct kib_tx_poolset *tps;
119
120         tps = net->ibn_tx_ps[lnet_cpt_of_nid(target)];
121         node = kiblnd_pool_alloc_node(&tps->tps_poolset);
122         if (!node)
123                 return NULL;
124         tx = list_entry(node, struct kib_tx, tx_list);
125
126         LASSERT(!tx->tx_nwrq);
127         LASSERT(!tx->tx_queued);
128         LASSERT(!tx->tx_sending);
129         LASSERT(!tx->tx_waiting);
130         LASSERT(!tx->tx_status);
131         LASSERT(!tx->tx_conn);
132         LASSERT(!tx->tx_lntmsg[0]);
133         LASSERT(!tx->tx_lntmsg[1]);
134         LASSERT(!tx->tx_nfrags);
135
136         return tx;
137 }
138
139 static void
140 kiblnd_drop_rx(struct kib_rx *rx)
141 {
142         struct kib_conn *conn = rx->rx_conn;
143         struct kib_sched_info *sched = conn->ibc_sched;
144         unsigned long flags;
145
146         spin_lock_irqsave(&sched->ibs_lock, flags);
147         LASSERT(conn->ibc_nrx > 0);
148         conn->ibc_nrx--;
149         spin_unlock_irqrestore(&sched->ibs_lock, flags);
150
151         kiblnd_conn_decref(conn);
152 }
153
154 int
155 kiblnd_post_rx(struct kib_rx *rx, int credit)
156 {
157         struct kib_conn *conn = rx->rx_conn;
158         struct kib_net *net = conn->ibc_peer->ibp_ni->ni_data;
159         struct ib_recv_wr *bad_wrq = NULL;
160         struct ib_mr *mr = conn->ibc_hdev->ibh_mrs;
161         int rc;
162
163         LASSERT(net);
164         LASSERT(!in_interrupt());
165         LASSERT(credit == IBLND_POSTRX_NO_CREDIT ||
166                 credit == IBLND_POSTRX_PEER_CREDIT ||
167                 credit == IBLND_POSTRX_RSRVD_CREDIT);
168         LASSERT(mr);
169
170         rx->rx_sge.lkey   = mr->lkey;
171         rx->rx_sge.addr   = rx->rx_msgaddr;
172         rx->rx_sge.length = IBLND_MSG_SIZE;
173
174         rx->rx_wrq.next    = NULL;
175         rx->rx_wrq.sg_list = &rx->rx_sge;
176         rx->rx_wrq.num_sge = 1;
177         rx->rx_wrq.wr_id   = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);
178
179         LASSERT(conn->ibc_state >= IBLND_CONN_INIT);
180         LASSERT(rx->rx_nob >= 0);             /* not posted */
181
182         if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
183                 kiblnd_drop_rx(rx);          /* No more posts for this rx */
184                 return 0;
185         }
186
187         rx->rx_nob = -1;                        /* flag posted */
188
189         /* NB: need an extra reference after ib_post_recv because we don't
190          * own this rx (and rx::rx_conn) anymore, LU-5678.
191          */
192         kiblnd_conn_addref(conn);
193         rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);
194         if (unlikely(rc)) {
195                 CERROR("Can't post rx for %s: %d, bad_wrq: %p\n",
196                        libcfs_nid2str(conn->ibc_peer->ibp_nid), rc, bad_wrq);
197                 rx->rx_nob = 0;
198         }
199
200         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
201                 goto out;
202
203         if (unlikely(rc)) {
204                 kiblnd_close_conn(conn, rc);
205                 kiblnd_drop_rx(rx);          /* No more posts for this rx */
206                 goto out;
207         }
208
209         if (credit == IBLND_POSTRX_NO_CREDIT)
210                 goto out;
211
212         spin_lock(&conn->ibc_lock);
213         if (credit == IBLND_POSTRX_PEER_CREDIT)
214                 conn->ibc_outstanding_credits++;
215         else
216                 conn->ibc_reserved_credits++;
217         kiblnd_check_sends_locked(conn);
218         spin_unlock(&conn->ibc_lock);
219
220 out:
221         kiblnd_conn_decref(conn);
222         return rc;
223 }
224
225 static struct kib_tx *
226 kiblnd_find_waiting_tx_locked(struct kib_conn *conn, int txtype, __u64 cookie)
227 {
228         struct list_head *tmp;
229
230         list_for_each(tmp, &conn->ibc_active_txs) {
231                 struct kib_tx *tx = list_entry(tmp, struct kib_tx, tx_list);
232
233                 LASSERT(!tx->tx_queued);
234                 LASSERT(tx->tx_sending || tx->tx_waiting);
235
236                 if (tx->tx_cookie != cookie)
237                         continue;
238
239                 if (tx->tx_waiting &&
240                     tx->tx_msg->ibm_type == txtype)
241                         return tx;
242
243                 CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
244                       tx->tx_waiting ? "" : "NOT ",
245                       tx->tx_msg->ibm_type, txtype);
246         }
247         return NULL;
248 }
249
250 static void
251 kiblnd_handle_completion(struct kib_conn *conn, int txtype, int status, __u64 cookie)
252 {
253         struct kib_tx *tx;
254         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
255         int idle;
256
257         spin_lock(&conn->ibc_lock);
258
259         tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
260         if (!tx) {
261                 spin_unlock(&conn->ibc_lock);
262
263                 CWARN("Unmatched completion type %x cookie %#llx from %s\n",
264                       txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
265                 kiblnd_close_conn(conn, -EPROTO);
266                 return;
267         }
268
269         if (!tx->tx_status) {          /* success so far */
270                 if (status < 0) /* failed? */
271                         tx->tx_status = status;
272                 else if (txtype == IBLND_MSG_GET_REQ)
273                         lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
274         }
275
276         tx->tx_waiting = 0;
277
278         idle = !tx->tx_queued && !tx->tx_sending;
279         if (idle)
280                 list_del(&tx->tx_list);
281
282         spin_unlock(&conn->ibc_lock);
283
284         if (idle)
285                 kiblnd_tx_done(ni, tx);
286 }
287
288 static void
289 kiblnd_send_completion(struct kib_conn *conn, int type, int status, __u64 cookie)
290 {
291         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
292         struct kib_tx *tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
293
294         if (!tx) {
295                 CERROR("Can't get tx for completion %x for %s\n",
296                        type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
297                 return;
298         }
299
300         tx->tx_msg->ibm_u.completion.ibcm_status = status;
301         tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
302         kiblnd_init_tx_msg(ni, tx, type, sizeof(struct kib_completion_msg));
303
304         kiblnd_queue_tx(tx, conn);
305 }
306
307 static void
308 kiblnd_handle_rx(struct kib_rx *rx)
309 {
310         struct kib_msg *msg = rx->rx_msg;
311         struct kib_conn *conn = rx->rx_conn;
312         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
313         int credits = msg->ibm_credits;
314         struct kib_tx *tx;
315         int rc = 0;
316         int rc2;
317         int post_credit;
318
319         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
320
321         CDEBUG(D_NET, "Received %x[%d] from %s\n",
322                msg->ibm_type, credits,
323                libcfs_nid2str(conn->ibc_peer->ibp_nid));
324
325         if (credits) {
326                 /* Have I received credits that will let me send? */
327                 spin_lock(&conn->ibc_lock);
328
329                 if (conn->ibc_credits + credits >
330                     conn->ibc_queue_depth) {
331                         rc2 = conn->ibc_credits;
332                         spin_unlock(&conn->ibc_lock);
333
334                         CERROR("Bad credits from %s: %d + %d > %d\n",
335                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
336                                rc2, credits, conn->ibc_queue_depth);
337
338                         kiblnd_close_conn(conn, -EPROTO);
339                         kiblnd_post_rx(rx, IBLND_POSTRX_NO_CREDIT);
340                         return;
341                 }
342
343                 conn->ibc_credits += credits;
344
345                 /* This ensures the credit taken by NOOP can be returned */
346                 if (msg->ibm_type == IBLND_MSG_NOOP &&
347                     !IBLND_OOB_CAPABLE(conn->ibc_version)) /* v1 only */
348                         conn->ibc_outstanding_credits++;
349
350                 kiblnd_check_sends_locked(conn);
351                 spin_unlock(&conn->ibc_lock);
352         }
353
354         switch (msg->ibm_type) {
355         default:
356                 CERROR("Bad IBLND message type %x from %s\n",
357                        msg->ibm_type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
358                 post_credit = IBLND_POSTRX_NO_CREDIT;
359                 rc = -EPROTO;
360                 break;
361
362         case IBLND_MSG_NOOP:
363                 if (IBLND_OOB_CAPABLE(conn->ibc_version)) {
364                         post_credit = IBLND_POSTRX_NO_CREDIT;
365                         break;
366                 }
367
368                 if (credits) /* credit already posted */
369                         post_credit = IBLND_POSTRX_NO_CREDIT;
370                 else          /* a keepalive NOOP */
371                         post_credit = IBLND_POSTRX_PEER_CREDIT;
372                 break;
373
374         case IBLND_MSG_IMMEDIATE:
375                 post_credit = IBLND_POSTRX_DONT_POST;
376                 rc = lnet_parse(ni, &msg->ibm_u.immediate.ibim_hdr,
377                                 msg->ibm_srcnid, rx, 0);
378                 if (rc < 0)                  /* repost on error */
379                         post_credit = IBLND_POSTRX_PEER_CREDIT;
380                 break;
381
382         case IBLND_MSG_PUT_REQ:
383                 post_credit = IBLND_POSTRX_DONT_POST;
384                 rc = lnet_parse(ni, &msg->ibm_u.putreq.ibprm_hdr,
385                                 msg->ibm_srcnid, rx, 1);
386                 if (rc < 0)                  /* repost on error */
387                         post_credit = IBLND_POSTRX_PEER_CREDIT;
388                 break;
389
390         case IBLND_MSG_PUT_NAK:
391                 CWARN("PUT_NACK from %s\n",
392                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
393                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
394                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_REQ,
395                                          msg->ibm_u.completion.ibcm_status,
396                                          msg->ibm_u.completion.ibcm_cookie);
397                 break;
398
399         case IBLND_MSG_PUT_ACK:
400                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
401
402                 spin_lock(&conn->ibc_lock);
403                 tx = kiblnd_find_waiting_tx_locked(conn, IBLND_MSG_PUT_REQ,
404                                                    msg->ibm_u.putack.ibpam_src_cookie);
405                 if (tx)
406                         list_del(&tx->tx_list);
407                 spin_unlock(&conn->ibc_lock);
408
409                 if (!tx) {
410                         CERROR("Unmatched PUT_ACK from %s\n",
411                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
412                         rc = -EPROTO;
413                         break;
414                 }
415
416                 LASSERT(tx->tx_waiting);
417                 /*
418                  * CAVEAT EMPTOR: I could be racing with tx_complete, but...
419                  * (a) I can overwrite tx_msg since my peer has received it!
420                  * (b) tx_waiting set tells tx_complete() it's not done.
421                  */
422                 tx->tx_nwrq = 0;                /* overwrite PUT_REQ */
423
424                 rc2 = kiblnd_init_rdma(conn, tx, IBLND_MSG_PUT_DONE,
425                                        kiblnd_rd_size(&msg->ibm_u.putack.ibpam_rd),
426                                        &msg->ibm_u.putack.ibpam_rd,
427                                        msg->ibm_u.putack.ibpam_dst_cookie);
428                 if (rc2 < 0)
429                         CERROR("Can't setup rdma for PUT to %s: %d\n",
430                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc2);
431
432                 spin_lock(&conn->ibc_lock);
433                 tx->tx_waiting = 0;     /* clear waiting and queue atomically */
434                 kiblnd_queue_tx_locked(tx, conn);
435                 spin_unlock(&conn->ibc_lock);
436                 break;
437
438         case IBLND_MSG_PUT_DONE:
439                 post_credit = IBLND_POSTRX_PEER_CREDIT;
440                 kiblnd_handle_completion(conn, IBLND_MSG_PUT_ACK,
441                                          msg->ibm_u.completion.ibcm_status,
442                                          msg->ibm_u.completion.ibcm_cookie);
443                 break;
444
445         case IBLND_MSG_GET_REQ:
446                 post_credit = IBLND_POSTRX_DONT_POST;
447                 rc = lnet_parse(ni, &msg->ibm_u.get.ibgm_hdr,
448                                 msg->ibm_srcnid, rx, 1);
449                 if (rc < 0)                  /* repost on error */
450                         post_credit = IBLND_POSTRX_PEER_CREDIT;
451                 break;
452
453         case IBLND_MSG_GET_DONE:
454                 post_credit = IBLND_POSTRX_RSRVD_CREDIT;
455                 kiblnd_handle_completion(conn, IBLND_MSG_GET_REQ,
456                                          msg->ibm_u.completion.ibcm_status,
457                                          msg->ibm_u.completion.ibcm_cookie);
458                 break;
459         }
460
461         if (rc < 0)                          /* protocol error */
462                 kiblnd_close_conn(conn, rc);
463
464         if (post_credit != IBLND_POSTRX_DONT_POST)
465                 kiblnd_post_rx(rx, post_credit);
466 }
467
468 static void
469 kiblnd_rx_complete(struct kib_rx *rx, int status, int nob)
470 {
471         struct kib_msg *msg = rx->rx_msg;
472         struct kib_conn *conn = rx->rx_conn;
473         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
474         struct kib_net *net = ni->ni_data;
475         int rc;
476         int err = -EIO;
477
478         LASSERT(net);
479         LASSERT(rx->rx_nob < 0);               /* was posted */
480         rx->rx_nob = 0;                  /* isn't now */
481
482         if (conn->ibc_state > IBLND_CONN_ESTABLISHED)
483                 goto ignore;
484
485         if (status != IB_WC_SUCCESS) {
486                 CNETERR("Rx from %s failed: %d\n",
487                         libcfs_nid2str(conn->ibc_peer->ibp_nid), status);
488                 goto failed;
489         }
490
491         LASSERT(nob >= 0);
492         rx->rx_nob = nob;
493
494         rc = kiblnd_unpack_msg(msg, rx->rx_nob);
495         if (rc) {
496                 CERROR("Error %d unpacking rx from %s\n",
497                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
498                 goto failed;
499         }
500
501         if (msg->ibm_srcnid != conn->ibc_peer->ibp_nid ||
502             msg->ibm_dstnid != ni->ni_nid ||
503             msg->ibm_srcstamp != conn->ibc_incarnation ||
504             msg->ibm_dststamp != net->ibn_incarnation) {
505                 CERROR("Stale rx from %s\n",
506                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
507                 err = -ESTALE;
508                 goto failed;
509         }
510
511         /* set time last known alive */
512         kiblnd_peer_alive(conn->ibc_peer);
513
514         /* racing with connection establishment/teardown! */
515
516         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
517                 rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
518                 unsigned long flags;
519
520                 write_lock_irqsave(g_lock, flags);
521                 /* must check holding global lock to eliminate race */
522                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
523                         list_add_tail(&rx->rx_list, &conn->ibc_early_rxs);
524                         write_unlock_irqrestore(g_lock, flags);
525                         return;
526                 }
527                 write_unlock_irqrestore(g_lock, flags);
528         }
529         kiblnd_handle_rx(rx);
530         return;
531
532  failed:
533         CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
534         kiblnd_close_conn(conn, err);
535  ignore:
536         kiblnd_drop_rx(rx);                  /* Don't re-post rx. */
537 }
538
/*
 * Translate a kernel virtual address into its struct page.
 *
 * vmalloc addresses go through vmalloc_to_page(); everything else through
 * virt_to_page().  With CONFIG_HIGHMEM, an address inside the pkmap window
 * is a fatal error (LBUG).  The resulting page is asserted to be non-NULL.
 */
static struct page *
kiblnd_kvaddr_to_page(unsigned long vaddr)
{
	void *kaddr = (void *)vaddr;
	struct page *page;

	if (is_vmalloc_addr(kaddr)) {
		page = vmalloc_to_page(kaddr);
	} else {
#ifdef CONFIG_HIGHMEM
		if (vaddr >= PKMAP_BASE &&
		    vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE)) {
			/* highmem pages are only expected for bulk (kiov) I/O */
			CERROR("find page for address in highmem\n");
			LBUG();
		}
#endif
		page = virt_to_page(vaddr);
	}

	LASSERT(page);
	return page;
}
561
562 static int
563 kiblnd_fmr_map_tx(struct kib_net *net, struct kib_tx *tx, struct kib_rdma_desc *rd, __u32 nob)
564 {
565         struct kib_hca_dev *hdev;
566         struct kib_fmr_poolset *fps;
567         int cpt;
568         int rc;
569
570         LASSERT(tx->tx_pool);
571         LASSERT(tx->tx_pool->tpo_pool.po_owner);
572
573         hdev = tx->tx_pool->tpo_hdev;
574         cpt = tx->tx_pool->tpo_pool.po_owner->ps_cpt;
575
576         fps = net->ibn_fmr_ps[cpt];
577         rc = kiblnd_fmr_pool_map(fps, tx, rd, nob, 0, &tx->fmr);
578         if (rc) {
579                 CERROR("Can't map %u bytes: %d\n", nob, rc);
580                 return rc;
581         }
582
583         /*
584          * If rd is not tx_rd, it's going to get sent to a peer, who will need
585          * the rkey
586          */
587         rd->rd_key = tx->fmr.fmr_key;
588         rd->rd_frags[0].rf_addr &= ~hdev->ibh_page_mask;
589         rd->rd_frags[0].rf_nob = nob;
590         rd->rd_nfrags = 1;
591
592         return 0;
593 }
594
595 static void kiblnd_unmap_tx(lnet_ni_t *ni, struct kib_tx *tx)
596 {
597         struct kib_net *net = ni->ni_data;
598
599         LASSERT(net);
600
601         if (net->ibn_fmr_ps)
602                 kiblnd_fmr_pool_unmap(&tx->fmr, tx->tx_status);
603
604         if (tx->tx_nfrags) {
605                 kiblnd_dma_unmap_sg(tx->tx_pool->tpo_hdev->ibh_ibdev,
606                                     tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
607                 tx->tx_nfrags = 0;
608         }
609 }
610
611 static int kiblnd_map_tx(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
612                          int nfrags)
613 {
614         struct kib_net *net = ni->ni_data;
615         struct kib_hca_dev *hdev = net->ibn_dev->ibd_hdev;
616         struct ib_mr *mr    = NULL;
617         __u32 nob;
618         int i;
619
620         /*
621          * If rd is not tx_rd, it's going to get sent to a peer and I'm the
622          * RDMA sink
623          */
624         tx->tx_dmadir = (rd != tx->tx_rd) ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
625         tx->tx_nfrags = nfrags;
626
627         rd->rd_nfrags = kiblnd_dma_map_sg(hdev->ibh_ibdev, tx->tx_frags,
628                                           tx->tx_nfrags, tx->tx_dmadir);
629
630         for (i = 0, nob = 0; i < rd->rd_nfrags; i++) {
631                 rd->rd_frags[i].rf_nob  = kiblnd_sg_dma_len(
632                         hdev->ibh_ibdev, &tx->tx_frags[i]);
633                 rd->rd_frags[i].rf_addr = kiblnd_sg_dma_address(
634                         hdev->ibh_ibdev, &tx->tx_frags[i]);
635                 nob += rd->rd_frags[i].rf_nob;
636         }
637
638         mr = kiblnd_find_rd_dma_mr(ni, rd, tx->tx_conn ?
639                                    tx->tx_conn->ibc_max_frags : -1);
640         if (mr) {
641                 /* found pre-mapping MR */
642                 rd->rd_key = (rd != tx->tx_rd) ? mr->rkey : mr->lkey;
643                 return 0;
644         }
645
646         if (net->ibn_fmr_ps)
647                 return kiblnd_fmr_map_tx(net, tx, rd, nob);
648
649         return -EINVAL;
650 }
651
652 static int
653 kiblnd_setup_rd_iov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
654                     unsigned int niov, const struct kvec *iov, int offset, int nob)
655 {
656         struct kib_net *net = ni->ni_data;
657         struct page *page;
658         struct scatterlist *sg;
659         unsigned long vaddr;
660         int fragnob;
661         int page_offset;
662
663         LASSERT(nob > 0);
664         LASSERT(niov > 0);
665         LASSERT(net);
666
667         while (offset >= iov->iov_len) {
668                 offset -= iov->iov_len;
669                 niov--;
670                 iov++;
671                 LASSERT(niov > 0);
672         }
673
674         sg = tx->tx_frags;
675         do {
676                 LASSERT(niov > 0);
677
678                 vaddr = ((unsigned long)iov->iov_base) + offset;
679                 page_offset = vaddr & (PAGE_SIZE - 1);
680                 page = kiblnd_kvaddr_to_page(vaddr);
681                 if (!page) {
682                         CERROR("Can't find page\n");
683                         return -EFAULT;
684                 }
685
686                 fragnob = min((int)(iov->iov_len - offset), nob);
687                 fragnob = min(fragnob, (int)PAGE_SIZE - page_offset);
688
689                 sg_set_page(sg, page, fragnob, page_offset);
690                 sg = sg_next(sg);
691                 if (!sg) {
692                         CERROR("lacking enough sg entries to map tx\n");
693                         return -EFAULT;
694                 }
695
696                 if (offset + fragnob < iov->iov_len) {
697                         offset += fragnob;
698                 } else {
699                         offset = 0;
700                         iov++;
701                         niov--;
702                 }
703                 nob -= fragnob;
704         } while (nob > 0);
705
706         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
707 }
708
709 static int
710 kiblnd_setup_rd_kiov(lnet_ni_t *ni, struct kib_tx *tx, struct kib_rdma_desc *rd,
711                      int nkiov, const lnet_kiov_t *kiov, int offset, int nob)
712 {
713         struct kib_net *net = ni->ni_data;
714         struct scatterlist *sg;
715         int fragnob;
716
717         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
718
719         LASSERT(nob > 0);
720         LASSERT(nkiov > 0);
721         LASSERT(net);
722
723         while (offset >= kiov->bv_len) {
724                 offset -= kiov->bv_len;
725                 nkiov--;
726                 kiov++;
727                 LASSERT(nkiov > 0);
728         }
729
730         sg = tx->tx_frags;
731         do {
732                 LASSERT(nkiov > 0);
733
734                 fragnob = min((int)(kiov->bv_len - offset), nob);
735
736                 sg_set_page(sg, kiov->bv_page, fragnob,
737                             kiov->bv_offset + offset);
738                 sg = sg_next(sg);
739                 if (!sg) {
740                         CERROR("lacking enough sg entries to map tx\n");
741                         return -EFAULT;
742                 }
743
744                 offset = 0;
745                 kiov++;
746                 nkiov--;
747                 nob -= fragnob;
748         } while (nob > 0);
749
750         return kiblnd_map_tx(ni, tx, rd, sg - tx->tx_frags);
751 }
752
753 static int
754 kiblnd_post_tx_locked(struct kib_conn *conn, struct kib_tx *tx, int credit)
755         __must_hold(&conn->ibc_lock)
756 {
757         struct kib_msg *msg = tx->tx_msg;
758         struct kib_peer *peer = conn->ibc_peer;
759         struct lnet_ni *ni = peer->ibp_ni;
760         int ver = conn->ibc_version;
761         int rc;
762         int done;
763
764         LASSERT(tx->tx_queued);
765         /* We rely on this for QP sizing */
766         LASSERT(tx->tx_nwrq > 0);
767
768         LASSERT(!credit || credit == 1);
769         LASSERT(conn->ibc_outstanding_credits >= 0);
770         LASSERT(conn->ibc_outstanding_credits <= conn->ibc_queue_depth);
771         LASSERT(conn->ibc_credits >= 0);
772         LASSERT(conn->ibc_credits <= conn->ibc_queue_depth);
773
774         if (conn->ibc_nsends_posted == kiblnd_concurrent_sends(ver, ni)) {
775                 /* tx completions outstanding... */
776                 CDEBUG(D_NET, "%s: posted enough\n",
777                        libcfs_nid2str(peer->ibp_nid));
778                 return -EAGAIN;
779         }
780
781         if (credit && !conn->ibc_credits) {   /* no credits */
782                 CDEBUG(D_NET, "%s: no credits\n",
783                        libcfs_nid2str(peer->ibp_nid));
784                 return -EAGAIN;
785         }
786
787         if (credit && !IBLND_OOB_CAPABLE(ver) &&
788             conn->ibc_credits == 1 &&   /* last credit reserved */
789             msg->ibm_type != IBLND_MSG_NOOP) {      /* for NOOP */
790                 CDEBUG(D_NET, "%s: not using last credit\n",
791                        libcfs_nid2str(peer->ibp_nid));
792                 return -EAGAIN;
793         }
794
795         /* NB don't drop ibc_lock before bumping tx_sending */
796         list_del(&tx->tx_list);
797         tx->tx_queued = 0;
798
799         if (msg->ibm_type == IBLND_MSG_NOOP &&
800             (!kiblnd_need_noop(conn) ||     /* redundant NOOP */
801              (IBLND_OOB_CAPABLE(ver) && /* posted enough NOOP */
802               conn->ibc_noops_posted == IBLND_OOB_MSGS(ver)))) {
803                 /*
804                  * OK to drop when posted enough NOOPs, since
805                  * kiblnd_check_sends_locked will queue NOOP again when
806                  * posted NOOPs complete
807                  */
808                 spin_unlock(&conn->ibc_lock);
809                 kiblnd_tx_done(peer->ibp_ni, tx);
810                 spin_lock(&conn->ibc_lock);
811                 CDEBUG(D_NET, "%s(%d): redundant or enough NOOP\n",
812                        libcfs_nid2str(peer->ibp_nid),
813                        conn->ibc_noops_posted);
814                 return 0;
815         }
816
817         kiblnd_pack_msg(peer->ibp_ni, msg, ver, conn->ibc_outstanding_credits,
818                         peer->ibp_nid, conn->ibc_incarnation);
819
820         conn->ibc_credits -= credit;
821         conn->ibc_outstanding_credits = 0;
822         conn->ibc_nsends_posted++;
823         if (msg->ibm_type == IBLND_MSG_NOOP)
824                 conn->ibc_noops_posted++;
825
826         /*
827          * CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
828          * PUT.  If so, it was first queued here as a PUT_REQ, sent and
829          * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
830          * and then re-queued here.  It's (just) possible that
831          * tx_sending is non-zero if we've not done the tx_complete()
832          * from the first send; hence the ++ rather than = below.
833          */
834         tx->tx_sending++;
835         list_add(&tx->tx_list, &conn->ibc_active_txs);
836
837         /* I'm still holding ibc_lock! */
838         if (conn->ibc_state != IBLND_CONN_ESTABLISHED) {
839                 rc = -ECONNABORTED;
840         } else if (tx->tx_pool->tpo_pool.po_failed ||
841                  conn->ibc_hdev != tx->tx_pool->tpo_hdev) {
842                 /* close_conn will launch failover */
843                 rc = -ENETDOWN;
844         } else {
845                 struct kib_fast_reg_descriptor *frd = tx->fmr.fmr_frd;
846                 struct ib_send_wr *bad = &tx->tx_wrq[tx->tx_nwrq - 1].wr;
847                 struct ib_send_wr *wrq = &tx->tx_wrq[0].wr;
848
849                 if (frd) {
850                         if (!frd->frd_valid) {
851                                 wrq = &frd->frd_inv_wr;
852                                 wrq->next = &frd->frd_fastreg_wr.wr;
853                         } else {
854                                 wrq = &frd->frd_fastreg_wr.wr;
855                         }
856                         frd->frd_fastreg_wr.wr.next = &tx->tx_wrq[0].wr;
857                 }
858
859                 LASSERTF(bad->wr_id == kiblnd_ptr2wreqid(tx, IBLND_WID_TX),
860                          "bad wr_id %llx, opc %d, flags %d, peer: %s\n",
861                          bad->wr_id, bad->opcode, bad->send_flags,
862                          libcfs_nid2str(conn->ibc_peer->ibp_nid));
863                 bad = NULL;
864                 rc = ib_post_send(conn->ibc_cmid->qp, wrq, &bad);
865         }
866
867         conn->ibc_last_send = jiffies;
868
869         if (!rc)
870                 return 0;
871
872         /*
873          * NB credits are transferred in the actual
874          * message, which can only be the last work item
875          */
876         conn->ibc_credits += credit;
877         conn->ibc_outstanding_credits += msg->ibm_credits;
878         conn->ibc_nsends_posted--;
879         if (msg->ibm_type == IBLND_MSG_NOOP)
880                 conn->ibc_noops_posted--;
881
882         tx->tx_status = rc;
883         tx->tx_waiting = 0;
884         tx->tx_sending--;
885
886         done = !tx->tx_sending;
887         if (done)
888                 list_del(&tx->tx_list);
889
890         spin_unlock(&conn->ibc_lock);
891
892         if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
893                 CERROR("Error %d posting transmit to %s\n",
894                        rc, libcfs_nid2str(peer->ibp_nid));
895         else
896                 CDEBUG(D_NET, "Error %d posting transmit to %s\n",
897                        rc, libcfs_nid2str(peer->ibp_nid));
898
899         kiblnd_close_conn(conn, rc);
900
901         if (done)
902                 kiblnd_tx_done(peer->ibp_ni, tx);
903
904         spin_lock(&conn->ibc_lock);
905
906         return -EIO;
907 }
908
909 static void
910 kiblnd_check_sends_locked(struct kib_conn *conn)
911 {
912         int ver = conn->ibc_version;
913         lnet_ni_t *ni = conn->ibc_peer->ibp_ni;
914         struct kib_tx *tx;
915
916         /* Don't send anything until after the connection is established */
917         if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
918                 CDEBUG(D_NET, "%s too soon\n",
919                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
920                 return;
921         }
922
923         LASSERT(conn->ibc_nsends_posted <= kiblnd_concurrent_sends(ver, ni));
924         LASSERT(!IBLND_OOB_CAPABLE(ver) ||
925                 conn->ibc_noops_posted <= IBLND_OOB_MSGS(ver));
926         LASSERT(conn->ibc_reserved_credits >= 0);
927
928         while (conn->ibc_reserved_credits > 0 &&
929                !list_empty(&conn->ibc_tx_queue_rsrvd)) {
930                 tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
931                                 struct kib_tx, tx_list);
932                 list_del(&tx->tx_list);
933                 list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
934                 conn->ibc_reserved_credits--;
935         }
936
937         if (kiblnd_need_noop(conn)) {
938                 spin_unlock(&conn->ibc_lock);
939
940                 tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
941                 if (tx)
942                         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);
943
944                 spin_lock(&conn->ibc_lock);
945                 if (tx)
946                         kiblnd_queue_tx_locked(tx, conn);
947         }
948
949         for (;;) {
950                 int credit;
951
952                 if (!list_empty(&conn->ibc_tx_queue_nocred)) {
953                         credit = 0;
954                         tx = list_entry(conn->ibc_tx_queue_nocred.next,
955                                         struct kib_tx, tx_list);
956                 } else if (!list_empty(&conn->ibc_tx_noops)) {
957                         LASSERT(!IBLND_OOB_CAPABLE(ver));
958                         credit = 1;
959                         tx = list_entry(conn->ibc_tx_noops.next,
960                                         struct kib_tx, tx_list);
961                 } else if (!list_empty(&conn->ibc_tx_queue)) {
962                         credit = 1;
963                         tx = list_entry(conn->ibc_tx_queue.next,
964                                         struct kib_tx, tx_list);
965                 } else {
966                         break;
967                 }
968
969                 if (kiblnd_post_tx_locked(conn, tx, credit))
970                         break;
971         }
972 }
973
974 static void
975 kiblnd_tx_complete(struct kib_tx *tx, int status)
976 {
977         int failed = (status != IB_WC_SUCCESS);
978         struct kib_conn *conn = tx->tx_conn;
979         int idle;
980
981         LASSERT(tx->tx_sending > 0);
982
983         if (failed) {
984                 if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
985                         CNETERR("Tx -> %s cookie %#llx sending %d waiting %d: failed %d\n",
986                                 libcfs_nid2str(conn->ibc_peer->ibp_nid),
987                                 tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
988                                 status);
989
990                 kiblnd_close_conn(conn, -EIO);
991         } else {
992                 kiblnd_peer_alive(conn->ibc_peer);
993         }
994
995         spin_lock(&conn->ibc_lock);
996
997         /*
998          * I could be racing with rdma completion.  Whoever makes 'tx' idle
999          * gets to free it, which also drops its ref on 'conn'.
1000          */
1001         tx->tx_sending--;
1002         conn->ibc_nsends_posted--;
1003         if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP)
1004                 conn->ibc_noops_posted--;
1005
1006         if (failed) {
1007                 tx->tx_waiting = 0;          /* don't wait for peer */
1008                 tx->tx_status = -EIO;
1009         }
1010
1011         idle = !tx->tx_sending &&        /* This is the final callback */
1012                !tx->tx_waiting &&              /* Not waiting for peer */
1013                !tx->tx_queued;            /* Not re-queued (PUT_DONE) */
1014         if (idle)
1015                 list_del(&tx->tx_list);
1016
1017         kiblnd_check_sends_locked(conn);
1018         spin_unlock(&conn->ibc_lock);
1019
1020         if (idle)
1021                 kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);
1022 }
1023
1024 static void
1025 kiblnd_init_tx_msg(lnet_ni_t *ni, struct kib_tx *tx, int type, int body_nob)
1026 {
1027         struct kib_hca_dev *hdev = tx->tx_pool->tpo_hdev;
1028         struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
1029         struct ib_rdma_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
1030         int nob = offsetof(struct kib_msg, ibm_u) + body_nob;
1031         struct ib_mr *mr = hdev->ibh_mrs;
1032
1033         LASSERT(tx->tx_nwrq >= 0);
1034         LASSERT(tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
1035         LASSERT(nob <= IBLND_MSG_SIZE);
1036         LASSERT(mr);
1037
1038         kiblnd_init_msg(tx->tx_msg, type, body_nob);
1039
1040         sge->lkey   = mr->lkey;
1041         sge->addr   = tx->tx_msgaddr;
1042         sge->length = nob;
1043
1044         memset(wrq, 0, sizeof(*wrq));
1045
1046         wrq->wr.next       = NULL;
1047         wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
1048         wrq->wr.sg_list    = sge;
1049         wrq->wr.num_sge    = 1;
1050         wrq->wr.opcode     = IB_WR_SEND;
1051         wrq->wr.send_flags = IB_SEND_SIGNALED;
1052
1053         tx->tx_nwrq++;
1054 }
1055
1056 static int
1057 kiblnd_init_rdma(struct kib_conn *conn, struct kib_tx *tx, int type,
1058                  int resid, struct kib_rdma_desc *dstrd, __u64 dstcookie)
1059 {
1060         struct kib_msg *ibmsg = tx->tx_msg;
1061         struct kib_rdma_desc *srcrd = tx->tx_rd;
1062         struct ib_sge *sge = &tx->tx_sge[0];
1063         struct ib_rdma_wr *wrq, *next;
1064         int rc  = resid;
1065         int srcidx = 0;
1066         int dstidx = 0;
1067         int wrknob;
1068
1069         LASSERT(!in_interrupt());
1070         LASSERT(!tx->tx_nwrq);
1071         LASSERT(type == IBLND_MSG_GET_DONE ||
1072                 type == IBLND_MSG_PUT_DONE);
1073
1074         if (kiblnd_rd_size(srcrd) > conn->ibc_max_frags << PAGE_SHIFT) {
1075                 CERROR("RDMA is too large for peer %s (%d), src size: %d dst size: %d\n",
1076                        libcfs_nid2str(conn->ibc_peer->ibp_nid),
1077                        conn->ibc_max_frags << PAGE_SHIFT,
1078                        kiblnd_rd_size(srcrd), kiblnd_rd_size(dstrd));
1079                 rc = -EMSGSIZE;
1080                 goto too_big;
1081         }
1082
1083         while (resid > 0) {
1084                 if (srcidx >= srcrd->rd_nfrags) {
1085                         CERROR("Src buffer exhausted: %d frags\n", srcidx);
1086                         rc = -EPROTO;
1087                         break;
1088                 }
1089
1090                 if (dstidx == dstrd->rd_nfrags) {
1091                         CERROR("Dst buffer exhausted: %d frags\n", dstidx);
1092                         rc = -EPROTO;
1093                         break;
1094                 }
1095
1096                 if (tx->tx_nwrq >= IBLND_MAX_RDMA_FRAGS) {
1097                         CERROR("RDMA has too many fragments for peer %s (%d), src idx/frags: %d/%d dst idx/frags: %d/%d\n",
1098                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
1099                                IBLND_MAX_RDMA_FRAGS,
1100                                srcidx, srcrd->rd_nfrags,
1101                                dstidx, dstrd->rd_nfrags);
1102                         rc = -EMSGSIZE;
1103                         break;
1104                 }
1105
1106                 wrknob = min(min(kiblnd_rd_frag_size(srcrd, srcidx),
1107                                  kiblnd_rd_frag_size(dstrd, dstidx)),
1108                              (__u32)resid);
1109
1110                 sge = &tx->tx_sge[tx->tx_nwrq];
1111                 sge->addr   = kiblnd_rd_frag_addr(srcrd, srcidx);
1112                 sge->lkey   = kiblnd_rd_frag_key(srcrd, srcidx);
1113                 sge->length = wrknob;
1114
1115                 wrq = &tx->tx_wrq[tx->tx_nwrq];
1116                 next = wrq + 1;
1117
1118                 wrq->wr.next       = &next->wr;
1119                 wrq->wr.wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
1120                 wrq->wr.sg_list    = sge;
1121                 wrq->wr.num_sge    = 1;
1122                 wrq->wr.opcode     = IB_WR_RDMA_WRITE;
1123                 wrq->wr.send_flags = 0;
1124
1125                 wrq->remote_addr = kiblnd_rd_frag_addr(dstrd, dstidx);
1126                 wrq->rkey        = kiblnd_rd_frag_key(dstrd, dstidx);
1127
1128                 srcidx = kiblnd_rd_consume_frag(srcrd, srcidx, wrknob);
1129                 dstidx = kiblnd_rd_consume_frag(dstrd, dstidx, wrknob);
1130
1131                 resid -= wrknob;
1132
1133                 tx->tx_nwrq++;
1134                 wrq++;
1135                 sge++;
1136         }
1137 too_big:
1138         if (rc < 0)                          /* no RDMA if completing with failure */
1139                 tx->tx_nwrq = 0;
1140
1141         ibmsg->ibm_u.completion.ibcm_status = rc;
1142         ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
1143         kiblnd_init_tx_msg(conn->ibc_peer->ibp_ni, tx,
1144                            type, sizeof(struct kib_completion_msg));
1145
1146         return rc;
1147 }
1148
1149 static void
1150 kiblnd_queue_tx_locked(struct kib_tx *tx, struct kib_conn *conn)
1151 {
1152         struct list_head *q;
1153
1154         LASSERT(tx->tx_nwrq > 0);             /* work items set up */
1155         LASSERT(!tx->tx_queued);               /* not queued for sending already */
1156         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1157
1158         tx->tx_queued = 1;
1159         tx->tx_deadline = jiffies +
1160                           msecs_to_jiffies(*kiblnd_tunables.kib_timeout *
1161                                            MSEC_PER_SEC);
1162
1163         if (!tx->tx_conn) {
1164                 kiblnd_conn_addref(conn);
1165                 tx->tx_conn = conn;
1166                 LASSERT(tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
1167         } else {
1168                 /* PUT_DONE first attached to conn as a PUT_REQ */
1169                 LASSERT(tx->tx_conn == conn);
1170                 LASSERT(tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
1171         }
1172
1173         switch (tx->tx_msg->ibm_type) {
1174         default:
1175                 LBUG();
1176
1177         case IBLND_MSG_PUT_REQ:
1178         case IBLND_MSG_GET_REQ:
1179                 q = &conn->ibc_tx_queue_rsrvd;
1180                 break;
1181
1182         case IBLND_MSG_PUT_NAK:
1183         case IBLND_MSG_PUT_ACK:
1184         case IBLND_MSG_PUT_DONE:
1185         case IBLND_MSG_GET_DONE:
1186                 q = &conn->ibc_tx_queue_nocred;
1187                 break;
1188
1189         case IBLND_MSG_NOOP:
1190                 if (IBLND_OOB_CAPABLE(conn->ibc_version))
1191                         q = &conn->ibc_tx_queue_nocred;
1192                 else
1193                         q = &conn->ibc_tx_noops;
1194                 break;
1195
1196         case IBLND_MSG_IMMEDIATE:
1197                 q = &conn->ibc_tx_queue;
1198                 break;
1199         }
1200
1201         list_add_tail(&tx->tx_list, q);
1202 }
1203
1204 static void
1205 kiblnd_queue_tx(struct kib_tx *tx, struct kib_conn *conn)
1206 {
1207         spin_lock(&conn->ibc_lock);
1208         kiblnd_queue_tx_locked(tx, conn);
1209         kiblnd_check_sends_locked(conn);
1210         spin_unlock(&conn->ibc_lock);
1211 }
1212
1213 static int kiblnd_resolve_addr(struct rdma_cm_id *cmid,
1214                                struct sockaddr_in *srcaddr,
1215                                struct sockaddr_in *dstaddr,
1216                                int timeout_ms)
1217 {
1218         unsigned short port;
1219         int rc;
1220
1221         /* allow the port to be reused */
1222         rc = rdma_set_reuseaddr(cmid, 1);
1223         if (rc) {
1224                 CERROR("Unable to set reuse on cmid: %d\n", rc);
1225                 return rc;
1226         }
1227
1228         /* look for a free privileged port */
1229         for (port = PROT_SOCK - 1; port > 0; port--) {
1230                 srcaddr->sin_port = htons(port);
1231                 rc = rdma_resolve_addr(cmid,
1232                                        (struct sockaddr *)srcaddr,
1233                                        (struct sockaddr *)dstaddr,
1234                                        timeout_ms);
1235                 if (!rc) {
1236                         CDEBUG(D_NET, "bound to port %hu\n", port);
1237                         return 0;
1238                 } else if (rc == -EADDRINUSE || rc == -EADDRNOTAVAIL) {
1239                         CDEBUG(D_NET, "bind to port %hu failed: %d\n",
1240                                port, rc);
1241                 } else {
1242                         return rc;
1243                 }
1244         }
1245
1246         CERROR("Failed to bind to a free privileged port\n");
1247         return rc;
1248 }
1249
1250 static void
1251 kiblnd_connect_peer(struct kib_peer *peer)
1252 {
1253         struct rdma_cm_id *cmid;
1254         struct kib_dev *dev;
1255         struct kib_net *net = peer->ibp_ni->ni_data;
1256         struct sockaddr_in srcaddr;
1257         struct sockaddr_in dstaddr;
1258         int rc;
1259
1260         LASSERT(net);
1261         LASSERT(peer->ibp_connecting > 0);
1262         LASSERT(!peer->ibp_reconnecting);
1263
1264         cmid = kiblnd_rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP,
1265                                      IB_QPT_RC);
1266
1267         if (IS_ERR(cmid)) {
1268                 CERROR("Can't create CMID for %s: %ld\n",
1269                        libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
1270                 rc = PTR_ERR(cmid);
1271                 goto failed;
1272         }
1273
1274         dev = net->ibn_dev;
1275         memset(&srcaddr, 0, sizeof(srcaddr));
1276         srcaddr.sin_family = AF_INET;
1277         srcaddr.sin_addr.s_addr = htonl(dev->ibd_ifip);
1278
1279         memset(&dstaddr, 0, sizeof(dstaddr));
1280         dstaddr.sin_family = AF_INET;
1281         dstaddr.sin_port = htons(*kiblnd_tunables.kib_service);
1282         dstaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));
1283
1284         kiblnd_peer_addref(peer);              /* cmid's ref */
1285
1286         if (*kiblnd_tunables.kib_use_priv_port) {
1287                 rc = kiblnd_resolve_addr(cmid, &srcaddr, &dstaddr,
1288                                          *kiblnd_tunables.kib_timeout * 1000);
1289         } else {
1290                 rc = rdma_resolve_addr(cmid,
1291                                        (struct sockaddr *)&srcaddr,
1292                                        (struct sockaddr *)&dstaddr,
1293                                        *kiblnd_tunables.kib_timeout * 1000);
1294         }
1295         if (rc) {
1296                 /* Can't initiate address resolution:  */
1297                 CERROR("Can't resolve addr for %s: %d\n",
1298                        libcfs_nid2str(peer->ibp_nid), rc);
1299                 goto failed2;
1300         }
1301
1302         LASSERT(cmid->device);
1303         CDEBUG(D_NET, "%s: connection bound to %s:%pI4h:%s\n",
1304                libcfs_nid2str(peer->ibp_nid), dev->ibd_ifname,
1305                &dev->ibd_ifip, cmid->device->name);
1306
1307         return;
1308
1309  failed2:
1310         kiblnd_peer_connect_failed(peer, 1, rc);
1311         kiblnd_peer_decref(peer);              /* cmid's ref */
1312         rdma_destroy_id(cmid);
1313         return;
1314  failed:
1315         kiblnd_peer_connect_failed(peer, 1, rc);
1316 }
1317
1318 bool
1319 kiblnd_reconnect_peer(struct kib_peer *peer)
1320 {
1321         rwlock_t *glock = &kiblnd_data.kib_global_lock;
1322         char *reason = NULL;
1323         struct list_head txs;
1324         unsigned long flags;
1325
1326         INIT_LIST_HEAD(&txs);
1327
1328         write_lock_irqsave(glock, flags);
1329         if (!peer->ibp_reconnecting) {
1330                 if (peer->ibp_accepting)
1331                         reason = "accepting";
1332                 else if (peer->ibp_connecting)
1333                         reason = "connecting";
1334                 else if (!list_empty(&peer->ibp_conns))
1335                         reason = "connected";
1336                 else /* connected then closed */
1337                         reason = "closed";
1338
1339                 goto no_reconnect;
1340         }
1341
1342         LASSERT(!peer->ibp_accepting && !peer->ibp_connecting &&
1343                 list_empty(&peer->ibp_conns));
1344         peer->ibp_reconnecting = 0;
1345
1346         if (!kiblnd_peer_active(peer)) {
1347                 list_splice_init(&peer->ibp_tx_queue, &txs);
1348                 reason = "unlinked";
1349                 goto no_reconnect;
1350         }
1351
1352         peer->ibp_connecting++;
1353         peer->ibp_reconnected++;
1354         write_unlock_irqrestore(glock, flags);
1355
1356         kiblnd_connect_peer(peer);
1357         return true;
1358
1359 no_reconnect:
1360         write_unlock_irqrestore(glock, flags);
1361
1362         CWARN("Abort reconnection of %s: %s\n",
1363               libcfs_nid2str(peer->ibp_nid), reason);
1364         kiblnd_txlist_done(peer->ibp_ni, &txs, -ECONNABORTED);
1365         return false;
1366 }
1367
1368 void
1369 kiblnd_launch_tx(lnet_ni_t *ni, struct kib_tx *tx, lnet_nid_t nid)
1370 {
1371         struct kib_peer *peer;
1372         struct kib_peer *peer2;
1373         struct kib_conn *conn;
1374         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
1375         unsigned long flags;
1376         int rc;
1377
1378         /*
1379          * If I get here, I've committed to send, so I complete the tx with
1380          * failure on any problems
1381          */
1382         LASSERT(!tx || !tx->tx_conn); /* only set when assigned a conn */
1383         LASSERT(!tx || tx->tx_nwrq > 0);     /* work items have been set up */
1384
1385         /*
1386          * First time, just use a read lock since I expect to find my peer
1387          * connected
1388          */
1389         read_lock_irqsave(g_lock, flags);
1390
1391         peer = kiblnd_find_peer_locked(nid);
1392         if (peer && !list_empty(&peer->ibp_conns)) {
1393                 /* Found a peer with an established connection */
1394                 conn = kiblnd_get_conn_locked(peer);
1395                 kiblnd_conn_addref(conn); /* 1 ref for me... */
1396
1397                 read_unlock_irqrestore(g_lock, flags);
1398
1399                 if (tx)
1400                         kiblnd_queue_tx(tx, conn);
1401                 kiblnd_conn_decref(conn); /* ...to here */
1402                 return;
1403         }
1404
1405         read_unlock(g_lock);
1406         /* Re-try with a write lock */
1407         write_lock(g_lock);
1408
1409         peer = kiblnd_find_peer_locked(nid);
1410         if (peer) {
1411                 if (list_empty(&peer->ibp_conns)) {
1412                         /* found a peer, but it's still connecting... */
1413                         LASSERT(kiblnd_peer_connecting(peer));
1414                         if (tx)
1415                                 list_add_tail(&tx->tx_list,
1416                                               &peer->ibp_tx_queue);
1417                         write_unlock_irqrestore(g_lock, flags);
1418                 } else {
1419                         conn = kiblnd_get_conn_locked(peer);
1420                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1421
1422                         write_unlock_irqrestore(g_lock, flags);
1423
1424                         if (tx)
1425                                 kiblnd_queue_tx(tx, conn);
1426                         kiblnd_conn_decref(conn); /* ...to here */
1427                 }
1428                 return;
1429         }
1430
1431         write_unlock_irqrestore(g_lock, flags);
1432
1433         /* Allocate a peer ready to add to the peer table and retry */
1434         rc = kiblnd_create_peer(ni, &peer, nid);
1435         if (rc) {
1436                 CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
1437                 if (tx) {
1438                         tx->tx_status = -EHOSTUNREACH;
1439                         tx->tx_waiting = 0;
1440                         kiblnd_tx_done(ni, tx);
1441                 }
1442                 return;
1443         }
1444
1445         write_lock_irqsave(g_lock, flags);
1446
1447         peer2 = kiblnd_find_peer_locked(nid);
1448         if (peer2) {
1449                 if (list_empty(&peer2->ibp_conns)) {
1450                         /* found a peer, but it's still connecting... */
1451                         LASSERT(kiblnd_peer_connecting(peer2));
1452                         if (tx)
1453                                 list_add_tail(&tx->tx_list,
1454                                               &peer2->ibp_tx_queue);
1455                         write_unlock_irqrestore(g_lock, flags);
1456                 } else {
1457                         conn = kiblnd_get_conn_locked(peer2);
1458                         kiblnd_conn_addref(conn); /* 1 ref for me... */
1459
1460                         write_unlock_irqrestore(g_lock, flags);
1461
1462                         if (tx)
1463                                 kiblnd_queue_tx(tx, conn);
1464                         kiblnd_conn_decref(conn); /* ...to here */
1465                 }
1466
1467                 kiblnd_peer_decref(peer);
1468                 return;
1469         }
1470
1471         /* Brand new peer */
1472         LASSERT(!peer->ibp_connecting);
1473         peer->ibp_connecting = 1;
1474
1475         /* always called with a ref on ni, which prevents ni being shutdown */
1476         LASSERT(!((struct kib_net *)ni->ni_data)->ibn_shutdown);
1477
1478         if (tx)
1479                 list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);
1480
1481         kiblnd_peer_addref(peer);
1482         list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
1483
1484         write_unlock_irqrestore(g_lock, flags);
1485
1486         kiblnd_connect_peer(peer);
1487         kiblnd_peer_decref(peer);
1488 }
1489
1490 int
1491 kiblnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1492 {
1493         lnet_hdr_t *hdr = &lntmsg->msg_hdr;
1494         int type = lntmsg->msg_type;
1495         lnet_process_id_t target = lntmsg->msg_target;
1496         int target_is_router = lntmsg->msg_target_is_router;
1497         int routing = lntmsg->msg_routing;
1498         unsigned int payload_niov = lntmsg->msg_niov;
1499         struct kvec *payload_iov = lntmsg->msg_iov;
1500         lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
1501         unsigned int payload_offset = lntmsg->msg_offset;
1502         unsigned int payload_nob = lntmsg->msg_len;
1503         struct iov_iter from;
1504         struct kib_msg *ibmsg;
1505         struct kib_rdma_desc  *rd;
1506         struct kib_tx *tx;
1507         int nob;
1508         int rc;
1509
1510         /* NB 'private' is different depending on what we're sending.... */
1511
1512         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1513                payload_nob, payload_niov, libcfs_id2str(target));
1514
1515         LASSERT(!payload_nob || payload_niov > 0);
1516         LASSERT(payload_niov <= LNET_MAX_IOV);
1517
1518         /* Thread context */
1519         LASSERT(!in_interrupt());
1520         /* payload is either all vaddrs or all pages */
1521         LASSERT(!(payload_kiov && payload_iov));
1522
1523         if (payload_kiov)
1524                 iov_iter_bvec(&from, ITER_BVEC | WRITE,
1525                               payload_kiov, payload_niov,
1526                               payload_nob + payload_offset);
1527         else
1528                 iov_iter_kvec(&from, ITER_KVEC | WRITE,
1529                               payload_iov, payload_niov,
1530                               payload_nob + payload_offset);
1531
1532         iov_iter_advance(&from, payload_offset);
1533
1534         switch (type) {
1535         default:
1536                 LBUG();
1537                 return -EIO;
1538
1539         case LNET_MSG_ACK:
1540                 LASSERT(!payload_nob);
1541                 break;
1542
1543         case LNET_MSG_GET:
1544                 if (routing || target_is_router)
1545                         break;            /* send IMMEDIATE */
1546
1547                 /* is the REPLY message too small for RDMA? */
1548                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
1549                 if (nob <= IBLND_MSG_SIZE)
1550                         break;            /* send IMMEDIATE */
1551
1552                 tx = kiblnd_get_idle_tx(ni, target.nid);
1553                 if (!tx) {
1554                         CERROR("Can't allocate txd for GET to %s\n",
1555                                libcfs_nid2str(target.nid));
1556                         return -ENOMEM;
1557                 }
1558
1559                 ibmsg = tx->tx_msg;
1560                 rd = &ibmsg->ibm_u.get.ibgm_rd;
1561                 if (!(lntmsg->msg_md->md_options & LNET_MD_KIOV))
1562                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1563                                                  lntmsg->msg_md->md_niov,
1564                                                  lntmsg->msg_md->md_iov.iov,
1565                                                  0, lntmsg->msg_md->md_length);
1566                 else
1567                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1568                                                   lntmsg->msg_md->md_niov,
1569                                                   lntmsg->msg_md->md_iov.kiov,
1570                                                   0, lntmsg->msg_md->md_length);
1571                 if (rc) {
1572                         CERROR("Can't setup GET sink for %s: %d\n",
1573                                libcfs_nid2str(target.nid), rc);
1574                         kiblnd_tx_done(ni, tx);
1575                         return -EIO;
1576                 }
1577
1578                 nob = offsetof(struct kib_get_msg, ibgm_rd.rd_frags[rd->rd_nfrags]);
1579                 ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;
1580                 ibmsg->ibm_u.get.ibgm_hdr = *hdr;
1581
1582                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob);
1583
1584                 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
1585                 if (!tx->tx_lntmsg[1]) {
1586                         CERROR("Can't create reply for GET -> %s\n",
1587                                libcfs_nid2str(target.nid));
1588                         kiblnd_tx_done(ni, tx);
1589                         return -EIO;
1590                 }
1591
1592                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */
1593                 tx->tx_waiting = 1;          /* waiting for GET_DONE */
1594                 kiblnd_launch_tx(ni, tx, target.nid);
1595                 return 0;
1596
1597         case LNET_MSG_REPLY:
1598         case LNET_MSG_PUT:
1599                 /* Is the payload small enough not to need RDMA? */
1600                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob]);
1601                 if (nob <= IBLND_MSG_SIZE)
1602                         break;            /* send IMMEDIATE */
1603
1604                 tx = kiblnd_get_idle_tx(ni, target.nid);
1605                 if (!tx) {
1606                         CERROR("Can't allocate %s txd for %s\n",
1607                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1608                                libcfs_nid2str(target.nid));
1609                         return -ENOMEM;
1610                 }
1611
1612                 if (!payload_kiov)
1613                         rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1614                                                  payload_niov, payload_iov,
1615                                                  payload_offset, payload_nob);
1616                 else
1617                         rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1618                                                   payload_niov, payload_kiov,
1619                                                   payload_offset, payload_nob);
1620                 if (rc) {
1621                         CERROR("Can't setup PUT src for %s: %d\n",
1622                                libcfs_nid2str(target.nid), rc);
1623                         kiblnd_tx_done(ni, tx);
1624                         return -EIO;
1625                 }
1626
1627                 ibmsg = tx->tx_msg;
1628                 ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;
1629                 ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;
1630                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(struct kib_putreq_msg));
1631
1632                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1633                 tx->tx_waiting = 1;          /* waiting for PUT_{ACK,NAK} */
1634                 kiblnd_launch_tx(ni, tx, target.nid);
1635                 return 0;
1636         }
1637
1638         /* send IMMEDIATE */
1639
1640         LASSERT(offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[payload_nob])
1641                  <= IBLND_MSG_SIZE);
1642
1643         tx = kiblnd_get_idle_tx(ni, target.nid);
1644         if (!tx) {
1645                 CERROR("Can't send %d to %s: tx descs exhausted\n",
1646                        type, libcfs_nid2str(target.nid));
1647                 return -ENOMEM;
1648         }
1649
1650         ibmsg = tx->tx_msg;
1651         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
1652
1653         copy_from_iter(&ibmsg->ibm_u.immediate.ibim_payload, IBLND_MSG_SIZE,
1654                        &from);
1655         nob = offsetof(struct kib_immediate_msg, ibim_payload[payload_nob]);
1656         kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob);
1657
1658         tx->tx_lntmsg[0] = lntmsg;            /* finalise lntmsg on completion */
1659         kiblnd_launch_tx(ni, tx, target.nid);
1660         return 0;
1661 }
1662
1663 static void
1664 kiblnd_reply(lnet_ni_t *ni, struct kib_rx *rx, lnet_msg_t *lntmsg)
1665 {
1666         lnet_process_id_t target = lntmsg->msg_target;
1667         unsigned int niov = lntmsg->msg_niov;
1668         struct kvec *iov = lntmsg->msg_iov;
1669         lnet_kiov_t *kiov = lntmsg->msg_kiov;
1670         unsigned int offset = lntmsg->msg_offset;
1671         unsigned int nob = lntmsg->msg_len;
1672         struct kib_tx *tx;
1673         int rc;
1674
1675         tx = kiblnd_get_idle_tx(ni, rx->rx_conn->ibc_peer->ibp_nid);
1676         if (!tx) {
1677                 CERROR("Can't get tx for REPLY to %s\n",
1678                        libcfs_nid2str(target.nid));
1679                 goto failed_0;
1680         }
1681
1682         if (!nob)
1683                 rc = 0;
1684         else if (!kiov)
1685                 rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd,
1686                                          niov, iov, offset, nob);
1687         else
1688                 rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd,
1689                                           niov, kiov, offset, nob);
1690
1691         if (rc) {
1692                 CERROR("Can't setup GET src for %s: %d\n",
1693                        libcfs_nid2str(target.nid), rc);
1694                 goto failed_1;
1695         }
1696
1697         rc = kiblnd_init_rdma(rx->rx_conn, tx,
1698                               IBLND_MSG_GET_DONE, nob,
1699                               &rx->rx_msg->ibm_u.get.ibgm_rd,
1700                               rx->rx_msg->ibm_u.get.ibgm_cookie);
1701         if (rc < 0) {
1702                 CERROR("Can't setup rdma for GET from %s: %d\n",
1703                        libcfs_nid2str(target.nid), rc);
1704                 goto failed_1;
1705         }
1706
1707         if (!nob) {
1708                 /* No RDMA: local completion may happen now! */
1709                 lnet_finalize(ni, lntmsg, 0);
1710         } else {
1711                 /* RDMA: lnet_finalize(lntmsg) when it completes */
1712                 tx->tx_lntmsg[0] = lntmsg;
1713         }
1714
1715         kiblnd_queue_tx(tx, rx->rx_conn);
1716         return;
1717
1718  failed_1:
1719         kiblnd_tx_done(ni, tx);
1720  failed_0:
1721         lnet_finalize(ni, lntmsg, -EIO);
1722 }
1723
1724 int
1725 kiblnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1726             struct iov_iter *to, unsigned int rlen)
1727 {
1728         struct kib_rx *rx = private;
1729         struct kib_msg *rxmsg = rx->rx_msg;
1730         struct kib_conn *conn = rx->rx_conn;
1731         struct kib_tx *tx;
1732         int nob;
1733         int post_credit = IBLND_POSTRX_PEER_CREDIT;
1734         int rc = 0;
1735
1736         LASSERT(iov_iter_count(to) <= rlen);
1737         LASSERT(!in_interrupt());
1738         /* Either all pages or all vaddrs */
1739
1740         switch (rxmsg->ibm_type) {
1741         default:
1742                 LBUG();
1743
1744         case IBLND_MSG_IMMEDIATE:
1745                 nob = offsetof(struct kib_msg, ibm_u.immediate.ibim_payload[rlen]);
1746                 if (nob > rx->rx_nob) {
1747                         CERROR("Immediate message from %s too big: %d(%d)\n",
1748                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
1749                                nob, rx->rx_nob);
1750                         rc = -EPROTO;
1751                         break;
1752                 }
1753
1754                 copy_to_iter(&rxmsg->ibm_u.immediate.ibim_payload,
1755                              IBLND_MSG_SIZE, to);
1756                 lnet_finalize(ni, lntmsg, 0);
1757                 break;
1758
1759         case IBLND_MSG_PUT_REQ: {
1760                 struct kib_msg  *txmsg;
1761                 struct kib_rdma_desc *rd;
1762
1763                 if (!iov_iter_count(to)) {
1764                         lnet_finalize(ni, lntmsg, 0);
1765                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0,
1766                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1767                         break;
1768                 }
1769
1770                 tx = kiblnd_get_idle_tx(ni, conn->ibc_peer->ibp_nid);
1771                 if (!tx) {
1772                         CERROR("Can't allocate tx for %s\n",
1773                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
1774                         /* Not replying will break the connection */
1775                         rc = -ENOMEM;
1776                         break;
1777                 }
1778
1779                 txmsg = tx->tx_msg;
1780                 rd = &txmsg->ibm_u.putack.ibpam_rd;
1781                 if (!(to->type & ITER_BVEC))
1782                         rc = kiblnd_setup_rd_iov(ni, tx, rd,
1783                                                  to->nr_segs, to->kvec,
1784                                                  to->iov_offset,
1785                                                  iov_iter_count(to));
1786                 else
1787                         rc = kiblnd_setup_rd_kiov(ni, tx, rd,
1788                                                   to->nr_segs, to->bvec,
1789                                                   to->iov_offset,
1790                                                   iov_iter_count(to));
1791                 if (rc) {
1792                         CERROR("Can't setup PUT sink for %s: %d\n",
1793                                libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
1794                         kiblnd_tx_done(ni, tx);
1795                         /* tell peer it's over */
1796                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc,
1797                                                rxmsg->ibm_u.putreq.ibprm_cookie);
1798                         break;
1799                 }
1800
1801                 nob = offsetof(struct kib_putack_msg, ibpam_rd.rd_frags[rd->rd_nfrags]);
1802                 txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie;
1803                 txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;
1804
1805                 kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob);
1806
1807                 tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */
1808                 tx->tx_waiting = 1;          /* waiting for PUT_DONE */
1809                 kiblnd_queue_tx(tx, conn);
1810
1811                 /* reposted buffer reserved for PUT_DONE */
1812                 post_credit = IBLND_POSTRX_NO_CREDIT;
1813                 break;
1814                 }
1815
1816         case IBLND_MSG_GET_REQ:
1817                 if (lntmsg) {
1818                         /* Optimized GET; RDMA lntmsg's payload */
1819                         kiblnd_reply(ni, rx, lntmsg);
1820                 } else {
1821                         /* GET didn't match anything */
1822                         kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE,
1823                                                -ENODATA,
1824                                                rxmsg->ibm_u.get.ibgm_cookie);
1825                 }
1826                 break;
1827         }
1828
1829         kiblnd_post_rx(rx, post_credit);
1830         return rc;
1831 }
1832
1833 int
1834 kiblnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
1835 {
1836         struct task_struct *task = kthread_run(fn, arg, "%s", name);
1837
1838         if (IS_ERR(task))
1839                 return PTR_ERR(task);
1840
1841         atomic_inc(&kiblnd_data.kib_nthreads);
1842         return 0;
1843 }
1844
1845 static void
1846 kiblnd_thread_fini(void)
1847 {
1848         atomic_dec(&kiblnd_data.kib_nthreads);
1849 }
1850
1851 static void
1852 kiblnd_peer_alive(struct kib_peer *peer)
1853 {
1854         /* This is racy, but everyone's only writing cfs_time_current() */
1855         peer->ibp_last_alive = cfs_time_current();
1856         mb();
1857 }
1858
1859 static void
1860 kiblnd_peer_notify(struct kib_peer *peer)
1861 {
1862         int error = 0;
1863         unsigned long last_alive = 0;
1864         unsigned long flags;
1865
1866         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1867
1868         if (kiblnd_peer_idle(peer) && peer->ibp_error) {
1869                 error = peer->ibp_error;
1870                 peer->ibp_error = 0;
1871
1872                 last_alive = peer->ibp_last_alive;
1873         }
1874
1875         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1876
1877         if (error)
1878                 lnet_notify(peer->ibp_ni,
1879                             peer->ibp_nid, 0, last_alive);
1880 }
1881
1882 void
1883 kiblnd_close_conn_locked(struct kib_conn *conn, int error)
1884 {
1885         /*
1886          * This just does the immediate housekeeping. 'error' is zero for a
1887          * normal shutdown which can happen only after the connection has been
1888          * established.  If the connection is established, schedule the
1889          * connection to be finished off by the connd. Otherwise the connd is
1890          * already dealing with it (either to set it up or tear it down).
1891          * Caller holds kib_global_lock exclusively in irq context
1892          */
1893         struct kib_peer *peer = conn->ibc_peer;
1894         struct kib_dev *dev;
1895         unsigned long flags;
1896
1897         LASSERT(error || conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1898
1899         if (error && !conn->ibc_comms_error)
1900                 conn->ibc_comms_error = error;
1901
1902         if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
1903                 return; /* already being handled  */
1904
1905         if (!error &&
1906             list_empty(&conn->ibc_tx_noops) &&
1907             list_empty(&conn->ibc_tx_queue) &&
1908             list_empty(&conn->ibc_tx_queue_rsrvd) &&
1909             list_empty(&conn->ibc_tx_queue_nocred) &&
1910             list_empty(&conn->ibc_active_txs)) {
1911                 CDEBUG(D_NET, "closing conn to %s\n",
1912                        libcfs_nid2str(peer->ibp_nid));
1913         } else {
1914                 CNETERR("Closing conn to %s: error %d%s%s%s%s%s\n",
1915                         libcfs_nid2str(peer->ibp_nid), error,
1916                         list_empty(&conn->ibc_tx_queue) ? "" : "(sending)",
1917                         list_empty(&conn->ibc_tx_noops) ? "" : "(sending_noops)",
1918                         list_empty(&conn->ibc_tx_queue_rsrvd) ? "" : "(sending_rsrvd)",
1919                         list_empty(&conn->ibc_tx_queue_nocred) ? "" : "(sending_nocred)",
1920                         list_empty(&conn->ibc_active_txs) ? "" : "(waiting)");
1921         }
1922
1923         dev = ((struct kib_net *)peer->ibp_ni->ni_data)->ibn_dev;
1924         list_del(&conn->ibc_list);
1925         /* connd (see below) takes over ibc_list's ref */
1926
1927         if (list_empty(&peer->ibp_conns) &&    /* no more conns */
1928             kiblnd_peer_active(peer)) {  /* still in peer table */
1929                 kiblnd_unlink_peer_locked(peer);
1930
1931                 /* set/clear error on last conn */
1932                 peer->ibp_error = conn->ibc_comms_error;
1933         }
1934
1935         kiblnd_set_conn_state(conn, IBLND_CONN_CLOSING);
1936
1937         if (error &&
1938             kiblnd_dev_can_failover(dev)) {
1939                 list_add_tail(&dev->ibd_fail_list,
1940                               &kiblnd_data.kib_failed_devs);
1941                 wake_up(&kiblnd_data.kib_failover_waitq);
1942         }
1943
1944         spin_lock_irqsave(&kiblnd_data.kib_connd_lock, flags);
1945
1946         list_add_tail(&conn->ibc_list, &kiblnd_data.kib_connd_conns);
1947         wake_up(&kiblnd_data.kib_connd_waitq);
1948
1949         spin_unlock_irqrestore(&kiblnd_data.kib_connd_lock, flags);
1950 }
1951
1952 void
1953 kiblnd_close_conn(struct kib_conn *conn, int error)
1954 {
1955         unsigned long flags;
1956
1957         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1958
1959         kiblnd_close_conn_locked(conn, error);
1960
1961         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1962 }
1963
1964 static void
1965 kiblnd_handle_early_rxs(struct kib_conn *conn)
1966 {
1967         unsigned long flags;
1968         struct kib_rx *rx;
1969         struct kib_rx *tmp;
1970
1971         LASSERT(!in_interrupt());
1972         LASSERT(conn->ibc_state >= IBLND_CONN_ESTABLISHED);
1973
1974         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1975         list_for_each_entry_safe(rx, tmp, &conn->ibc_early_rxs, rx_list) {
1976                 list_del(&rx->rx_list);
1977                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1978
1979                 kiblnd_handle_rx(rx);
1980
1981                 write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
1982         }
1983         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
1984 }
1985
1986 static void
1987 kiblnd_abort_txs(struct kib_conn *conn, struct list_head *txs)
1988 {
1989         LIST_HEAD(zombies);
1990         struct list_head *tmp;
1991         struct list_head *nxt;
1992         struct kib_tx *tx;
1993
1994         spin_lock(&conn->ibc_lock);
1995
1996         list_for_each_safe(tmp, nxt, txs) {
1997                 tx = list_entry(tmp, struct kib_tx, tx_list);
1998
1999                 if (txs == &conn->ibc_active_txs) {
2000                         LASSERT(!tx->tx_queued);
2001                         LASSERT(tx->tx_waiting || tx->tx_sending);
2002                 } else {
2003                         LASSERT(tx->tx_queued);
2004                 }
2005
2006                 tx->tx_status = -ECONNABORTED;
2007                 tx->tx_waiting = 0;
2008
2009                 if (!tx->tx_sending) {
2010                         tx->tx_queued = 0;
2011                         list_del(&tx->tx_list);
2012                         list_add(&tx->tx_list, &zombies);
2013                 }
2014         }
2015
2016         spin_unlock(&conn->ibc_lock);
2017
2018         kiblnd_txlist_done(conn->ibc_peer->ibp_ni, &zombies, -ECONNABORTED);
2019 }
2020
2021 static void
2022 kiblnd_finalise_conn(struct kib_conn *conn)
2023 {
2024         LASSERT(!in_interrupt());
2025         LASSERT(conn->ibc_state > IBLND_CONN_INIT);
2026
2027         kiblnd_set_conn_state(conn, IBLND_CONN_DISCONNECTED);
2028
2029         /*
2030          * abort_receives moves QP state to IB_QPS_ERR.  This is only required
2031          * for connections that didn't get as far as being connected, because
2032          * rdma_disconnect() does this for free.
2033          */
2034         kiblnd_abort_receives(conn);
2035
2036         /*
2037          * Complete all tx descs not waiting for sends to complete.
2038          * NB we should be safe from RDMA now that the QP has changed state
2039          */
2040         kiblnd_abort_txs(conn, &conn->ibc_tx_noops);
2041         kiblnd_abort_txs(conn, &conn->ibc_tx_queue);
2042         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_rsrvd);
2043         kiblnd_abort_txs(conn, &conn->ibc_tx_queue_nocred);
2044         kiblnd_abort_txs(conn, &conn->ibc_active_txs);
2045
2046         kiblnd_handle_early_rxs(conn);
2047 }
2048
2049 static void
2050 kiblnd_peer_connect_failed(struct kib_peer *peer, int active, int error)
2051 {
2052         LIST_HEAD(zombies);
2053         unsigned long flags;
2054
2055         LASSERT(error);
2056         LASSERT(!in_interrupt());
2057
2058         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2059
2060         if (active) {
2061                 LASSERT(peer->ibp_connecting > 0);
2062                 peer->ibp_connecting--;
2063         } else {
2064                 LASSERT(peer->ibp_accepting > 0);
2065                 peer->ibp_accepting--;
2066         }
2067
2068         if (kiblnd_peer_connecting(peer)) {
2069                 /* another connection attempt under way... */
2070                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock,
2071                                         flags);
2072                 return;
2073         }
2074
2075         peer->ibp_reconnected = 0;
2076         if (list_empty(&peer->ibp_conns)) {
2077                 /* Take peer's blocked transmits to complete with error */
2078                 list_add(&zombies, &peer->ibp_tx_queue);
2079                 list_del_init(&peer->ibp_tx_queue);
2080
2081                 if (kiblnd_peer_active(peer))
2082                         kiblnd_unlink_peer_locked(peer);
2083
2084                 peer->ibp_error = error;
2085         } else {
2086                 /* Can't have blocked transmits if there are connections */
2087                 LASSERT(list_empty(&peer->ibp_tx_queue));
2088         }
2089
2090         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2091
2092         kiblnd_peer_notify(peer);
2093
2094         if (list_empty(&zombies))
2095                 return;
2096
2097         CNETERR("Deleting messages for %s: connection failed\n",
2098                 libcfs_nid2str(peer->ibp_nid));
2099
2100         kiblnd_txlist_done(peer->ibp_ni, &zombies, -EHOSTUNREACH);
2101 }
2102
2103 static void
2104 kiblnd_connreq_done(struct kib_conn *conn, int status)
2105 {
2106         struct kib_peer *peer = conn->ibc_peer;
2107         struct kib_tx *tx;
2108         struct kib_tx *tmp;
2109         struct list_head txs;
2110         unsigned long flags;
2111         int active;
2112
2113         active = (conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2114
2115         CDEBUG(D_NET, "%s: active(%d), version(%x), status(%d)\n",
2116                libcfs_nid2str(peer->ibp_nid), active,
2117                conn->ibc_version, status);
2118
2119         LASSERT(!in_interrupt());
2120         LASSERT((conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT &&
2121                  peer->ibp_connecting > 0) ||
2122                  (conn->ibc_state == IBLND_CONN_PASSIVE_WAIT &&
2123                  peer->ibp_accepting > 0));
2124
2125         LIBCFS_FREE(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
2126         conn->ibc_connvars = NULL;
2127
2128         if (status) {
2129                 /* failed to establish connection */
2130                 kiblnd_peer_connect_failed(peer, active, status);
2131                 kiblnd_finalise_conn(conn);
2132                 return;
2133         }
2134
2135         /* connection established */
2136         write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2137
2138         conn->ibc_last_send = jiffies;
2139         kiblnd_set_conn_state(conn, IBLND_CONN_ESTABLISHED);
2140         kiblnd_peer_alive(peer);
2141
2142         /*
2143          * Add conn to peer's list and nuke any dangling conns from a different
2144          * peer instance...
2145          */
2146         kiblnd_conn_addref(conn);              /* +1 ref for ibc_list */
2147         list_add(&conn->ibc_list, &peer->ibp_conns);
2148         peer->ibp_reconnected = 0;
2149         if (active)
2150                 peer->ibp_connecting--;
2151         else
2152                 peer->ibp_accepting--;
2153
2154         if (!peer->ibp_version) {
2155                 peer->ibp_version     = conn->ibc_version;
2156                 peer->ibp_incarnation = conn->ibc_incarnation;
2157         }
2158
2159         if (peer->ibp_version     != conn->ibc_version ||
2160             peer->ibp_incarnation != conn->ibc_incarnation) {
2161                 kiblnd_close_stale_conns_locked(peer, conn->ibc_version,
2162                                                 conn->ibc_incarnation);
2163                 peer->ibp_version     = conn->ibc_version;
2164                 peer->ibp_incarnation = conn->ibc_incarnation;
2165         }
2166
2167         /* grab pending txs while I have the lock */
2168         list_add(&txs, &peer->ibp_tx_queue);
2169         list_del_init(&peer->ibp_tx_queue);
2170
2171         if (!kiblnd_peer_active(peer) ||        /* peer has been deleted */
2172             conn->ibc_comms_error) {       /* error has happened already */
2173                 lnet_ni_t *ni = peer->ibp_ni;
2174
2175                 /* start to shut down connection */
2176                 kiblnd_close_conn_locked(conn, -ECONNABORTED);
2177                 write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2178
2179                 kiblnd_txlist_done(ni, &txs, -ECONNABORTED);
2180
2181                 return;
2182         }
2183
2184         /*
2185          * +1 ref for myself, this connection is visible to other threads
2186          * now, refcount of peer:ibp_conns can be released by connection
2187          * close from either a different thread, or the calling of
2188          * kiblnd_check_sends_locked() below. See bz21911 for details.
2189          */
2190         kiblnd_conn_addref(conn);
2191         write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2192
2193         /* Schedule blocked txs */
2194         spin_lock(&conn->ibc_lock);
2195         list_for_each_entry_safe(tx, tmp, &txs, tx_list) {
2196                 list_del(&tx->tx_list);
2197
2198                 kiblnd_queue_tx_locked(tx, conn);
2199         }
2200         kiblnd_check_sends_locked(conn);
2201         spin_unlock(&conn->ibc_lock);
2202
2203         /* schedule blocked rxs */
2204         kiblnd_handle_early_rxs(conn);
2205
2206         kiblnd_conn_decref(conn);
2207 }
2208
2209 static void
2210 kiblnd_reject(struct rdma_cm_id *cmid, struct kib_rej *rej)
2211 {
2212         int rc;
2213
2214         rc = rdma_reject(cmid, rej, sizeof(*rej));
2215
2216         if (rc)
2217                 CWARN("Error %d sending reject\n", rc);
2218 }
2219
2220 static int
2221 kiblnd_passive_connect(struct rdma_cm_id *cmid, void *priv, int priv_nob)
2222 {
2223         rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
2224         struct kib_msg *reqmsg = priv;
2225         struct kib_msg *ackmsg;
2226         struct kib_dev *ibdev;
2227         struct kib_peer *peer;
2228         struct kib_peer *peer2;
2229         struct kib_conn *conn;
2230         lnet_ni_t *ni  = NULL;
2231         struct kib_net *net = NULL;
2232         lnet_nid_t nid;
2233         struct rdma_conn_param cp;
2234         struct kib_rej rej;
2235         int version = IBLND_MSG_VERSION;
2236         unsigned long flags;
2237         int max_frags;
2238         int rc;
2239         struct sockaddr_in *peer_addr;
2240
2241         LASSERT(!in_interrupt());
2242
2243         /* cmid inherits 'context' from the corresponding listener id */
2244         ibdev = (struct kib_dev *)cmid->context;
2245         LASSERT(ibdev);
2246
2247         memset(&rej, 0, sizeof(rej));
2248         rej.ibr_magic = IBLND_MSG_MAGIC;
2249         rej.ibr_why = IBLND_REJECT_FATAL;
2250         rej.ibr_cp.ibcp_max_msg_size = IBLND_MSG_SIZE;
2251
2252         peer_addr = (struct sockaddr_in *)&cmid->route.addr.dst_addr;
2253         if (*kiblnd_tunables.kib_require_priv_port &&
2254             ntohs(peer_addr->sin_port) >= PROT_SOCK) {
2255                 __u32 ip = ntohl(peer_addr->sin_addr.s_addr);
2256
2257                 CERROR("Peer's port (%pI4h:%hu) is not privileged\n",
2258                        &ip, ntohs(peer_addr->sin_port));
2259                 goto failed;
2260         }
2261
2262         if (priv_nob < offsetof(struct kib_msg, ibm_type)) {
2263                 CERROR("Short connection request\n");
2264                 goto failed;
2265         }
2266
2267         /*
2268          * Future protocol version compatibility support!  If the
2269          * o2iblnd-specific protocol changes, or when LNET unifies
2270          * protocols over all LNDs, the initial connection will
2271          * negotiate a protocol version.  I trap this here to avoid
2272          * console errors; the reject tells the peer which protocol I
2273          * speak.
2274          */
2275         if (reqmsg->ibm_magic == LNET_PROTO_MAGIC ||
2276             reqmsg->ibm_magic == __swab32(LNET_PROTO_MAGIC))
2277                 goto failed;
2278         if (reqmsg->ibm_magic == IBLND_MSG_MAGIC &&
2279             reqmsg->ibm_version != IBLND_MSG_VERSION &&
2280             reqmsg->ibm_version != IBLND_MSG_VERSION_1)
2281                 goto failed;
2282         if (reqmsg->ibm_magic == __swab32(IBLND_MSG_MAGIC) &&
2283             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION) &&
2284             reqmsg->ibm_version != __swab16(IBLND_MSG_VERSION_1))
2285                 goto failed;
2286
2287         rc = kiblnd_unpack_msg(reqmsg, priv_nob);
2288         if (rc) {
2289                 CERROR("Can't parse connection request: %d\n", rc);
2290                 goto failed;
2291         }
2292
2293         nid = reqmsg->ibm_srcnid;
2294         ni = lnet_net2ni(LNET_NIDNET(reqmsg->ibm_dstnid));
2295
2296         if (ni) {
2297                 net = (struct kib_net *)ni->ni_data;
2298                 rej.ibr_incarnation = net->ibn_incarnation;
2299         }
2300
2301         if (!ni ||                       /* no matching net */
2302             ni->ni_nid != reqmsg->ibm_dstnid ||   /* right NET, wrong NID! */
2303             net->ibn_dev != ibdev) {          /* wrong device */
2304                 CERROR("Can't accept conn from %s on %s (%s:%d:%pI4h): bad dst nid %s\n",
2305                        libcfs_nid2str(nid),
2306                        !ni ? "NA" : libcfs_nid2str(ni->ni_nid),
2307                        ibdev->ibd_ifname, ibdev->ibd_nnets,
2308                        &ibdev->ibd_ifip,
2309                        libcfs_nid2str(reqmsg->ibm_dstnid));
2310
2311                 goto failed;
2312         }
2313
2314        /* check time stamp as soon as possible */
2315         if (reqmsg->ibm_dststamp &&
2316             reqmsg->ibm_dststamp != net->ibn_incarnation) {
2317                 CWARN("Stale connection request\n");
2318                 rej.ibr_why = IBLND_REJECT_CONN_STALE;
2319                 goto failed;
2320         }
2321
2322         /* I can accept peer's version */
2323         version = reqmsg->ibm_version;
2324
2325         if (reqmsg->ibm_type != IBLND_MSG_CONNREQ) {
2326                 CERROR("Unexpected connreq msg type: %x from %s\n",
2327                        reqmsg->ibm_type, libcfs_nid2str(nid));
2328                 goto failed;
2329         }
2330
2331         if (reqmsg->ibm_u.connparams.ibcp_queue_depth >
2332             kiblnd_msg_queue_size(version, ni)) {
2333                 CERROR("Can't accept conn from %s, queue depth too large: %d (<=%d wanted)\n",
2334                        libcfs_nid2str(nid),
2335                        reqmsg->ibm_u.connparams.ibcp_queue_depth,
2336                        kiblnd_msg_queue_size(version, ni));
2337
2338                 if (version == IBLND_MSG_VERSION)
2339                         rej.ibr_why = IBLND_REJECT_MSG_QUEUE_SIZE;
2340
2341                 goto failed;
2342         }
2343
2344         max_frags = reqmsg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2345         if (max_frags > kiblnd_rdma_frags(version, ni)) {
2346                 CWARN("Can't accept conn from %s (version %x): max message size %d is too large (%d wanted)\n",
2347                       libcfs_nid2str(nid), version, max_frags,
2348                       kiblnd_rdma_frags(version, ni));
2349
2350                 if (version >= IBLND_MSG_VERSION)
2351                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2352
2353                 goto failed;
2354         } else if (max_frags < kiblnd_rdma_frags(version, ni) &&
2355                    !net->ibn_fmr_ps) {
2356                 CWARN("Can't accept conn from %s (version %x): max message size %d incompatible without FMR pool (%d wanted)\n",
2357                       libcfs_nid2str(nid), version, max_frags,
2358                       kiblnd_rdma_frags(version, ni));
2359
2360                 if (version == IBLND_MSG_VERSION)
2361                         rej.ibr_why = IBLND_REJECT_RDMA_FRAGS;
2362
2363                 goto failed;
2364         }
2365
2366         if (reqmsg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2367                 CERROR("Can't accept %s: message size %d too big (%d max)\n",
2368                        libcfs_nid2str(nid),
2369                        reqmsg->ibm_u.connparams.ibcp_max_msg_size,
2370                        IBLND_MSG_SIZE);
2371                 goto failed;
2372         }
2373
2374         /* assume 'nid' is a new peer; create  */
2375         rc = kiblnd_create_peer(ni, &peer, nid);
2376         if (rc) {
2377                 CERROR("Can't create peer for %s\n", libcfs_nid2str(nid));
2378                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2379                 goto failed;
2380         }
2381
2382         /* We have validated the peer's parameters so use those */
2383         peer->ibp_max_frags = max_frags;
2384         peer->ibp_queue_depth = reqmsg->ibm_u.connparams.ibcp_queue_depth;
2385
2386         write_lock_irqsave(g_lock, flags);
2387
2388         peer2 = kiblnd_find_peer_locked(nid);
2389         if (peer2) {
2390                 if (!peer2->ibp_version) {
2391                         peer2->ibp_version     = version;
2392                         peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2393                 }
2394
2395                 /* not the guy I've talked with */
2396                 if (peer2->ibp_incarnation != reqmsg->ibm_srcstamp ||
2397                     peer2->ibp_version     != version) {
2398                         kiblnd_close_peer_conns_locked(peer2, -ESTALE);
2399
2400                         if (kiblnd_peer_active(peer2)) {
2401                                 peer2->ibp_incarnation = reqmsg->ibm_srcstamp;
2402                                 peer2->ibp_version = version;
2403                         }
2404                         write_unlock_irqrestore(g_lock, flags);
2405
2406                         CWARN("Conn stale %s version %x/%x incarnation %llu/%llu\n",
2407                               libcfs_nid2str(nid), peer2->ibp_version, version,
2408                               peer2->ibp_incarnation, reqmsg->ibm_srcstamp);
2409
2410                         kiblnd_peer_decref(peer);
2411                         rej.ibr_why = IBLND_REJECT_CONN_STALE;
2412                         goto failed;
2413                 }
2414
2415                 /*
2416                  * Tie-break connection race in favour of the higher NID.
2417                  * If we keep running into a race condition multiple times,
2418                  * we have to assume that the connection attempt with the
2419                  * higher NID is stuck in a connecting state and will never
2420                  * recover.  As such, we pass through this if-block and let
2421                  * the lower NID connection win so we can move forward.
2422                  */
2423                 if (peer2->ibp_connecting &&
2424                     nid < ni->ni_nid && peer2->ibp_races <
2425                     MAX_CONN_RACES_BEFORE_ABORT) {
2426                         peer2->ibp_races++;
2427                         write_unlock_irqrestore(g_lock, flags);
2428
2429                         CDEBUG(D_NET, "Conn race %s\n",
2430                                libcfs_nid2str(peer2->ibp_nid));
2431
2432                         kiblnd_peer_decref(peer);
2433                         rej.ibr_why = IBLND_REJECT_CONN_RACE;
2434                         goto failed;
2435                 }
2436                 if (peer2->ibp_races >= MAX_CONN_RACES_BEFORE_ABORT)
2437                         CNETERR("Conn race %s: unresolved after %d attempts, letting lower NID win\n",
2438                                 libcfs_nid2str(peer2->ibp_nid),
2439                                 MAX_CONN_RACES_BEFORE_ABORT);
2440                 /**
2441                  * passive connection is allowed even this peer is waiting for
2442                  * reconnection.
2443                  */
2444                 peer2->ibp_reconnecting = 0;
2445                 peer2->ibp_races = 0;
2446                 peer2->ibp_accepting++;
2447                 kiblnd_peer_addref(peer2);
2448
2449                 /**
2450                  * Race with kiblnd_launch_tx (active connect) to create peer
2451                  * so copy validated parameters since we now know what the
2452                  * peer's limits are
2453                  */
2454                 peer2->ibp_max_frags = peer->ibp_max_frags;
2455                 peer2->ibp_queue_depth = peer->ibp_queue_depth;
2456
2457                 write_unlock_irqrestore(g_lock, flags);
2458                 kiblnd_peer_decref(peer);
2459                 peer = peer2;
2460         } else {
2461                 /* Brand new peer */
2462                 LASSERT(!peer->ibp_accepting);
2463                 LASSERT(!peer->ibp_version &&
2464                         !peer->ibp_incarnation);
2465
2466                 peer->ibp_accepting   = 1;
2467                 peer->ibp_version     = version;
2468                 peer->ibp_incarnation = reqmsg->ibm_srcstamp;
2469
2470                 /* I have a ref on ni that prevents it being shutdown */
2471                 LASSERT(!net->ibn_shutdown);
2472
2473                 kiblnd_peer_addref(peer);
2474                 list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
2475
2476                 write_unlock_irqrestore(g_lock, flags);
2477         }
2478
2479         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_PASSIVE_WAIT,
2480                                   version);
2481         if (!conn) {
2482                 kiblnd_peer_connect_failed(peer, 0, -ENOMEM);
2483                 kiblnd_peer_decref(peer);
2484                 rej.ibr_why = IBLND_REJECT_NO_RESOURCES;
2485                 goto failed;
2486         }
2487
2488         /*
2489          * conn now "owns" cmid, so I return success from here on to ensure the
2490          * CM callback doesn't destroy cmid.
2491          */
2492         conn->ibc_incarnation      = reqmsg->ibm_srcstamp;
2493         conn->ibc_credits          = conn->ibc_queue_depth;
2494         conn->ibc_reserved_credits = conn->ibc_queue_depth;
2495         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2496                 IBLND_OOB_MSGS(version) <= IBLND_RX_MSGS(conn));
2497
2498         ackmsg = &conn->ibc_connvars->cv_msg;
2499         memset(ackmsg, 0, sizeof(*ackmsg));
2500
2501         kiblnd_init_msg(ackmsg, IBLND_MSG_CONNACK,
2502                         sizeof(ackmsg->ibm_u.connparams));
2503         ackmsg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2504         ackmsg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2505         ackmsg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2506
2507         kiblnd_pack_msg(ni, ackmsg, version, 0, nid, reqmsg->ibm_srcstamp);
2508
2509         memset(&cp, 0, sizeof(cp));
2510         cp.private_data = ackmsg;
2511         cp.private_data_len = ackmsg->ibm_nob;
2512         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2513         cp.initiator_depth = 0;
2514         cp.flow_control = 1;
2515         cp.retry_count = *kiblnd_tunables.kib_retry_count;
2516         cp.rnr_retry_count = *kiblnd_tunables.kib_rnr_retry_count;
2517
2518         CDEBUG(D_NET, "Accept %s\n", libcfs_nid2str(nid));
2519
2520         rc = rdma_accept(cmid, &cp);
2521         if (rc) {
2522                 CERROR("Can't accept %s: %d\n", libcfs_nid2str(nid), rc);
2523                 rej.ibr_version = version;
2524                 rej.ibr_why     = IBLND_REJECT_FATAL;
2525
2526                 kiblnd_reject(cmid, &rej);
2527                 kiblnd_connreq_done(conn, rc);
2528                 kiblnd_conn_decref(conn);
2529         }
2530
2531         lnet_ni_decref(ni);
2532         return 0;
2533
2534  failed:
2535         if (ni) {
2536                 rej.ibr_cp.ibcp_queue_depth = kiblnd_msg_queue_size(version, ni);
2537                 rej.ibr_cp.ibcp_max_frags = kiblnd_rdma_frags(version, ni);
2538                 lnet_ni_decref(ni);
2539         }
2540
2541         rej.ibr_version             = version;
2542         kiblnd_reject(cmid, &rej);
2543
2544         return -ECONNREFUSED;
2545 }
2546
2547 static void
2548 kiblnd_check_reconnect(struct kib_conn *conn, int version,
2549                        __u64 incarnation, int why, struct kib_connparams *cp)
2550 {
2551         rwlock_t *glock = &kiblnd_data.kib_global_lock;
2552         struct kib_peer *peer = conn->ibc_peer;
2553         char *reason;
2554         int msg_size = IBLND_MSG_SIZE;
2555         int frag_num = -1;
2556         int queue_dep = -1;
2557         bool reconnect;
2558         unsigned long flags;
2559
2560         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2561         LASSERT(peer->ibp_connecting > 0);     /* 'conn' at least */
2562         LASSERT(!peer->ibp_reconnecting);
2563
2564         if (cp) {
2565                 msg_size = cp->ibcp_max_msg_size;
2566                 frag_num        = cp->ibcp_max_frags << IBLND_FRAG_SHIFT;
2567                 queue_dep = cp->ibcp_queue_depth;
2568         }
2569
2570         write_lock_irqsave(glock, flags);
2571         /**
2572          * retry connection if it's still needed and no other connection
2573          * attempts (active or passive) are in progress
2574          * NB: reconnect is still needed even when ibp_tx_queue is
2575          * empty if ibp_version != version because reconnect may be
2576          * initiated by kiblnd_query()
2577          */
2578         reconnect = (!list_empty(&peer->ibp_tx_queue) ||
2579                      peer->ibp_version != version) &&
2580                     peer->ibp_connecting == 1 &&
2581                     !peer->ibp_accepting;
2582         if (!reconnect) {
2583                 reason = "no need";
2584                 goto out;
2585         }
2586
2587         switch (why) {
2588         default:
2589                 reason = "Unknown";
2590                 break;
2591
2592         case IBLND_REJECT_RDMA_FRAGS: {
2593                 struct lnet_ioctl_config_lnd_tunables *tunables;
2594
2595                 if (!cp) {
2596                         reason = "can't negotiate max frags";
2597                         goto out;
2598                 }
2599                 tunables = peer->ibp_ni->ni_lnd_tunables;
2600                 if (!tunables->lt_tun_u.lt_o2ib.lnd_map_on_demand) {
2601                         reason = "map_on_demand must be enabled";
2602                         goto out;
2603                 }
2604                 if (conn->ibc_max_frags <= frag_num) {
2605                         reason = "unsupported max frags";
2606                         goto out;
2607                 }
2608
2609                 peer->ibp_max_frags = frag_num;
2610                 reason = "rdma fragments";
2611                 break;
2612         }
2613         case IBLND_REJECT_MSG_QUEUE_SIZE:
2614                 if (!cp) {
2615                         reason = "can't negotiate queue depth";
2616                         goto out;
2617                 }
2618                 if (conn->ibc_queue_depth <= queue_dep) {
2619                         reason = "unsupported queue depth";
2620                         goto out;
2621                 }
2622
2623                 peer->ibp_queue_depth = queue_dep;
2624                 reason = "queue depth";
2625                 break;
2626
2627         case IBLND_REJECT_CONN_STALE:
2628                 reason = "stale";
2629                 break;
2630
2631         case IBLND_REJECT_CONN_RACE:
2632                 reason = "conn race";
2633                 break;
2634
2635         case IBLND_REJECT_CONN_UNCOMPAT:
2636                 reason = "version negotiation";
2637                 break;
2638         }
2639
2640         conn->ibc_reconnect = 1;
2641         peer->ibp_reconnecting = 1;
2642         peer->ibp_version = version;
2643         if (incarnation)
2644                 peer->ibp_incarnation = incarnation;
2645 out:
2646         write_unlock_irqrestore(glock, flags);
2647
2648         CNETERR("%s: %s (%s), %x, %x, msg_size: %d, queue_depth: %d/%d, max_frags: %d/%d\n",
2649                 libcfs_nid2str(peer->ibp_nid),
2650                 reconnect ? "reconnect" : "don't reconnect",
2651                 reason, IBLND_MSG_VERSION, version, msg_size,
2652                 conn->ibc_queue_depth, queue_dep,
2653                 conn->ibc_max_frags, frag_num);
2654         /**
2655          * if conn::ibc_reconnect is TRUE, connd will reconnect to the peer
2656          * while destroying the zombie
2657          */
2658 }
2659
2660 static void
2661 kiblnd_rejected(struct kib_conn *conn, int reason, void *priv, int priv_nob)
2662 {
2663         struct kib_peer *peer = conn->ibc_peer;
2664
2665         LASSERT(!in_interrupt());
2666         LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT);
2667
2668         switch (reason) {
2669         case IB_CM_REJ_STALE_CONN:
2670                 kiblnd_check_reconnect(conn, IBLND_MSG_VERSION, 0,
2671                                        IBLND_REJECT_CONN_STALE, NULL);
2672                 break;
2673
2674         case IB_CM_REJ_INVALID_SERVICE_ID:
2675                 CNETERR("%s rejected: no listener at %d\n",
2676                         libcfs_nid2str(peer->ibp_nid),
2677                         *kiblnd_tunables.kib_service);
2678                 break;
2679
2680         case IB_CM_REJ_CONSUMER_DEFINED:
2681                 if (priv_nob >= offsetof(struct kib_rej, ibr_padding)) {
2682                         struct kib_rej *rej = priv;
2683                         struct kib_connparams *cp = NULL;
2684                         int flip = 0;
2685                         __u64 incarnation = -1;
2686
2687                         /* NB. default incarnation is -1 because:
2688                          * a) V1 will ignore dst incarnation in connreq.
2689                          * b) V2 will provide incarnation while rejecting me,
2690                          *    -1 will be overwrote.
2691                          *
2692                          * if I try to connect to a V1 peer with V2 protocol,
2693                          * it rejected me then upgrade to V2, I have no idea
2694                          * about the upgrading and try to reconnect with V1,
2695                          * in this case upgraded V2 can find out I'm trying to
2696                          * talk to the old guy and reject me(incarnation is -1).
2697                          */
2698
2699                         if (rej->ibr_magic == __swab32(IBLND_MSG_MAGIC) ||
2700                             rej->ibr_magic == __swab32(LNET_PROTO_MAGIC)) {
2701                                 __swab32s(&rej->ibr_magic);
2702                                 __swab16s(&rej->ibr_version);
2703                                 flip = 1;
2704                         }
2705
2706                         if (priv_nob >= sizeof(struct kib_rej) &&
2707                             rej->ibr_version > IBLND_MSG_VERSION_1) {
2708                                 /*
2709                                  * priv_nob is always 148 in current version
2710                                  * of OFED, so we still need to check version.
2711                                  * (define of IB_CM_REJ_PRIVATE_DATA_SIZE)
2712                                  */
2713                                 cp = &rej->ibr_cp;
2714
2715                                 if (flip) {
2716                                         __swab64s(&rej->ibr_incarnation);
2717                                         __swab16s(&cp->ibcp_queue_depth);
2718                                         __swab16s(&cp->ibcp_max_frags);
2719                                         __swab32s(&cp->ibcp_max_msg_size);
2720                                 }
2721
2722                                 incarnation = rej->ibr_incarnation;
2723                         }
2724
2725                         if (rej->ibr_magic != IBLND_MSG_MAGIC &&
2726                             rej->ibr_magic != LNET_PROTO_MAGIC) {
2727                                 CERROR("%s rejected: consumer defined fatal error\n",
2728                                        libcfs_nid2str(peer->ibp_nid));
2729                                 break;
2730                         }
2731
2732                         if (rej->ibr_version != IBLND_MSG_VERSION &&
2733                             rej->ibr_version != IBLND_MSG_VERSION_1) {
2734                                 CERROR("%s rejected: o2iblnd version %x error\n",
2735                                        libcfs_nid2str(peer->ibp_nid),
2736                                        rej->ibr_version);
2737                                 break;
2738                         }
2739
2740                         if (rej->ibr_why     == IBLND_REJECT_FATAL &&
2741                             rej->ibr_version == IBLND_MSG_VERSION_1) {
2742                                 CDEBUG(D_NET, "rejected by old version peer %s: %x\n",
2743                                        libcfs_nid2str(peer->ibp_nid), rej->ibr_version);
2744
2745                                 if (conn->ibc_version != IBLND_MSG_VERSION_1)
2746                                         rej->ibr_why = IBLND_REJECT_CONN_UNCOMPAT;
2747                         }
2748
2749                         switch (rej->ibr_why) {
2750                         case IBLND_REJECT_CONN_RACE:
2751                         case IBLND_REJECT_CONN_STALE:
2752                         case IBLND_REJECT_CONN_UNCOMPAT:
2753                         case IBLND_REJECT_MSG_QUEUE_SIZE:
2754                         case IBLND_REJECT_RDMA_FRAGS:
2755                                 kiblnd_check_reconnect(conn, rej->ibr_version,
2756                                                        incarnation,
2757                                                        rej->ibr_why, cp);
2758                                 break;
2759
2760                         case IBLND_REJECT_NO_RESOURCES:
2761                                 CERROR("%s rejected: o2iblnd no resources\n",
2762                                        libcfs_nid2str(peer->ibp_nid));
2763                                 break;
2764
2765                         case IBLND_REJECT_FATAL:
2766                                 CERROR("%s rejected: o2iblnd fatal error\n",
2767                                        libcfs_nid2str(peer->ibp_nid));
2768                                 break;
2769
2770                         default:
2771                                 CERROR("%s rejected: o2iblnd reason %d\n",
2772                                        libcfs_nid2str(peer->ibp_nid),
2773                                        rej->ibr_why);
2774                                 break;
2775                         }
2776                         break;
2777                 }
2778                 /* fall through */
2779         default:
2780                 CNETERR("%s rejected: reason %d, size %d\n",
2781                         libcfs_nid2str(peer->ibp_nid), reason, priv_nob);
2782                 break;
2783         }
2784
2785         kiblnd_connreq_done(conn, -ECONNREFUSED);
2786 }
2787
2788 static void
2789 kiblnd_check_connreply(struct kib_conn *conn, void *priv, int priv_nob)
2790 {
2791         struct kib_peer *peer = conn->ibc_peer;
2792         lnet_ni_t *ni = peer->ibp_ni;
2793         struct kib_net *net = ni->ni_data;
2794         struct kib_msg *msg = priv;
2795         int ver = conn->ibc_version;
2796         int rc = kiblnd_unpack_msg(msg, priv_nob);
2797         unsigned long flags;
2798
2799         LASSERT(net);
2800
2801         if (rc) {
2802                 CERROR("Can't unpack connack from %s: %d\n",
2803                        libcfs_nid2str(peer->ibp_nid), rc);
2804                 goto failed;
2805         }
2806
2807         if (msg->ibm_type != IBLND_MSG_CONNACK) {
2808                 CERROR("Unexpected message %d from %s\n",
2809                        msg->ibm_type, libcfs_nid2str(peer->ibp_nid));
2810                 rc = -EPROTO;
2811                 goto failed;
2812         }
2813
2814         if (ver != msg->ibm_version) {
2815                 CERROR("%s replied version %x is different with requested version %x\n",
2816                        libcfs_nid2str(peer->ibp_nid), msg->ibm_version, ver);
2817                 rc = -EPROTO;
2818                 goto failed;
2819         }
2820
2821         if (msg->ibm_u.connparams.ibcp_queue_depth >
2822             conn->ibc_queue_depth) {
2823                 CERROR("%s has incompatible queue depth %d (<=%d wanted)\n",
2824                        libcfs_nid2str(peer->ibp_nid),
2825                        msg->ibm_u.connparams.ibcp_queue_depth,
2826                        conn->ibc_queue_depth);
2827                 rc = -EPROTO;
2828                 goto failed;
2829         }
2830
2831         if ((msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT) >
2832             conn->ibc_max_frags) {
2833                 CERROR("%s has incompatible max_frags %d (<=%d wanted)\n",
2834                        libcfs_nid2str(peer->ibp_nid),
2835                        msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT,
2836                        conn->ibc_max_frags);
2837                 rc = -EPROTO;
2838                 goto failed;
2839         }
2840
2841         if (msg->ibm_u.connparams.ibcp_max_msg_size > IBLND_MSG_SIZE) {
2842                 CERROR("%s max message size %d too big (%d max)\n",
2843                        libcfs_nid2str(peer->ibp_nid),
2844                        msg->ibm_u.connparams.ibcp_max_msg_size,
2845                        IBLND_MSG_SIZE);
2846                 rc = -EPROTO;
2847                 goto failed;
2848         }
2849
2850         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2851         if (msg->ibm_dstnid == ni->ni_nid &&
2852             msg->ibm_dststamp == net->ibn_incarnation)
2853                 rc = 0;
2854         else
2855                 rc = -ESTALE;
2856         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2857
2858         if (rc) {
2859                 CERROR("Bad connection reply from %s, rc = %d, version: %x max_frags: %d\n",
2860                        libcfs_nid2str(peer->ibp_nid), rc,
2861                        msg->ibm_version, msg->ibm_u.connparams.ibcp_max_frags);
2862                 goto failed;
2863         }
2864
2865         conn->ibc_incarnation = msg->ibm_srcstamp;
2866         conn->ibc_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2867         conn->ibc_reserved_credits = msg->ibm_u.connparams.ibcp_queue_depth;
2868         conn->ibc_queue_depth = msg->ibm_u.connparams.ibcp_queue_depth;
2869         conn->ibc_max_frags = msg->ibm_u.connparams.ibcp_max_frags >> IBLND_FRAG_SHIFT;
2870         LASSERT(conn->ibc_credits + conn->ibc_reserved_credits +
2871                 IBLND_OOB_MSGS(ver) <= IBLND_RX_MSGS(conn));
2872
2873         kiblnd_connreq_done(conn, 0);
2874         return;
2875
2876  failed:
2877         /*
2878          * NB My QP has already established itself, so I handle anything going
2879          * wrong here by setting ibc_comms_error.
2880          * kiblnd_connreq_done(0) moves the conn state to ESTABLISHED, but then
2881          * immediately tears it down.
2882          */
2883         LASSERT(rc);
2884         conn->ibc_comms_error = rc;
2885         kiblnd_connreq_done(conn, 0);
2886 }
2887
2888 static int
2889 kiblnd_active_connect(struct rdma_cm_id *cmid)
2890 {
2891         struct kib_peer *peer = (struct kib_peer *)cmid->context;
2892         struct kib_conn *conn;
2893         struct kib_msg *msg;
2894         struct rdma_conn_param cp;
2895         int version;
2896         __u64 incarnation;
2897         unsigned long flags;
2898         int rc;
2899
2900         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
2901
2902         incarnation = peer->ibp_incarnation;
2903         version = !peer->ibp_version ? IBLND_MSG_VERSION :
2904                                        peer->ibp_version;
2905
2906         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
2907
2908         conn = kiblnd_create_conn(peer, cmid, IBLND_CONN_ACTIVE_CONNECT,
2909                                   version);
2910         if (!conn) {
2911                 kiblnd_peer_connect_failed(peer, 1, -ENOMEM);
2912                 kiblnd_peer_decref(peer); /* lose cmid's ref */
2913                 return -ENOMEM;
2914         }
2915
2916         /*
2917          * conn "owns" cmid now, so I return success from here on to ensure the
2918          * CM callback doesn't destroy cmid. conn also takes over cmid's ref
2919          * on peer
2920          */
2921         msg = &conn->ibc_connvars->cv_msg;
2922
2923         memset(msg, 0, sizeof(*msg));
2924         kiblnd_init_msg(msg, IBLND_MSG_CONNREQ, sizeof(msg->ibm_u.connparams));
2925         msg->ibm_u.connparams.ibcp_queue_depth = conn->ibc_queue_depth;
2926         msg->ibm_u.connparams.ibcp_max_frags = conn->ibc_max_frags << IBLND_FRAG_SHIFT;
2927         msg->ibm_u.connparams.ibcp_max_msg_size = IBLND_MSG_SIZE;
2928
2929         kiblnd_pack_msg(peer->ibp_ni, msg, version,
2930                         0, peer->ibp_nid, incarnation);
2931
2932         memset(&cp, 0, sizeof(cp));
2933         cp.private_data = msg;
2934         cp.private_data_len    = msg->ibm_nob;
2935         cp.responder_resources = 0;          /* No atomic ops or RDMA reads */
2936         cp.initiator_depth     = 0;
2937         cp.flow_control        = 1;
2938         cp.retry_count         = *kiblnd_tunables.kib_retry_count;
2939         cp.rnr_retry_count     = *kiblnd_tunables.kib_rnr_retry_count;
2940
2941         LASSERT(cmid->context == (void *)conn);
2942         LASSERT(conn->ibc_cmid == cmid);
2943
2944         rc = rdma_connect(cmid, &cp);
2945         if (rc) {
2946                 CERROR("Can't connect to %s: %d\n",
2947                        libcfs_nid2str(peer->ibp_nid), rc);
2948                 kiblnd_connreq_done(conn, rc);
2949                 kiblnd_conn_decref(conn);
2950         }
2951
2952         return 0;
2953 }
2954
2955 int
2956 kiblnd_cm_callback(struct rdma_cm_id *cmid, struct rdma_cm_event *event)
2957 {
2958         struct kib_peer *peer;
2959         struct kib_conn *conn;
2960         int rc;
2961
2962         switch (event->event) {
2963         default:
2964                 CERROR("Unexpected event: %d, status: %d\n",
2965                        event->event, event->status);
2966                 LBUG();
2967
2968         case RDMA_CM_EVENT_CONNECT_REQUEST:
2969                 /* destroy cmid on failure */
2970                 rc = kiblnd_passive_connect(cmid,
2971                                             (void *)KIBLND_CONN_PARAM(event),
2972                                             KIBLND_CONN_PARAM_LEN(event));
2973                 CDEBUG(D_NET, "connreq: %d\n", rc);
2974                 return rc;
2975
2976         case RDMA_CM_EVENT_ADDR_ERROR:
2977                 peer = (struct kib_peer *)cmid->context;
2978                 CNETERR("%s: ADDR ERROR %d\n",
2979                         libcfs_nid2str(peer->ibp_nid), event->status);
2980                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
2981                 kiblnd_peer_decref(peer);
2982                 return -EHOSTUNREACH;      /* rc destroys cmid */
2983
2984         case RDMA_CM_EVENT_ADDR_RESOLVED:
2985                 peer = (struct kib_peer *)cmid->context;
2986
2987                 CDEBUG(D_NET, "%s Addr resolved: %d\n",
2988                        libcfs_nid2str(peer->ibp_nid), event->status);
2989
2990                 if (event->status) {
2991                         CNETERR("Can't resolve address for %s: %d\n",
2992                                 libcfs_nid2str(peer->ibp_nid), event->status);
2993                         rc = event->status;
2994                 } else {
2995                         rc = rdma_resolve_route(
2996                                 cmid, *kiblnd_tunables.kib_timeout * 1000);
2997                         if (!rc)
2998                                 return 0;
2999                         /* Can't initiate route resolution */
3000                         CERROR("Can't resolve route for %s: %d\n",
3001                                libcfs_nid2str(peer->ibp_nid), rc);
3002                 }
3003                 kiblnd_peer_connect_failed(peer, 1, rc);
3004                 kiblnd_peer_decref(peer);
3005                 return rc;                    /* rc destroys cmid */
3006
3007         case RDMA_CM_EVENT_ROUTE_ERROR:
3008                 peer = (struct kib_peer *)cmid->context;
3009                 CNETERR("%s: ROUTE ERROR %d\n",
3010                         libcfs_nid2str(peer->ibp_nid), event->status);
3011                 kiblnd_peer_connect_failed(peer, 1, -EHOSTUNREACH);
3012                 kiblnd_peer_decref(peer);
3013                 return -EHOSTUNREACH;      /* rc destroys cmid */
3014
3015         case RDMA_CM_EVENT_ROUTE_RESOLVED:
3016                 peer = (struct kib_peer *)cmid->context;
3017                 CDEBUG(D_NET, "%s Route resolved: %d\n",
3018                        libcfs_nid2str(peer->ibp_nid), event->status);
3019
3020                 if (!event->status)
3021                         return kiblnd_active_connect(cmid);
3022
3023                 CNETERR("Can't resolve route for %s: %d\n",
3024                         libcfs_nid2str(peer->ibp_nid), event->status);
3025                 kiblnd_peer_connect_failed(peer, 1, event->status);
3026                 kiblnd_peer_decref(peer);
3027                 return event->status;      /* rc destroys cmid */
3028
3029         case RDMA_CM_EVENT_UNREACHABLE:
3030                 conn = (struct kib_conn *)cmid->context;
3031                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3032                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3033                 CNETERR("%s: UNREACHABLE %d\n",
3034                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3035                 kiblnd_connreq_done(conn, -ENETDOWN);
3036                 kiblnd_conn_decref(conn);
3037                 return 0;
3038
3039         case RDMA_CM_EVENT_CONNECT_ERROR:
3040                 conn = (struct kib_conn *)cmid->context;
3041                 LASSERT(conn->ibc_state == IBLND_CONN_ACTIVE_CONNECT ||
3042                         conn->ibc_state == IBLND_CONN_PASSIVE_WAIT);
3043                 CNETERR("%s: CONNECT ERROR %d\n",
3044                         libcfs_nid2str(conn->ibc_peer->ibp_nid), event->status);
3045                 kiblnd_connreq_done(conn, -ENOTCONN);
3046                 kiblnd_conn_decref(conn);
3047                 return 0;
3048
3049         case RDMA_CM_EVENT_REJECTED:
3050                 conn = (struct kib_conn *)cmid->context;
3051                 switch (conn->ibc_state) {
3052                 default:
3053                         LBUG();
3054
3055                 case IBLND_CONN_PASSIVE_WAIT:
3056                         CERROR("%s: REJECTED %d\n",
3057                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3058                                event->status);
3059                         kiblnd_connreq_done(conn, -ECONNRESET);
3060                         break;
3061
3062                 case IBLND_CONN_ACTIVE_CONNECT:
3063                         kiblnd_rejected(conn, event->status,
3064                                         (void *)KIBLND_CONN_PARAM(event),
3065                                         KIBLND_CONN_PARAM_LEN(event));
3066                         break;
3067                 }
3068                 kiblnd_conn_decref(conn);
3069                 return 0;
3070
3071         case RDMA_CM_EVENT_ESTABLISHED:
3072                 conn = (struct kib_conn *)cmid->context;
3073                 switch (conn->ibc_state) {
3074                 default:
3075                         LBUG();
3076
3077                 case IBLND_CONN_PASSIVE_WAIT:
3078                         CDEBUG(D_NET, "ESTABLISHED (passive): %s\n",
3079                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3080                         kiblnd_connreq_done(conn, 0);
3081                         break;
3082
3083                 case IBLND_CONN_ACTIVE_CONNECT:
3084                         CDEBUG(D_NET, "ESTABLISHED(active): %s\n",
3085                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3086                         kiblnd_check_connreply(conn,
3087                                                (void *)KIBLND_CONN_PARAM(event),
3088                                                KIBLND_CONN_PARAM_LEN(event));
3089                         break;
3090                 }
3091                 /* net keeps its ref on conn! */
3092                 return 0;
3093
3094         case RDMA_CM_EVENT_TIMEWAIT_EXIT:
3095                 CDEBUG(D_NET, "Ignore TIMEWAIT_EXIT event\n");
3096                 return 0;
3097         case RDMA_CM_EVENT_DISCONNECTED:
3098                 conn = (struct kib_conn *)cmid->context;
3099                 if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
3100                         CERROR("%s DISCONNECTED\n",
3101                                libcfs_nid2str(conn->ibc_peer->ibp_nid));
3102                         kiblnd_connreq_done(conn, -ECONNRESET);
3103                 } else {
3104                         kiblnd_close_conn(conn, 0);
3105                 }
3106                 kiblnd_conn_decref(conn);
3107                 cmid->context = NULL;
3108                 return 0;
3109
3110         case RDMA_CM_EVENT_DEVICE_REMOVAL:
3111                 LCONSOLE_ERROR_MSG(0x131,
3112                                    "Received notification of device removal\n"
3113                                    "Please shutdown LNET to allow this to proceed\n");
3114                 /*
3115                  * Can't remove network from underneath LNET for now, so I have
3116                  * to ignore this
3117                  */
3118                 return 0;
3119
3120         case RDMA_CM_EVENT_ADDR_CHANGE:
3121                 LCONSOLE_INFO("Physical link changed (eg hca/port)\n");
3122                 return 0;
3123         }
3124 }
3125
3126 static int
3127 kiblnd_check_txs_locked(struct kib_conn *conn, struct list_head *txs)
3128 {
3129         struct kib_tx *tx;
3130         struct list_head *ttmp;
3131
3132         list_for_each(ttmp, txs) {
3133                 tx = list_entry(ttmp, struct kib_tx, tx_list);
3134
3135                 if (txs != &conn->ibc_active_txs) {
3136                         LASSERT(tx->tx_queued);
3137                 } else {
3138                         LASSERT(!tx->tx_queued);
3139                         LASSERT(tx->tx_waiting || tx->tx_sending);
3140                 }
3141
3142                 if (cfs_time_aftereq(jiffies, tx->tx_deadline)) {
3143                         CERROR("Timed out tx: %s, %lu seconds\n",
3144                                kiblnd_queue2str(conn, txs),
3145                                cfs_duration_sec(jiffies - tx->tx_deadline));
3146                         return 1;
3147                 }
3148         }
3149
3150         return 0;
3151 }
3152
3153 static int
3154 kiblnd_conn_timed_out_locked(struct kib_conn *conn)
3155 {
3156         return  kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue) ||
3157                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_noops) ||
3158                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_rsrvd) ||
3159                 kiblnd_check_txs_locked(conn, &conn->ibc_tx_queue_nocred) ||
3160                 kiblnd_check_txs_locked(conn, &conn->ibc_active_txs);
3161 }
3162
3163 static void
3164 kiblnd_check_conns(int idx)
3165 {
3166         LIST_HEAD(closes);
3167         LIST_HEAD(checksends);
3168         struct list_head *peers = &kiblnd_data.kib_peers[idx];
3169         struct list_head *ptmp;
3170         struct kib_peer *peer;
3171         struct kib_conn *conn;
3172         struct kib_conn *temp;
3173         struct kib_conn *tmp;
3174         struct list_head *ctmp;
3175         unsigned long flags;
3176
3177         /*
3178          * NB. We expect to have a look at all the peers and not find any
3179          * RDMAs to time out, so we just use a shared lock while we
3180          * take a look...
3181          */
3182         read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);
3183
3184         list_for_each(ptmp, peers) {
3185                 peer = list_entry(ptmp, struct kib_peer, ibp_list);
3186
3187                 list_for_each(ctmp, &peer->ibp_conns) {
3188                         int timedout;
3189                         int sendnoop;
3190
3191                         conn = list_entry(ctmp, struct kib_conn, ibc_list);
3192
3193                         LASSERT(conn->ibc_state == IBLND_CONN_ESTABLISHED);
3194
3195                         spin_lock(&conn->ibc_lock);
3196
3197                         sendnoop = kiblnd_need_noop(conn);
3198                         timedout = kiblnd_conn_timed_out_locked(conn);
3199                         if (!sendnoop && !timedout) {
3200                                 spin_unlock(&conn->ibc_lock);
3201                                 continue;
3202                         }
3203
3204                         if (timedout) {
3205                                 CERROR("Timed out RDMA with %s (%lu): c: %u, oc: %u, rc: %u\n",
3206                                        libcfs_nid2str(peer->ibp_nid),
3207                                        cfs_duration_sec(cfs_time_current() -
3208                                                         peer->ibp_last_alive),
3209                                        conn->ibc_credits,
3210                                        conn->ibc_outstanding_credits,
3211                                        conn->ibc_reserved_credits);
3212                                 list_add(&conn->ibc_connd_list, &closes);
3213                         } else {
3214                                 list_add(&conn->ibc_connd_list, &checksends);
3215                         }
3216                         /* +ref for 'closes' or 'checksends' */
3217                         kiblnd_conn_addref(conn);
3218
3219                         spin_unlock(&conn->ibc_lock);
3220                 }
3221         }
3222
3223         read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
3224
3225         /*
3226          * Handle timeout by closing the whole
3227          * connection. We can only be sure RDMA activity
3228          * has ceased once the QP has been modified.
3229          */
3230         list_for_each_entry_safe(conn, tmp, &closes, ibc_connd_list) {
3231                 list_del(&conn->ibc_connd_list);
3232                 kiblnd_close_conn(conn, -ETIMEDOUT);
3233                 kiblnd_conn_decref(conn);
3234         }
3235
3236         /*
3237          * In case we have enough credits to return via a
3238          * NOOP, but there were no non-blocking tx descs
3239          * free to do it last time...
3240          */
3241         list_for_each_entry_safe(conn, temp, &checksends, ibc_connd_list) {
3242                 list_del(&conn->ibc_connd_list);
3243
3244                 spin_lock(&conn->ibc_lock);
3245                 kiblnd_check_sends_locked(conn);
3246                 spin_unlock(&conn->ibc_lock);
3247
3248                 kiblnd_conn_decref(conn);
3249         }
3250 }
3251
3252 static void
3253 kiblnd_disconnect_conn(struct kib_conn *conn)
3254 {
3255         LASSERT(!in_interrupt());
3256         LASSERT(current == kiblnd_data.kib_connd);
3257         LASSERT(conn->ibc_state == IBLND_CONN_CLOSING);
3258
3259         rdma_disconnect(conn->ibc_cmid);
3260         kiblnd_finalise_conn(conn);
3261
3262         kiblnd_peer_notify(conn->ibc_peer);
3263 }
3264
3265 /**
3266  * High-water for reconnection to the same peer, reconnection attempt should
3267  * be delayed after trying more than KIB_RECONN_HIGH_RACE.
3268  */
3269 #define KIB_RECONN_HIGH_RACE    10
3270 /**
3271  * Allow connd to take a break and handle other things after consecutive
3272  * reconnection attempts.
3273  */
3274 #define KIB_RECONN_BREAK        100
3275
3276 int
3277 kiblnd_connd(void *arg)
3278 {
3279         spinlock_t *lock= &kiblnd_data.kib_connd_lock;
3280         wait_queue_t wait;
3281         unsigned long flags;
3282         struct kib_conn *conn;
3283         int timeout;
3284         int i;
3285         int dropped_lock;
3286         int peer_index = 0;
3287         unsigned long deadline = jiffies;
3288
3289         cfs_block_allsigs();
3290
3291         init_waitqueue_entry(&wait, current);
3292         kiblnd_data.kib_connd = current;
3293
3294         spin_lock_irqsave(lock, flags);
3295
3296         while (!kiblnd_data.kib_shutdown) {
3297                 int reconn = 0;
3298
3299                 dropped_lock = 0;
3300
3301                 if (!list_empty(&kiblnd_data.kib_connd_zombies)) {
3302                         struct kib_peer *peer = NULL;
3303
3304                         conn = list_entry(kiblnd_data.kib_connd_zombies.next,
3305                                           struct kib_conn, ibc_list);
3306                         list_del(&conn->ibc_list);
3307                         if (conn->ibc_reconnect) {
3308                                 peer = conn->ibc_peer;
3309                                 kiblnd_peer_addref(peer);
3310                         }
3311
3312                         spin_unlock_irqrestore(lock, flags);
3313                         dropped_lock = 1;
3314
3315                         kiblnd_destroy_conn(conn, !peer);
3316
3317                         spin_lock_irqsave(lock, flags);
3318                         if (!peer)
3319                                 continue;
3320
3321                         conn->ibc_peer = peer;
3322                         if (peer->ibp_reconnected < KIB_RECONN_HIGH_RACE)
3323                                 list_add_tail(&conn->ibc_list,
3324                                               &kiblnd_data.kib_reconn_list);
3325                         else
3326                                 list_add_tail(&conn->ibc_list,
3327                                               &kiblnd_data.kib_reconn_wait);
3328                 }
3329
3330                 if (!list_empty(&kiblnd_data.kib_connd_conns)) {
3331                         conn = list_entry(kiblnd_data.kib_connd_conns.next,
3332                                           struct kib_conn, ibc_list);
3333                         list_del(&conn->ibc_list);
3334
3335                         spin_unlock_irqrestore(lock, flags);
3336                         dropped_lock = 1;
3337
3338                         kiblnd_disconnect_conn(conn);
3339                         kiblnd_conn_decref(conn);
3340
3341                         spin_lock_irqsave(lock, flags);
3342                 }
3343
3344                 while (reconn < KIB_RECONN_BREAK) {
3345                         if (kiblnd_data.kib_reconn_sec !=
3346                             ktime_get_real_seconds()) {
3347                                 kiblnd_data.kib_reconn_sec = ktime_get_real_seconds();
3348                                 list_splice_init(&kiblnd_data.kib_reconn_wait,
3349                                                  &kiblnd_data.kib_reconn_list);
3350                         }
3351
3352                         if (list_empty(&kiblnd_data.kib_reconn_list))
3353                                 break;
3354
3355                         conn = list_entry(kiblnd_data.kib_reconn_list.next,
3356                                           struct kib_conn, ibc_list);
3357                         list_del(&conn->ibc_list);
3358
3359                         spin_unlock_irqrestore(lock, flags);
3360                         dropped_lock = 1;
3361
3362                         reconn += kiblnd_reconnect_peer(conn->ibc_peer);
3363                         kiblnd_peer_decref(conn->ibc_peer);
3364                         LIBCFS_FREE(conn, sizeof(*conn));
3365
3366                         spin_lock_irqsave(lock, flags);
3367                 }
3368
3369                 /* careful with the jiffy wrap... */
3370                 timeout = (int)(deadline - jiffies);
3371                 if (timeout <= 0) {
3372                         const int n = 4;
3373                         const int p = 1;
3374                         int chunk = kiblnd_data.kib_peer_hash_size;
3375
3376                         spin_unlock_irqrestore(lock, flags);
3377                         dropped_lock = 1;
3378
3379                         /*
3380                          * Time to check for RDMA timeouts on a few more
3381                          * peers: I do checks every 'p' seconds on a
3382                          * proportion of the peer table and I need to check
3383                          * every connection 'n' times within a timeout
3384                          * interval, to ensure I detect a timeout on any
3385                          * connection within (n+1)/n times the timeout
3386                          * interval.
3387                          */
3388                         if (*kiblnd_tunables.kib_timeout > n * p)
3389                                 chunk = (chunk * n * p) /
3390                                         *kiblnd_tunables.kib_timeout;
3391                         if (!chunk)
3392                                 chunk = 1;
3393
3394                         for (i = 0; i < chunk; i++) {
3395                                 kiblnd_check_conns(peer_index);
3396                                 peer_index = (peer_index + 1) %
3397                                              kiblnd_data.kib_peer_hash_size;
3398                         }
3399
3400                         deadline += msecs_to_jiffies(p * MSEC_PER_SEC);
3401                         spin_lock_irqsave(lock, flags);
3402                 }
3403
3404                 if (dropped_lock)
3405                         continue;
3406
3407                 /* Nothing to do for 'timeout'  */
3408                 set_current_state(TASK_INTERRUPTIBLE);
3409                 add_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3410                 spin_unlock_irqrestore(lock, flags);
3411
3412                 schedule_timeout(timeout);
3413
3414                 remove_wait_queue(&kiblnd_data.kib_connd_waitq, &wait);
3415                 spin_lock_irqsave(lock, flags);
3416         }
3417
3418         spin_unlock_irqrestore(lock, flags);
3419
3420         kiblnd_thread_fini();
3421         return 0;
3422 }
3423
3424 void
3425 kiblnd_qp_event(struct ib_event *event, void *arg)
3426 {
3427         struct kib_conn *conn = arg;
3428
3429         switch (event->event) {
3430         case IB_EVENT_COMM_EST:
3431                 CDEBUG(D_NET, "%s established\n",
3432                        libcfs_nid2str(conn->ibc_peer->ibp_nid));
3433                 /*
3434                  * We received a packet but connection isn't established
3435                  * probably handshake packet was lost, so free to
3436                  * force make connection established
3437                  */
3438                 rdma_notify(conn->ibc_cmid, IB_EVENT_COMM_EST);
3439                 return;
3440
3441         default:
3442                 CERROR("%s: Async QP event type %d\n",
3443                        libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3444                 return;
3445         }
3446 }
3447
3448 static void
3449 kiblnd_complete(struct ib_wc *wc)
3450 {
3451         switch (kiblnd_wreqid2type(wc->wr_id)) {
3452         default:
3453                 LBUG();
3454
3455         case IBLND_WID_MR:
3456                 if (wc->status != IB_WC_SUCCESS &&
3457                     wc->status != IB_WC_WR_FLUSH_ERR)
3458                         CNETERR("FastReg failed: %d\n", wc->status);
3459                 break;
3460
3461         case IBLND_WID_RDMA:
3462                 /*
3463                  * We only get RDMA completion notification if it fails.  All
3464                  * subsequent work items, including the final SEND will fail
3465                  * too.  However we can't print out any more info about the
3466                  * failing RDMA because 'tx' might be back on the idle list or
3467                  * even reused already if we didn't manage to post all our work
3468                  * items
3469                  */
3470                 CNETERR("RDMA (tx: %p) failed: %d\n",
3471                         kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3472                 return;
3473
3474         case IBLND_WID_TX:
3475                 kiblnd_tx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status);
3476                 return;
3477
3478         case IBLND_WID_RX:
3479                 kiblnd_rx_complete(kiblnd_wreqid2ptr(wc->wr_id), wc->status,
3480                                    wc->byte_len);
3481                 return;
3482         }
3483 }
3484
3485 void
3486 kiblnd_cq_completion(struct ib_cq *cq, void *arg)
3487 {
3488         /*
3489          * NB I'm not allowed to schedule this conn once its refcount has
3490          * reached 0.  Since fundamentally I'm racing with scheduler threads
3491          * consuming my CQ I could be called after all completions have
3492          * occurred.  But in this case, !ibc_nrx && !ibc_nsends_posted
3493          * and this CQ is about to be destroyed so I NOOP.
3494          */
3495         struct kib_conn *conn = arg;
3496         struct kib_sched_info *sched = conn->ibc_sched;
3497         unsigned long flags;
3498
3499         LASSERT(cq == conn->ibc_cq);
3500
3501         spin_lock_irqsave(&sched->ibs_lock, flags);
3502
3503         conn->ibc_ready = 1;
3504
3505         if (!conn->ibc_scheduled &&
3506             (conn->ibc_nrx > 0 ||
3507              conn->ibc_nsends_posted > 0)) {
3508                 kiblnd_conn_addref(conn); /* +1 ref for sched_conns */
3509                 conn->ibc_scheduled = 1;
3510                 list_add_tail(&conn->ibc_sched_list, &sched->ibs_conns);
3511
3512                 if (waitqueue_active(&sched->ibs_waitq))
3513                         wake_up(&sched->ibs_waitq);
3514         }
3515
3516         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3517 }
3518
3519 void
3520 kiblnd_cq_event(struct ib_event *event, void *arg)
3521 {
3522         struct kib_conn *conn = arg;
3523
3524         CERROR("%s: async CQ event type %d\n",
3525                libcfs_nid2str(conn->ibc_peer->ibp_nid), event->event);
3526 }
3527
3528 int
3529 kiblnd_scheduler(void *arg)
3530 {
3531         long id = (long)arg;
3532         struct kib_sched_info *sched;
3533         struct kib_conn *conn;
3534         wait_queue_t wait;
3535         unsigned long flags;
3536         struct ib_wc wc;
3537         int did_something;
3538         int busy_loops = 0;
3539         int rc;
3540
3541         cfs_block_allsigs();
3542
3543         init_waitqueue_entry(&wait, current);
3544
3545         sched = kiblnd_data.kib_scheds[KIB_THREAD_CPT(id)];
3546
3547         rc = cfs_cpt_bind(lnet_cpt_table(), sched->ibs_cpt);
3548         if (rc) {
3549                 CWARN("Failed to bind on CPT %d, please verify whether all CPUs are healthy and reload modules if necessary, otherwise your system might under risk of low performance\n",
3550                       sched->ibs_cpt);
3551         }
3552
3553         spin_lock_irqsave(&sched->ibs_lock, flags);
3554
3555         while (!kiblnd_data.kib_shutdown) {
3556                 if (busy_loops++ >= IBLND_RESCHED) {
3557                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3558
3559                         cond_resched();
3560                         busy_loops = 0;
3561
3562                         spin_lock_irqsave(&sched->ibs_lock, flags);
3563                 }
3564
3565                 did_something = 0;
3566
3567                 if (!list_empty(&sched->ibs_conns)) {
3568                         conn = list_entry(sched->ibs_conns.next, struct kib_conn,
3569                                           ibc_sched_list);
3570                         /* take over kib_sched_conns' ref on conn... */
3571                         LASSERT(conn->ibc_scheduled);
3572                         list_del(&conn->ibc_sched_list);
3573                         conn->ibc_ready = 0;
3574
3575                         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3576
3577                         wc.wr_id = IBLND_WID_INVAL;
3578
3579                         rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3580                         if (!rc) {
3581                                 rc = ib_req_notify_cq(conn->ibc_cq,
3582                                                       IB_CQ_NEXT_COMP);
3583                                 if (rc < 0) {
3584                                         CWARN("%s: ib_req_notify_cq failed: %d, closing connection\n",
3585                                               libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
3586                                         kiblnd_close_conn(conn, -EIO);
3587                                         kiblnd_conn_decref(conn);
3588                                         spin_lock_irqsave(&sched->ibs_lock,
3589                                                           flags);
3590                                         continue;
3591                                 }
3592
3593                                 rc = ib_poll_cq(conn->ibc_cq, 1, &wc);
3594                         }
3595
3596                         if (unlikely(rc > 0 && wc.wr_id == IBLND_WID_INVAL)) {
3597                                 LCONSOLE_ERROR("ib_poll_cq (rc: %d) returned invalid wr_id, opcode %d, status: %d, vendor_err: %d, conn: %s status: %d\nplease upgrade firmware and OFED or contact vendor.\n",
3598                                                rc, wc.opcode, wc.status,
3599                                                wc.vendor_err,
3600                                                libcfs_nid2str(conn->ibc_peer->ibp_nid),
3601                                                conn->ibc_state);
3602                                 rc = -EINVAL;
3603                         }
3604
3605                         if (rc < 0) {
3606                                 CWARN("%s: ib_poll_cq failed: %d, closing connection\n",
3607                                       libcfs_nid2str(conn->ibc_peer->ibp_nid),
3608                                       rc);
3609                                 kiblnd_close_conn(conn, -EIO);
3610                                 kiblnd_conn_decref(conn);
3611                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3612                                 continue;
3613                         }
3614
3615                         spin_lock_irqsave(&sched->ibs_lock, flags);
3616
3617                         if (rc || conn->ibc_ready) {
3618                                 /*
3619                                  * There may be another completion waiting; get
3620                                  * another scheduler to check while I handle
3621                                  * this one...
3622                                  */
3623                                 /* +1 ref for sched_conns */
3624                                 kiblnd_conn_addref(conn);
3625                                 list_add_tail(&conn->ibc_sched_list,
3626                                               &sched->ibs_conns);
3627                                 if (waitqueue_active(&sched->ibs_waitq))
3628                                         wake_up(&sched->ibs_waitq);
3629                         } else {
3630                                 conn->ibc_scheduled = 0;
3631                         }
3632
3633                         if (rc) {
3634                                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3635                                 kiblnd_complete(&wc);
3636
3637                                 spin_lock_irqsave(&sched->ibs_lock, flags);
3638                         }
3639
3640                         kiblnd_conn_decref(conn); /* ...drop my ref from above */
3641                         did_something = 1;
3642                 }
3643
3644                 if (did_something)
3645                         continue;
3646
3647                 set_current_state(TASK_INTERRUPTIBLE);
3648                 add_wait_queue_exclusive(&sched->ibs_waitq, &wait);
3649                 spin_unlock_irqrestore(&sched->ibs_lock, flags);
3650
3651                 schedule();
3652                 busy_loops = 0;
3653
3654                 remove_wait_queue(&sched->ibs_waitq, &wait);
3655                 spin_lock_irqsave(&sched->ibs_lock, flags);
3656         }
3657
3658         spin_unlock_irqrestore(&sched->ibs_lock, flags);
3659
3660         kiblnd_thread_fini();
3661         return 0;
3662 }
3663
3664 int
3665 kiblnd_failover_thread(void *arg)
3666 {
3667         rwlock_t *glock = &kiblnd_data.kib_global_lock;
3668         struct kib_dev *dev;
3669         wait_queue_t wait;
3670         unsigned long flags;
3671         int rc;
3672
3673         LASSERT(*kiblnd_tunables.kib_dev_failover);
3674
3675         cfs_block_allsigs();
3676
3677         init_waitqueue_entry(&wait, current);
3678         write_lock_irqsave(glock, flags);
3679
3680         while (!kiblnd_data.kib_shutdown) {
3681                 int do_failover = 0;
3682                 int long_sleep;
3683
3684                 list_for_each_entry(dev, &kiblnd_data.kib_failed_devs,
3685                                     ibd_fail_list) {
3686                         if (time_before(cfs_time_current(),
3687                                         dev->ibd_next_failover))
3688                                 continue;
3689                         do_failover = 1;
3690                         break;
3691                 }
3692
3693                 if (do_failover) {
3694                         list_del_init(&dev->ibd_fail_list);
3695                         dev->ibd_failover = 1;
3696                         write_unlock_irqrestore(glock, flags);
3697
3698                         rc = kiblnd_dev_failover(dev);
3699
3700                         write_lock_irqsave(glock, flags);
3701
3702                         LASSERT(dev->ibd_failover);
3703                         dev->ibd_failover = 0;
3704                         if (rc >= 0) { /* Device is OK or failover succeed */
3705                                 dev->ibd_next_failover = cfs_time_shift(3);
3706                                 continue;
3707                         }
3708
3709                         /* failed to failover, retry later */
3710                         dev->ibd_next_failover =
3711                                 cfs_time_shift(min(dev->ibd_failed_failover, 10));
3712                         if (kiblnd_dev_can_failover(dev)) {
3713                                 list_add_tail(&dev->ibd_fail_list,
3714                                               &kiblnd_data.kib_failed_devs);
3715                         }
3716
3717                         continue;
3718                 }
3719
3720                 /* long sleep if no more pending failover */
3721                 long_sleep = list_empty(&kiblnd_data.kib_failed_devs);
3722
3723                 set_current_state(TASK_INTERRUPTIBLE);
3724                 add_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3725                 write_unlock_irqrestore(glock, flags);
3726
3727                 rc = schedule_timeout(long_sleep ? cfs_time_seconds(10) :
3728                                                    cfs_time_seconds(1));
3729                 remove_wait_queue(&kiblnd_data.kib_failover_waitq, &wait);
3730                 write_lock_irqsave(glock, flags);
3731
3732                 if (!long_sleep || rc)
3733                         continue;
3734
3735                 /*
3736                  * have a long sleep, routine check all active devices,
3737                  * we need checking like this because if there is not active
3738                  * connection on the dev and no SEND from local, we may listen
3739                  * on wrong HCA for ever while there is a bonding failover
3740                  */
3741                 list_for_each_entry(dev, &kiblnd_data.kib_devs, ibd_list) {
3742                         if (kiblnd_dev_can_failover(dev)) {
3743                                 list_add_tail(&dev->ibd_fail_list,
3744                                               &kiblnd_data.kib_failed_devs);
3745                         }
3746                 }
3747         }
3748
3749         write_unlock_irqrestore(glock, flags);
3750
3751         kiblnd_thread_fini();
3752         return 0;
3753 }