GNU Linux-libre 4.14.290-gnu1
[releases.git] / net / rxrpc / recvmsg.c
1 /* RxRPC recvmsg() implementation
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
13
14 #include <linux/net.h>
15 #include <linux/skbuff.h>
16 #include <linux/export.h>
17 #include <linux/sched/signal.h>
18
19 #include <net/sock.h>
20 #include <net/af_rxrpc.h>
21 #include "ar-internal.h"
22
23 /*
24  * Post a call for attention by the socket or kernel service.  Further
25  * notifications are suppressed by putting recvmsg_link on a dummy queue.
26  */
27 void rxrpc_notify_socket(struct rxrpc_call *call)
28 {
29         struct rxrpc_sock *rx;
30         struct sock *sk;
31
32         _enter("%d", call->debug_id);
33
34         if (!list_empty(&call->recvmsg_link))
35                 return;
36
37         rcu_read_lock();
38
39         rx = rcu_dereference(call->socket);
40         sk = &rx->sk;
41         if (rx && sk->sk_state < RXRPC_CLOSE) {
42                 if (call->notify_rx) {
43                         call->notify_rx(sk, call, call->user_call_ID);
44                 } else {
45                         write_lock_bh(&rx->recvmsg_lock);
46                         if (list_empty(&call->recvmsg_link)) {
47                                 rxrpc_get_call(call, rxrpc_call_got);
48                                 list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
49                         }
50                         write_unlock_bh(&rx->recvmsg_lock);
51
52                         if (!sock_flag(sk, SOCK_DEAD)) {
53                                 _debug("call %ps", sk->sk_data_ready);
54                                 sk->sk_data_ready(sk);
55                         }
56                 }
57         }
58
59         rcu_read_unlock();
60         _leave("");
61 }
62
63 /*
64  * Pass a call terminating message to userspace.
65  */
66 static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
67 {
68         u32 tmp = 0;
69         int ret;
70
71         switch (call->completion) {
72         case RXRPC_CALL_SUCCEEDED:
73                 ret = 0;
74                 if (rxrpc_is_service_call(call))
75                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
76                 break;
77         case RXRPC_CALL_REMOTELY_ABORTED:
78                 tmp = call->abort_code;
79                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
80                 break;
81         case RXRPC_CALL_LOCALLY_ABORTED:
82                 tmp = call->abort_code;
83                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
84                 break;
85         case RXRPC_CALL_NETWORK_ERROR:
86                 tmp = -call->error;
87                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
88                 break;
89         case RXRPC_CALL_LOCAL_ERROR:
90                 tmp = -call->error;
91                 ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
92                 break;
93         default:
94                 pr_err("Invalid terminal call state %u\n", call->state);
95                 BUG();
96                 break;
97         }
98
99         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
100                             call->rx_pkt_offset, call->rx_pkt_len, ret);
101         return ret;
102 }
103
104 /*
105  * Pass back notification of a new call.  The call is added to the
106  * to-be-accepted list.  This means that the next call to be accepted might not
107  * be the last call seen awaiting acceptance, but unless we leave this on the
108  * front of the queue and block all other messages until someone gives us a
109  * user_ID for it, there's not a lot we can do.
110  */
111 static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
112                                   struct rxrpc_call *call,
113                                   struct msghdr *msg, int flags)
114 {
115         int tmp = 0, ret;
116
117         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
118
119         if (ret == 0 && !(flags & MSG_PEEK)) {
120                 _debug("to be accepted");
121                 write_lock_bh(&rx->recvmsg_lock);
122                 list_del_init(&call->recvmsg_link);
123                 write_unlock_bh(&rx->recvmsg_lock);
124
125                 rxrpc_get_call(call, rxrpc_call_got);
126                 write_lock(&rx->call_lock);
127                 list_add_tail(&call->accept_link, &rx->to_be_accepted);
128                 write_unlock(&rx->call_lock);
129         }
130
131         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
132         return ret;
133 }
134
135 /*
136  * End the packet reception phase.
137  */
138 static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
139 {
140         _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
141
142         trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
143         ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
144
145         if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
146                 rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
147                                   rxrpc_propose_ack_terminal_ack);
148                 rxrpc_send_ack_packet(call, false);
149         }
150
151         write_lock_bh(&call->state_lock);
152
153         switch (call->state) {
154         case RXRPC_CALL_CLIENT_RECV_REPLY:
155                 __rxrpc_call_completed(call);
156                 write_unlock_bh(&call->state_lock);
157                 break;
158
159         case RXRPC_CALL_SERVER_RECV_REQUEST:
160                 call->tx_phase = true;
161                 call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
162                 call->ack_at = call->expire_at;
163                 write_unlock_bh(&call->state_lock);
164                 rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
165                                   rxrpc_propose_ack_processing_op);
166                 break;
167         default:
168                 write_unlock_bh(&call->state_lock);
169                 break;
170         }
171 }
172
173 /*
174  * Discard a packet we've used up and advance the Rx window by one.
175  */
176 static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
177 {
178         struct rxrpc_skb_priv *sp;
179         struct sk_buff *skb;
180         rxrpc_serial_t serial;
181         rxrpc_seq_t hard_ack, top;
182         u8 flags;
183         int ix;
184
185         _enter("%d", call->debug_id);
186
187         hard_ack = call->rx_hard_ack;
188         top = smp_load_acquire(&call->rx_top);
189         ASSERT(before(hard_ack, top));
190
191         hard_ack++;
192         ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
193         skb = call->rxtx_buffer[ix];
194         rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
195         sp = rxrpc_skb(skb);
196         flags = sp->hdr.flags;
197         serial = sp->hdr.serial;
198         if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
199                 serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
200
201         call->rxtx_buffer[ix] = NULL;
202         call->rxtx_annotations[ix] = 0;
203         /* Barrier against rxrpc_input_data(). */
204         smp_store_release(&call->rx_hard_ack, hard_ack);
205
206         rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
207
208         _debug("%u,%u,%02x", hard_ack, top, flags);
209         trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
210         if (flags & RXRPC_LAST_PACKET) {
211                 rxrpc_end_rx_phase(call, serial);
212         } else {
213                 /* Check to see if there's an ACK that needs sending. */
214                 if (after_eq(hard_ack, call->ackr_consumed + 2) ||
215                     after_eq(top, call->ackr_seen + 2) ||
216                     (hard_ack == top && after(hard_ack, call->ackr_consumed)))
217                         rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
218                                           true, false,
219                                           rxrpc_propose_ack_rotate_rx);
220                 if (call->ackr_reason)
221                         rxrpc_send_ack_packet(call, false);
222         }
223 }
224
225 /*
226  * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
227  * padding, but if this is the case, the packet length will be resident in the
228  * socket buffer.  Note that we can't modify the master skb info as the skb may
229  * be the home to multiple subpackets.
230  */
231 static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
232                                u8 annotation,
233                                unsigned int offset, unsigned int len)
234 {
235         struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
236         rxrpc_seq_t seq = sp->hdr.seq;
237         u16 cksum = sp->hdr.cksum;
238
239         _enter("");
240
241         /* For all but the head jumbo subpacket, the security checksum is in a
242          * jumbo header immediately prior to the data.
243          */
244         if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
245                 __be16 tmp;
246                 if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
247                         BUG();
248                 cksum = ntohs(tmp);
249                 seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
250         }
251
252         return call->conn->security->verify_packet(call, skb, offset, len,
253                                                    seq, cksum);
254 }
255
256 /*
257  * Locate the data within a packet.  This is complicated by:
258  *
259  * (1) An skb may contain a jumbo packet - so we have to find the appropriate
260  *     subpacket.
261  *
262  * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
263  *     contains an extra header which includes the true length of the data,
264  *     excluding any encrypted padding.
265  */
266 static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
267                              u8 *_annotation,
268                              unsigned int *_offset, unsigned int *_len)
269 {
270         unsigned int offset = sizeof(struct rxrpc_wire_header);
271         unsigned int len = *_len;
272         int ret;
273         u8 annotation = *_annotation;
274
275         /* Locate the subpacket */
276         len = skb->len - offset;
277         if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
278                 offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
279                            RXRPC_JUMBO_SUBPKTLEN);
280                 len = (annotation & RXRPC_RX_ANNO_JLAST) ?
281                         skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
282         }
283
284         if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
285                 ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
286                 if (ret < 0)
287                         return ret;
288                 *_annotation |= RXRPC_RX_ANNO_VERIFIED;
289         }
290
291         *_offset = offset;
292         *_len = len;
293         call->conn->security->locate_data(call, skb, _offset, _len);
294         return 0;
295 }
296
297 /*
298  * Deliver messages to a call.  This keeps processing packets until the buffer
299  * is filled and we find either more DATA (returns 0) or the end of the DATA
300  * (returns 1).  If more packets are required, it returns -EAGAIN.
301  */
302 static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
303                               struct msghdr *msg, struct iov_iter *iter,
304                               size_t len, int flags, size_t *_offset)
305 {
306         struct rxrpc_skb_priv *sp;
307         struct sk_buff *skb;
308         rxrpc_seq_t hard_ack, top, seq;
309         size_t remain;
310         bool last;
311         unsigned int rx_pkt_offset, rx_pkt_len;
312         int ix, copy, ret = -EAGAIN, ret2;
313
314         rx_pkt_offset = call->rx_pkt_offset;
315         rx_pkt_len = call->rx_pkt_len;
316
317         if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
318                 seq = call->rx_hard_ack;
319                 ret = 1;
320                 goto done;
321         }
322
323         /* Barriers against rxrpc_input_data(). */
324         hard_ack = call->rx_hard_ack;
325         seq = hard_ack + 1;
326         while (top = smp_load_acquire(&call->rx_top),
327                before_eq(seq, top)
328                ) {
329                 ix = seq & RXRPC_RXTX_BUFF_MASK;
330                 skb = call->rxtx_buffer[ix];
331                 if (!skb) {
332                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
333                                             rx_pkt_offset, rx_pkt_len, 0);
334                         break;
335                 }
336                 smp_rmb();
337                 rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
338                 sp = rxrpc_skb(skb);
339
340                 if (!(flags & MSG_PEEK))
341                         trace_rxrpc_receive(call, rxrpc_receive_front,
342                                             sp->hdr.serial, seq);
343
344                 if (msg)
345                         sock_recv_timestamp(msg, sock->sk, skb);
346
347                 if (rx_pkt_offset == 0) {
348                         ret2 = rxrpc_locate_data(call, skb,
349                                                  &call->rxtx_annotations[ix],
350                                                  &rx_pkt_offset, &rx_pkt_len);
351                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
352                                             rx_pkt_offset, rx_pkt_len, ret2);
353                         if (ret2 < 0) {
354                                 ret = ret2;
355                                 goto out;
356                         }
357                 } else {
358                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
359                                             rx_pkt_offset, rx_pkt_len, 0);
360                 }
361
362                 /* We have to handle short, empty and used-up DATA packets. */
363                 remain = len - *_offset;
364                 copy = rx_pkt_len;
365                 if (copy > remain)
366                         copy = remain;
367                 if (copy > 0) {
368                         ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
369                                                       copy);
370                         if (ret2 < 0) {
371                                 ret = ret2;
372                                 goto out;
373                         }
374
375                         /* handle piecemeal consumption of data packets */
376                         rx_pkt_offset += copy;
377                         rx_pkt_len -= copy;
378                         *_offset += copy;
379                 }
380
381                 if (rx_pkt_len > 0) {
382                         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
383                                             rx_pkt_offset, rx_pkt_len, 0);
384                         ASSERTCMP(*_offset, ==, len);
385                         ret = 0;
386                         break;
387                 }
388
389                 /* The whole packet has been transferred. */
390                 last = sp->hdr.flags & RXRPC_LAST_PACKET;
391                 if (!(flags & MSG_PEEK))
392                         rxrpc_rotate_rx_window(call);
393                 rx_pkt_offset = 0;
394                 rx_pkt_len = 0;
395
396                 if (last) {
397                         ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
398                         ret = 1;
399                         goto out;
400                 }
401
402                 seq++;
403         }
404
405 out:
406         if (!(flags & MSG_PEEK)) {
407                 call->rx_pkt_offset = rx_pkt_offset;
408                 call->rx_pkt_len = rx_pkt_len;
409         }
410 done:
411         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
412                             rx_pkt_offset, rx_pkt_len, ret);
413         return ret;
414 }
415
416 /*
417  * Receive a message from an RxRPC socket
418  * - we need to be careful about two or more threads calling recvmsg
419  *   simultaneously
420  */
421 int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
422                   int flags)
423 {
424         struct rxrpc_call *call;
425         struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
426         struct list_head *l;
427         size_t copied = 0;
428         long timeo;
429         int ret;
430
431         DEFINE_WAIT(wait);
432
433         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
434
435         if (flags & (MSG_OOB | MSG_TRUNC))
436                 return -EOPNOTSUPP;
437
438         timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
439
440 try_again:
441         lock_sock(&rx->sk);
442
443         /* Return immediately if a client socket has no outstanding calls */
444         if (RB_EMPTY_ROOT(&rx->calls) &&
445             list_empty(&rx->recvmsg_q) &&
446             rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
447                 release_sock(&rx->sk);
448                 return -EAGAIN;
449         }
450
451         if (list_empty(&rx->recvmsg_q)) {
452                 ret = -EWOULDBLOCK;
453                 if (timeo == 0) {
454                         call = NULL;
455                         goto error_no_call;
456                 }
457
458                 release_sock(&rx->sk);
459
460                 /* Wait for something to happen */
461                 prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
462                                           TASK_INTERRUPTIBLE);
463                 ret = sock_error(&rx->sk);
464                 if (ret)
465                         goto wait_error;
466
467                 if (list_empty(&rx->recvmsg_q)) {
468                         if (signal_pending(current))
469                                 goto wait_interrupted;
470                         trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
471                                             0, 0, 0, 0);
472                         timeo = schedule_timeout(timeo);
473                 }
474                 finish_wait(sk_sleep(&rx->sk), &wait);
475                 goto try_again;
476         }
477
478         /* Find the next call and dequeue it if we're not just peeking.  If we
479          * do dequeue it, that comes with a ref that we will need to release.
480          */
481         write_lock_bh(&rx->recvmsg_lock);
482         l = rx->recvmsg_q.next;
483         call = list_entry(l, struct rxrpc_call, recvmsg_link);
484         if (!(flags & MSG_PEEK))
485                 list_del_init(&call->recvmsg_link);
486         else
487                 rxrpc_get_call(call, rxrpc_call_got);
488         write_unlock_bh(&rx->recvmsg_lock);
489
490         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
491
492         /* We're going to drop the socket lock, so we need to lock the call
493          * against interference by sendmsg.
494          */
495         if (!mutex_trylock(&call->user_mutex)) {
496                 ret = -EWOULDBLOCK;
497                 if (flags & MSG_DONTWAIT)
498                         goto error_requeue_call;
499                 ret = -ERESTARTSYS;
500                 if (mutex_lock_interruptible(&call->user_mutex) < 0)
501                         goto error_requeue_call;
502         }
503
504         release_sock(&rx->sk);
505
506         if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
507                 BUG();
508
509         if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
510                 if (flags & MSG_CMSG_COMPAT) {
511                         unsigned int id32 = call->user_call_ID;
512
513                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
514                                        sizeof(unsigned int), &id32);
515                 } else {
516                         unsigned long idl = call->user_call_ID;
517
518                         ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
519                                        sizeof(unsigned long), &idl);
520                 }
521                 if (ret < 0)
522                         goto error_unlock_call;
523         }
524
525         if (msg->msg_name && call->peer) {
526                 struct sockaddr_rxrpc *srx = msg->msg_name;
527                 size_t len = sizeof(call->peer->srx);
528
529                 memcpy(msg->msg_name, &call->peer->srx, len);
530                 srx->srx_service = call->service_id;
531                 msg->msg_namelen = len;
532         }
533
534         switch (READ_ONCE(call->state)) {
535         case RXRPC_CALL_SERVER_ACCEPTING:
536                 ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
537                 break;
538         case RXRPC_CALL_CLIENT_RECV_REPLY:
539         case RXRPC_CALL_SERVER_RECV_REQUEST:
540         case RXRPC_CALL_SERVER_ACK_REQUEST:
541                 ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
542                                          flags, &copied);
543                 if (ret == -EAGAIN)
544                         ret = 0;
545
546                 if (after(call->rx_top, call->rx_hard_ack) &&
547                     call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
548                         rxrpc_notify_socket(call);
549                 break;
550         default:
551                 ret = 0;
552                 break;
553         }
554
555         if (ret < 0)
556                 goto error_unlock_call;
557
558         if (call->state == RXRPC_CALL_COMPLETE) {
559                 ret = rxrpc_recvmsg_term(call, msg);
560                 if (ret < 0)
561                         goto error_unlock_call;
562                 if (!(flags & MSG_PEEK))
563                         rxrpc_release_call(rx, call);
564                 msg->msg_flags |= MSG_EOR;
565                 ret = 1;
566         }
567
568         if (ret == 0)
569                 msg->msg_flags |= MSG_MORE;
570         else
571                 msg->msg_flags &= ~MSG_MORE;
572         ret = copied;
573
574 error_unlock_call:
575         mutex_unlock(&call->user_mutex);
576         rxrpc_put_call(call, rxrpc_call_put);
577         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
578         return ret;
579
580 error_requeue_call:
581         if (!(flags & MSG_PEEK)) {
582                 write_lock_bh(&rx->recvmsg_lock);
583                 list_add(&call->recvmsg_link, &rx->recvmsg_q);
584                 write_unlock_bh(&rx->recvmsg_lock);
585                 trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
586         } else {
587                 rxrpc_put_call(call, rxrpc_call_put);
588         }
589 error_no_call:
590         release_sock(&rx->sk);
591 error_trace:
592         trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
593         return ret;
594
595 wait_interrupted:
596         ret = sock_intr_errno(timeo);
597 wait_error:
598         finish_wait(sk_sleep(&rx->sk), &wait);
599         call = NULL;
600         goto error_trace;
601 }
602
603 /**
604  * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
605  * @sock: The socket that the call exists on
606  * @call: The call to send data through
607  * @buf: The buffer to receive into
608  * @size: The size of the buffer, including data already read
609  * @_offset: The running offset into the buffer.
610  * @want_more: True if more data is expected to be read
611  * @_abort: Where the abort code is stored if -ECONNABORTED is returned
612  *
613  * Allow a kernel service to receive data and pick up information about the
614  * state of a call.  Returns 0 if got what was asked for and there's more
615  * available, 1 if we got what was asked for and we're at the end of the data
616  * and -EAGAIN if we need more data.
617  *
618  * Note that we may return -EAGAIN to drain empty packets at the end of the
619  * data, even if we've already copied over the requested data.
620  *
621  * This function adds the amount it transfers to *_offset, so this should be
622  * precleared as appropriate.  Note that the amount remaining in the buffer is
623  * taken to be size - *_offset.
624  *
625  * *_abort should also be initialised to 0.
626  */
627 int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
628                            void *buf, size_t size, size_t *_offset,
629                            bool want_more, u32 *_abort)
630 {
631         struct iov_iter iter;
632         struct kvec iov;
633         int ret;
634
635         _enter("{%d,%s},%zu/%zu,%d",
636                call->debug_id, rxrpc_call_states[call->state],
637                *_offset, size, want_more);
638
639         ASSERTCMP(*_offset, <=, size);
640         ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
641
642         iov.iov_base = buf + *_offset;
643         iov.iov_len = size - *_offset;
644         iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
645
646         mutex_lock(&call->user_mutex);
647
648         switch (READ_ONCE(call->state)) {
649         case RXRPC_CALL_CLIENT_RECV_REPLY:
650         case RXRPC_CALL_SERVER_RECV_REQUEST:
651         case RXRPC_CALL_SERVER_ACK_REQUEST:
652                 ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0,
653                                          _offset);
654                 if (ret < 0)
655                         goto out;
656
657                 /* We can only reach here with a partially full buffer if we
658                  * have reached the end of the data.  We must otherwise have a
659                  * full buffer or have been given -EAGAIN.
660                  */
661                 if (ret == 1) {
662                         if (*_offset < size)
663                                 goto short_data;
664                         if (!want_more)
665                                 goto read_phase_complete;
666                         ret = 0;
667                         goto out;
668                 }
669
670                 if (!want_more)
671                         goto excess_data;
672                 goto out;
673
674         case RXRPC_CALL_COMPLETE:
675                 goto call_complete;
676
677         default:
678                 ret = -EINPROGRESS;
679                 goto out;
680         }
681
682 read_phase_complete:
683         ret = 1;
684 out:
685         mutex_unlock(&call->user_mutex);
686         _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
687         return ret;
688
689 short_data:
690         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
691         ret = -EBADMSG;
692         goto out;
693 excess_data:
694         trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
695         ret = -EMSGSIZE;
696         goto out;
697 call_complete:
698         *_abort = call->abort_code;
699         ret = call->error;
700         if (call->completion == RXRPC_CALL_SUCCEEDED) {
701                 ret = 1;
702                 if (size > 0)
703                         ret = -ECONNRESET;
704         }
705         goto out;
706 }
707 EXPORT_SYMBOL(rxrpc_kernel_recv_data);