GNU Linux-libre 4.9.309-gnu1
[releases.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2016, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include "core.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include "name_distr.h"
43 #include "socket.h"
44 #include "bcast.h"
45 #include "netlink.h"
46
47 #define SS_LISTENING            -1      /* socket is listening */
48 #define SS_READY                -2      /* socket is connectionless */
49
50 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
51 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
52 #define TIPC_FWD_MSG            1
53 #define TIPC_CONN_OK            0
54 #define TIPC_CONN_PROBING       1
55 #define TIPC_MAX_PORT           0xffffffff
56 #define TIPC_MIN_PORT           1
57
58 /**
59  * struct tipc_sock - TIPC socket structure
60  * @sk: socket - interacts with 'port' and with user via the socket API
61  * @connected: non-zero if port is currently connected to a peer port
62  * @conn_type: TIPC type used when connection was established
63  * @conn_instance: TIPC instance used when connection was established
64  * @published: non-zero if port has one or more associated names
65  * @max_pkt: maximum packet size "hint" used when building messages sent by port
66  * @portid: unique port identity in TIPC socket hash table
67  * @phdr: preformatted message header used when sending messages
68  * @port_list: adjacent ports in TIPC's global list of ports
69  * @publications: list of publications for port
70  * @pub_count: total # of publications port has made during its lifetime
71  * @probing_state:
72  * @probing_intv:
73  * @conn_timeout: the time we can wait for an unresponded setup request
74  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75  * @link_cong: non-zero if owner must sleep because of link congestion
76  * @sent_unacked: # messages sent by socket, and not yet acked by peer
77  * @rcv_unacked: # messages read by user, but not yet acked back to peer
78  * @remote: 'connected' peer for dgram/rdm
79  * @node: hash table node
80  * @rcu: rcu struct for tipc_sock
81  */
82 struct tipc_sock {
83         struct sock sk;
84         int connected;
85         u32 conn_type;
86         u32 conn_instance;
87         int published;
88         u32 max_pkt;
89         u32 portid;
90         struct tipc_msg phdr;
91         struct list_head sock_list;
92         struct list_head publications;
93         u32 pub_count;
94         u32 probing_state;
95         unsigned long probing_intv;
96         uint conn_timeout;
97         atomic_t dupl_rcvcnt;
98         bool link_cong;
99         u16 snt_unacked;
100         u16 snd_win;
101         u16 peer_caps;
102         u16 rcv_unacked;
103         u16 rcv_win;
104         struct sockaddr_tipc remote;
105         struct rhash_head node;
106         struct rcu_head rcu;
107 };
108
109 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
110 static void tipc_data_ready(struct sock *sk);
111 static void tipc_write_space(struct sock *sk);
112 static void tipc_sock_destruct(struct sock *sk);
113 static int tipc_release(struct socket *sock);
114 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
115 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
116 static void tipc_sk_timeout(unsigned long data);
117 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
118                            struct tipc_name_seq const *seq);
119 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
120                             struct tipc_name_seq const *seq);
121 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
122 static int tipc_sk_insert(struct tipc_sock *tsk);
123 static void tipc_sk_remove(struct tipc_sock *tsk);
124 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
125                               size_t dsz);
126 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
127
128 static const struct proto_ops packet_ops;
129 static const struct proto_ops stream_ops;
130 static const struct proto_ops msg_ops;
131 static struct proto tipc_proto;
132 static const struct rhashtable_params tsk_rht_params;
133
134 static u32 tsk_own_node(struct tipc_sock *tsk)
135 {
136         return msg_prevnode(&tsk->phdr);
137 }
138
139 static u32 tsk_peer_node(struct tipc_sock *tsk)
140 {
141         return msg_destnode(&tsk->phdr);
142 }
143
144 static u32 tsk_peer_port(struct tipc_sock *tsk)
145 {
146         return msg_destport(&tsk->phdr);
147 }
148
149 static  bool tsk_unreliable(struct tipc_sock *tsk)
150 {
151         return msg_src_droppable(&tsk->phdr) != 0;
152 }
153
154 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
155 {
156         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
157 }
158
159 static bool tsk_unreturnable(struct tipc_sock *tsk)
160 {
161         return msg_dest_droppable(&tsk->phdr) != 0;
162 }
163
164 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
165 {
166         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
167 }
168
169 static int tsk_importance(struct tipc_sock *tsk)
170 {
171         return msg_importance(&tsk->phdr);
172 }
173
174 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
175 {
176         if (imp > TIPC_CRITICAL_IMPORTANCE)
177                 return -EINVAL;
178         msg_set_importance(&tsk->phdr, (u32)imp);
179         return 0;
180 }
181
182 static struct tipc_sock *tipc_sk(const struct sock *sk)
183 {
184         return container_of(sk, struct tipc_sock, sk);
185 }
186
187 static bool tsk_conn_cong(struct tipc_sock *tsk)
188 {
189         return tsk->snt_unacked > tsk->snd_win;
190 }
191
192 /* tsk_blocks(): translate a buffer size in bytes to number of
193  * advertisable blocks, taking into account the ratio truesize(len)/len
194  * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
195  */
196 static u16 tsk_adv_blocks(int len)
197 {
198         return len / FLOWCTL_BLK_SZ / 4;
199 }
200
201 /* tsk_inc(): increment counter for sent or received data
202  * - If block based flow control is not supported by peer we
203  *   fall back to message based ditto, incrementing the counter
204  */
205 static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
206 {
207         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
208                 return ((msglen / FLOWCTL_BLK_SZ) + 1);
209         return 1;
210 }
211
212 /**
213  * tsk_advance_rx_queue - discard first buffer in socket receive queue
214  *
215  * Caller must hold socket lock
216  */
217 static void tsk_advance_rx_queue(struct sock *sk)
218 {
219         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
220 }
221
222 /* tipc_sk_respond() : send response message back to sender
223  */
224 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
225 {
226         u32 selector;
227         u32 dnode;
228         u32 onode = tipc_own_addr(sock_net(sk));
229
230         if (!tipc_msg_reverse(onode, &skb, err))
231                 return;
232
233         dnode = msg_destnode(buf_msg(skb));
234         selector = msg_origport(buf_msg(skb));
235         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
236 }
237
238 /**
239  * tsk_rej_rx_queue - reject all buffers in socket receive queue
240  *
241  * Caller must hold socket lock
242  */
243 static void tsk_rej_rx_queue(struct sock *sk)
244 {
245         struct sk_buff *skb;
246
247         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
248                 tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
249 }
250
251 /* tsk_peer_msg - verify if message was sent by connected port's peer
252  *
253  * Handles cases where the node's network address has changed from
254  * the default of <0.0.0> to its configured setting.
255  */
256 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
257 {
258         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
259         u32 peer_port = tsk_peer_port(tsk);
260         u32 orig_node;
261         u32 peer_node;
262
263         if (unlikely(!tsk->connected))
264                 return false;
265
266         if (unlikely(msg_origport(msg) != peer_port))
267                 return false;
268
269         orig_node = msg_orignode(msg);
270         peer_node = tsk_peer_node(tsk);
271
272         if (likely(orig_node == peer_node))
273                 return true;
274
275         if (!orig_node && (peer_node == tn->own_addr))
276                 return true;
277
278         if (!peer_node && (orig_node == tn->own_addr))
279                 return true;
280
281         return false;
282 }
283
284 /**
285  * tipc_sk_create - create a TIPC socket
286  * @net: network namespace (must be default network)
287  * @sock: pre-allocated socket structure
288  * @protocol: protocol indicator (must be 0)
289  * @kern: caused by kernel or by userspace?
290  *
291  * This routine creates additional data structures used by the TIPC socket,
292  * initializes them, and links them together.
293  *
294  * Returns 0 on success, errno otherwise
295  */
296 static int tipc_sk_create(struct net *net, struct socket *sock,
297                           int protocol, int kern)
298 {
299         struct tipc_net *tn;
300         const struct proto_ops *ops;
301         socket_state state;
302         struct sock *sk;
303         struct tipc_sock *tsk;
304         struct tipc_msg *msg;
305
306         /* Validate arguments */
307         if (unlikely(protocol != 0))
308                 return -EPROTONOSUPPORT;
309
310         switch (sock->type) {
311         case SOCK_STREAM:
312                 ops = &stream_ops;
313                 state = SS_UNCONNECTED;
314                 break;
315         case SOCK_SEQPACKET:
316                 ops = &packet_ops;
317                 state = SS_UNCONNECTED;
318                 break;
319         case SOCK_DGRAM:
320         case SOCK_RDM:
321                 ops = &msg_ops;
322                 state = SS_READY;
323                 break;
324         default:
325                 return -EPROTOTYPE;
326         }
327
328         /* Allocate socket's protocol area */
329         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
330         if (sk == NULL)
331                 return -ENOMEM;
332
333         tsk = tipc_sk(sk);
334         tsk->max_pkt = MAX_PKT_DEFAULT;
335         INIT_LIST_HEAD(&tsk->publications);
336         msg = &tsk->phdr;
337         tn = net_generic(sock_net(sk), tipc_net_id);
338
339         /* Finish initializing socket data structures */
340         sock->ops = ops;
341         sock->state = state;
342         sock_init_data(sock, sk);
343         if (tipc_sk_insert(tsk)) {
344                 pr_warn("Socket create failed; port number exhausted\n");
345                 return -EINVAL;
346         }
347
348         /* Ensure tsk is visible before we read own_addr. */
349         smp_mb();
350
351         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
352                       NAMED_H_SIZE, 0);
353
354         msg_set_origport(msg, tsk->portid);
355         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
356         sk->sk_backlog_rcv = tipc_backlog_rcv;
357         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
358         sk->sk_data_ready = tipc_data_ready;
359         sk->sk_write_space = tipc_write_space;
360         sk->sk_destruct = tipc_sock_destruct;
361         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
362         atomic_set(&tsk->dupl_rcvcnt, 0);
363
364         /* Start out with safe limits until we receive an advertised window */
365         tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
366         tsk->rcv_win = tsk->snd_win;
367
368         if (sock->state == SS_READY) {
369                 tsk_set_unreturnable(tsk, true);
370                 if (sock->type == SOCK_DGRAM)
371                         tsk_set_unreliable(tsk, true);
372         }
373         return 0;
374 }
375
376 static void tipc_sk_callback(struct rcu_head *head)
377 {
378         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
379
380         sock_put(&tsk->sk);
381 }
382
383 /**
384  * tipc_release - destroy a TIPC socket
385  * @sock: socket to destroy
386  *
387  * This routine cleans up any messages that are still queued on the socket.
388  * For DGRAM and RDM socket types, all queued messages are rejected.
389  * For SEQPACKET and STREAM socket types, the first message is rejected
390  * and any others are discarded.  (If the first message on a STREAM socket
391  * is partially-read, it is discarded and the next one is rejected instead.)
392  *
393  * NOTE: Rejected messages are not necessarily returned to the sender!  They
394  * are returned or discarded according to the "destination droppable" setting
395  * specified for the message by the sender.
396  *
397  * Returns 0 on success, errno otherwise
398  */
399 static int tipc_release(struct socket *sock)
400 {
401         struct sock *sk = sock->sk;
402         struct net *net;
403         struct tipc_sock *tsk;
404         struct sk_buff *skb;
405         u32 dnode;
406
407         /*
408          * Exit if socket isn't fully initialized (occurs when a failed accept()
409          * releases a pre-allocated child socket that was never used)
410          */
411         if (sk == NULL)
412                 return 0;
413
414         net = sock_net(sk);
415         tsk = tipc_sk(sk);
416         lock_sock(sk);
417
418         /*
419          * Reject all unreceived messages, except on an active connection
420          * (which disconnects locally & sends a 'FIN+' to peer)
421          */
422         dnode = tsk_peer_node(tsk);
423         while (sock->state != SS_DISCONNECTING) {
424                 skb = __skb_dequeue(&sk->sk_receive_queue);
425                 if (skb == NULL)
426                         break;
427                 if (TIPC_SKB_CB(skb)->handle != NULL)
428                         kfree_skb(skb);
429                 else {
430                         if ((sock->state == SS_CONNECTING) ||
431                             (sock->state == SS_CONNECTED)) {
432                                 sock->state = SS_DISCONNECTING;
433                                 tsk->connected = 0;
434                                 tipc_node_remove_conn(net, dnode, tsk->portid);
435                         }
436                         tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
437                 }
438         }
439
440         tipc_sk_withdraw(tsk, 0, NULL);
441         sk_stop_timer(sk, &sk->sk_timer);
442         tipc_sk_remove(tsk);
443         if (tsk->connected) {
444                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
445                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
446                                       tsk_own_node(tsk), tsk_peer_port(tsk),
447                                       tsk->portid, TIPC_ERR_NO_PORT);
448                 if (skb)
449                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
450                 tipc_node_remove_conn(net, dnode, tsk->portid);
451         }
452
453         /* Reject any messages that accumulated in backlog queue */
454         sock->state = SS_DISCONNECTING;
455         release_sock(sk);
456
457         call_rcu(&tsk->rcu, tipc_sk_callback);
458         sock->sk = NULL;
459
460         return 0;
461 }
462
463 /**
464  * tipc_bind - associate or disassocate TIPC name(s) with a socket
465  * @sock: socket structure
466  * @uaddr: socket address describing name(s) and desired operation
467  * @uaddr_len: size of socket address data structure
468  *
469  * Name and name sequence binding is indicated using a positive scope value;
470  * a negative scope value unbinds the specified name.  Specifying no name
471  * (i.e. a socket address length of 0) unbinds all names from the socket.
472  *
473  * Returns 0 on success, errno otherwise
474  *
475  * NOTE: This routine doesn't need to take the socket lock since it doesn't
476  *       access any non-constant socket information.
477  */
478 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
479                      int uaddr_len)
480 {
481         struct sock *sk = sock->sk;
482         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
483         struct tipc_sock *tsk = tipc_sk(sk);
484         int res = -EINVAL;
485
486         lock_sock(sk);
487         if (unlikely(!uaddr_len)) {
488                 res = tipc_sk_withdraw(tsk, 0, NULL);
489                 goto exit;
490         }
491
492         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
493                 res = -EINVAL;
494                 goto exit;
495         }
496         if (addr->family != AF_TIPC) {
497                 res = -EAFNOSUPPORT;
498                 goto exit;
499         }
500
501         if (addr->addrtype == TIPC_ADDR_NAME)
502                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
503         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
504                 res = -EAFNOSUPPORT;
505                 goto exit;
506         }
507
508         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
509             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
510             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
511                 res = -EACCES;
512                 goto exit;
513         }
514
515         res = (addr->scope > 0) ?
516                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
517                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
518 exit:
519         release_sock(sk);
520         return res;
521 }
522
523 /**
524  * tipc_getname - get port ID of socket or peer socket
525  * @sock: socket structure
526  * @uaddr: area for returned socket address
527  * @uaddr_len: area for returned length of socket address
528  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
529  *
530  * Returns 0 on success, errno otherwise
531  *
532  * NOTE: This routine doesn't need to take the socket lock since it only
533  *       accesses socket information that is unchanging (or which changes in
534  *       a completely predictable manner).
535  */
536 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
537                         int *uaddr_len, int peer)
538 {
539         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
540         struct tipc_sock *tsk = tipc_sk(sock->sk);
541         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
542
543         memset(addr, 0, sizeof(*addr));
544         if (peer) {
545                 if ((sock->state != SS_CONNECTED) &&
546                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
547                         return -ENOTCONN;
548                 addr->addr.id.ref = tsk_peer_port(tsk);
549                 addr->addr.id.node = tsk_peer_node(tsk);
550         } else {
551                 addr->addr.id.ref = tsk->portid;
552                 addr->addr.id.node = tn->own_addr;
553         }
554
555         *uaddr_len = sizeof(*addr);
556         addr->addrtype = TIPC_ADDR_ID;
557         addr->family = AF_TIPC;
558         addr->scope = 0;
559         addr->addr.name.domain = 0;
560
561         return 0;
562 }
563
564 /**
565  * tipc_poll - read and possibly block on pollmask
566  * @file: file structure associated with the socket
567  * @sock: socket for which to calculate the poll bits
568  * @wait: ???
569  *
570  * Returns pollmask value
571  *
572  * COMMENTARY:
573  * It appears that the usual socket locking mechanisms are not useful here
574  * since the pollmask info is potentially out-of-date the moment this routine
575  * exits.  TCP and other protocols seem to rely on higher level poll routines
576  * to handle any preventable race conditions, so TIPC will do the same ...
577  *
578  * TIPC sets the returned events as follows:
579  *
580  * socket state         flags set
581  * ------------         ---------
582  * unconnected          no read flags
583  *                      POLLOUT if port is not congested
584  *
585  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
586  *                      no write flags
587  *
588  * connected            POLLIN/POLLRDNORM if data in rx queue
589  *                      POLLOUT if port is not congested
590  *
591  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
592  *                      no write flags
593  *
594  * listening            POLLIN if SYN in rx queue
595  *                      no write flags
596  *
597  * ready                POLLIN/POLLRDNORM if data in rx queue
598  * [connectionless]     POLLOUT (since port cannot be congested)
599  *
600  * IMPORTANT: The fact that a read or write operation is indicated does NOT
601  * imply that the operation will succeed, merely that it should be performed
602  * and will not block.
603  */
604 static unsigned int tipc_poll(struct file *file, struct socket *sock,
605                               poll_table *wait)
606 {
607         struct sock *sk = sock->sk;
608         struct tipc_sock *tsk = tipc_sk(sk);
609         u32 mask = 0;
610
611         sock_poll_wait(file, sk_sleep(sk), wait);
612
613         switch ((int)sock->state) {
614         case SS_UNCONNECTED:
615                 if (!tsk->link_cong)
616                         mask |= POLLOUT;
617                 break;
618         case SS_READY:
619         case SS_CONNECTED:
620                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
621                         mask |= POLLOUT;
622                 /* fall thru' */
623         case SS_CONNECTING:
624         case SS_LISTENING:
625                 if (!skb_queue_empty(&sk->sk_receive_queue))
626                         mask |= (POLLIN | POLLRDNORM);
627                 break;
628         case SS_DISCONNECTING:
629                 mask = (POLLIN | POLLRDNORM | POLLHUP);
630                 break;
631         }
632
633         return mask;
634 }
635
636 /**
637  * tipc_sendmcast - send multicast message
638  * @sock: socket structure
639  * @seq: destination address
640  * @msg: message to send
641  * @dsz: total length of message data
642  * @timeo: timeout to wait for wakeup
643  *
644  * Called from function tipc_sendmsg(), which has done all sanity checks
645  * Returns the number of bytes sent on success, or errno
646  */
647 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
648                           struct msghdr *msg, size_t dsz, long timeo)
649 {
650         struct sock *sk = sock->sk;
651         struct tipc_sock *tsk = tipc_sk(sk);
652         struct net *net = sock_net(sk);
653         struct tipc_msg *mhdr = &tsk->phdr;
654         struct sk_buff_head pktchain;
655         struct iov_iter save = msg->msg_iter;
656         uint mtu;
657         int rc;
658
659         msg_set_type(mhdr, TIPC_MCAST_MSG);
660         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
661         msg_set_destport(mhdr, 0);
662         msg_set_destnode(mhdr, 0);
663         msg_set_nametype(mhdr, seq->type);
664         msg_set_namelower(mhdr, seq->lower);
665         msg_set_nameupper(mhdr, seq->upper);
666         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
667
668         skb_queue_head_init(&pktchain);
669
670 new_mtu:
671         mtu = tipc_bcast_get_mtu(net);
672         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
673         if (unlikely(rc < 0))
674                 return rc;
675
676         do {
677                 rc = tipc_bcast_xmit(net, &pktchain);
678                 if (likely(!rc))
679                         return dsz;
680
681                 if (rc == -ELINKCONG) {
682                         tsk->link_cong = 1;
683                         rc = tipc_wait_for_sndmsg(sock, &timeo);
684                         if (!rc)
685                                 continue;
686                 }
687                 __skb_queue_purge(&pktchain);
688                 if (rc == -EMSGSIZE) {
689                         msg->msg_iter = save;
690                         goto new_mtu;
691                 }
692                 break;
693         } while (1);
694         return rc;
695 }
696
697 /**
698  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
699  * @arrvq: queue with arriving messages, to be cloned after destination lookup
700  * @inputq: queue with cloned messages, delivered to socket after dest lookup
701  *
702  * Multi-threaded: parallel calls with reference to same queues may occur
703  */
704 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
705                        struct sk_buff_head *inputq)
706 {
707         struct tipc_msg *msg;
708         struct tipc_plist dports;
709         u32 portid;
710         u32 scope = TIPC_CLUSTER_SCOPE;
711         struct sk_buff_head tmpq;
712         uint hsz;
713         struct sk_buff *skb, *_skb;
714
715         __skb_queue_head_init(&tmpq);
716         tipc_plist_init(&dports);
717
718         skb = tipc_skb_peek(arrvq, &inputq->lock);
719         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
720                 msg = buf_msg(skb);
721                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
722
723                 if (in_own_node(net, msg_orignode(msg)))
724                         scope = TIPC_NODE_SCOPE;
725
726                 /* Create destination port list and message clones: */
727                 tipc_nametbl_mc_translate(net,
728                                           msg_nametype(msg), msg_namelower(msg),
729                                           msg_nameupper(msg), scope, &dports);
730                 portid = tipc_plist_pop(&dports);
731                 for (; portid; portid = tipc_plist_pop(&dports)) {
732                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
733                         if (_skb) {
734                                 msg_set_destport(buf_msg(_skb), portid);
735                                 __skb_queue_tail(&tmpq, _skb);
736                                 continue;
737                         }
738                         pr_warn("Failed to clone mcast rcv buffer\n");
739                 }
740                 /* Append to inputq if not already done by other thread */
741                 spin_lock_bh(&inputq->lock);
742                 if (skb_peek(arrvq) == skb) {
743                         skb_queue_splice_tail_init(&tmpq, inputq);
744                         /* Decrease the skb's refcnt as increasing in the
745                          * function tipc_skb_peek
746                          */
747                         kfree_skb(__skb_dequeue(arrvq));
748                 }
749                 spin_unlock_bh(&inputq->lock);
750                 __skb_queue_purge(&tmpq);
751                 kfree_skb(skb);
752         }
753         tipc_sk_rcv(net, inputq);
754 }
755
756 /**
757  * tipc_sk_proto_rcv - receive a connection mng protocol message
758  * @tsk: receiving socket
759  * @skb: pointer to message buffer.
760  */
761 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
762                               struct sk_buff_head *xmitq)
763 {
764         struct sock *sk = &tsk->sk;
765         u32 onode = tsk_own_node(tsk);
766         struct tipc_msg *hdr = buf_msg(skb);
767         int mtyp = msg_type(hdr);
768         bool conn_cong;
769
770         /* Ignore if connection cannot be validated: */
771         if (!tsk_peer_msg(tsk, hdr))
772                 goto exit;
773
774         tsk->probing_state = TIPC_CONN_OK;
775
776         if (mtyp == CONN_PROBE) {
777                 msg_set_type(hdr, CONN_PROBE_REPLY);
778                 if (tipc_msg_reverse(onode, &skb, TIPC_OK))
779                         __skb_queue_tail(xmitq, skb);
780                 return;
781         } else if (mtyp == CONN_ACK) {
782                 conn_cong = tsk_conn_cong(tsk);
783                 tsk->snt_unacked -= msg_conn_ack(hdr);
784                 if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
785                         tsk->snd_win = msg_adv_win(hdr);
786                 if (conn_cong)
787                         sk->sk_write_space(sk);
788         } else if (mtyp != CONN_PROBE_REPLY) {
789                 pr_warn("Received unknown CONN_PROTO msg\n");
790         }
791 exit:
792         kfree_skb(skb);
793 }
794
795 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
796 {
797         struct sock *sk = sock->sk;
798         struct tipc_sock *tsk = tipc_sk(sk);
799         DEFINE_WAIT(wait);
800         int done;
801
802         do {
803                 int err = sock_error(sk);
804                 if (err)
805                         return err;
806                 if (sock->state == SS_DISCONNECTING)
807                         return -EPIPE;
808                 if (!*timeo_p)
809                         return -EAGAIN;
810                 if (signal_pending(current))
811                         return sock_intr_errno(*timeo_p);
812
813                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
814                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
815                 finish_wait(sk_sleep(sk), &wait);
816         } while (!done);
817         return 0;
818 }
819
820 /**
821  * tipc_sendmsg - send message in connectionless manner
822  * @sock: socket structure
823  * @m: message to send
824  * @dsz: amount of user data to be sent
825  *
826  * Message must have an destination specified explicitly.
827  * Used for SOCK_RDM and SOCK_DGRAM messages,
828  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
829  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
830  *
831  * Returns the number of bytes sent on success, or errno otherwise
832  */
833 static int tipc_sendmsg(struct socket *sock,
834                         struct msghdr *m, size_t dsz)
835 {
836         struct sock *sk = sock->sk;
837         int ret;
838
839         lock_sock(sk);
840         ret = __tipc_sendmsg(sock, m, dsz);
841         release_sock(sk);
842
843         return ret;
844 }
845
846 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
847 {
848         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
849         struct sock *sk = sock->sk;
850         struct tipc_sock *tsk = tipc_sk(sk);
851         struct net *net = sock_net(sk);
852         struct tipc_msg *mhdr = &tsk->phdr;
853         u32 dnode, dport;
854         struct sk_buff_head pktchain;
855         struct sk_buff *skb;
856         struct tipc_name_seq *seq;
857         struct iov_iter save;
858         u32 mtu;
859         long timeo;
860         int rc;
861
862         if (dsz > TIPC_MAX_USER_MSG_SIZE)
863                 return -EMSGSIZE;
864         if (unlikely(!dest)) {
865                 if (tsk->connected && sock->state == SS_READY)
866                         dest = &tsk->remote;
867                 else
868                         return -EDESTADDRREQ;
869         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
870                    dest->family != AF_TIPC) {
871                 return -EINVAL;
872         }
873         if (unlikely(sock->state != SS_READY)) {
874                 if (sock->state == SS_LISTENING)
875                         return -EPIPE;
876                 if (sock->state != SS_UNCONNECTED)
877                         return -EISCONN;
878                 if (tsk->published)
879                         return -EOPNOTSUPP;
880                 if (dest->addrtype == TIPC_ADDR_NAME) {
881                         tsk->conn_type = dest->addr.name.name.type;
882                         tsk->conn_instance = dest->addr.name.name.instance;
883                 }
884         }
885         seq = &dest->addr.nameseq;
886         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
887
888         if (dest->addrtype == TIPC_ADDR_MCAST) {
889                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
890         } else if (dest->addrtype == TIPC_ADDR_NAME) {
891                 u32 type = dest->addr.name.name.type;
892                 u32 inst = dest->addr.name.name.instance;
893                 u32 domain = dest->addr.name.domain;
894
895                 dnode = domain;
896                 msg_set_type(mhdr, TIPC_NAMED_MSG);
897                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
898                 msg_set_nametype(mhdr, type);
899                 msg_set_nameinst(mhdr, inst);
900                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
901                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
902                 msg_set_destnode(mhdr, dnode);
903                 msg_set_destport(mhdr, dport);
904                 if (unlikely(!dport && !dnode))
905                         return -EHOSTUNREACH;
906         } else if (dest->addrtype == TIPC_ADDR_ID) {
907                 dnode = dest->addr.id.node;
908                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
909                 msg_set_lookup_scope(mhdr, 0);
910                 msg_set_destnode(mhdr, dnode);
911                 msg_set_destport(mhdr, dest->addr.id.ref);
912                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
913         }
914
915         skb_queue_head_init(&pktchain);
916         save = m->msg_iter;
917 new_mtu:
918         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
919         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
920         if (rc < 0)
921                 return rc;
922
923         do {
924                 skb = skb_peek(&pktchain);
925                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
926                 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
927                 if (likely(!rc)) {
928                         if (sock->state != SS_READY)
929                                 sock->state = SS_CONNECTING;
930                         return dsz;
931                 }
932                 if (rc == -ELINKCONG) {
933                         tsk->link_cong = 1;
934                         rc = tipc_wait_for_sndmsg(sock, &timeo);
935                         if (!rc)
936                                 continue;
937                 }
938                 __skb_queue_purge(&pktchain);
939                 if (rc == -EMSGSIZE) {
940                         m->msg_iter = save;
941                         goto new_mtu;
942                 }
943                 break;
944         } while (1);
945
946         return rc;
947 }
948
949 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
950 {
951         struct sock *sk = sock->sk;
952         struct tipc_sock *tsk = tipc_sk(sk);
953         DEFINE_WAIT(wait);
954         int done;
955
956         do {
957                 int err = sock_error(sk);
958                 if (err)
959                         return err;
960                 if (sock->state == SS_DISCONNECTING)
961                         return -EPIPE;
962                 else if (sock->state != SS_CONNECTED)
963                         return -ENOTCONN;
964                 if (!*timeo_p)
965                         return -EAGAIN;
966                 if (signal_pending(current))
967                         return sock_intr_errno(*timeo_p);
968
969                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
970                 done = sk_wait_event(sk, timeo_p,
971                                      (!tsk->link_cong &&
972                                       !tsk_conn_cong(tsk)) ||
973                                      !tsk->connected);
974                 finish_wait(sk_sleep(sk), &wait);
975         } while (!done);
976         return 0;
977 }
978
979 /**
980  * tipc_send_stream - send stream-oriented data
981  * @sock: socket structure
982  * @m: data to send
983  * @dsz: total length of data to be transmitted
984  *
985  * Used for SOCK_STREAM data.
986  *
987  * Returns the number of bytes sent on success (or partial success),
988  * or errno if no data sent
989  */
990 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
991 {
992         struct sock *sk = sock->sk;
993         int ret;
994
995         lock_sock(sk);
996         ret = __tipc_send_stream(sock, m, dsz);
997         release_sock(sk);
998
999         return ret;
1000 }
1001
1002 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1003 {
1004         struct sock *sk = sock->sk;
1005         struct net *net = sock_net(sk);
1006         struct tipc_sock *tsk = tipc_sk(sk);
1007         struct tipc_msg *mhdr = &tsk->phdr;
1008         struct sk_buff_head pktchain;
1009         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1010         u32 portid = tsk->portid;
1011         int rc = -EINVAL;
1012         long timeo;
1013         u32 dnode;
1014         uint mtu, send, sent = 0;
1015         struct iov_iter save;
1016         int hlen = MIN_H_SIZE;
1017
1018         /* Handle implied connection establishment */
1019         if (unlikely(dest)) {
1020                 rc = __tipc_sendmsg(sock, m, dsz);
1021                 hlen = msg_hdr_sz(mhdr);
1022                 if (dsz && (dsz == rc))
1023                         tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1024                 return rc;
1025         }
1026         if (dsz > (uint)INT_MAX)
1027                 return -EMSGSIZE;
1028
1029         if (unlikely(sock->state != SS_CONNECTED)) {
1030                 if (sock->state == SS_DISCONNECTING)
1031                         return -EPIPE;
1032                 else
1033                         return -ENOTCONN;
1034         }
1035
1036         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1037         dnode = tsk_peer_node(tsk);
1038         skb_queue_head_init(&pktchain);
1039
1040 next:
1041         save = m->msg_iter;
1042         mtu = tsk->max_pkt;
1043         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1044         rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
1045         if (unlikely(rc < 0))
1046                 return rc;
1047
1048         do {
1049                 if (likely(!tsk_conn_cong(tsk))) {
1050                         rc = tipc_node_xmit(net, &pktchain, dnode, portid);
1051                         if (likely(!rc)) {
1052                                 tsk->snt_unacked += tsk_inc(tsk, send + hlen);
1053                                 sent += send;
1054                                 if (sent == dsz)
1055                                         return dsz;
1056                                 goto next;
1057                         }
1058                         if (rc == -EMSGSIZE) {
1059                                 __skb_queue_purge(&pktchain);
1060                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1061                                                                  portid);
1062                                 m->msg_iter = save;
1063                                 goto next;
1064                         }
1065                         if (rc != -ELINKCONG)
1066                                 break;
1067
1068                         tsk->link_cong = 1;
1069                 }
1070                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1071         } while (!rc);
1072
1073         __skb_queue_purge(&pktchain);
1074         return sent ? sent : rc;
1075 }
1076
1077 /**
1078  * tipc_send_packet - send a connection-oriented message
1079  * @sock: socket structure
1080  * @m: message to send
1081  * @dsz: length of data to be transmitted
1082  *
1083  * Used for SOCK_SEQPACKET messages.
1084  *
1085  * Returns the number of bytes sent on success, or errno otherwise
1086  */
1087 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1088 {
1089         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1090                 return -EMSGSIZE;
1091
1092         return tipc_send_stream(sock, m, dsz);
1093 }
1094
1095 /* tipc_sk_finish_conn - complete the setup of a connection
1096  */
1097 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1098                                 u32 peer_node)
1099 {
1100         struct sock *sk = &tsk->sk;
1101         struct net *net = sock_net(sk);
1102         struct tipc_msg *msg = &tsk->phdr;
1103
1104         msg_set_destnode(msg, peer_node);
1105         msg_set_destport(msg, peer_port);
1106         msg_set_type(msg, TIPC_CONN_MSG);
1107         msg_set_lookup_scope(msg, 0);
1108         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1109
1110         tsk->probing_intv = CONN_PROBING_INTERVAL;
1111         tsk->probing_state = TIPC_CONN_OK;
1112         tsk->connected = 1;
1113         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1114         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1115         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1116         tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1117         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1118                 return;
1119
1120         /* Fall back to message based flow control */
1121         tsk->rcv_win = FLOWCTL_MSG_WIN;
1122         tsk->snd_win = FLOWCTL_MSG_WIN;
1123 }
1124
1125 /**
1126  * set_orig_addr - capture sender's address for received message
1127  * @m: descriptor for message info
1128  * @msg: received message header
1129  *
1130  * Note: Address is not captured if not requested by receiver.
1131  */
1132 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1133 {
1134         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1135
1136         if (addr) {
1137                 addr->family = AF_TIPC;
1138                 addr->addrtype = TIPC_ADDR_ID;
1139                 memset(&addr->addr, 0, sizeof(addr->addr));
1140                 addr->addr.id.ref = msg_origport(msg);
1141                 addr->addr.id.node = msg_orignode(msg);
1142                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1143                 addr->scope = 0;                /* could leave uninitialized */
1144                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1145         }
1146 }
1147
1148 /**
1149  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1150  * @m: descriptor for message info
1151  * @msg: received message header
1152  * @tsk: TIPC port associated with message
1153  *
1154  * Note: Ancillary data is not captured if not requested by receiver.
1155  *
1156  * Returns 0 if successful, otherwise errno
1157  */
1158 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1159                                  struct tipc_sock *tsk)
1160 {
1161         u32 anc_data[3];
1162         u32 err;
1163         u32 dest_type;
1164         int has_name;
1165         int res;
1166
1167         if (likely(m->msg_controllen == 0))
1168                 return 0;
1169
1170         /* Optionally capture errored message object(s) */
1171         err = msg ? msg_errcode(msg) : 0;
1172         if (unlikely(err)) {
1173                 anc_data[0] = err;
1174                 anc_data[1] = msg_data_sz(msg);
1175                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1176                 if (res)
1177                         return res;
1178                 if (anc_data[1]) {
1179                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1180                                        msg_data(msg));
1181                         if (res)
1182                                 return res;
1183                 }
1184         }
1185
1186         /* Optionally capture message destination object */
1187         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1188         switch (dest_type) {
1189         case TIPC_NAMED_MSG:
1190                 has_name = 1;
1191                 anc_data[0] = msg_nametype(msg);
1192                 anc_data[1] = msg_namelower(msg);
1193                 anc_data[2] = msg_namelower(msg);
1194                 break;
1195         case TIPC_MCAST_MSG:
1196                 has_name = 1;
1197                 anc_data[0] = msg_nametype(msg);
1198                 anc_data[1] = msg_namelower(msg);
1199                 anc_data[2] = msg_nameupper(msg);
1200                 break;
1201         case TIPC_CONN_MSG:
1202                 has_name = (tsk->conn_type != 0);
1203                 anc_data[0] = tsk->conn_type;
1204                 anc_data[1] = tsk->conn_instance;
1205                 anc_data[2] = tsk->conn_instance;
1206                 break;
1207         default:
1208                 has_name = 0;
1209         }
1210         if (has_name) {
1211                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1212                 if (res)
1213                         return res;
1214         }
1215
1216         return 0;
1217 }
1218
1219 static void tipc_sk_send_ack(struct tipc_sock *tsk)
1220 {
1221         struct net *net = sock_net(&tsk->sk);
1222         struct sk_buff *skb = NULL;
1223         struct tipc_msg *msg;
1224         u32 peer_port = tsk_peer_port(tsk);
1225         u32 dnode = tsk_peer_node(tsk);
1226
1227         if (!tsk->connected)
1228                 return;
1229         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1230                               dnode, tsk_own_node(tsk), peer_port,
1231                               tsk->portid, TIPC_OK);
1232         if (!skb)
1233                 return;
1234         msg = buf_msg(skb);
1235         msg_set_conn_ack(msg, tsk->rcv_unacked);
1236         tsk->rcv_unacked = 0;
1237
1238         /* Adjust to and advertize the correct window limit */
1239         if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1240                 tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1241                 msg_set_adv_win(msg, tsk->rcv_win);
1242         }
1243         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1244 }
1245
1246 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1247 {
1248         struct sock *sk = sock->sk;
1249         DEFINE_WAIT(wait);
1250         long timeo = *timeop;
1251         int err;
1252
1253         for (;;) {
1254                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1255                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1256                         if (sock->state == SS_DISCONNECTING) {
1257                                 err = -ENOTCONN;
1258                                 break;
1259                         }
1260                         release_sock(sk);
1261                         timeo = schedule_timeout(timeo);
1262                         lock_sock(sk);
1263                 }
1264                 err = 0;
1265                 if (!skb_queue_empty(&sk->sk_receive_queue))
1266                         break;
1267                 err = -EAGAIN;
1268                 if (!timeo)
1269                         break;
1270                 err = sock_intr_errno(timeo);
1271                 if (signal_pending(current))
1272                         break;
1273         }
1274         finish_wait(sk_sleep(sk), &wait);
1275         *timeop = timeo;
1276         return err;
1277 }
1278
1279 /**
1280  * tipc_recvmsg - receive packet-oriented message
1281  * @m: descriptor for message info
1282  * @buf_len: total size of user buffer area
1283  * @flags: receive flags
1284  *
1285  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1286  * If the complete message doesn't fit in user area, truncate it.
1287  *
1288  * Returns size of returned message data, errno otherwise
1289  */
1290 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1291                         int flags)
1292 {
1293         struct sock *sk = sock->sk;
1294         struct tipc_sock *tsk = tipc_sk(sk);
1295         struct sk_buff *buf;
1296         struct tipc_msg *msg;
1297         long timeo;
1298         unsigned int sz;
1299         u32 err;
1300         int res, hlen;
1301
1302         /* Catch invalid receive requests */
1303         if (unlikely(!buf_len))
1304                 return -EINVAL;
1305
1306         lock_sock(sk);
1307
1308         if (unlikely(sock->state == SS_UNCONNECTED)) {
1309                 res = -ENOTCONN;
1310                 goto exit;
1311         }
1312
1313         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1314 restart:
1315
1316         /* Look for a message in receive queue; wait if necessary */
1317         res = tipc_wait_for_rcvmsg(sock, &timeo);
1318         if (res)
1319                 goto exit;
1320
1321         /* Look at first message in receive queue */
1322         buf = skb_peek(&sk->sk_receive_queue);
1323         msg = buf_msg(buf);
1324         sz = msg_data_sz(msg);
1325         hlen = msg_hdr_sz(msg);
1326         err = msg_errcode(msg);
1327
1328         /* Discard an empty non-errored message & try again */
1329         if ((!sz) && (!err)) {
1330                 tsk_advance_rx_queue(sk);
1331                 goto restart;
1332         }
1333
1334         /* Capture sender's address (optional) */
1335         set_orig_addr(m, msg);
1336
1337         /* Capture ancillary data (optional) */
1338         res = tipc_sk_anc_data_recv(m, msg, tsk);
1339         if (res)
1340                 goto exit;
1341
1342         /* Capture message data (if valid) & compute return value (always) */
1343         if (!err) {
1344                 if (unlikely(buf_len < sz)) {
1345                         sz = buf_len;
1346                         m->msg_flags |= MSG_TRUNC;
1347                 }
1348                 res = skb_copy_datagram_msg(buf, hlen, m, sz);
1349                 if (res)
1350                         goto exit;
1351                 res = sz;
1352         } else {
1353                 if ((sock->state == SS_READY) ||
1354                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1355                         res = 0;
1356                 else
1357                         res = -ECONNRESET;
1358         }
1359
1360         if (unlikely(flags & MSG_PEEK))
1361                 goto exit;
1362
1363         if (likely(sock->state != SS_READY)) {
1364                 tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1365                 if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1366                         tipc_sk_send_ack(tsk);
1367         }
1368         tsk_advance_rx_queue(sk);
1369 exit:
1370         release_sock(sk);
1371         return res;
1372 }
1373
1374 /**
1375  * tipc_recv_stream - receive stream-oriented data
1376  * @m: descriptor for message info
1377  * @buf_len: total size of user buffer area
1378  * @flags: receive flags
1379  *
1380  * Used for SOCK_STREAM messages only.  If not enough data is available
1381  * will optionally wait for more; never truncates data.
1382  *
1383  * Returns size of returned message data, errno otherwise
1384  */
1385 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1386                             size_t buf_len, int flags)
1387 {
1388         struct sock *sk = sock->sk;
1389         struct tipc_sock *tsk = tipc_sk(sk);
1390         struct sk_buff *buf;
1391         struct tipc_msg *msg;
1392         long timeo;
1393         unsigned int sz;
1394         int sz_to_copy, target, needed;
1395         int sz_copied = 0;
1396         u32 err;
1397         int res = 0, hlen;
1398
1399         /* Catch invalid receive attempts */
1400         if (unlikely(!buf_len))
1401                 return -EINVAL;
1402
1403         lock_sock(sk);
1404
1405         if (unlikely(sock->state == SS_UNCONNECTED)) {
1406                 res = -ENOTCONN;
1407                 goto exit;
1408         }
1409
1410         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1411         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1412
1413 restart:
1414         /* Look for a message in receive queue; wait if necessary */
1415         res = tipc_wait_for_rcvmsg(sock, &timeo);
1416         if (res)
1417                 goto exit;
1418
1419         /* Look at first message in receive queue */
1420         buf = skb_peek(&sk->sk_receive_queue);
1421         msg = buf_msg(buf);
1422         sz = msg_data_sz(msg);
1423         hlen = msg_hdr_sz(msg);
1424         err = msg_errcode(msg);
1425
1426         /* Discard an empty non-errored message & try again */
1427         if ((!sz) && (!err)) {
1428                 tsk_advance_rx_queue(sk);
1429                 goto restart;
1430         }
1431
1432         /* Optionally capture sender's address & ancillary data of first msg */
1433         if (sz_copied == 0) {
1434                 set_orig_addr(m, msg);
1435                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1436                 if (res)
1437                         goto exit;
1438         }
1439
1440         /* Capture message data (if valid) & compute return value (always) */
1441         if (!err) {
1442                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1443
1444                 sz -= offset;
1445                 needed = (buf_len - sz_copied);
1446                 sz_to_copy = (sz <= needed) ? sz : needed;
1447
1448                 res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1449                 if (res)
1450                         goto exit;
1451
1452                 sz_copied += sz_to_copy;
1453
1454                 if (sz_to_copy < sz) {
1455                         if (!(flags & MSG_PEEK))
1456                                 TIPC_SKB_CB(buf)->handle =
1457                                 (void *)(unsigned long)(offset + sz_to_copy);
1458                         goto exit;
1459                 }
1460         } else {
1461                 if (sz_copied != 0)
1462                         goto exit; /* can't add error msg to valid data */
1463
1464                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1465                         res = 0;
1466                 else
1467                         res = -ECONNRESET;
1468         }
1469
1470         if (unlikely(flags & MSG_PEEK))
1471                 goto exit;
1472
1473         tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1474         if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1475                 tipc_sk_send_ack(tsk);
1476         tsk_advance_rx_queue(sk);
1477
1478         /* Loop around if more data is required */
1479         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1480             (!skb_queue_empty(&sk->sk_receive_queue) ||
1481             (sz_copied < target)) &&    /* and more is ready or required */
1482             (!err))                     /* and haven't reached a FIN */
1483                 goto restart;
1484
1485 exit:
1486         release_sock(sk);
1487         return sz_copied ? sz_copied : res;
1488 }
1489
1490 /**
1491  * tipc_write_space - wake up thread if port congestion is released
1492  * @sk: socket
1493  */
1494 static void tipc_write_space(struct sock *sk)
1495 {
1496         struct socket_wq *wq;
1497
1498         rcu_read_lock();
1499         wq = rcu_dereference(sk->sk_wq);
1500         if (skwq_has_sleeper(wq))
1501                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1502                                                 POLLWRNORM | POLLWRBAND);
1503         rcu_read_unlock();
1504 }
1505
1506 /**
1507  * tipc_data_ready - wake up threads to indicate messages have been received
1508  * @sk: socket
1509  * @len: the length of messages
1510  */
1511 static void tipc_data_ready(struct sock *sk)
1512 {
1513         struct socket_wq *wq;
1514
1515         rcu_read_lock();
1516         wq = rcu_dereference(sk->sk_wq);
1517         if (skwq_has_sleeper(wq))
1518                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1519                                                 POLLRDNORM | POLLRDBAND);
1520         rcu_read_unlock();
1521 }
1522
1523 static void tipc_sock_destruct(struct sock *sk)
1524 {
1525         __skb_queue_purge(&sk->sk_receive_queue);
1526 }
1527
1528 /**
1529  * filter_connect - Handle all incoming messages for a connection-based socket
1530  * @tsk: TIPC socket
1531  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1532  *
1533  * Returns true if everything ok, false otherwise
1534  */
1535 static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1536 {
1537         struct sock *sk = &tsk->sk;
1538         struct net *net = sock_net(sk);
1539         struct socket *sock = sk->sk_socket;
1540         struct tipc_msg *hdr = buf_msg(skb);
1541
1542         if (unlikely(msg_mcast(hdr)))
1543                 return false;
1544
1545         switch ((int)sock->state) {
1546         case SS_CONNECTED:
1547
1548                 /* Accept only connection-based messages sent by peer */
1549                 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1550                         return false;
1551
1552                 if (unlikely(msg_errcode(hdr))) {
1553                         sock->state = SS_DISCONNECTING;
1554                         tsk->connected = 0;
1555                         /* Let timer expire on it's own */
1556                         tipc_node_remove_conn(net, tsk_peer_node(tsk),
1557                                               tsk->portid);
1558                 }
1559                 return true;
1560
1561         case SS_CONNECTING:
1562
1563                 /* Accept only ACK or NACK message */
1564                 if (unlikely(!msg_connected(hdr)))
1565                         return false;
1566
1567                 if (unlikely(msg_errcode(hdr))) {
1568                         sock->state = SS_DISCONNECTING;
1569                         sk->sk_err = ECONNREFUSED;
1570                         return true;
1571                 }
1572
1573                 if (unlikely(!msg_isdata(hdr))) {
1574                         sock->state = SS_DISCONNECTING;
1575                         sk->sk_err = EINVAL;
1576                         return true;
1577                 }
1578
1579                 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1580                 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1581                 sock->state = SS_CONNECTED;
1582
1583                 /* If 'ACK+' message, add to socket receive queue */
1584                 if (msg_data_sz(hdr))
1585                         return true;
1586
1587                 /* If empty 'ACK-' message, wake up sleeping connect() */
1588                 if (waitqueue_active(sk_sleep(sk)))
1589                         wake_up_interruptible(sk_sleep(sk));
1590
1591                 /* 'ACK-' message is neither accepted nor rejected: */
1592                 msg_set_dest_droppable(hdr, 1);
1593                 return false;
1594
1595         case SS_LISTENING:
1596         case SS_UNCONNECTED:
1597
1598                 /* Accept only SYN message */
1599                 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1600                         return true;
1601                 break;
1602         case SS_DISCONNECTING:
1603                 break;
1604         default:
1605                 pr_err("Unknown socket state %u\n", sock->state);
1606         }
1607         return false;
1608 }
1609
1610 /**
1611  * rcvbuf_limit - get proper overload limit of socket receive queue
1612  * @sk: socket
1613  * @skb: message
1614  *
1615  * For connection oriented messages, irrespective of importance,
1616  * default queue limit is 2 MB.
1617  *
1618  * For connectionless messages, queue limits are based on message
1619  * importance as follows:
1620  *
1621  * TIPC_LOW_IMPORTANCE       (2 MB)
1622  * TIPC_MEDIUM_IMPORTANCE    (4 MB)
1623  * TIPC_HIGH_IMPORTANCE      (8 MB)
1624  * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1625  *
1626  * Returns overload limit according to corresponding message importance
1627  */
1628 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1629 {
1630         struct tipc_sock *tsk = tipc_sk(sk);
1631         struct tipc_msg *hdr = buf_msg(skb);
1632
1633         if (unlikely(!msg_connected(hdr)))
1634                 return sk->sk_rcvbuf << msg_importance(hdr);
1635
1636         if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1637                 return sk->sk_rcvbuf;
1638
1639         return FLOWCTL_MSG_LIM;
1640 }
1641
1642 /**
1643  * filter_rcv - validate incoming message
1644  * @sk: socket
1645  * @skb: pointer to message.
1646  *
1647  * Enqueues message on receive queue if acceptable; optionally handles
1648  * disconnect indication for a connected socket.
1649  *
1650  * Called with socket lock already taken
1651  *
1652  * Returns true if message was added to socket receive queue, otherwise false
1653  */
1654 static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1655                        struct sk_buff_head *xmitq)
1656 {
1657         struct socket *sock = sk->sk_socket;
1658         struct tipc_sock *tsk = tipc_sk(sk);
1659         struct tipc_msg *hdr = buf_msg(skb);
1660         unsigned int limit = rcvbuf_limit(sk, skb);
1661         int err = TIPC_OK;
1662         int usr = msg_user(hdr);
1663
1664         if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1665                 tipc_sk_proto_rcv(tsk, skb, xmitq);
1666                 return false;
1667         }
1668
1669         if (unlikely(usr == SOCK_WAKEUP)) {
1670                 kfree_skb(skb);
1671                 tsk->link_cong = 0;
1672                 sk->sk_write_space(sk);
1673                 return false;
1674         }
1675
1676         /* Drop if illegal message type */
1677         if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1678                 kfree_skb(skb);
1679                 return false;
1680         }
1681
1682         /* Reject if wrong message type for current socket state */
1683         if (unlikely(sock->state == SS_READY)) {
1684                 if (msg_connected(hdr)) {
1685                         err = TIPC_ERR_NO_PORT;
1686                         goto reject;
1687                 }
1688         } else if (unlikely(!filter_connect(tsk, skb))) {
1689                 err = TIPC_ERR_NO_PORT;
1690                 goto reject;
1691         }
1692
1693         /* Reject message if there isn't room to queue it */
1694         if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1695                 err = TIPC_ERR_OVERLOAD;
1696                 goto reject;
1697         }
1698
1699         /* Enqueue message */
1700         TIPC_SKB_CB(skb)->handle = NULL;
1701         __skb_queue_tail(&sk->sk_receive_queue, skb);
1702         skb_set_owner_r(skb, sk);
1703
1704         sk->sk_data_ready(sk);
1705         return true;
1706
1707 reject:
1708         if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1709                 __skb_queue_tail(xmitq, skb);
1710         return false;
1711 }
1712
1713 /**
1714  * tipc_backlog_rcv - handle incoming message from backlog queue
1715  * @sk: socket
1716  * @skb: message
1717  *
1718  * Caller must hold socket lock
1719  *
1720  * Returns 0
1721  */
1722 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1723 {
1724         unsigned int truesize = skb->truesize;
1725         struct sk_buff_head xmitq;
1726         u32 dnode, selector;
1727
1728         __skb_queue_head_init(&xmitq);
1729
1730         if (likely(filter_rcv(sk, skb, &xmitq))) {
1731                 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1732                 return 0;
1733         }
1734
1735         if (skb_queue_empty(&xmitq))
1736                 return 0;
1737
1738         /* Send response/rejected message */
1739         skb = __skb_dequeue(&xmitq);
1740         dnode = msg_destnode(buf_msg(skb));
1741         selector = msg_origport(buf_msg(skb));
1742         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1743         return 0;
1744 }
1745
1746 /**
1747  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1748  *                   inputq and try adding them to socket or backlog queue
1749  * @inputq: list of incoming buffers with potentially different destinations
1750  * @sk: socket where the buffers should be enqueued
1751  * @dport: port number for the socket
1752  *
1753  * Caller must hold socket lock
1754  */
1755 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1756                             u32 dport, struct sk_buff_head *xmitq)
1757 {
1758         unsigned long time_limit = jiffies + usecs_to_jiffies(20000);
1759         struct sk_buff *skb;
1760         unsigned int lim;
1761         atomic_t *dcnt;
1762         u32 onode;
1763
1764         while (skb_queue_len(inputq)) {
1765                 if (unlikely(time_after_eq(jiffies, time_limit)))
1766                         return;
1767
1768                 skb = tipc_skb_dequeue(inputq, dport);
1769                 if (unlikely(!skb))
1770                         return;
1771
1772                 /* Add message directly to receive queue if possible */
1773                 if (!sock_owned_by_user(sk)) {
1774                         filter_rcv(sk, skb, xmitq);
1775                         continue;
1776                 }
1777
1778                 /* Try backlog, compensating for double-counted bytes */
1779                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1780                 if (!sk->sk_backlog.len)
1781                         atomic_set(dcnt, 0);
1782                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1783                 if (likely(!sk_add_backlog(sk, skb, lim)))
1784                         continue;
1785
1786                 /* Overload => reject message back to sender */
1787                 onode = tipc_own_addr(sock_net(sk));
1788                 if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1789                         __skb_queue_tail(xmitq, skb);
1790                 break;
1791         }
1792 }
1793
1794 /**
1795  * tipc_sk_rcv - handle a chain of incoming buffers
1796  * @inputq: buffer list containing the buffers
1797  * Consumes all buffers in list until inputq is empty
1798  * Note: may be called in multiple threads referring to the same queue
1799  */
1800 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1801 {
1802         struct sk_buff_head xmitq;
1803         u32 dnode, dport = 0;
1804         int err;
1805         struct tipc_sock *tsk;
1806         struct sock *sk;
1807         struct sk_buff *skb;
1808
1809         __skb_queue_head_init(&xmitq);
1810         while (skb_queue_len(inputq)) {
1811                 dport = tipc_skb_peek_port(inputq, dport);
1812                 tsk = tipc_sk_lookup(net, dport);
1813
1814                 if (likely(tsk)) {
1815                         sk = &tsk->sk;
1816                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1817                                 tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1818                                 spin_unlock_bh(&sk->sk_lock.slock);
1819                         }
1820                         /* Send pending response/rejected messages, if any */
1821                         while ((skb = __skb_dequeue(&xmitq))) {
1822                                 dnode = msg_destnode(buf_msg(skb));
1823                                 tipc_node_xmit_skb(net, skb, dnode, dport);
1824                         }
1825                         sock_put(sk);
1826                         continue;
1827                 }
1828
1829                 /* No destination socket => dequeue skb if still there */
1830                 skb = tipc_skb_dequeue(inputq, dport);
1831                 if (!skb)
1832                         return;
1833
1834                 /* Try secondary lookup if unresolved named message */
1835                 err = TIPC_ERR_NO_PORT;
1836                 if (tipc_msg_lookup_dest(net, skb, &err))
1837                         goto xmit;
1838
1839                 /* Prepare for message rejection */
1840                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1841                         continue;
1842 xmit:
1843                 dnode = msg_destnode(buf_msg(skb));
1844                 tipc_node_xmit_skb(net, skb, dnode, dport);
1845         }
1846 }
1847
1848 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1849 {
1850         struct sock *sk = sock->sk;
1851         DEFINE_WAIT(wait);
1852         int done;
1853
1854         do {
1855                 int err = sock_error(sk);
1856                 if (err)
1857                         return err;
1858                 if (!*timeo_p)
1859                         return -ETIMEDOUT;
1860                 if (signal_pending(current))
1861                         return sock_intr_errno(*timeo_p);
1862
1863                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1864                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1865                 finish_wait(sk_sleep(sk), &wait);
1866         } while (!done);
1867         return 0;
1868 }
1869
1870 /**
1871  * tipc_connect - establish a connection to another TIPC port
1872  * @sock: socket structure
1873  * @dest: socket address for destination port
1874  * @destlen: size of socket address data structure
1875  * @flags: file-related flags associated with socket
1876  *
1877  * Returns 0 on success, errno otherwise
1878  */
1879 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1880                         int destlen, int flags)
1881 {
1882         struct sock *sk = sock->sk;
1883         struct tipc_sock *tsk = tipc_sk(sk);
1884         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1885         struct msghdr m = {NULL,};
1886         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1887         socket_state previous;
1888         int res = 0;
1889
1890         lock_sock(sk);
1891
1892         /* DGRAM/RDM connect(), just save the destaddr */
1893         if (sock->state == SS_READY) {
1894                 if (dst->family == AF_UNSPEC) {
1895                         memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1896                         tsk->connected = 0;
1897                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1898                         res = -EINVAL;
1899                 } else {
1900                         memcpy(&tsk->remote, dest, destlen);
1901                         tsk->connected = 1;
1902                 }
1903                 goto exit;
1904         }
1905
1906         /*
1907          * Reject connection attempt using multicast address
1908          *
1909          * Note: send_msg() validates the rest of the address fields,
1910          *       so there's no need to do it here
1911          */
1912         if (dst->addrtype == TIPC_ADDR_MCAST) {
1913                 res = -EINVAL;
1914                 goto exit;
1915         }
1916
1917         previous = sock->state;
1918         switch (sock->state) {
1919         case SS_UNCONNECTED:
1920                 /* Send a 'SYN-' to destination */
1921                 m.msg_name = dest;
1922                 m.msg_namelen = destlen;
1923
1924                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1925                  * indicate send_msg() is never blocked.
1926                  */
1927                 if (!timeout)
1928                         m.msg_flags = MSG_DONTWAIT;
1929
1930                 res = __tipc_sendmsg(sock, &m, 0);
1931                 if ((res < 0) && (res != -EWOULDBLOCK))
1932                         goto exit;
1933
1934                 /* Just entered SS_CONNECTING state; the only
1935                  * difference is that return value in non-blocking
1936                  * case is EINPROGRESS, rather than EALREADY.
1937                  */
1938                 res = -EINPROGRESS;
1939         case SS_CONNECTING:
1940                 if (previous == SS_CONNECTING)
1941                         res = -EALREADY;
1942                 if (!timeout)
1943                         goto exit;
1944                 timeout = msecs_to_jiffies(timeout);
1945                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1946                 res = tipc_wait_for_connect(sock, &timeout);
1947                 break;
1948         case SS_CONNECTED:
1949                 res = -EISCONN;
1950                 break;
1951         default:
1952                 res = -EINVAL;
1953                 break;
1954         }
1955 exit:
1956         release_sock(sk);
1957         return res;
1958 }
1959
1960 /**
1961  * tipc_listen - allow socket to listen for incoming connections
1962  * @sock: socket structure
1963  * @len: (unused)
1964  *
1965  * Returns 0 on success, errno otherwise
1966  */
1967 static int tipc_listen(struct socket *sock, int len)
1968 {
1969         struct sock *sk = sock->sk;
1970         int res;
1971
1972         lock_sock(sk);
1973
1974         if (sock->state != SS_UNCONNECTED)
1975                 res = -EINVAL;
1976         else {
1977                 sock->state = SS_LISTENING;
1978                 res = 0;
1979         }
1980
1981         release_sock(sk);
1982         return res;
1983 }
1984
1985 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1986 {
1987         struct sock *sk = sock->sk;
1988         DEFINE_WAIT_FUNC(wait, woken_wake_function);
1989         int err;
1990
1991         /* True wake-one mechanism for incoming connections: only
1992          * one process gets woken up, not the 'whole herd'.
1993          * Since we do not 'race & poll' for established sockets
1994          * anymore, the common case will execute the loop only once.
1995         */
1996         for (;;) {
1997                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1998                         add_wait_queue(sk_sleep(sk), &wait);
1999                         release_sock(sk);
2000                         timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
2001                         lock_sock(sk);
2002                         remove_wait_queue(sk_sleep(sk), &wait);
2003                 }
2004                 err = 0;
2005                 if (!skb_queue_empty(&sk->sk_receive_queue))
2006                         break;
2007                 err = -EINVAL;
2008                 if (sock->state != SS_LISTENING)
2009                         break;
2010                 err = -EAGAIN;
2011                 if (!timeo)
2012                         break;
2013                 err = sock_intr_errno(timeo);
2014                 if (signal_pending(current))
2015                         break;
2016         }
2017         return err;
2018 }
2019
2020 /**
2021  * tipc_accept - wait for connection request
2022  * @sock: listening socket
2023  * @newsock: new socket that is to be connected
2024  * @flags: file-related flags associated with socket
2025  *
2026  * Returns 0 on success, errno otherwise
2027  */
2028 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2029 {
2030         struct sock *new_sk, *sk = sock->sk;
2031         struct sk_buff *buf;
2032         struct tipc_sock *new_tsock;
2033         struct tipc_msg *msg;
2034         long timeo;
2035         int res;
2036
2037         lock_sock(sk);
2038
2039         if (sock->state != SS_LISTENING) {
2040                 res = -EINVAL;
2041                 goto exit;
2042         }
2043         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2044         res = tipc_wait_for_accept(sock, timeo);
2045         if (res)
2046                 goto exit;
2047
2048         buf = skb_peek(&sk->sk_receive_queue);
2049
2050         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2051         if (res)
2052                 goto exit;
2053         security_sk_clone(sock->sk, new_sock->sk);
2054
2055         new_sk = new_sock->sk;
2056         new_tsock = tipc_sk(new_sk);
2057         msg = buf_msg(buf);
2058
2059         /* we lock on new_sk; but lockdep sees the lock on sk */
2060         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2061
2062         /*
2063          * Reject any stray messages received by new socket
2064          * before the socket lock was taken (very, very unlikely)
2065          */
2066         tsk_rej_rx_queue(new_sk);
2067
2068         /* Connect new socket to it's peer */
2069         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2070         new_sock->state = SS_CONNECTED;
2071
2072         tsk_set_importance(new_tsock, msg_importance(msg));
2073         if (msg_named(msg)) {
2074                 new_tsock->conn_type = msg_nametype(msg);
2075                 new_tsock->conn_instance = msg_nameinst(msg);
2076         }
2077
2078         /*
2079          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2080          * Respond to 'SYN+' by queuing it on new socket.
2081          */
2082         if (!msg_data_sz(msg)) {
2083                 struct msghdr m = {NULL,};
2084
2085                 tsk_advance_rx_queue(sk);
2086                 __tipc_send_stream(new_sock, &m, 0);
2087         } else {
2088                 __skb_dequeue(&sk->sk_receive_queue);
2089                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2090                 skb_set_owner_r(buf, new_sk);
2091         }
2092         release_sock(new_sk);
2093 exit:
2094         release_sock(sk);
2095         return res;
2096 }
2097
2098 /**
2099  * tipc_shutdown - shutdown socket connection
2100  * @sock: socket structure
2101  * @how: direction to close (must be SHUT_RDWR)
2102  *
2103  * Terminates connection (if necessary), then purges socket's receive queue.
2104  *
2105  * Returns 0 on success, errno otherwise
2106  */
2107 static int tipc_shutdown(struct socket *sock, int how)
2108 {
2109         struct sock *sk = sock->sk;
2110         struct net *net = sock_net(sk);
2111         struct tipc_sock *tsk = tipc_sk(sk);
2112         struct sk_buff *skb;
2113         u32 dnode = tsk_peer_node(tsk);
2114         u32 dport = tsk_peer_port(tsk);
2115         u32 onode = tipc_own_addr(net);
2116         u32 oport = tsk->portid;
2117         int res;
2118
2119         if (how != SHUT_RDWR)
2120                 return -EINVAL;
2121
2122         lock_sock(sk);
2123
2124         switch (sock->state) {
2125         case SS_CONNECTING:
2126         case SS_CONNECTED:
2127
2128 restart:
2129                 dnode = tsk_peer_node(tsk);
2130
2131                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2132                 skb = __skb_dequeue(&sk->sk_receive_queue);
2133                 if (skb) {
2134                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2135                                 kfree_skb(skb);
2136                                 goto restart;
2137                         }
2138                         tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
2139                 } else {
2140                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2141                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2142                                               0, dnode, onode, dport, oport,
2143                                               TIPC_CONN_SHUTDOWN);
2144                         if (skb)
2145                                 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2146                 }
2147                 tsk->connected = 0;
2148                 sock->state = SS_DISCONNECTING;
2149                 tipc_node_remove_conn(net, dnode, tsk->portid);
2150                 /* fall through */
2151
2152         case SS_DISCONNECTING:
2153
2154                 /* Discard any unreceived messages */
2155                 __skb_queue_purge(&sk->sk_receive_queue);
2156
2157                 /* Wake up anyone sleeping in poll */
2158                 sk->sk_state_change(sk);
2159                 res = 0;
2160                 break;
2161
2162         default:
2163                 res = -ENOTCONN;
2164         }
2165
2166         release_sock(sk);
2167         return res;
2168 }
2169
2170 static void tipc_sk_timeout(unsigned long data)
2171 {
2172         struct tipc_sock *tsk = (struct tipc_sock *)data;
2173         struct sock *sk = &tsk->sk;
2174         struct sk_buff *skb = NULL;
2175         u32 peer_port, peer_node;
2176         u32 own_node = tsk_own_node(tsk);
2177
2178         bh_lock_sock(sk);
2179         if (!tsk->connected) {
2180                 bh_unlock_sock(sk);
2181                 goto exit;
2182         }
2183         peer_port = tsk_peer_port(tsk);
2184         peer_node = tsk_peer_node(tsk);
2185
2186         if (tsk->probing_state == TIPC_CONN_PROBING) {
2187                 if (!sock_owned_by_user(sk)) {
2188                         sk->sk_socket->state = SS_DISCONNECTING;
2189                         tsk->connected = 0;
2190                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2191                                               tsk_peer_port(tsk));
2192                         sk->sk_state_change(sk);
2193                 } else {
2194                         /* Try again later */
2195                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2196                 }
2197
2198         } else {
2199                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2200                                       INT_H_SIZE, 0, peer_node, own_node,
2201                                       peer_port, tsk->portid, TIPC_OK);
2202                 tsk->probing_state = TIPC_CONN_PROBING;
2203                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2204         }
2205         bh_unlock_sock(sk);
2206         if (skb)
2207                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2208 exit:
2209         sock_put(sk);
2210 }
2211
2212 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2213                            struct tipc_name_seq const *seq)
2214 {
2215         struct net *net = sock_net(&tsk->sk);
2216         struct publication *publ;
2217         u32 key;
2218
2219         if (tsk->connected)
2220                 return -EINVAL;
2221         key = tsk->portid + tsk->pub_count + 1;
2222         if (key == tsk->portid)
2223                 return -EADDRINUSE;
2224
2225         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2226                                     scope, tsk->portid, key);
2227         if (unlikely(!publ))
2228                 return -EINVAL;
2229
2230         list_add(&publ->pport_list, &tsk->publications);
2231         tsk->pub_count++;
2232         tsk->published = 1;
2233         return 0;
2234 }
2235
2236 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2237                             struct tipc_name_seq const *seq)
2238 {
2239         struct net *net = sock_net(&tsk->sk);
2240         struct publication *publ;
2241         struct publication *safe;
2242         int rc = -EINVAL;
2243
2244         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2245                 if (seq) {
2246                         if (publ->scope != scope)
2247                                 continue;
2248                         if (publ->type != seq->type)
2249                                 continue;
2250                         if (publ->lower != seq->lower)
2251                                 continue;
2252                         if (publ->upper != seq->upper)
2253                                 break;
2254                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2255                                               publ->ref, publ->key);
2256                         rc = 0;
2257                         break;
2258                 }
2259                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2260                                       publ->ref, publ->key);
2261                 rc = 0;
2262         }
2263         if (list_empty(&tsk->publications))
2264                 tsk->published = 0;
2265         return rc;
2266 }
2267
2268 /* tipc_sk_reinit: set non-zero address in all existing sockets
2269  *                 when we go from standalone to network mode.
2270  */
2271 void tipc_sk_reinit(struct net *net)
2272 {
2273         struct tipc_net *tn = net_generic(net, tipc_net_id);
2274         struct rhashtable_iter iter;
2275         struct tipc_sock *tsk;
2276         struct tipc_msg *msg;
2277
2278         rhashtable_walk_enter(&tn->sk_rht, &iter);
2279
2280         do {
2281                 tsk = ERR_PTR(rhashtable_walk_start(&iter));
2282                 if (IS_ERR(tsk))
2283                         goto walk_stop;
2284
2285                 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2286                         sock_hold(&tsk->sk);
2287                         rhashtable_walk_stop(&iter);
2288                         lock_sock(&tsk->sk);
2289                         msg = &tsk->phdr;
2290                         msg_set_prevnode(msg, tn->own_addr);
2291                         msg_set_orignode(msg, tn->own_addr);
2292                         release_sock(&tsk->sk);
2293                         rhashtable_walk_start(&iter);
2294                         sock_put(&tsk->sk);
2295                 }
2296 walk_stop:
2297                 rhashtable_walk_stop(&iter);
2298         } while (tsk == ERR_PTR(-EAGAIN));
2299 }
2300
2301 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2302 {
2303         struct tipc_net *tn = net_generic(net, tipc_net_id);
2304         struct tipc_sock *tsk;
2305
2306         rcu_read_lock();
2307         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2308         if (tsk)
2309                 sock_hold(&tsk->sk);
2310         rcu_read_unlock();
2311
2312         return tsk;
2313 }
2314
2315 static int tipc_sk_insert(struct tipc_sock *tsk)
2316 {
2317         struct sock *sk = &tsk->sk;
2318         struct net *net = sock_net(sk);
2319         struct tipc_net *tn = net_generic(net, tipc_net_id);
2320         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2321         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2322
2323         while (remaining--) {
2324                 portid++;
2325                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2326                         portid = TIPC_MIN_PORT;
2327                 tsk->portid = portid;
2328                 sock_hold(&tsk->sk);
2329                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2330                                                    tsk_rht_params))
2331                         return 0;
2332                 sock_put(&tsk->sk);
2333         }
2334
2335         return -1;
2336 }
2337
2338 static void tipc_sk_remove(struct tipc_sock *tsk)
2339 {
2340         struct sock *sk = &tsk->sk;
2341         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2342
2343         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2344                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2345                 __sock_put(sk);
2346         }
2347 }
2348
2349 static const struct rhashtable_params tsk_rht_params = {
2350         .nelem_hint = 192,
2351         .head_offset = offsetof(struct tipc_sock, node),
2352         .key_offset = offsetof(struct tipc_sock, portid),
2353         .key_len = sizeof(u32), /* portid */
2354         .max_size = 1048576,
2355         .min_size = 256,
2356         .automatic_shrinking = true,
2357 };
2358
2359 int tipc_sk_rht_init(struct net *net)
2360 {
2361         struct tipc_net *tn = net_generic(net, tipc_net_id);
2362
2363         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2364 }
2365
2366 void tipc_sk_rht_destroy(struct net *net)
2367 {
2368         struct tipc_net *tn = net_generic(net, tipc_net_id);
2369
2370         /* Wait for socket readers to complete */
2371         synchronize_net();
2372
2373         rhashtable_destroy(&tn->sk_rht);
2374 }
2375
2376 /**
2377  * tipc_setsockopt - set socket option
2378  * @sock: socket structure
2379  * @lvl: option level
2380  * @opt: option identifier
2381  * @ov: pointer to new option value
2382  * @ol: length of option value
2383  *
2384  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2385  * (to ease compatibility).
2386  *
2387  * Returns 0 on success, errno otherwise
2388  */
2389 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2390                            char __user *ov, unsigned int ol)
2391 {
2392         struct sock *sk = sock->sk;
2393         struct tipc_sock *tsk = tipc_sk(sk);
2394         u32 value;
2395         int res;
2396
2397         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2398                 return 0;
2399         if (lvl != SOL_TIPC)
2400                 return -ENOPROTOOPT;
2401         if (ol < sizeof(value))
2402                 return -EINVAL;
2403         res = get_user(value, (u32 __user *)ov);
2404         if (res)
2405                 return res;
2406
2407         lock_sock(sk);
2408
2409         switch (opt) {
2410         case TIPC_IMPORTANCE:
2411                 res = tsk_set_importance(tsk, value);
2412                 break;
2413         case TIPC_SRC_DROPPABLE:
2414                 if (sock->type != SOCK_STREAM)
2415                         tsk_set_unreliable(tsk, value);
2416                 else
2417                         res = -ENOPROTOOPT;
2418                 break;
2419         case TIPC_DEST_DROPPABLE:
2420                 tsk_set_unreturnable(tsk, value);
2421                 break;
2422         case TIPC_CONN_TIMEOUT:
2423                 tipc_sk(sk)->conn_timeout = value;
2424                 /* no need to set "res", since already 0 at this point */
2425                 break;
2426         default:
2427                 res = -EINVAL;
2428         }
2429
2430         release_sock(sk);
2431
2432         return res;
2433 }
2434
2435 /**
2436  * tipc_getsockopt - get socket option
2437  * @sock: socket structure
2438  * @lvl: option level
2439  * @opt: option identifier
2440  * @ov: receptacle for option value
2441  * @ol: receptacle for length of option value
2442  *
2443  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2444  * (to ease compatibility).
2445  *
2446  * Returns 0 on success, errno otherwise
2447  */
2448 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2449                            char __user *ov, int __user *ol)
2450 {
2451         struct sock *sk = sock->sk;
2452         struct tipc_sock *tsk = tipc_sk(sk);
2453         int len;
2454         u32 value;
2455         int res;
2456
2457         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2458                 return put_user(0, ol);
2459         if (lvl != SOL_TIPC)
2460                 return -ENOPROTOOPT;
2461         res = get_user(len, ol);
2462         if (res)
2463                 return res;
2464
2465         lock_sock(sk);
2466
2467         switch (opt) {
2468         case TIPC_IMPORTANCE:
2469                 value = tsk_importance(tsk);
2470                 break;
2471         case TIPC_SRC_DROPPABLE:
2472                 value = tsk_unreliable(tsk);
2473                 break;
2474         case TIPC_DEST_DROPPABLE:
2475                 value = tsk_unreturnable(tsk);
2476                 break;
2477         case TIPC_CONN_TIMEOUT:
2478                 value = tsk->conn_timeout;
2479                 /* no need to set "res", since already 0 at this point */
2480                 break;
2481         case TIPC_NODE_RECVQ_DEPTH:
2482                 value = 0; /* was tipc_queue_size, now obsolete */
2483                 break;
2484         case TIPC_SOCK_RECVQ_DEPTH:
2485                 value = skb_queue_len(&sk->sk_receive_queue);
2486                 break;
2487         default:
2488                 res = -EINVAL;
2489         }
2490
2491         release_sock(sk);
2492
2493         if (res)
2494                 return res;     /* "get" failed */
2495
2496         if (len < sizeof(value))
2497                 return -EINVAL;
2498
2499         if (copy_to_user(ov, &value, sizeof(value)))
2500                 return -EFAULT;
2501
2502         return put_user(sizeof(value), ol);
2503 }
2504
2505 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2506 {
2507         struct sock *sk = sock->sk;
2508         struct tipc_sioc_ln_req lnr;
2509         void __user *argp = (void __user *)arg;
2510
2511         switch (cmd) {
2512         case SIOCGETLINKNAME:
2513                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2514                         return -EFAULT;
2515                 if (!tipc_node_get_linkname(sock_net(sk),
2516                                             lnr.bearer_id & 0xffff, lnr.peer,
2517                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2518                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2519                                 return -EFAULT;
2520                         return 0;
2521                 }
2522                 return -EADDRNOTAVAIL;
2523         default:
2524                 return -ENOIOCTLCMD;
2525         }
2526 }
2527
2528 /* Protocol switches for the various types of TIPC sockets */
2529
2530 static const struct proto_ops msg_ops = {
2531         .owner          = THIS_MODULE,
2532         .family         = AF_TIPC,
2533         .release        = tipc_release,
2534         .bind           = tipc_bind,
2535         .connect        = tipc_connect,
2536         .socketpair     = sock_no_socketpair,
2537         .accept         = sock_no_accept,
2538         .getname        = tipc_getname,
2539         .poll           = tipc_poll,
2540         .ioctl          = tipc_ioctl,
2541         .listen         = sock_no_listen,
2542         .shutdown       = tipc_shutdown,
2543         .setsockopt     = tipc_setsockopt,
2544         .getsockopt     = tipc_getsockopt,
2545         .sendmsg        = tipc_sendmsg,
2546         .recvmsg        = tipc_recvmsg,
2547         .mmap           = sock_no_mmap,
2548         .sendpage       = sock_no_sendpage
2549 };
2550
2551 static const struct proto_ops packet_ops = {
2552         .owner          = THIS_MODULE,
2553         .family         = AF_TIPC,
2554         .release        = tipc_release,
2555         .bind           = tipc_bind,
2556         .connect        = tipc_connect,
2557         .socketpair     = sock_no_socketpair,
2558         .accept         = tipc_accept,
2559         .getname        = tipc_getname,
2560         .poll           = tipc_poll,
2561         .ioctl          = tipc_ioctl,
2562         .listen         = tipc_listen,
2563         .shutdown       = tipc_shutdown,
2564         .setsockopt     = tipc_setsockopt,
2565         .getsockopt     = tipc_getsockopt,
2566         .sendmsg        = tipc_send_packet,
2567         .recvmsg        = tipc_recvmsg,
2568         .mmap           = sock_no_mmap,
2569         .sendpage       = sock_no_sendpage
2570 };
2571
2572 static const struct proto_ops stream_ops = {
2573         .owner          = THIS_MODULE,
2574         .family         = AF_TIPC,
2575         .release        = tipc_release,
2576         .bind           = tipc_bind,
2577         .connect        = tipc_connect,
2578         .socketpair     = sock_no_socketpair,
2579         .accept         = tipc_accept,
2580         .getname        = tipc_getname,
2581         .poll           = tipc_poll,
2582         .ioctl          = tipc_ioctl,
2583         .listen         = tipc_listen,
2584         .shutdown       = tipc_shutdown,
2585         .setsockopt     = tipc_setsockopt,
2586         .getsockopt     = tipc_getsockopt,
2587         .sendmsg        = tipc_send_stream,
2588         .recvmsg        = tipc_recv_stream,
2589         .mmap           = sock_no_mmap,
2590         .sendpage       = sock_no_sendpage
2591 };
2592
2593 static const struct net_proto_family tipc_family_ops = {
2594         .owner          = THIS_MODULE,
2595         .family         = AF_TIPC,
2596         .create         = tipc_sk_create
2597 };
2598
2599 static struct proto tipc_proto = {
2600         .name           = "TIPC",
2601         .owner          = THIS_MODULE,
2602         .obj_size       = sizeof(struct tipc_sock),
2603         .sysctl_rmem    = sysctl_tipc_rmem
2604 };
2605
2606 /**
2607  * tipc_socket_init - initialize TIPC socket interface
2608  *
2609  * Returns 0 on success, errno otherwise
2610  */
2611 int tipc_socket_init(void)
2612 {
2613         int res;
2614
2615         res = proto_register(&tipc_proto, 1);
2616         if (res) {
2617                 pr_err("Failed to register TIPC protocol type\n");
2618                 goto out;
2619         }
2620
2621         res = sock_register(&tipc_family_ops);
2622         if (res) {
2623                 pr_err("Failed to register TIPC socket type\n");
2624                 proto_unregister(&tipc_proto);
2625                 goto out;
2626         }
2627  out:
2628         return res;
2629 }
2630
2631 /**
2632  * tipc_socket_stop - stop TIPC socket interface
2633  */
2634 void tipc_socket_stop(void)
2635 {
2636         sock_unregister(tipc_family_ops.family);
2637         proto_unregister(&tipc_proto);
2638 }
2639
2640 /* Caller should hold socket lock for the passed tipc socket. */
2641 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2642 {
2643         u32 peer_node;
2644         u32 peer_port;
2645         struct nlattr *nest;
2646
2647         peer_node = tsk_peer_node(tsk);
2648         peer_port = tsk_peer_port(tsk);
2649
2650         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2651
2652         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2653                 goto msg_full;
2654         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2655                 goto msg_full;
2656
2657         if (tsk->conn_type != 0) {
2658                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2659                         goto msg_full;
2660                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2661                         goto msg_full;
2662                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2663                         goto msg_full;
2664         }
2665         nla_nest_end(skb, nest);
2666
2667         return 0;
2668
2669 msg_full:
2670         nla_nest_cancel(skb, nest);
2671
2672         return -EMSGSIZE;
2673 }
2674
2675 /* Caller should hold socket lock for the passed tipc socket. */
2676 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2677                             struct tipc_sock *tsk)
2678 {
2679         int err;
2680         void *hdr;
2681         struct nlattr *attrs;
2682         struct net *net = sock_net(skb->sk);
2683         struct tipc_net *tn = net_generic(net, tipc_net_id);
2684
2685         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2686                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2687         if (!hdr)
2688                 goto msg_cancel;
2689
2690         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2691         if (!attrs)
2692                 goto genlmsg_cancel;
2693         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2694                 goto attr_msg_cancel;
2695         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2696                 goto attr_msg_cancel;
2697
2698         if (tsk->connected) {
2699                 err = __tipc_nl_add_sk_con(skb, tsk);
2700                 if (err)
2701                         goto attr_msg_cancel;
2702         } else if (!list_empty(&tsk->publications)) {
2703                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2704                         goto attr_msg_cancel;
2705         }
2706         nla_nest_end(skb, attrs);
2707         genlmsg_end(skb, hdr);
2708
2709         return 0;
2710
2711 attr_msg_cancel:
2712         nla_nest_cancel(skb, attrs);
2713 genlmsg_cancel:
2714         genlmsg_cancel(skb, hdr);
2715 msg_cancel:
2716         return -EMSGSIZE;
2717 }
2718
2719 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2720 {
2721         int err;
2722         struct tipc_sock *tsk;
2723         const struct bucket_table *tbl;
2724         struct rhash_head *pos;
2725         struct net *net = sock_net(skb->sk);
2726         struct tipc_net *tn = net_generic(net, tipc_net_id);
2727         u32 tbl_id = cb->args[0];
2728         u32 prev_portid = cb->args[1];
2729
2730         rcu_read_lock();
2731         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2732         for (; tbl_id < tbl->size; tbl_id++) {
2733                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2734                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2735                         if (prev_portid && prev_portid != tsk->portid) {
2736                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2737                                 continue;
2738                         }
2739
2740                         err = __tipc_nl_add_sk(skb, cb, tsk);
2741                         if (err) {
2742                                 prev_portid = tsk->portid;
2743                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2744                                 goto out;
2745                         }
2746                         prev_portid = 0;
2747                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2748                 }
2749         }
2750 out:
2751         rcu_read_unlock();
2752         cb->args[0] = tbl_id;
2753         cb->args[1] = prev_portid;
2754
2755         return skb->len;
2756 }
2757
2758 /* Caller should hold socket lock for the passed tipc socket. */
2759 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2760                                  struct netlink_callback *cb,
2761                                  struct publication *publ)
2762 {
2763         void *hdr;
2764         struct nlattr *attrs;
2765
2766         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2767                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2768         if (!hdr)
2769                 goto msg_cancel;
2770
2771         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2772         if (!attrs)
2773                 goto genlmsg_cancel;
2774
2775         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2776                 goto attr_msg_cancel;
2777         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2778                 goto attr_msg_cancel;
2779         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2780                 goto attr_msg_cancel;
2781         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2782                 goto attr_msg_cancel;
2783
2784         nla_nest_end(skb, attrs);
2785         genlmsg_end(skb, hdr);
2786
2787         return 0;
2788
2789 attr_msg_cancel:
2790         nla_nest_cancel(skb, attrs);
2791 genlmsg_cancel:
2792         genlmsg_cancel(skb, hdr);
2793 msg_cancel:
2794         return -EMSGSIZE;
2795 }
2796
2797 /* Caller should hold socket lock for the passed tipc socket. */
2798 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2799                                   struct netlink_callback *cb,
2800                                   struct tipc_sock *tsk, u32 *last_publ)
2801 {
2802         int err;
2803         struct publication *p;
2804
2805         if (*last_publ) {
2806                 list_for_each_entry(p, &tsk->publications, pport_list) {
2807                         if (p->key == *last_publ)
2808                                 break;
2809                 }
2810                 if (p->key != *last_publ) {
2811                         /* We never set seq or call nl_dump_check_consistent()
2812                          * this means that setting prev_seq here will cause the
2813                          * consistence check to fail in the netlink callback
2814                          * handler. Resulting in the last NLMSG_DONE message
2815                          * having the NLM_F_DUMP_INTR flag set.
2816                          */
2817                         cb->prev_seq = 1;
2818                         *last_publ = 0;
2819                         return -EPIPE;
2820                 }
2821         } else {
2822                 p = list_first_entry(&tsk->publications, struct publication,
2823                                      pport_list);
2824         }
2825
2826         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2827                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2828                 if (err) {
2829                         *last_publ = p->key;
2830                         return err;
2831                 }
2832         }
2833         *last_publ = 0;
2834
2835         return 0;
2836 }
2837
2838 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2839 {
2840         int err;
2841         u32 tsk_portid = cb->args[0];
2842         u32 last_publ = cb->args[1];
2843         u32 done = cb->args[2];
2844         struct net *net = sock_net(skb->sk);
2845         struct tipc_sock *tsk;
2846
2847         if (!tsk_portid) {
2848                 struct nlattr **attrs;
2849                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2850
2851                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2852                 if (err)
2853                         return err;
2854
2855                 if (!attrs[TIPC_NLA_SOCK])
2856                         return -EINVAL;
2857
2858                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2859                                        attrs[TIPC_NLA_SOCK],
2860                                        tipc_nl_sock_policy);
2861                 if (err)
2862                         return err;
2863
2864                 if (!sock[TIPC_NLA_SOCK_REF])
2865                         return -EINVAL;
2866
2867                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2868         }
2869
2870         if (done)
2871                 return 0;
2872
2873         tsk = tipc_sk_lookup(net, tsk_portid);
2874         if (!tsk)
2875                 return -EINVAL;
2876
2877         lock_sock(&tsk->sk);
2878         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2879         if (!err)
2880                 done = 1;
2881         release_sock(&tsk->sk);
2882         sock_put(&tsk->sk);
2883
2884         cb->args[0] = tsk_portid;
2885         cb->args[1] = last_publ;
2886         cb->args[2] = done;
2887
2888         return skb->len;
2889 }