GNU Linux-libre 4.19.264-gnu1
[releases.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54
55 struct packet_info {
56         enum drbd_packet cmd;
57         unsigned int size;
58         unsigned int vnr;
59         void *data;
60 };
61
62 enum finish_epoch {
63         FE_STILL_LIVE,
64         FE_DESTROYED,
65         FE_RECYCLED,
66 };
67
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first not finished we can
211            stop to examine the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:     DRBD device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lended to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, &drbd_ee_mempool);
413         return NULL;
414 }
415
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417                        int is_net)
418 {
419         might_sleep();
420         if (peer_req->flags & EE_HAS_DIGEST)
421                 kfree(peer_req->digest);
422         drbd_free_pages(device, peer_req->pages, is_net);
423         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427                 drbd_al_complete_io(device, &peer_req->i);
428         }
429         mempool_free(peer_req, &drbd_ee_mempool);
430 }
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         iov_iter_kvec(&msg.msg_iter, READ | ITER_KVEC, &iov, 1, size);
520         return sock_recvmsg(sock, &msg, msg.msg_flags);
521 }
522
523 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
524 {
525         int rv;
526
527         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
528
529         if (rv < 0) {
530                 if (rv == -ECONNRESET)
531                         drbd_info(connection, "sock was reset by peer\n");
532                 else if (rv != -ERESTARTSYS)
533                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
534         } else if (rv == 0) {
535                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
536                         long t;
537                         rcu_read_lock();
538                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
539                         rcu_read_unlock();
540
541                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
542
543                         if (t)
544                                 goto out;
545                 }
546                 drbd_info(connection, "sock was shut down by peer\n");
547         }
548
549         if (rv != size)
550                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
551
552 out:
553         return rv;
554 }
555
556 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
557 {
558         int err;
559
560         err = drbd_recv(connection, buf, size);
561         if (err != size) {
562                 if (err >= 0)
563                         err = -EIO;
564         } else
565                 err = 0;
566         return err;
567 }
568
569 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
570 {
571         int err;
572
573         err = drbd_recv_all(connection, buf, size);
574         if (err && !signal_pending(current))
575                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
576         return err;
577 }
578
579 /* quoting tcp(7):
580  *   On individual connections, the socket buffer size must be set prior to the
581  *   listen(2) or connect(2) calls in order to have it take effect.
582  * This is our wrapper to do so.
583  */
584 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
585                 unsigned int rcv)
586 {
587         /* open coded SO_SNDBUF, SO_RCVBUF */
588         if (snd) {
589                 sock->sk->sk_sndbuf = snd;
590                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
591         }
592         if (rcv) {
593                 sock->sk->sk_rcvbuf = rcv;
594                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
595         }
596 }
597
598 static struct socket *drbd_try_connect(struct drbd_connection *connection)
599 {
600         const char *what;
601         struct socket *sock;
602         struct sockaddr_in6 src_in6;
603         struct sockaddr_in6 peer_in6;
604         struct net_conf *nc;
605         int err, peer_addr_len, my_addr_len;
606         int sndbuf_size, rcvbuf_size, connect_int;
607         int disconnect_on_error = 1;
608
609         rcu_read_lock();
610         nc = rcu_dereference(connection->net_conf);
611         if (!nc) {
612                 rcu_read_unlock();
613                 return NULL;
614         }
615         sndbuf_size = nc->sndbuf_size;
616         rcvbuf_size = nc->rcvbuf_size;
617         connect_int = nc->connect_int;
618         rcu_read_unlock();
619
620         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
621         memcpy(&src_in6, &connection->my_addr, my_addr_len);
622
623         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
624                 src_in6.sin6_port = 0;
625         else
626                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
627
628         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
629         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
630
631         what = "sock_create_kern";
632         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
633                                SOCK_STREAM, IPPROTO_TCP, &sock);
634         if (err < 0) {
635                 sock = NULL;
636                 goto out;
637         }
638
639         sock->sk->sk_rcvtimeo =
640         sock->sk->sk_sndtimeo = connect_int * HZ;
641         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
642
643        /* explicitly bind to the configured IP as source IP
644         *  for the outgoing connections.
645         *  This is needed for multihomed hosts and to be
646         *  able to use lo: interfaces for drbd.
647         * Make sure to use 0 as port number, so linux selects
648         *  a free one dynamically.
649         */
650         what = "bind before connect";
651         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
652         if (err < 0)
653                 goto out;
654
655         /* connect may fail, peer not yet available.
656          * stay C_WF_CONNECTION, don't go Disconnecting! */
657         disconnect_on_error = 0;
658         what = "connect";
659         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
660
661 out:
662         if (err < 0) {
663                 if (sock) {
664                         sock_release(sock);
665                         sock = NULL;
666                 }
667                 switch (-err) {
668                         /* timeout, busy, signal pending */
669                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
670                 case EINTR: case ERESTARTSYS:
671                         /* peer not (yet) available, network problem */
672                 case ECONNREFUSED: case ENETUNREACH:
673                 case EHOSTDOWN:    case EHOSTUNREACH:
674                         disconnect_on_error = 0;
675                         break;
676                 default:
677                         drbd_err(connection, "%s failed, err = %d\n", what, err);
678                 }
679                 if (disconnect_on_error)
680                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
681         }
682
683         return sock;
684 }
685
686 struct accept_wait_data {
687         struct drbd_connection *connection;
688         struct socket *s_listen;
689         struct completion door_bell;
690         void (*original_sk_state_change)(struct sock *sk);
691
692 };
693
694 static void drbd_incoming_connection(struct sock *sk)
695 {
696         struct accept_wait_data *ad = sk->sk_user_data;
697         void (*state_change)(struct sock *sk);
698
699         state_change = ad->original_sk_state_change;
700         if (sk->sk_state == TCP_ESTABLISHED)
701                 complete(&ad->door_bell);
702         state_change(sk);
703 }
704
705 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
706 {
707         int err, sndbuf_size, rcvbuf_size, my_addr_len;
708         struct sockaddr_in6 my_addr;
709         struct socket *s_listen;
710         struct net_conf *nc;
711         const char *what;
712
713         rcu_read_lock();
714         nc = rcu_dereference(connection->net_conf);
715         if (!nc) {
716                 rcu_read_unlock();
717                 return -EIO;
718         }
719         sndbuf_size = nc->sndbuf_size;
720         rcvbuf_size = nc->rcvbuf_size;
721         rcu_read_unlock();
722
723         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
724         memcpy(&my_addr, &connection->my_addr, my_addr_len);
725
726         what = "sock_create_kern";
727         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
728                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
735         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
736
737         what = "bind before listen";
738         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
739         if (err < 0)
740                 goto out;
741
742         ad->s_listen = s_listen;
743         write_lock_bh(&s_listen->sk->sk_callback_lock);
744         ad->original_sk_state_change = s_listen->sk->sk_state_change;
745         s_listen->sk->sk_state_change = drbd_incoming_connection;
746         s_listen->sk->sk_user_data = ad;
747         write_unlock_bh(&s_listen->sk->sk_callback_lock);
748
749         what = "listen";
750         err = s_listen->ops->listen(s_listen, 5);
751         if (err < 0)
752                 goto out;
753
754         return 0;
755 out:
756         if (s_listen)
757                 sock_release(s_listen);
758         if (err < 0) {
759                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760                         drbd_err(connection, "%s failed, err = %d\n", what, err);
761                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
762                 }
763         }
764
765         return -EIO;
766 }
767
768 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
769 {
770         write_lock_bh(&sk->sk_callback_lock);
771         sk->sk_state_change = ad->original_sk_state_change;
772         sk->sk_user_data = NULL;
773         write_unlock_bh(&sk->sk_callback_lock);
774 }
775
776 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
777 {
778         int timeo, connect_int, err = 0;
779         struct socket *s_estab = NULL;
780         struct net_conf *nc;
781
782         rcu_read_lock();
783         nc = rcu_dereference(connection->net_conf);
784         if (!nc) {
785                 rcu_read_unlock();
786                 return NULL;
787         }
788         connect_int = nc->connect_int;
789         rcu_read_unlock();
790
791         timeo = connect_int * HZ;
792         /* 28.5% random jitter */
793         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
794
795         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
796         if (err <= 0)
797                 return NULL;
798
799         err = kernel_accept(ad->s_listen, &s_estab, 0);
800         if (err < 0) {
801                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
802                         drbd_err(connection, "accept failed, err = %d\n", err);
803                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
804                 }
805         }
806
807         if (s_estab)
808                 unregister_state_change(s_estab->sk, ad);
809
810         return s_estab;
811 }
812
813 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
814
815 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
816                              enum drbd_packet cmd)
817 {
818         if (!conn_prepare_command(connection, sock))
819                 return -EIO;
820         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
821 }
822
823 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
824 {
825         unsigned int header_size = drbd_header_size(connection);
826         struct packet_info pi;
827         struct net_conf *nc;
828         int err;
829
830         rcu_read_lock();
831         nc = rcu_dereference(connection->net_conf);
832         if (!nc) {
833                 rcu_read_unlock();
834                 return -EIO;
835         }
836         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
837         rcu_read_unlock();
838
839         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
840         if (err != header_size) {
841                 if (err >= 0)
842                         err = -EIO;
843                 return err;
844         }
845         err = decode_header(connection, connection->data.rbuf, &pi);
846         if (err)
847                 return err;
848         return pi.cmd;
849 }
850
851 /**
852  * drbd_socket_okay() - Free the socket if its connection is not okay
853  * @sock:       pointer to the pointer to the socket.
854  */
855 static bool drbd_socket_okay(struct socket **sock)
856 {
857         int rr;
858         char tb[4];
859
860         if (!*sock)
861                 return false;
862
863         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
864
865         if (rr > 0 || rr == -EAGAIN) {
866                 return true;
867         } else {
868                 sock_release(*sock);
869                 *sock = NULL;
870                 return false;
871         }
872 }
873
874 static bool connection_established(struct drbd_connection *connection,
875                                    struct socket **sock1,
876                                    struct socket **sock2)
877 {
878         struct net_conf *nc;
879         int timeout;
880         bool ok;
881
882         if (!*sock1 || !*sock2)
883                 return false;
884
885         rcu_read_lock();
886         nc = rcu_dereference(connection->net_conf);
887         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
888         rcu_read_unlock();
889         schedule_timeout_interruptible(timeout);
890
891         ok = drbd_socket_okay(sock1);
892         ok = drbd_socket_okay(sock2) && ok;
893
894         return ok;
895 }
896
897 /* Gets called if a connection is established, or if a new minor gets created
898    in a connection */
899 int drbd_connected(struct drbd_peer_device *peer_device)
900 {
901         struct drbd_device *device = peer_device->device;
902         int err;
903
904         atomic_set(&device->packet_seq, 0);
905         device->peer_seq = 0;
906
907         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
908                 &peer_device->connection->cstate_mutex :
909                 &device->own_state_mutex;
910
911         err = drbd_send_sync_param(peer_device);
912         if (!err)
913                 err = drbd_send_sizes(peer_device, 0, 0);
914         if (!err)
915                 err = drbd_send_uuids(peer_device);
916         if (!err)
917                 err = drbd_send_current_state(peer_device);
918         clear_bit(USE_DEGR_WFC_T, &device->flags);
919         clear_bit(RESIZE_PENDING, &device->flags);
920         atomic_set(&device->ap_in_flight, 0);
921         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
922         return err;
923 }
924
925 /*
926  * return values:
927  *   1 yes, we have a valid connection
928  *   0 oops, did not work out, please try again
929  *  -1 peer talks different language,
930  *     no point in trying again, please go standalone.
931  *  -2 We do not have a network config...
932  */
933 static int conn_connect(struct drbd_connection *connection)
934 {
935         struct drbd_socket sock, msock;
936         struct drbd_peer_device *peer_device;
937         struct net_conf *nc;
938         int vnr, timeout, h;
939         bool discard_my_data, ok;
940         enum drbd_state_rv rv;
941         struct accept_wait_data ad = {
942                 .connection = connection,
943                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
944         };
945
946         clear_bit(DISCONNECT_SENT, &connection->flags);
947         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
948                 return -2;
949
950         mutex_init(&sock.mutex);
951         sock.sbuf = connection->data.sbuf;
952         sock.rbuf = connection->data.rbuf;
953         sock.socket = NULL;
954         mutex_init(&msock.mutex);
955         msock.sbuf = connection->meta.sbuf;
956         msock.rbuf = connection->meta.rbuf;
957         msock.socket = NULL;
958
959         /* Assume that the peer only understands protocol 80 until we know better.  */
960         connection->agreed_pro_version = 80;
961
962         if (prepare_listen_socket(connection, &ad))
963                 return 0;
964
965         do {
966                 struct socket *s;
967
968                 s = drbd_try_connect(connection);
969                 if (s) {
970                         if (!sock.socket) {
971                                 sock.socket = s;
972                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
973                         } else if (!msock.socket) {
974                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
975                                 msock.socket = s;
976                                 send_first_packet(connection, &msock, P_INITIAL_META);
977                         } else {
978                                 drbd_err(connection, "Logic error in conn_connect()\n");
979                                 goto out_release_sockets;
980                         }
981                 }
982
983                 if (connection_established(connection, &sock.socket, &msock.socket))
984                         break;
985
986 retry:
987                 s = drbd_wait_for_connect(connection, &ad);
988                 if (s) {
989                         int fp = receive_first_packet(connection, s);
990                         drbd_socket_okay(&sock.socket);
991                         drbd_socket_okay(&msock.socket);
992                         switch (fp) {
993                         case P_INITIAL_DATA:
994                                 if (sock.socket) {
995                                         drbd_warn(connection, "initial packet S crossed\n");
996                                         sock_release(sock.socket);
997                                         sock.socket = s;
998                                         goto randomize;
999                                 }
1000                                 sock.socket = s;
1001                                 break;
1002                         case P_INITIAL_META:
1003                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004                                 if (msock.socket) {
1005                                         drbd_warn(connection, "initial packet M crossed\n");
1006                                         sock_release(msock.socket);
1007                                         msock.socket = s;
1008                                         goto randomize;
1009                                 }
1010                                 msock.socket = s;
1011                                 break;
1012                         default:
1013                                 drbd_warn(connection, "Error receiving initial packet\n");
1014                                 sock_release(s);
1015 randomize:
1016                                 if (prandom_u32() & 1)
1017                                         goto retry;
1018                         }
1019                 }
1020
1021                 if (connection->cstate <= C_DISCONNECTING)
1022                         goto out_release_sockets;
1023                 if (signal_pending(current)) {
1024                         flush_signals(current);
1025                         smp_rmb();
1026                         if (get_t_state(&connection->receiver) == EXITING)
1027                                 goto out_release_sockets;
1028                 }
1029
1030                 ok = connection_established(connection, &sock.socket, &msock.socket);
1031         } while (!ok);
1032
1033         if (ad.s_listen)
1034                 sock_release(ad.s_listen);
1035
1036         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038
1039         sock.socket->sk->sk_allocation = GFP_NOIO;
1040         msock.socket->sk->sk_allocation = GFP_NOIO;
1041
1042         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044
1045         /* NOT YET ...
1046          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048          * first set it to the P_CONNECTION_FEATURES timeout,
1049          * which we set to 4x the configured ping_timeout. */
1050         rcu_read_lock();
1051         nc = rcu_dereference(connection->net_conf);
1052
1053         sock.socket->sk->sk_sndtimeo =
1054         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055
1056         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057         timeout = nc->timeout * HZ / 10;
1058         discard_my_data = nc->discard_my_data;
1059         rcu_read_unlock();
1060
1061         msock.socket->sk->sk_sndtimeo = timeout;
1062
1063         /* we don't want delays.
1064          * we use TCP_CORK where appropriate, though */
1065         drbd_tcp_nodelay(sock.socket);
1066         drbd_tcp_nodelay(msock.socket);
1067
1068         connection->data.socket = sock.socket;
1069         connection->meta.socket = msock.socket;
1070         connection->last_received = jiffies;
1071
1072         h = drbd_do_features(connection);
1073         if (h <= 0)
1074                 return h;
1075
1076         if (connection->cram_hmac_tfm) {
1077                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1078                 switch (drbd_do_auth(connection)) {
1079                 case -1:
1080                         drbd_err(connection, "Authentication of peer failed\n");
1081                         return -1;
1082                 case 0:
1083                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084                         return 0;
1085                 }
1086         }
1087
1088         connection->data.socket->sk->sk_sndtimeo = timeout;
1089         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090
1091         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092                 return -1;
1093
1094         /* Prevent a race between resync-handshake and
1095          * being promoted to Primary.
1096          *
1097          * Grab and release the state mutex, so we know that any current
1098          * drbd_set_role() is finished, and any incoming drbd_set_role
1099          * will see the STATE_SENT flag, and wait for it to be cleared.
1100          */
1101         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102                 mutex_lock(peer_device->device->state_mutex);
1103
1104         /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105         spin_lock_irq(&connection->resource->req_lock);
1106         set_bit(STATE_SENT, &connection->flags);
1107         spin_unlock_irq(&connection->resource->req_lock);
1108
1109         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110                 mutex_unlock(peer_device->device->state_mutex);
1111
1112         rcu_read_lock();
1113         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114                 struct drbd_device *device = peer_device->device;
1115                 kref_get(&device->kref);
1116                 rcu_read_unlock();
1117
1118                 if (discard_my_data)
1119                         set_bit(DISCARD_MY_DATA, &device->flags);
1120                 else
1121                         clear_bit(DISCARD_MY_DATA, &device->flags);
1122
1123                 drbd_connected(peer_device);
1124                 kref_put(&device->kref, drbd_destroy_device);
1125                 rcu_read_lock();
1126         }
1127         rcu_read_unlock();
1128
1129         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131                 clear_bit(STATE_SENT, &connection->flags);
1132                 return 0;
1133         }
1134
1135         drbd_thread_start(&connection->ack_receiver);
1136         /* opencoded create_singlethread_workqueue(),
1137          * to be able to use format string arguments */
1138         connection->ack_sender =
1139                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140         if (!connection->ack_sender) {
1141                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142                 return 0;
1143         }
1144
1145         mutex_lock(&connection->resource->conf_update);
1146         /* The discard_my_data flag is a single-shot modifier to the next
1147          * connection attempt, the handshake of which is now well underway.
1148          * No need for rcu style copying of the whole struct
1149          * just to clear a single value. */
1150         connection->net_conf->discard_my_data = 0;
1151         mutex_unlock(&connection->resource->conf_update);
1152
1153         return h;
1154
1155 out_release_sockets:
1156         if (ad.s_listen)
1157                 sock_release(ad.s_listen);
1158         if (sock.socket)
1159                 sock_release(sock.socket);
1160         if (msock.socket)
1161                 sock_release(msock.socket);
1162         return -1;
1163 }
1164
1165 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1166 {
1167         unsigned int header_size = drbd_header_size(connection);
1168
1169         if (header_size == sizeof(struct p_header100) &&
1170             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171                 struct p_header100 *h = header;
1172                 if (h->pad != 0) {
1173                         drbd_err(connection, "Header padding is not zero\n");
1174                         return -EINVAL;
1175                 }
1176                 pi->vnr = be16_to_cpu(h->volume);
1177                 pi->cmd = be16_to_cpu(h->command);
1178                 pi->size = be32_to_cpu(h->length);
1179         } else if (header_size == sizeof(struct p_header95) &&
1180                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181                 struct p_header95 *h = header;
1182                 pi->cmd = be16_to_cpu(h->command);
1183                 pi->size = be32_to_cpu(h->length);
1184                 pi->vnr = 0;
1185         } else if (header_size == sizeof(struct p_header80) &&
1186                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187                 struct p_header80 *h = header;
1188                 pi->cmd = be16_to_cpu(h->command);
1189                 pi->size = be16_to_cpu(h->length);
1190                 pi->vnr = 0;
1191         } else {
1192                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193                          be32_to_cpu(*(__be32 *)header),
1194                          connection->agreed_pro_version);
1195                 return -EINVAL;
1196         }
1197         pi->data = header + header_size;
1198         return 0;
1199 }
1200
1201 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1202 {
1203         if (current->plug == &connection->receiver_plug) {
1204                 blk_finish_plug(&connection->receiver_plug);
1205                 blk_start_plug(&connection->receiver_plug);
1206         } /* else: maybe just schedule() ?? */
1207 }
1208
1209 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1210 {
1211         void *buffer = connection->data.rbuf;
1212         int err;
1213
1214         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215         if (err)
1216                 return err;
1217
1218         err = decode_header(connection, buffer, pi);
1219         connection->last_received = jiffies;
1220
1221         return err;
1222 }
1223
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1225 {
1226         void *buffer = connection->data.rbuf;
1227         unsigned int size = drbd_header_size(connection);
1228         int err;
1229
1230         err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231         if (err != size) {
1232                 /* If we have nothing in the receive buffer now, to reduce
1233                  * application latency, try to drain the backend queues as
1234                  * quickly as possible, and let remote TCP know what we have
1235                  * received so far. */
1236                 if (err == -EAGAIN) {
1237                         drbd_tcp_quickack(connection->data.socket);
1238                         drbd_unplug_all_devices(connection);
1239                 }
1240                 if (err > 0) {
1241                         buffer += err;
1242                         size -= err;
1243                 }
1244                 err = drbd_recv_all_warn(connection, buffer, size);
1245                 if (err)
1246                         return err;
1247         }
1248
1249         err = decode_header(connection, connection->data.rbuf, pi);
1250         connection->last_received = jiffies;
1251
1252         return err;
1253 }
1254 /* This is blkdev_issue_flush, but asynchronous.
1255  * We want to submit to all component volumes in parallel,
1256  * then wait for all completions.
1257  */
1258 struct issue_flush_context {
1259         atomic_t pending;
1260         int error;
1261         struct completion done;
1262 };
1263 struct one_flush_context {
1264         struct drbd_device *device;
1265         struct issue_flush_context *ctx;
1266 };
1267
1268 static void one_flush_endio(struct bio *bio)
1269 {
1270         struct one_flush_context *octx = bio->bi_private;
1271         struct drbd_device *device = octx->device;
1272         struct issue_flush_context *ctx = octx->ctx;
1273
1274         if (bio->bi_status) {
1275                 ctx->error = blk_status_to_errno(bio->bi_status);
1276                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1277         }
1278         kfree(octx);
1279         bio_put(bio);
1280
1281         clear_bit(FLUSH_PENDING, &device->flags);
1282         put_ldev(device);
1283         kref_put(&device->kref, drbd_destroy_device);
1284
1285         if (atomic_dec_and_test(&ctx->pending))
1286                 complete(&ctx->done);
1287 }
1288
1289 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1290 {
1291         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293         if (!bio || !octx) {
1294                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295                 /* FIXME: what else can I do now?  disconnecting or detaching
1296                  * really does not help to improve the state of the world, either.
1297                  */
1298                 kfree(octx);
1299                 if (bio)
1300                         bio_put(bio);
1301
1302                 ctx->error = -ENOMEM;
1303                 put_ldev(device);
1304                 kref_put(&device->kref, drbd_destroy_device);
1305                 return;
1306         }
1307
1308         octx->device = device;
1309         octx->ctx = ctx;
1310         bio_set_dev(bio, device->ldev->backing_bdev);
1311         bio->bi_private = octx;
1312         bio->bi_end_io = one_flush_endio;
1313         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1314
1315         device->flush_jif = jiffies;
1316         set_bit(FLUSH_PENDING, &device->flags);
1317         atomic_inc(&ctx->pending);
1318         submit_bio(bio);
1319 }
1320
1321 static void drbd_flush(struct drbd_connection *connection)
1322 {
1323         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324                 struct drbd_peer_device *peer_device;
1325                 struct issue_flush_context ctx;
1326                 int vnr;
1327
1328                 atomic_set(&ctx.pending, 1);
1329                 ctx.error = 0;
1330                 init_completion(&ctx.done);
1331
1332                 rcu_read_lock();
1333                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334                         struct drbd_device *device = peer_device->device;
1335
1336                         if (!get_ldev(device))
1337                                 continue;
1338                         kref_get(&device->kref);
1339                         rcu_read_unlock();
1340
1341                         submit_one_flush(device, &ctx);
1342
1343                         rcu_read_lock();
1344                 }
1345                 rcu_read_unlock();
1346
1347                 /* Do we want to add a timeout,
1348                  * if disk-timeout is set? */
1349                 if (!atomic_dec_and_test(&ctx.pending))
1350                         wait_for_completion(&ctx.done);
1351
1352                 if (ctx.error) {
1353                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1354                          * don't try again for ANY return value != 0
1355                          * if (rv == -EOPNOTSUPP) */
1356                         /* Any error is already reported by bio_endio callback. */
1357                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1358                 }
1359         }
1360 }
1361
1362 /**
1363  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1364  * @device:     DRBD device.
1365  * @epoch:      Epoch object.
1366  * @ev:         Epoch event.
1367  */
1368 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1369                                                struct drbd_epoch *epoch,
1370                                                enum epoch_event ev)
1371 {
1372         int epoch_size;
1373         struct drbd_epoch *next_epoch;
1374         enum finish_epoch rv = FE_STILL_LIVE;
1375
1376         spin_lock(&connection->epoch_lock);
1377         do {
1378                 next_epoch = NULL;
1379
1380                 epoch_size = atomic_read(&epoch->epoch_size);
1381
1382                 switch (ev & ~EV_CLEANUP) {
1383                 case EV_PUT:
1384                         atomic_dec(&epoch->active);
1385                         break;
1386                 case EV_GOT_BARRIER_NR:
1387                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1388                         break;
1389                 case EV_BECAME_LAST:
1390                         /* nothing to do*/
1391                         break;
1392                 }
1393
1394                 if (epoch_size != 0 &&
1395                     atomic_read(&epoch->active) == 0 &&
1396                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1397                         if (!(ev & EV_CLEANUP)) {
1398                                 spin_unlock(&connection->epoch_lock);
1399                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1400                                 spin_lock(&connection->epoch_lock);
1401                         }
1402 #if 0
1403                         /* FIXME: dec unacked on connection, once we have
1404                          * something to count pending connection packets in. */
1405                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1406                                 dec_unacked(epoch->connection);
1407 #endif
1408
1409                         if (connection->current_epoch != epoch) {
1410                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1411                                 list_del(&epoch->list);
1412                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1413                                 connection->epochs--;
1414                                 kfree(epoch);
1415
1416                                 if (rv == FE_STILL_LIVE)
1417                                         rv = FE_DESTROYED;
1418                         } else {
1419                                 epoch->flags = 0;
1420                                 atomic_set(&epoch->epoch_size, 0);
1421                                 /* atomic_set(&epoch->active, 0); is already zero */
1422                                 if (rv == FE_STILL_LIVE)
1423                                         rv = FE_RECYCLED;
1424                         }
1425                 }
1426
1427                 if (!next_epoch)
1428                         break;
1429
1430                 epoch = next_epoch;
1431         } while (1);
1432
1433         spin_unlock(&connection->epoch_lock);
1434
1435         return rv;
1436 }
1437
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1440 {
1441         struct disk_conf *dc;
1442
1443         dc = rcu_dereference(bdev->disk_conf);
1444
1445         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446                 wo = WO_DRAIN_IO;
1447         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448                 wo = WO_NONE;
1449
1450         return wo;
1451 }
1452
1453 /**
1454  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455  * @connection: DRBD connection.
1456  * @wo:         Write ordering method to try.
1457  */
1458 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459                               enum write_ordering_e wo)
1460 {
1461         struct drbd_device *device;
1462         enum write_ordering_e pwo;
1463         int vnr;
1464         static char *write_ordering_str[] = {
1465                 [WO_NONE] = "none",
1466                 [WO_DRAIN_IO] = "drain",
1467                 [WO_BDEV_FLUSH] = "flush",
1468         };
1469
1470         pwo = resource->write_ordering;
1471         if (wo != WO_BDEV_FLUSH)
1472                 wo = min(pwo, wo);
1473         rcu_read_lock();
1474         idr_for_each_entry(&resource->devices, device, vnr) {
1475                 if (get_ldev(device)) {
1476                         wo = max_allowed_wo(device->ldev, wo);
1477                         if (device->ldev == bdev)
1478                                 bdev = NULL;
1479                         put_ldev(device);
1480                 }
1481         }
1482
1483         if (bdev)
1484                 wo = max_allowed_wo(bdev, wo);
1485
1486         rcu_read_unlock();
1487
1488         resource->write_ordering = wo;
1489         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1491 }
1492
1493 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1494 {
1495         struct block_device *bdev = device->ldev->backing_bdev;
1496
1497         if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1498                         GFP_NOIO, 0))
1499                 peer_req->flags |= EE_WAS_ERROR;
1500
1501         drbd_endio_write_sec_final(peer_req);
1502 }
1503
1504 static void drbd_issue_peer_wsame(struct drbd_device *device,
1505                                   struct drbd_peer_request *peer_req)
1506 {
1507         struct block_device *bdev = device->ldev->backing_bdev;
1508         sector_t s = peer_req->i.sector;
1509         sector_t nr = peer_req->i.size >> 9;
1510         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1511                 peer_req->flags |= EE_WAS_ERROR;
1512         drbd_endio_write_sec_final(peer_req);
1513 }
1514
1515
1516 /**
1517  * drbd_submit_peer_request()
1518  * @device:     DRBD device.
1519  * @peer_req:   peer request
1520  * @rw:         flag field, see bio->bi_opf
1521  *
1522  * May spread the pages to multiple bios,
1523  * depending on bio_add_page restrictions.
1524  *
1525  * Returns 0 if all bios have been submitted,
1526  * -ENOMEM if we could not allocate enough bios,
1527  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1528  *  single page to an empty bio (which should never happen and likely indicates
1529  *  that the lower level IO stack is in some way broken). This has been observed
1530  *  on certain Xen deployments.
1531  */
1532 /* TODO allocate from our own bio_set. */
1533 int drbd_submit_peer_request(struct drbd_device *device,
1534                              struct drbd_peer_request *peer_req,
1535                              const unsigned op, const unsigned op_flags,
1536                              const int fault_type)
1537 {
1538         struct bio *bios = NULL;
1539         struct bio *bio;
1540         struct page *page = peer_req->pages;
1541         sector_t sector = peer_req->i.sector;
1542         unsigned data_size = peer_req->i.size;
1543         unsigned n_bios = 0;
1544         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1545         int err = -ENOMEM;
1546
1547         /* TRIM/DISCARD: for now, always use the helper function
1548          * blkdev_issue_zeroout(..., discard=true).
1549          * It's synchronous, but it does the right thing wrt. bio splitting.
1550          * Correctness first, performance later.  Next step is to code an
1551          * asynchronous variant of the same.
1552          */
1553         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1554                 /* wait for all pending IO completions, before we start
1555                  * zeroing things out. */
1556                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1557                 /* add it to the active list now,
1558                  * so we can find it to present it in debugfs */
1559                 peer_req->submit_jif = jiffies;
1560                 peer_req->flags |= EE_SUBMITTED;
1561
1562                 /* If this was a resync request from receive_rs_deallocated(),
1563                  * it is already on the sync_ee list */
1564                 if (list_empty(&peer_req->w.list)) {
1565                         spin_lock_irq(&device->resource->req_lock);
1566                         list_add_tail(&peer_req->w.list, &device->active_ee);
1567                         spin_unlock_irq(&device->resource->req_lock);
1568                 }
1569
1570                 if (peer_req->flags & EE_IS_TRIM)
1571                         drbd_issue_peer_discard(device, peer_req);
1572                 else /* EE_WRITE_SAME */
1573                         drbd_issue_peer_wsame(device, peer_req);
1574                 return 0;
1575         }
1576
1577         /* In most cases, we will only need one bio.  But in case the lower
1578          * level restrictions happen to be different at this offset on this
1579          * side than those of the sending peer, we may need to submit the
1580          * request in more than one bio.
1581          *
1582          * Plain bio_alloc is good enough here, this is no DRBD internally
1583          * generated bio, but a bio allocated on behalf of the peer.
1584          */
1585 next_bio:
1586         bio = bio_alloc(GFP_NOIO, nr_pages);
1587         if (!bio) {
1588                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1589                 goto fail;
1590         }
1591         /* > peer_req->i.sector, unless this is the first bio */
1592         bio->bi_iter.bi_sector = sector;
1593         bio_set_dev(bio, device->ldev->backing_bdev);
1594         bio_set_op_attrs(bio, op, op_flags);
1595         bio->bi_private = peer_req;
1596         bio->bi_end_io = drbd_peer_request_endio;
1597
1598         bio->bi_next = bios;
1599         bios = bio;
1600         ++n_bios;
1601
1602         page_chain_for_each(page) {
1603                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1604                 if (!bio_add_page(bio, page, len, 0))
1605                         goto next_bio;
1606                 data_size -= len;
1607                 sector += len >> 9;
1608                 --nr_pages;
1609         }
1610         D_ASSERT(device, data_size == 0);
1611         D_ASSERT(device, page == NULL);
1612
1613         atomic_set(&peer_req->pending_bios, n_bios);
1614         /* for debugfs: update timestamp, mark as submitted */
1615         peer_req->submit_jif = jiffies;
1616         peer_req->flags |= EE_SUBMITTED;
1617         do {
1618                 bio = bios;
1619                 bios = bios->bi_next;
1620                 bio->bi_next = NULL;
1621
1622                 drbd_generic_make_request(device, fault_type, bio);
1623         } while (bios);
1624         return 0;
1625
1626 fail:
1627         while (bios) {
1628                 bio = bios;
1629                 bios = bios->bi_next;
1630                 bio_put(bio);
1631         }
1632         return err;
1633 }
1634
1635 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1636                                              struct drbd_peer_request *peer_req)
1637 {
1638         struct drbd_interval *i = &peer_req->i;
1639
1640         drbd_remove_interval(&device->write_requests, i);
1641         drbd_clear_interval(i);
1642
1643         /* Wake up any processes waiting for this peer request to complete.  */
1644         if (i->waiting)
1645                 wake_up(&device->misc_wait);
1646 }
1647
1648 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1649 {
1650         struct drbd_peer_device *peer_device;
1651         int vnr;
1652
1653         rcu_read_lock();
1654         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1655                 struct drbd_device *device = peer_device->device;
1656
1657                 kref_get(&device->kref);
1658                 rcu_read_unlock();
1659                 drbd_wait_ee_list_empty(device, &device->active_ee);
1660                 kref_put(&device->kref, drbd_destroy_device);
1661                 rcu_read_lock();
1662         }
1663         rcu_read_unlock();
1664 }
1665
1666 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1667 {
1668         int rv;
1669         struct p_barrier *p = pi->data;
1670         struct drbd_epoch *epoch;
1671
1672         /* FIXME these are unacked on connection,
1673          * not a specific (peer)device.
1674          */
1675         connection->current_epoch->barrier_nr = p->barrier;
1676         connection->current_epoch->connection = connection;
1677         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1678
1679         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1680          * the activity log, which means it would not be resynced in case the
1681          * R_PRIMARY crashes now.
1682          * Therefore we must send the barrier_ack after the barrier request was
1683          * completed. */
1684         switch (connection->resource->write_ordering) {
1685         case WO_NONE:
1686                 if (rv == FE_RECYCLED)
1687                         return 0;
1688
1689                 /* receiver context, in the writeout path of the other node.
1690                  * avoid potential distributed deadlock */
1691                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1692                 if (epoch)
1693                         break;
1694                 else
1695                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1696                         /* Fall through */
1697
1698         case WO_BDEV_FLUSH:
1699         case WO_DRAIN_IO:
1700                 conn_wait_active_ee_empty(connection);
1701                 drbd_flush(connection);
1702
1703                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1704                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1705                         if (epoch)
1706                                 break;
1707                 }
1708
1709                 return 0;
1710         default:
1711                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1712                          connection->resource->write_ordering);
1713                 return -EIO;
1714         }
1715
1716         epoch->flags = 0;
1717         atomic_set(&epoch->epoch_size, 0);
1718         atomic_set(&epoch->active, 0);
1719
1720         spin_lock(&connection->epoch_lock);
1721         if (atomic_read(&connection->current_epoch->epoch_size)) {
1722                 list_add(&epoch->list, &connection->current_epoch->list);
1723                 connection->current_epoch = epoch;
1724                 connection->epochs++;
1725         } else {
1726                 /* The current_epoch got recycled while we allocated this one... */
1727                 kfree(epoch);
1728         }
1729         spin_unlock(&connection->epoch_lock);
1730
1731         return 0;
1732 }
1733
1734 /* quick wrapper in case payload size != request_size (write same) */
1735 static void drbd_csum_ee_size(struct crypto_ahash *h,
1736                               struct drbd_peer_request *r, void *d,
1737                               unsigned int payload_size)
1738 {
1739         unsigned int tmp = r->i.size;
1740         r->i.size = payload_size;
1741         drbd_csum_ee(h, r, d);
1742         r->i.size = tmp;
1743 }
1744
1745 /* used from receive_RSDataReply (recv_resync_read)
1746  * and from receive_Data.
1747  * data_size: actual payload ("data in")
1748  *      for normal writes that is bi_size.
1749  *      for discards, that is zero.
1750  *      for write same, it is logical_block_size.
1751  * both trim and write same have the bi_size ("data len to be affected")
1752  * as extra argument in the packet header.
1753  */
1754 static struct drbd_peer_request *
1755 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1756               struct packet_info *pi) __must_hold(local)
1757 {
1758         struct drbd_device *device = peer_device->device;
1759         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1760         struct drbd_peer_request *peer_req;
1761         struct page *page;
1762         int digest_size, err;
1763         unsigned int data_size = pi->size, ds;
1764         void *dig_in = peer_device->connection->int_dig_in;
1765         void *dig_vv = peer_device->connection->int_dig_vv;
1766         unsigned long *data;
1767         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1768         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1769
1770         digest_size = 0;
1771         if (!trim && peer_device->connection->peer_integrity_tfm) {
1772                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1773                 /*
1774                  * FIXME: Receive the incoming digest into the receive buffer
1775                  *        here, together with its struct p_data?
1776                  */
1777                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1778                 if (err)
1779                         return NULL;
1780                 data_size -= digest_size;
1781         }
1782
1783         /* assume request_size == data_size, but special case trim and wsame. */
1784         ds = data_size;
1785         if (trim) {
1786                 if (!expect(data_size == 0))
1787                         return NULL;
1788                 ds = be32_to_cpu(trim->size);
1789         } else if (wsame) {
1790                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1791                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1792                                 data_size, queue_logical_block_size(device->rq_queue));
1793                         return NULL;
1794                 }
1795                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1796                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1797                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1798                         return NULL;
1799                 }
1800                 ds = be32_to_cpu(wsame->size);
1801         }
1802
1803         if (!expect(IS_ALIGNED(ds, 512)))
1804                 return NULL;
1805         if (trim || wsame) {
1806                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1807                         return NULL;
1808         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1809                 return NULL;
1810
1811         /* even though we trust out peer,
1812          * we sometimes have to double check. */
1813         if (sector + (ds>>9) > capacity) {
1814                 drbd_err(device, "request from peer beyond end of local disk: "
1815                         "capacity: %llus < sector: %llus + size: %u\n",
1816                         (unsigned long long)capacity,
1817                         (unsigned long long)sector, ds);
1818                 return NULL;
1819         }
1820
1821         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1822          * "criss-cross" setup, that might cause write-out on some other DRBD,
1823          * which in turn might block on the other node at this very place.  */
1824         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1825         if (!peer_req)
1826                 return NULL;
1827
1828         peer_req->flags |= EE_WRITE;
1829         if (trim) {
1830                 peer_req->flags |= EE_IS_TRIM;
1831                 return peer_req;
1832         }
1833         if (wsame)
1834                 peer_req->flags |= EE_WRITE_SAME;
1835
1836         /* receive payload size bytes into page chain */
1837         ds = data_size;
1838         page = peer_req->pages;
1839         page_chain_for_each(page) {
1840                 unsigned len = min_t(int, ds, PAGE_SIZE);
1841                 data = kmap(page);
1842                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1843                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1844                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1845                         data[0] = data[0] ^ (unsigned long)-1;
1846                 }
1847                 kunmap(page);
1848                 if (err) {
1849                         drbd_free_peer_req(device, peer_req);
1850                         return NULL;
1851                 }
1852                 ds -= len;
1853         }
1854
1855         if (digest_size) {
1856                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1857                 if (memcmp(dig_in, dig_vv, digest_size)) {
1858                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1859                                 (unsigned long long)sector, data_size);
1860                         drbd_free_peer_req(device, peer_req);
1861                         return NULL;
1862                 }
1863         }
1864         device->recv_cnt += data_size >> 9;
1865         return peer_req;
1866 }
1867
1868 /* drbd_drain_block() just takes a data block
1869  * out of the socket input buffer, and discards it.
1870  */
1871 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1872 {
1873         struct page *page;
1874         int err = 0;
1875         void *data;
1876
1877         if (!data_size)
1878                 return 0;
1879
1880         page = drbd_alloc_pages(peer_device, 1, 1);
1881
1882         data = kmap(page);
1883         while (data_size) {
1884                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1885
1886                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1887                 if (err)
1888                         break;
1889                 data_size -= len;
1890         }
1891         kunmap(page);
1892         drbd_free_pages(peer_device->device, page, 0);
1893         return err;
1894 }
1895
1896 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1897                            sector_t sector, int data_size)
1898 {
1899         struct bio_vec bvec;
1900         struct bvec_iter iter;
1901         struct bio *bio;
1902         int digest_size, err, expect;
1903         void *dig_in = peer_device->connection->int_dig_in;
1904         void *dig_vv = peer_device->connection->int_dig_vv;
1905
1906         digest_size = 0;
1907         if (peer_device->connection->peer_integrity_tfm) {
1908                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1909                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1910                 if (err)
1911                         return err;
1912                 data_size -= digest_size;
1913         }
1914
1915         /* optimistically update recv_cnt.  if receiving fails below,
1916          * we disconnect anyways, and counters will be reset. */
1917         peer_device->device->recv_cnt += data_size>>9;
1918
1919         bio = req->master_bio;
1920         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1921
1922         bio_for_each_segment(bvec, bio, iter) {
1923                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1924                 expect = min_t(int, data_size, bvec.bv_len);
1925                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1926                 kunmap(bvec.bv_page);
1927                 if (err)
1928                         return err;
1929                 data_size -= expect;
1930         }
1931
1932         if (digest_size) {
1933                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1934                 if (memcmp(dig_in, dig_vv, digest_size)) {
1935                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1936                         return -EINVAL;
1937                 }
1938         }
1939
1940         D_ASSERT(peer_device->device, data_size == 0);
1941         return 0;
1942 }
1943
1944 /*
1945  * e_end_resync_block() is called in ack_sender context via
1946  * drbd_finish_peer_reqs().
1947  */
1948 static int e_end_resync_block(struct drbd_work *w, int unused)
1949 {
1950         struct drbd_peer_request *peer_req =
1951                 container_of(w, struct drbd_peer_request, w);
1952         struct drbd_peer_device *peer_device = peer_req->peer_device;
1953         struct drbd_device *device = peer_device->device;
1954         sector_t sector = peer_req->i.sector;
1955         int err;
1956
1957         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1958
1959         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1960                 drbd_set_in_sync(device, sector, peer_req->i.size);
1961                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1962         } else {
1963                 /* Record failure to sync */
1964                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1965
1966                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1967         }
1968         dec_unacked(device);
1969
1970         return err;
1971 }
1972
1973 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1974                             struct packet_info *pi) __releases(local)
1975 {
1976         struct drbd_device *device = peer_device->device;
1977         struct drbd_peer_request *peer_req;
1978
1979         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1980         if (!peer_req)
1981                 goto fail;
1982
1983         dec_rs_pending(device);
1984
1985         inc_unacked(device);
1986         /* corresponding dec_unacked() in e_end_resync_block()
1987          * respective _drbd_clear_done_ee */
1988
1989         peer_req->w.cb = e_end_resync_block;
1990         peer_req->submit_jif = jiffies;
1991
1992         spin_lock_irq(&device->resource->req_lock);
1993         list_add_tail(&peer_req->w.list, &device->sync_ee);
1994         spin_unlock_irq(&device->resource->req_lock);
1995
1996         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1997         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1998                                      DRBD_FAULT_RS_WR) == 0)
1999                 return 0;
2000
2001         /* don't care for the reason here */
2002         drbd_err(device, "submit failed, triggering re-connect\n");
2003         spin_lock_irq(&device->resource->req_lock);
2004         list_del(&peer_req->w.list);
2005         spin_unlock_irq(&device->resource->req_lock);
2006
2007         drbd_free_peer_req(device, peer_req);
2008 fail:
2009         put_ldev(device);
2010         return -EIO;
2011 }
2012
2013 static struct drbd_request *
2014 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2015              sector_t sector, bool missing_ok, const char *func)
2016 {
2017         struct drbd_request *req;
2018
2019         /* Request object according to our peer */
2020         req = (struct drbd_request *)(unsigned long)id;
2021         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2022                 return req;
2023         if (!missing_ok) {
2024                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2025                         (unsigned long)id, (unsigned long long)sector);
2026         }
2027         return NULL;
2028 }
2029
2030 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2031 {
2032         struct drbd_peer_device *peer_device;
2033         struct drbd_device *device;
2034         struct drbd_request *req;
2035         sector_t sector;
2036         int err;
2037         struct p_data *p = pi->data;
2038
2039         peer_device = conn_peer_device(connection, pi->vnr);
2040         if (!peer_device)
2041                 return -EIO;
2042         device = peer_device->device;
2043
2044         sector = be64_to_cpu(p->sector);
2045
2046         spin_lock_irq(&device->resource->req_lock);
2047         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2048         spin_unlock_irq(&device->resource->req_lock);
2049         if (unlikely(!req))
2050                 return -EIO;
2051
2052         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2053          * special casing it there for the various failure cases.
2054          * still no race with drbd_fail_pending_reads */
2055         err = recv_dless_read(peer_device, req, sector, pi->size);
2056         if (!err)
2057                 req_mod(req, DATA_RECEIVED);
2058         /* else: nothing. handled from drbd_disconnect...
2059          * I don't think we may complete this just yet
2060          * in case we are "on-disconnect: freeze" */
2061
2062         return err;
2063 }
2064
2065 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2066 {
2067         struct drbd_peer_device *peer_device;
2068         struct drbd_device *device;
2069         sector_t sector;
2070         int err;
2071         struct p_data *p = pi->data;
2072
2073         peer_device = conn_peer_device(connection, pi->vnr);
2074         if (!peer_device)
2075                 return -EIO;
2076         device = peer_device->device;
2077
2078         sector = be64_to_cpu(p->sector);
2079         D_ASSERT(device, p->block_id == ID_SYNCER);
2080
2081         if (get_ldev(device)) {
2082                 /* data is submitted to disk within recv_resync_read.
2083                  * corresponding put_ldev done below on error,
2084                  * or in drbd_peer_request_endio. */
2085                 err = recv_resync_read(peer_device, sector, pi);
2086         } else {
2087                 if (__ratelimit(&drbd_ratelimit_state))
2088                         drbd_err(device, "Can not write resync data to local disk.\n");
2089
2090                 err = drbd_drain_block(peer_device, pi->size);
2091
2092                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2093         }
2094
2095         atomic_add(pi->size >> 9, &device->rs_sect_in);
2096
2097         return err;
2098 }
2099
2100 static void restart_conflicting_writes(struct drbd_device *device,
2101                                        sector_t sector, int size)
2102 {
2103         struct drbd_interval *i;
2104         struct drbd_request *req;
2105
2106         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2107                 if (!i->local)
2108                         continue;
2109                 req = container_of(i, struct drbd_request, i);
2110                 if (req->rq_state & RQ_LOCAL_PENDING ||
2111                     !(req->rq_state & RQ_POSTPONED))
2112                         continue;
2113                 /* as it is RQ_POSTPONED, this will cause it to
2114                  * be queued on the retry workqueue. */
2115                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2116         }
2117 }
2118
2119 /*
2120  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2121  */
2122 static int e_end_block(struct drbd_work *w, int cancel)
2123 {
2124         struct drbd_peer_request *peer_req =
2125                 container_of(w, struct drbd_peer_request, w);
2126         struct drbd_peer_device *peer_device = peer_req->peer_device;
2127         struct drbd_device *device = peer_device->device;
2128         sector_t sector = peer_req->i.sector;
2129         int err = 0, pcmd;
2130
2131         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2132                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2133                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2134                                 device->state.conn <= C_PAUSED_SYNC_T &&
2135                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2136                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2137                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2138                         if (pcmd == P_RS_WRITE_ACK)
2139                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2140                 } else {
2141                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2142                         /* we expect it to be marked out of sync anyways...
2143                          * maybe assert this?  */
2144                 }
2145                 dec_unacked(device);
2146         }
2147
2148         /* we delete from the conflict detection hash _after_ we sent out the
2149          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2150         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2151                 spin_lock_irq(&device->resource->req_lock);
2152                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2153                 drbd_remove_epoch_entry_interval(device, peer_req);
2154                 if (peer_req->flags & EE_RESTART_REQUESTS)
2155                         restart_conflicting_writes(device, sector, peer_req->i.size);
2156                 spin_unlock_irq(&device->resource->req_lock);
2157         } else
2158                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2159
2160         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2161
2162         return err;
2163 }
2164
2165 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2166 {
2167         struct drbd_peer_request *peer_req =
2168                 container_of(w, struct drbd_peer_request, w);
2169         struct drbd_peer_device *peer_device = peer_req->peer_device;
2170         int err;
2171
2172         err = drbd_send_ack(peer_device, ack, peer_req);
2173         dec_unacked(peer_device->device);
2174
2175         return err;
2176 }
2177
2178 static int e_send_superseded(struct drbd_work *w, int unused)
2179 {
2180         return e_send_ack(w, P_SUPERSEDED);
2181 }
2182
2183 static int e_send_retry_write(struct drbd_work *w, int unused)
2184 {
2185         struct drbd_peer_request *peer_req =
2186                 container_of(w, struct drbd_peer_request, w);
2187         struct drbd_connection *connection = peer_req->peer_device->connection;
2188
2189         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2190                              P_RETRY_WRITE : P_SUPERSEDED);
2191 }
2192
2193 static bool seq_greater(u32 a, u32 b)
2194 {
2195         /*
2196          * We assume 32-bit wrap-around here.
2197          * For 24-bit wrap-around, we would have to shift:
2198          *  a <<= 8; b <<= 8;
2199          */
2200         return (s32)a - (s32)b > 0;
2201 }
2202
2203 static u32 seq_max(u32 a, u32 b)
2204 {
2205         return seq_greater(a, b) ? a : b;
2206 }
2207
2208 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2209 {
2210         struct drbd_device *device = peer_device->device;
2211         unsigned int newest_peer_seq;
2212
2213         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2214                 spin_lock(&device->peer_seq_lock);
2215                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2216                 device->peer_seq = newest_peer_seq;
2217                 spin_unlock(&device->peer_seq_lock);
2218                 /* wake up only if we actually changed device->peer_seq */
2219                 if (peer_seq == newest_peer_seq)
2220                         wake_up(&device->seq_wait);
2221         }
2222 }
2223
2224 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2225 {
2226         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2227 }
2228
2229 /* maybe change sync_ee into interval trees as well? */
2230 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2231 {
2232         struct drbd_peer_request *rs_req;
2233         bool rv = false;
2234
2235         spin_lock_irq(&device->resource->req_lock);
2236         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2237                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2238                              rs_req->i.sector, rs_req->i.size)) {
2239                         rv = true;
2240                         break;
2241                 }
2242         }
2243         spin_unlock_irq(&device->resource->req_lock);
2244
2245         return rv;
2246 }
2247
2248 /* Called from receive_Data.
2249  * Synchronize packets on sock with packets on msock.
2250  *
2251  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2252  * packet traveling on msock, they are still processed in the order they have
2253  * been sent.
2254  *
2255  * Note: we don't care for Ack packets overtaking P_DATA packets.
2256  *
2257  * In case packet_seq is larger than device->peer_seq number, there are
2258  * outstanding packets on the msock. We wait for them to arrive.
2259  * In case we are the logically next packet, we update device->peer_seq
2260  * ourselves. Correctly handles 32bit wrap around.
2261  *
2262  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2263  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2264  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2265  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2266  *
2267  * returns 0 if we may process the packet,
2268  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2269 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2270 {
2271         struct drbd_device *device = peer_device->device;
2272         DEFINE_WAIT(wait);
2273         long timeout;
2274         int ret = 0, tp;
2275
2276         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2277                 return 0;
2278
2279         spin_lock(&device->peer_seq_lock);
2280         for (;;) {
2281                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2282                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2283                         break;
2284                 }
2285
2286                 if (signal_pending(current)) {
2287                         ret = -ERESTARTSYS;
2288                         break;
2289                 }
2290
2291                 rcu_read_lock();
2292                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2293                 rcu_read_unlock();
2294
2295                 if (!tp)
2296                         break;
2297
2298                 /* Only need to wait if two_primaries is enabled */
2299                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2300                 spin_unlock(&device->peer_seq_lock);
2301                 rcu_read_lock();
2302                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2303                 rcu_read_unlock();
2304                 timeout = schedule_timeout(timeout);
2305                 spin_lock(&device->peer_seq_lock);
2306                 if (!timeout) {
2307                         ret = -ETIMEDOUT;
2308                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2309                         break;
2310                 }
2311         }
2312         spin_unlock(&device->peer_seq_lock);
2313         finish_wait(&device->seq_wait, &wait);
2314         return ret;
2315 }
2316
2317 /* see also bio_flags_to_wire()
2318  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2319  * flags and back. We may replicate to other kernel versions. */
2320 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2321 {
2322         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2323                 (dpf & DP_FUA ? REQ_FUA : 0) |
2324                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2325 }
2326
2327 static unsigned long wire_flags_to_bio_op(u32 dpf)
2328 {
2329         if (dpf & DP_DISCARD)
2330                 return REQ_OP_WRITE_ZEROES;
2331         else
2332                 return REQ_OP_WRITE;
2333 }
2334
2335 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2336                                     unsigned int size)
2337 {
2338         struct drbd_interval *i;
2339
2340     repeat:
2341         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2342                 struct drbd_request *req;
2343                 struct bio_and_error m;
2344
2345                 if (!i->local)
2346                         continue;
2347                 req = container_of(i, struct drbd_request, i);
2348                 if (!(req->rq_state & RQ_POSTPONED))
2349                         continue;
2350                 req->rq_state &= ~RQ_POSTPONED;
2351                 __req_mod(req, NEG_ACKED, &m);
2352                 spin_unlock_irq(&device->resource->req_lock);
2353                 if (m.bio)
2354                         complete_master_bio(device, &m);
2355                 spin_lock_irq(&device->resource->req_lock);
2356                 goto repeat;
2357         }
2358 }
2359
2360 static int handle_write_conflicts(struct drbd_device *device,
2361                                   struct drbd_peer_request *peer_req)
2362 {
2363         struct drbd_connection *connection = peer_req->peer_device->connection;
2364         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2365         sector_t sector = peer_req->i.sector;
2366         const unsigned int size = peer_req->i.size;
2367         struct drbd_interval *i;
2368         bool equal;
2369         int err;
2370
2371         /*
2372          * Inserting the peer request into the write_requests tree will prevent
2373          * new conflicting local requests from being added.
2374          */
2375         drbd_insert_interval(&device->write_requests, &peer_req->i);
2376
2377     repeat:
2378         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2379                 if (i == &peer_req->i)
2380                         continue;
2381                 if (i->completed)
2382                         continue;
2383
2384                 if (!i->local) {
2385                         /*
2386                          * Our peer has sent a conflicting remote request; this
2387                          * should not happen in a two-node setup.  Wait for the
2388                          * earlier peer request to complete.
2389                          */
2390                         err = drbd_wait_misc(device, i);
2391                         if (err)
2392                                 goto out;
2393                         goto repeat;
2394                 }
2395
2396                 equal = i->sector == sector && i->size == size;
2397                 if (resolve_conflicts) {
2398                         /*
2399                          * If the peer request is fully contained within the
2400                          * overlapping request, it can be considered overwritten
2401                          * and thus superseded; otherwise, it will be retried
2402                          * once all overlapping requests have completed.
2403                          */
2404                         bool superseded = i->sector <= sector && i->sector +
2405                                        (i->size >> 9) >= sector + (size >> 9);
2406
2407                         if (!equal)
2408                                 drbd_alert(device, "Concurrent writes detected: "
2409                                                "local=%llus +%u, remote=%llus +%u, "
2410                                                "assuming %s came first\n",
2411                                           (unsigned long long)i->sector, i->size,
2412                                           (unsigned long long)sector, size,
2413                                           superseded ? "local" : "remote");
2414
2415                         peer_req->w.cb = superseded ? e_send_superseded :
2416                                                    e_send_retry_write;
2417                         list_add_tail(&peer_req->w.list, &device->done_ee);
2418                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2419
2420                         err = -ENOENT;
2421                         goto out;
2422                 } else {
2423                         struct drbd_request *req =
2424                                 container_of(i, struct drbd_request, i);
2425
2426                         if (!equal)
2427                                 drbd_alert(device, "Concurrent writes detected: "
2428                                                "local=%llus +%u, remote=%llus +%u\n",
2429                                           (unsigned long long)i->sector, i->size,
2430                                           (unsigned long long)sector, size);
2431
2432                         if (req->rq_state & RQ_LOCAL_PENDING ||
2433                             !(req->rq_state & RQ_POSTPONED)) {
2434                                 /*
2435                                  * Wait for the node with the discard flag to
2436                                  * decide if this request has been superseded
2437                                  * or needs to be retried.
2438                                  * Requests that have been superseded will
2439                                  * disappear from the write_requests tree.
2440                                  *
2441                                  * In addition, wait for the conflicting
2442                                  * request to finish locally before submitting
2443                                  * the conflicting peer request.
2444                                  */
2445                                 err = drbd_wait_misc(device, &req->i);
2446                                 if (err) {
2447                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2448                                         fail_postponed_requests(device, sector, size);
2449                                         goto out;
2450                                 }
2451                                 goto repeat;
2452                         }
2453                         /*
2454                          * Remember to restart the conflicting requests after
2455                          * the new peer request has completed.
2456                          */
2457                         peer_req->flags |= EE_RESTART_REQUESTS;
2458                 }
2459         }
2460         err = 0;
2461
2462     out:
2463         if (err)
2464                 drbd_remove_epoch_entry_interval(device, peer_req);
2465         return err;
2466 }
2467
2468 /* mirrored write */
2469 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2470 {
2471         struct drbd_peer_device *peer_device;
2472         struct drbd_device *device;
2473         struct net_conf *nc;
2474         sector_t sector;
2475         struct drbd_peer_request *peer_req;
2476         struct p_data *p = pi->data;
2477         u32 peer_seq = be32_to_cpu(p->seq_num);
2478         int op, op_flags;
2479         u32 dp_flags;
2480         int err, tp;
2481
2482         peer_device = conn_peer_device(connection, pi->vnr);
2483         if (!peer_device)
2484                 return -EIO;
2485         device = peer_device->device;
2486
2487         if (!get_ldev(device)) {
2488                 int err2;
2489
2490                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2491                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2492                 atomic_inc(&connection->current_epoch->epoch_size);
2493                 err2 = drbd_drain_block(peer_device, pi->size);
2494                 if (!err)
2495                         err = err2;
2496                 return err;
2497         }
2498
2499         /*
2500          * Corresponding put_ldev done either below (on various errors), or in
2501          * drbd_peer_request_endio, if we successfully submit the data at the
2502          * end of this function.
2503          */
2504
2505         sector = be64_to_cpu(p->sector);
2506         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2507         if (!peer_req) {
2508                 put_ldev(device);
2509                 return -EIO;
2510         }
2511
2512         peer_req->w.cb = e_end_block;
2513         peer_req->submit_jif = jiffies;
2514         peer_req->flags |= EE_APPLICATION;
2515
2516         dp_flags = be32_to_cpu(p->dp_flags);
2517         op = wire_flags_to_bio_op(dp_flags);
2518         op_flags = wire_flags_to_bio_flags(dp_flags);
2519         if (pi->cmd == P_TRIM) {
2520                 D_ASSERT(peer_device, peer_req->i.size > 0);
2521                 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2522                 D_ASSERT(peer_device, peer_req->pages == NULL);
2523         } else if (peer_req->pages == NULL) {
2524                 D_ASSERT(device, peer_req->i.size == 0);
2525                 D_ASSERT(device, dp_flags & DP_FLUSH);
2526         }
2527
2528         if (dp_flags & DP_MAY_SET_IN_SYNC)
2529                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2530
2531         spin_lock(&connection->epoch_lock);
2532         peer_req->epoch = connection->current_epoch;
2533         atomic_inc(&peer_req->epoch->epoch_size);
2534         atomic_inc(&peer_req->epoch->active);
2535         spin_unlock(&connection->epoch_lock);
2536
2537         rcu_read_lock();
2538         nc = rcu_dereference(peer_device->connection->net_conf);
2539         tp = nc->two_primaries;
2540         if (peer_device->connection->agreed_pro_version < 100) {
2541                 switch (nc->wire_protocol) {
2542                 case DRBD_PROT_C:
2543                         dp_flags |= DP_SEND_WRITE_ACK;
2544                         break;
2545                 case DRBD_PROT_B:
2546                         dp_flags |= DP_SEND_RECEIVE_ACK;
2547                         break;
2548                 }
2549         }
2550         rcu_read_unlock();
2551
2552         if (dp_flags & DP_SEND_WRITE_ACK) {
2553                 peer_req->flags |= EE_SEND_WRITE_ACK;
2554                 inc_unacked(device);
2555                 /* corresponding dec_unacked() in e_end_block()
2556                  * respective _drbd_clear_done_ee */
2557         }
2558
2559         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2560                 /* I really don't like it that the receiver thread
2561                  * sends on the msock, but anyways */
2562                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2563         }
2564
2565         if (tp) {
2566                 /* two primaries implies protocol C */
2567                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2568                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2569                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2570                 if (err)
2571                         goto out_interrupted;
2572                 spin_lock_irq(&device->resource->req_lock);
2573                 err = handle_write_conflicts(device, peer_req);
2574                 if (err) {
2575                         spin_unlock_irq(&device->resource->req_lock);
2576                         if (err == -ENOENT) {
2577                                 put_ldev(device);
2578                                 return 0;
2579                         }
2580                         goto out_interrupted;
2581                 }
2582         } else {
2583                 update_peer_seq(peer_device, peer_seq);
2584                 spin_lock_irq(&device->resource->req_lock);
2585         }
2586         /* TRIM and WRITE_SAME are processed synchronously,
2587          * we wait for all pending requests, respectively wait for
2588          * active_ee to become empty in drbd_submit_peer_request();
2589          * better not add ourselves here. */
2590         if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2591                 list_add_tail(&peer_req->w.list, &device->active_ee);
2592         spin_unlock_irq(&device->resource->req_lock);
2593
2594         if (device->state.conn == C_SYNC_TARGET)
2595                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2596
2597         if (device->state.pdsk < D_INCONSISTENT) {
2598                 /* In case we have the only disk of the cluster, */
2599                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2600                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2601                 drbd_al_begin_io(device, &peer_req->i);
2602                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2603         }
2604
2605         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2606                                        DRBD_FAULT_DT_WR);
2607         if (!err)
2608                 return 0;
2609
2610         /* don't care for the reason here */
2611         drbd_err(device, "submit failed, triggering re-connect\n");
2612         spin_lock_irq(&device->resource->req_lock);
2613         list_del(&peer_req->w.list);
2614         drbd_remove_epoch_entry_interval(device, peer_req);
2615         spin_unlock_irq(&device->resource->req_lock);
2616         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2617                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2618                 drbd_al_complete_io(device, &peer_req->i);
2619         }
2620
2621 out_interrupted:
2622         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2623         put_ldev(device);
2624         drbd_free_peer_req(device, peer_req);
2625         return err;
2626 }
2627
2628 /* We may throttle resync, if the lower device seems to be busy,
2629  * and current sync rate is above c_min_rate.
2630  *
2631  * To decide whether or not the lower device is busy, we use a scheme similar
2632  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2633  * (more than 64 sectors) of activity we cannot account for with our own resync
2634  * activity, it obviously is "busy".
2635  *
2636  * The current sync rate used here uses only the most recent two step marks,
2637  * to have a short time average so we can react faster.
2638  */
2639 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2640                 bool throttle_if_app_is_waiting)
2641 {
2642         struct lc_element *tmp;
2643         bool throttle = drbd_rs_c_min_rate_throttle(device);
2644
2645         if (!throttle || throttle_if_app_is_waiting)
2646                 return throttle;
2647
2648         spin_lock_irq(&device->al_lock);
2649         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2650         if (tmp) {
2651                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2652                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2653                         throttle = false;
2654                 /* Do not slow down if app IO is already waiting for this extent,
2655                  * and our progress is necessary for application IO to complete. */
2656         }
2657         spin_unlock_irq(&device->al_lock);
2658
2659         return throttle;
2660 }
2661
2662 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2663 {
2664         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2665         unsigned long db, dt, dbdt;
2666         unsigned int c_min_rate;
2667         int curr_events;
2668
2669         rcu_read_lock();
2670         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2671         rcu_read_unlock();
2672
2673         /* feature disabled? */
2674         if (c_min_rate == 0)
2675                 return false;
2676
2677         curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2678                         atomic_read(&device->rs_sect_ev);
2679
2680         if (atomic_read(&device->ap_actlog_cnt)
2681             || curr_events - device->rs_last_events > 64) {
2682                 unsigned long rs_left;
2683                 int i;
2684
2685                 device->rs_last_events = curr_events;
2686
2687                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2688                  * approx. */
2689                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2690
2691                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2692                         rs_left = device->ov_left;
2693                 else
2694                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2695
2696                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2697                 if (!dt)
2698                         dt++;
2699                 db = device->rs_mark_left[i] - rs_left;
2700                 dbdt = Bit2KB(db/dt);
2701
2702                 if (dbdt > c_min_rate)
2703                         return true;
2704         }
2705         return false;
2706 }
2707
2708 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2709 {
2710         struct drbd_peer_device *peer_device;
2711         struct drbd_device *device;
2712         sector_t sector;
2713         sector_t capacity;
2714         struct drbd_peer_request *peer_req;
2715         struct digest_info *di = NULL;
2716         int size, verb;
2717         unsigned int fault_type;
2718         struct p_block_req *p = pi->data;
2719
2720         peer_device = conn_peer_device(connection, pi->vnr);
2721         if (!peer_device)
2722                 return -EIO;
2723         device = peer_device->device;
2724         capacity = drbd_get_capacity(device->this_bdev);
2725
2726         sector = be64_to_cpu(p->sector);
2727         size   = be32_to_cpu(p->blksize);
2728
2729         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2730                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2731                                 (unsigned long long)sector, size);
2732                 return -EINVAL;
2733         }
2734         if (sector + (size>>9) > capacity) {
2735                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2736                                 (unsigned long long)sector, size);
2737                 return -EINVAL;
2738         }
2739
2740         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2741                 verb = 1;
2742                 switch (pi->cmd) {
2743                 case P_DATA_REQUEST:
2744                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2745                         break;
2746                 case P_RS_THIN_REQ:
2747                 case P_RS_DATA_REQUEST:
2748                 case P_CSUM_RS_REQUEST:
2749                 case P_OV_REQUEST:
2750                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2751                         break;
2752                 case P_OV_REPLY:
2753                         verb = 0;
2754                         dec_rs_pending(device);
2755                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2756                         break;
2757                 default:
2758                         BUG();
2759                 }
2760                 if (verb && __ratelimit(&drbd_ratelimit_state))
2761                         drbd_err(device, "Can not satisfy peer's read request, "
2762                             "no local data.\n");
2763
2764                 /* drain possibly payload */
2765                 return drbd_drain_block(peer_device, pi->size);
2766         }
2767
2768         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2769          * "criss-cross" setup, that might cause write-out on some other DRBD,
2770          * which in turn might block on the other node at this very place.  */
2771         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2772                         size, GFP_NOIO);
2773         if (!peer_req) {
2774                 put_ldev(device);
2775                 return -ENOMEM;
2776         }
2777
2778         switch (pi->cmd) {
2779         case P_DATA_REQUEST:
2780                 peer_req->w.cb = w_e_end_data_req;
2781                 fault_type = DRBD_FAULT_DT_RD;
2782                 /* application IO, don't drbd_rs_begin_io */
2783                 peer_req->flags |= EE_APPLICATION;
2784                 goto submit;
2785
2786         case P_RS_THIN_REQ:
2787                 /* If at some point in the future we have a smart way to
2788                    find out if this data block is completely deallocated,
2789                    then we would do something smarter here than reading
2790                    the block... */
2791                 peer_req->flags |= EE_RS_THIN_REQ;
2792                 /* fall through */
2793         case P_RS_DATA_REQUEST:
2794                 peer_req->w.cb = w_e_end_rsdata_req;
2795                 fault_type = DRBD_FAULT_RS_RD;
2796                 /* used in the sector offset progress display */
2797                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2798                 break;
2799
2800         case P_OV_REPLY:
2801         case P_CSUM_RS_REQUEST:
2802                 fault_type = DRBD_FAULT_RS_RD;
2803                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2804                 if (!di)
2805                         goto out_free_e;
2806
2807                 di->digest_size = pi->size;
2808                 di->digest = (((char *)di)+sizeof(struct digest_info));
2809
2810                 peer_req->digest = di;
2811                 peer_req->flags |= EE_HAS_DIGEST;
2812
2813                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2814                         goto out_free_e;
2815
2816                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2817                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2818                         peer_req->w.cb = w_e_end_csum_rs_req;
2819                         /* used in the sector offset progress display */
2820                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2821                         /* remember to report stats in drbd_resync_finished */
2822                         device->use_csums = true;
2823                 } else if (pi->cmd == P_OV_REPLY) {
2824                         /* track progress, we may need to throttle */
2825                         atomic_add(size >> 9, &device->rs_sect_in);
2826                         peer_req->w.cb = w_e_end_ov_reply;
2827                         dec_rs_pending(device);
2828                         /* drbd_rs_begin_io done when we sent this request,
2829                          * but accounting still needs to be done. */
2830                         goto submit_for_resync;
2831                 }
2832                 break;
2833
2834         case P_OV_REQUEST:
2835                 if (device->ov_start_sector == ~(sector_t)0 &&
2836                     peer_device->connection->agreed_pro_version >= 90) {
2837                         unsigned long now = jiffies;
2838                         int i;
2839                         device->ov_start_sector = sector;
2840                         device->ov_position = sector;
2841                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2842                         device->rs_total = device->ov_left;
2843                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2844                                 device->rs_mark_left[i] = device->ov_left;
2845                                 device->rs_mark_time[i] = now;
2846                         }
2847                         drbd_info(device, "Online Verify start sector: %llu\n",
2848                                         (unsigned long long)sector);
2849                 }
2850                 peer_req->w.cb = w_e_end_ov_req;
2851                 fault_type = DRBD_FAULT_RS_RD;
2852                 break;
2853
2854         default:
2855                 BUG();
2856         }
2857
2858         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2859          * wrt the receiver, but it is not as straightforward as it may seem.
2860          * Various places in the resync start and stop logic assume resync
2861          * requests are processed in order, requeuing this on the worker thread
2862          * introduces a bunch of new code for synchronization between threads.
2863          *
2864          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2865          * "forever", throttling after drbd_rs_begin_io will lock that extent
2866          * for application writes for the same time.  For now, just throttle
2867          * here, where the rest of the code expects the receiver to sleep for
2868          * a while, anyways.
2869          */
2870
2871         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2872          * this defers syncer requests for some time, before letting at least
2873          * on request through.  The resync controller on the receiving side
2874          * will adapt to the incoming rate accordingly.
2875          *
2876          * We cannot throttle here if remote is Primary/SyncTarget:
2877          * we would also throttle its application reads.
2878          * In that case, throttling is done on the SyncTarget only.
2879          */
2880
2881         /* Even though this may be a resync request, we do add to "read_ee";
2882          * "sync_ee" is only used for resync WRITEs.
2883          * Add to list early, so debugfs can find this request
2884          * even if we have to sleep below. */
2885         spin_lock_irq(&device->resource->req_lock);
2886         list_add_tail(&peer_req->w.list, &device->read_ee);
2887         spin_unlock_irq(&device->resource->req_lock);
2888
2889         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2890         if (device->state.peer != R_PRIMARY
2891         && drbd_rs_should_slow_down(device, sector, false))
2892                 schedule_timeout_uninterruptible(HZ/10);
2893         update_receiver_timing_details(connection, drbd_rs_begin_io);
2894         if (drbd_rs_begin_io(device, sector))
2895                 goto out_free_e;
2896
2897 submit_for_resync:
2898         atomic_add(size >> 9, &device->rs_sect_ev);
2899
2900 submit:
2901         update_receiver_timing_details(connection, drbd_submit_peer_request);
2902         inc_unacked(device);
2903         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2904                                      fault_type) == 0)
2905                 return 0;
2906
2907         /* don't care for the reason here */
2908         drbd_err(device, "submit failed, triggering re-connect\n");
2909
2910 out_free_e:
2911         spin_lock_irq(&device->resource->req_lock);
2912         list_del(&peer_req->w.list);
2913         spin_unlock_irq(&device->resource->req_lock);
2914         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2915
2916         put_ldev(device);
2917         drbd_free_peer_req(device, peer_req);
2918         return -EIO;
2919 }
2920
2921 /**
2922  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2923  */
2924 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2925 {
2926         struct drbd_device *device = peer_device->device;
2927         int self, peer, rv = -100;
2928         unsigned long ch_self, ch_peer;
2929         enum drbd_after_sb_p after_sb_0p;
2930
2931         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2932         peer = device->p_uuid[UI_BITMAP] & 1;
2933
2934         ch_peer = device->p_uuid[UI_SIZE];
2935         ch_self = device->comm_bm_set;
2936
2937         rcu_read_lock();
2938         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2939         rcu_read_unlock();
2940         switch (after_sb_0p) {
2941         case ASB_CONSENSUS:
2942         case ASB_DISCARD_SECONDARY:
2943         case ASB_CALL_HELPER:
2944         case ASB_VIOLENTLY:
2945                 drbd_err(device, "Configuration error.\n");
2946                 break;
2947         case ASB_DISCONNECT:
2948                 break;
2949         case ASB_DISCARD_YOUNGER_PRI:
2950                 if (self == 0 && peer == 1) {
2951                         rv = -1;
2952                         break;
2953                 }
2954                 if (self == 1 && peer == 0) {
2955                         rv =  1;
2956                         break;
2957                 }
2958                 /* Else fall through to one of the other strategies... */
2959         case ASB_DISCARD_OLDER_PRI:
2960                 if (self == 0 && peer == 1) {
2961                         rv = 1;
2962                         break;
2963                 }
2964                 if (self == 1 && peer == 0) {
2965                         rv = -1;
2966                         break;
2967                 }
2968                 /* Else fall through to one of the other strategies... */
2969                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2970                      "Using discard-least-changes instead\n");
2971                 /* fall through */
2972         case ASB_DISCARD_ZERO_CHG:
2973                 if (ch_peer == 0 && ch_self == 0) {
2974                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2975                                 ? -1 : 1;
2976                         break;
2977                 } else {
2978                         if (ch_peer == 0) { rv =  1; break; }
2979                         if (ch_self == 0) { rv = -1; break; }
2980                 }
2981                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2982                         break;
2983                 /* else: fall through */
2984         case ASB_DISCARD_LEAST_CHG:
2985                 if      (ch_self < ch_peer)
2986                         rv = -1;
2987                 else if (ch_self > ch_peer)
2988                         rv =  1;
2989                 else /* ( ch_self == ch_peer ) */
2990                      /* Well, then use something else. */
2991                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2992                                 ? -1 : 1;
2993                 break;
2994         case ASB_DISCARD_LOCAL:
2995                 rv = -1;
2996                 break;
2997         case ASB_DISCARD_REMOTE:
2998                 rv =  1;
2999         }
3000
3001         return rv;
3002 }
3003
3004 /**
3005  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3006  */
3007 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3008 {
3009         struct drbd_device *device = peer_device->device;
3010         int hg, rv = -100;
3011         enum drbd_after_sb_p after_sb_1p;
3012
3013         rcu_read_lock();
3014         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3015         rcu_read_unlock();
3016         switch (after_sb_1p) {
3017         case ASB_DISCARD_YOUNGER_PRI:
3018         case ASB_DISCARD_OLDER_PRI:
3019         case ASB_DISCARD_LEAST_CHG:
3020         case ASB_DISCARD_LOCAL:
3021         case ASB_DISCARD_REMOTE:
3022         case ASB_DISCARD_ZERO_CHG:
3023                 drbd_err(device, "Configuration error.\n");
3024                 break;
3025         case ASB_DISCONNECT:
3026                 break;
3027         case ASB_CONSENSUS:
3028                 hg = drbd_asb_recover_0p(peer_device);
3029                 if (hg == -1 && device->state.role == R_SECONDARY)
3030                         rv = hg;
3031                 if (hg == 1  && device->state.role == R_PRIMARY)
3032                         rv = hg;
3033                 break;
3034         case ASB_VIOLENTLY:
3035                 rv = drbd_asb_recover_0p(peer_device);
3036                 break;
3037         case ASB_DISCARD_SECONDARY:
3038                 return device->state.role == R_PRIMARY ? 1 : -1;
3039         case ASB_CALL_HELPER:
3040                 hg = drbd_asb_recover_0p(peer_device);
3041                 if (hg == -1 && device->state.role == R_PRIMARY) {
3042                         enum drbd_state_rv rv2;
3043
3044                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3045                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3046                           * we do not need to wait for the after state change work either. */
3047                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3048                         if (rv2 != SS_SUCCESS) {
3049                                 drbd_khelper(device, "pri-lost-after-sb");
3050                         } else {
3051                                 drbd_warn(device, "Successfully gave up primary role.\n");
3052                                 rv = hg;
3053                         }
3054                 } else
3055                         rv = hg;
3056         }
3057
3058         return rv;
3059 }
3060
3061 /**
3062  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3063  */
3064 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3065 {
3066         struct drbd_device *device = peer_device->device;
3067         int hg, rv = -100;
3068         enum drbd_after_sb_p after_sb_2p;
3069
3070         rcu_read_lock();
3071         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3072         rcu_read_unlock();
3073         switch (after_sb_2p) {
3074         case ASB_DISCARD_YOUNGER_PRI:
3075         case ASB_DISCARD_OLDER_PRI:
3076         case ASB_DISCARD_LEAST_CHG:
3077         case ASB_DISCARD_LOCAL:
3078         case ASB_DISCARD_REMOTE:
3079         case ASB_CONSENSUS:
3080         case ASB_DISCARD_SECONDARY:
3081         case ASB_DISCARD_ZERO_CHG:
3082                 drbd_err(device, "Configuration error.\n");
3083                 break;
3084         case ASB_VIOLENTLY:
3085                 rv = drbd_asb_recover_0p(peer_device);
3086                 break;
3087         case ASB_DISCONNECT:
3088                 break;
3089         case ASB_CALL_HELPER:
3090                 hg = drbd_asb_recover_0p(peer_device);
3091                 if (hg == -1) {
3092                         enum drbd_state_rv rv2;
3093
3094                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3095                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3096                           * we do not need to wait for the after state change work either. */
3097                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3098                         if (rv2 != SS_SUCCESS) {
3099                                 drbd_khelper(device, "pri-lost-after-sb");
3100                         } else {
3101                                 drbd_warn(device, "Successfully gave up primary role.\n");
3102                                 rv = hg;
3103                         }
3104                 } else
3105                         rv = hg;
3106         }
3107
3108         return rv;
3109 }
3110
3111 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3112                            u64 bits, u64 flags)
3113 {
3114         if (!uuid) {
3115                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3116                 return;
3117         }
3118         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3119              text,
3120              (unsigned long long)uuid[UI_CURRENT],
3121              (unsigned long long)uuid[UI_BITMAP],
3122              (unsigned long long)uuid[UI_HISTORY_START],
3123              (unsigned long long)uuid[UI_HISTORY_END],
3124              (unsigned long long)bits,
3125              (unsigned long long)flags);
3126 }
3127
3128 /*
3129   100   after split brain try auto recover
3130     2   C_SYNC_SOURCE set BitMap
3131     1   C_SYNC_SOURCE use BitMap
3132     0   no Sync
3133    -1   C_SYNC_TARGET use BitMap
3134    -2   C_SYNC_TARGET set BitMap
3135  -100   after split brain, disconnect
3136 -1000   unrelated data
3137 -1091   requires proto 91
3138 -1096   requires proto 96
3139  */
3140
3141 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3142 {
3143         struct drbd_peer_device *const peer_device = first_peer_device(device);
3144         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3145         u64 self, peer;
3146         int i, j;
3147
3148         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3149         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3150
3151         *rule_nr = 10;
3152         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3153                 return 0;
3154
3155         *rule_nr = 20;
3156         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3157              peer != UUID_JUST_CREATED)
3158                 return -2;
3159
3160         *rule_nr = 30;
3161         if (self != UUID_JUST_CREATED &&
3162             (peer == UUID_JUST_CREATED || peer == (u64)0))
3163                 return 2;
3164
3165         if (self == peer) {
3166                 int rct, dc; /* roles at crash time */
3167
3168                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3169
3170                         if (connection->agreed_pro_version < 91)
3171                                 return -1091;
3172
3173                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3174                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3175                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3176                                 drbd_uuid_move_history(device);
3177                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3178                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3179
3180                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3181                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3182                                 *rule_nr = 34;
3183                         } else {
3184                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3185                                 *rule_nr = 36;
3186                         }
3187
3188                         return 1;
3189                 }
3190
3191                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3192
3193                         if (connection->agreed_pro_version < 91)
3194                                 return -1091;
3195
3196                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3197                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3198                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3199
3200                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3201                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3202                                 device->p_uuid[UI_BITMAP] = 0UL;
3203
3204                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3205                                 *rule_nr = 35;
3206                         } else {
3207                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3208                                 *rule_nr = 37;
3209                         }
3210
3211                         return -1;
3212                 }
3213
3214                 /* Common power [off|failure] */
3215                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3216                         (device->p_uuid[UI_FLAGS] & 2);
3217                 /* lowest bit is set when we were primary,
3218                  * next bit (weight 2) is set when peer was primary */
3219                 *rule_nr = 40;
3220
3221                 /* Neither has the "crashed primary" flag set,
3222                  * only a replication link hickup. */
3223                 if (rct == 0)
3224                         return 0;
3225
3226                 /* Current UUID equal and no bitmap uuid; does not necessarily
3227                  * mean this was a "simultaneous hard crash", maybe IO was
3228                  * frozen, so no UUID-bump happened.
3229                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3230                  * for "new-enough" peer DRBD version. */
3231                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3232                         *rule_nr = 41;
3233                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3234                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3235                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3236                         }
3237                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3238                                 /* At least one has the "crashed primary" bit set,
3239                                  * both are primary now, but neither has rotated its UUIDs?
3240                                  * "Can not happen." */
3241                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3242                                 return -100;
3243                         }
3244                         if (device->state.role == R_PRIMARY)
3245                                 return 1;
3246                         return -1;
3247                 }
3248
3249                 /* Both are secondary.
3250                  * Really looks like recovery from simultaneous hard crash.
3251                  * Check which had been primary before, and arbitrate. */
3252                 switch (rct) {
3253                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3254                 case 1: /*  self_pri && !peer_pri */ return 1;
3255                 case 2: /* !self_pri &&  peer_pri */ return -1;
3256                 case 3: /*  self_pri &&  peer_pri */
3257                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3258                         return dc ? -1 : 1;
3259                 }
3260         }
3261
3262         *rule_nr = 50;
3263         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3264         if (self == peer)
3265                 return -1;
3266
3267         *rule_nr = 51;
3268         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3269         if (self == peer) {
3270                 if (connection->agreed_pro_version < 96 ?
3271                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3272                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3273                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3274                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3275                            resync as sync source modifications of the peer's UUIDs. */
3276
3277                         if (connection->agreed_pro_version < 91)
3278                                 return -1091;
3279
3280                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3281                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3282
3283                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3284                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3285
3286                         return -1;
3287                 }
3288         }
3289
3290         *rule_nr = 60;
3291         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3292         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3293                 peer = device->p_uuid[i] & ~((u64)1);
3294                 if (self == peer)
3295                         return -2;
3296         }
3297
3298         *rule_nr = 70;
3299         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3300         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3301         if (self == peer)
3302                 return 1;
3303
3304         *rule_nr = 71;
3305         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3306         if (self == peer) {
3307                 if (connection->agreed_pro_version < 96 ?
3308                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3309                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3310                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3311                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3312                            resync as sync source modifications of our UUIDs. */
3313
3314                         if (connection->agreed_pro_version < 91)
3315                                 return -1091;
3316
3317                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3318                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3319
3320                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3321                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3322                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3323
3324                         return 1;
3325                 }
3326         }
3327
3328
3329         *rule_nr = 80;
3330         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3331         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3332                 self = device->ldev->md.uuid[i] & ~((u64)1);
3333                 if (self == peer)
3334                         return 2;
3335         }
3336
3337         *rule_nr = 90;
3338         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3339         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3340         if (self == peer && self != ((u64)0))
3341                 return 100;
3342
3343         *rule_nr = 100;
3344         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3345                 self = device->ldev->md.uuid[i] & ~((u64)1);
3346                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3347                         peer = device->p_uuid[j] & ~((u64)1);
3348                         if (self == peer)
3349                                 return -100;
3350                 }
3351         }
3352
3353         return -1000;
3354 }
3355
3356 /* drbd_sync_handshake() returns the new conn state on success, or
3357    CONN_MASK (-1) on failure.
3358  */
3359 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3360                                            enum drbd_role peer_role,
3361                                            enum drbd_disk_state peer_disk) __must_hold(local)
3362 {
3363         struct drbd_device *device = peer_device->device;
3364         enum drbd_conns rv = C_MASK;
3365         enum drbd_disk_state mydisk;
3366         struct net_conf *nc;
3367         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3368
3369         mydisk = device->state.disk;
3370         if (mydisk == D_NEGOTIATING)
3371                 mydisk = device->new_state_tmp.disk;
3372
3373         drbd_info(device, "drbd_sync_handshake:\n");
3374
3375         spin_lock_irq(&device->ldev->md.uuid_lock);
3376         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3377         drbd_uuid_dump(device, "peer", device->p_uuid,
3378                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3379
3380         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3381         spin_unlock_irq(&device->ldev->md.uuid_lock);
3382
3383         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3384
3385         if (hg == -1000) {
3386                 drbd_alert(device, "Unrelated data, aborting!\n");
3387                 return C_MASK;
3388         }
3389         if (hg < -0x10000) {
3390                 int proto, fflags;
3391                 hg = -hg;
3392                 proto = hg & 0xff;
3393                 fflags = (hg >> 8) & 0xff;
3394                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3395                                         proto, fflags);
3396                 return C_MASK;
3397         }
3398         if (hg < -1000) {
3399                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3400                 return C_MASK;
3401         }
3402
3403         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3404             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3405                 int f = (hg == -100) || abs(hg) == 2;
3406                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3407                 if (f)
3408                         hg = hg*2;
3409                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3410                      hg > 0 ? "source" : "target");
3411         }
3412
3413         if (abs(hg) == 100)
3414                 drbd_khelper(device, "initial-split-brain");
3415
3416         rcu_read_lock();
3417         nc = rcu_dereference(peer_device->connection->net_conf);
3418         always_asbp = nc->always_asbp;
3419         rr_conflict = nc->rr_conflict;
3420         tentative = nc->tentative;
3421         rcu_read_unlock();
3422
3423         if (hg == 100 || (hg == -100 && always_asbp)) {
3424                 int pcount = (device->state.role == R_PRIMARY)
3425                            + (peer_role == R_PRIMARY);
3426                 int forced = (hg == -100);
3427
3428                 switch (pcount) {
3429                 case 0:
3430                         hg = drbd_asb_recover_0p(peer_device);
3431                         break;
3432                 case 1:
3433                         hg = drbd_asb_recover_1p(peer_device);
3434                         break;
3435                 case 2:
3436                         hg = drbd_asb_recover_2p(peer_device);
3437                         break;
3438                 }
3439                 if (abs(hg) < 100) {
3440                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3441                              "automatically solved. Sync from %s node\n",
3442                              pcount, (hg < 0) ? "peer" : "this");
3443                         if (forced) {
3444                                 drbd_warn(device, "Doing a full sync, since"
3445                                      " UUIDs where ambiguous.\n");
3446                                 hg = hg*2;
3447                         }
3448                 }
3449         }
3450
3451         if (hg == -100) {
3452                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3453                         hg = -1;
3454                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3455                         hg = 1;
3456
3457                 if (abs(hg) < 100)
3458                         drbd_warn(device, "Split-Brain detected, manually solved. "
3459                              "Sync from %s node\n",
3460                              (hg < 0) ? "peer" : "this");
3461         }
3462
3463         if (hg == -100) {
3464                 /* FIXME this log message is not correct if we end up here
3465                  * after an attempted attach on a diskless node.
3466                  * We just refuse to attach -- well, we drop the "connection"
3467                  * to that disk, in a way... */
3468                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3469                 drbd_khelper(device, "split-brain");
3470                 return C_MASK;
3471         }
3472
3473         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3474                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3475                 return C_MASK;
3476         }
3477
3478         if (hg < 0 && /* by intention we do not use mydisk here. */
3479             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3480                 switch (rr_conflict) {
3481                 case ASB_CALL_HELPER:
3482                         drbd_khelper(device, "pri-lost");
3483                         /* fall through */
3484                 case ASB_DISCONNECT:
3485                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3486                         return C_MASK;
3487                 case ASB_VIOLENTLY:
3488                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3489                              "assumption\n");
3490                 }
3491         }
3492
3493         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3494                 if (hg == 0)
3495                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3496                 else
3497                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3498                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3499                                  abs(hg) >= 2 ? "full" : "bit-map based");
3500                 return C_MASK;
3501         }
3502
3503         if (abs(hg) >= 2) {
3504                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3505                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3506                                         BM_LOCKED_SET_ALLOWED))
3507                         return C_MASK;
3508         }
3509
3510         if (hg > 0) { /* become sync source. */
3511                 rv = C_WF_BITMAP_S;
3512         } else if (hg < 0) { /* become sync target */
3513                 rv = C_WF_BITMAP_T;
3514         } else {
3515                 rv = C_CONNECTED;
3516                 if (drbd_bm_total_weight(device)) {
3517                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3518                              drbd_bm_total_weight(device));
3519                 }
3520         }
3521
3522         return rv;
3523 }
3524
3525 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3526 {
3527         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3528         if (peer == ASB_DISCARD_REMOTE)
3529                 return ASB_DISCARD_LOCAL;
3530
3531         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3532         if (peer == ASB_DISCARD_LOCAL)
3533                 return ASB_DISCARD_REMOTE;
3534
3535         /* everything else is valid if they are equal on both sides. */
3536         return peer;
3537 }
3538
3539 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3540 {
3541         struct p_protocol *p = pi->data;
3542         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3543         int p_proto, p_discard_my_data, p_two_primaries, cf;
3544         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3545         char integrity_alg[SHARED_SECRET_MAX] = "";
3546         struct crypto_ahash *peer_integrity_tfm = NULL;
3547         void *int_dig_in = NULL, *int_dig_vv = NULL;
3548
3549         p_proto         = be32_to_cpu(p->protocol);
3550         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3551         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3552         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3553         p_two_primaries = be32_to_cpu(p->two_primaries);
3554         cf              = be32_to_cpu(p->conn_flags);
3555         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3556
3557         if (connection->agreed_pro_version >= 87) {
3558                 int err;
3559
3560                 if (pi->size > sizeof(integrity_alg))
3561                         return -EIO;
3562                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3563                 if (err)
3564                         return err;
3565                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3566         }
3567
3568         if (pi->cmd != P_PROTOCOL_UPDATE) {
3569                 clear_bit(CONN_DRY_RUN, &connection->flags);
3570
3571                 if (cf & CF_DRY_RUN)
3572                         set_bit(CONN_DRY_RUN, &connection->flags);
3573
3574                 rcu_read_lock();
3575                 nc = rcu_dereference(connection->net_conf);
3576
3577                 if (p_proto != nc->wire_protocol) {
3578                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3579                         goto disconnect_rcu_unlock;
3580                 }
3581
3582                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3583                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3584                         goto disconnect_rcu_unlock;
3585                 }
3586
3587                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3588                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3589                         goto disconnect_rcu_unlock;
3590                 }
3591
3592                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3593                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3594                         goto disconnect_rcu_unlock;
3595                 }
3596
3597                 if (p_discard_my_data && nc->discard_my_data) {
3598                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3599                         goto disconnect_rcu_unlock;
3600                 }
3601
3602                 if (p_two_primaries != nc->two_primaries) {
3603                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3604                         goto disconnect_rcu_unlock;
3605                 }
3606
3607                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3608                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3609                         goto disconnect_rcu_unlock;
3610                 }
3611
3612                 rcu_read_unlock();
3613         }
3614
3615         if (integrity_alg[0]) {
3616                 int hash_size;
3617
3618                 /*
3619                  * We can only change the peer data integrity algorithm
3620                  * here.  Changing our own data integrity algorithm
3621                  * requires that we send a P_PROTOCOL_UPDATE packet at
3622                  * the same time; otherwise, the peer has no way to
3623                  * tell between which packets the algorithm should
3624                  * change.
3625                  */
3626
3627                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3628                 if (IS_ERR(peer_integrity_tfm)) {
3629                         peer_integrity_tfm = NULL;
3630                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3631                                  integrity_alg);
3632                         goto disconnect;
3633                 }
3634
3635                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3636                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3637                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3638                 if (!(int_dig_in && int_dig_vv)) {
3639                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3640                         goto disconnect;
3641                 }
3642         }
3643
3644         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3645         if (!new_net_conf) {
3646                 drbd_err(connection, "Allocation of new net_conf failed\n");
3647                 goto disconnect;
3648         }
3649
3650         mutex_lock(&connection->data.mutex);
3651         mutex_lock(&connection->resource->conf_update);
3652         old_net_conf = connection->net_conf;
3653         *new_net_conf = *old_net_conf;
3654
3655         new_net_conf->wire_protocol = p_proto;
3656         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3657         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3658         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3659         new_net_conf->two_primaries = p_two_primaries;
3660
3661         rcu_assign_pointer(connection->net_conf, new_net_conf);
3662         mutex_unlock(&connection->resource->conf_update);
3663         mutex_unlock(&connection->data.mutex);
3664
3665         crypto_free_ahash(connection->peer_integrity_tfm);
3666         kfree(connection->int_dig_in);
3667         kfree(connection->int_dig_vv);
3668         connection->peer_integrity_tfm = peer_integrity_tfm;
3669         connection->int_dig_in = int_dig_in;
3670         connection->int_dig_vv = int_dig_vv;
3671
3672         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3673                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3674                           integrity_alg[0] ? integrity_alg : "(none)");
3675
3676         synchronize_rcu();
3677         kfree(old_net_conf);
3678         return 0;
3679
3680 disconnect_rcu_unlock:
3681         rcu_read_unlock();
3682 disconnect:
3683         crypto_free_ahash(peer_integrity_tfm);
3684         kfree(int_dig_in);
3685         kfree(int_dig_vv);
3686         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3687         return -EIO;
3688 }
3689
3690 /* helper function
3691  * input: alg name, feature name
3692  * return: NULL (alg name was "")
3693  *         ERR_PTR(error) if something goes wrong
3694  *         or the crypto hash ptr, if it worked out ok. */
3695 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3696                 const char *alg, const char *name)
3697 {
3698         struct crypto_ahash *tfm;
3699
3700         if (!alg[0])
3701                 return NULL;
3702
3703         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3704         if (IS_ERR(tfm)) {
3705                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3706                         alg, name, PTR_ERR(tfm));
3707                 return tfm;
3708         }
3709         return tfm;
3710 }
3711
3712 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3713 {
3714         void *buffer = connection->data.rbuf;
3715         int size = pi->size;
3716
3717         while (size) {
3718                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3719                 s = drbd_recv(connection, buffer, s);
3720                 if (s <= 0) {
3721                         if (s < 0)
3722                                 return s;
3723                         break;
3724                 }
3725                 size -= s;
3726         }
3727         if (size)
3728                 return -EIO;
3729         return 0;
3730 }
3731
3732 /*
3733  * config_unknown_volume  -  device configuration command for unknown volume
3734  *
3735  * When a device is added to an existing connection, the node on which the
3736  * device is added first will send configuration commands to its peer but the
3737  * peer will not know about the device yet.  It will warn and ignore these
3738  * commands.  Once the device is added on the second node, the second node will
3739  * send the same device configuration commands, but in the other direction.
3740  *
3741  * (We can also end up here if drbd is misconfigured.)
3742  */
3743 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3744 {
3745         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3746                   cmdname(pi->cmd), pi->vnr);
3747         return ignore_remaining_packet(connection, pi);
3748 }
3749
3750 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3751 {
3752         struct drbd_peer_device *peer_device;
3753         struct drbd_device *device;
3754         struct p_rs_param_95 *p;
3755         unsigned int header_size, data_size, exp_max_sz;
3756         struct crypto_ahash *verify_tfm = NULL;
3757         struct crypto_ahash *csums_tfm = NULL;
3758         struct net_conf *old_net_conf, *new_net_conf = NULL;
3759         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3760         const int apv = connection->agreed_pro_version;
3761         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3762         int fifo_size = 0;
3763         int err;
3764
3765         peer_device = conn_peer_device(connection, pi->vnr);
3766         if (!peer_device)
3767                 return config_unknown_volume(connection, pi);
3768         device = peer_device->device;
3769
3770         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3771                     : apv == 88 ? sizeof(struct p_rs_param)
3772                                         + SHARED_SECRET_MAX
3773                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3774                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3775
3776         if (pi->size > exp_max_sz) {
3777                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3778                     pi->size, exp_max_sz);
3779                 return -EIO;
3780         }
3781
3782         if (apv <= 88) {
3783                 header_size = sizeof(struct p_rs_param);
3784                 data_size = pi->size - header_size;
3785         } else if (apv <= 94) {
3786                 header_size = sizeof(struct p_rs_param_89);
3787                 data_size = pi->size - header_size;
3788                 D_ASSERT(device, data_size == 0);
3789         } else {
3790                 header_size = sizeof(struct p_rs_param_95);
3791                 data_size = pi->size - header_size;
3792                 D_ASSERT(device, data_size == 0);
3793         }
3794
3795         /* initialize verify_alg and csums_alg */
3796         p = pi->data;
3797         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3798
3799         err = drbd_recv_all(peer_device->connection, p, header_size);
3800         if (err)
3801                 return err;
3802
3803         mutex_lock(&connection->resource->conf_update);
3804         old_net_conf = peer_device->connection->net_conf;
3805         if (get_ldev(device)) {
3806                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3807                 if (!new_disk_conf) {
3808                         put_ldev(device);
3809                         mutex_unlock(&connection->resource->conf_update);
3810                         drbd_err(device, "Allocation of new disk_conf failed\n");
3811                         return -ENOMEM;
3812                 }
3813
3814                 old_disk_conf = device->ldev->disk_conf;
3815                 *new_disk_conf = *old_disk_conf;
3816
3817                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3818         }
3819
3820         if (apv >= 88) {
3821                 if (apv == 88) {
3822                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3823                                 drbd_err(device, "verify-alg of wrong size, "
3824                                         "peer wants %u, accepting only up to %u byte\n",
3825                                         data_size, SHARED_SECRET_MAX);
3826                                 err = -EIO;
3827                                 goto reconnect;
3828                         }
3829
3830                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3831                         if (err)
3832                                 goto reconnect;
3833                         /* we expect NUL terminated string */
3834                         /* but just in case someone tries to be evil */
3835                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3836                         p->verify_alg[data_size-1] = 0;
3837
3838                 } else /* apv >= 89 */ {
3839                         /* we still expect NUL terminated strings */
3840                         /* but just in case someone tries to be evil */
3841                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3842                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3843                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3844                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3845                 }
3846
3847                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3848                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3849                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3850                                     old_net_conf->verify_alg, p->verify_alg);
3851                                 goto disconnect;
3852                         }
3853                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3854                                         p->verify_alg, "verify-alg");
3855                         if (IS_ERR(verify_tfm)) {
3856                                 verify_tfm = NULL;
3857                                 goto disconnect;
3858                         }
3859                 }
3860
3861                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3862                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3863                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3864                                     old_net_conf->csums_alg, p->csums_alg);
3865                                 goto disconnect;
3866                         }
3867                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3868                                         p->csums_alg, "csums-alg");
3869                         if (IS_ERR(csums_tfm)) {
3870                                 csums_tfm = NULL;
3871                                 goto disconnect;
3872                         }
3873                 }
3874
3875                 if (apv > 94 && new_disk_conf) {
3876                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3877                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3878                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3879                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3880
3881                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3882                         if (fifo_size != device->rs_plan_s->size) {
3883                                 new_plan = fifo_alloc(fifo_size);
3884                                 if (!new_plan) {
3885                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3886                                         put_ldev(device);
3887                                         goto disconnect;
3888                                 }
3889                         }
3890                 }
3891
3892                 if (verify_tfm || csums_tfm) {
3893                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3894                         if (!new_net_conf) {
3895                                 drbd_err(device, "Allocation of new net_conf failed\n");
3896                                 goto disconnect;
3897                         }
3898
3899                         *new_net_conf = *old_net_conf;
3900
3901                         if (verify_tfm) {
3902                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3903                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3904                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3905                                 peer_device->connection->verify_tfm = verify_tfm;
3906                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3907                         }
3908                         if (csums_tfm) {
3909                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3910                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3911                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3912                                 peer_device->connection->csums_tfm = csums_tfm;
3913                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3914                         }
3915                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3916                 }
3917         }
3918
3919         if (new_disk_conf) {
3920                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3921                 put_ldev(device);
3922         }
3923
3924         if (new_plan) {
3925                 old_plan = device->rs_plan_s;
3926                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3927         }
3928
3929         mutex_unlock(&connection->resource->conf_update);
3930         synchronize_rcu();
3931         if (new_net_conf)
3932                 kfree(old_net_conf);
3933         kfree(old_disk_conf);
3934         kfree(old_plan);
3935
3936         return 0;
3937
3938 reconnect:
3939         if (new_disk_conf) {
3940                 put_ldev(device);
3941                 kfree(new_disk_conf);
3942         }
3943         mutex_unlock(&connection->resource->conf_update);
3944         return -EIO;
3945
3946 disconnect:
3947         kfree(new_plan);
3948         if (new_disk_conf) {
3949                 put_ldev(device);
3950                 kfree(new_disk_conf);
3951         }
3952         mutex_unlock(&connection->resource->conf_update);
3953         /* just for completeness: actually not needed,
3954          * as this is not reached if csums_tfm was ok. */
3955         crypto_free_ahash(csums_tfm);
3956         /* but free the verify_tfm again, if csums_tfm did not work out */
3957         crypto_free_ahash(verify_tfm);
3958         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3959         return -EIO;
3960 }
3961
3962 /* warn if the arguments differ by more than 12.5% */
3963 static void warn_if_differ_considerably(struct drbd_device *device,
3964         const char *s, sector_t a, sector_t b)
3965 {
3966         sector_t d;
3967         if (a == 0 || b == 0)
3968                 return;
3969         d = (a > b) ? (a - b) : (b - a);
3970         if (d > (a>>3) || d > (b>>3))
3971                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3972                      (unsigned long long)a, (unsigned long long)b);
3973 }
3974
3975 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3976 {
3977         struct drbd_peer_device *peer_device;
3978         struct drbd_device *device;
3979         struct p_sizes *p = pi->data;
3980         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3981         enum determine_dev_size dd = DS_UNCHANGED;
3982         sector_t p_size, p_usize, p_csize, my_usize;
3983         sector_t new_size, cur_size;
3984         int ldsc = 0; /* local disk size changed */
3985         enum dds_flags ddsf;
3986
3987         peer_device = conn_peer_device(connection, pi->vnr);
3988         if (!peer_device)
3989                 return config_unknown_volume(connection, pi);
3990         device = peer_device->device;
3991         cur_size = drbd_get_capacity(device->this_bdev);
3992
3993         p_size = be64_to_cpu(p->d_size);
3994         p_usize = be64_to_cpu(p->u_size);
3995         p_csize = be64_to_cpu(p->c_size);
3996
3997         /* just store the peer's disk size for now.
3998          * we still need to figure out whether we accept that. */
3999         device->p_size = p_size;
4000
4001         if (get_ldev(device)) {
4002                 rcu_read_lock();
4003                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4004                 rcu_read_unlock();
4005
4006                 warn_if_differ_considerably(device, "lower level device sizes",
4007                            p_size, drbd_get_max_capacity(device->ldev));
4008                 warn_if_differ_considerably(device, "user requested size",
4009                                             p_usize, my_usize);
4010
4011                 /* if this is the first connect, or an otherwise expected
4012                  * param exchange, choose the minimum */
4013                 if (device->state.conn == C_WF_REPORT_PARAMS)
4014                         p_usize = min_not_zero(my_usize, p_usize);
4015
4016                 /* Never shrink a device with usable data during connect.
4017                    But allow online shrinking if we are connected. */
4018                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4019                 if (new_size < cur_size &&
4020                     device->state.disk >= D_OUTDATED &&
4021                     device->state.conn < C_CONNECTED) {
4022                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4023                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4024                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4025                         put_ldev(device);
4026                         return -EIO;
4027                 }
4028
4029                 if (my_usize != p_usize) {
4030                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4031
4032                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4033                         if (!new_disk_conf) {
4034                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4035                                 put_ldev(device);
4036                                 return -ENOMEM;
4037                         }
4038
4039                         mutex_lock(&connection->resource->conf_update);
4040                         old_disk_conf = device->ldev->disk_conf;
4041                         *new_disk_conf = *old_disk_conf;
4042                         new_disk_conf->disk_size = p_usize;
4043
4044                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4045                         mutex_unlock(&connection->resource->conf_update);
4046                         synchronize_rcu();
4047                         kfree(old_disk_conf);
4048
4049                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4050                                  (unsigned long)my_usize);
4051                 }
4052
4053                 put_ldev(device);
4054         }
4055
4056         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4057         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4058            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4059            drbd_reconsider_queue_parameters(), we can be sure that after
4060            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4061
4062         ddsf = be16_to_cpu(p->dds_flags);
4063         if (get_ldev(device)) {
4064                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4065                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4066                 put_ldev(device);
4067                 if (dd == DS_ERROR)
4068                         return -EIO;
4069                 drbd_md_sync(device);
4070         } else {
4071                 /*
4072                  * I am diskless, need to accept the peer's *current* size.
4073                  * I must NOT accept the peers backing disk size,
4074                  * it may have been larger than mine all along...
4075                  *
4076                  * At this point, the peer knows more about my disk, or at
4077                  * least about what we last agreed upon, than myself.
4078                  * So if his c_size is less than his d_size, the most likely
4079                  * reason is that *my* d_size was smaller last time we checked.
4080                  *
4081                  * However, if he sends a zero current size,
4082                  * take his (user-capped or) backing disk size anyways.
4083                  *
4084                  * Unless of course he does not have a disk himself.
4085                  * In which case we ignore this completely.
4086                  */
4087                 sector_t new_size = p_csize ?: p_usize ?: p_size;
4088                 drbd_reconsider_queue_parameters(device, NULL, o);
4089                 if (new_size == 0) {
4090                         /* Ignore, peer does not know nothing. */
4091                 } else if (new_size == cur_size) {
4092                         /* nothing to do */
4093                 } else if (cur_size != 0 && p_size == 0) {
4094                         drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4095                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4096                 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4097                         drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4098                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4099                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4100                         return -EIO;
4101                 } else {
4102                         /* I believe the peer, if
4103                          *  - I don't have a current size myself
4104                          *  - we agree on the size anyways
4105                          *  - I do have a current size, am Secondary,
4106                          *    and he has the only disk
4107                          *  - I do have a current size, am Primary,
4108                          *    and he has the only disk,
4109                          *    which is larger than my current size
4110                          */
4111                         drbd_set_my_capacity(device, new_size);
4112                 }
4113         }
4114
4115         if (get_ldev(device)) {
4116                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4117                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4118                         ldsc = 1;
4119                 }
4120
4121                 put_ldev(device);
4122         }
4123
4124         if (device->state.conn > C_WF_REPORT_PARAMS) {
4125                 if (be64_to_cpu(p->c_size) !=
4126                     drbd_get_capacity(device->this_bdev) || ldsc) {
4127                         /* we have different sizes, probably peer
4128                          * needs to know my new size... */
4129                         drbd_send_sizes(peer_device, 0, ddsf);
4130                 }
4131                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4132                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4133                         if (device->state.pdsk >= D_INCONSISTENT &&
4134                             device->state.disk >= D_INCONSISTENT) {
4135                                 if (ddsf & DDSF_NO_RESYNC)
4136                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4137                                 else
4138                                         resync_after_online_grow(device);
4139                         } else
4140                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4141                 }
4142         }
4143
4144         return 0;
4145 }
4146
4147 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4148 {
4149         struct drbd_peer_device *peer_device;
4150         struct drbd_device *device;
4151         struct p_uuids *p = pi->data;
4152         u64 *p_uuid;
4153         int i, updated_uuids = 0;
4154
4155         peer_device = conn_peer_device(connection, pi->vnr);
4156         if (!peer_device)
4157                 return config_unknown_volume(connection, pi);
4158         device = peer_device->device;
4159
4160         p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4161         if (!p_uuid) {
4162                 drbd_err(device, "kmalloc of p_uuid failed\n");
4163                 return false;
4164         }
4165
4166         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4167                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4168
4169         kfree(device->p_uuid);
4170         device->p_uuid = p_uuid;
4171
4172         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4173             device->state.disk < D_INCONSISTENT &&
4174             device->state.role == R_PRIMARY &&
4175             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4176                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4177                     (unsigned long long)device->ed_uuid);
4178                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4179                 return -EIO;
4180         }
4181
4182         if (get_ldev(device)) {
4183                 int skip_initial_sync =
4184                         device->state.conn == C_CONNECTED &&
4185                         peer_device->connection->agreed_pro_version >= 90 &&
4186                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4187                         (p_uuid[UI_FLAGS] & 8);
4188                 if (skip_initial_sync) {
4189                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4190                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4191                                         "clear_n_write from receive_uuids",
4192                                         BM_LOCKED_TEST_ALLOWED);
4193                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4194                         _drbd_uuid_set(device, UI_BITMAP, 0);
4195                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4196                                         CS_VERBOSE, NULL);
4197                         drbd_md_sync(device);
4198                         updated_uuids = 1;
4199                 }
4200                 put_ldev(device);
4201         } else if (device->state.disk < D_INCONSISTENT &&
4202                    device->state.role == R_PRIMARY) {
4203                 /* I am a diskless primary, the peer just created a new current UUID
4204                    for me. */
4205                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4206         }
4207
4208         /* Before we test for the disk state, we should wait until an eventually
4209            ongoing cluster wide state change is finished. That is important if
4210            we are primary and are detaching from our disk. We need to see the
4211            new disk state... */
4212         mutex_lock(device->state_mutex);
4213         mutex_unlock(device->state_mutex);
4214         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4215                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4216
4217         if (updated_uuids)
4218                 drbd_print_uuids(device, "receiver updated UUIDs to");
4219
4220         return 0;
4221 }
4222
4223 /**
4224  * convert_state() - Converts the peer's view of the cluster state to our point of view
4225  * @ps:         The state as seen by the peer.
4226  */
4227 static union drbd_state convert_state(union drbd_state ps)
4228 {
4229         union drbd_state ms;
4230
4231         static enum drbd_conns c_tab[] = {
4232                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4233                 [C_CONNECTED] = C_CONNECTED,
4234
4235                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4236                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4237                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4238                 [C_VERIFY_S]       = C_VERIFY_T,
4239                 [C_MASK]   = C_MASK,
4240         };
4241
4242         ms.i = ps.i;
4243
4244         ms.conn = c_tab[ps.conn];
4245         ms.peer = ps.role;
4246         ms.role = ps.peer;
4247         ms.pdsk = ps.disk;
4248         ms.disk = ps.pdsk;
4249         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4250
4251         return ms;
4252 }
4253
4254 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4255 {
4256         struct drbd_peer_device *peer_device;
4257         struct drbd_device *device;
4258         struct p_req_state *p = pi->data;
4259         union drbd_state mask, val;
4260         enum drbd_state_rv rv;
4261
4262         peer_device = conn_peer_device(connection, pi->vnr);
4263         if (!peer_device)
4264                 return -EIO;
4265         device = peer_device->device;
4266
4267         mask.i = be32_to_cpu(p->mask);
4268         val.i = be32_to_cpu(p->val);
4269
4270         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4271             mutex_is_locked(device->state_mutex)) {
4272                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4273                 return 0;
4274         }
4275
4276         mask = convert_state(mask);
4277         val = convert_state(val);
4278
4279         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4280         drbd_send_sr_reply(peer_device, rv);
4281
4282         drbd_md_sync(device);
4283
4284         return 0;
4285 }
4286
4287 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4288 {
4289         struct p_req_state *p = pi->data;
4290         union drbd_state mask, val;
4291         enum drbd_state_rv rv;
4292
4293         mask.i = be32_to_cpu(p->mask);
4294         val.i = be32_to_cpu(p->val);
4295
4296         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4297             mutex_is_locked(&connection->cstate_mutex)) {
4298                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4299                 return 0;
4300         }
4301
4302         mask = convert_state(mask);
4303         val = convert_state(val);
4304
4305         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4306         conn_send_sr_reply(connection, rv);
4307
4308         return 0;
4309 }
4310
4311 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4312 {
4313         struct drbd_peer_device *peer_device;
4314         struct drbd_device *device;
4315         struct p_state *p = pi->data;
4316         union drbd_state os, ns, peer_state;
4317         enum drbd_disk_state real_peer_disk;
4318         enum chg_state_flags cs_flags;
4319         int rv;
4320
4321         peer_device = conn_peer_device(connection, pi->vnr);
4322         if (!peer_device)
4323                 return config_unknown_volume(connection, pi);
4324         device = peer_device->device;
4325
4326         peer_state.i = be32_to_cpu(p->state);
4327
4328         real_peer_disk = peer_state.disk;
4329         if (peer_state.disk == D_NEGOTIATING) {
4330                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4331                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4332         }
4333
4334         spin_lock_irq(&device->resource->req_lock);
4335  retry:
4336         os = ns = drbd_read_state(device);
4337         spin_unlock_irq(&device->resource->req_lock);
4338
4339         /* If some other part of the code (ack_receiver thread, timeout)
4340          * already decided to close the connection again,
4341          * we must not "re-establish" it here. */
4342         if (os.conn <= C_TEAR_DOWN)
4343                 return -ECONNRESET;
4344
4345         /* If this is the "end of sync" confirmation, usually the peer disk
4346          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4347          * set) resync started in PausedSyncT, or if the timing of pause-/
4348          * unpause-sync events has been "just right", the peer disk may
4349          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4350          */
4351         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4352             real_peer_disk == D_UP_TO_DATE &&
4353             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4354                 /* If we are (becoming) SyncSource, but peer is still in sync
4355                  * preparation, ignore its uptodate-ness to avoid flapping, it
4356                  * will change to inconsistent once the peer reaches active
4357                  * syncing states.
4358                  * It may have changed syncer-paused flags, however, so we
4359                  * cannot ignore this completely. */
4360                 if (peer_state.conn > C_CONNECTED &&
4361                     peer_state.conn < C_SYNC_SOURCE)
4362                         real_peer_disk = D_INCONSISTENT;
4363
4364                 /* if peer_state changes to connected at the same time,
4365                  * it explicitly notifies us that it finished resync.
4366                  * Maybe we should finish it up, too? */
4367                 else if (os.conn >= C_SYNC_SOURCE &&
4368                          peer_state.conn == C_CONNECTED) {
4369                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4370                                 drbd_resync_finished(device);
4371                         return 0;
4372                 }
4373         }
4374
4375         /* explicit verify finished notification, stop sector reached. */
4376         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4377             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4378                 ov_out_of_sync_print(device);
4379                 drbd_resync_finished(device);
4380                 return 0;
4381         }
4382
4383         /* peer says his disk is inconsistent, while we think it is uptodate,
4384          * and this happens while the peer still thinks we have a sync going on,
4385          * but we think we are already done with the sync.
4386          * We ignore this to avoid flapping pdsk.
4387          * This should not happen, if the peer is a recent version of drbd. */
4388         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4389             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4390                 real_peer_disk = D_UP_TO_DATE;
4391
4392         if (ns.conn == C_WF_REPORT_PARAMS)
4393                 ns.conn = C_CONNECTED;
4394
4395         if (peer_state.conn == C_AHEAD)
4396                 ns.conn = C_BEHIND;
4397
4398         /* TODO:
4399          * if (primary and diskless and peer uuid != effective uuid)
4400          *     abort attach on peer;
4401          *
4402          * If this node does not have good data, was already connected, but
4403          * the peer did a late attach only now, trying to "negotiate" with me,
4404          * AND I am currently Primary, possibly frozen, with some specific
4405          * "effective" uuid, this should never be reached, really, because
4406          * we first send the uuids, then the current state.
4407          *
4408          * In this scenario, we already dropped the connection hard
4409          * when we received the unsuitable uuids (receive_uuids().
4410          *
4411          * Should we want to change this, that is: not drop the connection in
4412          * receive_uuids() already, then we would need to add a branch here
4413          * that aborts the attach of "unsuitable uuids" on the peer in case
4414          * this node is currently Diskless Primary.
4415          */
4416
4417         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4418             get_ldev_if_state(device, D_NEGOTIATING)) {
4419                 int cr; /* consider resync */
4420
4421                 /* if we established a new connection */
4422                 cr  = (os.conn < C_CONNECTED);
4423                 /* if we had an established connection
4424                  * and one of the nodes newly attaches a disk */
4425                 cr |= (os.conn == C_CONNECTED &&
4426                        (peer_state.disk == D_NEGOTIATING ||
4427                         os.disk == D_NEGOTIATING));
4428                 /* if we have both been inconsistent, and the peer has been
4429                  * forced to be UpToDate with --overwrite-data */
4430                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4431                 /* if we had been plain connected, and the admin requested to
4432                  * start a sync by "invalidate" or "invalidate-remote" */
4433                 cr |= (os.conn == C_CONNECTED &&
4434                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4435                                  peer_state.conn <= C_WF_BITMAP_T));
4436
4437                 if (cr)
4438                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4439
4440                 put_ldev(device);
4441                 if (ns.conn == C_MASK) {
4442                         ns.conn = C_CONNECTED;
4443                         if (device->state.disk == D_NEGOTIATING) {
4444                                 drbd_force_state(device, NS(disk, D_FAILED));
4445                         } else if (peer_state.disk == D_NEGOTIATING) {
4446                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4447                                 peer_state.disk = D_DISKLESS;
4448                                 real_peer_disk = D_DISKLESS;
4449                         } else {
4450                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4451                                         return -EIO;
4452                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4453                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4454                                 return -EIO;
4455                         }
4456                 }
4457         }
4458
4459         spin_lock_irq(&device->resource->req_lock);
4460         if (os.i != drbd_read_state(device).i)
4461                 goto retry;
4462         clear_bit(CONSIDER_RESYNC, &device->flags);
4463         ns.peer = peer_state.role;
4464         ns.pdsk = real_peer_disk;
4465         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4466         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4467                 ns.disk = device->new_state_tmp.disk;
4468         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4469         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4470             test_bit(NEW_CUR_UUID, &device->flags)) {
4471                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4472                    for temporal network outages! */
4473                 spin_unlock_irq(&device->resource->req_lock);
4474                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4475                 tl_clear(peer_device->connection);
4476                 drbd_uuid_new_current(device);
4477                 clear_bit(NEW_CUR_UUID, &device->flags);
4478                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4479                 return -EIO;
4480         }
4481         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4482         ns = drbd_read_state(device);
4483         spin_unlock_irq(&device->resource->req_lock);
4484
4485         if (rv < SS_SUCCESS) {
4486                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4487                 return -EIO;
4488         }
4489
4490         if (os.conn > C_WF_REPORT_PARAMS) {
4491                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4492                     peer_state.disk != D_NEGOTIATING ) {
4493                         /* we want resync, peer has not yet decided to sync... */
4494                         /* Nowadays only used when forcing a node into primary role and
4495                            setting its disk to UpToDate with that */
4496                         drbd_send_uuids(peer_device);
4497                         drbd_send_current_state(peer_device);
4498                 }
4499         }
4500
4501         clear_bit(DISCARD_MY_DATA, &device->flags);
4502
4503         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4504
4505         return 0;
4506 }
4507
4508 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4509 {
4510         struct drbd_peer_device *peer_device;
4511         struct drbd_device *device;
4512         struct p_rs_uuid *p = pi->data;
4513
4514         peer_device = conn_peer_device(connection, pi->vnr);
4515         if (!peer_device)
4516                 return -EIO;
4517         device = peer_device->device;
4518
4519         wait_event(device->misc_wait,
4520                    device->state.conn == C_WF_SYNC_UUID ||
4521                    device->state.conn == C_BEHIND ||
4522                    device->state.conn < C_CONNECTED ||
4523                    device->state.disk < D_NEGOTIATING);
4524
4525         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4526
4527         /* Here the _drbd_uuid_ functions are right, current should
4528            _not_ be rotated into the history */
4529         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4530                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4531                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4532
4533                 drbd_print_uuids(device, "updated sync uuid");
4534                 drbd_start_resync(device, C_SYNC_TARGET);
4535
4536                 put_ldev(device);
4537         } else
4538                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4539
4540         return 0;
4541 }
4542
4543 /**
4544  * receive_bitmap_plain
4545  *
4546  * Return 0 when done, 1 when another iteration is needed, and a negative error
4547  * code upon failure.
4548  */
4549 static int
4550 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4551                      unsigned long *p, struct bm_xfer_ctx *c)
4552 {
4553         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4554                                  drbd_header_size(peer_device->connection);
4555         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4556                                        c->bm_words - c->word_offset);
4557         unsigned int want = num_words * sizeof(*p);
4558         int err;
4559
4560         if (want != size) {
4561                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4562                 return -EIO;
4563         }
4564         if (want == 0)
4565                 return 0;
4566         err = drbd_recv_all(peer_device->connection, p, want);
4567         if (err)
4568                 return err;
4569
4570         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4571
4572         c->word_offset += num_words;
4573         c->bit_offset = c->word_offset * BITS_PER_LONG;
4574         if (c->bit_offset > c->bm_bits)
4575                 c->bit_offset = c->bm_bits;
4576
4577         return 1;
4578 }
4579
4580 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4581 {
4582         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4583 }
4584
4585 static int dcbp_get_start(struct p_compressed_bm *p)
4586 {
4587         return (p->encoding & 0x80) != 0;
4588 }
4589
4590 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4591 {
4592         return (p->encoding >> 4) & 0x7;
4593 }
4594
4595 /**
4596  * recv_bm_rle_bits
4597  *
4598  * Return 0 when done, 1 when another iteration is needed, and a negative error
4599  * code upon failure.
4600  */
4601 static int
4602 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4603                 struct p_compressed_bm *p,
4604                  struct bm_xfer_ctx *c,
4605                  unsigned int len)
4606 {
4607         struct bitstream bs;
4608         u64 look_ahead;
4609         u64 rl;
4610         u64 tmp;
4611         unsigned long s = c->bit_offset;
4612         unsigned long e;
4613         int toggle = dcbp_get_start(p);
4614         int have;
4615         int bits;
4616
4617         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4618
4619         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4620         if (bits < 0)
4621                 return -EIO;
4622
4623         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4624                 bits = vli_decode_bits(&rl, look_ahead);
4625                 if (bits <= 0)
4626                         return -EIO;
4627
4628                 if (toggle) {
4629                         e = s + rl -1;
4630                         if (e >= c->bm_bits) {
4631                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4632                                 return -EIO;
4633                         }
4634                         _drbd_bm_set_bits(peer_device->device, s, e);
4635                 }
4636
4637                 if (have < bits) {
4638                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4639                                 have, bits, look_ahead,
4640                                 (unsigned int)(bs.cur.b - p->code),
4641                                 (unsigned int)bs.buf_len);
4642                         return -EIO;
4643                 }
4644                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4645                 if (likely(bits < 64))
4646                         look_ahead >>= bits;
4647                 else
4648                         look_ahead = 0;
4649                 have -= bits;
4650
4651                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4652                 if (bits < 0)
4653                         return -EIO;
4654                 look_ahead |= tmp << have;
4655                 have += bits;
4656         }
4657
4658         c->bit_offset = s;
4659         bm_xfer_ctx_bit_to_word_offset(c);
4660
4661         return (s != c->bm_bits);
4662 }
4663
4664 /**
4665  * decode_bitmap_c
4666  *
4667  * Return 0 when done, 1 when another iteration is needed, and a negative error
4668  * code upon failure.
4669  */
4670 static int
4671 decode_bitmap_c(struct drbd_peer_device *peer_device,
4672                 struct p_compressed_bm *p,
4673                 struct bm_xfer_ctx *c,
4674                 unsigned int len)
4675 {
4676         if (dcbp_get_code(p) == RLE_VLI_Bits)
4677                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4678
4679         /* other variants had been implemented for evaluation,
4680          * but have been dropped as this one turned out to be "best"
4681          * during all our tests. */
4682
4683         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4684         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4685         return -EIO;
4686 }
4687
4688 void INFO_bm_xfer_stats(struct drbd_device *device,
4689                 const char *direction, struct bm_xfer_ctx *c)
4690 {
4691         /* what would it take to transfer it "plaintext" */
4692         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4693         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4694         unsigned int plain =
4695                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4696                 c->bm_words * sizeof(unsigned long);
4697         unsigned int total = c->bytes[0] + c->bytes[1];
4698         unsigned int r;
4699
4700         /* total can not be zero. but just in case: */
4701         if (total == 0)
4702                 return;
4703
4704         /* don't report if not compressed */
4705         if (total >= plain)
4706                 return;
4707
4708         /* total < plain. check for overflow, still */
4709         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4710                                     : (1000 * total / plain);
4711
4712         if (r > 1000)
4713                 r = 1000;
4714
4715         r = 1000 - r;
4716         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4717              "total %u; compression: %u.%u%%\n",
4718                         direction,
4719                         c->bytes[1], c->packets[1],
4720                         c->bytes[0], c->packets[0],
4721                         total, r/10, r % 10);
4722 }
4723
4724 /* Since we are processing the bitfield from lower addresses to higher,
4725    it does not matter if the process it in 32 bit chunks or 64 bit
4726    chunks as long as it is little endian. (Understand it as byte stream,
4727    beginning with the lowest byte...) If we would use big endian
4728    we would need to process it from the highest address to the lowest,
4729    in order to be agnostic to the 32 vs 64 bits issue.
4730
4731    returns 0 on failure, 1 if we successfully received it. */
4732 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4733 {
4734         struct drbd_peer_device *peer_device;
4735         struct drbd_device *device;
4736         struct bm_xfer_ctx c;
4737         int err;
4738
4739         peer_device = conn_peer_device(connection, pi->vnr);
4740         if (!peer_device)
4741                 return -EIO;
4742         device = peer_device->device;
4743
4744         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4745         /* you are supposed to send additional out-of-sync information
4746          * if you actually set bits during this phase */
4747
4748         c = (struct bm_xfer_ctx) {
4749                 .bm_bits = drbd_bm_bits(device),
4750                 .bm_words = drbd_bm_words(device),
4751         };
4752
4753         for(;;) {
4754                 if (pi->cmd == P_BITMAP)
4755                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4756                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4757                         /* MAYBE: sanity check that we speak proto >= 90,
4758                          * and the feature is enabled! */
4759                         struct p_compressed_bm *p = pi->data;
4760
4761                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4762                                 drbd_err(device, "ReportCBitmap packet too large\n");
4763                                 err = -EIO;
4764                                 goto out;
4765                         }
4766                         if (pi->size <= sizeof(*p)) {
4767                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4768                                 err = -EIO;
4769                                 goto out;
4770                         }
4771                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4772                         if (err)
4773                                goto out;
4774                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4775                 } else {
4776                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4777                         err = -EIO;
4778                         goto out;
4779                 }
4780
4781                 c.packets[pi->cmd == P_BITMAP]++;
4782                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4783
4784                 if (err <= 0) {
4785                         if (err < 0)
4786                                 goto out;
4787                         break;
4788                 }
4789                 err = drbd_recv_header(peer_device->connection, pi);
4790                 if (err)
4791                         goto out;
4792         }
4793
4794         INFO_bm_xfer_stats(device, "receive", &c);
4795
4796         if (device->state.conn == C_WF_BITMAP_T) {
4797                 enum drbd_state_rv rv;
4798
4799                 err = drbd_send_bitmap(device);
4800                 if (err)
4801                         goto out;
4802                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4803                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4804                 D_ASSERT(device, rv == SS_SUCCESS);
4805         } else if (device->state.conn != C_WF_BITMAP_S) {
4806                 /* admin may have requested C_DISCONNECTING,
4807                  * other threads may have noticed network errors */
4808                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4809                     drbd_conn_str(device->state.conn));
4810         }
4811         err = 0;
4812
4813  out:
4814         drbd_bm_unlock(device);
4815         if (!err && device->state.conn == C_WF_BITMAP_S)
4816                 drbd_start_resync(device, C_SYNC_SOURCE);
4817         return err;
4818 }
4819
4820 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4821 {
4822         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4823                  pi->cmd, pi->size);
4824
4825         return ignore_remaining_packet(connection, pi);
4826 }
4827
4828 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4829 {
4830         /* Make sure we've acked all the TCP data associated
4831          * with the data requests being unplugged */
4832         drbd_tcp_quickack(connection->data.socket);
4833
4834         return 0;
4835 }
4836
4837 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4838 {
4839         struct drbd_peer_device *peer_device;
4840         struct drbd_device *device;
4841         struct p_block_desc *p = pi->data;
4842
4843         peer_device = conn_peer_device(connection, pi->vnr);
4844         if (!peer_device)
4845                 return -EIO;
4846         device = peer_device->device;
4847
4848         switch (device->state.conn) {
4849         case C_WF_SYNC_UUID:
4850         case C_WF_BITMAP_T:
4851         case C_BEHIND:
4852                         break;
4853         default:
4854                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4855                                 drbd_conn_str(device->state.conn));
4856         }
4857
4858         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4859
4860         return 0;
4861 }
4862
4863 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4864 {
4865         struct drbd_peer_device *peer_device;
4866         struct p_block_desc *p = pi->data;
4867         struct drbd_device *device;
4868         sector_t sector;
4869         int size, err = 0;
4870
4871         peer_device = conn_peer_device(connection, pi->vnr);
4872         if (!peer_device)
4873                 return -EIO;
4874         device = peer_device->device;
4875
4876         sector = be64_to_cpu(p->sector);
4877         size = be32_to_cpu(p->blksize);
4878
4879         dec_rs_pending(device);
4880
4881         if (get_ldev(device)) {
4882                 struct drbd_peer_request *peer_req;
4883                 const int op = REQ_OP_WRITE_ZEROES;
4884
4885                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4886                                                size, 0, GFP_NOIO);
4887                 if (!peer_req) {
4888                         put_ldev(device);
4889                         return -ENOMEM;
4890                 }
4891
4892                 peer_req->w.cb = e_end_resync_block;
4893                 peer_req->submit_jif = jiffies;
4894                 peer_req->flags |= EE_IS_TRIM;
4895
4896                 spin_lock_irq(&device->resource->req_lock);
4897                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4898                 spin_unlock_irq(&device->resource->req_lock);
4899
4900                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4901                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4902
4903                 if (err) {
4904                         spin_lock_irq(&device->resource->req_lock);
4905                         list_del(&peer_req->w.list);
4906                         spin_unlock_irq(&device->resource->req_lock);
4907
4908                         drbd_free_peer_req(device, peer_req);
4909                         put_ldev(device);
4910                         err = 0;
4911                         goto fail;
4912                 }
4913
4914                 inc_unacked(device);
4915
4916                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4917                    as well as drbd_rs_complete_io() */
4918         } else {
4919         fail:
4920                 drbd_rs_complete_io(device, sector);
4921                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4922         }
4923
4924         atomic_add(size >> 9, &device->rs_sect_in);
4925
4926         return err;
4927 }
4928
4929 struct data_cmd {
4930         int expect_payload;
4931         unsigned int pkt_size;
4932         int (*fn)(struct drbd_connection *, struct packet_info *);
4933 };
4934
4935 static struct data_cmd drbd_cmd_handler[] = {
4936         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4937         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4938         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4939         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4940         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4941         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4942         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4943         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4944         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4945         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4946         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4947         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4948         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4949         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4950         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4951         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4952         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4953         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4955         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4956         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4957         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4958         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4959         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4960         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4961         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4962         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4963         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4964 };
4965
4966 static void drbdd(struct drbd_connection *connection)
4967 {
4968         struct packet_info pi;
4969         size_t shs; /* sub header size */
4970         int err;
4971
4972         while (get_t_state(&connection->receiver) == RUNNING) {
4973                 struct data_cmd const *cmd;
4974
4975                 drbd_thread_current_set_cpu(&connection->receiver);
4976                 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4977                 if (drbd_recv_header_maybe_unplug(connection, &pi))
4978                         goto err_out;
4979
4980                 cmd = &drbd_cmd_handler[pi.cmd];
4981                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4982                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4983                                  cmdname(pi.cmd), pi.cmd);
4984                         goto err_out;
4985                 }
4986
4987                 shs = cmd->pkt_size;
4988                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4989                         shs += sizeof(struct o_qlim);
4990                 if (pi.size > shs && !cmd->expect_payload) {
4991                         drbd_err(connection, "No payload expected %s l:%d\n",
4992                                  cmdname(pi.cmd), pi.size);
4993                         goto err_out;
4994                 }
4995                 if (pi.size < shs) {
4996                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4997                                  cmdname(pi.cmd), (int)shs, pi.size);
4998                         goto err_out;
4999                 }
5000
5001                 if (shs) {
5002                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5003                         err = drbd_recv_all_warn(connection, pi.data, shs);
5004                         if (err)
5005                                 goto err_out;
5006                         pi.size -= shs;
5007                 }
5008
5009                 update_receiver_timing_details(connection, cmd->fn);
5010                 err = cmd->fn(connection, &pi);
5011                 if (err) {
5012                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5013                                  cmdname(pi.cmd), err, pi.size);
5014                         goto err_out;
5015                 }
5016         }
5017         return;
5018
5019     err_out:
5020         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5021 }
5022
5023 static void conn_disconnect(struct drbd_connection *connection)
5024 {
5025         struct drbd_peer_device *peer_device;
5026         enum drbd_conns oc;
5027         int vnr;
5028
5029         if (connection->cstate == C_STANDALONE)
5030                 return;
5031
5032         /* We are about to start the cleanup after connection loss.
5033          * Make sure drbd_make_request knows about that.
5034          * Usually we should be in some network failure state already,
5035          * but just in case we are not, we fix it up here.
5036          */
5037         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5038
5039         /* ack_receiver does not clean up anything. it must not interfere, either */
5040         drbd_thread_stop(&connection->ack_receiver);
5041         if (connection->ack_sender) {
5042                 destroy_workqueue(connection->ack_sender);
5043                 connection->ack_sender = NULL;
5044         }
5045         drbd_free_sock(connection);
5046
5047         rcu_read_lock();
5048         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5049                 struct drbd_device *device = peer_device->device;
5050                 kref_get(&device->kref);
5051                 rcu_read_unlock();
5052                 drbd_disconnected(peer_device);
5053                 kref_put(&device->kref, drbd_destroy_device);
5054                 rcu_read_lock();
5055         }
5056         rcu_read_unlock();
5057
5058         if (!list_empty(&connection->current_epoch->list))
5059                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5060         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5061         atomic_set(&connection->current_epoch->epoch_size, 0);
5062         connection->send.seen_any_write_yet = false;
5063
5064         drbd_info(connection, "Connection closed\n");
5065
5066         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5067                 conn_try_outdate_peer_async(connection);
5068
5069         spin_lock_irq(&connection->resource->req_lock);
5070         oc = connection->cstate;
5071         if (oc >= C_UNCONNECTED)
5072                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5073
5074         spin_unlock_irq(&connection->resource->req_lock);
5075
5076         if (oc == C_DISCONNECTING)
5077                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5078 }
5079
5080 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5081 {
5082         struct drbd_device *device = peer_device->device;
5083         unsigned int i;
5084
5085         /* wait for current activity to cease. */
5086         spin_lock_irq(&device->resource->req_lock);
5087         _drbd_wait_ee_list_empty(device, &device->active_ee);
5088         _drbd_wait_ee_list_empty(device, &device->sync_ee);
5089         _drbd_wait_ee_list_empty(device, &device->read_ee);
5090         spin_unlock_irq(&device->resource->req_lock);
5091
5092         /* We do not have data structures that would allow us to
5093          * get the rs_pending_cnt down to 0 again.
5094          *  * On C_SYNC_TARGET we do not have any data structures describing
5095          *    the pending RSDataRequest's we have sent.
5096          *  * On C_SYNC_SOURCE there is no data structure that tracks
5097          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5098          *  And no, it is not the sum of the reference counts in the
5099          *  resync_LRU. The resync_LRU tracks the whole operation including
5100          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5101          *  on the fly. */
5102         drbd_rs_cancel_all(device);
5103         device->rs_total = 0;
5104         device->rs_failed = 0;
5105         atomic_set(&device->rs_pending_cnt, 0);
5106         wake_up(&device->misc_wait);
5107
5108         del_timer_sync(&device->resync_timer);
5109         resync_timer_fn(&device->resync_timer);
5110
5111         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5112          * w_make_resync_request etc. which may still be on the worker queue
5113          * to be "canceled" */
5114         drbd_flush_workqueue(&peer_device->connection->sender_work);
5115
5116         drbd_finish_peer_reqs(device);
5117
5118         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5119            might have issued a work again. The one before drbd_finish_peer_reqs() is
5120            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5121         drbd_flush_workqueue(&peer_device->connection->sender_work);
5122
5123         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5124          * again via drbd_try_clear_on_disk_bm(). */
5125         drbd_rs_cancel_all(device);
5126
5127         kfree(device->p_uuid);
5128         device->p_uuid = NULL;
5129
5130         if (!drbd_suspended(device))
5131                 tl_clear(peer_device->connection);
5132
5133         drbd_md_sync(device);
5134
5135         if (get_ldev(device)) {
5136                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5137                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5138                 put_ldev(device);
5139         }
5140
5141         /* tcp_close and release of sendpage pages can be deferred.  I don't
5142          * want to use SO_LINGER, because apparently it can be deferred for
5143          * more than 20 seconds (longest time I checked).
5144          *
5145          * Actually we don't care for exactly when the network stack does its
5146          * put_page(), but release our reference on these pages right here.
5147          */
5148         i = drbd_free_peer_reqs(device, &device->net_ee);
5149         if (i)
5150                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5151         i = atomic_read(&device->pp_in_use_by_net);
5152         if (i)
5153                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5154         i = atomic_read(&device->pp_in_use);
5155         if (i)
5156                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5157
5158         D_ASSERT(device, list_empty(&device->read_ee));
5159         D_ASSERT(device, list_empty(&device->active_ee));
5160         D_ASSERT(device, list_empty(&device->sync_ee));
5161         D_ASSERT(device, list_empty(&device->done_ee));
5162
5163         return 0;
5164 }
5165
5166 /*
5167  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5168  * we can agree on is stored in agreed_pro_version.
5169  *
5170  * feature flags and the reserved array should be enough room for future
5171  * enhancements of the handshake protocol, and possible plugins...
5172  *
5173  * for now, they are expected to be zero, but ignored.
5174  */
5175 static int drbd_send_features(struct drbd_connection *connection)
5176 {
5177         struct drbd_socket *sock;
5178         struct p_connection_features *p;
5179
5180         sock = &connection->data;
5181         p = conn_prepare_command(connection, sock);
5182         if (!p)
5183                 return -EIO;
5184         memset(p, 0, sizeof(*p));
5185         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5186         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5187         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5188         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5189 }
5190
5191 /*
5192  * return values:
5193  *   1 yes, we have a valid connection
5194  *   0 oops, did not work out, please try again
5195  *  -1 peer talks different language,
5196  *     no point in trying again, please go standalone.
5197  */
5198 static int drbd_do_features(struct drbd_connection *connection)
5199 {
5200         /* ASSERT current == connection->receiver ... */
5201         struct p_connection_features *p;
5202         const int expect = sizeof(struct p_connection_features);
5203         struct packet_info pi;
5204         int err;
5205
5206         err = drbd_send_features(connection);
5207         if (err)
5208                 return 0;
5209
5210         err = drbd_recv_header(connection, &pi);
5211         if (err)
5212                 return 0;
5213
5214         if (pi.cmd != P_CONNECTION_FEATURES) {
5215                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5216                          cmdname(pi.cmd), pi.cmd);
5217                 return -1;
5218         }
5219
5220         if (pi.size != expect) {
5221                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5222                      expect, pi.size);
5223                 return -1;
5224         }
5225
5226         p = pi.data;
5227         err = drbd_recv_all_warn(connection, p, expect);
5228         if (err)
5229                 return 0;
5230
5231         p->protocol_min = be32_to_cpu(p->protocol_min);
5232         p->protocol_max = be32_to_cpu(p->protocol_max);
5233         if (p->protocol_max == 0)
5234                 p->protocol_max = p->protocol_min;
5235
5236         if (PRO_VERSION_MAX < p->protocol_min ||
5237             PRO_VERSION_MIN > p->protocol_max)
5238                 goto incompat;
5239
5240         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5241         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5242
5243         drbd_info(connection, "Handshake successful: "
5244              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5245
5246         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5247                   connection->agreed_features,
5248                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5249                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5250                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5251                   connection->agreed_features ? "" : " none");
5252
5253         return 1;
5254
5255  incompat:
5256         drbd_err(connection, "incompatible DRBD dialects: "
5257             "I support %d-%d, peer supports %d-%d\n",
5258             PRO_VERSION_MIN, PRO_VERSION_MAX,
5259             p->protocol_min, p->protocol_max);
5260         return -1;
5261 }
5262
5263 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5264 static int drbd_do_auth(struct drbd_connection *connection)
5265 {
5266         drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5267         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5268         return -1;
5269 }
5270 #else
5271 #define CHALLENGE_LEN 64
5272
5273 /* Return value:
5274         1 - auth succeeded,
5275         0 - failed, try again (network error),
5276         -1 - auth failed, don't try again.
5277 */
5278
5279 static int drbd_do_auth(struct drbd_connection *connection)
5280 {
5281         struct drbd_socket *sock;
5282         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5283         char *response = NULL;
5284         char *right_response = NULL;
5285         char *peers_ch = NULL;
5286         unsigned int key_len;
5287         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5288         unsigned int resp_size;
5289         struct shash_desc *desc;
5290         struct packet_info pi;
5291         struct net_conf *nc;
5292         int err, rv;
5293
5294         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5295
5296         rcu_read_lock();
5297         nc = rcu_dereference(connection->net_conf);
5298         key_len = strlen(nc->shared_secret);
5299         memcpy(secret, nc->shared_secret, key_len);
5300         rcu_read_unlock();
5301
5302         desc = kmalloc(sizeof(struct shash_desc) +
5303                        crypto_shash_descsize(connection->cram_hmac_tfm),
5304                        GFP_KERNEL);
5305         if (!desc) {
5306                 rv = -1;
5307                 goto fail;
5308         }
5309         desc->tfm = connection->cram_hmac_tfm;
5310         desc->flags = 0;
5311
5312         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5313         if (rv) {
5314                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5315                 rv = -1;
5316                 goto fail;
5317         }
5318
5319         get_random_bytes(my_challenge, CHALLENGE_LEN);
5320
5321         sock = &connection->data;
5322         if (!conn_prepare_command(connection, sock)) {
5323                 rv = 0;
5324                 goto fail;
5325         }
5326         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5327                                 my_challenge, CHALLENGE_LEN);
5328         if (!rv)
5329                 goto fail;
5330
5331         err = drbd_recv_header(connection, &pi);
5332         if (err) {
5333                 rv = 0;
5334                 goto fail;
5335         }
5336
5337         if (pi.cmd != P_AUTH_CHALLENGE) {
5338                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5339                          cmdname(pi.cmd), pi.cmd);
5340                 rv = 0;
5341                 goto fail;
5342         }
5343
5344         if (pi.size > CHALLENGE_LEN * 2) {
5345                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5346                 rv = -1;
5347                 goto fail;
5348         }
5349
5350         if (pi.size < CHALLENGE_LEN) {
5351                 drbd_err(connection, "AuthChallenge payload too small.\n");
5352                 rv = -1;
5353                 goto fail;
5354         }
5355
5356         peers_ch = kmalloc(pi.size, GFP_NOIO);
5357         if (peers_ch == NULL) {
5358                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5359                 rv = -1;
5360                 goto fail;
5361         }
5362
5363         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5364         if (err) {
5365                 rv = 0;
5366                 goto fail;
5367         }
5368
5369         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5370                 drbd_err(connection, "Peer presented the same challenge!\n");
5371                 rv = -1;
5372                 goto fail;
5373         }
5374
5375         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5376         response = kmalloc(resp_size, GFP_NOIO);
5377         if (response == NULL) {
5378                 drbd_err(connection, "kmalloc of response failed\n");
5379                 rv = -1;
5380                 goto fail;
5381         }
5382
5383         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5384         if (rv) {
5385                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5386                 rv = -1;
5387                 goto fail;
5388         }
5389
5390         if (!conn_prepare_command(connection, sock)) {
5391                 rv = 0;
5392                 goto fail;
5393         }
5394         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5395                                 response, resp_size);
5396         if (!rv)
5397                 goto fail;
5398
5399         err = drbd_recv_header(connection, &pi);
5400         if (err) {
5401                 rv = 0;
5402                 goto fail;
5403         }
5404
5405         if (pi.cmd != P_AUTH_RESPONSE) {
5406                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5407                          cmdname(pi.cmd), pi.cmd);
5408                 rv = 0;
5409                 goto fail;
5410         }
5411
5412         if (pi.size != resp_size) {
5413                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5414                 rv = 0;
5415                 goto fail;
5416         }
5417
5418         err = drbd_recv_all_warn(connection, response , resp_size);
5419         if (err) {
5420                 rv = 0;
5421                 goto fail;
5422         }
5423
5424         right_response = kmalloc(resp_size, GFP_NOIO);
5425         if (right_response == NULL) {
5426                 drbd_err(connection, "kmalloc of right_response failed\n");
5427                 rv = -1;
5428                 goto fail;
5429         }
5430
5431         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5432                                  right_response);
5433         if (rv) {
5434                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5435                 rv = -1;
5436                 goto fail;
5437         }
5438
5439         rv = !memcmp(response, right_response, resp_size);
5440
5441         if (rv)
5442                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5443                      resp_size);
5444         else
5445                 rv = -1;
5446
5447  fail:
5448         kfree(peers_ch);
5449         kfree(response);
5450         kfree(right_response);
5451         if (desc) {
5452                 shash_desc_zero(desc);
5453                 kfree(desc);
5454         }
5455
5456         return rv;
5457 }
5458 #endif
5459
5460 int drbd_receiver(struct drbd_thread *thi)
5461 {
5462         struct drbd_connection *connection = thi->connection;
5463         int h;
5464
5465         drbd_info(connection, "receiver (re)started\n");
5466
5467         do {
5468                 h = conn_connect(connection);
5469                 if (h == 0) {
5470                         conn_disconnect(connection);
5471                         schedule_timeout_interruptible(HZ);
5472                 }
5473                 if (h == -1) {
5474                         drbd_warn(connection, "Discarding network configuration.\n");
5475                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5476                 }
5477         } while (h == 0);
5478
5479         if (h > 0) {
5480                 blk_start_plug(&connection->receiver_plug);
5481                 drbdd(connection);
5482                 blk_finish_plug(&connection->receiver_plug);
5483         }
5484
5485         conn_disconnect(connection);
5486
5487         drbd_info(connection, "receiver terminated\n");
5488         return 0;
5489 }
5490
5491 /* ********* acknowledge sender ******** */
5492
5493 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5494 {
5495         struct p_req_state_reply *p = pi->data;
5496         int retcode = be32_to_cpu(p->retcode);
5497
5498         if (retcode >= SS_SUCCESS) {
5499                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5500         } else {
5501                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5502                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5503                          drbd_set_st_err_str(retcode), retcode);
5504         }
5505         wake_up(&connection->ping_wait);
5506
5507         return 0;
5508 }
5509
5510 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5511 {
5512         struct drbd_peer_device *peer_device;
5513         struct drbd_device *device;
5514         struct p_req_state_reply *p = pi->data;
5515         int retcode = be32_to_cpu(p->retcode);
5516
5517         peer_device = conn_peer_device(connection, pi->vnr);
5518         if (!peer_device)
5519                 return -EIO;
5520         device = peer_device->device;
5521
5522         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5523                 D_ASSERT(device, connection->agreed_pro_version < 100);
5524                 return got_conn_RqSReply(connection, pi);
5525         }
5526
5527         if (retcode >= SS_SUCCESS) {
5528                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5529         } else {
5530                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5531                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5532                         drbd_set_st_err_str(retcode), retcode);
5533         }
5534         wake_up(&device->state_wait);
5535
5536         return 0;
5537 }
5538
5539 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5540 {
5541         return drbd_send_ping_ack(connection);
5542
5543 }
5544
5545 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5546 {
5547         /* restore idle timeout */
5548         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5549         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5550                 wake_up(&connection->ping_wait);
5551
5552         return 0;
5553 }
5554
5555 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5556 {
5557         struct drbd_peer_device *peer_device;
5558         struct drbd_device *device;
5559         struct p_block_ack *p = pi->data;
5560         sector_t sector = be64_to_cpu(p->sector);
5561         int blksize = be32_to_cpu(p->blksize);
5562
5563         peer_device = conn_peer_device(connection, pi->vnr);
5564         if (!peer_device)
5565                 return -EIO;
5566         device = peer_device->device;
5567
5568         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5569
5570         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5571
5572         if (get_ldev(device)) {
5573                 drbd_rs_complete_io(device, sector);
5574                 drbd_set_in_sync(device, sector, blksize);
5575                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5576                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5577                 put_ldev(device);
5578         }
5579         dec_rs_pending(device);
5580         atomic_add(blksize >> 9, &device->rs_sect_in);
5581
5582         return 0;
5583 }
5584
5585 static int
5586 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5587                               struct rb_root *root, const char *func,
5588                               enum drbd_req_event what, bool missing_ok)
5589 {
5590         struct drbd_request *req;
5591         struct bio_and_error m;
5592
5593         spin_lock_irq(&device->resource->req_lock);
5594         req = find_request(device, root, id, sector, missing_ok, func);
5595         if (unlikely(!req)) {
5596                 spin_unlock_irq(&device->resource->req_lock);
5597                 return -EIO;
5598         }
5599         __req_mod(req, what, &m);
5600         spin_unlock_irq(&device->resource->req_lock);
5601
5602         if (m.bio)
5603                 complete_master_bio(device, &m);
5604         return 0;
5605 }
5606
5607 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5608 {
5609         struct drbd_peer_device *peer_device;
5610         struct drbd_device *device;
5611         struct p_block_ack *p = pi->data;
5612         sector_t sector = be64_to_cpu(p->sector);
5613         int blksize = be32_to_cpu(p->blksize);
5614         enum drbd_req_event what;
5615
5616         peer_device = conn_peer_device(connection, pi->vnr);
5617         if (!peer_device)
5618                 return -EIO;
5619         device = peer_device->device;
5620
5621         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5622
5623         if (p->block_id == ID_SYNCER) {
5624                 drbd_set_in_sync(device, sector, blksize);
5625                 dec_rs_pending(device);
5626                 return 0;
5627         }
5628         switch (pi->cmd) {
5629         case P_RS_WRITE_ACK:
5630                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5631                 break;
5632         case P_WRITE_ACK:
5633                 what = WRITE_ACKED_BY_PEER;
5634                 break;
5635         case P_RECV_ACK:
5636                 what = RECV_ACKED_BY_PEER;
5637                 break;
5638         case P_SUPERSEDED:
5639                 what = CONFLICT_RESOLVED;
5640                 break;
5641         case P_RETRY_WRITE:
5642                 what = POSTPONE_WRITE;
5643                 break;
5644         default:
5645                 BUG();
5646         }
5647
5648         return validate_req_change_req_state(device, p->block_id, sector,
5649                                              &device->write_requests, __func__,
5650                                              what, false);
5651 }
5652
5653 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5654 {
5655         struct drbd_peer_device *peer_device;
5656         struct drbd_device *device;
5657         struct p_block_ack *p = pi->data;
5658         sector_t sector = be64_to_cpu(p->sector);
5659         int size = be32_to_cpu(p->blksize);
5660         int err;
5661
5662         peer_device = conn_peer_device(connection, pi->vnr);
5663         if (!peer_device)
5664                 return -EIO;
5665         device = peer_device->device;
5666
5667         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5668
5669         if (p->block_id == ID_SYNCER) {
5670                 dec_rs_pending(device);
5671                 drbd_rs_failed_io(device, sector, size);
5672                 return 0;
5673         }
5674
5675         err = validate_req_change_req_state(device, p->block_id, sector,
5676                                             &device->write_requests, __func__,
5677                                             NEG_ACKED, true);
5678         if (err) {
5679                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5680                    The master bio might already be completed, therefore the
5681                    request is no longer in the collision hash. */
5682                 /* In Protocol B we might already have got a P_RECV_ACK
5683                    but then get a P_NEG_ACK afterwards. */
5684                 drbd_set_out_of_sync(device, sector, size);
5685         }
5686         return 0;
5687 }
5688
5689 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5690 {
5691         struct drbd_peer_device *peer_device;
5692         struct drbd_device *device;
5693         struct p_block_ack *p = pi->data;
5694         sector_t sector = be64_to_cpu(p->sector);
5695
5696         peer_device = conn_peer_device(connection, pi->vnr);
5697         if (!peer_device)
5698                 return -EIO;
5699         device = peer_device->device;
5700
5701         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5702
5703         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5704             (unsigned long long)sector, be32_to_cpu(p->blksize));
5705
5706         return validate_req_change_req_state(device, p->block_id, sector,
5707                                              &device->read_requests, __func__,
5708                                              NEG_ACKED, false);
5709 }
5710
5711 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5712 {
5713         struct drbd_peer_device *peer_device;
5714         struct drbd_device *device;
5715         sector_t sector;
5716         int size;
5717         struct p_block_ack *p = pi->data;
5718
5719         peer_device = conn_peer_device(connection, pi->vnr);
5720         if (!peer_device)
5721                 return -EIO;
5722         device = peer_device->device;
5723
5724         sector = be64_to_cpu(p->sector);
5725         size = be32_to_cpu(p->blksize);
5726
5727         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5728
5729         dec_rs_pending(device);
5730
5731         if (get_ldev_if_state(device, D_FAILED)) {
5732                 drbd_rs_complete_io(device, sector);
5733                 switch (pi->cmd) {
5734                 case P_NEG_RS_DREPLY:
5735                         drbd_rs_failed_io(device, sector, size);
5736                 case P_RS_CANCEL:
5737                         break;
5738                 default:
5739                         BUG();
5740                 }
5741                 put_ldev(device);
5742         }
5743
5744         return 0;
5745 }
5746
5747 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5748 {
5749         struct p_barrier_ack *p = pi->data;
5750         struct drbd_peer_device *peer_device;
5751         int vnr;
5752
5753         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5754
5755         rcu_read_lock();
5756         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5757                 struct drbd_device *device = peer_device->device;
5758
5759                 if (device->state.conn == C_AHEAD &&
5760                     atomic_read(&device->ap_in_flight) == 0 &&
5761                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5762                         device->start_resync_timer.expires = jiffies + HZ;
5763                         add_timer(&device->start_resync_timer);
5764                 }
5765         }
5766         rcu_read_unlock();
5767
5768         return 0;
5769 }
5770
5771 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5772 {
5773         struct drbd_peer_device *peer_device;
5774         struct drbd_device *device;
5775         struct p_block_ack *p = pi->data;
5776         struct drbd_device_work *dw;
5777         sector_t sector;
5778         int size;
5779
5780         peer_device = conn_peer_device(connection, pi->vnr);
5781         if (!peer_device)
5782                 return -EIO;
5783         device = peer_device->device;
5784
5785         sector = be64_to_cpu(p->sector);
5786         size = be32_to_cpu(p->blksize);
5787
5788         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5789
5790         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5791                 drbd_ov_out_of_sync_found(device, sector, size);
5792         else
5793                 ov_out_of_sync_print(device);
5794
5795         if (!get_ldev(device))
5796                 return 0;
5797
5798         drbd_rs_complete_io(device, sector);
5799         dec_rs_pending(device);
5800
5801         --device->ov_left;
5802
5803         /* let's advance progress step marks only for every other megabyte */
5804         if ((device->ov_left & 0x200) == 0x200)
5805                 drbd_advance_rs_marks(device, device->ov_left);
5806
5807         if (device->ov_left == 0) {
5808                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5809                 if (dw) {
5810                         dw->w.cb = w_ov_finished;
5811                         dw->device = device;
5812                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5813                 } else {
5814                         drbd_err(device, "kmalloc(dw) failed.");
5815                         ov_out_of_sync_print(device);
5816                         drbd_resync_finished(device);
5817                 }
5818         }
5819         put_ldev(device);
5820         return 0;
5821 }
5822
5823 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5824 {
5825         return 0;
5826 }
5827
5828 struct meta_sock_cmd {
5829         size_t pkt_size;
5830         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5831 };
5832
5833 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5834 {
5835         long t;
5836         struct net_conf *nc;
5837
5838         rcu_read_lock();
5839         nc = rcu_dereference(connection->net_conf);
5840         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5841         rcu_read_unlock();
5842
5843         t *= HZ;
5844         if (ping_timeout)
5845                 t /= 10;
5846
5847         connection->meta.socket->sk->sk_rcvtimeo = t;
5848 }
5849
5850 static void set_ping_timeout(struct drbd_connection *connection)
5851 {
5852         set_rcvtimeo(connection, 1);
5853 }
5854
5855 static void set_idle_timeout(struct drbd_connection *connection)
5856 {
5857         set_rcvtimeo(connection, 0);
5858 }
5859
5860 static struct meta_sock_cmd ack_receiver_tbl[] = {
5861         [P_PING]            = { 0, got_Ping },
5862         [P_PING_ACK]        = { 0, got_PingAck },
5863         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5864         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5865         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5866         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5867         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5868         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5869         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5870         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5871         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5872         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5873         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5874         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5875         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5876         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5877         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5878 };
5879
5880 int drbd_ack_receiver(struct drbd_thread *thi)
5881 {
5882         struct drbd_connection *connection = thi->connection;
5883         struct meta_sock_cmd *cmd = NULL;
5884         struct packet_info pi;
5885         unsigned long pre_recv_jif;
5886         int rv;
5887         void *buf    = connection->meta.rbuf;
5888         int received = 0;
5889         unsigned int header_size = drbd_header_size(connection);
5890         int expect   = header_size;
5891         bool ping_timeout_active = false;
5892         struct sched_param param = { .sched_priority = 2 };
5893
5894         rv = sched_setscheduler(current, SCHED_RR, &param);
5895         if (rv < 0)
5896                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5897
5898         while (get_t_state(thi) == RUNNING) {
5899                 drbd_thread_current_set_cpu(thi);
5900
5901                 conn_reclaim_net_peer_reqs(connection);
5902
5903                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5904                         if (drbd_send_ping(connection)) {
5905                                 drbd_err(connection, "drbd_send_ping has failed\n");
5906                                 goto reconnect;
5907                         }
5908                         set_ping_timeout(connection);
5909                         ping_timeout_active = true;
5910                 }
5911
5912                 pre_recv_jif = jiffies;
5913                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5914
5915                 /* Note:
5916                  * -EINTR        (on meta) we got a signal
5917                  * -EAGAIN       (on meta) rcvtimeo expired
5918                  * -ECONNRESET   other side closed the connection
5919                  * -ERESTARTSYS  (on data) we got a signal
5920                  * rv <  0       other than above: unexpected error!
5921                  * rv == expected: full header or command
5922                  * rv <  expected: "woken" by signal during receive
5923                  * rv == 0       : "connection shut down by peer"
5924                  */
5925                 if (likely(rv > 0)) {
5926                         received += rv;
5927                         buf      += rv;
5928                 } else if (rv == 0) {
5929                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5930                                 long t;
5931                                 rcu_read_lock();
5932                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5933                                 rcu_read_unlock();
5934
5935                                 t = wait_event_timeout(connection->ping_wait,
5936                                                        connection->cstate < C_WF_REPORT_PARAMS,
5937                                                        t);
5938                                 if (t)
5939                                         break;
5940                         }
5941                         drbd_err(connection, "meta connection shut down by peer.\n");
5942                         goto reconnect;
5943                 } else if (rv == -EAGAIN) {
5944                         /* If the data socket received something meanwhile,
5945                          * that is good enough: peer is still alive. */
5946                         if (time_after(connection->last_received, pre_recv_jif))
5947                                 continue;
5948                         if (ping_timeout_active) {
5949                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5950                                 goto reconnect;
5951                         }
5952                         set_bit(SEND_PING, &connection->flags);
5953                         continue;
5954                 } else if (rv == -EINTR) {
5955                         /* maybe drbd_thread_stop(): the while condition will notice.
5956                          * maybe woken for send_ping: we'll send a ping above,
5957                          * and change the rcvtimeo */
5958                         flush_signals(current);
5959                         continue;
5960                 } else {
5961                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5962                         goto reconnect;
5963                 }
5964
5965                 if (received == expect && cmd == NULL) {
5966                         if (decode_header(connection, connection->meta.rbuf, &pi))
5967                                 goto reconnect;
5968                         cmd = &ack_receiver_tbl[pi.cmd];
5969                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5970                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5971                                          cmdname(pi.cmd), pi.cmd);
5972                                 goto disconnect;
5973                         }
5974                         expect = header_size + cmd->pkt_size;
5975                         if (pi.size != expect - header_size) {
5976                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5977                                         pi.cmd, pi.size);
5978                                 goto reconnect;
5979                         }
5980                 }
5981                 if (received == expect) {
5982                         bool err;
5983
5984                         err = cmd->fn(connection, &pi);
5985                         if (err) {
5986                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5987                                 goto reconnect;
5988                         }
5989
5990                         connection->last_received = jiffies;
5991
5992                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5993                                 set_idle_timeout(connection);
5994                                 ping_timeout_active = false;
5995                         }
5996
5997                         buf      = connection->meta.rbuf;
5998                         received = 0;
5999                         expect   = header_size;
6000                         cmd      = NULL;
6001                 }
6002         }
6003
6004         if (0) {
6005 reconnect:
6006                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6007                 conn_md_sync(connection);
6008         }
6009         if (0) {
6010 disconnect:
6011                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6012         }
6013
6014         drbd_info(connection, "ack_receiver terminated\n");
6015
6016         return 0;
6017 }
6018
6019 void drbd_send_acks_wf(struct work_struct *ws)
6020 {
6021         struct drbd_peer_device *peer_device =
6022                 container_of(ws, struct drbd_peer_device, send_acks_work);
6023         struct drbd_connection *connection = peer_device->connection;
6024         struct drbd_device *device = peer_device->device;
6025         struct net_conf *nc;
6026         int tcp_cork, err;
6027
6028         rcu_read_lock();
6029         nc = rcu_dereference(connection->net_conf);
6030         tcp_cork = nc->tcp_cork;
6031         rcu_read_unlock();
6032
6033         if (tcp_cork)
6034                 drbd_tcp_cork(connection->meta.socket);
6035
6036         err = drbd_finish_peer_reqs(device);
6037         kref_put(&device->kref, drbd_destroy_device);
6038         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6039            struct work_struct send_acks_work alive, which is in the peer_device object */
6040
6041         if (err) {
6042                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6043                 return;
6044         }
6045
6046         if (tcp_cork)
6047                 drbd_tcp_uncork(connection->meta.socket);
6048
6049         return;
6050 }