1 /*
2  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the BSD-type
8  * license below:
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  *
14  *      Redistributions of source code must retain the above copyright
15  *      notice, this list of conditions and the following disclaimer.
16  *
17  *      Redistributions in binary form must reproduce the above
18  *      copyright notice, this list of conditions and the following
19  *      disclaimer in the documentation and/or other materials provided
20  *      with the distribution.
21  *
22  *      Neither the name of the Network Appliance, Inc. nor the names of
23  *      its contributors may be used to endorse or promote products
24  *      derived from this software without specific prior written
25  *      permission.
26  *
27  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
28  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
29  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
30  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
31  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
32  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
33  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
34  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
35  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
36  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
37  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
38  */
39
40 /*
41  * verbs.c
42  *
43  * Encapsulates the major functions managing:
44  *  o adapters
45  *  o endpoints
46  *  o connections
47  *  o buffer memory
48  */
49
50 #include <linux/interrupt.h>
51 #include <linux/slab.h>
52 #include <linux/prefetch.h>
53 #include <linux/sunrpc/addr.h>
54 #include <linux/sunrpc/svc_rdma.h>
55 #include <asm/bitops.h>
56 #include <linux/module.h> /* try_module_get()/module_put() */
57
58 #include "xprt_rdma.h"
59
60 /*
61  * Globals/Macros
62  */
63
64 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
65 # define RPCDBG_FACILITY        RPCDBG_TRANS
66 #endif
67
68 /*
69  * internal functions
70  */
71
72 static struct workqueue_struct *rpcrdma_receive_wq;
73
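   /* Allocate the workqueue on which Receive completions are
    * processed. It is unbound, high priority, and flagged
    * WQ_MEM_RECLAIM so reply handling can continue to make
    * progress during memory reclaim.
    */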
74 int
75 rpcrdma_alloc_wq(void)
76 {
77         struct workqueue_struct *recv_wq;
78
79         recv_wq = alloc_workqueue("xprtrdma_receive",
80                                   WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
81                                   0);
82         if (!recv_wq)
83                 return -ENOMEM;
84
85         rpcrdma_receive_wq = recv_wq;
86         return 0;
87 }
88
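   /* Tear down the Receive workqueue. The global pointer is cleared
    * before destroy_workqueue() so no new work is queued while the
    * workqueue is being flushed and freed.
    */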
89 void
90 rpcrdma_destroy_wq(void)
91 {
92         struct workqueue_struct *wq;
93
94         if (rpcrdma_receive_wq) {
95                 wq = rpcrdma_receive_wq;
96                 rpcrdma_receive_wq = NULL;
97                 destroy_workqueue(wq);
98         }
99 }
100
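    /* Handle an asynchronous error reported by the provider for the
     * QP. If the endpoint is currently connected, mark it failed
     * (-EIO) and wake up anyone waiting on rep_connect_wait.
     */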
101 static void
102 rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
103 {
104         struct rpcrdma_ep *ep = context;
105
106         pr_err("RPC:       %s: %s on device %s ep %p\n",
107                __func__, ib_event_msg(event->event),
108                 event->device->name, context);
109         if (ep->rep_connected == 1) {
110                 ep->rep_connected = -EIO;
111                 rpcrdma_conn_func(ep);
112                 wake_up_all(&ep->rep_connect_wait);
113         }
114 }
115
116 /**
117  * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
118  * @cq: completion queue (ignored)
119  * @wc: completed WR
120  *
121  */
122 static void
123 rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
124 {
125         /* WARNING: Only wr_cqe and status are reliable at this point */
126         if (wc->status != IB_WC_SUCCESS && wc->status != IB_WC_WR_FLUSH_ERR)
127                 pr_err("rpcrdma: Send: %s (%u/0x%x)\n",
128                        ib_wc_status_msg(wc->status),
129                        wc->status, wc->vendor_err);
130 }
131
132 /* Perform basic sanity checking to avoid using garbage
133  * to update the credit grant value.
134  */
135 static void
136 rpcrdma_update_granted_credits(struct rpcrdma_rep *rep)
137 {
138         struct rpcrdma_msg *rmsgp = rdmab_to_msg(rep->rr_rdmabuf);
139         struct rpcrdma_buffer *buffer = &rep->rr_rxprt->rx_buf;
140         u32 credits;
141
142         if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
143                 return;
144
145         credits = be32_to_cpu(rmsgp->rm_credit);
146         if (credits == 0)
147                 credits = 1;    /* don't deadlock */
148         else if (credits > buffer->rb_max_requests)
149                 credits = buffer->rb_max_requests;
150
151         atomic_set(&buffer->rb_credits, credits);
152 }
153
154 /**
155  * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
156  * @cq: completion queue (ignored)
157  * @wc: completed WR
158  *
159  */
160 static void
161 rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
162 {
163         struct ib_cqe *cqe = wc->wr_cqe;
164         struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
165                                                rr_cqe);
166
167         /* WARNING: Only wr_id and status are reliable at this point */
168         if (wc->status != IB_WC_SUCCESS)
169                 goto out_fail;
170
171         /* status == SUCCESS means all fields in wc are trustworthy */
172         if (wc->opcode != IB_WC_RECV)
173                 return;
174
175         dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
176                 __func__, rep, wc->byte_len);
177
178         rep->rr_len = wc->byte_len;
179         rep->rr_wc_flags = wc->wc_flags;
180         rep->rr_inv_rkey = wc->ex.invalidate_rkey;
181
182         ib_dma_sync_single_for_cpu(rep->rr_device,
183                                    rdmab_addr(rep->rr_rdmabuf),
184                                    rep->rr_len, DMA_FROM_DEVICE);
185
186         rpcrdma_update_granted_credits(rep);
187
188 out_schedule:
189         queue_work(rpcrdma_receive_wq, &rep->rr_work);
190         return;
191
192 out_fail:
193         if (wc->status != IB_WC_WR_FLUSH_ERR)
194                 pr_err("rpcrdma: Recv: %s (%u/0x%x)\n",
195                        ib_wc_status_msg(wc->status),
196                        wc->status, wc->vendor_err);
197         rep->rr_len = RPCRDMA_BAD_LEN;
198         goto out_schedule;
199 }
200
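    /* Examine the private data carried in the RDMA connection
     * parameters. If the peer sent a valid RPC-over-RDMA private
     * message, adopt its advertised inline buffer sizes and note
     * that remote invalidation may be used; otherwise the Version
     * One defaults apply.
     */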
201 static void
202 rpcrdma_update_connect_private(struct rpcrdma_xprt *r_xprt,
203                                struct rdma_conn_param *param)
204 {
205         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
206         const struct rpcrdma_connect_private *pmsg = param->private_data;
207         unsigned int rsize, wsize;
208
209         /* Default settings for RPC-over-RDMA Version One */
210         r_xprt->rx_ia.ri_reminv_expected = false;
211         r_xprt->rx_ia.ri_implicit_roundup = xprt_rdma_pad_optimize;
212         rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
213         wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
214
215         if (pmsg &&
216             pmsg->cp_magic == rpcrdma_cmp_magic &&
217             pmsg->cp_version == RPCRDMA_CMP_VERSION) {
218                 r_xprt->rx_ia.ri_reminv_expected = true;
219                 r_xprt->rx_ia.ri_implicit_roundup = true;
220                 rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
221                 wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
222         }
223
224         if (rsize < cdata->inline_rsize)
225                 cdata->inline_rsize = rsize;
226         if (wsize < cdata->inline_wsize)
227                 cdata->inline_wsize = wsize;
228         dprintk("RPC:       %s: max send %u, max recv %u\n",
229                 __func__, cdata->inline_wsize, cdata->inline_rsize);
230         rpcrdma_set_max_header_sizes(r_xprt);
231 }
232
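    /* Connection manager event handler. Address and route resolution
     * results are returned through ia->ri_async_rc and ia->ri_done;
     * connection state transitions are recorded in ep->rep_connected
     * and waiters on ep->rep_connect_wait are woken.
     */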
233 static int
234 rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
235 {
236         struct rpcrdma_xprt *xprt = id->context;
237         struct rpcrdma_ia *ia = &xprt->rx_ia;
238         struct rpcrdma_ep *ep = &xprt->rx_ep;
239 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
240         struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
241 #endif
242         struct ib_qp_attr *attr = &ia->ri_qp_attr;
243         struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
244         int connstate = 0;
245
246         switch (event->event) {
247         case RDMA_CM_EVENT_ADDR_RESOLVED:
248         case RDMA_CM_EVENT_ROUTE_RESOLVED:
249                 ia->ri_async_rc = 0;
250                 complete(&ia->ri_done);
251                 break;
252         case RDMA_CM_EVENT_ADDR_ERROR:
253                 ia->ri_async_rc = -EHOSTUNREACH;
254                 dprintk("RPC:       %s: CM address resolution error, ep 0x%p\n",
255                         __func__, ep);
256                 complete(&ia->ri_done);
257                 break;
258         case RDMA_CM_EVENT_ROUTE_ERROR:
259                 ia->ri_async_rc = -ENETUNREACH;
260                 dprintk("RPC:       %s: CM route resolution error, ep 0x%p\n",
261                         __func__, ep);
262                 complete(&ia->ri_done);
263                 break;
264         case RDMA_CM_EVENT_ESTABLISHED:
265                 connstate = 1;
266                 ib_query_qp(ia->ri_id->qp, attr,
267                             IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
268                             iattr);
269                 dprintk("RPC:       %s: %d responder resources"
270                         " (%d initiator)\n",
271                         __func__, attr->max_dest_rd_atomic,
272                         attr->max_rd_atomic);
273                 rpcrdma_update_connect_private(xprt, &event->param.conn);
274                 goto connected;
275         case RDMA_CM_EVENT_CONNECT_ERROR:
276                 connstate = -ENOTCONN;
277                 goto connected;
278         case RDMA_CM_EVENT_UNREACHABLE:
279                 connstate = -ENETDOWN;
280                 goto connected;
281         case RDMA_CM_EVENT_REJECTED:
282                 connstate = -ECONNREFUSED;
283                 goto connected;
284         case RDMA_CM_EVENT_DISCONNECTED:
285                 connstate = -ECONNABORTED;
286                 goto connected;
287         case RDMA_CM_EVENT_DEVICE_REMOVAL:
288                 connstate = -ENODEV;
289 connected:
290                 dprintk("RPC:       %s: %sconnected\n",
291                                         __func__, connstate > 0 ? "" : "dis");
292                 atomic_set(&xprt->rx_buf.rb_credits, 1);
293                 ep->rep_connected = connstate;
294                 rpcrdma_conn_func(ep);
295                 wake_up_all(&ep->rep_connect_wait);
296                 /*FALLTHROUGH*/
297         default:
298                 dprintk("RPC:       %s: %pIS:%u (ep 0x%p): %s\n",
299                         __func__, sap, rpc_get_port(sap), ep,
300                         rdma_event_msg(event->event));
301                 break;
302         }
303
304 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
305         if (connstate == 1) {
306                 int ird = attr->max_dest_rd_atomic;
307                 int tird = ep->rep_remote_cma.responder_resources;
308
309                 pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
310                         sap, rpc_get_port(sap),
311                         ia->ri_device->name,
312                         ia->ri_ops->ro_displayname,
313                         xprt->rx_buf.rb_max_requests,
314                         ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
315         } else if (connstate < 0) {
316                 pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
317                         sap, rpc_get_port(sap), connstate);
318         }
319 #endif
320
321         return 0;
322 }
323
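    /* Drop the device module reference taken in rpcrdma_create_id()
     * before destroying the rdma_cm_id.
     */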
324 static void rpcrdma_destroy_id(struct rdma_cm_id *id)
325 {
326         if (id) {
327                 module_put(id->device->owner);
328                 rdma_destroy_id(id);
329         }
330 }
331
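    /* Create an rdma_cm_id for this connection and resolve the
     * server's address and route. Each step completes asynchronously
     * via rpcrdma_conn_upcall(), which reports its result through
     * ia->ri_async_rc. On success, a reference on the device module
     * is held (see the FIXME below).
     */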
332 static struct rdma_cm_id *
333 rpcrdma_create_id(struct rpcrdma_xprt *xprt,
334                         struct rpcrdma_ia *ia, struct sockaddr *addr)
335 {
336         struct rdma_cm_id *id;
337         int rc;
338
339         init_completion(&ia->ri_done);
340
341         id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
342                             IB_QPT_RC);
343         if (IS_ERR(id)) {
344                 rc = PTR_ERR(id);
345                 dprintk("RPC:       %s: rdma_create_id() failed %i\n",
346                         __func__, rc);
347                 return id;
348         }
349
350         ia->ri_async_rc = -ETIMEDOUT;
351         rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
352         if (rc) {
353                 dprintk("RPC:       %s: rdma_resolve_addr() failed %i\n",
354                         __func__, rc);
355                 goto out;
356         }
357         wait_for_completion_interruptible_timeout(&ia->ri_done,
358                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
359
360         /* FIXME:
361          * Until xprtrdma supports DEVICE_REMOVAL, the provider must
362          * be pinned while there are active NFS/RDMA mounts to prevent
363          * hangs and crashes at umount time.
364          */
365         if (!ia->ri_async_rc && !try_module_get(id->device->owner)) {
366                 dprintk("RPC:       %s: Failed to get device module\n",
367                         __func__);
368                 ia->ri_async_rc = -ENODEV;
369         }
370         rc = ia->ri_async_rc;
371         if (rc)
372                 goto out;
373
374         ia->ri_async_rc = -ETIMEDOUT;
375         rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
376         if (rc) {
377                 dprintk("RPC:       %s: rdma_resolve_route() failed %i\n",
378                         __func__, rc);
379                 goto put;
380         }
381         wait_for_completion_interruptible_timeout(&ia->ri_done,
382                                 msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
383         rc = ia->ri_async_rc;
384         if (rc)
385                 goto put;
386
387         return id;
388 put:
389         module_put(id->device->owner);
390 out:
391         rdma_destroy_id(id);
392         return ERR_PTR(rc);
393 }
394
395 /*
396  * Exported functions.
397  */
398
399 /*
400  * Open and initialize an Interface Adapter.
401  *  o initializes fields of struct rpcrdma_ia, including
402  *    interface and provider attributes and protection domain.
403  */
404 int
405 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
406 {
407         struct rpcrdma_ia *ia = &xprt->rx_ia;
408         int rc;
409
410         ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
411         if (IS_ERR(ia->ri_id)) {
412                 rc = PTR_ERR(ia->ri_id);
413                 goto out1;
414         }
415         ia->ri_device = ia->ri_id->device;
416
417         ia->ri_pd = ib_alloc_pd(ia->ri_device, 0);
418         if (IS_ERR(ia->ri_pd)) {
419                 rc = PTR_ERR(ia->ri_pd);
420                 pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
421                 goto out2;
422         }
423
424         switch (memreg) {
425         case RPCRDMA_FRMR:
426                 if (frwr_is_supported(ia)) {
427                         ia->ri_ops = &rpcrdma_frwr_memreg_ops;
428                         break;
429                 }
430                 /*FALLTHROUGH*/
431         case RPCRDMA_MTHCAFMR:
432                 if (fmr_is_supported(ia)) {
433                         ia->ri_ops = &rpcrdma_fmr_memreg_ops;
434                         break;
435                 }
436                 /*FALLTHROUGH*/
437         default:
438                 pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
439                        memreg);
440                 rc = -EINVAL;
441                 goto out3;
442         }
443
444         return 0;
445
446 out3:
447         ib_dealloc_pd(ia->ri_pd);
448         ia->ri_pd = NULL;
449 out2:
450         rpcrdma_destroy_id(ia->ri_id);
451         ia->ri_id = NULL;
452 out1:
453         return rc;
454 }
455
456 /*
457  * Clean up/close an IA.
458  *   o if event handles and PD have been initialized, free them.
459  *   o close the IA
460  */
461 void
462 rpcrdma_ia_close(struct rpcrdma_ia *ia)
463 {
464         dprintk("RPC:       %s: entering\n", __func__);
465         if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
466                 if (ia->ri_id->qp)
467                         rdma_destroy_qp(ia->ri_id);
468                 rpcrdma_destroy_id(ia->ri_id);
469                 ia->ri_id = NULL;
470         }
471
472         /* If the pd is still busy, xprtrdma missed freeing a resource */
473         if (ia->ri_pd && !IS_ERR(ia->ri_pd))
474                 ib_dealloc_pd(ia->ri_pd);
475 }
476
477 /*
478  * Create unconnected endpoint.
479  */
480 int
481 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
482                   struct rpcrdma_create_data_internal *cdata)
483 {
484         struct rpcrdma_connect_private *pmsg = &ep->rep_cm_private;
485         unsigned int max_qp_wr, max_sge;
486         struct ib_cq *sendcq, *recvcq;
487         int rc;
488
489         max_sge = min_t(unsigned int, ia->ri_device->attrs.max_sge,
490                         RPCRDMA_MAX_SEND_SGES);
491         if (max_sge < RPCRDMA_MIN_SEND_SGES) {
492                 pr_warn("rpcrdma: HCA provides only %d send SGEs\n", max_sge);
493                 return -ENOMEM;
494         }
495         ia->ri_max_send_sges = max_sge - RPCRDMA_MIN_SEND_SGES;
496
497         if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
498                 dprintk("RPC:       %s: insufficient WQEs available\n",
499                         __func__);
500                 return -ENOMEM;
501         }
502         max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1;
503
504         /* check provider's send/recv wr limits */
505         if (cdata->max_requests > max_qp_wr)
506                 cdata->max_requests = max_qp_wr;
507
508         ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
509         ep->rep_attr.qp_context = ep;
510         ep->rep_attr.srq = NULL;
511         ep->rep_attr.cap.max_send_wr = cdata->max_requests;
512         ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
513         ep->rep_attr.cap.max_send_wr += 1;      /* drain cqe */
514         rc = ia->ri_ops->ro_open(ia, ep, cdata);
515         if (rc)
516                 return rc;
517         ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
518         ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
519         ep->rep_attr.cap.max_recv_wr += 1;      /* drain cqe */
520         ep->rep_attr.cap.max_send_sge = max_sge;
521         ep->rep_attr.cap.max_recv_sge = 1;
522         ep->rep_attr.cap.max_inline_data = 0;
523         ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
524         ep->rep_attr.qp_type = IB_QPT_RC;
525         ep->rep_attr.port_num = ~0;
526
527         dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
528                 "iovs: send %d recv %d\n",
529                 __func__,
530                 ep->rep_attr.cap.max_send_wr,
531                 ep->rep_attr.cap.max_recv_wr,
532                 ep->rep_attr.cap.max_send_sge,
533                 ep->rep_attr.cap.max_recv_sge);
534
535         /* set trigger for requesting send completion */
536         ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
537         if (ep->rep_cqinit <= 2)
538                 ep->rep_cqinit = 0;     /* always signal? */
539         rpcrdma_init_cqcount(ep, 0);
540         init_waitqueue_head(&ep->rep_connect_wait);
541         INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
542
543         sendcq = ib_alloc_cq(ia->ri_device, NULL,
544                              ep->rep_attr.cap.max_send_wr + 1,
545                              0, IB_POLL_SOFTIRQ);
546         if (IS_ERR(sendcq)) {
547                 rc = PTR_ERR(sendcq);
548                 dprintk("RPC:       %s: failed to create send CQ: %i\n",
549                         __func__, rc);
550                 goto out1;
551         }
552
553         recvcq = ib_alloc_cq(ia->ri_device, NULL,
554                              ep->rep_attr.cap.max_recv_wr + 1,
555                              0, IB_POLL_SOFTIRQ);
556         if (IS_ERR(recvcq)) {
557                 rc = PTR_ERR(recvcq);
558                 dprintk("RPC:       %s: failed to create recv CQ: %i\n",
559                         __func__, rc);
560                 goto out2;
561         }
562
563         ep->rep_attr.send_cq = sendcq;
564         ep->rep_attr.recv_cq = recvcq;
565
566         /* Initialize cma parameters */
567         memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma));
568
569         /* Prepare RDMA-CM private message */
570         pmsg->cp_magic = rpcrdma_cmp_magic;
571         pmsg->cp_version = RPCRDMA_CMP_VERSION;
572         pmsg->cp_flags |= ia->ri_ops->ro_send_w_inv_ok;
573         pmsg->cp_send_size = rpcrdma_encode_buffer_size(cdata->inline_wsize);
574         pmsg->cp_recv_size = rpcrdma_encode_buffer_size(cdata->inline_rsize);
575         ep->rep_remote_cma.private_data = pmsg;
576         ep->rep_remote_cma.private_data_len = sizeof(*pmsg);
577
578         /* Client offers RDMA Read but does not initiate */
579         ep->rep_remote_cma.initiator_depth = 0;
580         if (ia->ri_device->attrs.max_qp_rd_atom > 32)   /* arbitrary but <= 255 */
581                 ep->rep_remote_cma.responder_resources = 32;
582         else
583                 ep->rep_remote_cma.responder_resources =
584                                                 ia->ri_device->attrs.max_qp_rd_atom;
585
586         /* Limit transport retries so client can detect server
587          * GID changes quickly. RPC layer handles re-establishing
588          * transport connection and retransmission.
589          */
590         ep->rep_remote_cma.retry_count = 6;
591
592         /* RPC-over-RDMA handles its own flow control. In addition,
593          * make all RNR NAKs visible so we know that RPC-over-RDMA
594          * flow control is working correctly (no NAKs should be seen).
595          */
596         ep->rep_remote_cma.flow_control = 0;
597         ep->rep_remote_cma.rnr_retry_count = 0;
598
599         return 0;
600
601 out2:
602         ib_free_cq(sendcq);
603 out1:
604         return rc;
605 }
606
607 /*
608  * rpcrdma_ep_destroy
609  *
610  * Disconnect and destroy endpoint. After this, the only
611  * valid operations on the ep are to free it (if dynamically
612  * allocated) or re-create it.
613  */
614 void
615 rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
616 {
617         dprintk("RPC:       %s: entering, connected is %d\n",
618                 __func__, ep->rep_connected);
619
620         cancel_delayed_work_sync(&ep->rep_connect_worker);
621
622         if (ia->ri_id->qp) {
623                 rpcrdma_ep_disconnect(ep, ia);
624                 rdma_destroy_qp(ia->ri_id);
625                 ia->ri_id->qp = NULL;
626         }
627
628         ib_free_cq(ep->rep_attr.recv_cq);
629         ib_free_cq(ep->rep_attr.send_cq);
630 }
631
632 /*
633  * Connect unconnected endpoint.
634  */
635 int
636 rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
637 {
638         struct rdma_cm_id *id, *old;
639         int rc = 0;
640         int retry_count = 0;
641
642         if (ep->rep_connected != 0) {
643                 struct rpcrdma_xprt *xprt;
644 retry:
645                 dprintk("RPC:       %s: reconnecting...\n", __func__);
646
647                 rpcrdma_ep_disconnect(ep, ia);
648
649                 xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
650                 id = rpcrdma_create_id(xprt, ia,
651                                 (struct sockaddr *)&xprt->rx_data.addr);
652                 if (IS_ERR(id)) {
653                         rc = -EHOSTUNREACH;
654                         goto out;
655                 }
656                 /* TEMP TEMP TEMP - fail if new device:
657                  * Deregister/remarshal *all* requests!
658                  * Close and recreate adapter, pd, etc!
659                  * Re-determine all attributes still sane!
660                  * More stuff I haven't thought of!
661                  * Rrrgh!
662                  */
663                 if (ia->ri_device != id->device) {
664                         printk("RPC:       %s: can't reconnect on "
665                                 "different device!\n", __func__);
666                         rpcrdma_destroy_id(id);
667                         rc = -ENETUNREACH;
668                         goto out;
669                 }
670                 /* END TEMP */
671                 rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
672                 if (rc) {
673                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
674                                 __func__, rc);
675                         rpcrdma_destroy_id(id);
676                         rc = -ENETUNREACH;
677                         goto out;
678                 }
679
680                 old = ia->ri_id;
681                 ia->ri_id = id;
682
683                 rdma_destroy_qp(old);
684                 rpcrdma_destroy_id(old);
685         } else {
686                 dprintk("RPC:       %s: connecting...\n", __func__);
687                 rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
688                 if (rc) {
689                         dprintk("RPC:       %s: rdma_create_qp failed %i\n",
690                                 __func__, rc);
691                         /* do not update ep->rep_connected */
692                         return -ENETUNREACH;
693                 }
694         }
695
696         ep->rep_connected = 0;
697
698         rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
699         if (rc) {
700                 dprintk("RPC:       %s: rdma_connect() failed with %i\n",
701                                 __func__, rc);
702                 goto out;
703         }
704
705         wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
706
707         /*
708          * Check state. A non-peer reject indicates no listener
709          * (ECONNREFUSED), which may be a transient state. All
710          * others indicate a transport condition that has already
711          * been handled on a best-effort basis.
712          */
713         if (ep->rep_connected == -ECONNREFUSED &&
714             ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
715                 dprintk("RPC:       %s: non-peer_reject, retry\n", __func__);
716                 goto retry;
717         }
718         if (ep->rep_connected <= 0) {
719                 /* Sometimes, the only way to reliably connect to remote
720                  * CMs is to use the same nonzero values for ORD and IRD. */
721                 if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
722                     (ep->rep_remote_cma.responder_resources == 0 ||
723                      ep->rep_remote_cma.initiator_depth !=
724                                 ep->rep_remote_cma.responder_resources)) {
725                         if (ep->rep_remote_cma.responder_resources == 0)
726                                 ep->rep_remote_cma.responder_resources = 1;
727                         ep->rep_remote_cma.initiator_depth =
728                                 ep->rep_remote_cma.responder_resources;
729                         goto retry;
730                 }
731                 rc = ep->rep_connected;
732         } else {
733                 struct rpcrdma_xprt *r_xprt;
734                 unsigned int extras;
735
736                 dprintk("RPC:       %s: connected\n", __func__);
737
738                 r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
739                 extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
740
741                 if (extras) {
742                         rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
743                         if (rc) {
744                                 pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
745                                         __func__, rc);
746                                 rc = 0;
747                         }
748                 }
749         }
750
751 out:
752         if (rc)
753                 ep->rep_connected = rc;
754         return rc;
755 }
756
757 /*
758  * rpcrdma_ep_disconnect
759  *
760  * This is separate from destroy to facilitate the ability
761  * to reconnect without recreating the endpoint.
762  *
763  * This call is not reentrant, and must not be made in parallel
764  * on the same endpoint.
765  */
766 void
767 rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
768 {
769         int rc;
770
771         rc = rdma_disconnect(ia->ri_id);
772         if (!rc) {
773                 /* returns without wait if not connected */
774                 wait_event_interruptible(ep->rep_connect_wait,
775                                                         ep->rep_connected != 1);
776                 dprintk("RPC:       %s: after wait, %sconnected\n", __func__,
777                         (ep->rep_connected == 1) ? "still " : "dis");
778         } else {
779                 dprintk("RPC:       %s: rdma_disconnect %i\n", __func__, rc);
780                 ep->rep_connected = rc;
781         }
782
783         ib_drain_qp(ia->ri_id->qp);
784 }
785
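    /* Run as delayed work: pull each stale MR off rb_stale_mrs and
     * hand it to the registration mode's ->ro_recover_mr method.
     * The recovery lock is released around each call.
     */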
786 static void
787 rpcrdma_mr_recovery_worker(struct work_struct *work)
788 {
789         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
790                                                   rb_recovery_worker.work);
791         struct rpcrdma_mw *mw;
792
793         spin_lock(&buf->rb_recovery_lock);
794         while (!list_empty(&buf->rb_stale_mrs)) {
795                 mw = list_first_entry(&buf->rb_stale_mrs,
796                                       struct rpcrdma_mw, mw_list);
797                 list_del_init(&mw->mw_list);
798                 spin_unlock(&buf->rb_recovery_lock);
799
800                 dprintk("RPC:       %s: recovering MR %p\n", __func__, mw);
801                 mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
802
803                 spin_lock(&buf->rb_recovery_lock);
804         }
805         spin_unlock(&buf->rb_recovery_lock);
806 }
807
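    /* Place a damaged MR on the stale list and schedule the recovery
     * worker to repair it.
     */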
808 void
809 rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
810 {
811         struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
812         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
813
814         spin_lock(&buf->rb_recovery_lock);
815         list_add(&mw->mw_list, &buf->rb_stale_mrs);
816         spin_unlock(&buf->rb_recovery_lock);
817
818         schedule_delayed_work(&buf->rb_recovery_worker, 0);
819 }
820
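    /* Allocate and initialize up to 32 MRs at a time, then splice
     * them onto the buffer's free (rb_mws) and all (rb_all) lists
     * under rb_mwlock.
     */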
821 static void
822 rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
823 {
824         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
825         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
826         unsigned int count;
827         LIST_HEAD(free);
828         LIST_HEAD(all);
829
830         for (count = 0; count < 32; count++) {
831                 struct rpcrdma_mw *mw;
832                 int rc;
833
834                 mw = kzalloc(sizeof(*mw), GFP_KERNEL);
835                 if (!mw)
836                         break;
837
838                 rc = ia->ri_ops->ro_init_mr(ia, mw);
839                 if (rc) {
840                         kfree(mw);
841                         break;
842                 }
843
844                 mw->mw_xprt = r_xprt;
845
846                 list_add(&mw->mw_list, &free);
847                 list_add(&mw->mw_all, &all);
848         }
849
850         spin_lock(&buf->rb_mwlock);
851         list_splice(&free, &buf->rb_mws);
852         list_splice(&all, &buf->rb_all);
853         r_xprt->rx_stats.mrs_allocated += count;
854         spin_unlock(&buf->rb_mwlock);
855
856         dprintk("RPC:       %s: created %u MRs\n", __func__, count);
857 }
858
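    /* Delayed work scheduled by rpcrdma_get_mw() when the free MR
     * list runs dry; replenishes the pool via rpcrdma_create_mrs().
     */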
859 static void
860 rpcrdma_mr_refresh_worker(struct work_struct *work)
861 {
862         struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
863                                                   rb_refresh_worker.work);
864         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
865                                                    rx_buf);
866
867         rpcrdma_create_mrs(r_xprt);
868 }
869
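    /* Allocate an rpcrdma_req, link it onto the buffer's list of all
     * requests, and initialize its Send WR. Returns the new request
     * or an ERR_PTR.
     */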
870 struct rpcrdma_req *
871 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
872 {
873         struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
874         struct rpcrdma_req *req;
875
876         req = kzalloc(sizeof(*req), GFP_KERNEL);
877         if (req == NULL)
878                 return ERR_PTR(-ENOMEM);
879
880         INIT_LIST_HEAD(&req->rl_free);
881         spin_lock(&buffer->rb_reqslock);
882         list_add(&req->rl_all, &buffer->rb_allreqs);
883         spin_unlock(&buffer->rb_reqslock);
884         req->rl_cqe.done = rpcrdma_wc_send;
885         req->rl_buffer = &r_xprt->rx_buf;
886         INIT_LIST_HEAD(&req->rl_registered);
887         req->rl_send_wr.next = NULL;
888         req->rl_send_wr.wr_cqe = &req->rl_cqe;
889         req->rl_send_wr.sg_list = req->rl_send_sge;
890         req->rl_send_wr.opcode = IB_WR_SEND;
891         return req;
892 }
893
894 /**
895  * rpcrdma_create_rep - Allocate an rpcrdma_rep object
896  * @r_xprt: controlling transport
897  *
898  * Returns 0 on success or a negative errno on failure.
899  */
900 int
901 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
902 {
903         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
904         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
905         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
906         struct rpcrdma_rep *rep;
907         int rc;
908
909         rc = -ENOMEM;
910         rep = kzalloc(sizeof(*rep), GFP_KERNEL);
911         if (rep == NULL)
912                 goto out;
913
914         rep->rr_rdmabuf = rpcrdma_alloc_regbuf(cdata->inline_rsize,
915                                                DMA_FROM_DEVICE, GFP_KERNEL);
916         if (IS_ERR(rep->rr_rdmabuf)) {
917                 rc = PTR_ERR(rep->rr_rdmabuf);
918                 goto out_free;
919         }
920
921         rep->rr_device = ia->ri_device;
922         rep->rr_cqe.done = rpcrdma_wc_receive;
923         rep->rr_rxprt = r_xprt;
924         INIT_WORK(&rep->rr_work, rpcrdma_reply_handler);
925         rep->rr_recv_wr.next = NULL;
926         rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
927         rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
928         rep->rr_recv_wr.num_sge = 1;
929
930         spin_lock(&buf->rb_lock);
931         list_add(&rep->rr_list, &buf->rb_recv_bufs);
932         spin_unlock(&buf->rb_lock);
933         return 0;
934
935 out_free:
936         kfree(rep);
937 out:
938         dprintk("RPC:       %s: reply buffer %d alloc failed\n",
939                 __func__, rc);
940         return rc;
941 }
942
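    /* Build the transport's buffer pool: an initial batch of MRs,
     * one rpcrdma_req per credit, and one spare rpcrdma_rep beyond
     * rb_max_requests. Any failure tears down what was built.
     */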
943 int
944 rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
945 {
946         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
947         int i, rc;
948
949         buf->rb_max_requests = r_xprt->rx_data.max_requests;
950         buf->rb_bc_srv_max_requests = 0;
951         atomic_set(&buf->rb_credits, 1);
952         spin_lock_init(&buf->rb_mwlock);
953         spin_lock_init(&buf->rb_lock);
954         spin_lock_init(&buf->rb_recovery_lock);
955         INIT_LIST_HEAD(&buf->rb_mws);
956         INIT_LIST_HEAD(&buf->rb_all);
957         INIT_LIST_HEAD(&buf->rb_stale_mrs);
958         INIT_DELAYED_WORK(&buf->rb_refresh_worker,
959                           rpcrdma_mr_refresh_worker);
960         INIT_DELAYED_WORK(&buf->rb_recovery_worker,
961                           rpcrdma_mr_recovery_worker);
962
963         rpcrdma_create_mrs(r_xprt);
964
965         INIT_LIST_HEAD(&buf->rb_send_bufs);
966         INIT_LIST_HEAD(&buf->rb_allreqs);
967         spin_lock_init(&buf->rb_reqslock);
968         for (i = 0; i < buf->rb_max_requests; i++) {
969                 struct rpcrdma_req *req;
970
971                 req = rpcrdma_create_req(r_xprt);
972                 if (IS_ERR(req)) {
973                         dprintk("RPC:       %s: request buffer %d alloc"
974                                 " failed\n", __func__, i);
975                         rc = PTR_ERR(req);
976                         goto out;
977                 }
978                 req->rl_backchannel = false;
979                 list_add(&req->rl_free, &buf->rb_send_bufs);
980         }
981
982         INIT_LIST_HEAD(&buf->rb_recv_bufs);
983         for (i = 0; i <= buf->rb_max_requests; i++) {
984                 rc = rpcrdma_create_rep(r_xprt);
985                 if (rc)
986                         goto out;
987         }
988
989         return 0;
990 out:
991         rpcrdma_buffer_destroy(buf);
992         return rc;
993 }
994
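    /* Remove and return the first free request. Caller must hold
     * rb_lock and must already have checked that rb_send_bufs is
     * not empty.
     */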
995 static struct rpcrdma_req *
996 rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
997 {
998         struct rpcrdma_req *req;
999
1000         req = list_first_entry(&buf->rb_send_bufs,
1001                                struct rpcrdma_req, rl_free);
1002         list_del(&req->rl_free);
1003         return req;
1004 }
1005
1006 static struct rpcrdma_rep *
1007 rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
1008 {
1009         struct rpcrdma_rep *rep;
1010
1011         rep = list_first_entry(&buf->rb_recv_bufs,
1012                                struct rpcrdma_rep, rr_list);
1013         list_del(&rep->rr_list);
1014         return rep;
1015 }
1016
1017 static void
1018 rpcrdma_destroy_rep(struct rpcrdma_rep *rep)
1019 {
1020         rpcrdma_free_regbuf(rep->rr_rdmabuf);
1021         kfree(rep);
1022 }
1023
1024 void
1025 rpcrdma_destroy_req(struct rpcrdma_req *req)
1026 {
1027         rpcrdma_free_regbuf(req->rl_recvbuf);
1028         rpcrdma_free_regbuf(req->rl_sendbuf);
1029         rpcrdma_free_regbuf(req->rl_rdmabuf);
1030         kfree(req);
1031 }
1032
1033 static void
1034 rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
1035 {
1036         struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
1037                                                    rx_buf);
1038         struct rpcrdma_ia *ia = rdmab_to_ia(buf);
1039         struct rpcrdma_mw *mw;
1040         unsigned int count;
1041
1042         count = 0;
1043         spin_lock(&buf->rb_mwlock);
1044         while (!list_empty(&buf->rb_all)) {
1045                 mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
1046                 list_del(&mw->mw_all);
1047
1048                 spin_unlock(&buf->rb_mwlock);
1049                 ia->ri_ops->ro_release_mr(mw);
1050                 count++;
1051                 spin_lock(&buf->rb_mwlock);
1052         }
1053         spin_unlock(&buf->rb_mwlock);
1054         r_xprt->rx_stats.mrs_allocated = 0;
1055
1056         dprintk("RPC:       %s: released %u MRs\n", __func__, count);
1057 }
1058
1059 void
1060 rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
1061 {
1062         cancel_delayed_work_sync(&buf->rb_recovery_worker);
1063         cancel_delayed_work_sync(&buf->rb_refresh_worker);
1064
1065         while (!list_empty(&buf->rb_recv_bufs)) {
1066                 struct rpcrdma_rep *rep;
1067
1068                 rep = rpcrdma_buffer_get_rep_locked(buf);
1069                 rpcrdma_destroy_rep(rep);
1070         }
1071         buf->rb_send_count = 0;
1072
1073         spin_lock(&buf->rb_reqslock);
1074         while (!list_empty(&buf->rb_allreqs)) {
1075                 struct rpcrdma_req *req;
1076
1077                 req = list_first_entry(&buf->rb_allreqs,
1078                                        struct rpcrdma_req, rl_all);
1079                 list_del(&req->rl_all);
1080
1081                 spin_unlock(&buf->rb_reqslock);
1082                 rpcrdma_destroy_req(req);
1083                 spin_lock(&buf->rb_reqslock);
1084         }
1085         spin_unlock(&buf->rb_reqslock);
1086         buf->rb_recv_count = 0;
1087
1088         rpcrdma_destroy_mrs(buf);
1089 }
1090
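     /* Take an MR from the free list for use in a new RPC. If none
      * are available, schedule the refresh worker and return NULL so
      * the caller can try again later.
      */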
1091 struct rpcrdma_mw *
1092 rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
1093 {
1094         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1095         struct rpcrdma_mw *mw = NULL;
1096
1097         spin_lock(&buf->rb_mwlock);
1098         if (!list_empty(&buf->rb_mws)) {
1099                 mw = list_first_entry(&buf->rb_mws,
1100                                       struct rpcrdma_mw, mw_list);
1101                 list_del_init(&mw->mw_list);
1102         }
1103         spin_unlock(&buf->rb_mwlock);
1104
1105         if (!mw)
1106                 goto out_nomws;
1107         return mw;
1108
1109 out_nomws:
1110         dprintk("RPC:       %s: no MWs available\n", __func__);
1111         schedule_delayed_work(&buf->rb_refresh_worker, 0);
1112
1113         /* Allow the reply handler and refresh worker to run */
1114         cond_resched();
1115
1116         return NULL;
1117 }
1118
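     /* Return an MR to the transport's free list. */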
1119 void
1120 rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
1121 {
1122         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1123
1124         spin_lock(&buf->rb_mwlock);
1125         list_add_tail(&mw->mw_list, &buf->rb_mws);
1126         spin_unlock(&buf->rb_mwlock);
1127 }
1128
1129 static struct rpcrdma_rep *
1130 rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
1131 {
1132         /* If an RPC previously completed without a reply (say, because
1133          * of a credential problem or a soft timeout), then hold off
1134          * on supplying more Receive buffers until the number of new
1135          * pending RPCs catches up to the number of posted Receives.
1136          */
1137         if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
1138                 return NULL;
1139
1140         if (unlikely(list_empty(&buffers->rb_recv_bufs)))
1141                 return NULL;
1142         buffers->rb_recv_count++;
1143         return rpcrdma_buffer_get_rep_locked(buffers);
1144 }
1145
1146 /*
1147  * Get a set of request/reply buffers.
1148  *
1149  * A reply buffer (if available) is attached to the returned request.
1150  */
1151 struct rpcrdma_req *
1152 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
1153 {
1154         struct rpcrdma_req *req;
1155
1156         spin_lock(&buffers->rb_lock);
1157         if (list_empty(&buffers->rb_send_bufs))
1158                 goto out_reqbuf;
1159         buffers->rb_send_count++;
1160         req = rpcrdma_buffer_get_req_locked(buffers);
1161         req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1162         spin_unlock(&buffers->rb_lock);
1163         return req;
1164
1165 out_reqbuf:
1166         spin_unlock(&buffers->rb_lock);
1167         pr_warn("RPC:       %s: out of request buffers\n", __func__);
1168         return NULL;
1169 }
1170
1171 /*
1172  * Put request/reply buffers back into the pool and
1173  * decrement the pool's in-use counters.
1174  */
1175 void
1176 rpcrdma_buffer_put(struct rpcrdma_req *req)
1177 {
1178         struct rpcrdma_buffer *buffers = req->rl_buffer;
1179         struct rpcrdma_rep *rep = req->rl_reply;
1180
1181         req->rl_send_wr.num_sge = 0;
1182         req->rl_reply = NULL;
1183
1184         spin_lock(&buffers->rb_lock);
1185         buffers->rb_send_count--;
1186         list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
1187         if (rep) {
1188                 buffers->rb_recv_count--;
1189                 list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1190         }
1191         spin_unlock(&buffers->rb_lock);
1192 }
1193
1194 /*
1195  * Recover reply buffers from pool.
1196  * This happens when recovering from disconnect.
1197  */
1198 void
1199 rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
1200 {
1201         struct rpcrdma_buffer *buffers = req->rl_buffer;
1202
1203         spin_lock(&buffers->rb_lock);
1204         req->rl_reply = rpcrdma_buffer_get_rep(buffers);
1205         spin_unlock(&buffers->rb_lock);
1206 }
1207
1208 /*
1209  * Put reply buffers back into pool when not attached to
1210  * request. This happens in error conditions.
1211  */
1212 void
1213 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
1214 {
1215         struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
1216
1217         spin_lock(&buffers->rb_lock);
1218         buffers->rb_recv_count--;
1219         list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
1220         spin_unlock(&buffers->rb_lock);
1221 }
1222
1223 /**
1224  * rpcrdma_alloc_regbuf - allocate and DMA-map memory for SEND/RECV buffers
1225  * @size: size of buffer to be allocated, in bytes
1226  * @direction: direction of data movement
1227  * @flags: GFP flags
1228  *
1229  * Returns an ERR_PTR, or a pointer to a regbuf, a buffer that
1230  * can be persistently DMA-mapped for I/O.
1231  *
1232  * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
1233  * receiving the payload of RDMA RECV operations. During Long Calls
1234  * or Replies they may be registered externally via ro_map.
1235  */
1236 struct rpcrdma_regbuf *
1237 rpcrdma_alloc_regbuf(size_t size, enum dma_data_direction direction,
1238                      gfp_t flags)
1239 {
1240         struct rpcrdma_regbuf *rb;
1241
1242         rb = kmalloc(sizeof(*rb) + size, flags);
1243         if (rb == NULL)
1244                 return ERR_PTR(-ENOMEM);
1245
1246         rb->rg_device = NULL;
1247         rb->rg_direction = direction;
1248         rb->rg_iov.length = size;
1249
1250         return rb;
1251 }
1252
1253 /**
1254  * __rpcrdma_dma_map_regbuf - DMA-map a regbuf
1255  * @ia: controlling rpcrdma_ia
1256  * @rb: regbuf to be mapped
1257  */
1258 bool
1259 __rpcrdma_dma_map_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
1260 {
1261         if (rb->rg_direction == DMA_NONE)
1262                 return false;
1263
1264         rb->rg_iov.addr = ib_dma_map_single(ia->ri_device,
1265                                             (void *)rb->rg_base,
1266                                             rdmab_length(rb),
1267                                             rb->rg_direction);
1268         if (ib_dma_mapping_error(ia->ri_device, rdmab_addr(rb)))
1269                 return false;
1270
1271         rb->rg_device = ia->ri_device;
1272         rb->rg_iov.lkey = ia->ri_pd->local_dma_lkey;
1273         return true;
1274 }
1275
1276 static void
1277 rpcrdma_dma_unmap_regbuf(struct rpcrdma_regbuf *rb)
1278 {
1279         if (!rpcrdma_regbuf_is_mapped(rb))
1280                 return;
1281
1282         ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb),
1283                             rdmab_length(rb), rb->rg_direction);
1284         rb->rg_device = NULL;
1285 }
1286
1287 /**
1288  * rpcrdma_free_regbuf - deregister and free registered buffer
1289  * @rb: regbuf to be deregistered and freed
1290  */
1291 void
1292 rpcrdma_free_regbuf(struct rpcrdma_regbuf *rb)
1293 {
1294         if (!rb)
1295                 return;
1296
1297         rpcrdma_dma_unmap_regbuf(rb);
1298         kfree(rb);
1299 }
1300
1301 /*
1302  * Prepost any receive buffer, then post send.
1303  *
1304  * Receive buffer is donated to hardware, reclaimed upon recv completion.
1305  */
1306 int
1307 rpcrdma_ep_post(struct rpcrdma_ia *ia,
1308                 struct rpcrdma_ep *ep,
1309                 struct rpcrdma_req *req)
1310 {
1311         struct ib_send_wr *send_wr = &req->rl_send_wr;
1312         struct ib_send_wr *send_wr_fail;
1313         int rc;
1314
1315         if (req->rl_reply) {
1316                 rc = rpcrdma_ep_post_recv(ia, req->rl_reply);
1317                 if (rc)
1318                         return rc;
1319                 req->rl_reply = NULL;
1320         }
1321
1322         dprintk("RPC:       %s: posting %d s/g entries\n",
1323                 __func__, send_wr->num_sge);
1324
1325         rpcrdma_set_signaled(ep, send_wr);
1326         rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
1327         if (rc)
1328                 goto out_postsend_err;
1329         return 0;
1330
1331 out_postsend_err:
1332         pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
1333         return -ENOTCONN;
1334 }
1335
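     /* Post a single Receive WR. The Receive buffer is DMA-mapped
      * lazily, just before it is handed to the device.
      */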
1336 int
1337 rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
1338                      struct rpcrdma_rep *rep)
1339 {
1340         struct ib_recv_wr *recv_wr_fail;
1341         int rc;
1342
1343         if (!rpcrdma_dma_map_regbuf(ia, rep->rr_rdmabuf))
1344                 goto out_map;
1345         rc = ib_post_recv(ia->ri_id->qp, &rep->rr_recv_wr, &recv_wr_fail);
1346         if (rc)
1347                 goto out_postrecv;
1348         return 0;
1349
1350 out_map:
1351         pr_err("rpcrdma: failed to DMA map the Receive buffer\n");
1352         return -EIO;
1353
1354 out_postrecv:
1355         pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
1356         return -ENOTCONN;
1357 }
1358
1359 /**
1360  * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
1361  * @r_xprt: transport associated with these backchannel resources
1362  * @count: minimum number of incoming requests expected
1363  *
1364  * Returns zero if all requested buffers were posted, or a negative errno.
1365  */
1366 int
1367 rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
1368 {
1369         struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
1370         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
1371         struct rpcrdma_rep *rep;
1372         int rc;
1373
1374         while (count--) {
1375                 spin_lock(&buffers->rb_lock);
1376                 if (list_empty(&buffers->rb_recv_bufs))
1377                         goto out_reqbuf;
1378                 rep = rpcrdma_buffer_get_rep_locked(buffers);
1379                 spin_unlock(&buffers->rb_lock);
1380
1381                 rc = rpcrdma_ep_post_recv(ia, rep);
1382                 if (rc)
1383                         goto out_rc;
1384         }
1385
1386         return 0;
1387
1388 out_reqbuf:
1389         spin_unlock(&buffers->rb_lock);
1390         pr_warn("%s: no extra receive buffers\n", __func__);
1391         return -ENOMEM;
1392
1393 out_rc:
1394         rpcrdma_recv_buffer_put(rep);
1395         return rc;
1396 }