GNU Linux-libre 4.19.286-gnu1
[releases.git] / net / sunrpc / xprtrdma / rpc_rdma.c
1 // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
2 /*
3  * Copyright (c) 2014-2017 Oracle.  All rights reserved.
4  * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the BSD-type
10  * license below:
11  *
12  * Redistribution and use in source and binary forms, with or without
13  * modification, are permitted provided that the following conditions
14  * are met:
15  *
16  *      Redistributions of source code must retain the above copyright
17  *      notice, this list of conditions and the following disclaimer.
18  *
19  *      Redistributions in binary form must reproduce the above
20  *      copyright notice, this list of conditions and the following
21  *      disclaimer in the documentation and/or other materials provided
22  *      with the distribution.
23  *
24  *      Neither the name of the Network Appliance, Inc. nor the names of
25  *      its contributors may be used to endorse or promote products
26  *      derived from this software without specific prior written
27  *      permission.
28  *
29  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
30  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
31  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
32  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
33  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
34  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
35  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
36  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
37  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
38  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
39  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40  */
41
42 /*
43  * rpc_rdma.c
44  *
45  * This file contains the guts of the RPC RDMA protocol, and
46  * does marshaling/unmarshaling, etc. It is also where interfacing
47  * to the Linux RPC framework lives.
48  */
49
50 #include <linux/highmem.h>
51
52 #include <linux/sunrpc/svc_rdma.h>
53
54 #include "xprt_rdma.h"
55 #include <trace/events/rpcrdma.h>
56
57 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
58 # define RPCDBG_FACILITY        RPCDBG_TRANS
59 #endif
60
61 /* Returns size of largest RPC-over-RDMA header in a Call message
62  *
63  * The largest Call header contains a full-size Read list and a
64  * minimal Reply chunk.
65  */
66 static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
67 {
68         unsigned int size;
69
70         /* Fixed header fields and list discriminators */
71         size = RPCRDMA_HDRLEN_MIN;
72
73         /* Maximum Read list size */
74         maxsegs += 2;   /* segment for head and tail buffers */
75         size += maxsegs * rpcrdma_readchunk_maxsz * sizeof(__be32);
76
77         /* Minimal Read chunk size */
78         size += sizeof(__be32); /* segment count */
79         size += rpcrdma_segment_maxsz * sizeof(__be32);
80         size += sizeof(__be32); /* list discriminator */
81
82         dprintk("RPC:       %s: max call header size = %u\n",
83                 __func__, size);
84         return size;
85 }
86
87 /* Returns size of largest RPC-over-RDMA header in a Reply message
88  *
89  * There is only one Write list or one Reply chunk per Reply
90  * message.  The larger list is the Write list.
91  */
92 static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
93 {
94         unsigned int size;
95
96         /* Fixed header fields and list discriminators */
97         size = RPCRDMA_HDRLEN_MIN;
98
99         /* Maximum Write list size */
100         maxsegs += 2;   /* segment for head and tail buffers */
101         size += sizeof(__be32);         /* segment count */
102         size += maxsegs * rpcrdma_segment_maxsz * sizeof(__be32);
103         size += sizeof(__be32); /* list discriminator */
104
105         dprintk("RPC:       %s: max reply header size = %u\n",
106                 __func__, size);
107         return size;
108 }
109
110 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *r_xprt)
111 {
112         struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
113         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
114         unsigned int maxsegs = ia->ri_max_segs;
115
116         ia->ri_max_inline_write = cdata->inline_wsize -
117                                   rpcrdma_max_call_header_size(maxsegs);
118         ia->ri_max_inline_read = cdata->inline_rsize -
119                                  rpcrdma_max_reply_header_size(maxsegs);
120 }
121
122 /* The client can send a request inline as long as the RPCRDMA header
123  * plus the RPC call fit under the transport's inline limit. If the
124  * combined call message size exceeds that limit, the client must use
125  * a Read chunk for this operation.
126  *
127  * A Read chunk is also required if sending the RPC call inline would
128  * exceed this device's max_sge limit.
129  */
130 static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
131                                 struct rpc_rqst *rqst)
132 {
133         struct xdr_buf *xdr = &rqst->rq_snd_buf;
134         unsigned int count, remaining, offset;
135
136         if (xdr->len > r_xprt->rx_ia.ri_max_inline_write)
137                 return false;
138
139         if (xdr->page_len) {
140                 remaining = xdr->page_len;
141                 offset = offset_in_page(xdr->page_base);
142                 count = RPCRDMA_MIN_SEND_SGES;
143                 while (remaining) {
144                         remaining -= min_t(unsigned int,
145                                            PAGE_SIZE - offset, remaining);
146                         offset = 0;
147                         if (++count > r_xprt->rx_ia.ri_max_send_sges)
148                                 return false;
149                 }
150         }
151
152         return true;
153 }
154
155 /* The client can't know how large the actual reply will be. Thus it
156  * plans for the largest possible reply for that particular ULP
157  * operation. If the maximum combined reply message size exceeds that
158  * limit, the client must provide a write list or a reply chunk for
159  * this request.
160  */
161 static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
162                                    struct rpc_rqst *rqst)
163 {
164         struct rpcrdma_ia *ia = &r_xprt->rx_ia;
165
166         return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
167 }
168
169 /* Split @vec on page boundaries into SGEs. FMR registers pages, not
170  * a byte range. Other modes coalesce these SGEs into a single MR
171  * when they can.
172  *
173  * Returns pointer to next available SGE, and bumps the total number
174  * of SGEs consumed.
175  */
176 static struct rpcrdma_mr_seg *
177 rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg,
178                      unsigned int *n)
179 {
180         u32 remaining, page_offset;
181         char *base;
182
183         base = vec->iov_base;
184         page_offset = offset_in_page(base);
185         remaining = vec->iov_len;
186         while (remaining) {
187                 seg->mr_page = NULL;
188                 seg->mr_offset = base;
189                 seg->mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
190                 remaining -= seg->mr_len;
191                 base += seg->mr_len;
192                 ++seg;
193                 ++(*n);
194                 page_offset = 0;
195         }
196         return seg;
197 }
198
199 /* Convert @xdrbuf into SGEs no larger than a page each. As they
200  * are registered, these SGEs are then coalesced into RDMA segments
201  * when the selected memreg mode supports it.
202  *
203  * Returns positive number of SGEs consumed, or a negative errno.
204  */
205
206 static int
207 rpcrdma_convert_iovs(struct rpcrdma_xprt *r_xprt, struct xdr_buf *xdrbuf,
208                      unsigned int pos, enum rpcrdma_chunktype type,
209                      struct rpcrdma_mr_seg *seg)
210 {
211         unsigned long page_base;
212         unsigned int len, n;
213         struct page **ppages;
214
215         n = 0;
216         if (pos == 0)
217                 seg = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, &n);
218
219         len = xdrbuf->page_len;
220         ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
221         page_base = offset_in_page(xdrbuf->page_base);
222         while (len) {
223                 if (unlikely(!*ppages)) {
224                         /* XXX: Certain upper layer operations do
225                          *      not provide receive buffer pages.
226                          */
227                         *ppages = alloc_page(GFP_ATOMIC);
228                         if (!*ppages)
229                                 return -ENOBUFS;
230                 }
231                 seg->mr_page = *ppages;
232                 seg->mr_offset = (char *)page_base;
233                 seg->mr_len = min_t(u32, PAGE_SIZE - page_base, len);
234                 len -= seg->mr_len;
235                 ++ppages;
236                 ++seg;
237                 ++n;
238                 page_base = 0;
239         }
240
241         /* When encoding a Read chunk, the tail iovec contains an
242          * XDR pad and may be omitted.
243          */
244         if (type == rpcrdma_readch && r_xprt->rx_ia.ri_implicit_roundup)
245                 goto out;
246
247         /* When encoding a Write chunk, some servers need to see an
248          * extra segment for non-XDR-aligned Write chunks. The upper
249          * layer provides space in the tail iovec that may be used
250          * for this purpose.
251          */
252         if (type == rpcrdma_writech && r_xprt->rx_ia.ri_implicit_roundup)
253                 goto out;
254
255         if (xdrbuf->tail[0].iov_len)
256                 seg = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, &n);
257
258 out:
259         if (unlikely(n > RPCRDMA_MAX_SEGS))
260                 return -EIO;
261         return n;
262 }
263
264 static inline int
265 encode_item_present(struct xdr_stream *xdr)
266 {
267         __be32 *p;
268
269         p = xdr_reserve_space(xdr, sizeof(*p));
270         if (unlikely(!p))
271                 return -EMSGSIZE;
272
273         *p = xdr_one;
274         return 0;
275 }
276
277 static inline int
278 encode_item_not_present(struct xdr_stream *xdr)
279 {
280         __be32 *p;
281
282         p = xdr_reserve_space(xdr, sizeof(*p));
283         if (unlikely(!p))
284                 return -EMSGSIZE;
285
286         *p = xdr_zero;
287         return 0;
288 }
289
290 static void
291 xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr *mr)
292 {
293         *iptr++ = cpu_to_be32(mr->mr_handle);
294         *iptr++ = cpu_to_be32(mr->mr_length);
295         xdr_encode_hyper(iptr, mr->mr_offset);
296 }
297
298 static int
299 encode_rdma_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr)
300 {
301         __be32 *p;
302
303         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
304         if (unlikely(!p))
305                 return -EMSGSIZE;
306
307         xdr_encode_rdma_segment(p, mr);
308         return 0;
309 }
310
311 static int
312 encode_read_segment(struct xdr_stream *xdr, struct rpcrdma_mr *mr,
313                     u32 position)
314 {
315         __be32 *p;
316
317         p = xdr_reserve_space(xdr, 6 * sizeof(*p));
318         if (unlikely(!p))
319                 return -EMSGSIZE;
320
321         *p++ = xdr_one;                 /* Item present */
322         *p++ = cpu_to_be32(position);
323         xdr_encode_rdma_segment(p, mr);
324         return 0;
325 }
326
327 /* Register and XDR encode the Read list. Supports encoding a list of read
328  * segments that belong to a single read chunk.
329  *
330  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
331  *
332  *  Read chunklist (a linked list):
333  *   N elements, position P (same P for all chunks of same arg!):
334  *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
335  *
336  * Returns zero on success, or a negative errno if a failure occurred.
337  * @xdr is advanced to the next position in the stream.
338  *
339  * Only a single @pos value is currently supported.
340  */
341 static noinline int
342 rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
343                          struct rpc_rqst *rqst, enum rpcrdma_chunktype rtype)
344 {
345         struct xdr_stream *xdr = &req->rl_stream;
346         struct rpcrdma_mr_seg *seg;
347         struct rpcrdma_mr *mr;
348         unsigned int pos;
349         int nsegs;
350
351         pos = rqst->rq_snd_buf.head[0].iov_len;
352         if (rtype == rpcrdma_areadch)
353                 pos = 0;
354         seg = req->rl_segments;
355         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_snd_buf, pos,
356                                      rtype, seg);
357         if (nsegs < 0)
358                 return nsegs;
359
360         do {
361                 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
362                                                    false, &mr);
363                 if (IS_ERR(seg))
364                         return PTR_ERR(seg);
365                 rpcrdma_mr_push(mr, &req->rl_registered);
366
367                 if (encode_read_segment(xdr, mr, pos) < 0)
368                         return -EMSGSIZE;
369
370                 trace_xprtrdma_read_chunk(rqst->rq_task, pos, mr, nsegs);
371                 r_xprt->rx_stats.read_chunk_count++;
372                 nsegs -= mr->mr_nents;
373         } while (nsegs);
374
375         return 0;
376 }
377
378 /* Register and XDR encode the Write list. Supports encoding a list
379  * containing one array of plain segments that belong to a single
380  * write chunk.
381  *
382  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
383  *
384  *  Write chunklist (a list of (one) counted array):
385  *   N elements:
386  *    1 - N - HLOO - HLOO - ... - HLOO - 0
387  *
388  * Returns zero on success, or a negative errno if a failure occurred.
389  * @xdr is advanced to the next position in the stream.
390  *
391  * Only a single Write chunk is currently supported.
392  */
393 static noinline int
394 rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
395                           struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
396 {
397         struct xdr_stream *xdr = &req->rl_stream;
398         struct rpcrdma_mr_seg *seg;
399         struct rpcrdma_mr *mr;
400         int nsegs, nchunks;
401         __be32 *segcount;
402
403         seg = req->rl_segments;
404         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf,
405                                      rqst->rq_rcv_buf.head[0].iov_len,
406                                      wtype, seg);
407         if (nsegs < 0)
408                 return nsegs;
409
410         if (encode_item_present(xdr) < 0)
411                 return -EMSGSIZE;
412         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
413         if (unlikely(!segcount))
414                 return -EMSGSIZE;
415         /* Actual value encoded below */
416
417         nchunks = 0;
418         do {
419                 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
420                                                    true, &mr);
421                 if (IS_ERR(seg))
422                         return PTR_ERR(seg);
423                 rpcrdma_mr_push(mr, &req->rl_registered);
424
425                 if (encode_rdma_segment(xdr, mr) < 0)
426                         return -EMSGSIZE;
427
428                 trace_xprtrdma_write_chunk(rqst->rq_task, mr, nsegs);
429                 r_xprt->rx_stats.write_chunk_count++;
430                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
431                 nchunks++;
432                 nsegs -= mr->mr_nents;
433         } while (nsegs);
434
435         /* Update count of segments in this Write chunk */
436         *segcount = cpu_to_be32(nchunks);
437
438         return 0;
439 }
440
441 /* Register and XDR encode the Reply chunk. Supports encoding an array
442  * of plain segments that belong to a single write (reply) chunk.
443  *
444  * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
445  *
446  *  Reply chunk (a counted array):
447  *   N elements:
448  *    1 - N - HLOO - HLOO - ... - HLOO
449  *
450  * Returns zero on success, or a negative errno if a failure occurred.
451  * @xdr is advanced to the next position in the stream.
452  */
453 static noinline int
454 rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
455                            struct rpc_rqst *rqst, enum rpcrdma_chunktype wtype)
456 {
457         struct xdr_stream *xdr = &req->rl_stream;
458         struct rpcrdma_mr_seg *seg;
459         struct rpcrdma_mr *mr;
460         int nsegs, nchunks;
461         __be32 *segcount;
462
463         seg = req->rl_segments;
464         nsegs = rpcrdma_convert_iovs(r_xprt, &rqst->rq_rcv_buf, 0, wtype, seg);
465         if (nsegs < 0)
466                 return nsegs;
467
468         if (encode_item_present(xdr) < 0)
469                 return -EMSGSIZE;
470         segcount = xdr_reserve_space(xdr, sizeof(*segcount));
471         if (unlikely(!segcount))
472                 return -EMSGSIZE;
473         /* Actual value encoded below */
474
475         nchunks = 0;
476         do {
477                 seg = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
478                                                    true, &mr);
479                 if (IS_ERR(seg))
480                         return PTR_ERR(seg);
481                 rpcrdma_mr_push(mr, &req->rl_registered);
482
483                 if (encode_rdma_segment(xdr, mr) < 0)
484                         return -EMSGSIZE;
485
486                 trace_xprtrdma_reply_chunk(rqst->rq_task, mr, nsegs);
487                 r_xprt->rx_stats.reply_chunk_count++;
488                 r_xprt->rx_stats.total_rdma_request += mr->mr_length;
489                 nchunks++;
490                 nsegs -= mr->mr_nents;
491         } while (nsegs);
492
493         /* Update count of segments in the Reply chunk */
494         *segcount = cpu_to_be32(nchunks);
495
496         return 0;
497 }
498
499 /**
500  * rpcrdma_unmap_sendctx - DMA-unmap Send buffers
501  * @sc: sendctx containing SGEs to unmap
502  *
503  */
504 void
505 rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
506 {
507         struct rpcrdma_ia *ia = &sc->sc_xprt->rx_ia;
508         struct ib_sge *sge;
509         unsigned int count;
510
511         /* The first two SGEs contain the transport header and
512          * the inline buffer. These are always left mapped so
513          * they can be cheaply re-used.
514          */
515         sge = &sc->sc_sges[2];
516         for (count = sc->sc_unmap_count; count; ++sge, --count)
517                 ib_dma_unmap_page(ia->ri_device,
518                                   sge->addr, sge->length, DMA_TO_DEVICE);
519
520         if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
521                 smp_mb__after_atomic();
522                 wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
523         }
524 }
525
526 /* Prepare an SGE for the RPC-over-RDMA transport header.
527  */
528 static bool
529 rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
530                         u32 len)
531 {
532         struct rpcrdma_sendctx *sc = req->rl_sendctx;
533         struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
534         struct ib_sge *sge = sc->sc_sges;
535
536         if (!rpcrdma_dma_map_regbuf(ia, rb))
537                 goto out_regbuf;
538         sge->addr = rdmab_addr(rb);
539         sge->length = len;
540         sge->lkey = rdmab_lkey(rb);
541
542         ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr,
543                                       sge->length, DMA_TO_DEVICE);
544         sc->sc_wr.num_sge++;
545         return true;
546
547 out_regbuf:
548         pr_err("rpcrdma: failed to DMA map a Send buffer\n");
549         return false;
550 }
551
552 /* Prepare the Send SGEs. The head and tail iovec, and each entry
553  * in the page list, gets its own SGE.
554  */
555 static bool
556 rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
557                          struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
558 {
559         struct rpcrdma_sendctx *sc = req->rl_sendctx;
560         unsigned int sge_no, page_base, len, remaining;
561         struct rpcrdma_regbuf *rb = req->rl_sendbuf;
562         struct ib_device *device = ia->ri_device;
563         struct ib_sge *sge = sc->sc_sges;
564         u32 lkey = ia->ri_pd->local_dma_lkey;
565         struct page *page, **ppages;
566
567         /* The head iovec is straightforward, as it is already
568          * DMA-mapped. Sync the content that has changed.
569          */
570         if (!rpcrdma_dma_map_regbuf(ia, rb))
571                 goto out_regbuf;
572         sge_no = 1;
573         sge[sge_no].addr = rdmab_addr(rb);
574         sge[sge_no].length = xdr->head[0].iov_len;
575         sge[sge_no].lkey = rdmab_lkey(rb);
576         ib_dma_sync_single_for_device(rdmab_device(rb), sge[sge_no].addr,
577                                       sge[sge_no].length, DMA_TO_DEVICE);
578
579         /* If there is a Read chunk, the page list is being handled
580          * via explicit RDMA, and thus is skipped here. However, the
581          * tail iovec may include an XDR pad for the page list, as
582          * well as additional content, and may not reside in the
583          * same page as the head iovec.
584          */
585         if (rtype == rpcrdma_readch) {
586                 len = xdr->tail[0].iov_len;
587
588                 /* Do not include the tail if it is only an XDR pad */
589                 if (len < 4)
590                         goto out;
591
592                 page = virt_to_page(xdr->tail[0].iov_base);
593                 page_base = offset_in_page(xdr->tail[0].iov_base);
594
595                 /* If the content in the page list is an odd length,
596                  * xdr_write_pages() has added a pad at the beginning
597                  * of the tail iovec. Force the tail's non-pad content
598                  * to land at the next XDR position in the Send message.
599                  */
600                 page_base += len & 3;
601                 len -= len & 3;
602                 goto map_tail;
603         }
604
605         /* If there is a page list present, temporarily DMA map
606          * and prepare an SGE for each page to be sent.
607          */
608         if (xdr->page_len) {
609                 ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
610                 page_base = offset_in_page(xdr->page_base);
611                 remaining = xdr->page_len;
612                 while (remaining) {
613                         sge_no++;
614                         if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
615                                 goto out_mapping_overflow;
616
617                         len = min_t(u32, PAGE_SIZE - page_base, remaining);
618                         sge[sge_no].addr = ib_dma_map_page(device, *ppages,
619                                                            page_base, len,
620                                                            DMA_TO_DEVICE);
621                         if (ib_dma_mapping_error(device, sge[sge_no].addr))
622                                 goto out_mapping_err;
623                         sge[sge_no].length = len;
624                         sge[sge_no].lkey = lkey;
625
626                         sc->sc_unmap_count++;
627                         ppages++;
628                         remaining -= len;
629                         page_base = 0;
630                 }
631         }
632
633         /* The tail iovec is not always constructed in the same
634          * page where the head iovec resides (see, for example,
635          * gss_wrap_req_priv). To neatly accommodate that case,
636          * DMA map it separately.
637          */
638         if (xdr->tail[0].iov_len) {
639                 page = virt_to_page(xdr->tail[0].iov_base);
640                 page_base = offset_in_page(xdr->tail[0].iov_base);
641                 len = xdr->tail[0].iov_len;
642
643 map_tail:
644                 sge_no++;
645                 sge[sge_no].addr = ib_dma_map_page(device, page,
646                                                    page_base, len,
647                                                    DMA_TO_DEVICE);
648                 if (ib_dma_mapping_error(device, sge[sge_no].addr))
649                         goto out_mapping_err;
650                 sge[sge_no].length = len;
651                 sge[sge_no].lkey = lkey;
652                 sc->sc_unmap_count++;
653         }
654
655 out:
656         sc->sc_wr.num_sge += sge_no;
657         if (sc->sc_unmap_count)
658                 __set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
659         return true;
660
661 out_regbuf:
662         pr_err("rpcrdma: failed to DMA map a Send buffer\n");
663         return false;
664
665 out_mapping_overflow:
666         rpcrdma_unmap_sendctx(sc);
667         pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
668         return false;
669
670 out_mapping_err:
671         rpcrdma_unmap_sendctx(sc);
672         pr_err("rpcrdma: Send mapping error\n");
673         return false;
674 }
675
676 /**
677  * rpcrdma_prepare_send_sges - Construct SGEs for a Send WR
678  * @r_xprt: controlling transport
679  * @req: context of RPC Call being marshalled
680  * @hdrlen: size of transport header, in bytes
681  * @xdr: xdr_buf containing RPC Call
682  * @rtype: chunk type being encoded
683  *
684  * Returns 0 on success; otherwise a negative errno is returned.
685  */
686 int
687 rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
688                           struct rpcrdma_req *req, u32 hdrlen,
689                           struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
690 {
691         req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
692         if (!req->rl_sendctx)
693                 return -EAGAIN;
694         req->rl_sendctx->sc_wr.num_sge = 0;
695         req->rl_sendctx->sc_unmap_count = 0;
696         req->rl_sendctx->sc_req = req;
697         __clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
698
699         if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
700                 return -EIO;
701
702         if (rtype != rpcrdma_areadch)
703                 if (!rpcrdma_prepare_msg_sges(&r_xprt->rx_ia, req, xdr, rtype))
704                         return -EIO;
705
706         return 0;
707 }
708
709 /**
710  * rpcrdma_marshal_req - Marshal and send one RPC request
711  * @r_xprt: controlling transport
712  * @rqst: RPC request to be marshaled
713  *
714  * For the RPC in "rqst", this function:
715  *  - Chooses the transfer mode (eg., RDMA_MSG or RDMA_NOMSG)
716  *  - Registers Read, Write, and Reply chunks
717  *  - Constructs the transport header
718  *  - Posts a Send WR to send the transport header and request
719  *
720  * Returns:
721  *      %0 if the RPC was sent successfully,
722  *      %-ENOTCONN if the connection was lost,
723  *      %-EAGAIN if the caller should call again with the same arguments,
724  *      %-ENOBUFS if the caller should call again after a delay,
725  *      %-EMSGSIZE if the transport header is too small,
726  *      %-EIO if a permanent problem occurred while marshaling.
727  */
728 int
729 rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst)
730 {
731         struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
732         struct xdr_stream *xdr = &req->rl_stream;
733         enum rpcrdma_chunktype rtype, wtype;
734         bool ddp_allowed;
735         __be32 *p;
736         int ret;
737
738         rpcrdma_set_xdrlen(&req->rl_hdrbuf, 0);
739         xdr_init_encode(xdr, &req->rl_hdrbuf,
740                         req->rl_rdmabuf->rg_base);
741
742         /* Fixed header fields */
743         ret = -EMSGSIZE;
744         p = xdr_reserve_space(xdr, 4 * sizeof(*p));
745         if (!p)
746                 goto out_err;
747         *p++ = rqst->rq_xid;
748         *p++ = rpcrdma_version;
749         *p++ = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
750
751         /* When the ULP employs a GSS flavor that guarantees integrity
752          * or privacy, direct data placement of individual data items
753          * is not allowed.
754          */
755         ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
756                                                 RPCAUTH_AUTH_DATATOUCH);
757
758         /*
759          * Chunks needed for results?
760          *
761          * o If the expected result is under the inline threshold, all ops
762          *   return as inline.
763          * o Large read ops return data as write chunk(s), header as
764          *   inline.
765          * o Large non-read ops return as a single reply chunk.
766          */
767         if (rpcrdma_results_inline(r_xprt, rqst))
768                 wtype = rpcrdma_noch;
769         else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
770                 wtype = rpcrdma_writech;
771         else
772                 wtype = rpcrdma_replych;
773
774         /*
775          * Chunks needed for arguments?
776          *
777          * o If the total request is under the inline threshold, all ops
778          *   are sent as inline.
779          * o Large write ops transmit data as read chunk(s), header as
780          *   inline.
781          * o Large non-write ops are sent with the entire message as a
782          *   single read chunk (protocol 0-position special case).
783          *
784          * This assumes that the upper layer does not present a request
785          * that both has a data payload, and whose non-data arguments
786          * by themselves are larger than the inline threshold.
787          */
788         if (rpcrdma_args_inline(r_xprt, rqst)) {
789                 *p++ = rdma_msg;
790                 rtype = rpcrdma_noch;
791         } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
792                 *p++ = rdma_msg;
793                 rtype = rpcrdma_readch;
794         } else {
795                 r_xprt->rx_stats.nomsg_call_count++;
796                 *p++ = rdma_nomsg;
797                 rtype = rpcrdma_areadch;
798         }
799
800         /* If this is a retransmit, discard previously registered
801          * chunks. Very likely the connection has been replaced,
802          * so these registrations are invalid and unusable.
803          */
804         while (unlikely(!list_empty(&req->rl_registered))) {
805                 struct rpcrdma_mr *mr;
806
807                 mr = rpcrdma_mr_pop(&req->rl_registered);
808                 rpcrdma_mr_defer_recovery(mr);
809         }
810
811         /* This implementation supports the following combinations
812          * of chunk lists in one RPC-over-RDMA Call message:
813          *
814          *   - Read list
815          *   - Write list
816          *   - Reply chunk
817          *   - Read list + Reply chunk
818          *
819          * It might not yet support the following combinations:
820          *
821          *   - Read list + Write list
822          *
823          * It does not support the following combinations:
824          *
825          *   - Write list + Reply chunk
826          *   - Read list + Write list + Reply chunk
827          *
828          * This implementation supports only a single chunk in each
829          * Read or Write list. Thus for example the client cannot
830          * send a Call message with a Position Zero Read chunk and a
831          * regular Read chunk at the same time.
832          */
833         if (rtype != rpcrdma_noch) {
834                 ret = rpcrdma_encode_read_list(r_xprt, req, rqst, rtype);
835                 if (ret)
836                         goto out_err;
837         }
838         ret = encode_item_not_present(xdr);
839         if (ret)
840                 goto out_err;
841
842         if (wtype == rpcrdma_writech) {
843                 ret = rpcrdma_encode_write_list(r_xprt, req, rqst, wtype);
844                 if (ret)
845                         goto out_err;
846         }
847         ret = encode_item_not_present(xdr);
848         if (ret)
849                 goto out_err;
850
851         if (wtype != rpcrdma_replych)
852                 ret = encode_item_not_present(xdr);
853         else
854                 ret = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, wtype);
855         if (ret)
856                 goto out_err;
857
858         trace_xprtrdma_marshal(rqst, xdr_stream_pos(xdr), rtype, wtype);
859
860         ret = rpcrdma_prepare_send_sges(r_xprt, req, xdr_stream_pos(xdr),
861                                         &rqst->rq_snd_buf, rtype);
862         if (ret)
863                 goto out_err;
864         return 0;
865
866 out_err:
867         switch (ret) {
868         case -EAGAIN:
869                 xprt_wait_for_buffer_space(rqst->rq_task, NULL);
870                 break;
871         case -ENOBUFS:
872                 break;
873         default:
874                 r_xprt->rx_stats.failed_marshal_count++;
875         }
876         return ret;
877 }
878
879 /**
880  * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
881  * @rqst: controlling RPC request
882  * @srcp: points to RPC message payload in receive buffer
883  * @copy_len: remaining length of receive buffer content
884  * @pad: Write chunk pad bytes needed (zero for pure inline)
885  *
886  * The upper layer has set the maximum number of bytes it can
887  * receive in each component of rq_rcv_buf. These values are set in
888  * the head.iov_len, page_len, tail.iov_len, and buflen fields.
889  *
890  * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
891  * many cases this function simply updates iov_base pointers in
892  * rq_rcv_buf to point directly to the received reply data, to
893  * avoid copying reply data.
894  *
895  * Returns the count of bytes which had to be memcopied.
896  */
897 static unsigned long
898 rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
899 {
900         unsigned long fixup_copy_count;
901         int i, npages, curlen;
902         char *destp;
903         struct page **ppages;
904         int page_base;
905
906         /* The head iovec is redirected to the RPC reply message
907          * in the receive buffer, to avoid a memcopy.
908          */
909         rqst->rq_rcv_buf.head[0].iov_base = srcp;
910         rqst->rq_private_buf.head[0].iov_base = srcp;
911
912         /* The contents of the receive buffer that follow
913          * head.iov_len bytes are copied into the page list.
914          */
915         curlen = rqst->rq_rcv_buf.head[0].iov_len;
916         if (curlen > copy_len)
917                 curlen = copy_len;
918         trace_xprtrdma_fixup(rqst, copy_len, curlen);
919         srcp += curlen;
920         copy_len -= curlen;
921
922         ppages = rqst->rq_rcv_buf.pages +
923                 (rqst->rq_rcv_buf.page_base >> PAGE_SHIFT);
924         page_base = offset_in_page(rqst->rq_rcv_buf.page_base);
925         fixup_copy_count = 0;
926         if (copy_len && rqst->rq_rcv_buf.page_len) {
927                 int pagelist_len;
928
929                 pagelist_len = rqst->rq_rcv_buf.page_len;
930                 if (pagelist_len > copy_len)
931                         pagelist_len = copy_len;
932                 npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
933                 for (i = 0; i < npages; i++) {
934                         curlen = PAGE_SIZE - page_base;
935                         if (curlen > pagelist_len)
936                                 curlen = pagelist_len;
937
938                         trace_xprtrdma_fixup_pg(rqst, i, srcp,
939                                                 copy_len, curlen);
940                         destp = kmap_atomic(ppages[i]);
941                         memcpy(destp + page_base, srcp, curlen);
942                         flush_dcache_page(ppages[i]);
943                         kunmap_atomic(destp);
944                         srcp += curlen;
945                         copy_len -= curlen;
946                         fixup_copy_count += curlen;
947                         pagelist_len -= curlen;
948                         if (!pagelist_len)
949                                 break;
950                         page_base = 0;
951                 }
952
953                 /* Implicit padding for the last segment in a Write
954                  * chunk is inserted inline at the front of the tail
955                  * iovec. The upper layer ignores the content of
956                  * the pad. Simply ensure inline content in the tail
957                  * that follows the Write chunk is properly aligned.
958                  */
959                 if (pad)
960                         srcp -= pad;
961         }
962
963         /* The tail iovec is redirected to the remaining data
964          * in the receive buffer, to avoid a memcopy.
965          */
966         if (copy_len || pad) {
967                 rqst->rq_rcv_buf.tail[0].iov_base = srcp;
968                 rqst->rq_private_buf.tail[0].iov_base = srcp;
969         }
970
971         return fixup_copy_count;
972 }
973
974 /* By convention, backchannel calls arrive via rdma_msg type
975  * messages, and never populate the chunk lists. This makes
976  * the RPC/RDMA header small and fixed in size, so it is
977  * straightforward to check the RPC header's direction field.
978  */
979 static bool
980 rpcrdma_is_bcall(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
981 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
982 {
983         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
984         struct xdr_stream *xdr = &rep->rr_stream;
985         __be32 *p;
986
987         if (rep->rr_proc != rdma_msg)
988                 return false;
989
990         /* Peek at stream contents without advancing. */
991         p = xdr_inline_decode(xdr, 0);
992
993         /* Chunk lists */
994         if (*p++ != xdr_zero)
995                 return false;
996         if (*p++ != xdr_zero)
997                 return false;
998         if (*p++ != xdr_zero)
999                 return false;
1000
1001         /* RPC header */
1002         if (*p++ != rep->rr_xid)
1003                 return false;
1004         if (*p != cpu_to_be32(RPC_CALL))
1005                 return false;
1006
1007         /* No bc service. */
1008         if (xprt->bc_serv == NULL)
1009                 return false;
1010
1011         /* Now that we are sure this is a backchannel call,
1012          * advance to the RPC header.
1013          */
1014         p = xdr_inline_decode(xdr, 3 * sizeof(*p));
1015         if (unlikely(!p))
1016                 goto out_short;
1017
1018         rpcrdma_bc_receive_call(r_xprt, rep);
1019         return true;
1020
1021 out_short:
1022         pr_warn("RPC/RDMA short backward direction call\n");
1023         return true;
1024 }
1025 #else   /* CONFIG_SUNRPC_BACKCHANNEL */
1026 {
1027         return false;
1028 }
1029 #endif  /* CONFIG_SUNRPC_BACKCHANNEL */
1030
1031 static int decode_rdma_segment(struct xdr_stream *xdr, u32 *length)
1032 {
1033         u32 handle;
1034         u64 offset;
1035         __be32 *p;
1036
1037         p = xdr_inline_decode(xdr, 4 * sizeof(*p));
1038         if (unlikely(!p))
1039                 return -EIO;
1040
1041         handle = be32_to_cpup(p++);
1042         *length = be32_to_cpup(p++);
1043         xdr_decode_hyper(p, &offset);
1044
1045         trace_xprtrdma_decode_seg(handle, *length, offset);
1046         return 0;
1047 }
1048
1049 static int decode_write_chunk(struct xdr_stream *xdr, u32 *length)
1050 {
1051         u32 segcount, seglength;
1052         __be32 *p;
1053
1054         p = xdr_inline_decode(xdr, sizeof(*p));
1055         if (unlikely(!p))
1056                 return -EIO;
1057
1058         *length = 0;
1059         segcount = be32_to_cpup(p);
1060         while (segcount--) {
1061                 if (decode_rdma_segment(xdr, &seglength))
1062                         return -EIO;
1063                 *length += seglength;
1064         }
1065
1066         return 0;
1067 }
1068
1069 /* In RPC-over-RDMA Version One replies, a Read list is never
1070  * expected. This decoder is a stub that returns an error if
1071  * a Read list is present.
1072  */
1073 static int decode_read_list(struct xdr_stream *xdr)
1074 {
1075         __be32 *p;
1076
1077         p = xdr_inline_decode(xdr, sizeof(*p));
1078         if (unlikely(!p))
1079                 return -EIO;
1080         if (unlikely(*p != xdr_zero))
1081                 return -EIO;
1082         return 0;
1083 }
1084
1085 /* Supports only one Write chunk in the Write list
1086  */
1087 static int decode_write_list(struct xdr_stream *xdr, u32 *length)
1088 {
1089         u32 chunklen;
1090         bool first;
1091         __be32 *p;
1092
1093         *length = 0;
1094         first = true;
1095         do {
1096                 p = xdr_inline_decode(xdr, sizeof(*p));
1097                 if (unlikely(!p))
1098                         return -EIO;
1099                 if (*p == xdr_zero)
1100                         break;
1101                 if (!first)
1102                         return -EIO;
1103
1104                 if (decode_write_chunk(xdr, &chunklen))
1105                         return -EIO;
1106                 *length += chunklen;
1107                 first = false;
1108         } while (true);
1109         return 0;
1110 }
1111
1112 static int decode_reply_chunk(struct xdr_stream *xdr, u32 *length)
1113 {
1114         __be32 *p;
1115
1116         p = xdr_inline_decode(xdr, sizeof(*p));
1117         if (unlikely(!p))
1118                 return -EIO;
1119
1120         *length = 0;
1121         if (*p != xdr_zero)
1122                 if (decode_write_chunk(xdr, length))
1123                         return -EIO;
1124         return 0;
1125 }
1126
1127 static int
1128 rpcrdma_decode_msg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1129                    struct rpc_rqst *rqst)
1130 {
1131         struct xdr_stream *xdr = &rep->rr_stream;
1132         u32 writelist, replychunk, rpclen;
1133         char *base;
1134
1135         /* Decode the chunk lists */
1136         if (decode_read_list(xdr))
1137                 return -EIO;
1138         if (decode_write_list(xdr, &writelist))
1139                 return -EIO;
1140         if (decode_reply_chunk(xdr, &replychunk))
1141                 return -EIO;
1142
1143         /* RDMA_MSG sanity checks */
1144         if (unlikely(replychunk))
1145                 return -EIO;
1146
1147         /* Build the RPC reply's Payload stream in rqst->rq_rcv_buf */
1148         base = (char *)xdr_inline_decode(xdr, 0);
1149         rpclen = xdr_stream_remaining(xdr);
1150         r_xprt->rx_stats.fixup_copy_count +=
1151                 rpcrdma_inline_fixup(rqst, base, rpclen, writelist & 3);
1152
1153         r_xprt->rx_stats.total_rdma_reply += writelist;
1154         return rpclen + xdr_align_size(writelist);
1155 }
1156
1157 static noinline int
1158 rpcrdma_decode_nomsg(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep)
1159 {
1160         struct xdr_stream *xdr = &rep->rr_stream;
1161         u32 writelist, replychunk;
1162
1163         /* Decode the chunk lists */
1164         if (decode_read_list(xdr))
1165                 return -EIO;
1166         if (decode_write_list(xdr, &writelist))
1167                 return -EIO;
1168         if (decode_reply_chunk(xdr, &replychunk))
1169                 return -EIO;
1170
1171         /* RDMA_NOMSG sanity checks */
1172         if (unlikely(writelist))
1173                 return -EIO;
1174         if (unlikely(!replychunk))
1175                 return -EIO;
1176
1177         /* Reply chunk buffer already is the reply vector */
1178         r_xprt->rx_stats.total_rdma_reply += replychunk;
1179         return replychunk;
1180 }
1181
1182 static noinline int
1183 rpcrdma_decode_error(struct rpcrdma_xprt *r_xprt, struct rpcrdma_rep *rep,
1184                      struct rpc_rqst *rqst)
1185 {
1186         struct xdr_stream *xdr = &rep->rr_stream;
1187         __be32 *p;
1188
1189         p = xdr_inline_decode(xdr, sizeof(*p));
1190         if (unlikely(!p))
1191                 return -EIO;
1192
1193         switch (*p) {
1194         case err_vers:
1195                 p = xdr_inline_decode(xdr, 2 * sizeof(*p));
1196                 if (!p)
1197                         break;
1198                 dprintk("RPC: %5u: %s: server reports version error (%u-%u)\n",
1199                         rqst->rq_task->tk_pid, __func__,
1200                         be32_to_cpup(p), be32_to_cpu(*(p + 1)));
1201                 break;
1202         case err_chunk:
1203                 dprintk("RPC: %5u: %s: server reports header decoding error\n",
1204                         rqst->rq_task->tk_pid, __func__);
1205                 break;
1206         default:
1207                 dprintk("RPC: %5u: %s: server reports unrecognized error %d\n",
1208                         rqst->rq_task->tk_pid, __func__, be32_to_cpup(p));
1209         }
1210
1211         r_xprt->rx_stats.bad_reply_count++;
1212         return -EREMOTEIO;
1213 }
1214
1215 /* Perform XID lookup, reconstruction of the RPC reply, and
1216  * RPC completion while holding the transport lock to ensure
1217  * the rep, rqst, and rq_task pointers remain stable.
1218  */
1219 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
1220 {
1221         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1222         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1223         struct rpc_rqst *rqst = rep->rr_rqst;
1224         unsigned long cwnd;
1225         int status;
1226
1227         xprt->reestablish_timeout = 0;
1228
1229         switch (rep->rr_proc) {
1230         case rdma_msg:
1231                 status = rpcrdma_decode_msg(r_xprt, rep, rqst);
1232                 break;
1233         case rdma_nomsg:
1234                 status = rpcrdma_decode_nomsg(r_xprt, rep);
1235                 break;
1236         case rdma_error:
1237                 status = rpcrdma_decode_error(r_xprt, rep, rqst);
1238                 break;
1239         default:
1240                 status = -EIO;
1241         }
1242         if (status < 0)
1243                 goto out_badheader;
1244
1245 out:
1246         spin_lock(&xprt->recv_lock);
1247         cwnd = xprt->cwnd;
1248         xprt->cwnd = r_xprt->rx_buf.rb_credits << RPC_CWNDSHIFT;
1249         if (xprt->cwnd > cwnd)
1250                 xprt_release_rqst_cong(rqst->rq_task);
1251
1252         xprt_complete_rqst(rqst->rq_task, status);
1253         xprt_unpin_rqst(rqst);
1254         spin_unlock(&xprt->recv_lock);
1255         return;
1256
1257 /* If the incoming reply terminated a pending RPC, the next
1258  * RPC call will post a replacement receive buffer as it is
1259  * being marshaled.
1260  */
1261 out_badheader:
1262         trace_xprtrdma_reply_hdr(rep);
1263         r_xprt->rx_stats.bad_reply_count++;
1264         status = -EIO;
1265         goto out;
1266 }
1267
1268 void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
1269 {
1270         /* Invalidate and unmap the data payloads before waking
1271          * the waiting application. This guarantees the memory
1272          * regions are properly fenced from the server before the
1273          * application accesses the data. It also ensures proper
1274          * send flow control: waking the next RPC waits until this
1275          * RPC has relinquished all its Send Queue entries.
1276          */
1277         if (!list_empty(&req->rl_registered))
1278                 r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
1279                                                     &req->rl_registered);
1280
1281         /* Ensure that any DMA mapped pages associated with
1282          * the Send of the RPC Call have been unmapped before
1283          * allowing the RPC to complete. This protects argument
1284          * memory not controlled by the RPC client from being
1285          * re-used before we're done with it.
1286          */
1287         if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
1288                 r_xprt->rx_stats.reply_waits_for_send++;
1289                 out_of_line_wait_on_bit(&req->rl_flags,
1290                                         RPCRDMA_REQ_F_TX_RESOURCES,
1291                                         bit_wait,
1292                                         TASK_UNINTERRUPTIBLE);
1293         }
1294 }
1295
1296 /* Reply handling runs in the poll worker thread. Anything that
1297  * might wait is deferred to a separate workqueue.
1298  */
1299 void rpcrdma_deferred_completion(struct work_struct *work)
1300 {
1301         struct rpcrdma_rep *rep =
1302                         container_of(work, struct rpcrdma_rep, rr_work);
1303         struct rpcrdma_req *req = rpcr_to_rdmar(rep->rr_rqst);
1304         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1305
1306         trace_xprtrdma_defer_cmp(rep);
1307         if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
1308                 r_xprt->rx_ia.ri_ops->ro_reminv(rep, &req->rl_registered);
1309         rpcrdma_release_rqst(r_xprt, req);
1310         rpcrdma_complete_rqst(rep);
1311 }
1312
1313 /* Process received RPC/RDMA messages.
1314  *
1315  * Errors must result in the RPC task either being awakened, or
1316  * allowed to timeout, to discover the errors at that time.
1317  */
1318 void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
1319 {
1320         struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
1321         struct rpc_xprt *xprt = &r_xprt->rx_xprt;
1322         struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
1323         struct rpcrdma_req *req;
1324         struct rpc_rqst *rqst;
1325         u32 credits;
1326         __be32 *p;
1327
1328         --buf->rb_posted_receives;
1329
1330         if (rep->rr_hdrbuf.head[0].iov_len == 0)
1331                 goto out_badstatus;
1332
1333         /* Fixed transport header fields */
1334         xdr_init_decode(&rep->rr_stream, &rep->rr_hdrbuf,
1335                         rep->rr_hdrbuf.head[0].iov_base);
1336         p = xdr_inline_decode(&rep->rr_stream, 4 * sizeof(*p));
1337         if (unlikely(!p))
1338                 goto out_shortreply;
1339         rep->rr_xid = *p++;
1340         rep->rr_vers = *p++;
1341         credits = be32_to_cpu(*p++);
1342         rep->rr_proc = *p++;
1343
1344         if (rep->rr_vers != rpcrdma_version)
1345                 goto out_badversion;
1346
1347         if (rpcrdma_is_bcall(r_xprt, rep))
1348                 return;
1349
1350         /* Match incoming rpcrdma_rep to an rpcrdma_req to
1351          * get context for handling any incoming chunks.
1352          */
1353         spin_lock(&xprt->recv_lock);
1354         rqst = xprt_lookup_rqst(xprt, rep->rr_xid);
1355         if (!rqst)
1356                 goto out_norqst;
1357         xprt_pin_rqst(rqst);
1358
1359         if (credits == 0)
1360                 credits = 1;    /* don't deadlock */
1361         else if (credits > buf->rb_max_requests)
1362                 credits = buf->rb_max_requests;
1363         buf->rb_credits = credits;
1364
1365         spin_unlock(&xprt->recv_lock);
1366
1367         req = rpcr_to_rdmar(rqst);
1368         if (req->rl_reply) {
1369                 trace_xprtrdma_leaked_rep(rqst, req->rl_reply);
1370                 rpcrdma_recv_buffer_put(req->rl_reply);
1371         }
1372         req->rl_reply = rep;
1373         rep->rr_rqst = rqst;
1374         clear_bit(RPCRDMA_REQ_F_PENDING, &req->rl_flags);
1375
1376         trace_xprtrdma_reply(rqst->rq_task, rep, req, credits);
1377
1378         rpcrdma_post_recvs(r_xprt, false);
1379         queue_work(rpcrdma_receive_wq, &rep->rr_work);
1380         return;
1381
1382 out_badversion:
1383         trace_xprtrdma_reply_vers(rep);
1384         goto repost;
1385
1386 /* The RPC transaction has already been terminated, or the header
1387  * is corrupt.
1388  */
1389 out_norqst:
1390         spin_unlock(&xprt->recv_lock);
1391         trace_xprtrdma_reply_rqst(rep);
1392         goto repost;
1393
1394 out_shortreply:
1395         trace_xprtrdma_reply_short(rep);
1396
1397 /* If no pending RPC transaction was matched, post a replacement
1398  * receive buffer before returning.
1399  */
1400 repost:
1401         rpcrdma_post_recvs(r_xprt, false);
1402 out_badstatus:
1403         rpcrdma_recv_buffer_put(rep);
1404 }