GNU Linux-libre 4.14.266-gnu1
[releases.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
/*
 * Bitmask combining the DRBD_FF_* feature flags for trim, thin resync and
 * write-same.
 * NOTE(review): presumably the feature set offered to the peer during the
 * feature handshake (drbd_do_features()) - confirm.
 */
#define PRO_FEATURES (DRBD_FF_TRIM | DRBD_FF_THIN_RESYNC | DRBD_FF_WSAME)
54
55 struct packet_info {
56         enum drbd_packet cmd;
57         unsigned int size;
58         unsigned int vnr;
59         void *data;
60 };
61
/*
 * Result of drbd_may_finish_epoch().
 * NOTE(review): the names suggest what happened to the epoch object
 * (kept / freed / reused) - confirm against drbd_may_finish_epoch().
 */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};
67
/* Prototypes for file-local functions that are used before their definition. */
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *peer_device);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev);
static int e_end_block(struct drbd_work *w, int unused);
74
75
/*
 * Allocation flags used when refilling our page chains from the kernel:
 * highmem pages are fine and allocation failures stay silent.  The flags are
 * deliberately weak so that an allocation here does not cause arbitrary
 * write-out (see __drbd_alloc_pages()).
 */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
/*
 * Some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first not finished we can
211            stop to examine the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:     DRBD device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lended to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, drbd_ee_mempool);
413         return NULL;
414 }
415
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417                        int is_net)
418 {
419         might_sleep();
420         if (peer_req->flags & EE_HAS_DIGEST)
421                 kfree(peer_req->digest);
422         drbd_free_pages(device, peer_req->pages, is_net);
423         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427                 drbd_al_complete_io(device, &peer_req->i);
428         }
429         mempool_free(peer_req, drbd_ee_mempool);
430 }
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524         int rv;
525
526         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527
528         if (rv < 0) {
529                 if (rv == -ECONNRESET)
530                         drbd_info(connection, "sock was reset by peer\n");
531                 else if (rv != -ERESTARTSYS)
532                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533         } else if (rv == 0) {
534                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535                         long t;
536                         rcu_read_lock();
537                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538                         rcu_read_unlock();
539
540                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541
542                         if (t)
543                                 goto out;
544                 }
545                 drbd_info(connection, "sock was shut down by peer\n");
546         }
547
548         if (rv != size)
549                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550
551 out:
552         return rv;
553 }
554
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557         int err;
558
559         err = drbd_recv(connection, buf, size);
560         if (err != size) {
561                 if (err >= 0)
562                         err = -EIO;
563         } else
564                 err = 0;
565         return err;
566 }
567
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570         int err;
571
572         err = drbd_recv_all(connection, buf, size);
573         if (err && !signal_pending(current))
574                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575         return err;
576 }
577
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584                 unsigned int rcv)
585 {
586         /* open coded SO_SNDBUF, SO_RCVBUF */
587         if (snd) {
588                 sock->sk->sk_sndbuf = snd;
589                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590         }
591         if (rcv) {
592                 sock->sk->sk_rcvbuf = rcv;
593                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594         }
595 }
596
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599         const char *what;
600         struct socket *sock;
601         struct sockaddr_in6 src_in6;
602         struct sockaddr_in6 peer_in6;
603         struct net_conf *nc;
604         int err, peer_addr_len, my_addr_len;
605         int sndbuf_size, rcvbuf_size, connect_int;
606         int disconnect_on_error = 1;
607
608         rcu_read_lock();
609         nc = rcu_dereference(connection->net_conf);
610         if (!nc) {
611                 rcu_read_unlock();
612                 return NULL;
613         }
614         sndbuf_size = nc->sndbuf_size;
615         rcvbuf_size = nc->rcvbuf_size;
616         connect_int = nc->connect_int;
617         rcu_read_unlock();
618
619         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620         memcpy(&src_in6, &connection->my_addr, my_addr_len);
621
622         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623                 src_in6.sin6_port = 0;
624         else
625                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626
627         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629
630         what = "sock_create_kern";
631         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632                                SOCK_STREAM, IPPROTO_TCP, &sock);
633         if (err < 0) {
634                 sock = NULL;
635                 goto out;
636         }
637
638         sock->sk->sk_rcvtimeo =
639         sock->sk->sk_sndtimeo = connect_int * HZ;
640         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641
642        /* explicitly bind to the configured IP as source IP
643         *  for the outgoing connections.
644         *  This is needed for multihomed hosts and to be
645         *  able to use lo: interfaces for drbd.
646         * Make sure to use 0 as port number, so linux selects
647         *  a free one dynamically.
648         */
649         what = "bind before connect";
650         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651         if (err < 0)
652                 goto out;
653
654         /* connect may fail, peer not yet available.
655          * stay C_WF_CONNECTION, don't go Disconnecting! */
656         disconnect_on_error = 0;
657         what = "connect";
658         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659
660 out:
661         if (err < 0) {
662                 if (sock) {
663                         sock_release(sock);
664                         sock = NULL;
665                 }
666                 switch (-err) {
667                         /* timeout, busy, signal pending */
668                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669                 case EINTR: case ERESTARTSYS:
670                         /* peer not (yet) available, network problem */
671                 case ECONNREFUSED: case ENETUNREACH:
672                 case EHOSTDOWN:    case EHOSTUNREACH:
673                         disconnect_on_error = 0;
674                         break;
675                 default:
676                         drbd_err(connection, "%s failed, err = %d\n", what, err);
677                 }
678                 if (disconnect_on_error)
679                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680         }
681
682         return sock;
683 }
684
685 struct accept_wait_data {
686         struct drbd_connection *connection;
687         struct socket *s_listen;
688         struct completion door_bell;
689         void (*original_sk_state_change)(struct sock *sk);
690
691 };
692
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695         struct accept_wait_data *ad = sk->sk_user_data;
696         void (*state_change)(struct sock *sk);
697
698         state_change = ad->original_sk_state_change;
699         if (sk->sk_state == TCP_ESTABLISHED)
700                 complete(&ad->door_bell);
701         state_change(sk);
702 }
703
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706         int err, sndbuf_size, rcvbuf_size, my_addr_len;
707         struct sockaddr_in6 my_addr;
708         struct socket *s_listen;
709         struct net_conf *nc;
710         const char *what;
711
712         rcu_read_lock();
713         nc = rcu_dereference(connection->net_conf);
714         if (!nc) {
715                 rcu_read_unlock();
716                 return -EIO;
717         }
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         rcu_read_unlock();
721
722         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, &connection->my_addr, my_addr_len);
724
725         what = "sock_create_kern";
726         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
728         if (err) {
729                 s_listen = NULL;
730                 goto out;
731         }
732
733         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735
736         what = "bind before listen";
737         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738         if (err < 0)
739                 goto out;
740
741         ad->s_listen = s_listen;
742         write_lock_bh(&s_listen->sk->sk_callback_lock);
743         ad->original_sk_state_change = s_listen->sk->sk_state_change;
744         s_listen->sk->sk_state_change = drbd_incoming_connection;
745         s_listen->sk->sk_user_data = ad;
746         write_unlock_bh(&s_listen->sk->sk_callback_lock);
747
748         what = "listen";
749         err = s_listen->ops->listen(s_listen, 5);
750         if (err < 0)
751                 goto out;
752
753         return 0;
754 out:
755         if (s_listen)
756                 sock_release(s_listen);
757         if (err < 0) {
758                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759                         drbd_err(connection, "%s failed, err = %d\n", what, err);
760                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761                 }
762         }
763
764         return -EIO;
765 }
766
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769         write_lock_bh(&sk->sk_callback_lock);
770         sk->sk_state_change = ad->original_sk_state_change;
771         sk->sk_user_data = NULL;
772         write_unlock_bh(&sk->sk_callback_lock);
773 }
774
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777         int timeo, connect_int, err = 0;
778         struct socket *s_estab = NULL;
779         struct net_conf *nc;
780
781         rcu_read_lock();
782         nc = rcu_dereference(connection->net_conf);
783         if (!nc) {
784                 rcu_read_unlock();
785                 return NULL;
786         }
787         connect_int = nc->connect_int;
788         rcu_read_unlock();
789
790         timeo = connect_int * HZ;
791         /* 28.5% random jitter */
792         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793
794         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795         if (err <= 0)
796                 return NULL;
797
798         err = kernel_accept(ad->s_listen, &s_estab, 0);
799         if (err < 0) {
800                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801                         drbd_err(connection, "accept failed, err = %d\n", err);
802                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803                 }
804         }
805
806         if (s_estab)
807                 unregister_state_change(s_estab->sk, ad);
808
809         return s_estab;
810 }
811
/* Prototype; decode_header() is used below before its definition. */
static int decode_header(struct drbd_connection *connection, void *header,
			 struct packet_info *pi);
813
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815                              enum drbd_packet cmd)
816 {
817         if (!conn_prepare_command(connection, sock))
818                 return -EIO;
819         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824         unsigned int header_size = drbd_header_size(connection);
825         struct packet_info pi;
826         struct net_conf *nc;
827         int err;
828
829         rcu_read_lock();
830         nc = rcu_dereference(connection->net_conf);
831         if (!nc) {
832                 rcu_read_unlock();
833                 return -EIO;
834         }
835         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836         rcu_read_unlock();
837
838         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839         if (err != header_size) {
840                 if (err >= 0)
841                         err = -EIO;
842                 return err;
843         }
844         err = decode_header(connection, connection->data.rbuf, &pi);
845         if (err)
846                 return err;
847         return pi.cmd;
848 }
849
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:       pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856         int rr;
857         char tb[4];
858
859         if (!*sock)
860                 return false;
861
862         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863
864         if (rr > 0 || rr == -EAGAIN) {
865                 return true;
866         } else {
867                 sock_release(*sock);
868                 *sock = NULL;
869                 return false;
870         }
871 }
872
873 static bool connection_established(struct drbd_connection *connection,
874                                    struct socket **sock1,
875                                    struct socket **sock2)
876 {
877         struct net_conf *nc;
878         int timeout;
879         bool ok;
880
881         if (!*sock1 || !*sock2)
882                 return false;
883
884         rcu_read_lock();
885         nc = rcu_dereference(connection->net_conf);
886         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887         rcu_read_unlock();
888         schedule_timeout_interruptible(timeout);
889
890         ok = drbd_socket_okay(sock1);
891         ok = drbd_socket_okay(sock2) && ok;
892
893         return ok;
894 }
895
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900         struct drbd_device *device = peer_device->device;
901         int err;
902
903         atomic_set(&device->packet_seq, 0);
904         device->peer_seq = 0;
905
906         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907                 &peer_device->connection->cstate_mutex :
908                 &device->own_state_mutex;
909
910         err = drbd_send_sync_param(peer_device);
911         if (!err)
912                 err = drbd_send_sizes(peer_device, 0, 0);
913         if (!err)
914                 err = drbd_send_uuids(peer_device);
915         if (!err)
916                 err = drbd_send_current_state(peer_device);
917         clear_bit(USE_DEGR_WFC_T, &device->flags);
918         clear_bit(RESIZE_PENDING, &device->flags);
919         atomic_set(&device->ap_in_flight, 0);
920         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921         return err;
922 }
923
/*
 * conn_connect() - set up the data and meta sockets of @connection and run
 * the initial handshake.
 *
 * Flow as visible below: request C_WF_CONNECTION, prepare a listen socket,
 * then loop, alternating between an outgoing attempt (drbd_try_connect())
 * and waiting for an incoming one (drbd_wait_for_connect()), until
 * connection_established() reports both sockets usable.  Afterwards socket
 * options and timeouts are applied, features and (if cram_hmac_tfm is set)
 * authentication are negotiated, the protocol is sent, drbd_connected() is
 * run for every peer device, the connection is moved to C_WF_REPORT_PARAMS,
 * and the ack receiver thread and ack sender workqueue are started.
 *
 * NOTE(review): in the code below, -2 is returned when the state change to
 * C_WF_CONNECTION fails; confirm that this matches the "-2" description in
 * the table that follows.
 */
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934         struct drbd_socket sock, msock;
935         struct drbd_peer_device *peer_device;
936         struct net_conf *nc;
937         int vnr, timeout, h;
938         bool discard_my_data, ok;
939         enum drbd_state_rv rv;
940         struct accept_wait_data ad = {
941                 .connection = connection,
942                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943         };
944
945         clear_bit(DISCONNECT_SENT, &connection->flags);
946         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947                 return -2;
948
            /* Local socket descriptors share the connection's send/receive
             * buffers; the sockets themselves are filled in by the loop below. */
949         mutex_init(&sock.mutex);
950         sock.sbuf = connection->data.sbuf;
951         sock.rbuf = connection->data.rbuf;
952         sock.socket = NULL;
953         mutex_init(&msock.mutex);
954         msock.sbuf = connection->meta.sbuf;
955         msock.rbuf = connection->meta.rbuf;
956         msock.socket = NULL;
957
958         /* Assume that the peer only understands protocol 80 until we know better.  */
959         connection->agreed_pro_version = 80;
960
961         if (prepare_listen_socket(connection, &ad))
962                 return 0; /* reported as "please try again" */
963
964         do {
965                 struct socket *s;
966
                    /* Outgoing attempt: the first socket we get becomes the
                     * data socket, the second one the meta socket. */
967                 s = drbd_try_connect(connection);
968                 if (s) {
969                         if (!sock.socket) {
970                                 sock.socket = s;
971                                 send_first_packet(connection, &sock, P_INITIAL_DATA); /* return value not checked here */
972                         } else if (!msock.socket) {
973                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974                                 msock.socket = s;
975                                 send_first_packet(connection, &msock, P_INITIAL_META); /* return value not checked here */
976                         } else {
977                                 drbd_err(connection, "Logic error in conn_connect()\n");
978                                 goto out_release_sockets;
979                         }
980                 }
981
982                 if (connection_established(connection, &sock.socket, &msock.socket))
983                         break;
984
    /* Incoming attempt; also re-entered from "randomize" below. */
985 retry:
986                 s = drbd_wait_for_connect(connection, &ad);
987                 if (s) {
988                         int fp = receive_first_packet(connection, s);
                            /* drbd_socket_okay() releases and NULLs a socket
                             * that is no longer usable, so the checks in the
                             * switch below see the current state. */
989                         drbd_socket_okay(&sock.socket);
990                         drbd_socket_okay(&msock.socket);
991                         switch (fp) {
992                         case P_INITIAL_DATA:
993                                 if (sock.socket) {
994                                         drbd_warn(connection, "initial packet S crossed\n");
995                                         sock_release(sock.socket);
996                                         sock.socket = s;
997                                         goto randomize;
998                                 }
999                                 sock.socket = s;
1000                                 break;
1001                         case P_INITIAL_META:
1002                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003                                 if (msock.socket) {
1004                                         drbd_warn(connection, "initial packet M crossed\n");
1005                                         sock_release(msock.socket);
1006                                         msock.socket = s;
1007                                         goto randomize;
1008                                 }
1009                                 msock.socket = s;
1010                                 break;
1011                         default:
1012                                 drbd_warn(connection, "Error receiving initial packet\n");
1013                                 sock_release(s);
     /* Reached by falling through from "default" and by goto from the
      * "crossed" cases above: with probability 1/2 wait for another
      * incoming connection before trying an outgoing one again. */
1014 randomize:
1015                                 if (prandom_u32() & 1)
1016                                         goto retry;
1017                         }
1018                 }
1019
1020                 if (connection->cstate <= C_DISCONNECTING)
1021                         goto out_release_sockets;
1022                 if (signal_pending(current)) {
1023                         flush_signals(current);
1024                         smp_rmb();
1025                         if (get_t_state(&connection->receiver) == EXITING)
1026                                 goto out_release_sockets;
1027                 }
1028
1029                 ok = connection_established(connection, &sock.socket, &msock.socket);
1030         } while (!ok);
1031
1032         if (ad.s_listen)
1033                 sock_release(ad.s_listen);
1034
1035         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037
1038         sock.socket->sk->sk_allocation = GFP_NOIO;
1039         msock.socket->sk->sk_allocation = GFP_NOIO;
1040
1041         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043
1044         /* NOT YET ...
1045          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047          * first set it to the P_CONNECTION_FEATURES timeout,
1048          * which we set to 4x the configured ping_timeout. */
1049         rcu_read_lock();
1050         nc = rcu_dereference(connection->net_conf);
1051
1052         sock.socket->sk->sk_sndtimeo =
1053         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054
1055         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056         timeout = nc->timeout * HZ / 10;
1057         discard_my_data = nc->discard_my_data;
1058         rcu_read_unlock();
1059
1060         msock.socket->sk->sk_sndtimeo = timeout;
1061
1062         /* we don't want delays.
1063          * we use TCP_CORK where appropriate, though */
1064         drbd_tcp_nodelay(sock.socket);
1065         drbd_tcp_nodelay(msock.socket);
1066
             /* From here on the sockets are stored in the connection object;
              * the return paths below do not release them in this function. */
1067         connection->data.socket = sock.socket;
1068         connection->meta.socket = msock.socket;
1069         connection->last_received = jiffies;
1070
1071         h = drbd_do_features(connection);
1072         if (h <= 0)
1073                 return h;
1074
1075         if (connection->cram_hmac_tfm) {
1076                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1077                 switch (drbd_do_auth(connection)) {
1078                 case -1:
1079                         drbd_err(connection, "Authentication of peer failed\n");
1080                         return -1;
1081                 case 0:
1082                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083                         return 0;
1084                 }
1085         }
1086
             /* Handshake done: switch the data socket from the temporary
              * 4 * ping_timeo timeouts set above to its regular ones. */
1087         connection->data.socket->sk->sk_sndtimeo = timeout;
1088         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089
1090         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091                 return -1;
1092
1093         /* Prevent a race between resync-handshake and
1094          * being promoted to Primary.
1095          *
1096          * Grab and release the state mutex, so we know that any current
1097          * drbd_set_role() is finished, and any incoming drbd_set_role
1098          * will see the STATE_SENT flag, and wait for it to be cleared.
1099          */
1100         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101                 mutex_lock(peer_device->device->state_mutex);
1102
1103         /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1104         spin_lock_irq(&connection->resource->req_lock);
1105         set_bit(STATE_SENT, &connection->flags);
1106         spin_unlock_irq(&connection->resource->req_lock);
1107
1108         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1109                 mutex_unlock(peer_device->device->state_mutex);
1110
             /* Run drbd_connected() for each peer device.  The device is
              * pinned with a kref while the RCU read lock is dropped around
              * the call. */
1111         rcu_read_lock();
1112         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1113                 struct drbd_device *device = peer_device->device;
1114                 kref_get(&device->kref);
1115                 rcu_read_unlock();
1116
1117                 if (discard_my_data)
1118                         set_bit(DISCARD_MY_DATA, &device->flags);
1119                 else
1120                         clear_bit(DISCARD_MY_DATA, &device->flags);
1121
1122                 drbd_connected(peer_device); /* return value not checked here */
1123                 kref_put(&device->kref, drbd_destroy_device);
1124                 rcu_read_lock();
1125         }
1126         rcu_read_unlock();
1127
1128         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1129         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1130                 clear_bit(STATE_SENT, &connection->flags);
1131                 return 0;
1132         }
1133
1134         drbd_thread_start(&connection->ack_receiver);
1135         /* opencoded create_singlethread_workqueue(),
1136          * to be able to use format string arguments */
1137         connection->ack_sender =
1138                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1139         if (!connection->ack_sender) {
1140                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1141                 return 0;
1142         }
1143
1144         mutex_lock(&connection->resource->conf_update);
1145         /* The discard_my_data flag is a single-shot modifier to the next
1146          * connection attempt, the handshake of which is now well underway.
1147          * No need for rcu style copying of the whole struct
1148          * just to clear a single value. */
1149         connection->net_conf->discard_my_data = 0;
1150         mutex_unlock(&connection->resource->conf_update);
1151
1152         return h;
1153
     /* Only reached from inside the connect loop, i.e. before the sockets
      * have been stored in connection->data / connection->meta. */
1154 out_release_sockets:
1155         if (ad.s_listen)
1156                 sock_release(ad.s_listen);
1157         if (sock.socket)
1158                 sock_release(sock.socket);
1159         if (msock.socket)
1160                 sock_release(msock.socket);
1161         return -1;
1162 }
1163
1164 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1165 {
1166         unsigned int header_size = drbd_header_size(connection);
1167
1168         if (header_size == sizeof(struct p_header100) &&
1169             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1170                 struct p_header100 *h = header;
1171                 if (h->pad != 0) {
1172                         drbd_err(connection, "Header padding is not zero\n");
1173                         return -EINVAL;
1174                 }
1175                 pi->vnr = be16_to_cpu(h->volume);
1176                 pi->cmd = be16_to_cpu(h->command);
1177                 pi->size = be32_to_cpu(h->length);
1178         } else if (header_size == sizeof(struct p_header95) &&
1179                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1180                 struct p_header95 *h = header;
1181                 pi->cmd = be16_to_cpu(h->command);
1182                 pi->size = be32_to_cpu(h->length);
1183                 pi->vnr = 0;
1184         } else if (header_size == sizeof(struct p_header80) &&
1185                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1186                 struct p_header80 *h = header;
1187                 pi->cmd = be16_to_cpu(h->command);
1188                 pi->size = be16_to_cpu(h->length);
1189                 pi->vnr = 0;
1190         } else {
1191                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1192                          be32_to_cpu(*(__be32 *)header),
1193                          connection->agreed_pro_version);
1194                 return -EINVAL;
1195         }
1196         pi->data = header + header_size;
1197         return 0;
1198 }
1199
1200 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1201 {
1202         if (current->plug == &connection->receiver_plug) {
1203                 blk_finish_plug(&connection->receiver_plug);
1204                 blk_start_plug(&connection->receiver_plug);
1205         } /* else: maybe just schedule() ?? */
1206 }
1207
1208 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1209 {
1210         void *buffer = connection->data.rbuf;
1211         int err;
1212
1213         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1214         if (err)
1215                 return err;
1216
1217         err = decode_header(connection, buffer, pi);
1218         connection->last_received = jiffies;
1219
1220         return err;
1221 }
1222
1223 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1224 {
1225         void *buffer = connection->data.rbuf;
1226         unsigned int size = drbd_header_size(connection);
1227         int err;
1228
1229         err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1230         if (err != size) {
1231                 /* If we have nothing in the receive buffer now, to reduce
1232                  * application latency, try to drain the backend queues as
1233                  * quickly as possible, and let remote TCP know what we have
1234                  * received so far. */
1235                 if (err == -EAGAIN) {
1236                         drbd_tcp_quickack(connection->data.socket);
1237                         drbd_unplug_all_devices(connection);
1238                 }
1239                 if (err > 0) {
1240                         buffer += err;
1241                         size -= err;
1242                 }
1243                 err = drbd_recv_all_warn(connection, buffer, size);
1244                 if (err)
1245                         return err;
1246         }
1247
1248         err = decode_header(connection, connection->data.rbuf, pi);
1249         connection->last_received = jiffies;
1250
1251         return err;
1252 }
1253 /* This is blkdev_issue_flush, but asynchronous.
1254  * We want to submit to all component volumes in parallel,
1255  * then wait for all completions.
1256  */
/* Shared by all flush bios of one drbd_flush() call. */
1257 struct issue_flush_context {
1258         atomic_t pending; /* starts at 1 in drbd_flush(), +1 per submitted bio, -1 per completion */
1259         int error; /* set to a negative errno if a flush fails or cannot be allocated */
1260         struct completion done; /* completed by one_flush_endio() when pending drops to zero */
1261 };
/* Per-bio context: allocated in submit_one_flush(), freed in one_flush_endio(). */
1262 struct one_flush_context {
1263         struct drbd_device *device; /* device whose backing device is being flushed */
1264         struct issue_flush_context *ctx; /* the shared context above */
1265 };
1266
1267 static void one_flush_endio(struct bio *bio)
1268 {
1269         struct one_flush_context *octx = bio->bi_private;
1270         struct drbd_device *device = octx->device;
1271         struct issue_flush_context *ctx = octx->ctx;
1272
1273         if (bio->bi_status) {
1274                 ctx->error = blk_status_to_errno(bio->bi_status);
1275                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1276         }
1277         kfree(octx);
1278         bio_put(bio);
1279
1280         clear_bit(FLUSH_PENDING, &device->flags);
1281         put_ldev(device);
1282         kref_put(&device->kref, drbd_destroy_device);
1283
1284         if (atomic_dec_and_test(&ctx->pending))
1285                 complete(&ctx->done);
1286 }
1287
1288 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1289 {
1290         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1291         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1292         if (!bio || !octx) {
1293                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1294                 /* FIXME: what else can I do now?  disconnecting or detaching
1295                  * really does not help to improve the state of the world, either.
1296                  */
1297                 kfree(octx);
1298                 if (bio)
1299                         bio_put(bio);
1300
1301                 ctx->error = -ENOMEM;
1302                 put_ldev(device);
1303                 kref_put(&device->kref, drbd_destroy_device);
1304                 return;
1305         }
1306
1307         octx->device = device;
1308         octx->ctx = ctx;
1309         bio_set_dev(bio, device->ldev->backing_bdev);
1310         bio->bi_private = octx;
1311         bio->bi_end_io = one_flush_endio;
1312         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1313
1314         device->flush_jif = jiffies;
1315         set_bit(FLUSH_PENDING, &device->flags);
1316         atomic_inc(&ctx->pending);
1317         submit_bio(bio);
1318 }
1319
1320 static void drbd_flush(struct drbd_connection *connection)
1321 {
1322         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1323                 struct drbd_peer_device *peer_device;
1324                 struct issue_flush_context ctx;
1325                 int vnr;
1326
1327                 atomic_set(&ctx.pending, 1);
1328                 ctx.error = 0;
1329                 init_completion(&ctx.done);
1330
1331                 rcu_read_lock();
1332                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1333                         struct drbd_device *device = peer_device->device;
1334
1335                         if (!get_ldev(device))
1336                                 continue;
1337                         kref_get(&device->kref);
1338                         rcu_read_unlock();
1339
1340                         submit_one_flush(device, &ctx);
1341
1342                         rcu_read_lock();
1343                 }
1344                 rcu_read_unlock();
1345
1346                 /* Do we want to add a timeout,
1347                  * if disk-timeout is set? */
1348                 if (!atomic_dec_and_test(&ctx.pending))
1349                         wait_for_completion(&ctx.done);
1350
1351                 if (ctx.error) {
1352                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1353                          * don't try again for ANY return value != 0
1354                          * if (rv == -EOPNOTSUPP) */
1355                         /* Any error is already reported by bio_endio callback. */
1356                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1357                 }
1358         }
1359 }
1360
1361 /**
1362  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1363  * @connection: DRBD connection.
1364  * @epoch:      Epoch object.
1365  * @ev:         Epoch event.
      *
      * An epoch is finished once it is non-empty, has no active users left and
      * either has its barrier number or @ev carries EV_CLEANUP.  A finished epoch
      * that is not connection->current_epoch is unlinked and freed, and the
      * following epoch on the list is then examined with EV_BECAME_LAST; the
      * current epoch is reset in place instead.
      *
      * Returns FE_STILL_LIVE if the epoch passed in was not finished,
      * FE_DESTROYED if it was freed, FE_RECYCLED if it was reset for reuse.
      * Only the outcome for the first epoch handled is reported.
1366  */
1367 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1368                                                struct drbd_epoch *epoch,
1369                                                enum epoch_event ev)
1370 {
1371         int epoch_size;
1372         struct drbd_epoch *next_epoch;
1373         enum finish_epoch rv = FE_STILL_LIVE;
1374
1375         spin_lock(&connection->epoch_lock);
1376         do {
1377                 next_epoch = NULL;
1378
1379                 epoch_size = atomic_read(&epoch->epoch_size);
1380
                     /* apply the event itself; EV_CLEANUP is only a modifier */
1381                 switch (ev & ~EV_CLEANUP) {
1382                 case EV_PUT:
1383                         atomic_dec(&epoch->active);
1384                         break;
1385                 case EV_GOT_BARRIER_NR:
1386                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1387                         break;
1388                 case EV_BECAME_LAST:
1389                         /* nothing to do*/
1390                         break;
1391                 }
1392
1393                 if (epoch_size != 0 &&
1394                     atomic_read(&epoch->active) == 0 &&
1395                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1396                         if (!(ev & EV_CLEANUP)) {
                                     /* epoch_lock is dropped while the barrier ack is sent */
1397                                 spin_unlock(&connection->epoch_lock);
1398                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1399                                 spin_lock(&connection->epoch_lock);
1400                         }
1401 #if 0
1402                         /* FIXME: dec unacked on connection, once we have
1403                          * something to count pending connection packets in. */
1404                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1405                                 dec_unacked(epoch->connection);
1406 #endif
1407
1408                         if (connection->current_epoch != epoch) {
                                     /* remember the successor before this epoch is unlinked and freed */
1409                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1410                                 list_del(&epoch->list);
1411                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1412                                 connection->epochs--;
1413                                 kfree(epoch);
1414
1415                                 if (rv == FE_STILL_LIVE)
1416                                         rv = FE_DESTROYED;
1417                         } else {
                                     /* the current epoch is kept and reset for reuse */
1418                                 epoch->flags = 0;
1419                                 atomic_set(&epoch->epoch_size, 0);
1420                                 /* atomic_set(&epoch->active, 0); is already zero */
1421                                 if (rv == FE_STILL_LIVE)
1422                                         rv = FE_RECYCLED;
1423                         }
1424                 }
1425
1426                 if (!next_epoch)
1427                         break;
1428
1429                 epoch = next_epoch;
1430         } while (1);
1431
1432         spin_unlock(&connection->epoch_lock);
1433
1434         return rv;
1435 }
1436
1437 static enum write_ordering_e
1438 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1439 {
1440         struct disk_conf *dc;
1441
1442         dc = rcu_dereference(bdev->disk_conf);
1443
1444         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1445                 wo = WO_DRAIN_IO;
1446         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1447                 wo = WO_NONE;
1448
1449         return wo;
1450 }
1451
1452 /**
1453  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1454  * @connection: DRBD connection.
1455  * @wo:         Write ordering method to try.
1456  */
1457 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1458                               enum write_ordering_e wo)
1459 {
1460         struct drbd_device *device;
1461         enum write_ordering_e pwo;
1462         int vnr;
1463         static char *write_ordering_str[] = {
1464                 [WO_NONE] = "none",
1465                 [WO_DRAIN_IO] = "drain",
1466                 [WO_BDEV_FLUSH] = "flush",
1467         };
1468
1469         pwo = resource->write_ordering;
1470         if (wo != WO_BDEV_FLUSH)
1471                 wo = min(pwo, wo);
1472         rcu_read_lock();
1473         idr_for_each_entry(&resource->devices, device, vnr) {
1474                 if (get_ldev(device)) {
1475                         wo = max_allowed_wo(device->ldev, wo);
1476                         if (device->ldev == bdev)
1477                                 bdev = NULL;
1478                         put_ldev(device);
1479                 }
1480         }
1481
1482         if (bdev)
1483                 wo = max_allowed_wo(bdev, wo);
1484
1485         rcu_read_unlock();
1486
1487         resource->write_ordering = wo;
1488         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1489                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1490 }
1491
1492 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1493 {
1494         struct block_device *bdev = device->ldev->backing_bdev;
1495
1496         if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1497                         GFP_NOIO, 0))
1498                 peer_req->flags |= EE_WAS_ERROR;
1499
1500         drbd_endio_write_sec_final(peer_req);
1501 }
1502
1503 static void drbd_issue_peer_wsame(struct drbd_device *device,
1504                                   struct drbd_peer_request *peer_req)
1505 {
1506         struct block_device *bdev = device->ldev->backing_bdev;
1507         sector_t s = peer_req->i.sector;
1508         sector_t nr = peer_req->i.size >> 9;
1509         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1510                 peer_req->flags |= EE_WAS_ERROR;
1511         drbd_endio_write_sec_final(peer_req);
1512 }
1513
1514
1515 /**
1516  * drbd_submit_peer_request()
1517  * @device:     DRBD device.
1518  * @peer_req:   peer request
1519  * @rw:         flag field, see bio->bi_opf
1520  *
1521  * May spread the pages to multiple bios,
1522  * depending on bio_add_page restrictions.
1523  *
1524  * Returns 0 if all bios have been submitted,
1525  * -ENOMEM if we could not allocate enough bios,
1526  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1527  *  single page to an empty bio (which should never happen and likely indicates
1528  *  that the lower level IO stack is in some way broken). This has been observed
1529  *  on certain Xen deployments.
1530  */
1531 /* TODO allocate from our own bio_set. */
1532 int drbd_submit_peer_request(struct drbd_device *device,
1533                              struct drbd_peer_request *peer_req,
1534                              const unsigned op, const unsigned op_flags,
1535                              const int fault_type)
1536 {
1537         struct bio *bios = NULL;
1538         struct bio *bio;
1539         struct page *page = peer_req->pages;
1540         sector_t sector = peer_req->i.sector;
1541         unsigned data_size = peer_req->i.size;
1542         unsigned n_bios = 0;
1543         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1544         int err = -ENOMEM;
1545
1546         /* TRIM/DISCARD: for now, always use the helper function
1547          * blkdev_issue_zeroout(..., discard=true).
1548          * It's synchronous, but it does the right thing wrt. bio splitting.
1549          * Correctness first, performance later.  Next step is to code an
1550          * asynchronous variant of the same.
1551          */
1552         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1553                 /* wait for all pending IO completions, before we start
1554                  * zeroing things out. */
1555                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1556                 /* add it to the active list now,
1557                  * so we can find it to present it in debugfs */
1558                 peer_req->submit_jif = jiffies;
1559                 peer_req->flags |= EE_SUBMITTED;
1560
1561                 /* If this was a resync request from receive_rs_deallocated(),
1562                  * it is already on the sync_ee list */
1563                 if (list_empty(&peer_req->w.list)) {
1564                         spin_lock_irq(&device->resource->req_lock);
1565                         list_add_tail(&peer_req->w.list, &device->active_ee);
1566                         spin_unlock_irq(&device->resource->req_lock);
1567                 }
1568
1569                 if (peer_req->flags & EE_IS_TRIM)
1570                         drbd_issue_peer_discard(device, peer_req);
1571                 else /* EE_WRITE_SAME */
1572                         drbd_issue_peer_wsame(device, peer_req);
1573                 return 0;
1574         }
1575
1576         /* In most cases, we will only need one bio.  But in case the lower
1577          * level restrictions happen to be different at this offset on this
1578          * side than those of the sending peer, we may need to submit the
1579          * request in more than one bio.
1580          *
1581          * Plain bio_alloc is good enough here, this is no DRBD internally
1582          * generated bio, but a bio allocated on behalf of the peer.
1583          */
1584 next_bio:
1585         bio = bio_alloc(GFP_NOIO, nr_pages);
1586         if (!bio) {
1587                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1588                 goto fail;
1589         }
1590         /* > peer_req->i.sector, unless this is the first bio */
1591         bio->bi_iter.bi_sector = sector;
1592         bio_set_dev(bio, device->ldev->backing_bdev);
1593         bio_set_op_attrs(bio, op, op_flags);
1594         bio->bi_private = peer_req;
1595         bio->bi_end_io = drbd_peer_request_endio;
1596
1597         bio->bi_next = bios;
1598         bios = bio;
1599         ++n_bios;
1600
1601         page_chain_for_each(page) {
1602                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1603                 if (!bio_add_page(bio, page, len, 0))
1604                         goto next_bio;
1605                 data_size -= len;
1606                 sector += len >> 9;
1607                 --nr_pages;
1608         }
1609         D_ASSERT(device, data_size == 0);
1610         D_ASSERT(device, page == NULL);
1611
1612         atomic_set(&peer_req->pending_bios, n_bios);
1613         /* for debugfs: update timestamp, mark as submitted */
1614         peer_req->submit_jif = jiffies;
1615         peer_req->flags |= EE_SUBMITTED;
1616         do {
1617                 bio = bios;
1618                 bios = bios->bi_next;
1619                 bio->bi_next = NULL;
1620
1621                 drbd_generic_make_request(device, fault_type, bio);
1622         } while (bios);
1623         return 0;
1624
1625 fail:
1626         while (bios) {
1627                 bio = bios;
1628                 bios = bios->bi_next;
1629                 bio_put(bio);
1630         }
1631         return err;
1632 }
1633
1634 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1635                                              struct drbd_peer_request *peer_req)
1636 {
1637         struct drbd_interval *i = &peer_req->i;
1638
1639         drbd_remove_interval(&device->write_requests, i);
1640         drbd_clear_interval(i);
1641
1642         /* Wake up any processes waiting for this peer request to complete.  */
1643         if (i->waiting)
1644                 wake_up(&device->misc_wait);
1645 }
1646
1647 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1648 {
1649         struct drbd_peer_device *peer_device;
1650         int vnr;
1651
1652         rcu_read_lock();
1653         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1654                 struct drbd_device *device = peer_device->device;
1655
1656                 kref_get(&device->kref);
1657                 rcu_read_unlock();
1658                 drbd_wait_ee_list_empty(device, &device->active_ee);
1659                 kref_put(&device->kref, drbd_destroy_device);
1660                 rcu_read_lock();
1661         }
1662         rcu_read_unlock();
1663 }
1664
1665 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1666 {
1667         int rv;
1668         struct p_barrier *p = pi->data;
1669         struct drbd_epoch *epoch;
1670
1671         /* FIXME these are unacked on connection,
1672          * not a specific (peer)device.
1673          */
1674         connection->current_epoch->barrier_nr = p->barrier;
1675         connection->current_epoch->connection = connection;
1676         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1677
1678         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1679          * the activity log, which means it would not be resynced in case the
1680          * R_PRIMARY crashes now.
1681          * Therefore we must send the barrier_ack after the barrier request was
1682          * completed. */
1683         switch (connection->resource->write_ordering) {
1684         case WO_NONE:
1685                 if (rv == FE_RECYCLED)
1686                         return 0;
1687
1688                 /* receiver context, in the writeout path of the other node.
1689                  * avoid potential distributed deadlock */
1690                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1691                 if (epoch)
1692                         break;
1693                 else
1694                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1695                         /* Fall through */
1696
1697         case WO_BDEV_FLUSH:
1698         case WO_DRAIN_IO:
1699                 conn_wait_active_ee_empty(connection);
1700                 drbd_flush(connection);
1701
1702                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1703                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1704                         if (epoch)
1705                                 break;
1706                 }
1707
1708                 return 0;
1709         default:
1710                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1711                          connection->resource->write_ordering);
1712                 return -EIO;
1713         }
1714
1715         epoch->flags = 0;
1716         atomic_set(&epoch->epoch_size, 0);
1717         atomic_set(&epoch->active, 0);
1718
1719         spin_lock(&connection->epoch_lock);
1720         if (atomic_read(&connection->current_epoch->epoch_size)) {
1721                 list_add(&epoch->list, &connection->current_epoch->list);
1722                 connection->current_epoch = epoch;
1723                 connection->epochs++;
1724         } else {
1725                 /* The current_epoch got recycled while we allocated this one... */
1726                 kfree(epoch);
1727         }
1728         spin_unlock(&connection->epoch_lock);
1729
1730         return 0;
1731 }
1732
1733 /* quick wrapper in case payload size != request_size (write same) */
1734 static void drbd_csum_ee_size(struct crypto_ahash *h,
1735                               struct drbd_peer_request *r, void *d,
1736                               unsigned int payload_size)
1737 {
1738         unsigned int tmp = r->i.size;
1739         r->i.size = payload_size;
1740         drbd_csum_ee(h, r, d);
1741         r->i.size = tmp;
1742 }
1743
1744 /* used from receive_RSDataReply (recv_resync_read)
1745  * and from receive_Data.
1746  * data_size: actual payload ("data in")
1747  *      for normal writes that is bi_size.
1748  *      for discards, that is zero.
1749  *      for write same, it is logical_block_size.
1750  * both trim and write same have the bi_size ("data len to be affected")
1751  * as extra argument in the packet header.
1752  */
1753 static struct drbd_peer_request *
1754 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1755               struct packet_info *pi) __must_hold(local)
1756 {
1757         struct drbd_device *device = peer_device->device;
1758         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1759         struct drbd_peer_request *peer_req;
1760         struct page *page;
1761         int digest_size, err;
1762         unsigned int data_size = pi->size, ds;
1763         void *dig_in = peer_device->connection->int_dig_in;
1764         void *dig_vv = peer_device->connection->int_dig_vv;
1765         unsigned long *data;
1766         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1767         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1768
1769         digest_size = 0;
1770         if (!trim && peer_device->connection->peer_integrity_tfm) {
1771                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1772                 /*
1773                  * FIXME: Receive the incoming digest into the receive buffer
1774                  *        here, together with its struct p_data?
1775                  */
1776                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1777                 if (err)
1778                         return NULL;
1779                 data_size -= digest_size;
1780         }
1781
1782         /* assume request_size == data_size, but special case trim and wsame. */
1783         ds = data_size;
1784         if (trim) {
1785                 if (!expect(data_size == 0))
1786                         return NULL;
1787                 ds = be32_to_cpu(trim->size);
1788         } else if (wsame) {
1789                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1790                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1791                                 data_size, queue_logical_block_size(device->rq_queue));
1792                         return NULL;
1793                 }
1794                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1795                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1796                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1797                         return NULL;
1798                 }
1799                 ds = be32_to_cpu(wsame->size);
1800         }
1801
1802         if (!expect(IS_ALIGNED(ds, 512)))
1803                 return NULL;
1804         if (trim || wsame) {
1805                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1806                         return NULL;
1807         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1808                 return NULL;
1809
1810         /* even though we trust out peer,
1811          * we sometimes have to double check. */
1812         if (sector + (ds>>9) > capacity) {
1813                 drbd_err(device, "request from peer beyond end of local disk: "
1814                         "capacity: %llus < sector: %llus + size: %u\n",
1815                         (unsigned long long)capacity,
1816                         (unsigned long long)sector, ds);
1817                 return NULL;
1818         }
1819
1820         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1821          * "criss-cross" setup, that might cause write-out on some other DRBD,
1822          * which in turn might block on the other node at this very place.  */
1823         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1824         if (!peer_req)
1825                 return NULL;
1826
1827         peer_req->flags |= EE_WRITE;
1828         if (trim) {
1829                 peer_req->flags |= EE_IS_TRIM;
1830                 return peer_req;
1831         }
1832         if (wsame)
1833                 peer_req->flags |= EE_WRITE_SAME;
1834
1835         /* receive payload size bytes into page chain */
1836         ds = data_size;
1837         page = peer_req->pages;
1838         page_chain_for_each(page) {
1839                 unsigned len = min_t(int, ds, PAGE_SIZE);
1840                 data = kmap(page);
1841                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1842                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1843                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1844                         data[0] = data[0] ^ (unsigned long)-1;
1845                 }
1846                 kunmap(page);
1847                 if (err) {
1848                         drbd_free_peer_req(device, peer_req);
1849                         return NULL;
1850                 }
1851                 ds -= len;
1852         }
1853
1854         if (digest_size) {
1855                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1856                 if (memcmp(dig_in, dig_vv, digest_size)) {
1857                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1858                                 (unsigned long long)sector, data_size);
1859                         drbd_free_peer_req(device, peer_req);
1860                         return NULL;
1861                 }
1862         }
1863         device->recv_cnt += data_size >> 9;
1864         return peer_req;
1865 }
1866
1867 /* drbd_drain_block() just takes a data block
1868  * out of the socket input buffer, and discards it.
1869  */
1870 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1871 {
1872         struct page *page;
1873         int err = 0;
1874         void *data;
1875
1876         if (!data_size)
1877                 return 0;
1878
1879         page = drbd_alloc_pages(peer_device, 1, 1);
1880
1881         data = kmap(page);
1882         while (data_size) {
1883                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1884
1885                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1886                 if (err)
1887                         break;
1888                 data_size -= len;
1889         }
1890         kunmap(page);
1891         drbd_free_pages(peer_device->device, page, 0);
1892         return err;
1893 }
1894
1895 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1896                            sector_t sector, int data_size)
1897 {
1898         struct bio_vec bvec;
1899         struct bvec_iter iter;
1900         struct bio *bio;
1901         int digest_size, err, expect;
1902         void *dig_in = peer_device->connection->int_dig_in;
1903         void *dig_vv = peer_device->connection->int_dig_vv;
1904
1905         digest_size = 0;
1906         if (peer_device->connection->peer_integrity_tfm) {
1907                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1908                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1909                 if (err)
1910                         return err;
1911                 data_size -= digest_size;
1912         }
1913
1914         /* optimistically update recv_cnt.  if receiving fails below,
1915          * we disconnect anyways, and counters will be reset. */
1916         peer_device->device->recv_cnt += data_size>>9;
1917
1918         bio = req->master_bio;
1919         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1920
1921         bio_for_each_segment(bvec, bio, iter) {
1922                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1923                 expect = min_t(int, data_size, bvec.bv_len);
1924                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1925                 kunmap(bvec.bv_page);
1926                 if (err)
1927                         return err;
1928                 data_size -= expect;
1929         }
1930
1931         if (digest_size) {
1932                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1933                 if (memcmp(dig_in, dig_vv, digest_size)) {
1934                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1935                         return -EINVAL;
1936                 }
1937         }
1938
1939         D_ASSERT(peer_device->device, data_size == 0);
1940         return 0;
1941 }
1942
1943 /*
1944  * e_end_resync_block() is called in ack_sender context via
1945  * drbd_finish_peer_reqs().
1946  */
1947 static int e_end_resync_block(struct drbd_work *w, int unused)
1948 {
1949         struct drbd_peer_request *peer_req =
1950                 container_of(w, struct drbd_peer_request, w);
1951         struct drbd_peer_device *peer_device = peer_req->peer_device;
1952         struct drbd_device *device = peer_device->device;
1953         sector_t sector = peer_req->i.sector;
1954         int err;
1955
1956         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1957
1958         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1959                 drbd_set_in_sync(device, sector, peer_req->i.size);
1960                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1961         } else {
1962                 /* Record failure to sync */
1963                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1964
1965                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1966         }
1967         dec_unacked(device);
1968
1969         return err;
1970 }
1971
1972 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1973                             struct packet_info *pi) __releases(local)
1974 {
1975         struct drbd_device *device = peer_device->device;
1976         struct drbd_peer_request *peer_req;
1977
1978         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1979         if (!peer_req)
1980                 goto fail;
1981
1982         dec_rs_pending(device);
1983
1984         inc_unacked(device);
1985         /* corresponding dec_unacked() in e_end_resync_block()
1986          * respective _drbd_clear_done_ee */
1987
1988         peer_req->w.cb = e_end_resync_block;
1989         peer_req->submit_jif = jiffies;
1990
1991         spin_lock_irq(&device->resource->req_lock);
1992         list_add_tail(&peer_req->w.list, &device->sync_ee);
1993         spin_unlock_irq(&device->resource->req_lock);
1994
1995         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1996         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1997                                      DRBD_FAULT_RS_WR) == 0)
1998                 return 0;
1999
2000         /* don't care for the reason here */
2001         drbd_err(device, "submit failed, triggering re-connect\n");
2002         spin_lock_irq(&device->resource->req_lock);
2003         list_del(&peer_req->w.list);
2004         spin_unlock_irq(&device->resource->req_lock);
2005
2006         drbd_free_peer_req(device, peer_req);
2007 fail:
2008         put_ldev(device);
2009         return -EIO;
2010 }
2011
2012 static struct drbd_request *
2013 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2014              sector_t sector, bool missing_ok, const char *func)
2015 {
2016         struct drbd_request *req;
2017
2018         /* Request object according to our peer */
2019         req = (struct drbd_request *)(unsigned long)id;
2020         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2021                 return req;
2022         if (!missing_ok) {
2023                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2024                         (unsigned long)id, (unsigned long long)sector);
2025         }
2026         return NULL;
2027 }
2028
2029 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2030 {
2031         struct drbd_peer_device *peer_device;
2032         struct drbd_device *device;
2033         struct drbd_request *req;
2034         sector_t sector;
2035         int err;
2036         struct p_data *p = pi->data;
2037
2038         peer_device = conn_peer_device(connection, pi->vnr);
2039         if (!peer_device)
2040                 return -EIO;
2041         device = peer_device->device;
2042
2043         sector = be64_to_cpu(p->sector);
2044
2045         spin_lock_irq(&device->resource->req_lock);
2046         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2047         spin_unlock_irq(&device->resource->req_lock);
2048         if (unlikely(!req))
2049                 return -EIO;
2050
2051         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2052          * special casing it there for the various failure cases.
2053          * still no race with drbd_fail_pending_reads */
2054         err = recv_dless_read(peer_device, req, sector, pi->size);
2055         if (!err)
2056                 req_mod(req, DATA_RECEIVED);
2057         /* else: nothing. handled from drbd_disconnect...
2058          * I don't think we may complete this just yet
2059          * in case we are "on-disconnect: freeze" */
2060
2061         return err;
2062 }
2063
2064 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2065 {
2066         struct drbd_peer_device *peer_device;
2067         struct drbd_device *device;
2068         sector_t sector;
2069         int err;
2070         struct p_data *p = pi->data;
2071
2072         peer_device = conn_peer_device(connection, pi->vnr);
2073         if (!peer_device)
2074                 return -EIO;
2075         device = peer_device->device;
2076
2077         sector = be64_to_cpu(p->sector);
2078         D_ASSERT(device, p->block_id == ID_SYNCER);
2079
2080         if (get_ldev(device)) {
2081                 /* data is submitted to disk within recv_resync_read.
2082                  * corresponding put_ldev done below on error,
2083                  * or in drbd_peer_request_endio. */
2084                 err = recv_resync_read(peer_device, sector, pi);
2085         } else {
2086                 if (__ratelimit(&drbd_ratelimit_state))
2087                         drbd_err(device, "Can not write resync data to local disk.\n");
2088
2089                 err = drbd_drain_block(peer_device, pi->size);
2090
2091                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2092         }
2093
2094         atomic_add(pi->size >> 9, &device->rs_sect_in);
2095
2096         return err;
2097 }
2098
2099 static void restart_conflicting_writes(struct drbd_device *device,
2100                                        sector_t sector, int size)
2101 {
2102         struct drbd_interval *i;
2103         struct drbd_request *req;
2104
2105         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2106                 if (!i->local)
2107                         continue;
2108                 req = container_of(i, struct drbd_request, i);
2109                 if (req->rq_state & RQ_LOCAL_PENDING ||
2110                     !(req->rq_state & RQ_POSTPONED))
2111                         continue;
2112                 /* as it is RQ_POSTPONED, this will cause it to
2113                  * be queued on the retry workqueue. */
2114                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2115         }
2116 }
2117
2118 /*
2119  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2120  */
2121 static int e_end_block(struct drbd_work *w, int cancel)
2122 {
2123         struct drbd_peer_request *peer_req =
2124                 container_of(w, struct drbd_peer_request, w);
2125         struct drbd_peer_device *peer_device = peer_req->peer_device;
2126         struct drbd_device *device = peer_device->device;
2127         sector_t sector = peer_req->i.sector;
2128         int err = 0, pcmd;
2129
2130         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2131                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2132                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2133                                 device->state.conn <= C_PAUSED_SYNC_T &&
2134                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2135                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2136                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2137                         if (pcmd == P_RS_WRITE_ACK)
2138                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2139                 } else {
2140                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2141                         /* we expect it to be marked out of sync anyways...
2142                          * maybe assert this?  */
2143                 }
2144                 dec_unacked(device);
2145         }
2146
2147         /* we delete from the conflict detection hash _after_ we sent out the
2148          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2149         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2150                 spin_lock_irq(&device->resource->req_lock);
2151                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2152                 drbd_remove_epoch_entry_interval(device, peer_req);
2153                 if (peer_req->flags & EE_RESTART_REQUESTS)
2154                         restart_conflicting_writes(device, sector, peer_req->i.size);
2155                 spin_unlock_irq(&device->resource->req_lock);
2156         } else
2157                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2158
2159         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2160
2161         return err;
2162 }
2163
2164 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2165 {
2166         struct drbd_peer_request *peer_req =
2167                 container_of(w, struct drbd_peer_request, w);
2168         struct drbd_peer_device *peer_device = peer_req->peer_device;
2169         int err;
2170
2171         err = drbd_send_ack(peer_device, ack, peer_req);
2172         dec_unacked(peer_device->device);
2173
2174         return err;
2175 }
2176
2177 static int e_send_superseded(struct drbd_work *w, int unused)
2178 {
2179         return e_send_ack(w, P_SUPERSEDED);
2180 }
2181
2182 static int e_send_retry_write(struct drbd_work *w, int unused)
2183 {
2184         struct drbd_peer_request *peer_req =
2185                 container_of(w, struct drbd_peer_request, w);
2186         struct drbd_connection *connection = peer_req->peer_device->connection;
2187
2188         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2189                              P_RETRY_WRITE : P_SUPERSEDED);
2190 }
2191
2192 static bool seq_greater(u32 a, u32 b)
2193 {
2194         /*
2195          * We assume 32-bit wrap-around here.
2196          * For 24-bit wrap-around, we would have to shift:
2197          *  a <<= 8; b <<= 8;
2198          */
2199         return (s32)a - (s32)b > 0;
2200 }
2201
2202 static u32 seq_max(u32 a, u32 b)
2203 {
2204         return seq_greater(a, b) ? a : b;
2205 }
2206
2207 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2208 {
2209         struct drbd_device *device = peer_device->device;
2210         unsigned int newest_peer_seq;
2211
2212         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2213                 spin_lock(&device->peer_seq_lock);
2214                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2215                 device->peer_seq = newest_peer_seq;
2216                 spin_unlock(&device->peer_seq_lock);
2217                 /* wake up only if we actually changed device->peer_seq */
2218                 if (peer_seq == newest_peer_seq)
2219                         wake_up(&device->seq_wait);
2220         }
2221 }
2222
2223 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2224 {
2225         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2226 }
2227
2228 /* maybe change sync_ee into interval trees as well? */
2229 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2230 {
2231         struct drbd_peer_request *rs_req;
2232         bool rv = false;
2233
2234         spin_lock_irq(&device->resource->req_lock);
2235         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2236                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2237                              rs_req->i.sector, rs_req->i.size)) {
2238                         rv = true;
2239                         break;
2240                 }
2241         }
2242         spin_unlock_irq(&device->resource->req_lock);
2243
2244         return rv;
2245 }
2246
2247 /* Called from receive_Data.
2248  * Synchronize packets on sock with packets on msock.
2249  *
2250  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2251  * packet traveling on msock, they are still processed in the order they have
2252  * been sent.
2253  *
2254  * Note: we don't care for Ack packets overtaking P_DATA packets.
2255  *
2256  * In case packet_seq is larger than device->peer_seq number, there are
2257  * outstanding packets on the msock. We wait for them to arrive.
2258  * In case we are the logically next packet, we update device->peer_seq
2259  * ourselves. Correctly handles 32bit wrap around.
2260  *
2261  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2262  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2263  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2264  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2265  *
2266  * returns 0 if we may process the packet,
2267  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2268 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2269 {
2270         struct drbd_device *device = peer_device->device;
2271         DEFINE_WAIT(wait);
2272         long timeout;
2273         int ret = 0, tp;
2274
2275         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2276                 return 0;
2277
2278         spin_lock(&device->peer_seq_lock);
2279         for (;;) {
2280                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2281                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2282                         break;
2283                 }
2284
2285                 if (signal_pending(current)) {
2286                         ret = -ERESTARTSYS;
2287                         break;
2288                 }
2289
2290                 rcu_read_lock();
2291                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2292                 rcu_read_unlock();
2293
2294                 if (!tp)
2295                         break;
2296
2297                 /* Only need to wait if two_primaries is enabled */
2298                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2299                 spin_unlock(&device->peer_seq_lock);
2300                 rcu_read_lock();
2301                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2302                 rcu_read_unlock();
2303                 timeout = schedule_timeout(timeout);
2304                 spin_lock(&device->peer_seq_lock);
2305                 if (!timeout) {
2306                         ret = -ETIMEDOUT;
2307                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2308                         break;
2309                 }
2310         }
2311         spin_unlock(&device->peer_seq_lock);
2312         finish_wait(&device->seq_wait, &wait);
2313         return ret;
2314 }
2315
2316 /* see also bio_flags_to_wire()
2317  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2318  * flags and back. We may replicate to other kernel versions. */
2319 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2320 {
2321         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2322                 (dpf & DP_FUA ? REQ_FUA : 0) |
2323                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2324 }
2325
2326 static unsigned long wire_flags_to_bio_op(u32 dpf)
2327 {
2328         if (dpf & DP_DISCARD)
2329                 return REQ_OP_WRITE_ZEROES;
2330         else
2331                 return REQ_OP_WRITE;
2332 }
2333
2334 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2335                                     unsigned int size)
2336 {
2337         struct drbd_interval *i;
2338
2339     repeat:
2340         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2341                 struct drbd_request *req;
2342                 struct bio_and_error m;
2343
2344                 if (!i->local)
2345                         continue;
2346                 req = container_of(i, struct drbd_request, i);
2347                 if (!(req->rq_state & RQ_POSTPONED))
2348                         continue;
2349                 req->rq_state &= ~RQ_POSTPONED;
2350                 __req_mod(req, NEG_ACKED, &m);
2351                 spin_unlock_irq(&device->resource->req_lock);
2352                 if (m.bio)
2353                         complete_master_bio(device, &m);
2354                 spin_lock_irq(&device->resource->req_lock);
2355                 goto repeat;
2356         }
2357 }
2358
/*
 * handle_write_conflicts() - sort out overlaps between an incoming peer write
 * and the intervals already present in device->write_requests.
 *
 * The peer request's interval is inserted into the tree first; then every
 * overlapping interval is examined:
 *  - the peer request itself and already completed intervals are skipped;
 *  - an overlapping non-local (peer) interval is waited for with
 *    drbd_wait_misc(), after which the scan restarts;
 *  - for an overlapping local request, the outcome depends on whether this
 *    connection has RESOLVE_CONFLICTS set:
 *      + set: the peer request is queued on done_ee with either
 *        e_send_superseded or e_send_retry_write as its callback, the ack
 *        sender work is queued, and -ENOENT is returned;
 *      + not set: either wait for the local request and rescan, or (if it is
 *        postponed and no longer locally pending) mark the peer request with
 *        EE_RESTART_REQUESTS.
 *
 * The call site in receive_Data() invokes this with resource->req_lock held.
 *
 * Returns 0 if the peer request may be submitted (its interval stays in the
 * tree), -ENOENT if it was queued for a superseded/retry answer instead, or
 * the error returned by drbd_wait_misc().  On any non-zero return the
 * interval has been removed again via drbd_remove_epoch_entry_interval().
 */
static int handle_write_conflicts(struct drbd_device *device,
                                  struct drbd_peer_request *peer_req)
{
        struct drbd_connection *connection = peer_req->peer_device->connection;
        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
        sector_t sector = peer_req->i.sector;
        const unsigned int size = peer_req->i.size;
        struct drbd_interval *i;
        bool equal;
        int err;

        /*
         * Inserting the peer request into the write_requests tree will prevent
         * new conflicting local requests from being added.
         */
        drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
                /* ignore our own interval and anything already completed */
                if (i == &peer_req->i)
                        continue;
                if (i->completed)
                        continue;

                if (!i->local) {
                        /*
                         * Our peer has sent a conflicting remote request; this
                         * should not happen in a two-node setup.  Wait for the
                         * earlier peer request to complete.
                         */
                        err = drbd_wait_misc(device, i);
                        if (err)
                                goto out;
                        goto repeat;
                }

                /* identical ranges are not reported as concurrent writes below */
                equal = i->sector == sector && i->size == size;
                if (resolve_conflicts) {
                        /*
                         * If the peer request is fully contained within the
                         * overlapping request, it can be considered overwritten
                         * and thus superseded; otherwise, it will be retried
                         * once all overlapping requests have completed.
                         */
                        bool superseded = i->sector <= sector && i->sector +
                                       (i->size >> 9) >= sector + (size >> 9);

                        if (!equal)
                                drbd_alert(device, "Concurrent writes detected: "
                                               "local=%llus +%u, remote=%llus +%u, "
                                               "assuming %s came first\n",
                                          (unsigned long long)i->sector, i->size,
                                          (unsigned long long)sector, size,
                                          superseded ? "local" : "remote");

                        /* answer the peer from the ack sender instead of submitting */
                        peer_req->w.cb = superseded ? e_send_superseded :
                                                   e_send_retry_write;
                        list_add_tail(&peer_req->w.list, &device->done_ee);
                        queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);

                        /* -ENOENT tells the caller the request was handled here */
                        err = -ENOENT;
                        goto out;
                } else {
                        struct drbd_request *req =
                                container_of(i, struct drbd_request, i);

                        if (!equal)
                                drbd_alert(device, "Concurrent writes detected: "
                                               "local=%llus +%u, remote=%llus +%u\n",
                                          (unsigned long long)i->sector, i->size,
                                          (unsigned long long)sector, size);

                        if (req->rq_state & RQ_LOCAL_PENDING ||
                            !(req->rq_state & RQ_POSTPONED)) {
                                /*
                                 * Wait for the node with the discard flag to
                                 * decide if this request has been superseded
                                 * or needs to be retried.
                                 * Requests that have been superseded will
                                 * disappear from the write_requests tree.
                                 *
                                 * In addition, wait for the conflicting
                                 * request to finish locally before submitting
                                 * the conflicting peer request.
                                 */
                                err = drbd_wait_misc(device, &req->i);
                                if (err) {
                                        /* waiting failed: force C_TIMEOUT and fail what was postponed */
                                        _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
                                        fail_postponed_requests(device, sector, size);
                                        goto out;
                                }
                                goto repeat;
                        }
                        /*
                         * Remember to restart the conflicting requests after
                         * the new peer request has completed.
                         */
                        peer_req->flags |= EE_RESTART_REQUESTS;
                }
        }
        err = 0;

    out:
        /* on failure (including -ENOENT) take our interval out of the tree again */
        if (err)
                drbd_remove_epoch_entry_interval(device, peer_req);
        return err;
}
2466
/*
 * receive_Data() - handle a mirrored write sent by the peer.
 *
 * Reads the payload into a peer request, translates the wire (DP_*) flags into
 * a bio operation and flags, accounts the request in the current epoch, sends
 * or arranges the acknowledgement requested by the flags, resolves write
 * conflicts when two primaries are configured, and finally submits the
 * request to the local disk.
 *
 * Without a local disk (get_ldev() fails) the write is answered with
 * P_NEG_ACK and its payload is drained.
 *
 * Returns 0 on success, and also when handle_write_conflicts() reported
 * -ENOENT (the request was queued for a superseded/retry answer instead of
 * being submitted); otherwise the error of the step that failed.
 */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
        struct net_conf *nc;
        sector_t sector;
        struct drbd_peer_request *peer_req;
        struct p_data *p = pi->data;
        u32 peer_seq = be32_to_cpu(p->seq_num);
        int op, op_flags;
        u32 dp_flags;
        int err, tp;

        peer_device = conn_peer_device(connection, pi->vnr);
        if (!peer_device)
                return -EIO;
        device = peer_device->device;

        if (!get_ldev(device)) {
                int err2;

                /* no local disk: nack the write, but still consume its payload */
                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
                drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
                atomic_inc(&connection->current_epoch->epoch_size);
                err2 = drbd_drain_block(peer_device, pi->size);
                /* report the first error encountered */
                if (!err)
                        err = err2;
                return err;
        }

        /*
         * Corresponding put_ldev done either below (on various errors), or in
         * drbd_peer_request_endio, if we successfully submit the data at the
         * end of this function.
         */

        sector = be64_to_cpu(p->sector);
        peer_req = read_in_block(peer_device, p->block_id, sector, pi);
        if (!peer_req) {
                put_ldev(device);
                return -EIO;
        }

        peer_req->w.cb = e_end_block;
        peer_req->submit_jif = jiffies;
        peer_req->flags |= EE_APPLICATION;

        dp_flags = be32_to_cpu(p->dp_flags);
        op = wire_flags_to_bio_op(dp_flags);
        op_flags = wire_flags_to_bio_flags(dp_flags);
        /* sanity checks: only P_TRIM and empty flush requests come without pages */
        if (pi->cmd == P_TRIM) {
                D_ASSERT(peer_device, peer_req->i.size > 0);
                D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
                D_ASSERT(peer_device, peer_req->pages == NULL);
        } else if (peer_req->pages == NULL) {
                D_ASSERT(device, peer_req->i.size == 0);
                D_ASSERT(device, dp_flags & DP_FLUSH);
        }

        if (dp_flags & DP_MAY_SET_IN_SYNC)
                peer_req->flags |= EE_MAY_SET_IN_SYNC;

        /* attach the request to the current epoch and account for it */
        spin_lock(&connection->epoch_lock);
        peer_req->epoch = connection->current_epoch;
        atomic_inc(&peer_req->epoch->epoch_size);
        atomic_inc(&peer_req->epoch->active);
        spin_unlock(&connection->epoch_lock);

        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        tp = nc->two_primaries;
        /* below protocol version 100 the ack type is derived from the
         * configured wire protocol instead of being taken from dp_flags */
        if (peer_device->connection->agreed_pro_version < 100) {
                switch (nc->wire_protocol) {
                case DRBD_PROT_C:
                        dp_flags |= DP_SEND_WRITE_ACK;
                        break;
                case DRBD_PROT_B:
                        dp_flags |= DP_SEND_RECEIVE_ACK;
                        break;
                }
        }
        rcu_read_unlock();

        if (dp_flags & DP_SEND_WRITE_ACK) {
                peer_req->flags |= EE_SEND_WRITE_ACK;
                inc_unacked(device);
                /* corresponding dec_unacked() in e_end_block()
                 * respective _drbd_clear_done_ee */
        }

        if (dp_flags & DP_SEND_RECEIVE_ACK) {
                /* I really don't like it that the receiver thread
                 * sends on the msock, but anyways */
                drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
        }

        if (tp) {
                /* two primaries implies protocol C */
                D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
                peer_req->flags |= EE_IN_INTERVAL_TREE;
                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
                if (err)
                        goto out_interrupted;
                spin_lock_irq(&device->resource->req_lock);
                err = handle_write_conflicts(device, peer_req);
                if (err) {
                        spin_unlock_irq(&device->resource->req_lock);
                        /* -ENOENT: the conflict handler queued the request
                         * itself, so this is not an error for the receiver */
                        if (err == -ENOENT) {
                                put_ldev(device);
                                return 0;
                        }
                        goto out_interrupted;
                }
        } else {
                update_peer_seq(peer_device, peer_seq);
                spin_lock_irq(&device->resource->req_lock);
        }
        /* both branches above leave req_lock held at this point */
        /* TRIM and WRITE_SAME are processed synchronously,
         * we wait for all pending requests, respectively wait for
         * active_ee to become empty in drbd_submit_peer_request();
         * better not add ourselves here. */
        if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
                list_add_tail(&peer_req->w.list, &device->active_ee);
        spin_unlock_irq(&device->resource->req_lock);

        /* as sync target, wait until no resync write overlaps this request */
        if (device->state.conn == C_SYNC_TARGET)
                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

        if (device->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster, */
                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
                drbd_al_begin_io(device, &peer_req->i);
                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
        }

        err = drbd_submit_peer_request(device, peer_req, op, op_flags,
                                       DRBD_FAULT_DT_WR);
        if (!err)
                return 0;

        /* don't care for the reason here */
        drbd_err(device, "submit failed, triggering re-connect\n");
        /* undo the list/interval-tree insertion and the activity log reference */
        spin_lock_irq(&device->resource->req_lock);
        list_del(&peer_req->w.list);
        drbd_remove_epoch_entry_interval(device, peer_req);
        spin_unlock_irq(&device->resource->req_lock);
        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
                drbd_al_complete_io(device, &peer_req->i);
        }

out_interrupted:
        /* common error exit: drop the epoch and ldev references, free the request */
        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
        put_ldev(device);
        drbd_free_peer_req(device, peer_req);
        return err;
}
2626
2627 /* We may throttle resync, if the lower device seems to be busy,
2628  * and current sync rate is above c_min_rate.
2629  *
2630  * To decide whether or not the lower device is busy, we use a scheme similar
2631  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2632  * (more than 64 sectors) of activity we cannot account for with our own resync
2633  * activity, it obviously is "busy".
2634  *
2635  * The current sync rate used here uses only the most recent two step marks,
2636  * to have a short time average so we can react faster.
2637  */
2638 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2639                 bool throttle_if_app_is_waiting)
2640 {
2641         struct lc_element *tmp;
2642         bool throttle = drbd_rs_c_min_rate_throttle(device);
2643
2644         if (!throttle || throttle_if_app_is_waiting)
2645                 return throttle;
2646
2647         spin_lock_irq(&device->al_lock);
2648         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2649         if (tmp) {
2650                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2651                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2652                         throttle = false;
2653                 /* Do not slow down if app IO is already waiting for this extent,
2654                  * and our progress is necessary for application IO to complete. */
2655         }
2656         spin_unlock_irq(&device->al_lock);
2657
2658         return throttle;
2659 }
2660
2661 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2662 {
2663         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2664         unsigned long db, dt, dbdt;
2665         unsigned int c_min_rate;
2666         int curr_events;
2667
2668         rcu_read_lock();
2669         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2670         rcu_read_unlock();
2671
2672         /* feature disabled? */
2673         if (c_min_rate == 0)
2674                 return false;
2675
2676         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2677                       (int)part_stat_read(&disk->part0, sectors[1]) -
2678                         atomic_read(&device->rs_sect_ev);
2679
2680         if (atomic_read(&device->ap_actlog_cnt)
2681             || curr_events - device->rs_last_events > 64) {
2682                 unsigned long rs_left;
2683                 int i;
2684
2685                 device->rs_last_events = curr_events;
2686
2687                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2688                  * approx. */
2689                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2690
2691                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2692                         rs_left = device->ov_left;
2693                 else
2694                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2695
2696                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2697                 if (!dt)
2698                         dt++;
2699                 db = device->rs_mark_left[i] - rs_left;
2700                 dbdt = Bit2KB(db/dt);
2701
2702                 if (dbdt > c_min_rate)
2703                         return true;
2704         }
2705         return false;
2706 }
2707
2708 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2709 {
2710         struct drbd_peer_device *peer_device;
2711         struct drbd_device *device;
2712         sector_t sector;
2713         sector_t capacity;
2714         struct drbd_peer_request *peer_req;
2715         struct digest_info *di = NULL;
2716         int size, verb;
2717         unsigned int fault_type;
2718         struct p_block_req *p = pi->data;
2719
2720         peer_device = conn_peer_device(connection, pi->vnr);
2721         if (!peer_device)
2722                 return -EIO;
2723         device = peer_device->device;
2724         capacity = drbd_get_capacity(device->this_bdev);
2725
2726         sector = be64_to_cpu(p->sector);
2727         size   = be32_to_cpu(p->blksize);
2728
2729         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2730                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2731                                 (unsigned long long)sector, size);
2732                 return -EINVAL;
2733         }
2734         if (sector + (size>>9) > capacity) {
2735                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2736                                 (unsigned long long)sector, size);
2737                 return -EINVAL;
2738         }
2739
2740         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2741                 verb = 1;
2742                 switch (pi->cmd) {
2743                 case P_DATA_REQUEST:
2744                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2745                         break;
2746                 case P_RS_THIN_REQ:
2747                 case P_RS_DATA_REQUEST:
2748                 case P_CSUM_RS_REQUEST:
2749                 case P_OV_REQUEST:
2750                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2751                         break;
2752                 case P_OV_REPLY:
2753                         verb = 0;
2754                         dec_rs_pending(device);
2755                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2756                         break;
2757                 default:
2758                         BUG();
2759                 }
2760                 if (verb && __ratelimit(&drbd_ratelimit_state))
2761                         drbd_err(device, "Can not satisfy peer's read request, "
2762                             "no local data.\n");
2763
2764                 /* drain possibly payload */
2765                 return drbd_drain_block(peer_device, pi->size);
2766         }
2767
2768         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2769          * "criss-cross" setup, that might cause write-out on some other DRBD,
2770          * which in turn might block on the other node at this very place.  */
2771         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2772                         size, GFP_NOIO);
2773         if (!peer_req) {
2774                 put_ldev(device);
2775                 return -ENOMEM;
2776         }
2777
2778         switch (pi->cmd) {
2779         case P_DATA_REQUEST:
2780                 peer_req->w.cb = w_e_end_data_req;
2781                 fault_type = DRBD_FAULT_DT_RD;
2782                 /* application IO, don't drbd_rs_begin_io */
2783                 peer_req->flags |= EE_APPLICATION;
2784                 goto submit;
2785
2786         case P_RS_THIN_REQ:
2787                 /* If at some point in the future we have a smart way to
2788                    find out if this data block is completely deallocated,
2789                    then we would do something smarter here than reading
2790                    the block... */
2791                 peer_req->flags |= EE_RS_THIN_REQ;
2792         case P_RS_DATA_REQUEST:
2793                 peer_req->w.cb = w_e_end_rsdata_req;
2794                 fault_type = DRBD_FAULT_RS_RD;
2795                 /* used in the sector offset progress display */
2796                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2797                 break;
2798
2799         case P_OV_REPLY:
2800         case P_CSUM_RS_REQUEST:
2801                 fault_type = DRBD_FAULT_RS_RD;
2802                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2803                 if (!di)
2804                         goto out_free_e;
2805
2806                 di->digest_size = pi->size;
2807                 di->digest = (((char *)di)+sizeof(struct digest_info));
2808
2809                 peer_req->digest = di;
2810                 peer_req->flags |= EE_HAS_DIGEST;
2811
2812                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2813                         goto out_free_e;
2814
2815                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2816                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2817                         peer_req->w.cb = w_e_end_csum_rs_req;
2818                         /* used in the sector offset progress display */
2819                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2820                         /* remember to report stats in drbd_resync_finished */
2821                         device->use_csums = true;
2822                 } else if (pi->cmd == P_OV_REPLY) {
2823                         /* track progress, we may need to throttle */
2824                         atomic_add(size >> 9, &device->rs_sect_in);
2825                         peer_req->w.cb = w_e_end_ov_reply;
2826                         dec_rs_pending(device);
2827                         /* drbd_rs_begin_io done when we sent this request,
2828                          * but accounting still needs to be done. */
2829                         goto submit_for_resync;
2830                 }
2831                 break;
2832
2833         case P_OV_REQUEST:
2834                 if (device->ov_start_sector == ~(sector_t)0 &&
2835                     peer_device->connection->agreed_pro_version >= 90) {
2836                         unsigned long now = jiffies;
2837                         int i;
2838                         device->ov_start_sector = sector;
2839                         device->ov_position = sector;
2840                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2841                         device->rs_total = device->ov_left;
2842                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2843                                 device->rs_mark_left[i] = device->ov_left;
2844                                 device->rs_mark_time[i] = now;
2845                         }
2846                         drbd_info(device, "Online Verify start sector: %llu\n",
2847                                         (unsigned long long)sector);
2848                 }
2849                 peer_req->w.cb = w_e_end_ov_req;
2850                 fault_type = DRBD_FAULT_RS_RD;
2851                 break;
2852
2853         default:
2854                 BUG();
2855         }
2856
2857         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2858          * wrt the receiver, but it is not as straightforward as it may seem.
2859          * Various places in the resync start and stop logic assume resync
2860          * requests are processed in order, requeuing this on the worker thread
2861          * introduces a bunch of new code for synchronization between threads.
2862          *
2863          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2864          * "forever", throttling after drbd_rs_begin_io will lock that extent
2865          * for application writes for the same time.  For now, just throttle
2866          * here, where the rest of the code expects the receiver to sleep for
2867          * a while, anyways.
2868          */
2869
2870         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2871          * this defers syncer requests for some time, before letting at least
2872          * on request through.  The resync controller on the receiving side
2873          * will adapt to the incoming rate accordingly.
2874          *
2875          * We cannot throttle here if remote is Primary/SyncTarget:
2876          * we would also throttle its application reads.
2877          * In that case, throttling is done on the SyncTarget only.
2878          */
2879
2880         /* Even though this may be a resync request, we do add to "read_ee";
2881          * "sync_ee" is only used for resync WRITEs.
2882          * Add to list early, so debugfs can find this request
2883          * even if we have to sleep below. */
2884         spin_lock_irq(&device->resource->req_lock);
2885         list_add_tail(&peer_req->w.list, &device->read_ee);
2886         spin_unlock_irq(&device->resource->req_lock);
2887
2888         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2889         if (device->state.peer != R_PRIMARY
2890         && drbd_rs_should_slow_down(device, sector, false))
2891                 schedule_timeout_uninterruptible(HZ/10);
2892         update_receiver_timing_details(connection, drbd_rs_begin_io);
2893         if (drbd_rs_begin_io(device, sector))
2894                 goto out_free_e;
2895
2896 submit_for_resync:
2897         atomic_add(size >> 9, &device->rs_sect_ev);
2898
2899 submit:
2900         update_receiver_timing_details(connection, drbd_submit_peer_request);
2901         inc_unacked(device);
2902         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2903                                      fault_type) == 0)
2904                 return 0;
2905
2906         /* don't care for the reason here */
2907         drbd_err(device, "submit failed, triggering re-connect\n");
2908
2909 out_free_e:
2910         spin_lock_irq(&device->resource->req_lock);
2911         list_del(&peer_req->w.list);
2912         spin_unlock_irq(&device->resource->req_lock);
2913         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2914
2915         put_ldev(device);
2916         drbd_free_peer_req(device, peer_req);
2917         return -EIO;
2918 }
2919
/**
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 * @peer_device:	peer device whose connection's after_sb_0p policy is applied
 *
 * Returns -100 when no decision is reached (ASB_DISCONNECT, or a policy that
 * is reported as a configuration error here), and otherwise -1 or 1.  Going
 * by the ASB_DISCARD_LOCAL / ASB_DISCARD_REMOTE cases, -1 means "discard the
 * local data" and 1 means "discard the remote data".
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        int self, peer, rv = -100;
        unsigned long ch_self, ch_peer;
        enum drbd_after_sb_p after_sb_0p;

        /* lowest bit of the bitmap UUID on each side; compared by the
         * "discard younger/older primary" policies below */
        self = device->ldev->md.uuid[UI_BITMAP] & 1;
        peer = device->p_uuid[UI_BITMAP] & 1;

        /* values compared by the zero-changes / least-changes policies
         * (presumably the amount of changed data on each side - TODO confirm) */
        ch_peer = device->p_uuid[UI_SIZE];
        ch_self = device->comm_bm_set;

        rcu_read_lock();
        after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
        rcu_read_unlock();
        switch (after_sb_0p) {
        case ASB_CONSENSUS:
        case ASB_DISCARD_SECONDARY:
        case ASB_CALL_HELPER:
        case ASB_VIOLENTLY:
                /* these policies are rejected for the zero-primaries case */
                drbd_err(device, "Configuration error.\n");
                break;
        case ASB_DISCONNECT:
                break;
        case ASB_DISCARD_YOUNGER_PRI:
                if (self == 0 && peer == 1) {
                        rv = -1;
                        break;
                }
                if (self == 1 && peer == 0) {
                        rv =  1;
                        break;
                }
                /* Else fall through to one of the other strategies... */
        case ASB_DISCARD_OLDER_PRI:
                if (self == 0 && peer == 1) {
                        rv = 1;
                        break;
                }
                if (self == 1 && peer == 0) {
                        rv = -1;
                        break;
                }
                /* Else fall through to one of the other strategies... */
                drbd_warn(device, "Discard younger/older primary did not find a decision\n"
                     "Using discard-least-changes instead\n");
                /* fall through */
        case ASB_DISCARD_ZERO_CHG:
                if (ch_peer == 0 && ch_self == 0) {
                        /* neither side changed anything: RESOLVE_CONFLICTS breaks the tie */
                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
                                ? -1 : 1;
                        break;
                } else {
                        if (ch_peer == 0) { rv =  1; break; }
                        if (ch_self == 0) { rv = -1; break; }
                }
                /* both sides have changes: a pure zero-changes policy stays
                 * undecided, everything that fell through continues below */
                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
                        break;
                /* fall through */
        case ASB_DISCARD_LEAST_CHG:
                if      (ch_self < ch_peer)
                        rv = -1;
                else if (ch_self > ch_peer)
                        rv =  1;
                else /* ( ch_self == ch_peer ) */
                     /* Well, then use something else. */
                        rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
                                ? -1 : 1;
                break;
        case ASB_DISCARD_LOCAL:
                rv = -1;
                break;
        case ASB_DISCARD_REMOTE:
                rv =  1;
        }

        return rv;
}
3000
3001 /**
3002  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3003  */
3004 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3005 {
3006         struct drbd_device *device = peer_device->device;
3007         int hg, rv = -100;
3008         enum drbd_after_sb_p after_sb_1p;
3009
3010         rcu_read_lock();
3011         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3012         rcu_read_unlock();
3013         switch (after_sb_1p) {
3014         case ASB_DISCARD_YOUNGER_PRI:
3015         case ASB_DISCARD_OLDER_PRI:
3016         case ASB_DISCARD_LEAST_CHG:
3017         case ASB_DISCARD_LOCAL:
3018         case ASB_DISCARD_REMOTE:
3019         case ASB_DISCARD_ZERO_CHG:
3020                 drbd_err(device, "Configuration error.\n");
3021                 break;
3022         case ASB_DISCONNECT:
3023                 break;
3024         case ASB_CONSENSUS:
3025                 hg = drbd_asb_recover_0p(peer_device);
3026                 if (hg == -1 && device->state.role == R_SECONDARY)
3027                         rv = hg;
3028                 if (hg == 1  && device->state.role == R_PRIMARY)
3029                         rv = hg;
3030                 break;
3031         case ASB_VIOLENTLY:
3032                 rv = drbd_asb_recover_0p(peer_device);
3033                 break;
3034         case ASB_DISCARD_SECONDARY:
3035                 return device->state.role == R_PRIMARY ? 1 : -1;
3036         case ASB_CALL_HELPER:
3037                 hg = drbd_asb_recover_0p(peer_device);
3038                 if (hg == -1 && device->state.role == R_PRIMARY) {
3039                         enum drbd_state_rv rv2;
3040
3041                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3042                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3043                           * we do not need to wait for the after state change work either. */
3044                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3045                         if (rv2 != SS_SUCCESS) {
3046                                 drbd_khelper(device, "pri-lost-after-sb");
3047                         } else {
3048                                 drbd_warn(device, "Successfully gave up primary role.\n");
3049                                 rv = hg;
3050                         }
3051                 } else
3052                         rv = hg;
3053         }
3054
3055         return rv;
3056 }
3057
3058 /**
3059  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3060  */
3061 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3062 {
3063         struct drbd_device *device = peer_device->device;
3064         int hg, rv = -100;
3065         enum drbd_after_sb_p after_sb_2p;
3066
3067         rcu_read_lock();
3068         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3069         rcu_read_unlock();
3070         switch (after_sb_2p) {
3071         case ASB_DISCARD_YOUNGER_PRI:
3072         case ASB_DISCARD_OLDER_PRI:
3073         case ASB_DISCARD_LEAST_CHG:
3074         case ASB_DISCARD_LOCAL:
3075         case ASB_DISCARD_REMOTE:
3076         case ASB_CONSENSUS:
3077         case ASB_DISCARD_SECONDARY:
3078         case ASB_DISCARD_ZERO_CHG:
3079                 drbd_err(device, "Configuration error.\n");
3080                 break;
3081         case ASB_VIOLENTLY:
3082                 rv = drbd_asb_recover_0p(peer_device);
3083                 break;
3084         case ASB_DISCONNECT:
3085                 break;
3086         case ASB_CALL_HELPER:
3087                 hg = drbd_asb_recover_0p(peer_device);
3088                 if (hg == -1) {
3089                         enum drbd_state_rv rv2;
3090
3091                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3092                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3093                           * we do not need to wait for the after state change work either. */
3094                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3095                         if (rv2 != SS_SUCCESS) {
3096                                 drbd_khelper(device, "pri-lost-after-sb");
3097                         } else {
3098                                 drbd_warn(device, "Successfully gave up primary role.\n");
3099                                 rv = hg;
3100                         }
3101                 } else
3102                         rv = hg;
3103         }
3104
3105         return rv;
3106 }
3107
3108 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3109                            u64 bits, u64 flags)
3110 {
3111         if (!uuid) {
3112                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3113                 return;
3114         }
3115         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3116              text,
3117              (unsigned long long)uuid[UI_CURRENT],
3118              (unsigned long long)uuid[UI_BITMAP],
3119              (unsigned long long)uuid[UI_HISTORY_START],
3120              (unsigned long long)uuid[UI_HISTORY_END],
3121              (unsigned long long)bits,
3122              (unsigned long long)flags);
3123 }
3124
3125 /*
3126   100   after split brain try auto recover
3127     2   C_SYNC_SOURCE set BitMap
3128     1   C_SYNC_SOURCE use BitMap
3129     0   no Sync
3130    -1   C_SYNC_TARGET use BitMap
3131    -2   C_SYNC_TARGET set BitMap
3132  -100   after split brain, disconnect
3133 -1000   unrelated data
3134 -1091   requires proto 91
3135 -1096   requires proto 96
3136  */
3137
3138 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3139 {
3140         struct drbd_peer_device *const peer_device = first_peer_device(device);
3141         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3142         u64 self, peer;
3143         int i, j;
3144
3145         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3146         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3147
3148         *rule_nr = 10;
3149         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3150                 return 0;
3151
3152         *rule_nr = 20;
3153         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3154              peer != UUID_JUST_CREATED)
3155                 return -2;
3156
3157         *rule_nr = 30;
3158         if (self != UUID_JUST_CREATED &&
3159             (peer == UUID_JUST_CREATED || peer == (u64)0))
3160                 return 2;
3161
3162         if (self == peer) {
3163                 int rct, dc; /* roles at crash time */
3164
3165                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3166
3167                         if (connection->agreed_pro_version < 91)
3168                                 return -1091;
3169
3170                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3171                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3172                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3173                                 drbd_uuid_move_history(device);
3174                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3175                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3176
3177                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3178                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3179                                 *rule_nr = 34;
3180                         } else {
3181                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3182                                 *rule_nr = 36;
3183                         }
3184
3185                         return 1;
3186                 }
3187
3188                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3189
3190                         if (connection->agreed_pro_version < 91)
3191                                 return -1091;
3192
3193                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3194                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3195                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3196
3197                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3198                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3199                                 device->p_uuid[UI_BITMAP] = 0UL;
3200
3201                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3202                                 *rule_nr = 35;
3203                         } else {
3204                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3205                                 *rule_nr = 37;
3206                         }
3207
3208                         return -1;
3209                 }
3210
3211                 /* Common power [off|failure] */
3212                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3213                         (device->p_uuid[UI_FLAGS] & 2);
3214                 /* lowest bit is set when we were primary,
3215                  * next bit (weight 2) is set when peer was primary */
3216                 *rule_nr = 40;
3217
3218                 /* Neither has the "crashed primary" flag set,
3219                  * only a replication link hickup. */
3220                 if (rct == 0)
3221                         return 0;
3222
3223                 /* Current UUID equal and no bitmap uuid; does not necessarily
3224                  * mean this was a "simultaneous hard crash", maybe IO was
3225                  * frozen, so no UUID-bump happened.
3226                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3227                  * for "new-enough" peer DRBD version. */
3228                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3229                         *rule_nr = 41;
3230                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3231                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3232                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3233                         }
3234                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3235                                 /* At least one has the "crashed primary" bit set,
3236                                  * both are primary now, but neither has rotated its UUIDs?
3237                                  * "Can not happen." */
3238                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3239                                 return -100;
3240                         }
3241                         if (device->state.role == R_PRIMARY)
3242                                 return 1;
3243                         return -1;
3244                 }
3245
3246                 /* Both are secondary.
3247                  * Really looks like recovery from simultaneous hard crash.
3248                  * Check which had been primary before, and arbitrate. */
3249                 switch (rct) {
3250                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3251                 case 1: /*  self_pri && !peer_pri */ return 1;
3252                 case 2: /* !self_pri &&  peer_pri */ return -1;
3253                 case 3: /*  self_pri &&  peer_pri */
3254                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3255                         return dc ? -1 : 1;
3256                 }
3257         }
3258
3259         *rule_nr = 50;
3260         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3261         if (self == peer)
3262                 return -1;
3263
3264         *rule_nr = 51;
3265         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3266         if (self == peer) {
3267                 if (connection->agreed_pro_version < 96 ?
3268                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3269                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3270                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3271                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3272                            resync as sync source modifications of the peer's UUIDs. */
3273
3274                         if (connection->agreed_pro_version < 91)
3275                                 return -1091;
3276
3277                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3278                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3279
3280                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3281                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3282
3283                         return -1;
3284                 }
3285         }
3286
3287         *rule_nr = 60;
3288         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3289         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3290                 peer = device->p_uuid[i] & ~((u64)1);
3291                 if (self == peer)
3292                         return -2;
3293         }
3294
3295         *rule_nr = 70;
3296         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3297         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3298         if (self == peer)
3299                 return 1;
3300
3301         *rule_nr = 71;
3302         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3303         if (self == peer) {
3304                 if (connection->agreed_pro_version < 96 ?
3305                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3306                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3307                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3308                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3309                            resync as sync source modifications of our UUIDs. */
3310
3311                         if (connection->agreed_pro_version < 91)
3312                                 return -1091;
3313
3314                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3315                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3316
3317                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3318                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3319                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3320
3321                         return 1;
3322                 }
3323         }
3324
3325
3326         *rule_nr = 80;
3327         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3328         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3329                 self = device->ldev->md.uuid[i] & ~((u64)1);
3330                 if (self == peer)
3331                         return 2;
3332         }
3333
3334         *rule_nr = 90;
3335         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3336         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3337         if (self == peer && self != ((u64)0))
3338                 return 100;
3339
3340         *rule_nr = 100;
3341         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3342                 self = device->ldev->md.uuid[i] & ~((u64)1);
3343                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3344                         peer = device->p_uuid[j] & ~((u64)1);
3345                         if (self == peer)
3346                                 return -100;
3347                 }
3348         }
3349
3350         return -1000;
3351 }
3352
3353 /* drbd_sync_handshake() returns the new conn state on success, or
3354    CONN_MASK (-1) on failure.
3355  */
3356 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3357                                            enum drbd_role peer_role,
3358                                            enum drbd_disk_state peer_disk) __must_hold(local)
3359 {
3360         struct drbd_device *device = peer_device->device;
3361         enum drbd_conns rv = C_MASK;
3362         enum drbd_disk_state mydisk;
3363         struct net_conf *nc;
3364         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3365
3366         mydisk = device->state.disk;
3367         if (mydisk == D_NEGOTIATING)
3368                 mydisk = device->new_state_tmp.disk;
3369
3370         drbd_info(device, "drbd_sync_handshake:\n");
3371
3372         spin_lock_irq(&device->ldev->md.uuid_lock);
3373         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3374         drbd_uuid_dump(device, "peer", device->p_uuid,
3375                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3376
3377         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3378         spin_unlock_irq(&device->ldev->md.uuid_lock);
3379
3380         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3381
3382         if (hg == -1000) {
3383                 drbd_alert(device, "Unrelated data, aborting!\n");
3384                 return C_MASK;
3385         }
3386         if (hg < -0x10000) {
3387                 int proto, fflags;
3388                 hg = -hg;
3389                 proto = hg & 0xff;
3390                 fflags = (hg >> 8) & 0xff;
3391                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3392                                         proto, fflags);
3393                 return C_MASK;
3394         }
3395         if (hg < -1000) {
3396                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3397                 return C_MASK;
3398         }
3399
3400         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3401             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3402                 int f = (hg == -100) || abs(hg) == 2;
3403                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3404                 if (f)
3405                         hg = hg*2;
3406                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3407                      hg > 0 ? "source" : "target");
3408         }
3409
3410         if (abs(hg) == 100)
3411                 drbd_khelper(device, "initial-split-brain");
3412
3413         rcu_read_lock();
3414         nc = rcu_dereference(peer_device->connection->net_conf);
3415         always_asbp = nc->always_asbp;
3416         rr_conflict = nc->rr_conflict;
3417         tentative = nc->tentative;
3418         rcu_read_unlock();
3419
3420         if (hg == 100 || (hg == -100 && always_asbp)) {
3421                 int pcount = (device->state.role == R_PRIMARY)
3422                            + (peer_role == R_PRIMARY);
3423                 int forced = (hg == -100);
3424
3425                 switch (pcount) {
3426                 case 0:
3427                         hg = drbd_asb_recover_0p(peer_device);
3428                         break;
3429                 case 1:
3430                         hg = drbd_asb_recover_1p(peer_device);
3431                         break;
3432                 case 2:
3433                         hg = drbd_asb_recover_2p(peer_device);
3434                         break;
3435                 }
3436                 if (abs(hg) < 100) {
3437                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3438                              "automatically solved. Sync from %s node\n",
3439                              pcount, (hg < 0) ? "peer" : "this");
3440                         if (forced) {
3441                                 drbd_warn(device, "Doing a full sync, since"
3442                                      " UUIDs where ambiguous.\n");
3443                                 hg = hg*2;
3444                         }
3445                 }
3446         }
3447
3448         if (hg == -100) {
3449                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3450                         hg = -1;
3451                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3452                         hg = 1;
3453
3454                 if (abs(hg) < 100)
3455                         drbd_warn(device, "Split-Brain detected, manually solved. "
3456                              "Sync from %s node\n",
3457                              (hg < 0) ? "peer" : "this");
3458         }
3459
3460         if (hg == -100) {
3461                 /* FIXME this log message is not correct if we end up here
3462                  * after an attempted attach on a diskless node.
3463                  * We just refuse to attach -- well, we drop the "connection"
3464                  * to that disk, in a way... */
3465                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3466                 drbd_khelper(device, "split-brain");
3467                 return C_MASK;
3468         }
3469
3470         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3471                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3472                 return C_MASK;
3473         }
3474
3475         if (hg < 0 && /* by intention we do not use mydisk here. */
3476             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3477                 switch (rr_conflict) {
3478                 case ASB_CALL_HELPER:
3479                         drbd_khelper(device, "pri-lost");
3480                         /* fall through */
3481                 case ASB_DISCONNECT:
3482                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3483                         return C_MASK;
3484                 case ASB_VIOLENTLY:
3485                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3486                              "assumption\n");
3487                 }
3488         }
3489
3490         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3491                 if (hg == 0)
3492                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3493                 else
3494                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3495                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3496                                  abs(hg) >= 2 ? "full" : "bit-map based");
3497                 return C_MASK;
3498         }
3499
3500         if (abs(hg) >= 2) {
3501                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3502                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3503                                         BM_LOCKED_SET_ALLOWED))
3504                         return C_MASK;
3505         }
3506
3507         if (hg > 0) { /* become sync source. */
3508                 rv = C_WF_BITMAP_S;
3509         } else if (hg < 0) { /* become sync target */
3510                 rv = C_WF_BITMAP_T;
3511         } else {
3512                 rv = C_CONNECTED;
3513                 if (drbd_bm_total_weight(device)) {
3514                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3515                              drbd_bm_total_weight(device));
3516                 }
3517         }
3518
3519         return rv;
3520 }
3521
3522 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3523 {
3524         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3525         if (peer == ASB_DISCARD_REMOTE)
3526                 return ASB_DISCARD_LOCAL;
3527
3528         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3529         if (peer == ASB_DISCARD_LOCAL)
3530                 return ASB_DISCARD_REMOTE;
3531
3532         /* everything else is valid if they are equal on both sides. */
3533         return peer;
3534 }
3535
3536 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3537 {
3538         struct p_protocol *p = pi->data;
3539         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3540         int p_proto, p_discard_my_data, p_two_primaries, cf;
3541         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3542         char integrity_alg[SHARED_SECRET_MAX] = "";
3543         struct crypto_ahash *peer_integrity_tfm = NULL;
3544         void *int_dig_in = NULL, *int_dig_vv = NULL;
3545
3546         p_proto         = be32_to_cpu(p->protocol);
3547         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3548         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3549         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3550         p_two_primaries = be32_to_cpu(p->two_primaries);
3551         cf              = be32_to_cpu(p->conn_flags);
3552         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3553
3554         if (connection->agreed_pro_version >= 87) {
3555                 int err;
3556
3557                 if (pi->size > sizeof(integrity_alg))
3558                         return -EIO;
3559                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3560                 if (err)
3561                         return err;
3562                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3563         }
3564
3565         if (pi->cmd != P_PROTOCOL_UPDATE) {
3566                 clear_bit(CONN_DRY_RUN, &connection->flags);
3567
3568                 if (cf & CF_DRY_RUN)
3569                         set_bit(CONN_DRY_RUN, &connection->flags);
3570
3571                 rcu_read_lock();
3572                 nc = rcu_dereference(connection->net_conf);
3573
3574                 if (p_proto != nc->wire_protocol) {
3575                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3576                         goto disconnect_rcu_unlock;
3577                 }
3578
3579                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3580                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3581                         goto disconnect_rcu_unlock;
3582                 }
3583
3584                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3585                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3586                         goto disconnect_rcu_unlock;
3587                 }
3588
3589                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3590                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3591                         goto disconnect_rcu_unlock;
3592                 }
3593
3594                 if (p_discard_my_data && nc->discard_my_data) {
3595                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3596                         goto disconnect_rcu_unlock;
3597                 }
3598
3599                 if (p_two_primaries != nc->two_primaries) {
3600                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3601                         goto disconnect_rcu_unlock;
3602                 }
3603
3604                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3605                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3606                         goto disconnect_rcu_unlock;
3607                 }
3608
3609                 rcu_read_unlock();
3610         }
3611
3612         if (integrity_alg[0]) {
3613                 int hash_size;
3614
3615                 /*
3616                  * We can only change the peer data integrity algorithm
3617                  * here.  Changing our own data integrity algorithm
3618                  * requires that we send a P_PROTOCOL_UPDATE packet at
3619                  * the same time; otherwise, the peer has no way to
3620                  * tell between which packets the algorithm should
3621                  * change.
3622                  */
3623
3624                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3625                 if (IS_ERR(peer_integrity_tfm)) {
3626                         peer_integrity_tfm = NULL;
3627                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3628                                  integrity_alg);
3629                         goto disconnect;
3630                 }
3631
3632                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3633                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3634                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3635                 if (!(int_dig_in && int_dig_vv)) {
3636                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3637                         goto disconnect;
3638                 }
3639         }
3640
3641         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3642         if (!new_net_conf) {
3643                 drbd_err(connection, "Allocation of new net_conf failed\n");
3644                 goto disconnect;
3645         }
3646
3647         mutex_lock(&connection->data.mutex);
3648         mutex_lock(&connection->resource->conf_update);
3649         old_net_conf = connection->net_conf;
3650         *new_net_conf = *old_net_conf;
3651
3652         new_net_conf->wire_protocol = p_proto;
3653         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3654         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3655         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3656         new_net_conf->two_primaries = p_two_primaries;
3657
3658         rcu_assign_pointer(connection->net_conf, new_net_conf);
3659         mutex_unlock(&connection->resource->conf_update);
3660         mutex_unlock(&connection->data.mutex);
3661
3662         crypto_free_ahash(connection->peer_integrity_tfm);
3663         kfree(connection->int_dig_in);
3664         kfree(connection->int_dig_vv);
3665         connection->peer_integrity_tfm = peer_integrity_tfm;
3666         connection->int_dig_in = int_dig_in;
3667         connection->int_dig_vv = int_dig_vv;
3668
3669         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3670                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3671                           integrity_alg[0] ? integrity_alg : "(none)");
3672
3673         synchronize_rcu();
3674         kfree(old_net_conf);
3675         return 0;
3676
3677 disconnect_rcu_unlock:
3678         rcu_read_unlock();
3679 disconnect:
3680         crypto_free_ahash(peer_integrity_tfm);
3681         kfree(int_dig_in);
3682         kfree(int_dig_vv);
3683         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3684         return -EIO;
3685 }
3686
3687 /* helper function
3688  * input: alg name, feature name
3689  * return: NULL (alg name was "")
3690  *         ERR_PTR(error) if something goes wrong
3691  *         or the crypto hash ptr, if it worked out ok. */
3692 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3693                 const char *alg, const char *name)
3694 {
3695         struct crypto_ahash *tfm;
3696
3697         if (!alg[0])
3698                 return NULL;
3699
3700         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3701         if (IS_ERR(tfm)) {
3702                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3703                         alg, name, PTR_ERR(tfm));
3704                 return tfm;
3705         }
3706         return tfm;
3707 }
3708
3709 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3710 {
3711         void *buffer = connection->data.rbuf;
3712         int size = pi->size;
3713
3714         while (size) {
3715                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3716                 s = drbd_recv(connection, buffer, s);
3717                 if (s <= 0) {
3718                         if (s < 0)
3719                                 return s;
3720                         break;
3721                 }
3722                 size -= s;
3723         }
3724         if (size)
3725                 return -EIO;
3726         return 0;
3727 }
3728
3729 /*
3730  * config_unknown_volume  -  device configuration command for unknown volume
3731  *
3732  * When a device is added to an existing connection, the node on which the
3733  * device is added first will send configuration commands to its peer but the
3734  * peer will not know about the device yet.  It will warn and ignore these
3735  * commands.  Once the device is added on the second node, the second node will
3736  * send the same device configuration commands, but in the other direction.
3737  *
3738  * (We can also end up here if drbd is misconfigured.)
3739  */
3740 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3741 {
3742         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3743                   cmdname(pi->cmd), pi->vnr);
3744         return ignore_remaining_packet(connection, pi);
3745 }
3746
3747 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3748 {
3749         struct drbd_peer_device *peer_device;
3750         struct drbd_device *device;
3751         struct p_rs_param_95 *p;
3752         unsigned int header_size, data_size, exp_max_sz;
3753         struct crypto_ahash *verify_tfm = NULL;
3754         struct crypto_ahash *csums_tfm = NULL;
3755         struct net_conf *old_net_conf, *new_net_conf = NULL;
3756         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3757         const int apv = connection->agreed_pro_version;
3758         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3759         int fifo_size = 0;
3760         int err;
3761
3762         peer_device = conn_peer_device(connection, pi->vnr);
3763         if (!peer_device)
3764                 return config_unknown_volume(connection, pi);
3765         device = peer_device->device;
3766
3767         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3768                     : apv == 88 ? sizeof(struct p_rs_param)
3769                                         + SHARED_SECRET_MAX
3770                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3771                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3772
3773         if (pi->size > exp_max_sz) {
3774                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3775                     pi->size, exp_max_sz);
3776                 return -EIO;
3777         }
3778
3779         if (apv <= 88) {
3780                 header_size = sizeof(struct p_rs_param);
3781                 data_size = pi->size - header_size;
3782         } else if (apv <= 94) {
3783                 header_size = sizeof(struct p_rs_param_89);
3784                 data_size = pi->size - header_size;
3785                 D_ASSERT(device, data_size == 0);
3786         } else {
3787                 header_size = sizeof(struct p_rs_param_95);
3788                 data_size = pi->size - header_size;
3789                 D_ASSERT(device, data_size == 0);
3790         }
3791
3792         /* initialize verify_alg and csums_alg */
3793         p = pi->data;
3794         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3795
3796         err = drbd_recv_all(peer_device->connection, p, header_size);
3797         if (err)
3798                 return err;
3799
3800         mutex_lock(&connection->resource->conf_update);
3801         old_net_conf = peer_device->connection->net_conf;
3802         if (get_ldev(device)) {
3803                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3804                 if (!new_disk_conf) {
3805                         put_ldev(device);
3806                         mutex_unlock(&connection->resource->conf_update);
3807                         drbd_err(device, "Allocation of new disk_conf failed\n");
3808                         return -ENOMEM;
3809                 }
3810
3811                 old_disk_conf = device->ldev->disk_conf;
3812                 *new_disk_conf = *old_disk_conf;
3813
3814                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3815         }
3816
3817         if (apv >= 88) {
3818                 if (apv == 88) {
3819                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3820                                 drbd_err(device, "verify-alg of wrong size, "
3821                                         "peer wants %u, accepting only up to %u byte\n",
3822                                         data_size, SHARED_SECRET_MAX);
3823                                 err = -EIO;
3824                                 goto reconnect;
3825                         }
3826
3827                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3828                         if (err)
3829                                 goto reconnect;
3830                         /* we expect NUL terminated string */
3831                         /* but just in case someone tries to be evil */
3832                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3833                         p->verify_alg[data_size-1] = 0;
3834
3835                 } else /* apv >= 89 */ {
3836                         /* we still expect NUL terminated strings */
3837                         /* but just in case someone tries to be evil */
3838                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3839                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3840                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3841                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3842                 }
3843
3844                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3845                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3846                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3847                                     old_net_conf->verify_alg, p->verify_alg);
3848                                 goto disconnect;
3849                         }
3850                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3851                                         p->verify_alg, "verify-alg");
3852                         if (IS_ERR(verify_tfm)) {
3853                                 verify_tfm = NULL;
3854                                 goto disconnect;
3855                         }
3856                 }
3857
3858                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3859                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3860                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3861                                     old_net_conf->csums_alg, p->csums_alg);
3862                                 goto disconnect;
3863                         }
3864                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3865                                         p->csums_alg, "csums-alg");
3866                         if (IS_ERR(csums_tfm)) {
3867                                 csums_tfm = NULL;
3868                                 goto disconnect;
3869                         }
3870                 }
3871
3872                 if (apv > 94 && new_disk_conf) {
3873                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3874                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3875                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3876                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3877
3878                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3879                         if (fifo_size != device->rs_plan_s->size) {
3880                                 new_plan = fifo_alloc(fifo_size);
3881                                 if (!new_plan) {
3882                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3883                                         put_ldev(device);
3884                                         goto disconnect;
3885                                 }
3886                         }
3887                 }
3888
3889                 if (verify_tfm || csums_tfm) {
3890                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3891                         if (!new_net_conf) {
3892                                 drbd_err(device, "Allocation of new net_conf failed\n");
3893                                 goto disconnect;
3894                         }
3895
3896                         *new_net_conf = *old_net_conf;
3897
3898                         if (verify_tfm) {
3899                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3900                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3901                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3902                                 peer_device->connection->verify_tfm = verify_tfm;
3903                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3904                         }
3905                         if (csums_tfm) {
3906                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3907                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3908                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3909                                 peer_device->connection->csums_tfm = csums_tfm;
3910                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3911                         }
3912                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3913                 }
3914         }
3915
3916         if (new_disk_conf) {
3917                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3918                 put_ldev(device);
3919         }
3920
3921         if (new_plan) {
3922                 old_plan = device->rs_plan_s;
3923                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3924         }
3925
3926         mutex_unlock(&connection->resource->conf_update);
3927         synchronize_rcu();
3928         if (new_net_conf)
3929                 kfree(old_net_conf);
3930         kfree(old_disk_conf);
3931         kfree(old_plan);
3932
3933         return 0;
3934
3935 reconnect:
3936         if (new_disk_conf) {
3937                 put_ldev(device);
3938                 kfree(new_disk_conf);
3939         }
3940         mutex_unlock(&connection->resource->conf_update);
3941         return -EIO;
3942
3943 disconnect:
3944         kfree(new_plan);
3945         if (new_disk_conf) {
3946                 put_ldev(device);
3947                 kfree(new_disk_conf);
3948         }
3949         mutex_unlock(&connection->resource->conf_update);
3950         /* just for completeness: actually not needed,
3951          * as this is not reached if csums_tfm was ok. */
3952         crypto_free_ahash(csums_tfm);
3953         /* but free the verify_tfm again, if csums_tfm did not work out */
3954         crypto_free_ahash(verify_tfm);
3955         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3956         return -EIO;
3957 }
3958
3959 /* warn if the arguments differ by more than 12.5% */
3960 static void warn_if_differ_considerably(struct drbd_device *device,
3961         const char *s, sector_t a, sector_t b)
3962 {
3963         sector_t d;
3964         if (a == 0 || b == 0)
3965                 return;
3966         d = (a > b) ? (a - b) : (b - a);
3967         if (d > (a>>3) || d > (b>>3))
3968                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3969                      (unsigned long long)a, (unsigned long long)b);
3970 }
3971
3972 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3973 {
3974         struct drbd_peer_device *peer_device;
3975         struct drbd_device *device;
3976         struct p_sizes *p = pi->data;
3977         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3978         enum determine_dev_size dd = DS_UNCHANGED;
3979         sector_t p_size, p_usize, p_csize, my_usize;
3980         sector_t new_size, cur_size;
3981         int ldsc = 0; /* local disk size changed */
3982         enum dds_flags ddsf;
3983
3984         peer_device = conn_peer_device(connection, pi->vnr);
3985         if (!peer_device)
3986                 return config_unknown_volume(connection, pi);
3987         device = peer_device->device;
3988         cur_size = drbd_get_capacity(device->this_bdev);
3989
3990         p_size = be64_to_cpu(p->d_size);
3991         p_usize = be64_to_cpu(p->u_size);
3992         p_csize = be64_to_cpu(p->c_size);
3993
3994         /* just store the peer's disk size for now.
3995          * we still need to figure out whether we accept that. */
3996         device->p_size = p_size;
3997
3998         if (get_ldev(device)) {
3999                 rcu_read_lock();
4000                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4001                 rcu_read_unlock();
4002
4003                 warn_if_differ_considerably(device, "lower level device sizes",
4004                            p_size, drbd_get_max_capacity(device->ldev));
4005                 warn_if_differ_considerably(device, "user requested size",
4006                                             p_usize, my_usize);
4007
4008                 /* if this is the first connect, or an otherwise expected
4009                  * param exchange, choose the minimum */
4010                 if (device->state.conn == C_WF_REPORT_PARAMS)
4011                         p_usize = min_not_zero(my_usize, p_usize);
4012
4013                 /* Never shrink a device with usable data during connect.
4014                    But allow online shrinking if we are connected. */
4015                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4016                 if (new_size < cur_size &&
4017                     device->state.disk >= D_OUTDATED &&
4018                     device->state.conn < C_CONNECTED) {
4019                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4020                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4021                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4022                         put_ldev(device);
4023                         return -EIO;
4024                 }
4025
4026                 if (my_usize != p_usize) {
4027                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4028
4029                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4030                         if (!new_disk_conf) {
4031                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4032                                 put_ldev(device);
4033                                 return -ENOMEM;
4034                         }
4035
4036                         mutex_lock(&connection->resource->conf_update);
4037                         old_disk_conf = device->ldev->disk_conf;
4038                         *new_disk_conf = *old_disk_conf;
4039                         new_disk_conf->disk_size = p_usize;
4040
4041                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4042                         mutex_unlock(&connection->resource->conf_update);
4043                         synchronize_rcu();
4044                         kfree(old_disk_conf);
4045
4046                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4047                                  (unsigned long)my_usize);
4048                 }
4049
4050                 put_ldev(device);
4051         }
4052
4053         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4054         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4055            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4056            drbd_reconsider_queue_parameters(), we can be sure that after
4057            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4058
4059         ddsf = be16_to_cpu(p->dds_flags);
4060         if (get_ldev(device)) {
4061                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4062                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4063                 put_ldev(device);
4064                 if (dd == DS_ERROR)
4065                         return -EIO;
4066                 drbd_md_sync(device);
4067         } else {
4068                 /*
4069                  * I am diskless, need to accept the peer's *current* size.
4070                  * I must NOT accept the peers backing disk size,
4071                  * it may have been larger than mine all along...
4072                  *
4073                  * At this point, the peer knows more about my disk, or at
4074                  * least about what we last agreed upon, than myself.
4075                  * So if his c_size is less than his d_size, the most likely
4076                  * reason is that *my* d_size was smaller last time we checked.
4077                  *
4078                  * However, if he sends a zero current size,
4079                  * take his (user-capped or) backing disk size anyways.
4080                  *
4081                  * Unless of course he does not have a disk himself.
4082                  * In which case we ignore this completely.
4083                  */
4084                 sector_t new_size = p_csize ?: p_usize ?: p_size;
4085                 drbd_reconsider_queue_parameters(device, NULL, o);
4086                 if (new_size == 0) {
4087                         /* Ignore, peer does not know nothing. */
4088                 } else if (new_size == cur_size) {
4089                         /* nothing to do */
4090                 } else if (cur_size != 0 && p_size == 0) {
4091                         drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4092                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4093                 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4094                         drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4095                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4096                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4097                         return -EIO;
4098                 } else {
4099                         /* I believe the peer, if
4100                          *  - I don't have a current size myself
4101                          *  - we agree on the size anyways
4102                          *  - I do have a current size, am Secondary,
4103                          *    and he has the only disk
4104                          *  - I do have a current size, am Primary,
4105                          *    and he has the only disk,
4106                          *    which is larger than my current size
4107                          */
4108                         drbd_set_my_capacity(device, new_size);
4109                 }
4110         }
4111
4112         if (get_ldev(device)) {
4113                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4114                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4115                         ldsc = 1;
4116                 }
4117
4118                 put_ldev(device);
4119         }
4120
4121         if (device->state.conn > C_WF_REPORT_PARAMS) {
4122                 if (be64_to_cpu(p->c_size) !=
4123                     drbd_get_capacity(device->this_bdev) || ldsc) {
4124                         /* we have different sizes, probably peer
4125                          * needs to know my new size... */
4126                         drbd_send_sizes(peer_device, 0, ddsf);
4127                 }
4128                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4129                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4130                         if (device->state.pdsk >= D_INCONSISTENT &&
4131                             device->state.disk >= D_INCONSISTENT) {
4132                                 if (ddsf & DDSF_NO_RESYNC)
4133                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4134                                 else
4135                                         resync_after_online_grow(device);
4136                         } else
4137                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4138                 }
4139         }
4140
4141         return 0;
4142 }
4143
4144 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4145 {
4146         struct drbd_peer_device *peer_device;
4147         struct drbd_device *device;
4148         struct p_uuids *p = pi->data;
4149         u64 *p_uuid;
4150         int i, updated_uuids = 0;
4151
4152         peer_device = conn_peer_device(connection, pi->vnr);
4153         if (!peer_device)
4154                 return config_unknown_volume(connection, pi);
4155         device = peer_device->device;
4156
4157         p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4158         if (!p_uuid) {
4159                 drbd_err(device, "kmalloc of p_uuid failed\n");
4160                 return false;
4161         }
4162
4163         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4164                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4165
4166         kfree(device->p_uuid);
4167         device->p_uuid = p_uuid;
4168
4169         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4170             device->state.disk < D_INCONSISTENT &&
4171             device->state.role == R_PRIMARY &&
4172             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4173                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4174                     (unsigned long long)device->ed_uuid);
4175                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4176                 return -EIO;
4177         }
4178
4179         if (get_ldev(device)) {
4180                 int skip_initial_sync =
4181                         device->state.conn == C_CONNECTED &&
4182                         peer_device->connection->agreed_pro_version >= 90 &&
4183                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4184                         (p_uuid[UI_FLAGS] & 8);
4185                 if (skip_initial_sync) {
4186                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4187                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4188                                         "clear_n_write from receive_uuids",
4189                                         BM_LOCKED_TEST_ALLOWED);
4190                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4191                         _drbd_uuid_set(device, UI_BITMAP, 0);
4192                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4193                                         CS_VERBOSE, NULL);
4194                         drbd_md_sync(device);
4195                         updated_uuids = 1;
4196                 }
4197                 put_ldev(device);
4198         } else if (device->state.disk < D_INCONSISTENT &&
4199                    device->state.role == R_PRIMARY) {
4200                 /* I am a diskless primary, the peer just created a new current UUID
4201                    for me. */
4202                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4203         }
4204
4205         /* Before we test for the disk state, we should wait until an eventually
4206            ongoing cluster wide state change is finished. That is important if
4207            we are primary and are detaching from our disk. We need to see the
4208            new disk state... */
4209         mutex_lock(device->state_mutex);
4210         mutex_unlock(device->state_mutex);
4211         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4212                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4213
4214         if (updated_uuids)
4215                 drbd_print_uuids(device, "receiver updated UUIDs to");
4216
4217         return 0;
4218 }
4219
4220 /**
4221  * convert_state() - Converts the peer's view of the cluster state to our point of view
4222  * @ps:         The state as seen by the peer.
4223  */
4224 static union drbd_state convert_state(union drbd_state ps)
4225 {
4226         union drbd_state ms;
4227
4228         static enum drbd_conns c_tab[] = {
4229                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4230                 [C_CONNECTED] = C_CONNECTED,
4231
4232                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4233                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4234                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4235                 [C_VERIFY_S]       = C_VERIFY_T,
4236                 [C_MASK]   = C_MASK,
4237         };
4238
4239         ms.i = ps.i;
4240
4241         ms.conn = c_tab[ps.conn];
4242         ms.peer = ps.role;
4243         ms.role = ps.peer;
4244         ms.pdsk = ps.disk;
4245         ms.disk = ps.pdsk;
4246         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4247
4248         return ms;
4249 }
4250
4251 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4252 {
4253         struct drbd_peer_device *peer_device;
4254         struct drbd_device *device;
4255         struct p_req_state *p = pi->data;
4256         union drbd_state mask, val;
4257         enum drbd_state_rv rv;
4258
4259         peer_device = conn_peer_device(connection, pi->vnr);
4260         if (!peer_device)
4261                 return -EIO;
4262         device = peer_device->device;
4263
4264         mask.i = be32_to_cpu(p->mask);
4265         val.i = be32_to_cpu(p->val);
4266
4267         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4268             mutex_is_locked(device->state_mutex)) {
4269                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4270                 return 0;
4271         }
4272
4273         mask = convert_state(mask);
4274         val = convert_state(val);
4275
4276         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4277         drbd_send_sr_reply(peer_device, rv);
4278
4279         drbd_md_sync(device);
4280
4281         return 0;
4282 }
4283
4284 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4285 {
4286         struct p_req_state *p = pi->data;
4287         union drbd_state mask, val;
4288         enum drbd_state_rv rv;
4289
4290         mask.i = be32_to_cpu(p->mask);
4291         val.i = be32_to_cpu(p->val);
4292
4293         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4294             mutex_is_locked(&connection->cstate_mutex)) {
4295                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4296                 return 0;
4297         }
4298
4299         mask = convert_state(mask);
4300         val = convert_state(val);
4301
4302         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4303         conn_send_sr_reply(connection, rv);
4304
4305         return 0;
4306 }
4307
4308 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4309 {
4310         struct drbd_peer_device *peer_device;
4311         struct drbd_device *device;
4312         struct p_state *p = pi->data;
4313         union drbd_state os, ns, peer_state;
4314         enum drbd_disk_state real_peer_disk;
4315         enum chg_state_flags cs_flags;
4316         int rv;
4317
4318         peer_device = conn_peer_device(connection, pi->vnr);
4319         if (!peer_device)
4320                 return config_unknown_volume(connection, pi);
4321         device = peer_device->device;
4322
4323         peer_state.i = be32_to_cpu(p->state);
4324
4325         real_peer_disk = peer_state.disk;
4326         if (peer_state.disk == D_NEGOTIATING) {
4327                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4328                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4329         }
4330
4331         spin_lock_irq(&device->resource->req_lock);
4332  retry:
4333         os = ns = drbd_read_state(device);
4334         spin_unlock_irq(&device->resource->req_lock);
4335
4336         /* If some other part of the code (ack_receiver thread, timeout)
4337          * already decided to close the connection again,
4338          * we must not "re-establish" it here. */
4339         if (os.conn <= C_TEAR_DOWN)
4340                 return -ECONNRESET;
4341
4342         /* If this is the "end of sync" confirmation, usually the peer disk
4343          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4344          * set) resync started in PausedSyncT, or if the timing of pause-/
4345          * unpause-sync events has been "just right", the peer disk may
4346          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4347          */
4348         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4349             real_peer_disk == D_UP_TO_DATE &&
4350             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4351                 /* If we are (becoming) SyncSource, but peer is still in sync
4352                  * preparation, ignore its uptodate-ness to avoid flapping, it
4353                  * will change to inconsistent once the peer reaches active
4354                  * syncing states.
4355                  * It may have changed syncer-paused flags, however, so we
4356                  * cannot ignore this completely. */
4357                 if (peer_state.conn > C_CONNECTED &&
4358                     peer_state.conn < C_SYNC_SOURCE)
4359                         real_peer_disk = D_INCONSISTENT;
4360
4361                 /* if peer_state changes to connected at the same time,
4362                  * it explicitly notifies us that it finished resync.
4363                  * Maybe we should finish it up, too? */
4364                 else if (os.conn >= C_SYNC_SOURCE &&
4365                          peer_state.conn == C_CONNECTED) {
4366                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4367                                 drbd_resync_finished(device);
4368                         return 0;
4369                 }
4370         }
4371
4372         /* explicit verify finished notification, stop sector reached. */
4373         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4374             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4375                 ov_out_of_sync_print(device);
4376                 drbd_resync_finished(device);
4377                 return 0;
4378         }
4379
4380         /* peer says his disk is inconsistent, while we think it is uptodate,
4381          * and this happens while the peer still thinks we have a sync going on,
4382          * but we think we are already done with the sync.
4383          * We ignore this to avoid flapping pdsk.
4384          * This should not happen, if the peer is a recent version of drbd. */
4385         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4386             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4387                 real_peer_disk = D_UP_TO_DATE;
4388
4389         if (ns.conn == C_WF_REPORT_PARAMS)
4390                 ns.conn = C_CONNECTED;
4391
4392         if (peer_state.conn == C_AHEAD)
4393                 ns.conn = C_BEHIND;
4394
4395         /* TODO:
4396          * if (primary and diskless and peer uuid != effective uuid)
4397          *     abort attach on peer;
4398          *
4399          * If this node does not have good data, was already connected, but
4400          * the peer did a late attach only now, trying to "negotiate" with me,
4401          * AND I am currently Primary, possibly frozen, with some specific
4402          * "effective" uuid, this should never be reached, really, because
4403          * we first send the uuids, then the current state.
4404          *
4405          * In this scenario, we already dropped the connection hard
4406          * when we received the unsuitable uuids (receive_uuids().
4407          *
4408          * Should we want to change this, that is: not drop the connection in
4409          * receive_uuids() already, then we would need to add a branch here
4410          * that aborts the attach of "unsuitable uuids" on the peer in case
4411          * this node is currently Diskless Primary.
4412          */
4413
4414         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4415             get_ldev_if_state(device, D_NEGOTIATING)) {
4416                 int cr; /* consider resync */
4417
4418                 /* if we established a new connection */
4419                 cr  = (os.conn < C_CONNECTED);
4420                 /* if we had an established connection
4421                  * and one of the nodes newly attaches a disk */
4422                 cr |= (os.conn == C_CONNECTED &&
4423                        (peer_state.disk == D_NEGOTIATING ||
4424                         os.disk == D_NEGOTIATING));
4425                 /* if we have both been inconsistent, and the peer has been
4426                  * forced to be UpToDate with --overwrite-data */
4427                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4428                 /* if we had been plain connected, and the admin requested to
4429                  * start a sync by "invalidate" or "invalidate-remote" */
4430                 cr |= (os.conn == C_CONNECTED &&
4431                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4432                                  peer_state.conn <= C_WF_BITMAP_T));
4433
4434                 if (cr)
4435                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4436
4437                 put_ldev(device);
4438                 if (ns.conn == C_MASK) {
4439                         ns.conn = C_CONNECTED;
4440                         if (device->state.disk == D_NEGOTIATING) {
4441                                 drbd_force_state(device, NS(disk, D_FAILED));
4442                         } else if (peer_state.disk == D_NEGOTIATING) {
4443                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4444                                 peer_state.disk = D_DISKLESS;
4445                                 real_peer_disk = D_DISKLESS;
4446                         } else {
4447                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4448                                         return -EIO;
4449                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4450                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4451                                 return -EIO;
4452                         }
4453                 }
4454         }
4455
4456         spin_lock_irq(&device->resource->req_lock);
4457         if (os.i != drbd_read_state(device).i)
4458                 goto retry;
4459         clear_bit(CONSIDER_RESYNC, &device->flags);
4460         ns.peer = peer_state.role;
4461         ns.pdsk = real_peer_disk;
4462         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4463         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4464                 ns.disk = device->new_state_tmp.disk;
4465         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4466         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4467             test_bit(NEW_CUR_UUID, &device->flags)) {
4468                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4469                    for temporal network outages! */
4470                 spin_unlock_irq(&device->resource->req_lock);
4471                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4472                 tl_clear(peer_device->connection);
4473                 drbd_uuid_new_current(device);
4474                 clear_bit(NEW_CUR_UUID, &device->flags);
4475                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4476                 return -EIO;
4477         }
4478         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4479         ns = drbd_read_state(device);
4480         spin_unlock_irq(&device->resource->req_lock);
4481
4482         if (rv < SS_SUCCESS) {
4483                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4484                 return -EIO;
4485         }
4486
4487         if (os.conn > C_WF_REPORT_PARAMS) {
4488                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4489                     peer_state.disk != D_NEGOTIATING ) {
4490                         /* we want resync, peer has not yet decided to sync... */
4491                         /* Nowadays only used when forcing a node into primary role and
4492                            setting its disk to UpToDate with that */
4493                         drbd_send_uuids(peer_device);
4494                         drbd_send_current_state(peer_device);
4495                 }
4496         }
4497
4498         clear_bit(DISCARD_MY_DATA, &device->flags);
4499
4500         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4501
4502         return 0;
4503 }
4504
4505 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4506 {
4507         struct drbd_peer_device *peer_device;
4508         struct drbd_device *device;
4509         struct p_rs_uuid *p = pi->data;
4510
4511         peer_device = conn_peer_device(connection, pi->vnr);
4512         if (!peer_device)
4513                 return -EIO;
4514         device = peer_device->device;
4515
4516         wait_event(device->misc_wait,
4517                    device->state.conn == C_WF_SYNC_UUID ||
4518                    device->state.conn == C_BEHIND ||
4519                    device->state.conn < C_CONNECTED ||
4520                    device->state.disk < D_NEGOTIATING);
4521
4522         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4523
4524         /* Here the _drbd_uuid_ functions are right, current should
4525            _not_ be rotated into the history */
4526         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4527                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4528                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4529
4530                 drbd_print_uuids(device, "updated sync uuid");
4531                 drbd_start_resync(device, C_SYNC_TARGET);
4532
4533                 put_ldev(device);
4534         } else
4535                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4536
4537         return 0;
4538 }
4539
4540 /**
4541  * receive_bitmap_plain
4542  *
4543  * Return 0 when done, 1 when another iteration is needed, and a negative error
4544  * code upon failure.
4545  */
4546 static int
4547 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4548                      unsigned long *p, struct bm_xfer_ctx *c)
4549 {
4550         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4551                                  drbd_header_size(peer_device->connection);
4552         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4553                                        c->bm_words - c->word_offset);
4554         unsigned int want = num_words * sizeof(*p);
4555         int err;
4556
4557         if (want != size) {
4558                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4559                 return -EIO;
4560         }
4561         if (want == 0)
4562                 return 0;
4563         err = drbd_recv_all(peer_device->connection, p, want);
4564         if (err)
4565                 return err;
4566
4567         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4568
4569         c->word_offset += num_words;
4570         c->bit_offset = c->word_offset * BITS_PER_LONG;
4571         if (c->bit_offset > c->bm_bits)
4572                 c->bit_offset = c->bm_bits;
4573
4574         return 1;
4575 }
4576
4577 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4578 {
4579         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4580 }
4581
4582 static int dcbp_get_start(struct p_compressed_bm *p)
4583 {
4584         return (p->encoding & 0x80) != 0;
4585 }
4586
4587 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4588 {
4589         return (p->encoding >> 4) & 0x7;
4590 }
4591
4592 /**
4593  * recv_bm_rle_bits
4594  *
4595  * Return 0 when done, 1 when another iteration is needed, and a negative error
4596  * code upon failure.
4597  */
4598 static int
4599 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4600                 struct p_compressed_bm *p,
4601                  struct bm_xfer_ctx *c,
4602                  unsigned int len)
4603 {
4604         struct bitstream bs;
4605         u64 look_ahead;
4606         u64 rl;
4607         u64 tmp;
4608         unsigned long s = c->bit_offset;
4609         unsigned long e;
4610         int toggle = dcbp_get_start(p);
4611         int have;
4612         int bits;
4613
4614         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4615
4616         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4617         if (bits < 0)
4618                 return -EIO;
4619
4620         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4621                 bits = vli_decode_bits(&rl, look_ahead);
4622                 if (bits <= 0)
4623                         return -EIO;
4624
4625                 if (toggle) {
4626                         e = s + rl -1;
4627                         if (e >= c->bm_bits) {
4628                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4629                                 return -EIO;
4630                         }
4631                         _drbd_bm_set_bits(peer_device->device, s, e);
4632                 }
4633
4634                 if (have < bits) {
4635                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4636                                 have, bits, look_ahead,
4637                                 (unsigned int)(bs.cur.b - p->code),
4638                                 (unsigned int)bs.buf_len);
4639                         return -EIO;
4640                 }
4641                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4642                 if (likely(bits < 64))
4643                         look_ahead >>= bits;
4644                 else
4645                         look_ahead = 0;
4646                 have -= bits;
4647
4648                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4649                 if (bits < 0)
4650                         return -EIO;
4651                 look_ahead |= tmp << have;
4652                 have += bits;
4653         }
4654
4655         c->bit_offset = s;
4656         bm_xfer_ctx_bit_to_word_offset(c);
4657
4658         return (s != c->bm_bits);
4659 }
4660
4661 /**
4662  * decode_bitmap_c
4663  *
4664  * Return 0 when done, 1 when another iteration is needed, and a negative error
4665  * code upon failure.
4666  */
4667 static int
4668 decode_bitmap_c(struct drbd_peer_device *peer_device,
4669                 struct p_compressed_bm *p,
4670                 struct bm_xfer_ctx *c,
4671                 unsigned int len)
4672 {
4673         if (dcbp_get_code(p) == RLE_VLI_Bits)
4674                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4675
4676         /* other variants had been implemented for evaluation,
4677          * but have been dropped as this one turned out to be "best"
4678          * during all our tests. */
4679
4680         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4681         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4682         return -EIO;
4683 }
4684
4685 void INFO_bm_xfer_stats(struct drbd_device *device,
4686                 const char *direction, struct bm_xfer_ctx *c)
4687 {
4688         /* what would it take to transfer it "plaintext" */
4689         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4690         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4691         unsigned int plain =
4692                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4693                 c->bm_words * sizeof(unsigned long);
4694         unsigned int total = c->bytes[0] + c->bytes[1];
4695         unsigned int r;
4696
4697         /* total can not be zero. but just in case: */
4698         if (total == 0)
4699                 return;
4700
4701         /* don't report if not compressed */
4702         if (total >= plain)
4703                 return;
4704
4705         /* total < plain. check for overflow, still */
4706         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4707                                     : (1000 * total / plain);
4708
4709         if (r > 1000)
4710                 r = 1000;
4711
4712         r = 1000 - r;
4713         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4714              "total %u; compression: %u.%u%%\n",
4715                         direction,
4716                         c->bytes[1], c->packets[1],
4717                         c->bytes[0], c->packets[0],
4718                         total, r/10, r % 10);
4719 }
4720
4721 /* Since we are processing the bitfield from lower addresses to higher,
4722    it does not matter if the process it in 32 bit chunks or 64 bit
4723    chunks as long as it is little endian. (Understand it as byte stream,
4724    beginning with the lowest byte...) If we would use big endian
4725    we would need to process it from the highest address to the lowest,
4726    in order to be agnostic to the 32 vs 64 bits issue.
4727
4728    returns 0 on failure, 1 if we successfully received it. */
4729 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4730 {
4731         struct drbd_peer_device *peer_device;
4732         struct drbd_device *device;
4733         struct bm_xfer_ctx c;
4734         int err;
4735
4736         peer_device = conn_peer_device(connection, pi->vnr);
4737         if (!peer_device)
4738                 return -EIO;
4739         device = peer_device->device;
4740
4741         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4742         /* you are supposed to send additional out-of-sync information
4743          * if you actually set bits during this phase */
4744
4745         c = (struct bm_xfer_ctx) {
4746                 .bm_bits = drbd_bm_bits(device),
4747                 .bm_words = drbd_bm_words(device),
4748         };
4749
4750         for(;;) {
4751                 if (pi->cmd == P_BITMAP)
4752                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4753                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4754                         /* MAYBE: sanity check that we speak proto >= 90,
4755                          * and the feature is enabled! */
4756                         struct p_compressed_bm *p = pi->data;
4757
4758                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4759                                 drbd_err(device, "ReportCBitmap packet too large\n");
4760                                 err = -EIO;
4761                                 goto out;
4762                         }
4763                         if (pi->size <= sizeof(*p)) {
4764                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4765                                 err = -EIO;
4766                                 goto out;
4767                         }
4768                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4769                         if (err)
4770                                goto out;
4771                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4772                 } else {
4773                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4774                         err = -EIO;
4775                         goto out;
4776                 }
4777
4778                 c.packets[pi->cmd == P_BITMAP]++;
4779                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4780
4781                 if (err <= 0) {
4782                         if (err < 0)
4783                                 goto out;
4784                         break;
4785                 }
4786                 err = drbd_recv_header(peer_device->connection, pi);
4787                 if (err)
4788                         goto out;
4789         }
4790
4791         INFO_bm_xfer_stats(device, "receive", &c);
4792
4793         if (device->state.conn == C_WF_BITMAP_T) {
4794                 enum drbd_state_rv rv;
4795
4796                 err = drbd_send_bitmap(device);
4797                 if (err)
4798                         goto out;
4799                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4800                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4801                 D_ASSERT(device, rv == SS_SUCCESS);
4802         } else if (device->state.conn != C_WF_BITMAP_S) {
4803                 /* admin may have requested C_DISCONNECTING,
4804                  * other threads may have noticed network errors */
4805                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4806                     drbd_conn_str(device->state.conn));
4807         }
4808         err = 0;
4809
4810  out:
4811         drbd_bm_unlock(device);
4812         if (!err && device->state.conn == C_WF_BITMAP_S)
4813                 drbd_start_resync(device, C_SYNC_SOURCE);
4814         return err;
4815 }
4816
4817 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4818 {
4819         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4820                  pi->cmd, pi->size);
4821
4822         return ignore_remaining_packet(connection, pi);
4823 }
4824
4825 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4826 {
4827         /* Make sure we've acked all the TCP data associated
4828          * with the data requests being unplugged */
4829         drbd_tcp_quickack(connection->data.socket);
4830
4831         return 0;
4832 }
4833
4834 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4835 {
4836         struct drbd_peer_device *peer_device;
4837         struct drbd_device *device;
4838         struct p_block_desc *p = pi->data;
4839
4840         peer_device = conn_peer_device(connection, pi->vnr);
4841         if (!peer_device)
4842                 return -EIO;
4843         device = peer_device->device;
4844
4845         switch (device->state.conn) {
4846         case C_WF_SYNC_UUID:
4847         case C_WF_BITMAP_T:
4848         case C_BEHIND:
4849                         break;
4850         default:
4851                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4852                                 drbd_conn_str(device->state.conn));
4853         }
4854
4855         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4856
4857         return 0;
4858 }
4859
4860 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4861 {
4862         struct drbd_peer_device *peer_device;
4863         struct p_block_desc *p = pi->data;
4864         struct drbd_device *device;
4865         sector_t sector;
4866         int size, err = 0;
4867
4868         peer_device = conn_peer_device(connection, pi->vnr);
4869         if (!peer_device)
4870                 return -EIO;
4871         device = peer_device->device;
4872
4873         sector = be64_to_cpu(p->sector);
4874         size = be32_to_cpu(p->blksize);
4875
4876         dec_rs_pending(device);
4877
4878         if (get_ldev(device)) {
4879                 struct drbd_peer_request *peer_req;
4880                 const int op = REQ_OP_WRITE_ZEROES;
4881
4882                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4883                                                size, 0, GFP_NOIO);
4884                 if (!peer_req) {
4885                         put_ldev(device);
4886                         return -ENOMEM;
4887                 }
4888
4889                 peer_req->w.cb = e_end_resync_block;
4890                 peer_req->submit_jif = jiffies;
4891                 peer_req->flags |= EE_IS_TRIM;
4892
4893                 spin_lock_irq(&device->resource->req_lock);
4894                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4895                 spin_unlock_irq(&device->resource->req_lock);
4896
4897                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4898                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4899
4900                 if (err) {
4901                         spin_lock_irq(&device->resource->req_lock);
4902                         list_del(&peer_req->w.list);
4903                         spin_unlock_irq(&device->resource->req_lock);
4904
4905                         drbd_free_peer_req(device, peer_req);
4906                         put_ldev(device);
4907                         err = 0;
4908                         goto fail;
4909                 }
4910
4911                 inc_unacked(device);
4912
4913                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4914                    as well as drbd_rs_complete_io() */
4915         } else {
4916         fail:
4917                 drbd_rs_complete_io(device, sector);
4918                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4919         }
4920
4921         atomic_add(size >> 9, &device->rs_sect_in);
4922
4923         return err;
4924 }
4925
/* One row of the data socket dispatch table used by drbdd(). */
struct data_cmd {
	/* non-zero if the packet may be larger than its fixed sub header */
	int expect_payload;
	/* size of the fixed sub header drbdd() receives before calling fn */
	unsigned int pkt_size;
	/* packet handler; a non-zero return makes drbdd() bail out */
	int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
4931
4932 static struct data_cmd drbd_cmd_handler[] = {
4933         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4934         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4935         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4936         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4937         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4938         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4939         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4940         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4941         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4942         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4943         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4944         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4945         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4946         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4947         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4948         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4949         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4950         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4951         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4952         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4953         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4955         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4956         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4957         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4958         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4959         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4960         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4961 };
4962
4963 static void drbdd(struct drbd_connection *connection)
4964 {
4965         struct packet_info pi;
4966         size_t shs; /* sub header size */
4967         int err;
4968
4969         while (get_t_state(&connection->receiver) == RUNNING) {
4970                 struct data_cmd const *cmd;
4971
4972                 drbd_thread_current_set_cpu(&connection->receiver);
4973                 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4974                 if (drbd_recv_header_maybe_unplug(connection, &pi))
4975                         goto err_out;
4976
4977                 cmd = &drbd_cmd_handler[pi.cmd];
4978                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4979                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4980                                  cmdname(pi.cmd), pi.cmd);
4981                         goto err_out;
4982                 }
4983
4984                 shs = cmd->pkt_size;
4985                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4986                         shs += sizeof(struct o_qlim);
4987                 if (pi.size > shs && !cmd->expect_payload) {
4988                         drbd_err(connection, "No payload expected %s l:%d\n",
4989                                  cmdname(pi.cmd), pi.size);
4990                         goto err_out;
4991                 }
4992                 if (pi.size < shs) {
4993                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4994                                  cmdname(pi.cmd), (int)shs, pi.size);
4995                         goto err_out;
4996                 }
4997
4998                 if (shs) {
4999                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5000                         err = drbd_recv_all_warn(connection, pi.data, shs);
5001                         if (err)
5002                                 goto err_out;
5003                         pi.size -= shs;
5004                 }
5005
5006                 update_receiver_timing_details(connection, cmd->fn);
5007                 err = cmd->fn(connection, &pi);
5008                 if (err) {
5009                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5010                                  cmdname(pi.cmd), err, pi.size);
5011                         goto err_out;
5012                 }
5013         }
5014         return;
5015
5016     err_out:
5017         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5018 }
5019
5020 static void conn_disconnect(struct drbd_connection *connection)
5021 {
5022         struct drbd_peer_device *peer_device;
5023         enum drbd_conns oc;
5024         int vnr;
5025
5026         if (connection->cstate == C_STANDALONE)
5027                 return;
5028
5029         /* We are about to start the cleanup after connection loss.
5030          * Make sure drbd_make_request knows about that.
5031          * Usually we should be in some network failure state already,
5032          * but just in case we are not, we fix it up here.
5033          */
5034         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5035
5036         /* ack_receiver does not clean up anything. it must not interfere, either */
5037         drbd_thread_stop(&connection->ack_receiver);
5038         if (connection->ack_sender) {
5039                 destroy_workqueue(connection->ack_sender);
5040                 connection->ack_sender = NULL;
5041         }
5042         drbd_free_sock(connection);
5043
5044         rcu_read_lock();
5045         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5046                 struct drbd_device *device = peer_device->device;
5047                 kref_get(&device->kref);
5048                 rcu_read_unlock();
5049                 drbd_disconnected(peer_device);
5050                 kref_put(&device->kref, drbd_destroy_device);
5051                 rcu_read_lock();
5052         }
5053         rcu_read_unlock();
5054
5055         if (!list_empty(&connection->current_epoch->list))
5056                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5057         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5058         atomic_set(&connection->current_epoch->epoch_size, 0);
5059         connection->send.seen_any_write_yet = false;
5060
5061         drbd_info(connection, "Connection closed\n");
5062
5063         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5064                 conn_try_outdate_peer_async(connection);
5065
5066         spin_lock_irq(&connection->resource->req_lock);
5067         oc = connection->cstate;
5068         if (oc >= C_UNCONNECTED)
5069                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5070
5071         spin_unlock_irq(&connection->resource->req_lock);
5072
5073         if (oc == C_DISCONNECTING)
5074                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5075 }
5076
/*
 * drbd_disconnected() - per-volume cleanup once the connection is gone
 * @peer_device: the peer device (volume on this connection) to clean up
 *
 * Waits for the active/sync/read peer-request lists to drain, cancels all
 * resync bookkeeping, flushes the connection's sender work queue around
 * drbd_finish_peer_reqs(), drops the cached peer UUIDs, clears the transfer
 * log (unless IO is suspended), and writes out meta data and, if a local
 * disk is attached, the bitmap.  Finally releases whatever is left on net_ee
 * and reports page counters that are unexpectedly non-zero.
 *
 * The order of the steps below matters; see the inline comments.
 *
 * Always returns 0.
 */
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        unsigned int i;

        /* wait for current activity to cease. */
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, &device->active_ee);
        _drbd_wait_ee_list_empty(device, &device->sync_ee);
        _drbd_wait_ee_list_empty(device, &device->read_ee);
        spin_unlock_irq(&device->resource->req_lock);

        /* We do not have data structures that would allow us to
         * get the rs_pending_cnt down to 0 again.
         *  * On C_SYNC_TARGET we do not have any data structures describing
         *    the pending RSDataRequest's we have sent.
         *  * On C_SYNC_SOURCE there is no data structure that tracks
         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
         *  And no, it is not the sum of the reference counts in the
         *  resync_LRU. The resync_LRU tracks the whole operation including
         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
         *  on the fly. */
        drbd_rs_cancel_all(device);
        device->rs_total = 0;
        device->rs_failed = 0;
        atomic_set(&device->rs_pending_cnt, 0);
        wake_up(&device->misc_wait);

        /* Stop the resync timer, then run its handler once by hand. */
        del_timer_sync(&device->resync_timer);
        resync_timer_fn((unsigned long)device);

        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
         * w_make_resync_request etc. which may still be on the worker queue
         * to be "canceled" */
        drbd_flush_workqueue(&peer_device->connection->sender_work);

        drbd_finish_peer_reqs(device);

        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
           might have issued a work again. The one before drbd_finish_peer_reqs() is
           necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
        drbd_flush_workqueue(&peer_device->connection->sender_work);

        /* need to do it again, drbd_finish_peer_reqs() may have populated it
         * again via drbd_try_clear_on_disk_bm(). */
        drbd_rs_cancel_all(device);

        /* Forget the peer's UUIDs. */
        kfree(device->p_uuid);
        device->p_uuid = NULL;

        if (!drbd_suspended(device))
                tl_clear(peer_device->connection);

        drbd_md_sync(device);

        /* Write out the bitmap, but only if we can get a local disk reference. */
        if (get_ldev(device)) {
                drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
                                "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
                put_ldev(device);
        }

        /* tcp_close and release of sendpage pages can be deferred.  I don't
         * want to use SO_LINGER, because apparently it can be deferred for
         * more than 20 seconds (longest time I checked).
         *
         * Actually we don't care for exactly when the network stack does its
         * put_page(), but release our reference on these pages right here.
         */
        i = drbd_free_peer_reqs(device, &device->net_ee);
        if (i)
                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
        i = atomic_read(&device->pp_in_use_by_net);
        if (i)
                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
        i = atomic_read(&device->pp_in_use);
        if (i)
                drbd_info(device, "pp_in_use = %d, expected 0\n", i);

        /* By now every peer-request list is expected to be empty. */
        D_ASSERT(device, list_empty(&device->read_ee));
        D_ASSERT(device, list_empty(&device->active_ee));
        D_ASSERT(device, list_empty(&device->sync_ee));
        D_ASSERT(device, list_empty(&device->done_ee));

        return 0;
}
5162
5163 /*
5164  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5165  * we can agree on is stored in agreed_pro_version.
5166  *
5167  * feature flags and the reserved array should be enough room for future
5168  * enhancements of the handshake protocol, and possible plugins...
5169  *
5170  * for now, they are expected to be zero, but ignored.
5171  */
5172 static int drbd_send_features(struct drbd_connection *connection)
5173 {
5174         struct drbd_socket *sock;
5175         struct p_connection_features *p;
5176
5177         sock = &connection->data;
5178         p = conn_prepare_command(connection, sock);
5179         if (!p)
5180                 return -EIO;
5181         memset(p, 0, sizeof(*p));
5182         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5183         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5184         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5185         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5186 }
5187
5188 /*
5189  * return values:
5190  *   1 yes, we have a valid connection
5191  *   0 oops, did not work out, please try again
5192  *  -1 peer talks different language,
5193  *     no point in trying again, please go standalone.
5194  */
5195 static int drbd_do_features(struct drbd_connection *connection)
5196 {
5197         /* ASSERT current == connection->receiver ... */
5198         struct p_connection_features *p;
5199         const int expect = sizeof(struct p_connection_features);
5200         struct packet_info pi;
5201         int err;
5202
5203         err = drbd_send_features(connection);
5204         if (err)
5205                 return 0;
5206
5207         err = drbd_recv_header(connection, &pi);
5208         if (err)
5209                 return 0;
5210
5211         if (pi.cmd != P_CONNECTION_FEATURES) {
5212                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5213                          cmdname(pi.cmd), pi.cmd);
5214                 return -1;
5215         }
5216
5217         if (pi.size != expect) {
5218                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5219                      expect, pi.size);
5220                 return -1;
5221         }
5222
5223         p = pi.data;
5224         err = drbd_recv_all_warn(connection, p, expect);
5225         if (err)
5226                 return 0;
5227
5228         p->protocol_min = be32_to_cpu(p->protocol_min);
5229         p->protocol_max = be32_to_cpu(p->protocol_max);
5230         if (p->protocol_max == 0)
5231                 p->protocol_max = p->protocol_min;
5232
5233         if (PRO_VERSION_MAX < p->protocol_min ||
5234             PRO_VERSION_MIN > p->protocol_max)
5235                 goto incompat;
5236
5237         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5238         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5239
5240         drbd_info(connection, "Handshake successful: "
5241              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5242
5243         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5244                   connection->agreed_features,
5245                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5246                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5247                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5248                   connection->agreed_features ? "" : " none");
5249
5250         return 1;
5251
5252  incompat:
5253         drbd_err(connection, "incompatible DRBD dialects: "
5254             "I support %d-%d, peer supports %d-%d\n",
5255             PRO_VERSION_MIN, PRO_VERSION_MAX,
5256             p->protocol_min, p->protocol_max);
5257         return -1;
5258 }
5259
5260 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant of drbd_do_auth() compiled when neither CONFIG_CRYPTO_HMAC nor
 * CONFIG_CRYPTO_HMAC_MODULE is defined: logs why authentication cannot
 * work and always returns -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
        return -1;
}
5267 #else
5268 #define CHALLENGE_LEN 64
5269
5270 /* Return value:
5271         1 - auth succeeded,
5272         0 - failed, try again (network error),
5273         -1 - auth failed, don't try again.
5274 */
5275
5276 static int drbd_do_auth(struct drbd_connection *connection)
5277 {
5278         struct drbd_socket *sock;
5279         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5280         char *response = NULL;
5281         char *right_response = NULL;
5282         char *peers_ch = NULL;
5283         unsigned int key_len;
5284         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5285         unsigned int resp_size;
5286         struct shash_desc *desc;
5287         struct packet_info pi;
5288         struct net_conf *nc;
5289         int err, rv;
5290
5291         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5292
5293         rcu_read_lock();
5294         nc = rcu_dereference(connection->net_conf);
5295         key_len = strlen(nc->shared_secret);
5296         memcpy(secret, nc->shared_secret, key_len);
5297         rcu_read_unlock();
5298
5299         desc = kmalloc(sizeof(struct shash_desc) +
5300                        crypto_shash_descsize(connection->cram_hmac_tfm),
5301                        GFP_KERNEL);
5302         if (!desc) {
5303                 rv = -1;
5304                 goto fail;
5305         }
5306         desc->tfm = connection->cram_hmac_tfm;
5307         desc->flags = 0;
5308
5309         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5310         if (rv) {
5311                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5312                 rv = -1;
5313                 goto fail;
5314         }
5315
5316         get_random_bytes(my_challenge, CHALLENGE_LEN);
5317
5318         sock = &connection->data;
5319         if (!conn_prepare_command(connection, sock)) {
5320                 rv = 0;
5321                 goto fail;
5322         }
5323         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5324                                 my_challenge, CHALLENGE_LEN);
5325         if (!rv)
5326                 goto fail;
5327
5328         err = drbd_recv_header(connection, &pi);
5329         if (err) {
5330                 rv = 0;
5331                 goto fail;
5332         }
5333
5334         if (pi.cmd != P_AUTH_CHALLENGE) {
5335                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5336                          cmdname(pi.cmd), pi.cmd);
5337                 rv = 0;
5338                 goto fail;
5339         }
5340
5341         if (pi.size > CHALLENGE_LEN * 2) {
5342                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5343                 rv = -1;
5344                 goto fail;
5345         }
5346
5347         if (pi.size < CHALLENGE_LEN) {
5348                 drbd_err(connection, "AuthChallenge payload too small.\n");
5349                 rv = -1;
5350                 goto fail;
5351         }
5352
5353         peers_ch = kmalloc(pi.size, GFP_NOIO);
5354         if (peers_ch == NULL) {
5355                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5356                 rv = -1;
5357                 goto fail;
5358         }
5359
5360         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5361         if (err) {
5362                 rv = 0;
5363                 goto fail;
5364         }
5365
5366         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5367                 drbd_err(connection, "Peer presented the same challenge!\n");
5368                 rv = -1;
5369                 goto fail;
5370         }
5371
5372         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5373         response = kmalloc(resp_size, GFP_NOIO);
5374         if (response == NULL) {
5375                 drbd_err(connection, "kmalloc of response failed\n");
5376                 rv = -1;
5377                 goto fail;
5378         }
5379
5380         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5381         if (rv) {
5382                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5383                 rv = -1;
5384                 goto fail;
5385         }
5386
5387         if (!conn_prepare_command(connection, sock)) {
5388                 rv = 0;
5389                 goto fail;
5390         }
5391         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5392                                 response, resp_size);
5393         if (!rv)
5394                 goto fail;
5395
5396         err = drbd_recv_header(connection, &pi);
5397         if (err) {
5398                 rv = 0;
5399                 goto fail;
5400         }
5401
5402         if (pi.cmd != P_AUTH_RESPONSE) {
5403                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5404                          cmdname(pi.cmd), pi.cmd);
5405                 rv = 0;
5406                 goto fail;
5407         }
5408
5409         if (pi.size != resp_size) {
5410                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5411                 rv = 0;
5412                 goto fail;
5413         }
5414
5415         err = drbd_recv_all_warn(connection, response , resp_size);
5416         if (err) {
5417                 rv = 0;
5418                 goto fail;
5419         }
5420
5421         right_response = kmalloc(resp_size, GFP_NOIO);
5422         if (right_response == NULL) {
5423                 drbd_err(connection, "kmalloc of right_response failed\n");
5424                 rv = -1;
5425                 goto fail;
5426         }
5427
5428         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5429                                  right_response);
5430         if (rv) {
5431                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5432                 rv = -1;
5433                 goto fail;
5434         }
5435
5436         rv = !memcmp(response, right_response, resp_size);
5437
5438         if (rv)
5439                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5440                      resp_size);
5441         else
5442                 rv = -1;
5443
5444  fail:
5445         kfree(peers_ch);
5446         kfree(response);
5447         kfree(right_response);
5448         if (desc) {
5449                 shash_desc_zero(desc);
5450                 kfree(desc);
5451         }
5452
5453         return rv;
5454 }
5455 #endif
5456
5457 int drbd_receiver(struct drbd_thread *thi)
5458 {
5459         struct drbd_connection *connection = thi->connection;
5460         int h;
5461
5462         drbd_info(connection, "receiver (re)started\n");
5463
5464         do {
5465                 h = conn_connect(connection);
5466                 if (h == 0) {
5467                         conn_disconnect(connection);
5468                         schedule_timeout_interruptible(HZ);
5469                 }
5470                 if (h == -1) {
5471                         drbd_warn(connection, "Discarding network configuration.\n");
5472                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5473                 }
5474         } while (h == 0);
5475
5476         if (h > 0) {
5477                 blk_start_plug(&connection->receiver_plug);
5478                 drbdd(connection);
5479                 blk_finish_plug(&connection->receiver_plug);
5480         }
5481
5482         conn_disconnect(connection);
5483
5484         drbd_info(connection, "receiver terminated\n");
5485         return 0;
5486 }
5487
5488 /* ********* acknowledge sender ******** */
5489
5490 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5491 {
5492         struct p_req_state_reply *p = pi->data;
5493         int retcode = be32_to_cpu(p->retcode);
5494
5495         if (retcode >= SS_SUCCESS) {
5496                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5497         } else {
5498                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5499                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5500                          drbd_set_st_err_str(retcode), retcode);
5501         }
5502         wake_up(&connection->ping_wait);
5503
5504         return 0;
5505 }
5506
5507 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5508 {
5509         struct drbd_peer_device *peer_device;
5510         struct drbd_device *device;
5511         struct p_req_state_reply *p = pi->data;
5512         int retcode = be32_to_cpu(p->retcode);
5513
5514         peer_device = conn_peer_device(connection, pi->vnr);
5515         if (!peer_device)
5516                 return -EIO;
5517         device = peer_device->device;
5518
5519         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5520                 D_ASSERT(device, connection->agreed_pro_version < 100);
5521                 return got_conn_RqSReply(connection, pi);
5522         }
5523
5524         if (retcode >= SS_SUCCESS) {
5525                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5526         } else {
5527                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5528                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5529                         drbd_set_st_err_str(retcode), retcode);
5530         }
5531         wake_up(&device->state_wait);
5532
5533         return 0;
5534 }
5535
/* Answer a ping from the peer; returns the result of drbd_send_ping_ack(). */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
        return drbd_send_ping_ack(connection);

}
5541
5542 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5543 {
5544         /* restore idle timeout */
5545         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5546         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5547                 wake_up(&connection->ping_wait);
5548
5549         return 0;
5550 }
5551
5552 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5553 {
5554         struct drbd_peer_device *peer_device;
5555         struct drbd_device *device;
5556         struct p_block_ack *p = pi->data;
5557         sector_t sector = be64_to_cpu(p->sector);
5558         int blksize = be32_to_cpu(p->blksize);
5559
5560         peer_device = conn_peer_device(connection, pi->vnr);
5561         if (!peer_device)
5562                 return -EIO;
5563         device = peer_device->device;
5564
5565         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5566
5567         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5568
5569         if (get_ldev(device)) {
5570                 drbd_rs_complete_io(device, sector);
5571                 drbd_set_in_sync(device, sector, blksize);
5572                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5573                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5574                 put_ldev(device);
5575         }
5576         dec_rs_pending(device);
5577         atomic_add(blksize >> 9, &device->rs_sect_in);
5578
5579         return 0;
5580 }
5581
5582 static int
5583 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5584                               struct rb_root *root, const char *func,
5585                               enum drbd_req_event what, bool missing_ok)
5586 {
5587         struct drbd_request *req;
5588         struct bio_and_error m;
5589
5590         spin_lock_irq(&device->resource->req_lock);
5591         req = find_request(device, root, id, sector, missing_ok, func);
5592         if (unlikely(!req)) {
5593                 spin_unlock_irq(&device->resource->req_lock);
5594                 return -EIO;
5595         }
5596         __req_mod(req, what, &m);
5597         spin_unlock_irq(&device->resource->req_lock);
5598
5599         if (m.bio)
5600                 complete_master_bio(device, &m);
5601         return 0;
5602 }
5603
5604 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5605 {
5606         struct drbd_peer_device *peer_device;
5607         struct drbd_device *device;
5608         struct p_block_ack *p = pi->data;
5609         sector_t sector = be64_to_cpu(p->sector);
5610         int blksize = be32_to_cpu(p->blksize);
5611         enum drbd_req_event what;
5612
5613         peer_device = conn_peer_device(connection, pi->vnr);
5614         if (!peer_device)
5615                 return -EIO;
5616         device = peer_device->device;
5617
5618         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5619
5620         if (p->block_id == ID_SYNCER) {
5621                 drbd_set_in_sync(device, sector, blksize);
5622                 dec_rs_pending(device);
5623                 return 0;
5624         }
5625         switch (pi->cmd) {
5626         case P_RS_WRITE_ACK:
5627                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5628                 break;
5629         case P_WRITE_ACK:
5630                 what = WRITE_ACKED_BY_PEER;
5631                 break;
5632         case P_RECV_ACK:
5633                 what = RECV_ACKED_BY_PEER;
5634                 break;
5635         case P_SUPERSEDED:
5636                 what = CONFLICT_RESOLVED;
5637                 break;
5638         case P_RETRY_WRITE:
5639                 what = POSTPONE_WRITE;
5640                 break;
5641         default:
5642                 BUG();
5643         }
5644
5645         return validate_req_change_req_state(device, p->block_id, sector,
5646                                              &device->write_requests, __func__,
5647                                              what, false);
5648 }
5649
5650 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5651 {
5652         struct drbd_peer_device *peer_device;
5653         struct drbd_device *device;
5654         struct p_block_ack *p = pi->data;
5655         sector_t sector = be64_to_cpu(p->sector);
5656         int size = be32_to_cpu(p->blksize);
5657         int err;
5658
5659         peer_device = conn_peer_device(connection, pi->vnr);
5660         if (!peer_device)
5661                 return -EIO;
5662         device = peer_device->device;
5663
5664         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5665
5666         if (p->block_id == ID_SYNCER) {
5667                 dec_rs_pending(device);
5668                 drbd_rs_failed_io(device, sector, size);
5669                 return 0;
5670         }
5671
5672         err = validate_req_change_req_state(device, p->block_id, sector,
5673                                             &device->write_requests, __func__,
5674                                             NEG_ACKED, true);
5675         if (err) {
5676                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5677                    The master bio might already be completed, therefore the
5678                    request is no longer in the collision hash. */
5679                 /* In Protocol B we might already have got a P_RECV_ACK
5680                    but then get a P_NEG_ACK afterwards. */
5681                 drbd_set_out_of_sync(device, sector, size);
5682         }
5683         return 0;
5684 }
5685
5686 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5687 {
5688         struct drbd_peer_device *peer_device;
5689         struct drbd_device *device;
5690         struct p_block_ack *p = pi->data;
5691         sector_t sector = be64_to_cpu(p->sector);
5692
5693         peer_device = conn_peer_device(connection, pi->vnr);
5694         if (!peer_device)
5695                 return -EIO;
5696         device = peer_device->device;
5697
5698         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5699
5700         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5701             (unsigned long long)sector, be32_to_cpu(p->blksize));
5702
5703         return validate_req_change_req_state(device, p->block_id, sector,
5704                                              &device->read_requests, __func__,
5705                                              NEG_ACKED, false);
5706 }
5707
5708 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5709 {
5710         struct drbd_peer_device *peer_device;
5711         struct drbd_device *device;
5712         sector_t sector;
5713         int size;
5714         struct p_block_ack *p = pi->data;
5715
5716         peer_device = conn_peer_device(connection, pi->vnr);
5717         if (!peer_device)
5718                 return -EIO;
5719         device = peer_device->device;
5720
5721         sector = be64_to_cpu(p->sector);
5722         size = be32_to_cpu(p->blksize);
5723
5724         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5725
5726         dec_rs_pending(device);
5727
5728         if (get_ldev_if_state(device, D_FAILED)) {
5729                 drbd_rs_complete_io(device, sector);
5730                 switch (pi->cmd) {
5731                 case P_NEG_RS_DREPLY:
5732                         drbd_rs_failed_io(device, sector, size);
5733                 case P_RS_CANCEL:
5734                         break;
5735                 default:
5736                         BUG();
5737                 }
5738                 put_ldev(device);
5739         }
5740
5741         return 0;
5742 }
5743
5744 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5745 {
5746         struct p_barrier_ack *p = pi->data;
5747         struct drbd_peer_device *peer_device;
5748         int vnr;
5749
5750         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5751
5752         rcu_read_lock();
5753         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5754                 struct drbd_device *device = peer_device->device;
5755
5756                 if (device->state.conn == C_AHEAD &&
5757                     atomic_read(&device->ap_in_flight) == 0 &&
5758                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5759                         device->start_resync_timer.expires = jiffies + HZ;
5760                         add_timer(&device->start_resync_timer);
5761                 }
5762         }
5763         rcu_read_unlock();
5764
5765         return 0;
5766 }
5767
5768 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5769 {
5770         struct drbd_peer_device *peer_device;
5771         struct drbd_device *device;
5772         struct p_block_ack *p = pi->data;
5773         struct drbd_device_work *dw;
5774         sector_t sector;
5775         int size;
5776
5777         peer_device = conn_peer_device(connection, pi->vnr);
5778         if (!peer_device)
5779                 return -EIO;
5780         device = peer_device->device;
5781
5782         sector = be64_to_cpu(p->sector);
5783         size = be32_to_cpu(p->blksize);
5784
5785         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5786
5787         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5788                 drbd_ov_out_of_sync_found(device, sector, size);
5789         else
5790                 ov_out_of_sync_print(device);
5791
5792         if (!get_ldev(device))
5793                 return 0;
5794
5795         drbd_rs_complete_io(device, sector);
5796         dec_rs_pending(device);
5797
5798         --device->ov_left;
5799
5800         /* let's advance progress step marks only for every other megabyte */
5801         if ((device->ov_left & 0x200) == 0x200)
5802                 drbd_advance_rs_marks(device, device->ov_left);
5803
5804         if (device->ov_left == 0) {
5805                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5806                 if (dw) {
5807                         dw->w.cb = w_ov_finished;
5808                         dw->device = device;
5809                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5810                 } else {
5811                         drbd_err(device, "kmalloc(dw) failed.");
5812                         ov_out_of_sync_print(device);
5813                         drbd_resync_finished(device);
5814                 }
5815         }
5816         put_ldev(device);
5817         return 0;
5818 }
5819
/* Handler for packets that are accepted but deliberately ignored; returns 0. */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        return 0;
}
5824
5825 struct meta_sock_cmd {
5826         size_t pkt_size;
5827         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5828 };
5829
5830 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5831 {
5832         long t;
5833         struct net_conf *nc;
5834
5835         rcu_read_lock();
5836         nc = rcu_dereference(connection->net_conf);
5837         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5838         rcu_read_unlock();
5839
5840         t *= HZ;
5841         if (ping_timeout)
5842                 t /= 10;
5843
5844         connection->meta.socket->sk->sk_rcvtimeo = t;
5845 }
5846
5847 static void set_ping_timeout(struct drbd_connection *connection)
5848 {
5849         set_rcvtimeo(connection, 1);
5850 }
5851
5852 static void set_idle_timeout(struct drbd_connection *connection)
5853 {
5854         set_rcvtimeo(connection, 0);
5855 }
5856
5857 static struct meta_sock_cmd ack_receiver_tbl[] = {
5858         [P_PING]            = { 0, got_Ping },
5859         [P_PING_ACK]        = { 0, got_PingAck },
5860         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5861         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5862         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5863         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5864         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5865         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5866         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5867         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5868         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5869         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5870         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5871         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5872         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5873         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5874         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5875 };
5876
5877 int drbd_ack_receiver(struct drbd_thread *thi)
5878 {
5879         struct drbd_connection *connection = thi->connection;
5880         struct meta_sock_cmd *cmd = NULL;
5881         struct packet_info pi;
5882         unsigned long pre_recv_jif;
5883         int rv;
5884         void *buf    = connection->meta.rbuf;
5885         int received = 0;
5886         unsigned int header_size = drbd_header_size(connection);
5887         int expect   = header_size;
5888         bool ping_timeout_active = false;
5889         struct sched_param param = { .sched_priority = 2 };
5890
5891         rv = sched_setscheduler(current, SCHED_RR, &param);
5892         if (rv < 0)
5893                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5894
5895         while (get_t_state(thi) == RUNNING) {
5896                 drbd_thread_current_set_cpu(thi);
5897
5898                 conn_reclaim_net_peer_reqs(connection);
5899
5900                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5901                         if (drbd_send_ping(connection)) {
5902                                 drbd_err(connection, "drbd_send_ping has failed\n");
5903                                 goto reconnect;
5904                         }
5905                         set_ping_timeout(connection);
5906                         ping_timeout_active = true;
5907                 }
5908
5909                 pre_recv_jif = jiffies;
5910                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5911
5912                 /* Note:
5913                  * -EINTR        (on meta) we got a signal
5914                  * -EAGAIN       (on meta) rcvtimeo expired
5915                  * -ECONNRESET   other side closed the connection
5916                  * -ERESTARTSYS  (on data) we got a signal
5917                  * rv <  0       other than above: unexpected error!
5918                  * rv == expected: full header or command
5919                  * rv <  expected: "woken" by signal during receive
5920                  * rv == 0       : "connection shut down by peer"
5921                  */
5922                 if (likely(rv > 0)) {
5923                         received += rv;
5924                         buf      += rv;
5925                 } else if (rv == 0) {
5926                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5927                                 long t;
5928                                 rcu_read_lock();
5929                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5930                                 rcu_read_unlock();
5931
5932                                 t = wait_event_timeout(connection->ping_wait,
5933                                                        connection->cstate < C_WF_REPORT_PARAMS,
5934                                                        t);
5935                                 if (t)
5936                                         break;
5937                         }
5938                         drbd_err(connection, "meta connection shut down by peer.\n");
5939                         goto reconnect;
5940                 } else if (rv == -EAGAIN) {
5941                         /* If the data socket received something meanwhile,
5942                          * that is good enough: peer is still alive. */
5943                         if (time_after(connection->last_received, pre_recv_jif))
5944                                 continue;
5945                         if (ping_timeout_active) {
5946                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5947                                 goto reconnect;
5948                         }
5949                         set_bit(SEND_PING, &connection->flags);
5950                         continue;
5951                 } else if (rv == -EINTR) {
5952                         /* maybe drbd_thread_stop(): the while condition will notice.
5953                          * maybe woken for send_ping: we'll send a ping above,
5954                          * and change the rcvtimeo */
5955                         flush_signals(current);
5956                         continue;
5957                 } else {
5958                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5959                         goto reconnect;
5960                 }
5961
5962                 if (received == expect && cmd == NULL) {
5963                         if (decode_header(connection, connection->meta.rbuf, &pi))
5964                                 goto reconnect;
5965                         cmd = &ack_receiver_tbl[pi.cmd];
5966                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5967                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5968                                          cmdname(pi.cmd), pi.cmd);
5969                                 goto disconnect;
5970                         }
5971                         expect = header_size + cmd->pkt_size;
5972                         if (pi.size != expect - header_size) {
5973                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5974                                         pi.cmd, pi.size);
5975                                 goto reconnect;
5976                         }
5977                 }
5978                 if (received == expect) {
5979                         bool err;
5980
5981                         err = cmd->fn(connection, &pi);
5982                         if (err) {
5983                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5984                                 goto reconnect;
5985                         }
5986
5987                         connection->last_received = jiffies;
5988
5989                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5990                                 set_idle_timeout(connection);
5991                                 ping_timeout_active = false;
5992                         }
5993
5994                         buf      = connection->meta.rbuf;
5995                         received = 0;
5996                         expect   = header_size;
5997                         cmd      = NULL;
5998                 }
5999         }
6000
6001         if (0) {
6002 reconnect:
6003                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6004                 conn_md_sync(connection);
6005         }
6006         if (0) {
6007 disconnect:
6008                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6009         }
6010
6011         drbd_info(connection, "ack_receiver terminated\n");
6012
6013         return 0;
6014 }
6015
6016 void drbd_send_acks_wf(struct work_struct *ws)
6017 {
6018         struct drbd_peer_device *peer_device =
6019                 container_of(ws, struct drbd_peer_device, send_acks_work);
6020         struct drbd_connection *connection = peer_device->connection;
6021         struct drbd_device *device = peer_device->device;
6022         struct net_conf *nc;
6023         int tcp_cork, err;
6024
6025         rcu_read_lock();
6026         nc = rcu_dereference(connection->net_conf);
6027         tcp_cork = nc->tcp_cork;
6028         rcu_read_unlock();
6029
6030         if (tcp_cork)
6031                 drbd_tcp_cork(connection->meta.socket);
6032
6033         err = drbd_finish_peer_reqs(device);
6034         kref_put(&device->kref, drbd_destroy_device);
6035         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6036            struct work_struct send_acks_work alive, which is in the peer_device object */
6037
6038         if (err) {
6039                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6040                 return;
6041         }
6042
6043         if (tcp_cork)
6044                 drbd_tcp_uncork(connection->meta.socket);
6045
6046         return;
6047 }