net/sunrpc/xprtrdma/backchannel.c
/*
 * Copyright (c) 2015 Oracle.  All rights reserved.
 *
 * Support for backward direction RPCs on RPC/RDMA.
 */

#include <linux/module.h>
#include <linux/sunrpc/xprt.h>
#include <linux/sunrpc/svc.h>
#include <linux/sunrpc/svc_xprt.h>

#include "xprt_rdma.h"

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY        RPCDBG_TRANS
#endif

#undef RPCRDMA_BACKCHANNEL_DEBUG

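/* Release a backchannel rpc_rqst: unhook its rpcrdma_req from the
 * transport buffer's request list, destroy the req, then free the
 * rqst itself.
 */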
static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);

        spin_lock(&buf->rb_reqslock);
        list_del(&req->rl_all);
        spin_unlock(&buf->rb_reqslock);

        rpcrdma_destroy_req(req);

        kfree(rqst);
}

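/* Finish provisioning a backchannel rpc_rqst: allocate its rpcrdma_req
 * along with regbufs for the RPC/RDMA transport header and the backward
 * direction reply payload.
 */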
static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
                                 struct rpc_rqst *rqst)
{
        struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
        size_t size;

        req = rpcrdma_create_req(r_xprt);
        if (IS_ERR(req))
                return PTR_ERR(req);
        req->rl_backchannel = true;

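        /* rl_rdmabuf carries the RPC-over-RDMA transport header for
         * the backward direction reply.
         */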
        rb = rpcrdma_alloc_regbuf(RPCRDMA_HDRBUF_SIZE,
                                  DMA_TO_DEVICE, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_rdmabuf = rb;

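        /* The reply send buffer is sized to the transport's inline
         * threshold; backward direction replies are sent entirely
         * inline, without chunks.
         */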
        size = r_xprt->rx_data.inline_rsize;
        rb = rpcrdma_alloc_regbuf(size, DMA_TO_DEVICE, GFP_KERNEL);
        if (IS_ERR(rb))
                goto out_fail;
        req->rl_sendbuf = rb;
        xdr_buf_init(&rqst->rq_snd_buf, rb->rg_base, size);
        rpcrdma_set_xprtdata(rqst, req);
        return 0;

out_fail:
        rpcrdma_bc_free_rqst(r_xprt, rqst);
        return -ENOMEM;
}

/* Allocate and add receive buffers to the rpcrdma_buffer's
 * existing list of reps. These are released when the
 * transport is destroyed.
 */
static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
                                 unsigned int count)
{
        int rc = 0;

        while (count--) {
                rc = rpcrdma_create_rep(r_xprt);
                if (rc)
                        break;
        }
        return rc;
}

/**
 * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of concurrent incoming requests to expect
 *
 * Returns 0 on success; otherwise a negative errno
 */
int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpc_rqst *rqst;
        unsigned int i;
        int rc;

        /* The backchannel reply path returns each rpc_rqst to the
         * bc_pa_list _after_ the reply is sent. If the server is
         * faster than the client, it can send another backward
         * direction request before the rpc_rqst is returned to the
         * list. The client rejects the request in this case.
         *
         * Twice as many rpc_rqsts are prepared to ensure there is
         * always an rpc_rqst available as soon as a reply is sent.
         */
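        /* Twice @reqs rpc_rqsts are provisioned below, so cap @reqs
         * at half of the RPCRDMA_BACKWARD_WRS budget.
         */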
        if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
                goto out_err;

        for (i = 0; i < (reqs << 1); i++) {
                rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
                if (!rqst) {
                        pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
                               __func__);
                        goto out_free;
                }
                dprintk("RPC:       %s: new rqst %p\n", __func__, rqst);

                rqst->rq_xprt = &r_xprt->rx_xprt;
                INIT_LIST_HEAD(&rqst->rq_list);
                INIT_LIST_HEAD(&rqst->rq_bc_list);

                if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
                        goto out_free;

                spin_lock_bh(&xprt->bc_pa_lock);
                list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);
        }

        rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
        if (rc)
                goto out_free;

        rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
        if (rc)
                goto out_free;

        buffer->rb_bc_srv_max_requests = reqs;
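        /* svcrdma provides the "rdma-bc" transport class requested
         * by xprt_rdma_bc_up().
         */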
        request_module("svcrdma");

        return 0;

out_free:
        xprt_rdma_bc_destroy(xprt, reqs);

out_err:
        pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
        return -ENOMEM;
}

/**
 * xprt_rdma_bc_up - Create transport endpoint for backchannel service
 * @serv: server endpoint
 * @net: network namespace
 *
 * The "xprt" is an implied argument: it supplies the name of the
 * backchannel transport class.
 *
 * Returns zero on success, negative errno on failure
 */
int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
{
        int ret;

        ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
        if (ret < 0)
                return ret;
        return 0;
}

/**
 * xprt_rdma_bc_maxpayload - Return maximum backchannel message size
 * @xprt: transport
 *
 * Returns maximum size, in bytes, of a backchannel message
 */
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
        size_t maxmsg;

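        /* Backchannel calls and replies travel inline, so the payload
         * is bounded by the smaller of the two inline thresholds, less
         * the RPC/RDMA transport header.
         */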
        maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize);
        return maxmsg - RPCRDMA_HDRLEN_MIN;
}

/**
 * rpcrdma_bc_marshal_reply - Marshal a backward direction RPC reply
 * @rqst: buffer containing RPC reply data
 *
 * Returns zero on success.
 */
int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
        struct rpcrdma_msg *headerp;

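        /* Construct an RDMA_MSG transport header with empty read,
         * write, and reply chunk lists: the reply body goes entirely
         * inline.
         */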
        headerp = rdmab_to_msg(req->rl_rdmabuf);
        headerp->rm_xid = rqst->rq_xid;
        headerp->rm_vers = rpcrdma_version;
        headerp->rm_credit =
                        cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
        headerp->rm_type = rdma_msg;
        headerp->rm_body.rm_chunks[0] = xdr_zero;
        headerp->rm_body.rm_chunks[1] = xdr_zero;
        headerp->rm_body.rm_chunks[2] = xdr_zero;

        if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, RPCRDMA_HDRLEN_MIN,
                                       &rqst->rq_snd_buf, rpcrdma_noch))
                return -EIO;
        return 0;
}

/**
 * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
 * @xprt: transport associated with these backchannel resources
 * @reqs: number of incoming requests to destroy; ignored
 */
void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
{
        struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
        struct rpc_rqst *rqst, *tmp;

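        /* Remove each rqst from bc_pa_list while holding the lock,
         * but release the lock before freeing the rqst and its buffers.
         */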
        spin_lock_bh(&xprt->bc_pa_lock);
        list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
                list_del(&rqst->rq_bc_pa_list);
                spin_unlock_bh(&xprt->bc_pa_lock);

                rpcrdma_bc_free_rqst(r_xprt, rqst);

                spin_lock_bh(&xprt->bc_pa_lock);
        }
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * xprt_rdma_bc_free_rqst - Release a backchannel rqst
 * @rqst: request to release
 */
void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
{
        struct rpc_xprt *xprt = rqst->rq_xprt;

        dprintk("RPC:       %s: freeing rqst %p (req %p)\n",
                __func__, rqst, rpcr_to_rdmar(rqst));

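        /* Clear the in-use flag before returning the rqst to
         * bc_pa_list; the paired barriers order the flag update with
         * the list insertion.
         */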
        smp_mb__before_atomic();
        WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
        clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
        smp_mb__after_atomic();

        spin_lock_bh(&xprt->bc_pa_lock);
        list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
        spin_unlock_bh(&xprt->bc_pa_lock);
}

/**
 * rpcrdma_bc_receive_call - Handle a backward direction call
 * @r_xprt: transport receiving the call
 * @rep: receive buffer containing the call
 *
 * Called in the RPC reply handler, which runs in a tasklet.
 * Be quick about it.
 *
 * Operational assumptions:
 *    o Backchannel credits are ignored, just as the NFS server
 *      forechannel currently does
 *    o The ULP manages a replay cache (e.g., NFSv4.1 sessions).
 *      No replay detection is done at the transport level
 */
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
                             struct rpcrdma_rep *rep)
{
        struct rpc_xprt *xprt = &r_xprt->rx_xprt;
        struct rpcrdma_msg *headerp;
        struct svc_serv *bc_serv;
        struct rpcrdma_req *req;
        struct rpc_rqst *rqst;
        struct xdr_buf *buf;
        size_t size;
        __be32 *p;

        headerp = rdmab_to_msg(rep->rr_rdmabuf);
#ifdef RPCRDMA_BACKCHANNEL_DEBUG
        pr_info("RPC:       %s: callback XID %08x, length=%u\n",
                __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
        pr_info("RPC:       %s: %*ph\n", __func__, rep->rr_len, headerp);
#endif

        /* Sanity check:
         * Need at least enough bytes for RPC/RDMA header, as code
         * here references the header fields by array offset. Also,
         * backward calls are always inline, so ensure there
         * are some bytes beyond the RPC/RDMA header.
         */
        if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
                goto out_short;
        p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
        size = rep->rr_len - RPCRDMA_HDRLEN_MIN;

        /* Grab a free bc rqst */
        spin_lock(&xprt->bc_pa_lock);
        if (list_empty(&xprt->bc_pa_list)) {
                spin_unlock(&xprt->bc_pa_lock);
                goto out_overflow;
        }
        rqst = list_first_entry(&xprt->bc_pa_list,
                                struct rpc_rqst, rq_bc_pa_list);
        list_del(&rqst->rq_bc_pa_list);
        spin_unlock(&xprt->bc_pa_lock);
        dprintk("RPC:       %s: using rqst %p\n", __func__, rqst);

        /* Prepare rqst */
        rqst->rq_reply_bytes_recvd = 0;
        rqst->rq_bytes_sent = 0;
        rqst->rq_xid = headerp->rm_xid;

        rqst->rq_private_buf.len = size;
        set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);

        buf = &rqst->rq_rcv_buf;
        memset(buf, 0, sizeof(*buf));
        buf->head[0].iov_base = p;
        buf->head[0].iov_len = size;
        buf->len = size;

        /* The receive buffer has to be hooked to the rpcrdma_req
         * so that it can be reposted after the server is done
         * parsing it but just before sending the backward
         * direction reply.
         */
        req = rpcr_to_rdmar(rqst);
        dprintk("RPC:       %s: attaching rep %p to req %p\n",
                __func__, rep, req);
        req->rl_reply = rep;

        /* Defeat the retransmit detection logic in send_request */
        req->rl_connect_cookie = 0;

        /* Queue rqst for ULP's callback service */
        bc_serv = xprt->bc_serv;
        spin_lock(&bc_serv->sv_cb_lock);
        list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
        spin_unlock(&bc_serv->sv_cb_lock);

        wake_up(&bc_serv->sv_cb_waitq);

        r_xprt->rx_stats.bcall_count++;
        return;

out_overflow:
        pr_warn("RPC/RDMA backchannel overflow\n");
        xprt_disconnect_done(xprt);
        /* This receive buffer gets reposted automatically
         * when the connection is re-established.
         */
        return;

out_short:
        pr_warn("RPC/RDMA short backward direction call\n");

        if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, rep))
                xprt_disconnect_done(xprt);
        else
                pr_warn("RPC:       %s: reposting rep %p\n",
                        __func__, rep);
}