GNU Linux-libre 4.4.284-gnu1
drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
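/* Decoded form of one on-the-wire packet header, filled in by decode_header(). */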
53 struct packet_info {
54         enum drbd_packet cmd;
55         unsigned int size;
56         unsigned int vnr;
57         void *data;
58 };
59
60 enum finish_epoch {
61         FE_STILL_LIVE,
62         FE_DESTROYED,
63         FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
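/* Opportunistic allocation flags for receive buffer pages: no reclaim flags,
 * so an allocation never triggers write-out (see the criss-cross deadlock note
 * in __drbd_alloc_pages()), and __GFP_NOWARN because a failed allocation is
 * handled gracefully (drbd_alloc_pages() retries). */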
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
80
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87         struct page *page;
88         struct page *tmp;
89
90         BUG_ON(!n);
91         BUG_ON(!head);
92
93         page = *head;
94
95         if (!page)
96                 return NULL;
97
98         while (page) {
99                 tmp = page_chain_next(page);
100                 if (--n == 0)
101                         break; /* found sufficient pages */
102                 if (tmp == NULL)
103                         /* insufficient pages, don't use any of them. */
104                         return NULL;
105                 page = tmp;
106         }
107
108         /* add end of list marker for the returned list */
109         set_page_private(page, 0);
110         /* actual return value, and adjustment of head */
111         page = *head;
112         *head = tmp;
113         return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121         struct page *tmp;
122         int i = 1;
123         while ((tmp = page_chain_next(page)))
124                 ++i, page = tmp;
125         if (len)
126                 *len = i;
127         return page;
128 }
129
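/* Release every page of the chain back to the system; returns the number of
 * pages freed. */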
130 static int page_chain_free(struct page *page)
131 {
132         struct page *tmp;
133         int i = 0;
134         page_chain_for_each_safe(page, tmp) {
135                 put_page(page);
136                 ++i;
137         }
138         return i;
139 }
140
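/* Prepend the chain [chain_first..chain_last] to *head; the caller is expected
 * to hold the pool spinlock.  The #if 1 block double-checks that chain_last
 * really is the tail of the chain being added. */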
141 static void page_chain_add(struct page **head,
142                 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145         struct page *tmp;
146         tmp = page_chain_tail(chain_first, NULL);
147         BUG_ON(tmp != chain_last);
148 #endif
149
150         /* add chain to head */
151         set_page_private(chain_last, (unsigned long)*head);
152         *head = chain_first;
153 }
154
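/* Try to grab @number pages, first from the pre-allocated drbd_pp_pool, then
 * via opportunistic alloc_page(GFP_TRY).  Returns a page chain on success, or
 * NULL if not all pages were immediately available; any partially allocated
 * pages are handed over to the pool for a later retry. */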
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156                                        unsigned int number)
157 {
158         struct page *page = NULL;
159         struct page *tmp = NULL;
160         unsigned int i = 0;
161
162         /* Yes, testing drbd_pp_vacant outside the lock is racy.
163          * So what. It saves a spin_lock. */
164         if (drbd_pp_vacant >= number) {
165                 spin_lock(&drbd_pp_lock);
166                 page = page_chain_del(&drbd_pp_pool, number);
167                 if (page)
168                         drbd_pp_vacant -= number;
169                 spin_unlock(&drbd_pp_lock);
170                 if (page)
171                         return page;
172         }
173
174         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175          * "criss-cross" setup, that might cause write-out on some other DRBD,
176          * which in turn might block on the other node at this very place.  */
177         for (i = 0; i < number; i++) {
178                 tmp = alloc_page(GFP_TRY);
179                 if (!tmp)
180                         break;
181                 set_page_private(tmp, (unsigned long)page);
182                 page = tmp;
183         }
184
185         if (i == number)
186                 return page;
187
188         /* Not enough pages immediately available this time.
189          * No need to jump around here, drbd_alloc_pages will retry this
190          * function "soon". */
191         if (page) {
192                 tmp = page_chain_tail(page, NULL);
193                 spin_lock(&drbd_pp_lock);
194                 page_chain_add(&drbd_pp_pool, page, tmp);
195                 drbd_pp_vacant += i;
196                 spin_unlock(&drbd_pp_lock);
197         }
198         return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202                                            struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req, *tmp;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first unfinished one, we can
209            stop examining the list... */
210
211         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(&peer_req->w.list, to_be_freed);
215         }
216 }
217
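/* Collect finished peer requests from device->net_ee under the req_lock and
 * free them (and their pages) outside of it. */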
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&device->resource->req_lock);
224         reclaim_finished_net_peer_reqs(device, &reclaimed);
225         spin_unlock_irq(&device->resource->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(device, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @device:     DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * If this allocation would exceed the max_buffers setting, we throttle
242  * allocation (schedule_timeout) to give the system some room to breathe.
243  *
244  * We do not use max-buffers as a hard limit, because it could lead to
245  * congestion, and further to a distributed deadlock during online-verify or
246  * (checksum-based) resync, if the max-buffers, socket buffer sizes and
247  * resync-rate settings are misconfigured.
248  *
249  * Returns a page chain linked via page->private.
250  */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252                               bool retry)
253 {
254         struct drbd_device *device = peer_device->device;
255         struct page *page = NULL;
256         struct net_conf *nc;
257         DEFINE_WAIT(wait);
258         unsigned int mxb;
259
260         rcu_read_lock();
261         nc = rcu_dereference(peer_device->connection->net_conf);
262         mxb = nc ? nc->max_buffers : 1000000;
263         rcu_read_unlock();
264
265         if (atomic_read(&device->pp_in_use) < mxb)
266                 page = __drbd_alloc_pages(device, number);
267
268         while (page == NULL) {
269                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270
271                 drbd_kick_lo_and_reclaim_net(device);
272
273                 if (atomic_read(&device->pp_in_use) < mxb) {
274                         page = __drbd_alloc_pages(device, number);
275                         if (page)
276                                 break;
277                 }
278
279                 if (!retry)
280                         break;
281
282                 if (signal_pending(current)) {
283                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284                         break;
285                 }
286
287                 if (schedule_timeout(HZ/10) == 0)
288                         mxb = UINT_MAX;
289         }
290         finish_wait(&drbd_pp_wait, &wait);
291
292         if (page)
293                 atomic_add(number, &device->pp_in_use);
294         return page;
295 }
296
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298  * Is also used from inside another spin_lock_irq(&resource->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304         int i;
305
306         if (page == NULL)
307                 return;
308
309         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310                 i = page_chain_free(page);
311         else {
312                 struct page *tmp;
313                 tmp = page_chain_tail(page, &i);
314                 spin_lock(&drbd_pp_lock);
315                 page_chain_add(&drbd_pp_pool, page, tmp);
316                 drbd_pp_vacant += i;
317                 spin_unlock(&drbd_pp_lock);
318         }
319         i = atomic_sub_return(i, a);
320         if (i < 0)
321                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323         wake_up(&drbd_pp_wait);
324 }
325
326 /*
327 You need to hold the req_lock:
328  _drbd_wait_ee_list_empty()
329
330 You must not have the req_lock:
331  drbd_free_peer_req()
332  drbd_alloc_peer_req()
333  drbd_free_peer_reqs()
334  drbd_ee_fix_bhs()
335  drbd_finish_peer_reqs()
336  drbd_clear_done_ee()
337  drbd_wait_ee_list_empty()
338 */
339
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342                     unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344         struct drbd_device *device = peer_device->device;
345         struct drbd_peer_request *peer_req;
346         struct page *page = NULL;
347         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348
349         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350                 return NULL;
351
352         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353         if (!peer_req) {
354                 if (!(gfp_mask & __GFP_NOWARN))
355                         drbd_err(device, "%s: allocation failed\n", __func__);
356                 return NULL;
357         }
358
359         if (has_payload && data_size) {
360                 page = drbd_alloc_pages(peer_device, nr_pages,
361                                         gfpflags_allow_blocking(gfp_mask));
362                 if (!page)
363                         goto fail;
364         }
365
366         memset(peer_req, 0, sizeof(*peer_req));
367         INIT_LIST_HEAD(&peer_req->w.list);
368         drbd_clear_interval(&peer_req->i);
369         peer_req->i.size = data_size;
370         peer_req->i.sector = sector;
371         peer_req->submit_jif = jiffies;
372         peer_req->peer_device = peer_device;
373         peer_req->pages = page;
374         /*
375          * The block_id is opaque to the receiver.  It is not endianness
376          * converted, and sent back to the sender unchanged.
377          */
378         peer_req->block_id = id;
379
380         return peer_req;
381
382  fail:
383         mempool_free(peer_req, drbd_ee_mempool);
384         return NULL;
385 }
386
387 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
388                        int is_net)
389 {
390         might_sleep();
391         if (peer_req->flags & EE_HAS_DIGEST)
392                 kfree(peer_req->digest);
393         drbd_free_pages(device, peer_req->pages, is_net);
394         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
395         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
396         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
397                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
398                 drbd_al_complete_io(device, &peer_req->i);
399         }
400         mempool_free(peer_req, drbd_ee_mempool);
401 }
402
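/* Splice @list off the device under the req_lock and free every peer request
 * on it; page accounting differs for net_ee (pp_in_use_by_net).  Returns the
 * number of requests freed. */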
403 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
404 {
405         LIST_HEAD(work_list);
406         struct drbd_peer_request *peer_req, *t;
407         int count = 0;
408         int is_net = list == &device->net_ee;
409
410         spin_lock_irq(&device->resource->req_lock);
411         list_splice_init(list, &work_list);
412         spin_unlock_irq(&device->resource->req_lock);
413
414         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
415                 __drbd_free_peer_req(device, peer_req, is_net);
416                 count++;
417         }
418         return count;
419 }
420
421 /*
422  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
423  */
424 static int drbd_finish_peer_reqs(struct drbd_device *device)
425 {
426         LIST_HEAD(work_list);
427         LIST_HEAD(reclaimed);
428         struct drbd_peer_request *peer_req, *t;
429         int err = 0;
430
431         spin_lock_irq(&device->resource->req_lock);
432         reclaim_finished_net_peer_reqs(device, &reclaimed);
433         list_splice_init(&device->done_ee, &work_list);
434         spin_unlock_irq(&device->resource->req_lock);
435
436         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
437                 drbd_free_net_peer_req(device, peer_req);
438
439         /* possible callbacks here:
440          * e_end_block, and e_end_resync_block, e_send_superseded.
441          * all ignore the last argument.
442          */
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 int err2;
445
446                 /* list_del not necessary, next/prev members not touched */
447                 err2 = peer_req->w.cb(&peer_req->w, !!err);
448                 if (!err)
449                         err = err2;
450                 drbd_free_peer_req(device, peer_req);
451         }
452         wake_up(&device->ee_wait);
453
454         return err;
455 }
456
457 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
458                                      struct list_head *head)
459 {
460         DEFINE_WAIT(wait);
461
462         /* avoids spin_lock/unlock
463          * and calling prepare_to_wait in the fast path */
464         while (!list_empty(head)) {
465                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
466                 spin_unlock_irq(&device->resource->req_lock);
467                 io_schedule();
468                 finish_wait(&device->ee_wait, &wait);
469                 spin_lock_irq(&device->resource->req_lock);
470         }
471 }
472
473 static void drbd_wait_ee_list_empty(struct drbd_device *device,
474                                     struct list_head *head)
475 {
476         spin_lock_irq(&device->resource->req_lock);
477         _drbd_wait_ee_list_empty(device, head);
478         spin_unlock_irq(&device->resource->req_lock);
479 }
480
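/* Receive up to @size bytes into @buf.  If no @flags are given, MSG_WAITALL
 * blocks until the full @size has arrived.  Returns the number of bytes
 * received, or a negative error code. */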
481 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
482 {
483         struct kvec iov = {
484                 .iov_base = buf,
485                 .iov_len = size,
486         };
487         struct msghdr msg = {
488                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
489         };
490         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
491 }
492
493 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
494 {
495         int rv;
496
497         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
498
499         if (rv < 0) {
500                 if (rv == -ECONNRESET)
501                         drbd_info(connection, "sock was reset by peer\n");
502                 else if (rv != -ERESTARTSYS)
503                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
504         } else if (rv == 0) {
505                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
506                         long t;
507                         rcu_read_lock();
508                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
509                         rcu_read_unlock();
510
511                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
512
513                         if (t)
514                                 goto out;
515                 }
516                 drbd_info(connection, "sock was shut down by peer\n");
517         }
518
519         if (rv != size)
520                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
521
522 out:
523         return rv;
524 }
525
526 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
527 {
528         int err;
529
530         err = drbd_recv(connection, buf, size);
531         if (err != size) {
532                 if (err >= 0)
533                         err = -EIO;
534         } else
535                 err = 0;
536         return err;
537 }
538
539 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
540 {
541         int err;
542
543         err = drbd_recv_all(connection, buf, size);
544         if (err && !signal_pending(current))
545                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
546         return err;
547 }
548
549 /* quoting tcp(7):
550  *   On individual connections, the socket buffer size must be set prior to the
551  *   listen(2) or connect(2) calls in order to have it take effect.
552  * This is our wrapper to do so.
553  */
554 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
555                 unsigned int rcv)
556 {
557         /* open coded SO_SNDBUF, SO_RCVBUF */
558         if (snd) {
559                 sock->sk->sk_sndbuf = snd;
560                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
561         }
562         if (rcv) {
563                 sock->sk->sk_rcvbuf = rcv;
564                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
565         }
566 }
567
568 static struct socket *drbd_try_connect(struct drbd_connection *connection)
569 {
570         const char *what;
571         struct socket *sock;
572         struct sockaddr_in6 src_in6;
573         struct sockaddr_in6 peer_in6;
574         struct net_conf *nc;
575         int err, peer_addr_len, my_addr_len;
576         int sndbuf_size, rcvbuf_size, connect_int;
577         int disconnect_on_error = 1;
578
579         rcu_read_lock();
580         nc = rcu_dereference(connection->net_conf);
581         if (!nc) {
582                 rcu_read_unlock();
583                 return NULL;
584         }
585         sndbuf_size = nc->sndbuf_size;
586         rcvbuf_size = nc->rcvbuf_size;
587         connect_int = nc->connect_int;
588         rcu_read_unlock();
589
590         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
591         memcpy(&src_in6, &connection->my_addr, my_addr_len);
592
593         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
594                 src_in6.sin6_port = 0;
595         else
596                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
597
598         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
599         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
600
601         what = "sock_create_kern";
602         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
603                                SOCK_STREAM, IPPROTO_TCP, &sock);
604         if (err < 0) {
605                 sock = NULL;
606                 goto out;
607         }
608
609         sock->sk->sk_rcvtimeo =
610         sock->sk->sk_sndtimeo = connect_int * HZ;
611         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
612
613        /* explicitly bind to the configured IP as source IP
614         *  for the outgoing connections.
615         *  This is needed for multihomed hosts and to be
616         *  able to use lo: interfaces for drbd.
617         * Make sure to use 0 as port number, so linux selects
618         *  a free one dynamically.
619         */
620         what = "bind before connect";
621         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
622         if (err < 0)
623                 goto out;
624
625         /* connect may fail, peer not yet available.
626          * stay C_WF_CONNECTION, don't go Disconnecting! */
627         disconnect_on_error = 0;
628         what = "connect";
629         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
630
631 out:
632         if (err < 0) {
633                 if (sock) {
634                         sock_release(sock);
635                         sock = NULL;
636                 }
637                 switch (-err) {
638                         /* timeout, busy, signal pending */
639                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
640                 case EINTR: case ERESTARTSYS:
641                         /* peer not (yet) available, network problem */
642                 case ECONNREFUSED: case ENETUNREACH:
643                 case EHOSTDOWN:    case EHOSTUNREACH:
644                         disconnect_on_error = 0;
645                         break;
646                 default:
647                         drbd_err(connection, "%s failed, err = %d\n", what, err);
648                 }
649                 if (disconnect_on_error)
650                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
651         }
652
653         return sock;
654 }
655
656 struct accept_wait_data {
657         struct drbd_connection *connection;
658         struct socket *s_listen;
659         struct completion door_bell;
660         void (*original_sk_state_change)(struct sock *sk);
661
662 };
663
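/* sk_state_change callback installed on the listen socket: once an incoming
 * connection reaches TCP_ESTABLISHED, complete the door_bell so that
 * drbd_wait_for_connect() can accept it, then chain to the original callback. */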
664 static void drbd_incoming_connection(struct sock *sk)
665 {
666         struct accept_wait_data *ad = sk->sk_user_data;
667         void (*state_change)(struct sock *sk);
668
669         state_change = ad->original_sk_state_change;
670         if (sk->sk_state == TCP_ESTABLISHED)
671                 complete(&ad->door_bell);
672         state_change(sk);
673 }
674
675 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
676 {
677         int err, sndbuf_size, rcvbuf_size, my_addr_len;
678         struct sockaddr_in6 my_addr;
679         struct socket *s_listen;
680         struct net_conf *nc;
681         const char *what;
682
683         rcu_read_lock();
684         nc = rcu_dereference(connection->net_conf);
685         if (!nc) {
686                 rcu_read_unlock();
687                 return -EIO;
688         }
689         sndbuf_size = nc->sndbuf_size;
690         rcvbuf_size = nc->rcvbuf_size;
691         rcu_read_unlock();
692
693         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
694         memcpy(&my_addr, &connection->my_addr, my_addr_len);
695
696         what = "sock_create_kern";
697         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
698                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
699         if (err) {
700                 s_listen = NULL;
701                 goto out;
702         }
703
704         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
705         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
706
707         what = "bind before listen";
708         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
709         if (err < 0)
710                 goto out;
711
712         ad->s_listen = s_listen;
713         write_lock_bh(&s_listen->sk->sk_callback_lock);
714         ad->original_sk_state_change = s_listen->sk->sk_state_change;
715         s_listen->sk->sk_state_change = drbd_incoming_connection;
716         s_listen->sk->sk_user_data = ad;
717         write_unlock_bh(&s_listen->sk->sk_callback_lock);
718
719         what = "listen";
720         err = s_listen->ops->listen(s_listen, 5);
721         if (err < 0)
722                 goto out;
723
724         return 0;
725 out:
726         if (s_listen)
727                 sock_release(s_listen);
728         if (err < 0) {
729                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
730                         drbd_err(connection, "%s failed, err = %d\n", what, err);
731                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
732                 }
733         }
734
735         return -EIO;
736 }
737
738 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
739 {
740         write_lock_bh(&sk->sk_callback_lock);
741         sk->sk_state_change = ad->original_sk_state_change;
742         sk->sk_user_data = NULL;
743         write_unlock_bh(&sk->sk_callback_lock);
744 }
745
746 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
747 {
748         int timeo, connect_int, err = 0;
749         struct socket *s_estab = NULL;
750         struct net_conf *nc;
751
752         rcu_read_lock();
753         nc = rcu_dereference(connection->net_conf);
754         if (!nc) {
755                 rcu_read_unlock();
756                 return NULL;
757         }
758         connect_int = nc->connect_int;
759         rcu_read_unlock();
760
761         timeo = connect_int * HZ;
762         /* 28.5% random jitter */
763         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
764
765         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
766         if (err <= 0)
767                 return NULL;
768
769         err = kernel_accept(ad->s_listen, &s_estab, 0);
770         if (err < 0) {
771                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
772                         drbd_err(connection, "accept failed, err = %d\n", err);
773                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
774                 }
775         }
776
777         if (s_estab)
778                 unregister_state_change(s_estab->sk, ad);
779
780         return s_estab;
781 }
782
783 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
784
785 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
786                              enum drbd_packet cmd)
787 {
788         if (!conn_prepare_command(connection, sock))
789                 return -EIO;
790         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
791 }
792
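/* Read and decode the very first packet on a freshly accepted socket, so that
 * conn_connect() can tell the data socket (P_INITIAL_DATA) from the meta
 * socket (P_INITIAL_META).  Returns the packet command, or a negative error. */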
793 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
794 {
795         unsigned int header_size = drbd_header_size(connection);
796         struct packet_info pi;
797         struct net_conf *nc;
798         int err;
799
800         rcu_read_lock();
801         nc = rcu_dereference(connection->net_conf);
802         if (!nc) {
803                 rcu_read_unlock();
804                 return -EIO;
805         }
806         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
807         rcu_read_unlock();
808
809         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
810         if (err != header_size) {
811                 if (err >= 0)
812                         err = -EIO;
813                 return err;
814         }
815         err = decode_header(connection, connection->data.rbuf, &pi);
816         if (err)
817                 return err;
818         return pi.cmd;
819 }
820
821 /**
822  * drbd_socket_okay() - Free the socket if its connection is not okay
823  * @sock:       pointer to the pointer to the socket.
824  */
825 static bool drbd_socket_okay(struct socket **sock)
826 {
827         int rr;
828         char tb[4];
829
830         if (!*sock)
831                 return false;
832
833         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
834
835         if (rr > 0 || rr == -EAGAIN) {
836                 return true;
837         } else {
838                 sock_release(*sock);
839                 *sock = NULL;
840                 return false;
841         }
842 }
843
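/* Both sockets must exist and still pass drbd_socket_okay() after a short
 * settle delay (sock_check_timeo, falling back to ping_timeo). */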
844 static bool connection_established(struct drbd_connection *connection,
845                                    struct socket **sock1,
846                                    struct socket **sock2)
847 {
848         struct net_conf *nc;
849         int timeout;
850         bool ok;
851
852         if (!*sock1 || !*sock2)
853                 return false;
854
855         rcu_read_lock();
856         nc = rcu_dereference(connection->net_conf);
857         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
858         rcu_read_unlock();
859         schedule_timeout_interruptible(timeout);
860
861         ok = drbd_socket_okay(sock1);
862         ok = drbd_socket_okay(sock2) && ok;
863
864         return ok;
865 }
866
867 /* Gets called if a connection is established, or if a new minor gets created
868    in a connection */
869 int drbd_connected(struct drbd_peer_device *peer_device)
870 {
871         struct drbd_device *device = peer_device->device;
872         int err;
873
874         atomic_set(&device->packet_seq, 0);
875         device->peer_seq = 0;
876
877         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
878                 &peer_device->connection->cstate_mutex :
879                 &device->own_state_mutex;
880
881         err = drbd_send_sync_param(peer_device);
882         if (!err)
883                 err = drbd_send_sizes(peer_device, 0, 0);
884         if (!err)
885                 err = drbd_send_uuids(peer_device);
886         if (!err)
887                 err = drbd_send_current_state(peer_device);
888         clear_bit(USE_DEGR_WFC_T, &device->flags);
889         clear_bit(RESIZE_PENDING, &device->flags);
890         atomic_set(&device->ap_in_flight, 0);
891         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
892         return err;
893 }
894
895 /*
896  * return values:
897  *   1 yes, we have a valid connection
898  *   0 oops, did not work out, please try again
899  *  -1 peer talks different language,
900  *     no point in trying again, please go standalone.
901  *  -2 We do not have a network config...
902  */
903 static int conn_connect(struct drbd_connection *connection)
904 {
905         struct drbd_socket sock, msock;
906         struct drbd_peer_device *peer_device;
907         struct net_conf *nc;
908         int vnr, timeout, h;
909         bool discard_my_data, ok;
910         enum drbd_state_rv rv;
911         struct accept_wait_data ad = {
912                 .connection = connection,
913                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
914         };
915
916         clear_bit(DISCONNECT_SENT, &connection->flags);
917         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
918                 return -2;
919
920         mutex_init(&sock.mutex);
921         sock.sbuf = connection->data.sbuf;
922         sock.rbuf = connection->data.rbuf;
923         sock.socket = NULL;
924         mutex_init(&msock.mutex);
925         msock.sbuf = connection->meta.sbuf;
926         msock.rbuf = connection->meta.rbuf;
927         msock.socket = NULL;
928
929         /* Assume that the peer only understands protocol 80 until we know better.  */
930         connection->agreed_pro_version = 80;
931
932         if (prepare_listen_socket(connection, &ad))
933                 return 0;
934
935         do {
936                 struct socket *s;
937
938                 s = drbd_try_connect(connection);
939                 if (s) {
940                         if (!sock.socket) {
941                                 sock.socket = s;
942                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
943                         } else if (!msock.socket) {
944                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
945                                 msock.socket = s;
946                                 send_first_packet(connection, &msock, P_INITIAL_META);
947                         } else {
948                                 drbd_err(connection, "Logic error in conn_connect()\n");
949                                 goto out_release_sockets;
950                         }
951                 }
952
953                 if (connection_established(connection, &sock.socket, &msock.socket))
954                         break;
955
956 retry:
957                 s = drbd_wait_for_connect(connection, &ad);
958                 if (s) {
959                         int fp = receive_first_packet(connection, s);
960                         drbd_socket_okay(&sock.socket);
961                         drbd_socket_okay(&msock.socket);
962                         switch (fp) {
963                         case P_INITIAL_DATA:
964                                 if (sock.socket) {
965                                         drbd_warn(connection, "initial packet S crossed\n");
966                                         sock_release(sock.socket);
967                                         sock.socket = s;
968                                         goto randomize;
969                                 }
970                                 sock.socket = s;
971                                 break;
972                         case P_INITIAL_META:
973                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
974                                 if (msock.socket) {
975                                         drbd_warn(connection, "initial packet M crossed\n");
976                                         sock_release(msock.socket);
977                                         msock.socket = s;
978                                         goto randomize;
979                                 }
980                                 msock.socket = s;
981                                 break;
982                         default:
983                                 drbd_warn(connection, "Error receiving initial packet\n");
984                                 sock_release(s);
985 randomize:
986                                 if (prandom_u32() & 1)
987                                         goto retry;
988                         }
989                 }
990
991                 if (connection->cstate <= C_DISCONNECTING)
992                         goto out_release_sockets;
993                 if (signal_pending(current)) {
994                         flush_signals(current);
995                         smp_rmb();
996                         if (get_t_state(&connection->receiver) == EXITING)
997                                 goto out_release_sockets;
998                 }
999
1000                 ok = connection_established(connection, &sock.socket, &msock.socket);
1001         } while (!ok);
1002
1003         if (ad.s_listen)
1004                 sock_release(ad.s_listen);
1005
1006         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1008
1009         sock.socket->sk->sk_allocation = GFP_NOIO;
1010         msock.socket->sk->sk_allocation = GFP_NOIO;
1011
1012         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1013         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1014
1015         /* NOT YET ...
1016          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1017          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1018          * first set it to the P_CONNECTION_FEATURES timeout,
1019          * which we set to 4x the configured ping_timeout. */
1020         rcu_read_lock();
1021         nc = rcu_dereference(connection->net_conf);
1022
1023         sock.socket->sk->sk_sndtimeo =
1024         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1025
1026         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1027         timeout = nc->timeout * HZ / 10;
1028         discard_my_data = nc->discard_my_data;
1029         rcu_read_unlock();
1030
1031         msock.socket->sk->sk_sndtimeo = timeout;
1032
1033         /* we don't want delays.
1034          * we use TCP_CORK where appropriate, though */
1035         drbd_tcp_nodelay(sock.socket);
1036         drbd_tcp_nodelay(msock.socket);
1037
1038         connection->data.socket = sock.socket;
1039         connection->meta.socket = msock.socket;
1040         connection->last_received = jiffies;
1041
1042         h = drbd_do_features(connection);
1043         if (h <= 0)
1044                 return h;
1045
1046         if (connection->cram_hmac_tfm) {
1047                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1048                 switch (drbd_do_auth(connection)) {
1049                 case -1:
1050                         drbd_err(connection, "Authentication of peer failed\n");
1051                         return -1;
1052                 case 0:
1053                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1054                         return 0;
1055                 }
1056         }
1057
1058         connection->data.socket->sk->sk_sndtimeo = timeout;
1059         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1060
1061         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1062                 return -1;
1063
1064         /* Prevent a race between resync-handshake and
1065          * being promoted to Primary.
1066          *
1067          * Grab and release the state mutex, so we know that any current
1068          * drbd_set_role() is finished, and any incoming drbd_set_role
1069          * will see the STATE_SENT flag, and wait for it to be cleared.
1070          */
1071         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1072                 mutex_lock(peer_device->device->state_mutex);
1073
1074         set_bit(STATE_SENT, &connection->flags);
1075
1076         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1077                 mutex_unlock(peer_device->device->state_mutex);
1078
1079         rcu_read_lock();
1080         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1081                 struct drbd_device *device = peer_device->device;
1082                 kref_get(&device->kref);
1083                 rcu_read_unlock();
1084
1085                 if (discard_my_data)
1086                         set_bit(DISCARD_MY_DATA, &device->flags);
1087                 else
1088                         clear_bit(DISCARD_MY_DATA, &device->flags);
1089
1090                 drbd_connected(peer_device);
1091                 kref_put(&device->kref, drbd_destroy_device);
1092                 rcu_read_lock();
1093         }
1094         rcu_read_unlock();
1095
1096         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1097         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1098                 clear_bit(STATE_SENT, &connection->flags);
1099                 return 0;
1100         }
1101
1102         drbd_thread_start(&connection->asender);
1103
1104         mutex_lock(&connection->resource->conf_update);
1105         /* The discard_my_data flag is a single-shot modifier to the next
1106          * connection attempt, the handshake of which is now well underway.
1107          * No need for rcu style copying of the whole struct
1108          * just to clear a single value. */
1109         connection->net_conf->discard_my_data = 0;
1110         mutex_unlock(&connection->resource->conf_update);
1111
1112         return h;
1113
1114 out_release_sockets:
1115         if (ad.s_listen)
1116                 sock_release(ad.s_listen);
1117         if (sock.socket)
1118                 sock_release(sock.socket);
1119         if (msock.socket)
1120                 sock_release(msock.socket);
1121         return -1;
1122 }
1123
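/* Decode one packet header into @pi.  Three wire formats are supported,
 * selected by the agreed protocol version: p_header100 (DRBD_MAGIC_100, with
 * volume number), p_header95 (DRBD_MAGIC_BIG, 32-bit length) and p_header80
 * (DRBD_MAGIC, 16-bit length).  pi->data points just past the header. */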
1124 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1125 {
1126         unsigned int header_size = drbd_header_size(connection);
1127
1128         if (header_size == sizeof(struct p_header100) &&
1129             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1130                 struct p_header100 *h = header;
1131                 if (h->pad != 0) {
1132                         drbd_err(connection, "Header padding is not zero\n");
1133                         return -EINVAL;
1134                 }
1135                 pi->vnr = be16_to_cpu(h->volume);
1136                 pi->cmd = be16_to_cpu(h->command);
1137                 pi->size = be32_to_cpu(h->length);
1138         } else if (header_size == sizeof(struct p_header95) &&
1139                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1140                 struct p_header95 *h = header;
1141                 pi->cmd = be16_to_cpu(h->command);
1142                 pi->size = be32_to_cpu(h->length);
1143                 pi->vnr = 0;
1144         } else if (header_size == sizeof(struct p_header80) &&
1145                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1146                 struct p_header80 *h = header;
1147                 pi->cmd = be16_to_cpu(h->command);
1148                 pi->size = be16_to_cpu(h->length);
1149                 pi->vnr = 0;
1150         } else {
1151                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1152                          be32_to_cpu(*(__be32 *)header),
1153                          connection->agreed_pro_version);
1154                 return -EINVAL;
1155         }
1156         pi->data = header + header_size;
1157         return 0;
1158 }
1159
1160 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1161 {
1162         void *buffer = connection->data.rbuf;
1163         int err;
1164
1165         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1166         if (err)
1167                 return err;
1168
1169         err = decode_header(connection, buffer, pi);
1170         connection->last_received = jiffies;
1171
1172         return err;
1173 }
1174
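/* If the resource's write ordering is "flush", issue a flush to every local
 * backing device of this connection; a failing flush degrades the write
 * ordering to "drain" via drbd_bump_write_ordering(). */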
1175 static void drbd_flush(struct drbd_connection *connection)
1176 {
1177         int rv;
1178         struct drbd_peer_device *peer_device;
1179         int vnr;
1180
1181         if (connection->resource->write_ordering >= WO_bdev_flush) {
1182                 rcu_read_lock();
1183                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1184                         struct drbd_device *device = peer_device->device;
1185
1186                         if (!get_ldev(device))
1187                                 continue;
1188                         kref_get(&device->kref);
1189                         rcu_read_unlock();
1190
1191                         /* Right now, we have only this one synchronous code path
1192                          * for flushes between request epochs.
1193                          * We may want to make those asynchronous,
1194                          * or at least parallelize the flushes to the volume devices.
1195                          */
1196                         device->flush_jif = jiffies;
1197                         set_bit(FLUSH_PENDING, &device->flags);
1198                         rv = blkdev_issue_flush(device->ldev->backing_bdev,
1199                                         GFP_NOIO, NULL);
1200                         clear_bit(FLUSH_PENDING, &device->flags);
1201                         if (rv) {
1202                                 drbd_info(device, "local disk flush failed with status %d\n", rv);
1203                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1204                                  * don't try again for ANY return value != 0
1205                                  * if (rv == -EOPNOTSUPP) */
1206                                 drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1207                         }
1208                         put_ldev(device);
1209                         kref_put(&device->kref, drbd_destroy_device);
1210
1211                         rcu_read_lock();
1212                         if (rv)
1213                                 break;
1214                 }
1215                 rcu_read_unlock();
1216         }
1217 }
1218
1219 /**
1220  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1221  * @connection: DRBD connection.
1222  * @epoch:      Epoch object.
1223  * @ev:         Epoch event.
1224  */
1225 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1226                                                struct drbd_epoch *epoch,
1227                                                enum epoch_event ev)
1228 {
1229         int epoch_size;
1230         struct drbd_epoch *next_epoch;
1231         enum finish_epoch rv = FE_STILL_LIVE;
1232
1233         spin_lock(&connection->epoch_lock);
1234         do {
1235                 next_epoch = NULL;
1236
1237                 epoch_size = atomic_read(&epoch->epoch_size);
1238
1239                 switch (ev & ~EV_CLEANUP) {
1240                 case EV_PUT:
1241                         atomic_dec(&epoch->active);
1242                         break;
1243                 case EV_GOT_BARRIER_NR:
1244                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1245                         break;
1246                 case EV_BECAME_LAST:
1247                         /* nothing to do */
1248                         break;
1249                 }
1250
1251                 if (epoch_size != 0 &&
1252                     atomic_read(&epoch->active) == 0 &&
1253                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1254                         if (!(ev & EV_CLEANUP)) {
1255                                 spin_unlock(&connection->epoch_lock);
1256                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1257                                 spin_lock(&connection->epoch_lock);
1258                         }
1259 #if 0
1260                         /* FIXME: dec unacked on connection, once we have
1261                          * something to count pending connection packets in. */
1262                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1263                                 dec_unacked(epoch->connection);
1264 #endif
1265
1266                         if (connection->current_epoch != epoch) {
1267                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1268                                 list_del(&epoch->list);
1269                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1270                                 connection->epochs--;
1271                                 kfree(epoch);
1272
1273                                 if (rv == FE_STILL_LIVE)
1274                                         rv = FE_DESTROYED;
1275                         } else {
1276                                 epoch->flags = 0;
1277                                 atomic_set(&epoch->epoch_size, 0);
1278                                 /* atomic_set(&epoch->active, 0); is already zero */
1279                                 if (rv == FE_STILL_LIVE)
1280                                         rv = FE_RECYCLED;
1281                         }
1282                 }
1283
1284                 if (!next_epoch)
1285                         break;
1286
1287                 epoch = next_epoch;
1288         } while (1);
1289
1290         spin_unlock(&connection->epoch_lock);
1291
1292         return rv;
1293 }
1294
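/* Clamp the requested write ordering to what the disk_conf of this backing
 * device allows (disk_flushes / disk_drain).  Called with rcu_read_lock() held. */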
1295 static enum write_ordering_e
1296 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1297 {
1298         struct disk_conf *dc;
1299
1300         dc = rcu_dereference(bdev->disk_conf);
1301
1302         if (wo == WO_bdev_flush && !dc->disk_flushes)
1303                 wo = WO_drain_io;
1304         if (wo == WO_drain_io && !dc->disk_drain)
1305                 wo = WO_none;
1306
1307         return wo;
1308 }
1309
1310 /**
1311  * drbd_bump_write_ordering() - Fall back to another write ordering method
1312  * @resource:   DRBD resource.
1313  * @wo:         Write ordering method to try.
1314  */
1315 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1316                               enum write_ordering_e wo)
1317 {
1318         struct drbd_device *device;
1319         enum write_ordering_e pwo;
1320         int vnr;
1321         static char *write_ordering_str[] = {
1322                 [WO_none] = "none",
1323                 [WO_drain_io] = "drain",
1324                 [WO_bdev_flush] = "flush",
1325         };
1326
1327         pwo = resource->write_ordering;
1328         if (wo != WO_bdev_flush)
1329                 wo = min(pwo, wo);
1330         rcu_read_lock();
1331         idr_for_each_entry(&resource->devices, device, vnr) {
1332                 if (get_ldev(device)) {
1333                         wo = max_allowed_wo(device->ldev, wo);
1334                         if (device->ldev == bdev)
1335                                 bdev = NULL;
1336                         put_ldev(device);
1337                 }
1338         }
1339
1340         if (bdev)
1341                 wo = max_allowed_wo(bdev, wo);
1342
1343         rcu_read_unlock();
1344
1345         resource->write_ordering = wo;
1346         if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1347                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1348 }
1349
1350 /**
1351  * drbd_submit_peer_request() - Submit peer request I/O to the local backing device
1352  * @device:     DRBD device.
1353  * @peer_req:   peer request
1354  * @rw:         flag field, see bio->bi_rw
1355  *
1356  * May spread the pages to multiple bios,
1357  * depending on bio_add_page restrictions.
1358  *
1359  * Returns 0 if all bios have been submitted,
1360  * -ENOMEM if we could not allocate enough bios,
1361  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1362  *  single page to an empty bio (which should never happen and likely indicates
1363  *  that the lower level IO stack is in some way broken). This has been observed
1364  *  on certain Xen deployments.
1365  */
1366 /* TODO allocate from our own bio_set. */
1367 int drbd_submit_peer_request(struct drbd_device *device,
1368                              struct drbd_peer_request *peer_req,
1369                              const unsigned rw, const int fault_type)
1370 {
1371         struct bio *bios = NULL;
1372         struct bio *bio;
1373         struct page *page = peer_req->pages;
1374         sector_t sector = peer_req->i.sector;
1375         unsigned data_size = peer_req->i.size;
1376         unsigned n_bios = 0;
1377         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1378         int err = -ENOMEM;
1379
1380         if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1381                 /* wait for all pending IO completions, before we start
1382                  * zeroing things out. */
1383                 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1384                 /* add it to the active list now,
1385                  * so we can find it to present it in debugfs */
1386                 peer_req->submit_jif = jiffies;
1387                 peer_req->flags |= EE_SUBMITTED;
1388                 spin_lock_irq(&device->resource->req_lock);
1389                 list_add_tail(&peer_req->w.list, &device->active_ee);
1390                 spin_unlock_irq(&device->resource->req_lock);
1391                 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1392                         sector, data_size >> 9, GFP_NOIO, false))
1393                         peer_req->flags |= EE_WAS_ERROR;
1394                 drbd_endio_write_sec_final(peer_req);
1395                 return 0;
1396         }
1397
1398         /* Discards don't have any payload.
1399          * But the scsi layer still expects a bio_vec it can use internally,
1400          * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1401         if (peer_req->flags & EE_IS_TRIM)
1402                 nr_pages = 1;
1403
1404         /* In most cases, we will only need one bio.  But in case the lower
1405          * level restrictions happen to be different at this offset on this
1406          * side than those of the sending peer, we may need to submit the
1407          * request in more than one bio.
1408          *
1409          * Plain bio_alloc is good enough here, this is no DRBD internally
1410          * generated bio, but a bio allocated on behalf of the peer.
1411          */
1412 next_bio:
1413         bio = bio_alloc(GFP_NOIO, nr_pages);
1414         if (!bio) {
1415                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1416                 goto fail;
1417         }
1418         /* > peer_req->i.sector, unless this is the first bio */
1419         bio->bi_iter.bi_sector = sector;
1420         bio->bi_bdev = device->ldev->backing_bdev;
1421         bio->bi_rw = rw;
1422         bio->bi_private = peer_req;
1423         bio->bi_end_io = drbd_peer_request_endio;
1424
1425         bio->bi_next = bios;
1426         bios = bio;
1427         ++n_bios;
1428
1429         if (rw & REQ_DISCARD) {
1430                 bio->bi_iter.bi_size = data_size;
1431                 goto submit;
1432         }
1433
1434         page_chain_for_each(page) {
1435                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1436                 if (!bio_add_page(bio, page, len, 0)) {
1437                         /* A single page must always be possible!
1438                          * But in case it fails anyway,
1439                          * we deal with it, and complain (below). */
1440                         if (bio->bi_vcnt == 0) {
1441                                 drbd_err(device,
1442                                         "bio_add_page failed for len=%u, "
1443                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1444                                         len, (uint64_t)bio->bi_iter.bi_sector);
1445                                 err = -ENOSPC;
1446                                 goto fail;
1447                         }
1448                         goto next_bio;
1449                 }
1450                 data_size -= len;
1451                 sector += len >> 9;
1452                 --nr_pages;
1453         }
1454         D_ASSERT(device, data_size == 0);
1455 submit:
1456         D_ASSERT(device, page == NULL);
1457
1458         atomic_set(&peer_req->pending_bios, n_bios);
1459         /* for debugfs: update timestamp, mark as submitted */
1460         peer_req->submit_jif = jiffies;
1461         peer_req->flags |= EE_SUBMITTED;
1462         do {
1463                 bio = bios;
1464                 bios = bios->bi_next;
1465                 bio->bi_next = NULL;
1466
1467                 drbd_generic_make_request(device, fault_type, bio);
1468         } while (bios);
1469         return 0;
1470
1471 fail:
1472         while (bios) {
1473                 bio = bios;
1474                 bios = bios->bi_next;
1475                 bio_put(bio);
1476         }
1477         return err;
1478 }
1479
1480 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1481                                              struct drbd_peer_request *peer_req)
1482 {
1483         struct drbd_interval *i = &peer_req->i;
1484
1485         drbd_remove_interval(&device->write_requests, i);
1486         drbd_clear_interval(i);
1487
1488         /* Wake up any processes waiting for this peer request to complete.  */
1489         if (i->waiting)
1490                 wake_up(&device->misc_wait);
1491 }
1492
1493 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1494 {
1495         struct drbd_peer_device *peer_device;
1496         int vnr;
1497
1498         rcu_read_lock();
1499         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1500                 struct drbd_device *device = peer_device->device;
1501
1502                 kref_get(&device->kref);
1503                 rcu_read_unlock();
1504                 drbd_wait_ee_list_empty(device, &device->active_ee);
1505                 kref_put(&device->kref, drbd_destroy_device);
1506                 rcu_read_lock();
1507         }
1508         rcu_read_unlock();
1509 }
1510
1511 static struct drbd_peer_device *
1512 conn_peer_device(struct drbd_connection *connection, int volume_number)
1513 {
1514         return idr_find(&connection->peer_devices, volume_number);
1515 }
1516
1517 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1518 {
1519         int rv;
1520         struct p_barrier *p = pi->data;
1521         struct drbd_epoch *epoch;
1522
1523         /* FIXME these are unacked on connection,
1524          * not a specific (peer)device.
1525          */
1526         connection->current_epoch->barrier_nr = p->barrier;
1527         connection->current_epoch->connection = connection;
1528         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1529
1530         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1531          * the activity log, which means it would not be resynced in case the
1532          * R_PRIMARY crashes now.
1533          * Therefore we must send the barrier_ack after the barrier request was
1534          * completed. */
1535         switch (connection->resource->write_ordering) {
1536         case WO_none:
1537                 if (rv == FE_RECYCLED)
1538                         return 0;
1539
1540                 /* receiver context, in the writeout path of the other node.
1541                  * avoid potential distributed deadlock */
1542                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1543                 if (epoch)
1544                         break;
1545                 else
1546                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1547                         /* Fall through */
1548
1549         case WO_bdev_flush:
1550         case WO_drain_io:
1551                 conn_wait_active_ee_empty(connection);
1552                 drbd_flush(connection);
1553
1554                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1555                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1556                         if (epoch)
1557                                 break;
1558                 }
1559
1560                 return 0;
1561         default:
1562                 drbd_err(connection, "Strangeness in resource->write_ordering %d\n",
1563                          connection->resource->write_ordering);
1564                 return -EIO;
1565         }
1566
1567         epoch->flags = 0;
1568         atomic_set(&epoch->epoch_size, 0);
1569         atomic_set(&epoch->active, 0);
1570
1571         spin_lock(&connection->epoch_lock);
1572         if (atomic_read(&connection->current_epoch->epoch_size)) {
1573                 list_add(&epoch->list, &connection->current_epoch->list);
1574                 connection->current_epoch = epoch;
1575                 connection->epochs++;
1576         } else {
1577                 /* The current_epoch got recycled while we allocated this one... */
1578                 kfree(epoch);
1579         }
1580         spin_unlock(&connection->epoch_lock);
1581
1582         return 0;
1583 }
1584
1585 /* used from receive_RSDataReply (recv_resync_read)
1586  * and from receive_Data */
1587 static struct drbd_peer_request *
1588 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1589               struct packet_info *pi) __must_hold(local)
1590 {
1591         struct drbd_device *device = peer_device->device;
1592         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1593         struct drbd_peer_request *peer_req;
1594         struct page *page;
1595         int digest_size, err;
1596         unsigned int data_size = pi->size, ds;
1597         void *dig_in = peer_device->connection->int_dig_in;
1598         void *dig_vv = peer_device->connection->int_dig_vv;
1599         unsigned long *data;
1600         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1601
1602         digest_size = 0;
1603         if (!trim && peer_device->connection->peer_integrity_tfm) {
1604                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1605                 /*
1606                  * FIXME: Receive the incoming digest into the receive buffer
1607                  *        here, together with its struct p_data?
1608                  */
1609                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1610                 if (err)
1611                         return NULL;
1612                 data_size -= digest_size;
1613         }
1614
1615         if (trim) {
1616                 D_ASSERT(peer_device, data_size == 0);
1617                 data_size = be32_to_cpu(trim->size);
1618         }
1619
1620         if (!expect(IS_ALIGNED(data_size, 512)))
1621                 return NULL;
1622         /* prepare for larger trim requests. */
1623         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1624                 return NULL;
1625
1626         /* even though we trust our peer,
1627          * we sometimes have to double check. */
1628         if (sector + (data_size>>9) > capacity) {
1629                 drbd_err(device, "request from peer beyond end of local disk: "
1630                         "capacity: %llus < sector: %llus + size: %u\n",
1631                         (unsigned long long)capacity,
1632                         (unsigned long long)sector, data_size);
1633                 return NULL;
1634         }
1635
1636         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1637          * "criss-cross" setup, that might cause write-out on some other DRBD,
1638          * which in turn might block on the other node at this very place.  */
1639         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1640         if (!peer_req)
1641                 return NULL;
1642
1643         peer_req->flags |= EE_WRITE;
1644         if (trim)
1645                 return peer_req;
1646
1647         ds = data_size;
1648         page = peer_req->pages;
1649         page_chain_for_each(page) {
1650                 unsigned len = min_t(int, ds, PAGE_SIZE);
1651                 data = kmap(page);
1652                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1653                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1654                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1655                         data[0] = data[0] ^ (unsigned long)-1;
1656                 }
1657                 kunmap(page);
1658                 if (err) {
1659                         drbd_free_peer_req(device, peer_req);
1660                         return NULL;
1661                 }
1662                 ds -= len;
1663         }
1664
1665         if (digest_size) {
1666                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1667                 if (memcmp(dig_in, dig_vv, digest_size)) {
1668                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1669                                 (unsigned long long)sector, data_size);
1670                         drbd_free_peer_req(device, peer_req);
1671                         return NULL;
1672                 }
1673         }
1674         device->recv_cnt += data_size >> 9;
1675         return peer_req;
1676 }
1677
1678 /* drbd_drain_block() just takes a data block
1679  * out of the socket input buffer, and discards it.
1680  */
1681 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1682 {
1683         struct page *page;
1684         int err = 0;
1685         void *data;
1686
1687         if (!data_size)
1688                 return 0;
1689
1690         page = drbd_alloc_pages(peer_device, 1, 1);
1691
1692         data = kmap(page);
1693         while (data_size) {
1694                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1695
1696                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1697                 if (err)
1698                         break;
1699                 data_size -= len;
1700         }
1701         kunmap(page);
1702         drbd_free_pages(peer_device->device, page, 0);
1703         return err;
1704 }
1705
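     /* recv_dless_read(): receive a "diskless" read reply.  The peer sends the
      * data for a read request we could not serve from our own disk (e.g. it is
      * detached or failed); copy it directly into the pages of the original
      * application bio. */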
1706 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1707                            sector_t sector, int data_size)
1708 {
1709         struct bio_vec bvec;
1710         struct bvec_iter iter;
1711         struct bio *bio;
1712         int digest_size, err, expect;
1713         void *dig_in = peer_device->connection->int_dig_in;
1714         void *dig_vv = peer_device->connection->int_dig_vv;
1715
1716         digest_size = 0;
1717         if (peer_device->connection->peer_integrity_tfm) {
1718                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1719                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1720                 if (err)
1721                         return err;
1722                 data_size -= digest_size;
1723         }
1724
1725         /* optimistically update recv_cnt.  if receiving fails below,
1726          * we disconnect anyways, and counters will be reset. */
1727         peer_device->device->recv_cnt += data_size>>9;
1728
1729         bio = req->master_bio;
1730         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1731
1732         bio_for_each_segment(bvec, bio, iter) {
1733                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1734                 expect = min_t(int, data_size, bvec.bv_len);
1735                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1736                 kunmap(bvec.bv_page);
1737                 if (err)
1738                         return err;
1739                 data_size -= expect;
1740         }
1741
1742         if (digest_size) {
1743                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1744                 if (memcmp(dig_in, dig_vv, digest_size)) {
1745                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1746                         return -EINVAL;
1747                 }
1748         }
1749
1750         D_ASSERT(peer_device->device, data_size == 0);
1751         return 0;
1752 }
1753
1754 /*
1755  * e_end_resync_block() is called in asender context via
1756  * drbd_finish_peer_reqs().
1757  */
1758 static int e_end_resync_block(struct drbd_work *w, int unused)
1759 {
1760         struct drbd_peer_request *peer_req =
1761                 container_of(w, struct drbd_peer_request, w);
1762         struct drbd_peer_device *peer_device = peer_req->peer_device;
1763         struct drbd_device *device = peer_device->device;
1764         sector_t sector = peer_req->i.sector;
1765         int err;
1766
1767         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1768
1769         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1770                 drbd_set_in_sync(device, sector, peer_req->i.size);
1771                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1772         } else {
1773                 /* Record failure to sync */
1774                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1775
1776                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1777         }
1778         dec_unacked(device);
1779
1780         return err;
1781 }
1782
1783 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1784                             struct packet_info *pi) __releases(local)
1785 {
1786         struct drbd_device *device = peer_device->device;
1787         struct drbd_peer_request *peer_req;
1788
1789         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1790         if (!peer_req)
1791                 goto fail;
1792
1793         dec_rs_pending(device);
1794
1795         inc_unacked(device);
1796         /* corresponding dec_unacked() in e_end_resync_block()
1797          * respectively in _drbd_clear_done_ee */
1798
1799         peer_req->w.cb = e_end_resync_block;
1800         peer_req->submit_jif = jiffies;
1801
1802         spin_lock_irq(&device->resource->req_lock);
1803         list_add_tail(&peer_req->w.list, &device->sync_ee);
1804         spin_unlock_irq(&device->resource->req_lock);
1805
1806         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1807         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1808                 return 0;
1809
1810         /* don't care for the reason here */
1811         drbd_err(device, "submit failed, triggering re-connect\n");
1812         spin_lock_irq(&device->resource->req_lock);
1813         list_del(&peer_req->w.list);
1814         spin_unlock_irq(&device->resource->req_lock);
1815
1816         drbd_free_peer_req(device, peer_req);
1817 fail:
1818         put_ldev(device);
1819         return -EIO;
1820 }
1821
1822 static struct drbd_request *
1823 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1824              sector_t sector, bool missing_ok, const char *func)
1825 {
1826         struct drbd_request *req;
1827
1828         /* Request object according to our peer */
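             /* The peer simply echoes back the block_id we put into the data
              * packet, which is the kernel address of our struct drbd_request;
              * drbd_contains_interval() checks that such a request really is
              * still part of the given tree before we trust it any further. */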
1829         req = (struct drbd_request *)(unsigned long)id;
1830         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1831                 return req;
1832         if (!missing_ok) {
1833                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1834                         (unsigned long)id, (unsigned long long)sector);
1835         }
1836         return NULL;
1837 }
1838
1839 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1840 {
1841         struct drbd_peer_device *peer_device;
1842         struct drbd_device *device;
1843         struct drbd_request *req;
1844         sector_t sector;
1845         int err;
1846         struct p_data *p = pi->data;
1847
1848         peer_device = conn_peer_device(connection, pi->vnr);
1849         if (!peer_device)
1850                 return -EIO;
1851         device = peer_device->device;
1852
1853         sector = be64_to_cpu(p->sector);
1854
1855         spin_lock_irq(&device->resource->req_lock);
1856         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1857         spin_unlock_irq(&device->resource->req_lock);
1858         if (unlikely(!req))
1859                 return -EIO;
1860
1861         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1862          * special casing it there for the various failure cases.
1863          * still no race with drbd_fail_pending_reads */
1864         err = recv_dless_read(peer_device, req, sector, pi->size);
1865         if (!err)
1866                 req_mod(req, DATA_RECEIVED);
1867         /* else: nothing. handled from drbd_disconnect...
1868          * I don't think we may complete this just yet
1869          * in case we are "on-disconnect: freeze" */
1870
1871         return err;
1872 }
1873
1874 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1875 {
1876         struct drbd_peer_device *peer_device;
1877         struct drbd_device *device;
1878         sector_t sector;
1879         int err;
1880         struct p_data *p = pi->data;
1881
1882         peer_device = conn_peer_device(connection, pi->vnr);
1883         if (!peer_device)
1884                 return -EIO;
1885         device = peer_device->device;
1886
1887         sector = be64_to_cpu(p->sector);
1888         D_ASSERT(device, p->block_id == ID_SYNCER);
1889
1890         if (get_ldev(device)) {
1891                 /* data is submitted to disk within recv_resync_read.
1892                  * corresponding put_ldev done below on error,
1893                  * or in drbd_peer_request_endio. */
1894                 err = recv_resync_read(peer_device, sector, pi);
1895         } else {
1896                 if (__ratelimit(&drbd_ratelimit_state))
1897                         drbd_err(device, "Can not write resync data to local disk.\n");
1898
1899                 err = drbd_drain_block(peer_device, pi->size);
1900
1901                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1902         }
1903
1904         atomic_add(pi->size >> 9, &device->rs_sect_in);
1905
1906         return err;
1907 }
1908
1909 static void restart_conflicting_writes(struct drbd_device *device,
1910                                        sector_t sector, int size)
1911 {
1912         struct drbd_interval *i;
1913         struct drbd_request *req;
1914
1915         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1916                 if (!i->local)
1917                         continue;
1918                 req = container_of(i, struct drbd_request, i);
1919                 if (req->rq_state & RQ_LOCAL_PENDING ||
1920                     !(req->rq_state & RQ_POSTPONED))
1921                         continue;
1922                 /* as it is RQ_POSTPONED, this will cause it to
1923                  * be queued on the retry workqueue. */
1924                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1925         }
1926 }
1927
1928 /*
1929  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1930  */
1931 static int e_end_block(struct drbd_work *w, int cancel)
1932 {
1933         struct drbd_peer_request *peer_req =
1934                 container_of(w, struct drbd_peer_request, w);
1935         struct drbd_peer_device *peer_device = peer_req->peer_device;
1936         struct drbd_device *device = peer_device->device;
1937         sector_t sector = peer_req->i.sector;
1938         int err = 0, pcmd;
1939
1940         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1941                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1942                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1943                                 device->state.conn <= C_PAUSED_SYNC_T &&
1944                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1945                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1946                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1947                         if (pcmd == P_RS_WRITE_ACK)
1948                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1949                 } else {
1950                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1951                         /* we expect it to be marked out of sync anyways...
1952                          * maybe assert this?  */
1953                 }
1954                 dec_unacked(device);
1955         }
1956
1957         /* we delete from the conflict detection hash _after_ we sent out the
1958          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1959         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1960                 spin_lock_irq(&device->resource->req_lock);
1961                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1962                 drbd_remove_epoch_entry_interval(device, peer_req);
1963                 if (peer_req->flags & EE_RESTART_REQUESTS)
1964                         restart_conflicting_writes(device, sector, peer_req->i.size);
1965                 spin_unlock_irq(&device->resource->req_lock);
1966         } else
1967                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1968
1969         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1970
1971         return err;
1972 }
1973
1974 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1975 {
1976         struct drbd_peer_request *peer_req =
1977                 container_of(w, struct drbd_peer_request, w);
1978         struct drbd_peer_device *peer_device = peer_req->peer_device;
1979         int err;
1980
1981         err = drbd_send_ack(peer_device, ack, peer_req);
1982         dec_unacked(peer_device->device);
1983
1984         return err;
1985 }
1986
1987 static int e_send_superseded(struct drbd_work *w, int unused)
1988 {
1989         return e_send_ack(w, P_SUPERSEDED);
1990 }
1991
1992 static int e_send_retry_write(struct drbd_work *w, int unused)
1993 {
1994         struct drbd_peer_request *peer_req =
1995                 container_of(w, struct drbd_peer_request, w);
1996         struct drbd_connection *connection = peer_req->peer_device->connection;
1997
1998         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1999                              P_RETRY_WRITE : P_SUPERSEDED);
2000 }
2001
2002 static bool seq_greater(u32 a, u32 b)
2003 {
2004         /*
2005          * We assume 32-bit wrap-around here.
2006          * For 24-bit wrap-around, we would have to shift:
2007          *  a <<= 8; b <<= 8;
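              * Example: after a wrap, seq 2 is "greater" than 0xfffffffd,
              * because (s32)(2 - 0xfffffffd) == 5 > 0.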
2008          */
2009         return (s32)a - (s32)b > 0;
2010 }
2011
2012 static u32 seq_max(u32 a, u32 b)
2013 {
2014         return seq_greater(a, b) ? a : b;
2015 }
2016
2017 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2018 {
2019         struct drbd_device *device = peer_device->device;
2020         unsigned int newest_peer_seq;
2021
2022         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2023                 spin_lock(&device->peer_seq_lock);
2024                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2025                 device->peer_seq = newest_peer_seq;
2026                 spin_unlock(&device->peer_seq_lock);
2027                 /* wake up only if we actually changed device->peer_seq */
2028                 if (peer_seq == newest_peer_seq)
2029                         wake_up(&device->seq_wait);
2030         }
2031 }
2032
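     /* s1 and s2 are sector numbers; l1 and l2 are lengths in bytes (hence the
      * >>9 conversion to sectors). */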
2033 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2034 {
2035         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2036 }
2037
2038 /* maybe change sync_ee into interval trees as well? */
2039 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2040 {
2041         struct drbd_peer_request *rs_req;
2042         bool rv = false;
2043
2044         spin_lock_irq(&device->resource->req_lock);
2045         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2046                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2047                              rs_req->i.sector, rs_req->i.size)) {
2048                         rv = true;
2049                         break;
2050                 }
2051         }
2052         spin_unlock_irq(&device->resource->req_lock);
2053
2054         return rv;
2055 }
2056
2057 /* Called from receive_Data.
2058  * Synchronize packets on sock with packets on msock.
2059  *
2060  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2061  * packet traveling on msock, they are still processed in the order they have
2062  * been sent.
2063  *
2064  * Note: we don't care for Ack packets overtaking P_DATA packets.
2065  *
2066  * In case packet_seq is larger than device->peer_seq number, there are
2067  * outstanding packets on the msock. We wait for them to arrive.
2068  * In case we are the logically next packet, we update device->peer_seq
2069  * ourselves. Correctly handles 32bit wrap around.
2070  *
2071  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2072  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2073  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2074  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2075  *
2076  * returns 0 if we may process the packet,
2077  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2078 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2079 {
2080         struct drbd_device *device = peer_device->device;
2081         DEFINE_WAIT(wait);
2082         long timeout;
2083         int ret = 0, tp;
2084
2085         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2086                 return 0;
2087
2088         spin_lock(&device->peer_seq_lock);
2089         for (;;) {
2090                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2091                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2092                         break;
2093                 }
2094
2095                 if (signal_pending(current)) {
2096                         ret = -ERESTARTSYS;
2097                         break;
2098                 }
2099
2100                 rcu_read_lock();
2101                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2102                 rcu_read_unlock();
2103
2104                 if (!tp)
2105                         break;
2106
2107                 /* Only need to wait if two_primaries is enabled */
2108                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2109                 spin_unlock(&device->peer_seq_lock);
2110                 rcu_read_lock();
2111                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2112                 rcu_read_unlock();
2113                 timeout = schedule_timeout(timeout);
2114                 spin_lock(&device->peer_seq_lock);
2115                 if (!timeout) {
2116                         ret = -ETIMEDOUT;
2117                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2118                         break;
2119                 }
2120         }
2121         spin_unlock(&device->peer_seq_lock);
2122         finish_wait(&device->seq_wait, &wait);
2123         return ret;
2124 }
2125
2126 /* see also bio_flags_to_wire():
2127  * map DP_* data packet flags back to bio REQ_* flags semantically, not by bit
2128  * value, since the peer may run a kernel version with different REQ_* bits. */
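     /* e.g. DP_FUA|DP_FLUSH received on the wire becomes REQ_FUA|REQ_FLUSH locally. */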
2129 static unsigned long wire_flags_to_bio(u32 dpf)
2130 {
2131         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2132                 (dpf & DP_FUA ? REQ_FUA : 0) |
2133                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2134                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2135 }
2136
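     /* Called with req_lock held (dropped and re-acquired around
      * complete_master_bio()).  Complete all POSTPONED application writes
      * overlapping [sector, sector+size) with a negative result instead of
      * retrying them, because conflict resolution with the peer failed. */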
2137 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2138                                     unsigned int size)
2139 {
2140         struct drbd_interval *i;
2141
2142     repeat:
2143         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2144                 struct drbd_request *req;
2145                 struct bio_and_error m;
2146
2147                 if (!i->local)
2148                         continue;
2149                 req = container_of(i, struct drbd_request, i);
2150                 if (!(req->rq_state & RQ_POSTPONED))
2151                         continue;
2152                 req->rq_state &= ~RQ_POSTPONED;
2153                 __req_mod(req, NEG_ACKED, &m);
2154                 spin_unlock_irq(&device->resource->req_lock);
2155                 if (m.bio)
2156                         complete_master_bio(device, &m);
2157                 spin_lock_irq(&device->resource->req_lock);
2158                 goto repeat;
2159         }
2160 }
2161
2162 static int handle_write_conflicts(struct drbd_device *device,
2163                                   struct drbd_peer_request *peer_req)
2164 {
2165         struct drbd_connection *connection = peer_req->peer_device->connection;
2166         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2167         sector_t sector = peer_req->i.sector;
2168         const unsigned int size = peer_req->i.size;
2169         struct drbd_interval *i;
2170         bool equal;
2171         int err;
2172
2173         /*
2174          * Inserting the peer request into the write_requests tree will prevent
2175          * new conflicting local requests from being added.
2176          */
2177         drbd_insert_interval(&device->write_requests, &peer_req->i);
2178
2179     repeat:
2180         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2181                 if (i == &peer_req->i)
2182                         continue;
2183                 if (i->completed)
2184                         continue;
2185
2186                 if (!i->local) {
2187                         /*
2188                          * Our peer has sent a conflicting remote request; this
2189                          * should not happen in a two-node setup.  Wait for the
2190                          * earlier peer request to complete.
2191                          */
2192                         err = drbd_wait_misc(device, i);
2193                         if (err)
2194                                 goto out;
2195                         goto repeat;
2196                 }
2197
2198                 equal = i->sector == sector && i->size == size;
2199                 if (resolve_conflicts) {
2200                         /*
2201                          * If the peer request is fully contained within the
2202                          * overlapping request, it can be considered overwritten
2203                          * and thus superseded; otherwise, it will be retried
2204                          * once all overlapping requests have completed.
2205                          */
2206                         bool superseded = i->sector <= sector && i->sector +
2207                                        (i->size >> 9) >= sector + (size >> 9);
2208
2209                         if (!equal)
2210                                 drbd_alert(device, "Concurrent writes detected: "
2211                                                "local=%llus +%u, remote=%llus +%u, "
2212                                                "assuming %s came first\n",
2213                                           (unsigned long long)i->sector, i->size,
2214                                           (unsigned long long)sector, size,
2215                                           superseded ? "local" : "remote");
2216
2217                         peer_req->w.cb = superseded ? e_send_superseded :
2218                                                    e_send_retry_write;
2219                         list_add_tail(&peer_req->w.list, &device->done_ee);
2220                         wake_asender(connection);
2221
2222                         err = -ENOENT;
2223                         goto out;
2224                 } else {
2225                         struct drbd_request *req =
2226                                 container_of(i, struct drbd_request, i);
2227
2228                         if (!equal)
2229                                 drbd_alert(device, "Concurrent writes detected: "
2230                                                "local=%llus +%u, remote=%llus +%u\n",
2231                                           (unsigned long long)i->sector, i->size,
2232                                           (unsigned long long)sector, size);
2233
2234                         if (req->rq_state & RQ_LOCAL_PENDING ||
2235                             !(req->rq_state & RQ_POSTPONED)) {
2236                                 /*
2237                                  * Wait for the node with the discard flag to
2238                                  * decide if this request has been superseded
2239                                  * or needs to be retried.
2240                                  * Requests that have been superseded will
2241                                  * disappear from the write_requests tree.
2242                                  *
2243                                  * In addition, wait for the conflicting
2244                                  * request to finish locally before submitting
2245                                  * the conflicting peer request.
2246                                  */
2247                                 err = drbd_wait_misc(device, &req->i);
2248                                 if (err) {
2249                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2250                                         fail_postponed_requests(device, sector, size);
2251                                         goto out;
2252                                 }
2253                                 goto repeat;
2254                         }
2255                         /*
2256                          * Remember to restart the conflicting requests after
2257                          * the new peer request has completed.
2258                          */
2259                         peer_req->flags |= EE_RESTART_REQUESTS;
2260                 }
2261         }
2262         err = 0;
2263
2264     out:
2265         if (err)
2266                 drbd_remove_epoch_entry_interval(device, peer_req);
2267         return err;
2268 }
2269
2270 /* mirrored write */
2271 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2272 {
2273         struct drbd_peer_device *peer_device;
2274         struct drbd_device *device;
2275         struct net_conf *nc;
2276         sector_t sector;
2277         struct drbd_peer_request *peer_req;
2278         struct p_data *p = pi->data;
2279         u32 peer_seq = be32_to_cpu(p->seq_num);
2280         int rw = WRITE;
2281         u32 dp_flags;
2282         int err, tp;
2283
2284         peer_device = conn_peer_device(connection, pi->vnr);
2285         if (!peer_device)
2286                 return -EIO;
2287         device = peer_device->device;
2288
2289         if (!get_ldev(device)) {
2290                 int err2;
2291
2292                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2293                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2294                 atomic_inc(&connection->current_epoch->epoch_size);
2295                 err2 = drbd_drain_block(peer_device, pi->size);
2296                 if (!err)
2297                         err = err2;
2298                 return err;
2299         }
2300
2301         /*
2302          * Corresponding put_ldev done either below (on various errors), or in
2303          * drbd_peer_request_endio, if we successfully submit the data at the
2304          * end of this function.
2305          */
2306
2307         sector = be64_to_cpu(p->sector);
2308         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2309         if (!peer_req) {
2310                 put_ldev(device);
2311                 return -EIO;
2312         }
2313
2314         peer_req->w.cb = e_end_block;
2315         peer_req->submit_jif = jiffies;
2316         peer_req->flags |= EE_APPLICATION;
2317
2318         dp_flags = be32_to_cpu(p->dp_flags);
2319         rw |= wire_flags_to_bio(dp_flags);
2320         if (pi->cmd == P_TRIM) {
2321                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2322                 peer_req->flags |= EE_IS_TRIM;
2323                 if (!blk_queue_discard(q))
2324                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2325                 D_ASSERT(peer_device, peer_req->i.size > 0);
2326                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2327                 D_ASSERT(peer_device, peer_req->pages == NULL);
2328         } else if (peer_req->pages == NULL) {
2329                 D_ASSERT(device, peer_req->i.size == 0);
2330                 D_ASSERT(device, dp_flags & DP_FLUSH);
2331         }
2332
2333         if (dp_flags & DP_MAY_SET_IN_SYNC)
2334                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2335
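             /* Account this write in the current epoch.  The epoch is closed by
              * the next P_BARRIER, and is acknowledged with P_BARRIER_ACK once
              * all of its writes have completed (drbd_may_finish_epoch()). */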
2336         spin_lock(&connection->epoch_lock);
2337         peer_req->epoch = connection->current_epoch;
2338         atomic_inc(&peer_req->epoch->epoch_size);
2339         atomic_inc(&peer_req->epoch->active);
2340         spin_unlock(&connection->epoch_lock);
2341
2342         rcu_read_lock();
2343         nc = rcu_dereference(peer_device->connection->net_conf);
2344         tp = nc->two_primaries;
2345         if (peer_device->connection->agreed_pro_version < 100) {
2346                 switch (nc->wire_protocol) {
2347                 case DRBD_PROT_C:
2348                         dp_flags |= DP_SEND_WRITE_ACK;
2349                         break;
2350                 case DRBD_PROT_B:
2351                         dp_flags |= DP_SEND_RECEIVE_ACK;
2352                         break;
2353                 }
2354         }
2355         rcu_read_unlock();
2356
2357         if (dp_flags & DP_SEND_WRITE_ACK) {
2358                 peer_req->flags |= EE_SEND_WRITE_ACK;
2359                 inc_unacked(device);
2360                 /* corresponding dec_unacked() in e_end_block()
2361                  * respectively in _drbd_clear_done_ee */
2362         }
2363
2364         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2365                 /* I really don't like it that the receiver thread
2366                  * sends on the msock, but anyways */
2367                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2368         }
2369
2370         if (tp) {
2371                 /* two primaries implies protocol C */
2372                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2373                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2374                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2375                 if (err)
2376                         goto out_interrupted;
2377                 spin_lock_irq(&device->resource->req_lock);
2378                 err = handle_write_conflicts(device, peer_req);
2379                 if (err) {
2380                         spin_unlock_irq(&device->resource->req_lock);
2381                         if (err == -ENOENT) {
2382                                 put_ldev(device);
2383                                 return 0;
2384                         }
2385                         goto out_interrupted;
2386                 }
2387         } else {
2388                 update_peer_seq(peer_device, peer_seq);
2389                 spin_lock_irq(&device->resource->req_lock);
2390         }
2391         /* if we use the zeroout fallback code, we process synchronously
2392          * and wait for all pending requests, i.e. wait for
2393          * active_ee to become empty, in drbd_submit_peer_request();
2394          * better not add ourselves here. */
2395         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2396                 list_add_tail(&peer_req->w.list, &device->active_ee);
2397         spin_unlock_irq(&device->resource->req_lock);
2398
2399         if (device->state.conn == C_SYNC_TARGET)
2400                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2401
2402         if (device->state.pdsk < D_INCONSISTENT) {
2403                 /* In case we have the only disk of the cluster: mark the range out of sync and cover it in the activity log. */
2404                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2405                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2406                 drbd_al_begin_io(device, &peer_req->i);
2407                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2408         }
2409
2410         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2411         if (!err)
2412                 return 0;
2413
2414         /* don't care for the reason here */
2415         drbd_err(device, "submit failed, triggering re-connect\n");
2416         spin_lock_irq(&device->resource->req_lock);
2417         list_del(&peer_req->w.list);
2418         drbd_remove_epoch_entry_interval(device, peer_req);
2419         spin_unlock_irq(&device->resource->req_lock);
2420         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2421                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2422                 drbd_al_complete_io(device, &peer_req->i);
2423         }
2424
2425 out_interrupted:
2426         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2427         put_ldev(device);
2428         drbd_free_peer_req(device, peer_req);
2429         return err;
2430 }
2431
2432 /* We may throttle resync, if the lower device seems to be busy,
2433  * and current sync rate is above c_min_rate.
2434  *
2435  * To decide whether or not the lower device is busy, we use a scheme similar
2436  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2437  * amount (more than 64 sectors) of activity that we cannot account for with
2438  * our own resync activity, the device obviously is "busy".
2439  *
2440  * The current sync rate used here uses only the most recent two step marks,
2441  * to have a short time average so we can react faster.
2442  */
2443 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2444                 bool throttle_if_app_is_waiting)
2445 {
2446         struct lc_element *tmp;
2447         bool throttle = drbd_rs_c_min_rate_throttle(device);
2448
2449         if (!throttle || throttle_if_app_is_waiting)
2450                 return throttle;
2451
2452         spin_lock_irq(&device->al_lock);
2453         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2454         if (tmp) {
2455                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2456                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2457                         throttle = false;
2458                 /* Do not slow down if app IO is already waiting for this extent,
2459                  * and our progress is necessary for application IO to complete. */
2460         }
2461         spin_unlock_irq(&device->al_lock);
2462
2463         return throttle;
2464 }
2465
2466 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2467 {
2468         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2469         unsigned long db, dt, dbdt;
2470         unsigned int c_min_rate;
2471         int curr_events;
2472
2473         rcu_read_lock();
2474         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2475         rcu_read_unlock();
2476
2477         /* feature disabled? */
2478         if (c_min_rate == 0)
2479                 return false;
2480
2481         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2482                       (int)part_stat_read(&disk->part0, sectors[1]) -
2483                         atomic_read(&device->rs_sect_ev);
2484
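             /* curr_events is the backing disk's total read+write sector count
              * minus the sectors we submitted for resync ourselves (rs_sect_ev).
              * If application requests are waiting for the activity log, or that
              * difference grew by more than 64 sectors since the last check,
              * compare the recent resync rate against c_min_rate. */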
2485         if (atomic_read(&device->ap_actlog_cnt)
2486             || curr_events - device->rs_last_events > 64) {
2487                 unsigned long rs_left;
2488                 int i;
2489
2490                 device->rs_last_events = curr_events;
2491
2492                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2493                  * approx. */
2494                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2495
2496                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2497                         rs_left = device->ov_left;
2498                 else
2499                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2500
2501                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2502                 if (!dt)
2503                         dt++;
2504                 db = device->rs_mark_left[i] - rs_left;
2505                 dbdt = Bit2KB(db/dt);
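                     /* Example: 8192 bitmap bits cleared within 4 seconds gives
                      * db = 8192, dt = 4, dbdt = Bit2KB(2048) = 8192 KiB/s
                      * (assuming the default 4 KiB of data per bitmap bit). */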
2506
2507                 if (dbdt > c_min_rate)
2508                         return true;
2509         }
2510         return false;
2511 }
2512
2513 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2514 {
2515         struct drbd_peer_device *peer_device;
2516         struct drbd_device *device;
2517         sector_t sector;
2518         sector_t capacity;
2519         struct drbd_peer_request *peer_req;
2520         struct digest_info *di = NULL;
2521         int size, verb;
2522         unsigned int fault_type;
2523         struct p_block_req *p = pi->data;
2524
2525         peer_device = conn_peer_device(connection, pi->vnr);
2526         if (!peer_device)
2527                 return -EIO;
2528         device = peer_device->device;
2529         capacity = drbd_get_capacity(device->this_bdev);
2530
2531         sector = be64_to_cpu(p->sector);
2532         size   = be32_to_cpu(p->blksize);
2533
2534         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2535                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2536                                 (unsigned long long)sector, size);
2537                 return -EINVAL;
2538         }
2539         if (sector + (size>>9) > capacity) {
2540                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2541                                 (unsigned long long)sector, size);
2542                 return -EINVAL;
2543         }
2544
2545         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2546                 verb = 1;
2547                 switch (pi->cmd) {
2548                 case P_DATA_REQUEST:
2549                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2550                         break;
2551                 case P_RS_DATA_REQUEST:
2552                 case P_CSUM_RS_REQUEST:
2553                 case P_OV_REQUEST:
2554                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2555                         break;
2556                 case P_OV_REPLY:
2557                         verb = 0;
2558                         dec_rs_pending(device);
2559                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2560                         break;
2561                 default:
2562                         BUG();
2563                 }
2564                 if (verb && __ratelimit(&drbd_ratelimit_state))
2565                         drbd_err(device, "Can not satisfy peer's read request, "
2566                             "no local data.\n");
2567
2568                 /* drain the possibly present payload */
2569                 return drbd_drain_block(peer_device, pi->size);
2570         }
2571
2572         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2573          * "criss-cross" setup, that might cause write-out on some other DRBD,
2574          * which in turn might block on the other node at this very place.  */
2575         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2576                         true /* has real payload */, GFP_NOIO);
2577         if (!peer_req) {
2578                 put_ldev(device);
2579                 return -ENOMEM;
2580         }
2581
2582         switch (pi->cmd) {
2583         case P_DATA_REQUEST:
2584                 peer_req->w.cb = w_e_end_data_req;
2585                 fault_type = DRBD_FAULT_DT_RD;
2586                 /* application IO, don't drbd_rs_begin_io */
2587                 peer_req->flags |= EE_APPLICATION;
2588                 goto submit;
2589
2590         case P_RS_DATA_REQUEST:
2591                 peer_req->w.cb = w_e_end_rsdata_req;
2592                 fault_type = DRBD_FAULT_RS_RD;
2593                 /* used in the sector offset progress display */
2594                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2595                 break;
2596
2597         case P_OV_REPLY:
2598         case P_CSUM_RS_REQUEST:
2599                 fault_type = DRBD_FAULT_RS_RD;
2600                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2601                 if (!di)
2602                         goto out_free_e;
2603
2604                 di->digest_size = pi->size;
2605                 di->digest = (((char *)di)+sizeof(struct digest_info));
2606
2607                 peer_req->digest = di;
2608                 peer_req->flags |= EE_HAS_DIGEST;
2609
2610                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2611                         goto out_free_e;
2612
2613                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2614                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2615                         peer_req->w.cb = w_e_end_csum_rs_req;
2616                         /* used in the sector offset progress display */
2617                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2618                         /* remember to report stats in drbd_resync_finished */
2619                         device->use_csums = true;
2620                 } else if (pi->cmd == P_OV_REPLY) {
2621                         /* track progress, we may need to throttle */
2622                         atomic_add(size >> 9, &device->rs_sect_in);
2623                         peer_req->w.cb = w_e_end_ov_reply;
2624                         dec_rs_pending(device);
2625                         /* drbd_rs_begin_io done when we sent this request,
2626                          * but accounting still needs to be done. */
2627                         goto submit_for_resync;
2628                 }
2629                 break;
2630
2631         case P_OV_REQUEST:
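                     /* On the first P_OV_REQUEST of a verify run
                      * (ov_start_sector still at its ~0 "unset" value),
                      * initialize the online verify progress bookkeeping. */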
2632                 if (device->ov_start_sector == ~(sector_t)0 &&
2633                     peer_device->connection->agreed_pro_version >= 90) {
2634                         unsigned long now = jiffies;
2635                         int i;
2636                         device->ov_start_sector = sector;
2637                         device->ov_position = sector;
2638                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2639                         device->rs_total = device->ov_left;
2640                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2641                                 device->rs_mark_left[i] = device->ov_left;
2642                                 device->rs_mark_time[i] = now;
2643                         }
2644                         drbd_info(device, "Online Verify start sector: %llu\n",
2645                                         (unsigned long long)sector);
2646                 }
2647                 peer_req->w.cb = w_e_end_ov_req;
2648                 fault_type = DRBD_FAULT_RS_RD;
2649                 break;
2650
2651         default:
2652                 BUG();
2653         }
2654
2655         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2656          * wrt the receiver, but it is not as straightforward as it may seem.
2657          * Various places in the resync start and stop logic assume resync
2658          * requests are processed in order, requeuing this on the worker thread
2659          * introduces a bunch of new code for synchronization between threads.
2660          *
2661          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2662          * "forever", throttling after drbd_rs_begin_io will lock that extent
2663          * for application writes for the same time.  For now, just throttle
2664          * here, where the rest of the code expects the receiver to sleep for
2665          * a while, anyways.
2666          */
2667
2668         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2669          * this defers syncer requests for some time, before letting at least
2670  * one request through.  The resync controller on the receiving side
2671          * will adapt to the incoming rate accordingly.
2672          *
2673          * We cannot throttle here if remote is Primary/SyncTarget:
2674          * we would also throttle its application reads.
2675          * In that case, throttling is done on the SyncTarget only.
2676          */
2677
2678         /* Even though this may be a resync request, we do add to "read_ee";
2679          * "sync_ee" is only used for resync WRITEs.
2680          * Add to list early, so debugfs can find this request
2681          * even if we have to sleep below. */
2682         spin_lock_irq(&device->resource->req_lock);
2683         list_add_tail(&peer_req->w.list, &device->read_ee);
2684         spin_unlock_irq(&device->resource->req_lock);
2685
2686         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2687         if (device->state.peer != R_PRIMARY
2688         && drbd_rs_should_slow_down(device, sector, false))
2689                 schedule_timeout_uninterruptible(HZ/10);
2690         update_receiver_timing_details(connection, drbd_rs_begin_io);
2691         if (drbd_rs_begin_io(device, sector))
2692                 goto out_free_e;
2693
2694 submit_for_resync:
2695         atomic_add(size >> 9, &device->rs_sect_ev);
2696
2697 submit:
2698         update_receiver_timing_details(connection, drbd_submit_peer_request);
2699         inc_unacked(device);
2700         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2701                 return 0;
2702
2703         /* don't care for the reason here */
2704         drbd_err(device, "submit failed, triggering re-connect\n");
2705
2706 out_free_e:
2707         spin_lock_irq(&device->resource->req_lock);
2708         list_del(&peer_req->w.list);
2709         spin_unlock_irq(&device->resource->req_lock);
2710         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2711
2712         put_ldev(device);
2713         drbd_free_peer_req(device, peer_req);
2714         return -EIO;
2715 }
2716
2717 /**
2718  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
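      *
      * Returns 1 to keep the local data and discard the peer's, -1 to discard
      * the local data, and -100 if no automatic recovery decision can be made.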
2719  */
2720 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2721 {
2722         struct drbd_device *device = peer_device->device;
2723         int self, peer, rv = -100;
2724         unsigned long ch_self, ch_peer;
2725         enum drbd_after_sb_p after_sb_0p;
2726
2727         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2728         peer = device->p_uuid[UI_BITMAP] & 1;
2729
2730         ch_peer = device->p_uuid[UI_SIZE];
2731         ch_self = device->comm_bm_set;
2732
2733         rcu_read_lock();
2734         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2735         rcu_read_unlock();
2736         switch (after_sb_0p) {
2737         case ASB_CONSENSUS:
2738         case ASB_DISCARD_SECONDARY:
2739         case ASB_CALL_HELPER:
2740         case ASB_VIOLENTLY:
2741                 drbd_err(device, "Configuration error.\n");
2742                 break;
2743         case ASB_DISCONNECT:
2744                 break;
2745         case ASB_DISCARD_YOUNGER_PRI:
2746                 if (self == 0 && peer == 1) {
2747                         rv = -1;
2748                         break;
2749                 }
2750                 if (self == 1 && peer == 0) {
2751                         rv =  1;
2752                         break;
2753                 }
2754                 /* Else fall through to one of the other strategies... */
2755         case ASB_DISCARD_OLDER_PRI:
2756                 if (self == 0 && peer == 1) {
2757                         rv = 1;
2758                         break;
2759                 }
2760                 if (self == 1 && peer == 0) {
2761                         rv = -1;
2762                         break;
2763                 }
2764                 /* Else fall through to one of the other strategies... */
2765                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2766                      "Using discard-least-changes instead\n");
2767         case ASB_DISCARD_ZERO_CHG:
2768                 if (ch_peer == 0 && ch_self == 0) {
2769                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2770                                 ? -1 : 1;
2771                         break;
2772                 } else {
2773                         if (ch_peer == 0) { rv =  1; break; }
2774                         if (ch_self == 0) { rv = -1; break; }
2775                 }
2776                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2777                         break;
2778         case ASB_DISCARD_LEAST_CHG:
2779                 if      (ch_self < ch_peer)
2780                         rv = -1;
2781                 else if (ch_self > ch_peer)
2782                         rv =  1;
2783                 else /* ( ch_self == ch_peer ) */
2784                      /* Well, then use something else. */
2785                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2786                                 ? -1 : 1;
2787                 break;
2788         case ASB_DISCARD_LOCAL:
2789                 rv = -1;
2790                 break;
2791         case ASB_DISCARD_REMOTE:
2792                 rv =  1;
2793         }
2794
2795         return rv;
2796 }
2797
2798 /**
2799  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2800  */
2801 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2802 {
2803         struct drbd_device *device = peer_device->device;
2804         int hg, rv = -100;
2805         enum drbd_after_sb_p after_sb_1p;
2806
2807         rcu_read_lock();
2808         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2809         rcu_read_unlock();
2810         switch (after_sb_1p) {
2811         case ASB_DISCARD_YOUNGER_PRI:
2812         case ASB_DISCARD_OLDER_PRI:
2813         case ASB_DISCARD_LEAST_CHG:
2814         case ASB_DISCARD_LOCAL:
2815         case ASB_DISCARD_REMOTE:
2816         case ASB_DISCARD_ZERO_CHG:
2817                 drbd_err(device, "Configuration error.\n");
2818                 break;
2819         case ASB_DISCONNECT:
2820                 break;
2821         case ASB_CONSENSUS:
2822                 hg = drbd_asb_recover_0p(peer_device);
2823                 if (hg == -1 && device->state.role == R_SECONDARY)
2824                         rv = hg;
2825                 if (hg == 1  && device->state.role == R_PRIMARY)
2826                         rv = hg;
2827                 break;
2828         case ASB_VIOLENTLY:
2829                 rv = drbd_asb_recover_0p(peer_device);
2830                 break;
2831         case ASB_DISCARD_SECONDARY:
2832                 return device->state.role == R_PRIMARY ? 1 : -1;
2833         case ASB_CALL_HELPER:
2834                 hg = drbd_asb_recover_0p(peer_device);
2835                 if (hg == -1 && device->state.role == R_PRIMARY) {
2836                         enum drbd_state_rv rv2;
2837
2838                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2839                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2840                           * we do not need to wait for the after state change work either. */
2841                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2842                         if (rv2 != SS_SUCCESS) {
2843                                 drbd_khelper(device, "pri-lost-after-sb");
2844                         } else {
2845                                 drbd_warn(device, "Successfully gave up primary role.\n");
2846                                 rv = hg;
2847                         }
2848                 } else
2849                         rv = hg;
2850         }
2851
2852         return rv;
2853 }
2854
2855 /**
2856  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2857  */
2858 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2859 {
2860         struct drbd_device *device = peer_device->device;
2861         int hg, rv = -100;
2862         enum drbd_after_sb_p after_sb_2p;
2863
2864         rcu_read_lock();
2865         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2866         rcu_read_unlock();
2867         switch (after_sb_2p) {
2868         case ASB_DISCARD_YOUNGER_PRI:
2869         case ASB_DISCARD_OLDER_PRI:
2870         case ASB_DISCARD_LEAST_CHG:
2871         case ASB_DISCARD_LOCAL:
2872         case ASB_DISCARD_REMOTE:
2873         case ASB_CONSENSUS:
2874         case ASB_DISCARD_SECONDARY:
2875         case ASB_DISCARD_ZERO_CHG:
2876                 drbd_err(device, "Configuration error.\n");
2877                 break;
2878         case ASB_VIOLENTLY:
2879                 rv = drbd_asb_recover_0p(peer_device);
2880                 break;
2881         case ASB_DISCONNECT:
2882                 break;
2883         case ASB_CALL_HELPER:
2884                 hg = drbd_asb_recover_0p(peer_device);
2885                 if (hg == -1) {
2886                         enum drbd_state_rv rv2;
2887
2888                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2889                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2890                           * we do not need to wait for the after state change work either. */
2891                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2892                         if (rv2 != SS_SUCCESS) {
2893                                 drbd_khelper(device, "pri-lost-after-sb");
2894                         } else {
2895                                 drbd_warn(device, "Successfully gave up primary role.\n");
2896                                 rv = hg;
2897                         }
2898                 } else
2899                         rv = hg;
2900         }
2901
2902         return rv;
2903 }
2904
2905 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2906                            u64 bits, u64 flags)
2907 {
2908         if (!uuid) {
2909                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2910                 return;
2911         }
2912         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2913              text,
2914              (unsigned long long)uuid[UI_CURRENT],
2915              (unsigned long long)uuid[UI_BITMAP],
2916              (unsigned long long)uuid[UI_HISTORY_START],
2917              (unsigned long long)uuid[UI_HISTORY_END],
2918              (unsigned long long)bits,
2919              (unsigned long long)flags);
2920 }
2921
2922 /*
2923   100   after split brain try auto recover
2924     2   C_SYNC_SOURCE set BitMap
2925     1   C_SYNC_SOURCE use BitMap
2926     0   no Sync
2927    -1   C_SYNC_TARGET use BitMap
2928    -2   C_SYNC_TARGET set BitMap
2929  -100   after split brain, disconnect
2930 -1000   unrelated data
2931 -1091   requires proto 91
2932 -1096   requires proto 96
2933  */
2934 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2935 {
2936         struct drbd_peer_device *const peer_device = first_peer_device(device);
2937         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2938         u64 self, peer;
2939         int i, j;
2940
2941         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2942         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2943
2944         *rule_nr = 10;
2945         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2946                 return 0;
2947
2948         *rule_nr = 20;
2949         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2950              peer != UUID_JUST_CREATED)
2951                 return -2;
2952
2953         *rule_nr = 30;
2954         if (self != UUID_JUST_CREATED &&
2955             (peer == UUID_JUST_CREATED || peer == (u64)0))
2956                 return 2;
2957
2958         if (self == peer) {
2959                 int rct, dc; /* roles at crash time */
2960
2961                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2962
2963                         if (connection->agreed_pro_version < 91)
2964                                 return -1091;
2965
2966                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2967                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2968                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2969                                 drbd_uuid_move_history(device);
2970                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2971                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2972
2973                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2974                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2975                                 *rule_nr = 34;
2976                         } else {
2977                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2978                                 *rule_nr = 36;
2979                         }
2980
2981                         return 1;
2982                 }
2983
2984                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2985
2986                         if (connection->agreed_pro_version < 91)
2987                                 return -1091;
2988
2989                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2990                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2991                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2992
2993                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2994                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2995                                 device->p_uuid[UI_BITMAP] = 0UL;
2996
2997                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2998                                 *rule_nr = 35;
2999                         } else {
3000                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3001                                 *rule_nr = 37;
3002                         }
3003
3004                         return -1;
3005                 }
3006
3007                 /* Common power [off|failure] */
3008                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3009                         (device->p_uuid[UI_FLAGS] & 2);
3010                 /* lowest bit is set when we were primary,
3011                  * next bit (weight 2) is set when peer was primary */
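                /* Worked example (hypothetical values): if CRASHED_PRIMARY is set
                 * here and bit 1 of the peer's UI_FLAGS is set, rct = 1 + 2 = 3,
                 * i.e. both nodes were primary at crash time, and the tie is
                 * broken below via RESOLVE_CONFLICTS. */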
3012                 *rule_nr = 40;
3013
3014                 switch (rct) {
3015                 case 0: /* !self_pri && !peer_pri */ return 0;
3016                 case 1: /*  self_pri && !peer_pri */ return 1;
3017                 case 2: /* !self_pri &&  peer_pri */ return -1;
3018                 case 3: /*  self_pri &&  peer_pri */
3019                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3020                         return dc ? -1 : 1;
3021                 }
3022         }
3023
3024         *rule_nr = 50;
3025         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3026         if (self == peer)
3027                 return -1;
3028
3029         *rule_nr = 51;
3030         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3031         if (self == peer) {
3032                 if (connection->agreed_pro_version < 96 ?
3033                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3034                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3035                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3036                         /* The last P_SYNC_UUID did not get through. Undo the changes the peer
3037                            made to its UUIDs when it last started a resync as sync source. */
3038
3039                         if (connection->agreed_pro_version < 91)
3040                                 return -1091;
3041
3042                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3043                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3044
3045                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3046                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3047
3048                         return -1;
3049                 }
3050         }
3051
3052         *rule_nr = 60;
3053         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3054         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3055                 peer = device->p_uuid[i] & ~((u64)1);
3056                 if (self == peer)
3057                         return -2;
3058         }
3059
3060         *rule_nr = 70;
3061         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3062         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3063         if (self == peer)
3064                 return 1;
3065
3066         *rule_nr = 71;
3067         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3068         if (self == peer) {
3069                 if (connection->agreed_pro_version < 96 ?
3070                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3071                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3072                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3073                         /* The last P_SYNC_UUID did not get through. Undo the changes we made
3074                            to our UUIDs when we last started a resync as sync source. */
3075
3076                         if (connection->agreed_pro_version < 91)
3077                                 return -1091;
3078
3079                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3080                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3081
3082                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3083                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3084                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3085
3086                         return 1;
3087                 }
3088         }
3089
3090
3091         *rule_nr = 80;
3092         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3093         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3094                 self = device->ldev->md.uuid[i] & ~((u64)1);
3095                 if (self == peer)
3096                         return 2;
3097         }
3098
3099         *rule_nr = 90;
3100         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3101         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3102         if (self == peer && self != ((u64)0))
3103                 return 100;
3104
3105         *rule_nr = 100;
3106         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3107                 self = device->ldev->md.uuid[i] & ~((u64)1);
3108                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3109                         peer = device->p_uuid[j] & ~((u64)1);
3110                         if (self == peer)
3111                                 return -100;
3112                 }
3113         }
3114
3115         return -1000;
3116 }
3117
3118 /* drbd_sync_handshake() returns the new conn state on success, or
3119    C_MASK on failure.
3120  */
3121 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3122                                            enum drbd_role peer_role,
3123                                            enum drbd_disk_state peer_disk) __must_hold(local)
3124 {
3125         struct drbd_device *device = peer_device->device;
3126         enum drbd_conns rv = C_MASK;
3127         enum drbd_disk_state mydisk;
3128         struct net_conf *nc;
3129         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3130
3131         mydisk = device->state.disk;
3132         if (mydisk == D_NEGOTIATING)
3133                 mydisk = device->new_state_tmp.disk;
3134
3135         drbd_info(device, "drbd_sync_handshake:\n");
3136
3137         spin_lock_irq(&device->ldev->md.uuid_lock);
3138         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3139         drbd_uuid_dump(device, "peer", device->p_uuid,
3140                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3141
3142         hg = drbd_uuid_compare(device, &rule_nr);
3143         spin_unlock_irq(&device->ldev->md.uuid_lock);
3144
3145         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3146
3147         if (hg == -1000) {
3148                 drbd_alert(device, "Unrelated data, aborting!\n");
3149                 return C_MASK;
3150         }
3151         if (hg < -1000) {
3152                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3153                 return C_MASK;
3154         }
3155
3156         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3157             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3158                 int f = (hg == -100) || abs(hg) == 2;
3159                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3160                 if (f)
3161                         hg = hg*2;
3162                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3163                      hg > 0 ? "source" : "target");
3164         }
3165
3166         if (abs(hg) == 100)
3167                 drbd_khelper(device, "initial-split-brain");
3168
3169         rcu_read_lock();
3170         nc = rcu_dereference(peer_device->connection->net_conf);
3171         always_asbp = nc->always_asbp;
3172         rr_conflict = nc->rr_conflict;
3173         tentative = nc->tentative;
3174         rcu_read_unlock();
3175
3176         if (hg == 100 || (hg == -100 && always_asbp)) {
3177                 int pcount = (device->state.role == R_PRIMARY)
3178                            + (peer_role == R_PRIMARY);
3179                 int forced = (hg == -100);
3180
3181                 switch (pcount) {
3182                 case 0:
3183                         hg = drbd_asb_recover_0p(peer_device);
3184                         break;
3185                 case 1:
3186                         hg = drbd_asb_recover_1p(peer_device);
3187                         break;
3188                 case 2:
3189                         hg = drbd_asb_recover_2p(peer_device);
3190                         break;
3191                 }
3192                 if (abs(hg) < 100) {
3193                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3194                              "automatically solved. Sync from %s node\n",
3195                              pcount, (hg < 0) ? "peer" : "this");
3196                         if (forced) {
3197                                 drbd_warn(device, "Doing a full sync, since"
3198                                      " UUIDs were ambiguous.\n");
3199                                 hg = hg*2;
3200                         }
3201                 }
3202         }
3203
3204         if (hg == -100) {
3205                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3206                         hg = -1;
3207                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3208                         hg = 1;
3209
3210                 if (abs(hg) < 100)
3211                         drbd_warn(device, "Split-Brain detected, manually solved. "
3212                              "Sync from %s node\n",
3213                              (hg < 0) ? "peer" : "this");
3214         }
3215
3216         if (hg == -100) {
3217                 /* FIXME this log message is not correct if we end up here
3218                  * after an attempted attach on a diskless node.
3219                  * We just refuse to attach -- well, we drop the "connection"
3220                  * to that disk, in a way... */
3221                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3222                 drbd_khelper(device, "split-brain");
3223                 return C_MASK;
3224         }
3225
3226         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3227                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3228                 return C_MASK;
3229         }
3230
3231         if (hg < 0 && /* by intention we do not use mydisk here. */
3232             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3233                 switch (rr_conflict) {
3234                 case ASB_CALL_HELPER:
3235                         drbd_khelper(device, "pri-lost");
3236                         /* fall through */
3237                 case ASB_DISCONNECT:
3238                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3239                         return C_MASK;
3240                 case ASB_VIOLENTLY:
3241                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3242                              " assumption\n");
3243                 }
3244         }
3245
3246         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3247                 if (hg == 0)
3248                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3249                 else
3250                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3251                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3252                                  abs(hg) >= 2 ? "full" : "bit-map based");
3253                 return C_MASK;
3254         }
3255
3256         if (abs(hg) >= 2) {
3257                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3258                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3259                                         BM_LOCKED_SET_ALLOWED))
3260                         return C_MASK;
3261         }
3262
3263         if (hg > 0) { /* become sync source. */
3264                 rv = C_WF_BITMAP_S;
3265         } else if (hg < 0) { /* become sync target */
3266                 rv = C_WF_BITMAP_T;
3267         } else {
3268                 rv = C_CONNECTED;
3269                 if (drbd_bm_total_weight(device)) {
3270                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3271                              drbd_bm_total_weight(device));
3272                 }
3273         }
3274
3275         return rv;
3276 }
3277
3278 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3279 {
3280         /* ASB_DISCARD_REMOTE paired with ASB_DISCARD_LOCAL on the other side is valid */
3281         if (peer == ASB_DISCARD_REMOTE)
3282                 return ASB_DISCARD_LOCAL;
3283
3284         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
3285         if (peer == ASB_DISCARD_LOCAL)
3286                 return ASB_DISCARD_REMOTE;
3287
3288         /* everything else is valid if they are equal on both sides. */
3289         return peer;
3290 }
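/* Example (hypothetical configuration): if our after-sb-0pri is "discard-remote"
 * and the peer's is "discard-local", the peer sends ASB_DISCARD_LOCAL;
 * convert_after_sb() maps that to ASB_DISCARD_REMOTE, which matches our own
 * setting, so the pair is accepted as compatible in receive_protocol() below. */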
3291
3292 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3293 {
3294         struct p_protocol *p = pi->data;
3295         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3296         int p_proto, p_discard_my_data, p_two_primaries, cf;
3297         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3298         char integrity_alg[SHARED_SECRET_MAX] = "";
3299         struct crypto_hash *peer_integrity_tfm = NULL;
3300         void *int_dig_in = NULL, *int_dig_vv = NULL;
3301
3302         p_proto         = be32_to_cpu(p->protocol);
3303         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3304         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3305         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3306         p_two_primaries = be32_to_cpu(p->two_primaries);
3307         cf              = be32_to_cpu(p->conn_flags);
3308         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3309
3310         if (connection->agreed_pro_version >= 87) {
3311                 int err;
3312
3313                 if (pi->size > sizeof(integrity_alg))
3314                         return -EIO;
3315                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3316                 if (err)
3317                         return err;
3318                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3319         }
3320
3321         if (pi->cmd != P_PROTOCOL_UPDATE) {
3322                 clear_bit(CONN_DRY_RUN, &connection->flags);
3323
3324                 if (cf & CF_DRY_RUN)
3325                         set_bit(CONN_DRY_RUN, &connection->flags);
3326
3327                 rcu_read_lock();
3328                 nc = rcu_dereference(connection->net_conf);
3329
3330                 if (p_proto != nc->wire_protocol) {
3331                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3332                         goto disconnect_rcu_unlock;
3333                 }
3334
3335                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3336                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3337                         goto disconnect_rcu_unlock;
3338                 }
3339
3340                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3341                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3342                         goto disconnect_rcu_unlock;
3343                 }
3344
3345                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3346                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3347                         goto disconnect_rcu_unlock;
3348                 }
3349
3350                 if (p_discard_my_data && nc->discard_my_data) {
3351                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3352                         goto disconnect_rcu_unlock;
3353                 }
3354
3355                 if (p_two_primaries != nc->two_primaries) {
3356                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3357                         goto disconnect_rcu_unlock;
3358                 }
3359
3360                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3361                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3362                         goto disconnect_rcu_unlock;
3363                 }
3364
3365                 rcu_read_unlock();
3366         }
3367
3368         if (integrity_alg[0]) {
3369                 int hash_size;
3370
3371                 /*
3372                  * We can only change the peer data integrity algorithm
3373                  * here.  Changing our own data integrity algorithm
3374                  * requires that we send a P_PROTOCOL_UPDATE packet at
3375                  * the same time; otherwise, the peer has no way to
3376                  * tell between which packets the algorithm should
3377                  * change.
3378                  */
3379
3380                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3381                 if (!peer_integrity_tfm) {
3382                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3383                                  integrity_alg);
3384                         goto disconnect;
3385                 }
3386
3387                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3388                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3389                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3390                 if (!(int_dig_in && int_dig_vv)) {
3391                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3392                         goto disconnect;
3393                 }
3394         }
3395
3396         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3397         if (!new_net_conf) {
3398                 drbd_err(connection, "Allocation of new net_conf failed\n");
3399                 goto disconnect;
3400         }
3401
3402         mutex_lock(&connection->data.mutex);
3403         mutex_lock(&connection->resource->conf_update);
3404         old_net_conf = connection->net_conf;
3405         *new_net_conf = *old_net_conf;
3406
3407         new_net_conf->wire_protocol = p_proto;
3408         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3409         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3410         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3411         new_net_conf->two_primaries = p_two_primaries;
3412
3413         rcu_assign_pointer(connection->net_conf, new_net_conf);
3414         mutex_unlock(&connection->resource->conf_update);
3415         mutex_unlock(&connection->data.mutex);
3416
3417         crypto_free_hash(connection->peer_integrity_tfm);
3418         kfree(connection->int_dig_in);
3419         kfree(connection->int_dig_vv);
3420         connection->peer_integrity_tfm = peer_integrity_tfm;
3421         connection->int_dig_in = int_dig_in;
3422         connection->int_dig_vv = int_dig_vv;
3423
3424         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3425                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3426                           integrity_alg[0] ? integrity_alg : "(none)");
3427
3428         synchronize_rcu();
3429         kfree(old_net_conf);
3430         return 0;
3431
3432 disconnect_rcu_unlock:
3433         rcu_read_unlock();
3434 disconnect:
3435         crypto_free_hash(peer_integrity_tfm);
3436         kfree(int_dig_in);
3437         kfree(int_dig_vv);
3438         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3439         return -EIO;
3440 }
3441
3442 /* helper function
3443  * input: alg name, feature name
3444  * return: NULL (alg name was "")
3445  *         ERR_PTR(error) if something goes wrong
3446  *         or the crypto hash ptr, if it worked out ok. */
3447 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3448                 const char *alg, const char *name)
3449 {
3450         struct crypto_hash *tfm;
3451
3452         if (!alg[0])
3453                 return NULL;
3454
3455         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3456         if (IS_ERR(tfm)) {
3457                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3458                         alg, name, PTR_ERR(tfm));
3459                 return tfm;
3460         }
3461         return tfm;
3462 }
3463
3464 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3465 {
3466         void *buffer = connection->data.rbuf;
3467         int size = pi->size;
3468
3469         while (size) {
3470                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3471                 s = drbd_recv(connection, buffer, s);
3472                 if (s <= 0) {
3473                         if (s < 0)
3474                                 return s;
3475                         break;
3476                 }
3477                 size -= s;
3478         }
3479         if (size)
3480                 return -EIO;
3481         return 0;
3482 }
3483
3484 /*
3485  * config_unknown_volume  -  device configuration command for unknown volume
3486  *
3487  * When a device is added to an existing connection, the node on which the
3488  * device is added first will send configuration commands to its peer but the
3489  * peer will not know about the device yet.  It will warn and ignore these
3490  * commands.  Once the device is added on the second node, the second node will
3491  * send the same device configuration commands, but in the other direction.
3492  *
3493  * (We can also end up here if drbd is misconfigured.)
3494  */
3495 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3496 {
3497         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3498                   cmdname(pi->cmd), pi->vnr);
3499         return ignore_remaining_packet(connection, pi);
3500 }
3501
3502 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3503 {
3504         struct drbd_peer_device *peer_device;
3505         struct drbd_device *device;
3506         struct p_rs_param_95 *p;
3507         unsigned int header_size, data_size, exp_max_sz;
3508         struct crypto_hash *verify_tfm = NULL;
3509         struct crypto_hash *csums_tfm = NULL;
3510         struct net_conf *old_net_conf, *new_net_conf = NULL;
3511         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3512         const int apv = connection->agreed_pro_version;
3513         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3514         int fifo_size = 0;
3515         int err;
3516
3517         peer_device = conn_peer_device(connection, pi->vnr);
3518         if (!peer_device)
3519                 return config_unknown_volume(connection, pi);
3520         device = peer_device->device;
3521
3522         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3523                     : apv == 88 ? sizeof(struct p_rs_param)
3524                                         + SHARED_SECRET_MAX
3525                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3526                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3527
3528         if (pi->size > exp_max_sz) {
3529                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3530                     pi->size, exp_max_sz);
3531                 return -EIO;
3532         }
3533
3534         if (apv <= 88) {
3535                 header_size = sizeof(struct p_rs_param);
3536                 data_size = pi->size - header_size;
3537         } else if (apv <= 94) {
3538                 header_size = sizeof(struct p_rs_param_89);
3539                 data_size = pi->size - header_size;
3540                 D_ASSERT(device, data_size == 0);
3541         } else {
3542                 header_size = sizeof(struct p_rs_param_95);
3543                 data_size = pi->size - header_size;
3544                 D_ASSERT(device, data_size == 0);
3545         }
3546
3547         /* initialize verify_alg and csums_alg */
3548         p = pi->data;
3549         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
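        /* Note: the single memset of 2 * SHARED_SECRET_MAX relies on verify_alg[]
         * and csums_alg[] being adjacent in struct p_rs_param_95, so both string
         * fields are cleared at once. */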
3550
3551         err = drbd_recv_all(peer_device->connection, p, header_size);
3552         if (err)
3553                 return err;
3554
3555         mutex_lock(&connection->resource->conf_update);
3556         old_net_conf = peer_device->connection->net_conf;
3557         if (get_ldev(device)) {
3558                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3559                 if (!new_disk_conf) {
3560                         put_ldev(device);
3561                         mutex_unlock(&connection->resource->conf_update);
3562                         drbd_err(device, "Allocation of new disk_conf failed\n");
3563                         return -ENOMEM;
3564                 }
3565
3566                 old_disk_conf = device->ldev->disk_conf;
3567                 *new_disk_conf = *old_disk_conf;
3568
3569                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3570         }
3571
3572         if (apv >= 88) {
3573                 if (apv == 88) {
3574                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3575                                 drbd_err(device, "verify-alg of wrong size, "
3576                                         "peer wants %u, accepting only up to %u bytes\n",
3577                                         data_size, SHARED_SECRET_MAX);
3578                                 err = -EIO;
3579                                 goto reconnect;
3580                         }
3581
3582                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3583                         if (err)
3584                                 goto reconnect;
3585                         /* we expect NUL terminated string */
3586                         /* but just in case someone tries to be evil */
3587                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3588                         p->verify_alg[data_size-1] = 0;
3589
3590                 } else /* apv >= 89 */ {
3591                         /* we still expect NUL terminated strings */
3592                         /* but just in case someone tries to be evil */
3593                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3594                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3595                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3596                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3597                 }
3598
3599                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3600                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3601                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3602                                     old_net_conf->verify_alg, p->verify_alg);
3603                                 goto disconnect;
3604                         }
3605                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3606                                         p->verify_alg, "verify-alg");
3607                         if (IS_ERR(verify_tfm)) {
3608                                 verify_tfm = NULL;
3609                                 goto disconnect;
3610                         }
3611                 }
3612
3613                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3614                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3615                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3616                                     old_net_conf->csums_alg, p->csums_alg);
3617                                 goto disconnect;
3618                         }
3619                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3620                                         p->csums_alg, "csums-alg");
3621                         if (IS_ERR(csums_tfm)) {
3622                                 csums_tfm = NULL;
3623                                 goto disconnect;
3624                         }
3625                 }
3626
3627                 if (apv > 94 && new_disk_conf) {
3628                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3629                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3630                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3631                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3632
3633                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3634                         if (fifo_size != device->rs_plan_s->size) {
3635                                 new_plan = fifo_alloc(fifo_size);
3636                                 if (!new_plan) {
3637                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3638                                         put_ldev(device);
3639                                         goto disconnect;
3640                                 }
3641                         }
3642                 }
3643
3644                 if (verify_tfm || csums_tfm) {
3645                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3646                         if (!new_net_conf) {
3647                                 drbd_err(device, "Allocation of new net_conf failed\n");
3648                                 goto disconnect;
3649                         }
3650
3651                         *new_net_conf = *old_net_conf;
3652
3653                         if (verify_tfm) {
3654                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3655                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3656                                 crypto_free_hash(peer_device->connection->verify_tfm);
3657                                 peer_device->connection->verify_tfm = verify_tfm;
3658                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3659                         }
3660                         if (csums_tfm) {
3661                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3662                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3663                                 crypto_free_hash(peer_device->connection->csums_tfm);
3664                                 peer_device->connection->csums_tfm = csums_tfm;
3665                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3666                         }
3667                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3668                 }
3669         }
3670
3671         if (new_disk_conf) {
3672                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3673                 put_ldev(device);
3674         }
3675
3676         if (new_plan) {
3677                 old_plan = device->rs_plan_s;
3678                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3679         }
3680
3681         mutex_unlock(&connection->resource->conf_update);
3682         synchronize_rcu();
3683         if (new_net_conf)
3684                 kfree(old_net_conf);
3685         kfree(old_disk_conf);
3686         kfree(old_plan);
3687
3688         return 0;
3689
3690 reconnect:
3691         if (new_disk_conf) {
3692                 put_ldev(device);
3693                 kfree(new_disk_conf);
3694         }
3695         mutex_unlock(&connection->resource->conf_update);
3696         return -EIO;
3697
3698 disconnect:
3699         kfree(new_plan);
3700         if (new_disk_conf) {
3701                 put_ldev(device);
3702                 kfree(new_disk_conf);
3703         }
3704         mutex_unlock(&connection->resource->conf_update);
3705         /* just for completeness: actually not needed,
3706          * as this is not reached if csums_tfm was ok. */
3707         crypto_free_hash(csums_tfm);
3708         /* but free the verify_tfm again, if csums_tfm did not work out */
3709         crypto_free_hash(verify_tfm);
3710         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3711         return -EIO;
3712 }
3713
3714 /* warn if the arguments differ by more than 12.5% */
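/* For illustration (hypothetical values): with a = 1000 and b = 1200 sectors,
 * d = 200 > (a >> 3) = 125, so the difference is reported. */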
3715 static void warn_if_differ_considerably(struct drbd_device *device,
3716         const char *s, sector_t a, sector_t b)
3717 {
3718         sector_t d;
3719         if (a == 0 || b == 0)
3720                 return;
3721         d = (a > b) ? (a - b) : (b - a);
3722         if (d > (a>>3) || d > (b>>3))
3723                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3724                      (unsigned long long)a, (unsigned long long)b);
3725 }
3726
3727 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3728 {
3729         struct drbd_peer_device *peer_device;
3730         struct drbd_device *device;
3731         struct p_sizes *p = pi->data;
3732         enum determine_dev_size dd = DS_UNCHANGED;
3733         sector_t p_size, p_usize, p_csize, my_usize;
3734         int ldsc = 0; /* local disk size changed */
3735         enum dds_flags ddsf;
3736
3737         peer_device = conn_peer_device(connection, pi->vnr);
3738         if (!peer_device)
3739                 return config_unknown_volume(connection, pi);
3740         device = peer_device->device;
3741
3742         p_size = be64_to_cpu(p->d_size);
3743         p_usize = be64_to_cpu(p->u_size);
3744         p_csize = be64_to_cpu(p->c_size);
3745
3746         /* just store the peer's disk size for now.
3747          * we still need to figure out whether we accept that. */
3748         device->p_size = p_size;
3749
3750         if (get_ldev(device)) {
3751                 rcu_read_lock();
3752                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3753                 rcu_read_unlock();
3754
3755                 warn_if_differ_considerably(device, "lower level device sizes",
3756                            p_size, drbd_get_max_capacity(device->ldev));
3757                 warn_if_differ_considerably(device, "user requested size",
3758                                             p_usize, my_usize);
3759
3760                 /* if this is the first connect, or an otherwise expected
3761                  * param exchange, choose the minimum */
3762                 if (device->state.conn == C_WF_REPORT_PARAMS)
3763                         p_usize = min_not_zero(my_usize, p_usize);
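                /* min_not_zero(): a zero (unset) user size on either side is
                 * ignored; e.g. (hypothetical) my_usize = 0 with a peer u_size
                 * of 10 GiB yields 10 GiB, while two non-zero values yield the
                 * smaller one. */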
3764
3765                 /* Never shrink a device with usable data during connect.
3766                    But allow online shrinking if we are connected. */
3767                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3768                     drbd_get_capacity(device->this_bdev) &&
3769                     device->state.disk >= D_OUTDATED &&
3770                     device->state.conn < C_CONNECTED) {
3771                         drbd_err(device, "The peer's disk size is too small!\n");
3772                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3773                         put_ldev(device);
3774                         return -EIO;
3775                 }
3776
3777                 if (my_usize != p_usize) {
3778                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3779
3780                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3781                         if (!new_disk_conf) {
3782                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3783                                 put_ldev(device);
3784                                 return -ENOMEM;
3785                         }
3786
3787                         mutex_lock(&connection->resource->conf_update);
3788                         old_disk_conf = device->ldev->disk_conf;
3789                         *new_disk_conf = *old_disk_conf;
3790                         new_disk_conf->disk_size = p_usize;
3791
3792                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3793                         mutex_unlock(&connection->resource->conf_update);
3794                         synchronize_rcu();
3795                         kfree(old_disk_conf);
3796
3797                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3798                                  (unsigned long)p_usize);
3799                 }
3800
3801                 put_ldev(device);
3802         }
3803
3804         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3805         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3806            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3807            drbd_reconsider_max_bio_size(), we can be sure that after
3808            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3809
3810         ddsf = be16_to_cpu(p->dds_flags);
3811         if (get_ldev(device)) {
3812                 drbd_reconsider_max_bio_size(device, device->ldev);
3813                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3814                 put_ldev(device);
3815                 if (dd == DS_ERROR)
3816                         return -EIO;
3817                 drbd_md_sync(device);
3818         } else {
3819                 /*
3820                  * I am diskless, need to accept the peer's *current* size.
3821                  * I must NOT accept the peer's backing disk size,
3822                  * it may have been larger than mine all along...
3823                  *
3824                  * At this point, the peer knows more about my disk, or at
3825                  * least about what we last agreed upon, than myself.
3826                  * So if his c_size is less than his d_size, the most likely
3827                  * reason is that *my* d_size was smaller last time we checked.
3828                  *
3829                  * However, if he sends a zero current size,
3830                  * take his (user-capped or) backing disk size anyways.
3831                  */
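                /* Fallback order below (gcc "?:" extension): use the peer's
                 * currently exposed size (c_size); if that is zero, its
                 * user-configured size (u_size); if that is also zero, its
                 * backing device size (d_size). */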
3832                 drbd_reconsider_max_bio_size(device, NULL);
3833                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3834         }
3835
3836         if (get_ldev(device)) {
3837                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3838                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3839                         ldsc = 1;
3840                 }
3841
3842                 put_ldev(device);
3843         }
3844
3845         if (device->state.conn > C_WF_REPORT_PARAMS) {
3846                 if (be64_to_cpu(p->c_size) !=
3847                     drbd_get_capacity(device->this_bdev) || ldsc) {
3848                         /* we have different sizes, probably peer
3849                          * needs to know my new size... */
3850                         drbd_send_sizes(peer_device, 0, ddsf);
3851                 }
3852                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3853                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3854                         if (device->state.pdsk >= D_INCONSISTENT &&
3855                             device->state.disk >= D_INCONSISTENT) {
3856                                 if (ddsf & DDSF_NO_RESYNC)
3857                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3858                                 else
3859                                         resync_after_online_grow(device);
3860                         } else
3861                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3862                 }
3863         }
3864
3865         return 0;
3866 }
3867
3868 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3869 {
3870         struct drbd_peer_device *peer_device;
3871         struct drbd_device *device;
3872         struct p_uuids *p = pi->data;
3873         u64 *p_uuid;
3874         int i, updated_uuids = 0;
3875
3876         peer_device = conn_peer_device(connection, pi->vnr);
3877         if (!peer_device)
3878                 return config_unknown_volume(connection, pi);
3879         device = peer_device->device;
3880
3881         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3882         if (!p_uuid) {
3883                 drbd_err(device, "kmalloc of p_uuid failed\n");
3884                 return false;
3885         }
3886
3887         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3888                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3889
3890         kfree(device->p_uuid);
3891         device->p_uuid = p_uuid;
3892
3893         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
3894             device->state.disk < D_INCONSISTENT &&
3895             device->state.role == R_PRIMARY &&
3896             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3897                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3898                     (unsigned long long)device->ed_uuid);
3899                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3900                 return -EIO;
3901         }
3902
3903         if (get_ldev(device)) {
3904                 int skip_initial_sync =
3905                         device->state.conn == C_CONNECTED &&
3906                         peer_device->connection->agreed_pro_version >= 90 &&
3907                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3908                         (p_uuid[UI_FLAGS] & 8);
3909                 if (skip_initial_sync) {
3910                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3911                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3912                                         "clear_n_write from receive_uuids",
3913                                         BM_LOCKED_TEST_ALLOWED);
3914                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3915                         _drbd_uuid_set(device, UI_BITMAP, 0);
3916                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3917                                         CS_VERBOSE, NULL);
3918                         drbd_md_sync(device);
3919                         updated_uuids = 1;
3920                 }
3921                 put_ldev(device);
3922         } else if (device->state.disk < D_INCONSISTENT &&
3923                    device->state.role == R_PRIMARY) {
3924                 /* I am a diskless primary, the peer just created a new current UUID
3925                    for me. */
3926                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3927         }
3928
3929         /* Before we test the disk state, wait for any cluster-wide state change
3930            still in progress to finish (the lock/unlock of state_mutex below only
3931            serializes against such a change). That is important if we are primary
3932            and detaching from our disk: we need to see the new disk state... */
3933         mutex_lock(device->state_mutex);
3934         mutex_unlock(device->state_mutex);
3935         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3936                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3937
3938         if (updated_uuids)
3939                 drbd_print_uuids(device, "receiver updated UUIDs to");
3940
3941         return 0;
3942 }
3943
3944 /**
3945  * convert_state() - Converts the peer's view of the cluster state to our point of view
3946  * @ps:         The state as seen by the peer.
3947  */
3948 static union drbd_state convert_state(union drbd_state ps)
3949 {
3950         union drbd_state ms;
3951
3952         static enum drbd_conns c_tab[] = {
3953                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3954                 [C_CONNECTED] = C_CONNECTED,
3955
3956                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3957                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3958                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3959                 [C_VERIFY_S]       = C_VERIFY_T,
3960                 [C_MASK]   = C_MASK,
3961         };
3962
3963         ms.i = ps.i;
3964
3965         ms.conn = c_tab[ps.conn];
3966         ms.peer = ps.role;
3967         ms.role = ps.peer;
3968         ms.pdsk = ps.disk;
3969         ms.disk = ps.pdsk;
3970         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3971
3972         return ms;
3973 }
3974
3975 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3976 {
3977         struct drbd_peer_device *peer_device;
3978         struct drbd_device *device;
3979         struct p_req_state *p = pi->data;
3980         union drbd_state mask, val;
3981         enum drbd_state_rv rv;
3982
3983         peer_device = conn_peer_device(connection, pi->vnr);
3984         if (!peer_device)
3985                 return -EIO;
3986         device = peer_device->device;
3987
3988         mask.i = be32_to_cpu(p->mask);
3989         val.i = be32_to_cpu(p->val);
3990
3991         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3992             mutex_is_locked(device->state_mutex)) {
3993                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3994                 return 0;
3995         }
3996
3997         mask = convert_state(mask);
3998         val = convert_state(val);
3999
4000         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4001         drbd_send_sr_reply(peer_device, rv);
4002
4003         drbd_md_sync(device);
4004
4005         return 0;
4006 }
4007
4008 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4009 {
4010         struct p_req_state *p = pi->data;
4011         union drbd_state mask, val;
4012         enum drbd_state_rv rv;
4013
4014         mask.i = be32_to_cpu(p->mask);
4015         val.i = be32_to_cpu(p->val);
4016
4017         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4018             mutex_is_locked(&connection->cstate_mutex)) {
4019                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4020                 return 0;
4021         }
4022
4023         mask = convert_state(mask);
4024         val = convert_state(val);
4025
4026         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4027         conn_send_sr_reply(connection, rv);
4028
4029         return 0;
4030 }
4031
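/*
 * Handle a P_STATE packet: reconcile the peer's reported state with our own.
 * This may kick off a resync handshake via drbd_sync_handshake(), ignore
 * transient disk state "flapping" around the end of a resync, or, if no
 * acceptable common state can be found, tear the connection down again.
 * The state transition is retried if the local state changed concurrently.
 */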
4032 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4033 {
4034         struct drbd_peer_device *peer_device;
4035         struct drbd_device *device;
4036         struct p_state *p = pi->data;
4037         union drbd_state os, ns, peer_state;
4038         enum drbd_disk_state real_peer_disk;
4039         enum chg_state_flags cs_flags;
4040         int rv;
4041
4042         peer_device = conn_peer_device(connection, pi->vnr);
4043         if (!peer_device)
4044                 return config_unknown_volume(connection, pi);
4045         device = peer_device->device;
4046
4047         peer_state.i = be32_to_cpu(p->state);
4048
4049         real_peer_disk = peer_state.disk;
4050         if (peer_state.disk == D_NEGOTIATING) {
4051                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4052                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4053         }
4054
4055         spin_lock_irq(&device->resource->req_lock);
4056  retry:
4057         os = ns = drbd_read_state(device);
4058         spin_unlock_irq(&device->resource->req_lock);
4059
4060         /* If some other part of the code (asender thread, timeout)
4061          * already decided to close the connection again,
4062          * we must not "re-establish" it here. */
4063         if (os.conn <= C_TEAR_DOWN)
4064                 return -ECONNRESET;
4065
4066         /* If this is the "end of sync" confirmation, usually the peer disk
4067          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4068          * set) resync started in PausedSyncT, or if the timing of pause-/
4069          * unpause-sync events has been "just right", the peer disk may
4070          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4071          */
4072         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4073             real_peer_disk == D_UP_TO_DATE &&
4074             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4075                 /* If we are (becoming) SyncSource, but peer is still in sync
4076                  * preparation, ignore its uptodate-ness to avoid flapping, it
4077                  * will change to inconsistent once the peer reaches active
4078                  * syncing states.
4079                  * It may have changed syncer-paused flags, however, so we
4080                  * cannot ignore this completely. */
4081                 if (peer_state.conn > C_CONNECTED &&
4082                     peer_state.conn < C_SYNC_SOURCE)
4083                         real_peer_disk = D_INCONSISTENT;
4084
4085                 /* if peer_state changes to connected at the same time,
4086                  * it explicitly notifies us that it finished resync.
4087                  * Maybe we should finish it up, too? */
4088                 else if (os.conn >= C_SYNC_SOURCE &&
4089                          peer_state.conn == C_CONNECTED) {
4090                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4091                                 drbd_resync_finished(device);
4092                         return 0;
4093                 }
4094         }
4095
4096         /* explicit verify finished notification, stop sector reached. */
4097         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4098             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4099                 ov_out_of_sync_print(device);
4100                 drbd_resync_finished(device);
4101                 return 0;
4102         }
4103
4104         /* peer says his disk is inconsistent, while we think it is uptodate,
4105          * and this happens while the peer still thinks we have a sync going on,
4106          * but we think we are already done with the sync.
4107          * We ignore this to avoid flapping pdsk.
4108          * This should not happen, if the peer is a recent version of drbd. */
4109         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4110             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4111                 real_peer_disk = D_UP_TO_DATE;
4112
4113         if (ns.conn == C_WF_REPORT_PARAMS)
4114                 ns.conn = C_CONNECTED;
4115
4116         if (peer_state.conn == C_AHEAD)
4117                 ns.conn = C_BEHIND;
4118
4119         /* TODO:
4120          * if (primary and diskless and peer uuid != effective uuid)
4121          *     abort attach on peer;
4122          *
4123          * If this node does not have good data, was already connected, but
4124          * the peer did a late attach only now, trying to "negotiate" with me,
4125          * AND I am currently Primary, possibly frozen, with some specific
4126          * "effective" uuid, this should never be reached, really, because
4127          * we first send the uuids, then the current state.
4128          *
4129          * In this scenario, we already dropped the connection hard
4130          * when we received the unsuitable uuids (receive_uuids()).
4131          *
4132          * Should we want to change this, that is: not drop the connection in
4133          * receive_uuids() already, then we would need to add a branch here
4134          * that aborts the attach of "unsuitable uuids" on the peer in case
4135          * this node is currently Diskless Primary.
4136          */
4137
4138         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4139             get_ldev_if_state(device, D_NEGOTIATING)) {
4140                 int cr; /* consider resync */
4141
4142                 /* if we established a new connection */
4143                 cr  = (os.conn < C_CONNECTED);
4144                 /* if we had an established connection
4145                  * and one of the nodes newly attaches a disk */
4146                 cr |= (os.conn == C_CONNECTED &&
4147                        (peer_state.disk == D_NEGOTIATING ||
4148                         os.disk == D_NEGOTIATING));
4149                 /* if we have both been inconsistent, and the peer has been
4150                  * forced to be UpToDate with --overwrite-data */
4151                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4152                 /* if we had been plain connected, and the admin requested to
4153                  * start a sync by "invalidate" or "invalidate-remote" */
4154                 cr |= (os.conn == C_CONNECTED &&
4155                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4156                                  peer_state.conn <= C_WF_BITMAP_T));
4157
4158                 if (cr)
4159                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4160
4161                 put_ldev(device);
4162                 if (ns.conn == C_MASK) {
4163                         ns.conn = C_CONNECTED;
4164                         if (device->state.disk == D_NEGOTIATING) {
4165                                 drbd_force_state(device, NS(disk, D_FAILED));
4166                         } else if (peer_state.disk == D_NEGOTIATING) {
4167                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4168                                 peer_state.disk = D_DISKLESS;
4169                                 real_peer_disk = D_DISKLESS;
4170                         } else {
4171                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4172                                         return -EIO;
4173                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4174                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4175                                 return -EIO;
4176                         }
4177                 }
4178         }
4179
4180         spin_lock_irq(&device->resource->req_lock);
4181         if (os.i != drbd_read_state(device).i)
4182                 goto retry;
4183         clear_bit(CONSIDER_RESYNC, &device->flags);
4184         ns.peer = peer_state.role;
4185         ns.pdsk = real_peer_disk;
4186         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4187         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4188                 ns.disk = device->new_state_tmp.disk;
4189         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4190         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4191             test_bit(NEW_CUR_UUID, &device->flags)) {
4192                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4193                    for temporary network outages! */
4194                 spin_unlock_irq(&device->resource->req_lock);
4195                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4196                 tl_clear(peer_device->connection);
4197                 drbd_uuid_new_current(device);
4198                 clear_bit(NEW_CUR_UUID, &device->flags);
4199                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4200                 return -EIO;
4201         }
4202         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4203         ns = drbd_read_state(device);
4204         spin_unlock_irq(&device->resource->req_lock);
4205
4206         if (rv < SS_SUCCESS) {
4207                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4208                 return -EIO;
4209         }
4210
4211         if (os.conn > C_WF_REPORT_PARAMS) {
4212                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4213                     peer_state.disk != D_NEGOTIATING ) {
4214                         /* we want resync, peer has not yet decided to sync... */
4215                         /* Nowadays only used when forcing a node into primary role and
4216                            setting its disk to UpToDate with that */
4217                         drbd_send_uuids(peer_device);
4218                         drbd_send_current_state(peer_device);
4219                 }
4220         }
4221
4222         clear_bit(DISCARD_MY_DATA, &device->flags);
4223
4224         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4225
4226         return 0;
4227 }
4228
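/*
 * Handle a P_SYNC_UUID packet: once this node has settled into
 * C_WF_SYNC_UUID (or has given up on the connection), adopt the sync UUID
 * sent by the peer as our current UUID, clear the bitmap UUID and start
 * the resync as SyncTarget.
 */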
4229 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4230 {
4231         struct drbd_peer_device *peer_device;
4232         struct drbd_device *device;
4233         struct p_rs_uuid *p = pi->data;
4234
4235         peer_device = conn_peer_device(connection, pi->vnr);
4236         if (!peer_device)
4237                 return -EIO;
4238         device = peer_device->device;
4239
4240         wait_event(device->misc_wait,
4241                    device->state.conn == C_WF_SYNC_UUID ||
4242                    device->state.conn == C_BEHIND ||
4243                    device->state.conn < C_CONNECTED ||
4244                    device->state.disk < D_NEGOTIATING);
4245
4246         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4247
4248         /* Here the _drbd_uuid_ functions are right, current should
4249            _not_ be rotated into the history */
4250         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4251                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4252                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4253
4254                 drbd_print_uuids(device, "updated sync uuid");
4255                 drbd_start_resync(device, C_SYNC_TARGET);
4256
4257                 put_ldev(device);
4258         } else
4259                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4260
4261         return 0;
4262 }
4263
4264 /**
4265  * receive_bitmap_plain
4266  *
4267  * Return 0 when done, 1 when another iteration is needed, and a negative error
4268  * code upon failure.
4269  */
4270 static int
4271 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4272                      unsigned long *p, struct bm_xfer_ctx *c)
4273 {
4274         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4275                                  drbd_header_size(peer_device->connection);
4276         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4277                                        c->bm_words - c->word_offset);
4278         unsigned int want = num_words * sizeof(*p);
4279         int err;
4280
4281         if (want != size) {
4282                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4283                 return -EIO;
4284         }
4285         if (want == 0)
4286                 return 0;
4287         err = drbd_recv_all(peer_device->connection, p, want);
4288         if (err)
4289                 return err;
4290
4291         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4292
4293         c->word_offset += num_words;
4294         c->bit_offset = c->word_offset * BITS_PER_LONG;
4295         if (c->bit_offset > c->bm_bits)
4296                 c->bit_offset = c->bm_bits;
4297
4298         return 1;
4299 }
4300
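/*
 * The first byte of a P_COMPRESSED_BITMAP payload packs three fields:
 * bits 0-3 select the bitmap encoding (enum drbd_bitmap_code), bits 4-6
 * carry the pad-bit count handed to bitstream_init(), and bit 7 is the
 * value of the first run ("toggle") for the RLE decoder.
 */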
4301 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4302 {
4303         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4304 }
4305
4306 static int dcbp_get_start(struct p_compressed_bm *p)
4307 {
4308         return (p->encoding & 0x80) != 0;
4309 }
4310
4311 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4312 {
4313         return (p->encoding >> 4) & 0x7;
4314 }
4315
4316 /**
4317  * recv_bm_rle_bits
4318  *
4319  * Return 0 when done, 1 when another iteration is needed, and a negative error
4320  * code upon failure.
4321  */
4322 static int
4323 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4324                 struct p_compressed_bm *p,
4325                  struct bm_xfer_ctx *c,
4326                  unsigned int len)
4327 {
4328         struct bitstream bs;
4329         u64 look_ahead;
4330         u64 rl;
4331         u64 tmp;
4332         unsigned long s = c->bit_offset;
4333         unsigned long e;
4334         int toggle = dcbp_get_start(p);
4335         int have;
4336         int bits;
4337
4338         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4339
4340         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4341         if (bits < 0)
4342                 return -EIO;
4343
4344         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4345                 bits = vli_decode_bits(&rl, look_ahead);
4346                 if (bits <= 0)
4347                         return -EIO;
4348
4349                 if (toggle) {
4350                         e = s + rl - 1;
4351                         if (e >= c->bm_bits) {
4352                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4353                                 return -EIO;
4354                         }
4355                         _drbd_bm_set_bits(peer_device->device, s, e);
4356                 }
4357
4358                 if (have < bits) {
4359                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4360                                 have, bits, look_ahead,
4361                                 (unsigned int)(bs.cur.b - p->code),
4362                                 (unsigned int)bs.buf_len);
4363                         return -EIO;
4364                 }
4365                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4366                 if (likely(bits < 64))
4367                         look_ahead >>= bits;
4368                 else
4369                         look_ahead = 0;
4370                 have -= bits;
4371
4372                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4373                 if (bits < 0)
4374                         return -EIO;
4375                 look_ahead |= tmp << have;
4376                 have += bits;
4377         }
4378
4379         c->bit_offset = s;
4380         bm_xfer_ctx_bit_to_word_offset(c);
4381
4382         return (s != c->bm_bits);
4383 }
4384
4385 /**
4386  * decode_bitmap_c
4387  *
4388  * Return 0 when done, 1 when another iteration is needed, and a negative error
4389  * code upon failure.
4390  */
4391 static int
4392 decode_bitmap_c(struct drbd_peer_device *peer_device,
4393                 struct p_compressed_bm *p,
4394                 struct bm_xfer_ctx *c,
4395                 unsigned int len)
4396 {
4397         if (dcbp_get_code(p) == RLE_VLI_Bits)
4398                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4399
4400         /* other variants had been implemented for evaluation,
4401          * but have been dropped as this one turned out to be "best"
4402          * during all our tests. */
4403
4404         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4405         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4406         return -EIO;
4407 }
4408
4409 void INFO_bm_xfer_stats(struct drbd_device *device,
4410                 const char *direction, struct bm_xfer_ctx *c)
4411 {
4412         /* what would it take to transfer it "plaintext" */
4413         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4414         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4415         unsigned int plain =
4416                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4417                 c->bm_words * sizeof(unsigned long);
4418         unsigned int total = c->bytes[0] + c->bytes[1];
4419         unsigned int r;
4420
4421         /* total cannot be zero, but just in case: */
4422         if (total == 0)
4423                 return;
4424
4425         /* don't report if not compressed */
4426         if (total >= plain)
4427                 return;
4428
4429         /* total < plain. check for overflow, still */
4430         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4431                                     : (1000 * total / plain);
4432
4433         if (r > 1000)
4434                 r = 1000;
4435
4436         r = 1000 - r;
4437         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4438              "total %u; compression: %u.%u%%\n",
4439                         direction,
4440                         c->bytes[1], c->packets[1],
4441                         c->bytes[0], c->packets[0],
4442                         total, r/10, r % 10);
4443 }
4444
4445 /* Since we are processing the bitfield from lower addresses to higher,
4446    it does not matter whether we process it in 32 bit or 64 bit chunks,
4447    as long as it is little endian.  (Understand it as a byte stream,
4448    beginning with the lowest byte...)  If we used big endian, we would
4449    need to process it from the highest address to the lowest in order
4450    to be agnostic to the 32 vs 64 bit issue.
4451
4452    Returns 0 on success, a negative error code otherwise. */
4453 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4454 {
4455         struct drbd_peer_device *peer_device;
4456         struct drbd_device *device;
4457         struct bm_xfer_ctx c;
4458         int err;
4459
4460         peer_device = conn_peer_device(connection, pi->vnr);
4461         if (!peer_device)
4462                 return -EIO;
4463         device = peer_device->device;
4464
4465         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4466         /* you are supposed to send additional out-of-sync information
4467          * if you actually set bits during this phase */
4468
4469         c = (struct bm_xfer_ctx) {
4470                 .bm_bits = drbd_bm_bits(device),
4471                 .bm_words = drbd_bm_words(device),
4472         };
4473
4474         for(;;) {
4475                 if (pi->cmd == P_BITMAP)
4476                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4477                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4478                         /* MAYBE: sanity check that we speak proto >= 90,
4479                          * and the feature is enabled! */
4480                         struct p_compressed_bm *p = pi->data;
4481
4482                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4483                                 drbd_err(device, "ReportCBitmap packet too large\n");
4484                                 err = -EIO;
4485                                 goto out;
4486                         }
4487                         if (pi->size <= sizeof(*p)) {
4488                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4489                                 err = -EIO;
4490                                 goto out;
4491                         }
4492                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4493                         if (err)
4494                                goto out;
4495                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4496                 } else {
4497                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4498                         err = -EIO;
4499                         goto out;
4500                 }
4501
4502                 c.packets[pi->cmd == P_BITMAP]++;
4503                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4504
4505                 if (err <= 0) {
4506                         if (err < 0)
4507                                 goto out;
4508                         break;
4509                 }
4510                 err = drbd_recv_header(peer_device->connection, pi);
4511                 if (err)
4512                         goto out;
4513         }
4514
4515         INFO_bm_xfer_stats(device, "receive", &c);
4516
4517         if (device->state.conn == C_WF_BITMAP_T) {
4518                 enum drbd_state_rv rv;
4519
4520                 err = drbd_send_bitmap(device);
4521                 if (err)
4522                         goto out;
4523                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4524                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4525                 D_ASSERT(device, rv == SS_SUCCESS);
4526         } else if (device->state.conn != C_WF_BITMAP_S) {
4527                 /* admin may have requested C_DISCONNECTING,
4528                  * other threads may have noticed network errors */
4529                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4530                     drbd_conn_str(device->state.conn));
4531         }
4532         err = 0;
4533
4534  out:
4535         drbd_bm_unlock(device);
4536         if (!err && device->state.conn == C_WF_BITMAP_S)
4537                 drbd_start_resync(device, C_SYNC_SOURCE);
4538         return err;
4539 }
4540
4541 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4542 {
4543         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4544                  pi->cmd, pi->size);
4545
4546         return ignore_remaining_packet(connection, pi);
4547 }
4548
4549 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4550 {
4551         /* Make sure we've acked all the TCP data associated
4552          * with the data requests being unplugged */
4553         drbd_tcp_quickack(connection->data.socket);
4554
4555         return 0;
4556 }
4557
4558 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4559 {
4560         struct drbd_peer_device *peer_device;
4561         struct drbd_device *device;
4562         struct p_block_desc *p = pi->data;
4563
4564         peer_device = conn_peer_device(connection, pi->vnr);
4565         if (!peer_device)
4566                 return -EIO;
4567         device = peer_device->device;
4568
4569         switch (device->state.conn) {
4570         case C_WF_SYNC_UUID:
4571         case C_WF_BITMAP_T:
4572         case C_BEHIND:
4573                 break;
4574         default:
4575                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4576                                 drbd_conn_str(device->state.conn));
4577         }
4578
4579         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4580
4581         return 0;
4582 }
4583
4584 struct data_cmd {
4585         int expect_payload;
4586         size_t pkt_size;
4587         int (*fn)(struct drbd_connection *, struct packet_info *);
4588 };
4589
4590 static struct data_cmd drbd_cmd_handler[] = {
4591         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4592         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4593         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4594         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4595         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4596         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4597         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4598         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4599         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4600         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4601         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4602         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4603         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4604         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4605         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4606         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4607         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4608         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4609         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4610         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4611         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4612         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4613         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4614         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4615         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4616 };
4617
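/*
 * drbd_cmd_handler[] maps each data-socket packet type to its handler, the
 * size of its fixed sub-header (pkt_size) and whether additional payload is
 * allowed (expect_payload).  drbdd() below is the receiver's main loop: it
 * reads a packet header, validates it against this table, receives the
 * sub-header into pi.data and lets the handler consume the rest.  Any error
 * results in a hard transition to C_PROTOCOL_ERROR.
 */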
4618 static void drbdd(struct drbd_connection *connection)
4619 {
4620         struct packet_info pi;
4621         size_t shs; /* sub header size */
4622         int err;
4623
4624         while (get_t_state(&connection->receiver) == RUNNING) {
4625                 struct data_cmd *cmd;
4626
4627                 drbd_thread_current_set_cpu(&connection->receiver);
4628                 update_receiver_timing_details(connection, drbd_recv_header);
4629                 if (drbd_recv_header(connection, &pi))
4630                         goto err_out;
4631
4632                 cmd = &drbd_cmd_handler[pi.cmd];
4633                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4634                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4635                                  cmdname(pi.cmd), pi.cmd);
4636                         goto err_out;
4637                 }
4638
4639                 shs = cmd->pkt_size;
4640                 if (pi.size > shs && !cmd->expect_payload) {
4641                         drbd_err(connection, "No payload expected %s l:%d\n",
4642                                  cmdname(pi.cmd), pi.size);
4643                         goto err_out;
4644                 }
4645
4646                 if (shs) {
4647                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4648                         err = drbd_recv_all_warn(connection, pi.data, shs);
4649                         if (err)
4650                                 goto err_out;
4651                         pi.size -= shs;
4652                 }
4653
4654                 update_receiver_timing_details(connection, cmd->fn);
4655                 err = cmd->fn(connection, &pi);
4656                 if (err) {
4657                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4658                                  cmdname(pi.cmd), err, pi.size);
4659                         goto err_out;
4660                 }
4661         }
4662         return;
4663
4664     err_out:
4665         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4666 }
4667
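/*
 * Tear down an established connection: force C_NETWORK_FAILURE, stop the
 * asender, close the sockets, run per-volume cleanup via drbd_disconnected()
 * and finally move the connection to C_UNCONNECTED (or C_STANDALONE if an
 * administrative disconnect was in progress).
 */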
4668 static void conn_disconnect(struct drbd_connection *connection)
4669 {
4670         struct drbd_peer_device *peer_device;
4671         enum drbd_conns oc;
4672         int vnr;
4673
4674         if (connection->cstate == C_STANDALONE)
4675                 return;
4676
4677         /* We are about to start the cleanup after connection loss.
4678          * Make sure drbd_make_request knows about that.
4679          * Usually we should be in some network failure state already,
4680          * but just in case we are not, we fix it up here.
4681          */
4682         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4683
4684         /* asender does not clean up anything. It must not interfere, either. */
4685         drbd_thread_stop(&connection->asender);
4686         drbd_free_sock(connection);
4687
4688         rcu_read_lock();
4689         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4690                 struct drbd_device *device = peer_device->device;
4691                 kref_get(&device->kref);
4692                 rcu_read_unlock();
4693                 drbd_disconnected(peer_device);
4694                 kref_put(&device->kref, drbd_destroy_device);
4695                 rcu_read_lock();
4696         }
4697         rcu_read_unlock();
4698
4699         if (!list_empty(&connection->current_epoch->list))
4700                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4701         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4702         atomic_set(&connection->current_epoch->epoch_size, 0);
4703         connection->send.seen_any_write_yet = false;
4704
4705         drbd_info(connection, "Connection closed\n");
4706
4707         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4708                 conn_try_outdate_peer_async(connection);
4709
4710         spin_lock_irq(&connection->resource->req_lock);
4711         oc = connection->cstate;
4712         if (oc >= C_UNCONNECTED)
4713                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4714
4715         spin_unlock_irq(&connection->resource->req_lock);
4716
4717         if (oc == C_DISCONNECTING)
4718                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4719 }
4720
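/*
 * Per-volume cleanup after a connection loss: wait for in-flight peer
 * requests to drain, cancel pending resync work, flush the sender work
 * queue, clear the transfer log (unless I/O is suspended) and drop the
 * page references still held on behalf of the network stack.
 */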
4721 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4722 {
4723         struct drbd_device *device = peer_device->device;
4724         unsigned int i;
4725
4726         /* wait for current activity to cease. */
4727         spin_lock_irq(&device->resource->req_lock);
4728         _drbd_wait_ee_list_empty(device, &device->active_ee);
4729         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4730         _drbd_wait_ee_list_empty(device, &device->read_ee);
4731         spin_unlock_irq(&device->resource->req_lock);
4732
4733         /* We do not have data structures that would allow us to
4734          * get the rs_pending_cnt down to 0 again.
4735          *  * On C_SYNC_TARGET we do not have any data structures describing
4736          *    the pending RSDataRequest's we have sent.
4737          *  * On C_SYNC_SOURCE there is no data structure that tracks
4738          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4739          *  And no, it is not the sum of the reference counts in the
4740          *  resync_LRU. The resync_LRU tracks the whole operation including
4741          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4742          *  on the fly. */
4743         drbd_rs_cancel_all(device);
4744         device->rs_total = 0;
4745         device->rs_failed = 0;
4746         atomic_set(&device->rs_pending_cnt, 0);
4747         wake_up(&device->misc_wait);
4748
4749         del_timer_sync(&device->resync_timer);
4750         resync_timer_fn((unsigned long)device);
4751
4752         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4753          * w_make_resync_request etc. which may still be on the worker queue
4754          * to be "canceled" */
4755         drbd_flush_workqueue(&peer_device->connection->sender_work);
4756
4757         drbd_finish_peer_reqs(device);
4758
4759         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4760            might have queued work again. The one before drbd_finish_peer_reqs() is
4761            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4762         drbd_flush_workqueue(&peer_device->connection->sender_work);
4763
4764         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4765          * again via drbd_try_clear_on_disk_bm(). */
4766         drbd_rs_cancel_all(device);
4767
4768         kfree(device->p_uuid);
4769         device->p_uuid = NULL;
4770
4771         if (!drbd_suspended(device))
4772                 tl_clear(peer_device->connection);
4773
4774         drbd_md_sync(device);
4775
4776         /* serialize with bitmap writeout triggered by the state change,
4777          * if any. */
4778         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4779
4780         /* tcp_close and release of sendpage pages can be deferred.  I don't
4781          * want to use SO_LINGER, because apparently it can be deferred for
4782          * more than 20 seconds (longest time I checked).
4783          *
4784          * Actually we don't care for exactly when the network stack does its
4785          * put_page(), but release our reference on these pages right here.
4786          */
4787         i = drbd_free_peer_reqs(device, &device->net_ee);
4788         if (i)
4789                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4790         i = atomic_read(&device->pp_in_use_by_net);
4791         if (i)
4792                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4793         i = atomic_read(&device->pp_in_use);
4794         if (i)
4795                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4796
4797         D_ASSERT(device, list_empty(&device->read_ee));
4798         D_ASSERT(device, list_empty(&device->active_ee));
4799         D_ASSERT(device, list_empty(&device->sync_ee));
4800         D_ASSERT(device, list_empty(&device->done_ee));
4801
4802         return 0;
4803 }
4804
4805 /*
4806  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4807  * we can agree on is stored in agreed_pro_version.
4808  *
4809  * feature flags and the reserved array should be enough room for future
4810  * enhancements of the handshake protocol, and possible plugins...
4811  *
4812  * for now, they are expected to be zero, but ignored.
4813  */
4814 static int drbd_send_features(struct drbd_connection *connection)
4815 {
4816         struct drbd_socket *sock;
4817         struct p_connection_features *p;
4818
4819         sock = &connection->data;
4820         p = conn_prepare_command(connection, sock);
4821         if (!p)
4822                 return -EIO;
4823         memset(p, 0, sizeof(*p));
4824         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4825         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4826         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4827         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4828 }
4829
4830 /*
4831  * return values:
4832  *   1 yes, we have a valid connection
4833  *   0 oops, did not work out, please try again
4834  *  -1 peer talks different language,
4835  *     no point in trying again, please go standalone.
4836  */
4837 static int drbd_do_features(struct drbd_connection *connection)
4838 {
4839         /* ASSERT current == connection->receiver ... */
4840         struct p_connection_features *p;
4841         const int expect = sizeof(struct p_connection_features);
4842         struct packet_info pi;
4843         int err;
4844
4845         err = drbd_send_features(connection);
4846         if (err)
4847                 return 0;
4848
4849         err = drbd_recv_header(connection, &pi);
4850         if (err)
4851                 return 0;
4852
4853         if (pi.cmd != P_CONNECTION_FEATURES) {
4854                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4855                          cmdname(pi.cmd), pi.cmd);
4856                 return -1;
4857         }
4858
4859         if (pi.size != expect) {
4860                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4861                      expect, pi.size);
4862                 return -1;
4863         }
4864
4865         p = pi.data;
4866         err = drbd_recv_all_warn(connection, p, expect);
4867         if (err)
4868                 return 0;
4869
4870         p->protocol_min = be32_to_cpu(p->protocol_min);
4871         p->protocol_max = be32_to_cpu(p->protocol_max);
4872         if (p->protocol_max == 0)
4873                 p->protocol_max = p->protocol_min;
4874
4875         if (PRO_VERSION_MAX < p->protocol_min ||
4876             PRO_VERSION_MIN > p->protocol_max)
4877                 goto incompat;
4878
4879         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4880         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4881
4882         drbd_info(connection, "Handshake successful: "
4883              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4884
4885         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4886                   connection->agreed_features & FF_TRIM ? " " : " not ");
4887
4888         return 1;
4889
4890  incompat:
4891         drbd_err(connection, "incompatible DRBD dialects: "
4892             "I support %d-%d, peer supports %d-%d\n",
4893             PRO_VERSION_MIN, PRO_VERSION_MAX,
4894             p->protocol_min, p->protocol_max);
4895         return -1;
4896 }
4897
4898 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4899 static int drbd_do_auth(struct drbd_connection *connection)
4900 {
4901         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4902         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4903         return -1;
4904 }
4905 #else
4906 #define CHALLENGE_LEN 64
4907
4908 /* Return value:
4909         1 - auth succeeded,
4910         0 - failed, try again (network error),
4911         -1 - auth failed, don't try again.
4912 */
4913
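/*
 * CRAM-HMAC handshake: each side sends a random challenge
 * (P_AUTH_CHALLENGE), computes the HMAC of the peer's challenge keyed with
 * the shared secret and returns it as P_AUTH_RESPONSE, then verifies that
 * the peer's response matches the HMAC of its own challenge.
 */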
4914 static int drbd_do_auth(struct drbd_connection *connection)
4915 {
4916         struct drbd_socket *sock;
4917         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4918         struct scatterlist sg;
4919         char *response = NULL;
4920         char *right_response = NULL;
4921         char *peers_ch = NULL;
4922         unsigned int key_len;
4923         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4924         unsigned int resp_size;
4925         struct hash_desc desc;
4926         struct packet_info pi;
4927         struct net_conf *nc;
4928         int err, rv;
4929
4930         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4931
4932         rcu_read_lock();
4933         nc = rcu_dereference(connection->net_conf);
4934         key_len = strlen(nc->shared_secret);
4935         memcpy(secret, nc->shared_secret, key_len);
4936         rcu_read_unlock();
4937
4938         desc.tfm = connection->cram_hmac_tfm;
4939         desc.flags = 0;
4940
4941         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4942         if (rv) {
4943                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4944                 rv = -1;
4945                 goto fail;
4946         }
4947
4948         get_random_bytes(my_challenge, CHALLENGE_LEN);
4949
4950         sock = &connection->data;
4951         if (!conn_prepare_command(connection, sock)) {
4952                 rv = 0;
4953                 goto fail;
4954         }
4955         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4956                                 my_challenge, CHALLENGE_LEN);
4957         if (!rv)
4958                 goto fail;
4959
4960         err = drbd_recv_header(connection, &pi);
4961         if (err) {
4962                 rv = 0;
4963                 goto fail;
4964         }
4965
4966         if (pi.cmd != P_AUTH_CHALLENGE) {
4967                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4968                          cmdname(pi.cmd), pi.cmd);
4969                 rv = 0;
4970                 goto fail;
4971         }
4972
4973         if (pi.size > CHALLENGE_LEN * 2) {
4974                 drbd_err(connection, "AuthChallenge payload too big.\n");
4975                 rv = -1;
4976                 goto fail;
4977         }
4978
4979         if (pi.size < CHALLENGE_LEN) {
4980                 drbd_err(connection, "AuthChallenge payload too small.\n");
4981                 rv = -1;
4982                 goto fail;
4983         }
4984
4985         peers_ch = kmalloc(pi.size, GFP_NOIO);
4986         if (peers_ch == NULL) {
4987                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4988                 rv = -1;
4989                 goto fail;
4990         }
4991
4992         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4993         if (err) {
4994                 rv = 0;
4995                 goto fail;
4996         }
4997
4998         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4999                 drbd_err(connection, "Peer presented the same challenge!\n");
5000                 rv = -1;
5001                 goto fail;
5002         }
5003
5004         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
5005         response = kmalloc(resp_size, GFP_NOIO);
5006         if (response == NULL) {
5007                 drbd_err(connection, "kmalloc of response failed\n");
5008                 rv = -1;
5009                 goto fail;
5010         }
5011
5012         sg_init_table(&sg, 1);
5013         sg_set_buf(&sg, peers_ch, pi.size);
5014
5015         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
5016         if (rv) {
5017                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5018                 rv = -1;
5019                 goto fail;
5020         }
5021
5022         if (!conn_prepare_command(connection, sock)) {
5023                 rv = 0;
5024                 goto fail;
5025         }
5026         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5027                                 response, resp_size);
5028         if (!rv)
5029                 goto fail;
5030
5031         err = drbd_recv_header(connection, &pi);
5032         if (err) {
5033                 rv = 0;
5034                 goto fail;
5035         }
5036
5037         if (pi.cmd != P_AUTH_RESPONSE) {
5038                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5039                          cmdname(pi.cmd), pi.cmd);
5040                 rv = 0;
5041                 goto fail;
5042         }
5043
5044         if (pi.size != resp_size) {
5045                 drbd_err(connection, "AuthResponse payload of wrong size\n");
5046                 rv = 0;
5047                 goto fail;
5048         }
5049
5050         err = drbd_recv_all_warn(connection, response, resp_size);
5051         if (err) {
5052                 rv = 0;
5053                 goto fail;
5054         }
5055
5056         right_response = kmalloc(resp_size, GFP_NOIO);
5057         if (right_response == NULL) {
5058                 drbd_err(connection, "kmalloc of right_response failed\n");
5059                 rv = -1;
5060                 goto fail;
5061         }
5062
5063         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5064
5065         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5066         if (rv) {
5067                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5068                 rv = -1;
5069                 goto fail;
5070         }
5071
5072         rv = !memcmp(response, right_response, resp_size);
5073
5074         if (rv)
5075                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5076                      resp_size);
5077         else
5078                 rv = -1;
5079
5080  fail:
5081         kfree(peers_ch);
5082         kfree(response);
5083         kfree(right_response);
5084
5085         return rv;
5086 }
5087 #endif
5088
5089 int drbd_receiver(struct drbd_thread *thi)
5090 {
5091         struct drbd_connection *connection = thi->connection;
5092         int h;
5093
5094         drbd_info(connection, "receiver (re)started\n");
5095
5096         do {
5097                 h = conn_connect(connection);
5098                 if (h == 0) {
5099                         conn_disconnect(connection);
5100                         schedule_timeout_interruptible(HZ);
5101                 }
5102                 if (h == -1) {
5103                         drbd_warn(connection, "Discarding network configuration.\n");
5104                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5105                 }
5106         } while (h == 0);
5107
5108         if (h > 0)
5109                 drbdd(connection);
5110
5111         conn_disconnect(connection);
5112
5113         drbd_info(connection, "receiver terminated\n");
5114         return 0;
5115 }
5116
5117 /* ********* acknowledge sender ******** */
5118
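/*
 * The got_*() handlers below process packets arriving on the meta-data
 * socket in the asender ("acknowledge sender") thread: pings, positive and
 * negative block acknowledgements, and state change replies.
 */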
5119 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5120 {
5121         struct p_req_state_reply *p = pi->data;
5122         int retcode = be32_to_cpu(p->retcode);
5123
5124         if (retcode >= SS_SUCCESS) {
5125                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5126         } else {
5127                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5128                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5129                          drbd_set_st_err_str(retcode), retcode);
5130         }
5131         wake_up(&connection->ping_wait);
5132
5133         return 0;
5134 }
5135
5136 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5137 {
5138         struct drbd_peer_device *peer_device;
5139         struct drbd_device *device;
5140         struct p_req_state_reply *p = pi->data;
5141         int retcode = be32_to_cpu(p->retcode);
5142
5143         peer_device = conn_peer_device(connection, pi->vnr);
5144         if (!peer_device)
5145                 return -EIO;
5146         device = peer_device->device;
5147
5148         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5149                 D_ASSERT(device, connection->agreed_pro_version < 100);
5150                 return got_conn_RqSReply(connection, pi);
5151         }
5152
5153         if (retcode >= SS_SUCCESS) {
5154                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5155         } else {
5156                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5157                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5158                         drbd_set_st_err_str(retcode), retcode);
5159         }
5160         wake_up(&device->state_wait);
5161
5162         return 0;
5163 }
5164
5165 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5166 {
5167         return drbd_send_ping_ack(connection);
5168
5169 }
5170
5171 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5172 {
5173         /* restore idle timeout */
5174         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5175         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5176                 wake_up(&connection->ping_wait);
5177
5178         return 0;
5179 }
5180
5181 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5182 {
5183         struct drbd_peer_device *peer_device;
5184         struct drbd_device *device;
5185         struct p_block_ack *p = pi->data;
5186         sector_t sector = be64_to_cpu(p->sector);
5187         int blksize = be32_to_cpu(p->blksize);
5188
5189         peer_device = conn_peer_device(connection, pi->vnr);
5190         if (!peer_device)
5191                 return -EIO;
5192         device = peer_device->device;
5193
5194         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5195
5196         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5197
5198         if (get_ldev(device)) {
5199                 drbd_rs_complete_io(device, sector);
5200                 drbd_set_in_sync(device, sector, blksize);
5201                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5202                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5203                 put_ldev(device);
5204         }
5205         dec_rs_pending(device);
5206         atomic_add(blksize >> 9, &device->rs_sect_in);
5207
5208         return 0;
5209 }
5210
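/*
 * Look up the request a peer (n)ack refers to, by block_id and sector, in
 * the given request tree (read_requests or write_requests), feed the
 * corresponding event into the request state machine, and complete the
 * master bio if that transition finished it.
 */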
5211 static int
5212 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5213                               struct rb_root *root, const char *func,
5214                               enum drbd_req_event what, bool missing_ok)
5215 {
5216         struct drbd_request *req;
5217         struct bio_and_error m;
5218
5219         spin_lock_irq(&device->resource->req_lock);
5220         req = find_request(device, root, id, sector, missing_ok, func);
5221         if (unlikely(!req)) {
5222                 spin_unlock_irq(&device->resource->req_lock);
5223                 return -EIO;
5224         }
5225         __req_mod(req, what, &m);
5226         spin_unlock_irq(&device->resource->req_lock);
5227
5228         if (m.bio)
5229                 complete_master_bio(device, &m);
5230         return 0;
5231 }
5232
5233 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5234 {
5235         struct drbd_peer_device *peer_device;
5236         struct drbd_device *device;
5237         struct p_block_ack *p = pi->data;
5238         sector_t sector = be64_to_cpu(p->sector);
5239         int blksize = be32_to_cpu(p->blksize);
5240         enum drbd_req_event what;
5241
5242         peer_device = conn_peer_device(connection, pi->vnr);
5243         if (!peer_device)
5244                 return -EIO;
5245         device = peer_device->device;
5246
5247         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5248
5249         if (p->block_id == ID_SYNCER) {
5250                 drbd_set_in_sync(device, sector, blksize);
5251                 dec_rs_pending(device);
5252                 return 0;
5253         }
5254         switch (pi->cmd) {
5255         case P_RS_WRITE_ACK:
5256                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5257                 break;
5258         case P_WRITE_ACK:
5259                 what = WRITE_ACKED_BY_PEER;
5260                 break;
5261         case P_RECV_ACK:
5262                 what = RECV_ACKED_BY_PEER;
5263                 break;
5264         case P_SUPERSEDED:
5265                 what = CONFLICT_RESOLVED;
5266                 break;
5267         case P_RETRY_WRITE:
5268                 what = POSTPONE_WRITE;
5269                 break;
5270         default:
5271                 BUG();
5272         }
5273
5274         return validate_req_change_req_state(device, p->block_id, sector,
5275                                              &device->write_requests, __func__,
5276                                              what, false);
5277 }
5278
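/*
 * Negative ack: the peer could not satisfy the write.  For resync writes
 * account the failed I/O; for application writes mark the request as
 * NEG_ACKED, and if it cannot be found anymore (see the comments below)
 * mark the blocks out of sync so a later resync picks them up.
 */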
5279 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5280 {
5281         struct drbd_peer_device *peer_device;
5282         struct drbd_device *device;
5283         struct p_block_ack *p = pi->data;
5284         sector_t sector = be64_to_cpu(p->sector);
5285         int size = be32_to_cpu(p->blksize);
5286         int err;
5287
5288         peer_device = conn_peer_device(connection, pi->vnr);
5289         if (!peer_device)
5290                 return -EIO;
5291         device = peer_device->device;
5292
5293         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5294
5295         if (p->block_id == ID_SYNCER) {
5296                 dec_rs_pending(device);
5297                 drbd_rs_failed_io(device, sector, size);
5298                 return 0;
5299         }
5300
5301         err = validate_req_change_req_state(device, p->block_id, sector,
5302                                             &device->write_requests, __func__,
5303                                             NEG_ACKED, true);
5304         if (err) {
5305                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5306                    The master bio might already be completed, therefore the
5307                    request is no longer in the collision hash. */
5308                 /* In Protocol B we might already have got a P_RECV_ACK
5309                    but then get a P_NEG_ACK afterwards. */
5310                 drbd_set_out_of_sync(device, sector, size);
5311         }
5312         return 0;
5313 }
5314
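/*
 * Negative ack for a data (read) request: fail the corresponding request
 * in the read_requests tree.
 */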
5315 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5316 {
5317         struct drbd_peer_device *peer_device;
5318         struct drbd_device *device;
5319         struct p_block_ack *p = pi->data;
5320         sector_t sector = be64_to_cpu(p->sector);
5321
5322         peer_device = conn_peer_device(connection, pi->vnr);
5323         if (!peer_device)
5324                 return -EIO;
5325         device = peer_device->device;
5326
5327         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5328
5329         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5330             (unsigned long long)sector, be32_to_cpu(p->blksize));
5331
5332         return validate_req_change_req_state(device, p->block_id, sector,
5333                                              &device->read_requests, __func__,
5334                                              NEG_ACKED, false);
5335 }
5336
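/*
 * The peer could not serve a resync/online-verify read request
 * (P_NEG_RS_DREPLY) or cancelled it (P_RS_CANCEL).  Drop the pending
 * count and, for a real failure, account the failed I/O.
 */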
5337 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5338 {
5339         struct drbd_peer_device *peer_device;
5340         struct drbd_device *device;
5341         sector_t sector;
5342         int size;
5343         struct p_block_ack *p = pi->data;
5344
5345         peer_device = conn_peer_device(connection, pi->vnr);
5346         if (!peer_device)
5347                 return -EIO;
5348         device = peer_device->device;
5349
5350         sector = be64_to_cpu(p->sector);
5351         size = be32_to_cpu(p->blksize);
5352
5353         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5354
5355         dec_rs_pending(device);
5356
5357         if (get_ldev_if_state(device, D_FAILED)) {
5358                 drbd_rs_complete_io(device, sector);
5359                 switch (pi->cmd) {
5360                 case P_NEG_RS_DREPLY:
5361                         drbd_rs_failed_io(device, sector, size);
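                        /* fall through */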
5362                 case P_RS_CANCEL:
5363                         break;
5364                 default:
5365                         BUG();
5366                 }
5367                 put_ldev(device);
5368         }
5369
5370         return 0;
5371 }
5372
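/*
 * The peer has acknowledged a write barrier: release the corresponding
 * epoch from the transfer log.  Volumes that are in Ahead mode with no
 * application I/O in flight arm start_resync_timer, so they can go back
 * to being a resync source.
 */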
5373 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5374 {
5375         struct p_barrier_ack *p = pi->data;
5376         struct drbd_peer_device *peer_device;
5377         int vnr;
5378
5379         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5380
5381         rcu_read_lock();
5382         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5383                 struct drbd_device *device = peer_device->device;
5384
5385                 if (device->state.conn == C_AHEAD &&
5386                     atomic_read(&device->ap_in_flight) == 0 &&
5387                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5388                         device->start_resync_timer.expires = jiffies + HZ;
5389                         add_timer(&device->start_resync_timer);
5390                 }
5391         }
5392         rcu_read_unlock();
5393
5394         return 0;
5395 }
5396
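/*
 * One reply to an online-verify request: record blocks the peer reported
 * as out of sync, update the progress marks, and once the last reply has
 * arrived hand the final bookkeeping to the worker (w_ov_finished).
 */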
5397 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5398 {
5399         struct drbd_peer_device *peer_device;
5400         struct drbd_device *device;
5401         struct p_block_ack *p = pi->data;
5402         struct drbd_device_work *dw;
5403         sector_t sector;
5404         int size;
5405
5406         peer_device = conn_peer_device(connection, pi->vnr);
5407         if (!peer_device)
5408                 return -EIO;
5409         device = peer_device->device;
5410
5411         sector = be64_to_cpu(p->sector);
5412         size = be32_to_cpu(p->blksize);
5413
5414         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5415
5416         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5417                 drbd_ov_out_of_sync_found(device, sector, size);
5418         else
5419                 ov_out_of_sync_print(device);
5420
5421         if (!get_ldev(device))
5422                 return 0;
5423
5424         drbd_rs_complete_io(device, sector);
5425         dec_rs_pending(device);
5426
5427         --device->ov_left;
5428
5429         /* let's advance progress step marks only for every other megabyte */
5430         if ((device->ov_left & 0x200) == 0x200)
5431                 drbd_advance_rs_marks(device, device->ov_left);
5432
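        /* That was the last reply: have the worker finish the verify run
         * (or do it synchronously if the work item cannot be allocated). */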
5433         if (device->ov_left == 0) {
5434                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5435                 if (dw) {
5436                         dw->w.cb = w_ov_finished;
5437                         dw->device = device;
5438                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5439                 } else {
5440                         drbd_err(device, "kmalloc(dw) failed.\n");
5441                         ov_out_of_sync_print(device);
5442                         drbd_resync_finished(device);
5443                 }
5444         }
5445         put_ldev(device);
5446         return 0;
5447 }
5448
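/* Nothing to do; used for packets we only need to consume
 * (currently P_DELAY_PROBE on the meta socket). */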
5449 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5450 {
5451         return 0;
5452 }
5453
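/*
 * Process the completed peer requests (done_ee) of all volumes; this is
 * where the acks for peer writes get generated.  SIGNAL_ASENDER is cleared
 * and pending signals are flushed so the sends are not interrupted.  Loop
 * until every done_ee list has been observed empty; returns non-zero if
 * drbd_finish_peer_reqs() failed for some volume.
 */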
5454 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5455 {
5456         struct drbd_peer_device *peer_device;
5457         int vnr, not_empty = 0;
5458
5459         do {
5460                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5461                 flush_signals(current);
5462
5463                 rcu_read_lock();
5464                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5465                         struct drbd_device *device = peer_device->device;
5466                         kref_get(&device->kref);
5467                         rcu_read_unlock();
5468                         if (drbd_finish_peer_reqs(device)) {
5469                                 kref_put(&device->kref, drbd_destroy_device);
5470                                 return 1;
5471                         }
5472                         kref_put(&device->kref, drbd_destroy_device);
5473                         rcu_read_lock();
5474                 }
5475                 set_bit(SIGNAL_ASENDER, &connection->flags);
5476
5477                 spin_lock_irq(&connection->resource->req_lock);
5478                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5479                         struct drbd_device *device = peer_device->device;
5480                         not_empty = !list_empty(&device->done_ee);
5481                         if (not_empty)
5482                                 break;
5483                 }
5484                 spin_unlock_irq(&connection->resource->req_lock);
5485                 rcu_read_unlock();
5486         } while (not_empty);
5487
5488         return 0;
5489 }
5490
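/*
 * Dispatch table for packets on the meta socket: the expected payload size
 * (in addition to the header) and the handler for each packet type.
 * drbd_asender() only calls fn after verifying that exactly pkt_size bytes
 * of payload were announced; e.g. P_WRITE_ACK must carry a struct
 * p_block_ack and is handled by got_BlockAck().
 */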
5491 struct asender_cmd {
5492         size_t pkt_size;
5493         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5494 };
5495
5496 static struct asender_cmd asender_tbl[] = {
5497         [P_PING]            = { 0, got_Ping },
5498         [P_PING_ACK]        = { 0, got_PingAck },
5499         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5500         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5501         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5502         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5503         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5504         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5505         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5506         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5507         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5508         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5509         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5510         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5511         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5512         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5513         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5514 };
5515
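/*
 * The asender thread owns the meta socket: it sends pings and the acks for
 * completed peer requests, and receives and dispatches the small ack-style
 * packets via asender_tbl.  It runs with realtime (SCHED_RR) scheduling.
 * A missing ping ack or any protocol error tears the connection down
 * (C_NETWORK_FAILURE); an unexpected packet leads to C_DISCONNECTING.
 */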
5516 int drbd_asender(struct drbd_thread *thi)
5517 {
5518         struct drbd_connection *connection = thi->connection;
5519         struct asender_cmd *cmd = NULL;
5520         struct packet_info pi;
5521         int rv;
5522         void *buf    = connection->meta.rbuf;
5523         int received = 0;
5524         unsigned int header_size = drbd_header_size(connection);
5525         int expect   = header_size;
5526         bool ping_timeout_active = false;
5527         struct net_conf *nc;
5528         int ping_timeo, tcp_cork, ping_int;
5529         struct sched_param param = { .sched_priority = 2 };
5530
5531         rv = sched_setscheduler(current, SCHED_RR, &param);
5532         if (rv < 0)
5533                 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5534
5535         while (get_t_state(thi) == RUNNING) {
5536                 drbd_thread_current_set_cpu(thi);
5537
5538                 rcu_read_lock();
5539                 nc = rcu_dereference(connection->net_conf);
5540                 ping_timeo = nc->ping_timeo;
5541                 tcp_cork = nc->tcp_cork;
5542                 ping_int = nc->ping_int;
5543                 rcu_read_unlock();
5544
5545                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5546                         if (drbd_send_ping(connection)) {
5547                                 drbd_err(connection, "drbd_send_ping has failed\n");
5548                                 goto reconnect;
5549                         }
5550                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5551                         ping_timeout_active = true;
5552                 }
5553
5554                 /* TODO: conditionally cork; it may hurt latency if we cork without
5555                    much to send */
5556                 if (tcp_cork)
5557                         drbd_tcp_cork(connection->meta.socket);
5558                 if (connection_finish_peer_reqs(connection)) {
5559                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5560                         goto reconnect;
5561                 }
5562                 /* but unconditionally uncork unless disabled */
5563                 if (tcp_cork)
5564                         drbd_tcp_uncork(connection->meta.socket);
5565
5566                 /* short circuit, recv_msg would return EINTR anyway. */
5567                 if (signal_pending(current))
5568                         continue;
5569
5570                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5571                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5572
5573                 flush_signals(current);
5574
5575                 /* Note:
5576                  * -EINTR        (on meta) we got a signal
5577                  * -EAGAIN       (on meta) rcvtimeo expired
5578                  * -ECONNRESET   other side closed the connection
5579                  * -ERESTARTSYS  (on data) we got a signal
5580                  * rv <  0       other than above: unexpected error!
5581                  * rv == expected: full header or command
5582                  * rv <  expected: "woken" by signal during receive
5583                  * rv == 0       : "connection shut down by peer"
5584                  */
5585 received_more:
5586                 if (likely(rv > 0)) {
5587                         received += rv;
5588                         buf      += rv;
5589                 } else if (rv == 0) {
5590                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5591                                 long t;
5592                                 rcu_read_lock();
5593                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5594                                 rcu_read_unlock();
5595
5596                                 t = wait_event_timeout(connection->ping_wait,
5597                                                        connection->cstate < C_WF_REPORT_PARAMS,
5598                                                        t);
5599                                 if (t)
5600                                         break;
5601                         }
5602                         drbd_err(connection, "meta connection shut down by peer.\n");
5603                         goto reconnect;
5604                 } else if (rv == -EAGAIN) {
5605                         /* If the data socket received something meanwhile,
5606                          * that is good enough: peer is still alive. */
5607                         if (time_after(connection->last_received,
5608                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5609                                 continue;
5610                         if (ping_timeout_active) {
5611                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5612                                 goto reconnect;
5613                         }
5614                         set_bit(SEND_PING, &connection->flags);
5615                         continue;
5616                 } else if (rv == -EINTR) {
5617                         continue;
5618                 } else {
5619                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5620                         goto reconnect;
5621                 }
5622
5623                 if (received == expect && cmd == NULL) {
5624                         if (decode_header(connection, connection->meta.rbuf, &pi))
5625                                 goto reconnect;
5626                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
5627                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5628                                          cmdname(pi.cmd), pi.cmd);
5629                                 goto disconnect;
5630                         }
5631                         cmd = &asender_tbl[pi.cmd];
5632                         expect = header_size + cmd->pkt_size;
5633                         if (pi.size != expect - header_size) {
5634                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5635                                         pi.cmd, pi.size);
5636                                 goto reconnect;
5637                         }
5638                 }
5639                 if (received == expect) {
5640                         bool err;
5641
5642                         err = cmd->fn(connection, &pi);
5643                         if (err) {
5644                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5645                                 goto reconnect;
5646                         }
5647
5648                         connection->last_received = jiffies;
5649
5650                         if (cmd == &asender_tbl[P_PING_ACK]) {
5651                                 /* restore idle timeout */
5652                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5653                                 ping_timeout_active = false;
5654                         }
5655
5656                         buf      = connection->meta.rbuf;
5657                         received = 0;
5658                         expect   = header_size;
5659                         cmd      = NULL;
5660                 }
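                /* A due ping takes priority; go back to the top of the loop.
                 * Otherwise try a non-blocking receive, so back-to-back
                 * packets are processed without another cork/uncork cycle. */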
5661                 if (test_bit(SEND_PING, &connection->flags))
5662                         continue;
5663                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5664                 if (rv > 0)
5665                         goto received_more;
5666         }
5667
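        /* Only reached via the reconnect/disconnect gotos above. */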
5668         if (0) {
5669 reconnect:
5670                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5671                 conn_md_sync(connection);
5672         }
5673         if (0) {
5674 disconnect:
5675                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5676         }
5677         clear_bit(SIGNAL_ASENDER, &connection->flags);
5678
5679         drbd_info(connection, "asender terminated\n");
5680
5681         return 0;
5682 }