GNU Linux-libre 4.4.284-gnu1
fs/ocfs2/dlm/dlmdomain.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave APIs
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/init.h>
32 #include <linux/spinlock.h>
33 #include <linux/delay.h>
34 #include <linux/err.h>
35 #include <linux/debugfs.h>
36
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43 #include "dlmdomain.h"
44 #include "dlmdebug.h"
45
46 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
47 #include "cluster/masklog.h"
48
49 /*
50  * ocfs2 node maps are arrays of long ints, which cannot be sent freely
51  * across the wire due to endianness issues. To work around this, we
52  * convert the long ints to byte arrays. The following three routines are
53  * helpers to set/test/copy bits within those byte arrays.
54  */
55 static inline void byte_set_bit(u8 nr, u8 map[])
56 {
57         map[nr >> 3] |= (1UL << (nr & 7));
58 }
59
60 static inline int byte_test_bit(u8 nr, u8 map[])
61 {
62         return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
63 }
64
65 static inline void byte_copymap(u8 dmap[], unsigned long smap[],
66                         unsigned int sz)
67 {
68         unsigned int nn;
69
70         if (!sz)
71                 return;
72
73         memset(dmap, 0, ((sz + 7) >> 3));
74         for (nn = 0 ; nn < sz; nn++)
75                 if (test_bit(nn, smap))
76                         byte_set_bit(nn, dmap);
77 }
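/*
 * Illustrative example (not part of the original source): for nr = 10,
 * nr >> 3 == 1 and nr & 7 == 2, so byte_set_bit(10, map) sets bit 2 of
 * map[1] and byte_test_bit(10, map) reads that same bit back.
 * byte_copymap() simply replays every bit set in the native long-based
 * map into this byte-based layout, one bit at a time.
 */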
78
79 static void dlm_free_pagevec(void **vec, int pages)
80 {
81         while (pages--)
82                 free_page((unsigned long)vec[pages]);
83         kfree(vec);
84 }
85
86 static void **dlm_alloc_pagevec(int pages)
87 {
88         void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
89         int i;
90
91         if (!vec)
92                 return NULL;
93
94         for (i = 0; i < pages; i++)
95                 if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
96                         goto out_free;
97
98         mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
99              pages, (unsigned long)DLM_HASH_PAGES,
100              (unsigned long)DLM_BUCKETS_PER_PAGE);
101         return vec;
102 out_free:
103         dlm_free_pagevec(vec, i);
104         return NULL;
105 }
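/*
 * Rough shape of the result (a sketch, assuming DLM_BUCKETS_PER_PAGE is
 * derived from PAGE_SIZE as the name suggests): vec[] ends up holding
 * DLM_HASH_PAGES pointers, each to one page worth of hash buckets, so a
 * bucket is reached by picking a page and then indexing within it.  Both
 * the lockres hash and the master hash freed in dlm_free_ctxt_mem() below
 * are built this way.
 */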
106
107 /*
108  *
109  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
110  *    dlm_domain_lock
111  *    struct dlm_ctxt->spinlock
112  *    struct dlm_lock_resource->spinlock
113  *    struct dlm_ctxt->master_lock
114  *    struct dlm_ctxt->ast_lock
115  *    dlm_master_list_entry->spinlock
116  *    dlm_lock->spinlock
117  *
118  */
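/*
 * A minimal sketch (illustrative only) of honoring the ordering above when
 * both the per-domain and per-resource locks are needed:
 *
 *	spin_lock(&dlm->spinlock);
 *	spin_lock(&res->spinlock);
 *	... inspect or update res->state ...
 *	spin_unlock(&res->spinlock);
 *	spin_unlock(&dlm->spinlock);
 *
 * Taking res->spinlock first and dlm->spinlock second would invert the
 * documented order and risk deadlock against code that follows it.
 */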
119
120 DEFINE_SPINLOCK(dlm_domain_lock);
121 LIST_HEAD(dlm_domains);
122 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
123
124 /*
125  * The supported protocol version for DLM communication.  Running domains
126  * will have a negotiated version with the same major number and a minor
127  * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
128  * be used to determine what a running domain is actually using.
129  *
130  * New in version 1.1:
131  *      - Message DLM_QUERY_REGION added to support global heartbeat
132  *      - Message DLM_QUERY_NODEINFO added to allow online node removes
133  * New in version 1.2:
134  *      - Message DLM_BEGIN_EXIT_DOMAIN_MSG added to mark start of exit domain
135  */
136 static const struct dlm_protocol_version dlm_protocol = {
137         .pv_major = 1,
138         .pv_minor = 2,
139 };
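/*
 * Illustrative reading of the rule above (not from the original source):
 * with dlm_protocol at 1.2, a running domain may have negotiated 1.0, 1.1
 * or 1.2 -- same major, equal or smaller minor -- and
 * dlm_ctxt->dlm_locking_proto records whichever of those it is.  A node
 * whose major number differs cannot join at all; see
 * dlm_query_join_proto_check() below.
 */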
140
141 #define DLM_DOMAIN_BACKOFF_MS 200
142
143 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
144                                   void **ret_data);
145 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
146                                      void **ret_data);
147 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
148                                    void **ret_data);
149 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
150                                     void *data, void **ret_data);
151 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
152                                    void **ret_data);
153 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
154                                 struct dlm_protocol_version *request);
155
156 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
157
158 void __dlm_unhash_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
159 {
160         if (hlist_unhashed(&res->hash_node))
161                 return;
162
163         mlog(0, "%s: Unhash res %.*s\n", dlm->name, res->lockname.len,
164              res->lockname.name);
165         hlist_del_init(&res->hash_node);
166         dlm_lockres_put(res);
167 }
168
169 void __dlm_insert_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
170 {
171         struct hlist_head *bucket;
172         struct qstr *q;
173
174         assert_spin_locked(&dlm->spinlock);
175
176         q = &res->lockname;
177         bucket = dlm_lockres_hash(dlm, q->hash);
178
179         /* get a reference for our hashtable */
180         dlm_lockres_get(res);
181
182         hlist_add_head(&res->hash_node, bucket);
183
184         mlog(0, "%s: Hash res %.*s\n", dlm->name, res->lockname.len,
185              res->lockname.name);
186 }
187
188 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
189                                                      const char *name,
190                                                      unsigned int len,
191                                                      unsigned int hash)
192 {
193         struct hlist_head *bucket;
194         struct dlm_lock_resource *res;
195
196         mlog(0, "%.*s\n", len, name);
197
198         assert_spin_locked(&dlm->spinlock);
199
200         bucket = dlm_lockres_hash(dlm, hash);
201
202         hlist_for_each_entry(res, bucket, hash_node) {
203                 if (res->lockname.name[0] != name[0])
204                         continue;
205                 if (unlikely(res->lockname.len != len))
206                         continue;
207                 if (memcmp(res->lockname.name + 1, name + 1, len - 1))
208                         continue;
209                 dlm_lockres_get(res);
210                 return res;
211         }
212         return NULL;
213 }
214
215 /* Intended to be called by functions that do not care about lock
216  * resources which are being purged (most net _handler functions).
217  * This will return NULL for any lock resource that is found but is
218  * currently in the process of dropping its mastery reference.
219  * Use __dlm_lookup_lockres_full when you need the lock resource
220  * regardless (e.g. dlm_get_lock_resource). */
221 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
222                                                 const char *name,
223                                                 unsigned int len,
224                                                 unsigned int hash)
225 {
226         struct dlm_lock_resource *res = NULL;
227
228         mlog(0, "%.*s\n", len, name);
229
230         assert_spin_locked(&dlm->spinlock);
231
232         res = __dlm_lookup_lockres_full(dlm, name, len, hash);
233         if (res) {
234                 spin_lock(&res->spinlock);
235                 if (res->state & DLM_LOCK_RES_DROPPING_REF) {
236                         spin_unlock(&res->spinlock);
237                         dlm_lockres_put(res);
238                         return NULL;
239                 }
240                 spin_unlock(&res->spinlock);
241         }
242
243         return res;
244 }
245
246 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
247                                     const char *name,
248                                     unsigned int len)
249 {
250         struct dlm_lock_resource *res;
251         unsigned int hash = dlm_lockid_hash(name, len);
252
253         spin_lock(&dlm->spinlock);
254         res = __dlm_lookup_lockres(dlm, name, len, hash);
255         spin_unlock(&dlm->spinlock);
256         return res;
257 }
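/*
 * A successful lookup returns with a reference held (taken in
 * __dlm_lookup_lockres_full() above), so callers pair it with
 * dlm_lockres_put(), e.g. (illustrative only):
 *
 *	res = dlm_lookup_lockres(dlm, name, len);
 *	if (res) {
 *		... use res ...
 *		dlm_lockres_put(res);
 *	}
 */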
258
259 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
260 {
261         struct dlm_ctxt *tmp;
262
263         assert_spin_locked(&dlm_domain_lock);
264
265         /* tmp->name here is always NULL terminated,
266          * but domain may not be! */
267         list_for_each_entry(tmp, &dlm_domains, list) {
268                 if (strlen(tmp->name) == len &&
269                     memcmp(tmp->name, domain, len) == 0)
270                         return tmp;
271         }
272
273         return NULL;
274 }
275
276 /* For null terminated domain strings ONLY */
277 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
278 {
279         assert_spin_locked(&dlm_domain_lock);
280
281         return __dlm_lookup_domain_full(domain, strlen(domain));
282 }
283
284
285 /* returns true on one of two conditions:
286  * 1) the domain does not exist
287  * 2) the domain exists and its state is "joined" */
288 static int dlm_wait_on_domain_helper(const char *domain)
289 {
290         int ret = 0;
291         struct dlm_ctxt *tmp = NULL;
292
293         spin_lock(&dlm_domain_lock);
294
295         tmp = __dlm_lookup_domain(domain);
296         if (!tmp)
297                 ret = 1;
298         else if (tmp->dlm_state == DLM_CTXT_JOINED)
299                 ret = 1;
300
301         spin_unlock(&dlm_domain_lock);
302         return ret;
303 }
304
305 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
306 {
307         dlm_destroy_debugfs_subroot(dlm);
308
309         if (dlm->lockres_hash)
310                 dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
311
312         if (dlm->master_hash)
313                 dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
314
315         kfree(dlm->name);
316         kfree(dlm);
317 }
318
319 /* A little strange - this function will be called while holding
320  * dlm_domain_lock and is expected to be holding it on the way out. We
321  * will however drop and reacquire it multiple times */
322 static void dlm_ctxt_release(struct kref *kref)
323 {
324         struct dlm_ctxt *dlm;
325
326         dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
327
328         BUG_ON(dlm->num_joins);
329         BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
330
331         /* we may still be in the list if we hit an error during join. */
332         list_del_init(&dlm->list);
333
334         spin_unlock(&dlm_domain_lock);
335
336         mlog(0, "freeing memory from domain %s\n", dlm->name);
337
338         wake_up(&dlm_domain_events);
339
340         dlm_free_ctxt_mem(dlm);
341
342         spin_lock(&dlm_domain_lock);
343 }
344
345 void dlm_put(struct dlm_ctxt *dlm)
346 {
347         spin_lock(&dlm_domain_lock);
348         kref_put(&dlm->dlm_refs, dlm_ctxt_release);
349         spin_unlock(&dlm_domain_lock);
350 }
351
352 static void __dlm_get(struct dlm_ctxt *dlm)
353 {
354         kref_get(&dlm->dlm_refs);
355 }
356
357 /* given a questionable reference to a dlm object, gets a reference if
358  * it can find it in the list, otherwise returns NULL in which case
359  * you shouldn't trust your pointer. */
360 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
361 {
362         struct dlm_ctxt *target;
363         struct dlm_ctxt *ret = NULL;
364
365         spin_lock(&dlm_domain_lock);
366
367         list_for_each_entry(target, &dlm_domains, list) {
368                 if (target == dlm) {
369                         __dlm_get(target);
370                         ret = target;
371                         break;
372                 }
373         }
374
375         spin_unlock(&dlm_domain_lock);
376
377         return ret;
378 }
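/*
 * Typical pairing in the message handlers below (see
 * dlm_exit_domain_handler() for the real thing):
 *
 *	if (!dlm_grab(dlm))
 *		return 0;
 *	... handle the message ...
 *	dlm_put(dlm);
 */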
379
380 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
381 {
382         int ret;
383
384         spin_lock(&dlm_domain_lock);
385         ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
386                 (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
387         spin_unlock(&dlm_domain_lock);
388
389         return ret;
390 }
391
392 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
393 {
394         if (dlm->dlm_worker) {
395                 flush_workqueue(dlm->dlm_worker);
396                 destroy_workqueue(dlm->dlm_worker);
397                 dlm->dlm_worker = NULL;
398         }
399 }
400
401 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
402 {
403         dlm_unregister_domain_handlers(dlm);
404         dlm_debug_shutdown(dlm);
405         dlm_complete_thread(dlm);
406         dlm_complete_recovery_thread(dlm);
407         dlm_destroy_dlm_worker(dlm);
408
409         /* We've left the domain. Now we can take ourselves out of the
410          * list and allow the kref stuff to help us free the
411          * memory. */
412         spin_lock(&dlm_domain_lock);
413         list_del_init(&dlm->list);
414         spin_unlock(&dlm_domain_lock);
415
416         /* Wake up anyone waiting for us to remove this domain */
417         wake_up(&dlm_domain_events);
418 }
419
420 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
421 {
422         int i, num, n, ret = 0;
423         struct dlm_lock_resource *res;
424         struct hlist_node *iter;
425         struct hlist_head *bucket;
426         int dropped;
427
428         mlog(0, "Migrating locks from domain %s\n", dlm->name);
429
430         num = 0;
431         spin_lock(&dlm->spinlock);
432         for (i = 0; i < DLM_HASH_BUCKETS; i++) {
433 redo_bucket:
434                 n = 0;
435                 bucket = dlm_lockres_hash(dlm, i);
436                 iter = bucket->first;
437                 while (iter) {
438                         n++;
439                         res = hlist_entry(iter, struct dlm_lock_resource,
440                                           hash_node);
441                         dlm_lockres_get(res);
442                         /* migrate, if necessary.  this will drop the dlm
443                          * spinlock and retake it if it does migration. */
444                         dropped = dlm_empty_lockres(dlm, res);
445
446                         spin_lock(&res->spinlock);
447                         if (dropped)
448                                 __dlm_lockres_calc_usage(dlm, res);
449                         else
450                                 iter = res->hash_node.next;
451                         spin_unlock(&res->spinlock);
452
453                         dlm_lockres_put(res);
454
455                         if (dropped) {
456                                 cond_resched_lock(&dlm->spinlock);
457                                 goto redo_bucket;
458                         }
459                 }
460                 cond_resched_lock(&dlm->spinlock);
461                 num += n;
462         }
463         spin_unlock(&dlm->spinlock);
464         wake_up(&dlm->dlm_thread_wq);
465
466         /* let the dlm thread take care of purging, keep scanning until
467          * nothing remains in the hash */
468         if (num) {
469                 mlog(0, "%s: %d lock resources in hash last pass\n",
470                      dlm->name, num);
471                 ret = -EAGAIN;
472         }
473         mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
474         return ret;
475 }
476
477 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
478 {
479         int ret;
480
481         spin_lock(&dlm->spinlock);
482         ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
483         spin_unlock(&dlm->spinlock);
484
485         return ret;
486 }
487
488 static int dlm_begin_exit_domain_handler(struct o2net_msg *msg, u32 len,
489                                          void *data, void **ret_data)
490 {
491         struct dlm_ctxt *dlm = data;
492         unsigned int node;
493         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
494
495         if (!dlm_grab(dlm))
496                 return 0;
497
498         node = exit_msg->node_idx;
499         mlog(0, "%s: Node %u sent a begin exit domain message\n", dlm->name, node);
500
501         spin_lock(&dlm->spinlock);
502         set_bit(node, dlm->exit_domain_map);
503         spin_unlock(&dlm->spinlock);
504
505         dlm_put(dlm);
506
507         return 0;
508 }
509
510 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
511 {
512         /* Yikes, a double spinlock! I need domain_lock for the dlm
513          * state and the dlm spinlock for join state... Sorry! */
514 again:
515         spin_lock(&dlm_domain_lock);
516         spin_lock(&dlm->spinlock);
517
518         if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
519                 mlog(0, "Node %d is joining, we wait on it.\n",
520                           dlm->joining_node);
521                 spin_unlock(&dlm->spinlock);
522                 spin_unlock(&dlm_domain_lock);
523
524                 wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
525                 goto again;
526         }
527
528         dlm->dlm_state = DLM_CTXT_LEAVING;
529         spin_unlock(&dlm->spinlock);
530         spin_unlock(&dlm_domain_lock);
531 }
532
533 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
534 {
535         int node = -1, num = 0;
536
537         assert_spin_locked(&dlm->spinlock);
538
539         printk("( ");
540         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
541                                      node + 1)) < O2NM_MAX_NODES) {
542                 printk("%d ", node);
543                 ++num;
544         }
545         printk(") %u nodes\n", num);
546 }
547
548 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
549                                    void **ret_data)
550 {
551         struct dlm_ctxt *dlm = data;
552         unsigned int node;
553         struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
554
555         mlog(0, "%p %u %p", msg, len, data);
556
557         if (!dlm_grab(dlm))
558                 return 0;
559
560         node = exit_msg->node_idx;
561
562         spin_lock(&dlm->spinlock);
563         clear_bit(node, dlm->domain_map);
564         clear_bit(node, dlm->exit_domain_map);
565         printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s ", node, dlm->name);
566         __dlm_print_nodes(dlm);
567
568         /* notify anything attached to the heartbeat events */
569         dlm_hb_event_notify_attached(dlm, node, 0);
570
571         spin_unlock(&dlm->spinlock);
572
573         dlm_put(dlm);
574
575         return 0;
576 }
577
578 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm, u32 msg_type,
579                                     unsigned int node)
580 {
581         int status;
582         struct dlm_exit_domain leave_msg;
583
584         mlog(0, "%s: Sending domain exit message %u to node %u\n", dlm->name,
585              msg_type, node);
586
587         memset(&leave_msg, 0, sizeof(leave_msg));
588         leave_msg.node_idx = dlm->node_num;
589
590         status = o2net_send_message(msg_type, dlm->key, &leave_msg,
591                                     sizeof(leave_msg), node, NULL);
592         if (status < 0)
593                 mlog(ML_ERROR, "Error %d sending domain exit message %u "
594                      "to node %u on domain %s\n", status, msg_type, node,
595                      dlm->name);
596
597         return status;
598 }
599
600 static void dlm_begin_exit_domain(struct dlm_ctxt *dlm)
601 {
602         int node = -1;
603
604         /* Support for begin exit domain was added in 1.2 */
605         if (dlm->dlm_locking_proto.pv_major == 1 &&
606             dlm->dlm_locking_proto.pv_minor < 2)
607                 return;
608
609         /*
610  * Unlike DLM_EXIT_DOMAIN_MSG, DLM_BEGIN_EXIT_DOMAIN_MSG is purely
611  * informational: if a node does not receive the message,
612  * so be it.
613          */
614         spin_lock(&dlm->spinlock);
615         while (1) {
616                 node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES, node + 1);
617                 if (node >= O2NM_MAX_NODES)
618                         break;
619                 if (node == dlm->node_num)
620                         continue;
621
622                 spin_unlock(&dlm->spinlock);
623                 dlm_send_one_domain_exit(dlm, DLM_BEGIN_EXIT_DOMAIN_MSG, node);
624                 spin_lock(&dlm->spinlock);
625         }
626         spin_unlock(&dlm->spinlock);
627 }
628
629 static void dlm_leave_domain(struct dlm_ctxt *dlm)
630 {
631         int node, clear_node, status;
632
633         /* At this point we've migrated away all our locks and won't
634          * accept mastership of new ones. The dlm is responsible for
635          * almost nothing now. We make sure not to confuse any joining
636          * nodes and then commence shutdown procedure. */
637
638         spin_lock(&dlm->spinlock);
639         /* Clear ourselves from the domain map */
640         clear_bit(dlm->node_num, dlm->domain_map);
641         while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
642                                      0)) < O2NM_MAX_NODES) {
643                 /* Drop the dlm spinlock. This is safe wrt the domain_map.
644                  * -nodes cannot be added now as the
645  *   query_join handler knows to respond with OK_NO_MAP
646                  * -we catch the right network errors if a node is
647                  *   removed from the map while we're sending him the
648                  *   exit message. */
649                 spin_unlock(&dlm->spinlock);
650
651                 clear_node = 1;
652
653                 status = dlm_send_one_domain_exit(dlm, DLM_EXIT_DOMAIN_MSG,
654                                                   node);
655                 if (status < 0 &&
656                     status != -ENOPROTOOPT &&
657                     status != -ENOTCONN) {
658                         mlog(ML_NOTICE, "Error %d sending domain exit message "
659                              "to node %d\n", status, node);
660
661                         /* Not sure what to do here but let's sleep for
662                          * a bit in case this was a transient
663                          * error... */
664                         msleep(DLM_DOMAIN_BACKOFF_MS);
665                         clear_node = 0;
666                 }
667
668                 spin_lock(&dlm->spinlock);
669                 /* If we're not clearing the node bit then we intend
670                  * to loop back around to try again. */
671                 if (clear_node)
672                         clear_bit(node, dlm->domain_map);
673         }
674         spin_unlock(&dlm->spinlock);
675 }
676
677 void dlm_unregister_domain(struct dlm_ctxt *dlm)
678 {
679         int leave = 0;
680         struct dlm_lock_resource *res;
681
682         spin_lock(&dlm_domain_lock);
683         BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
684         BUG_ON(!dlm->num_joins);
685
686         dlm->num_joins--;
687         if (!dlm->num_joins) {
688                 /* We mark it "in shutdown" now so new register
689                  * requests wait until we've completely left the
690                  * domain. Don't use DLM_CTXT_LEAVING yet as we still
691                  * want new domain joins to communicate with us at
692                  * least until we've completed migration of our
693                  * resources. */
694                 dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
695                 leave = 1;
696         }
697         spin_unlock(&dlm_domain_lock);
698
699         if (leave) {
700                 mlog(0, "shutting down domain %s\n", dlm->name);
701                 dlm_begin_exit_domain(dlm);
702
703                 /* We changed dlm state, notify the thread */
704                 dlm_kick_thread(dlm, NULL);
705
706                 while (dlm_migrate_all_locks(dlm)) {
707                         /* Give dlm_thread time to purge the lockres' */
708                         msleep(500);
709                         mlog(0, "%s: more migration to do\n", dlm->name);
710                 }
711
712                 /* This list should be empty. If not, print remaining lockres */
713                 if (!list_empty(&dlm->tracking_list)) {
714                         mlog(ML_ERROR, "Following lockres' are still on the "
715                              "tracking list:\n");
716                         list_for_each_entry(res, &dlm->tracking_list, tracking)
717                                 dlm_print_one_lock_resource(res);
718                 }
719
720                 dlm_mark_domain_leaving(dlm);
721                 dlm_leave_domain(dlm);
722                 printk(KERN_NOTICE "o2dlm: Leaving domain %s\n", dlm->name);
723                 dlm_force_free_mles(dlm);
724                 dlm_complete_dlm_shutdown(dlm);
725         }
726         dlm_put(dlm);
727 }
728 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
729
730 static int dlm_query_join_proto_check(char *proto_type, int node,
731                                       struct dlm_protocol_version *ours,
732                                       struct dlm_protocol_version *request)
733 {
734         int rc;
735         struct dlm_protocol_version proto = *request;
736
737         if (!dlm_protocol_compare(ours, &proto)) {
738                 mlog(0,
739                      "node %u wanted to join with %s locking protocol "
740                      "%u.%u, we respond with %u.%u\n",
741                      node, proto_type,
742                      request->pv_major,
743                      request->pv_minor,
744                      proto.pv_major, proto.pv_minor);
745                 request->pv_minor = proto.pv_minor;
746                 rc = 0;
747         } else {
748                 mlog(ML_NOTICE,
749                      "Node %u wanted to join with %s locking "
750                      "protocol %u.%u, but we have %u.%u, disallowing\n",
751                      node, proto_type,
752                      request->pv_major,
753                      request->pv_minor,
754                      ours->pv_major,
755                      ours->pv_minor);
756                 rc = 1;
757         }
758
759         return rc;
760 }
761
762 /*
763  * struct dlm_query_join_packet is made up of four one-byte fields.  They
764  * are effectively in big-endian order already.  However, little-endian
765  * machines swap them before putting the packet on the wire (because
766  * query_join's response is a status, and that status is treated as a u32
767  * on the wire).  Thus, big-endian and little-endian machines will treat
768  * this structure differently.
769  *
770  * The solution is to have little-endian machines swap the structure when
771  * converting from the structure to the u32 representation.  This will
772  * result in the structure having the correct format on the wire no matter
773  * the host endian format.
774  */
775 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
776                                           u32 *wire)
777 {
778         union dlm_query_join_response response;
779
780         response.packet = *packet;
781         *wire = be32_to_cpu(response.intval);
782 }
783
784 static void dlm_query_join_wire_to_packet(u32 wire,
785                                           struct dlm_query_join_packet *packet)
786 {
787         union dlm_query_join_response response;
788
789         response.intval = cpu_to_be32(wire);
790         *packet = response.packet;
791 }
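/*
 * A short sketch of why this works: on a big-endian host the
 * be32_to_cpu()/cpu_to_be32() calls are no-ops, so the u32 carries the
 * four packet bytes exactly as they sit in the struct; on a little-endian
 * host the swap done here cancels the swap applied to the u32 status on
 * the wire, so the bytes still arrive in the same order.  In both cases,
 * on the same host,
 *
 *	dlm_query_join_packet_to_wire(&p, &w);
 *	dlm_query_join_wire_to_packet(w, &q);
 *
 * leaves q identical to p.
 */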
792
793 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
794                                   void **ret_data)
795 {
796         struct dlm_query_join_request *query;
797         struct dlm_query_join_packet packet = {
798                 .code = JOIN_DISALLOW,
799         };
800         struct dlm_ctxt *dlm = NULL;
801         u32 response;
802         u8 nodenum;
803
804         query = (struct dlm_query_join_request *) msg->buf;
805
806         mlog(0, "node %u wants to join domain %s\n", query->node_idx,
807                   query->domain);
808
809         /*
810          * If heartbeat doesn't consider the node live, tell it
811          * to back off and try again.  This gives heartbeat a chance
812          * to catch up.
813          */
814         if (!o2hb_check_node_heartbeating_no_sem(query->node_idx)) {
815                 mlog(0, "node %u is not in our live map yet\n",
816                      query->node_idx);
817
818                 packet.code = JOIN_DISALLOW;
819                 goto respond;
820         }
821
822         packet.code = JOIN_OK_NO_MAP;
823
824         spin_lock(&dlm_domain_lock);
825         dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
826         if (!dlm)
827                 goto unlock_respond;
828
829         /*
830  * There is a small window where the joining node may not see the
831  * node(s) that just left but are still part of the cluster. Disallow
832  * the join request if the joining node has a different node map.
833          */
834         nodenum = 0;
835         while (nodenum < O2NM_MAX_NODES) {
836                 if (test_bit(nodenum, dlm->domain_map)) {
837                         if (!byte_test_bit(nodenum, query->node_map)) {
838                                 mlog(0, "disallow join as node %u does not "
839                                      "have node %u in its nodemap\n",
840                                      query->node_idx, nodenum);
841                                 packet.code = JOIN_DISALLOW;
842                                 goto unlock_respond;
843                         }
844                 }
845                 nodenum++;
846         }
847
848         /* Once the dlm ctxt is marked as leaving then we don't want
849          * to be put in someone's domain map.
850          * Also, explicitly disallow joining at certain troublesome
851  * times (i.e. during recovery). */
852         if (dlm->dlm_state != DLM_CTXT_LEAVING) {
853                 int bit = query->node_idx;
854                 spin_lock(&dlm->spinlock);
855
856                 if (dlm->dlm_state == DLM_CTXT_NEW &&
857                     dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
858                         /* If this is a brand new context and we
859                          * haven't started our join process yet, then
860                          * the other node won the race. */
861                         packet.code = JOIN_OK_NO_MAP;
862                 } else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
863                         /* Disallow parallel joins. */
864                         packet.code = JOIN_DISALLOW;
865                 } else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
866                         mlog(0, "node %u trying to join, but recovery "
867                              "is ongoing.\n", bit);
868                         packet.code = JOIN_DISALLOW;
869                 } else if (test_bit(bit, dlm->recovery_map)) {
870                         mlog(0, "node %u trying to join, but it "
871                              "still needs recovery.\n", bit);
872                         packet.code = JOIN_DISALLOW;
873                 } else if (test_bit(bit, dlm->domain_map)) {
874                         mlog(0, "node %u trying to join, but it "
875                              "is still in the domain! needs recovery?\n",
876                              bit);
877                         packet.code = JOIN_DISALLOW;
878                 } else {
879                         /* Alright, we're fully a part of this domain,
880                          * so we keep some state as to who's joining
881                          * and indicate to him what needs to be fixed
882                          * up. */
883
884                         /* Make sure we speak compatible locking protocols.  */
885                         if (dlm_query_join_proto_check("DLM", bit,
886                                                        &dlm->dlm_locking_proto,
887                                                        &query->dlm_proto)) {
888                                 packet.code = JOIN_PROTOCOL_MISMATCH;
889                         } else if (dlm_query_join_proto_check("fs", bit,
890                                                               &dlm->fs_locking_proto,
891                                                               &query->fs_proto)) {
892                                 packet.code = JOIN_PROTOCOL_MISMATCH;
893                         } else {
894                                 packet.dlm_minor = query->dlm_proto.pv_minor;
895                                 packet.fs_minor = query->fs_proto.pv_minor;
896                                 packet.code = JOIN_OK;
897                                 __dlm_set_joining_node(dlm, query->node_idx);
898                         }
899                 }
900
901                 spin_unlock(&dlm->spinlock);
902         }
903 unlock_respond:
904         spin_unlock(&dlm_domain_lock);
905
906 respond:
907         mlog(0, "We respond with %u\n", packet.code);
908
909         dlm_query_join_packet_to_wire(&packet, &response);
910         return response;
911 }
912
913 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
914                                      void **ret_data)
915 {
916         struct dlm_assert_joined *assert;
917         struct dlm_ctxt *dlm = NULL;
918
919         assert = (struct dlm_assert_joined *) msg->buf;
920
921         mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
922                   assert->domain);
923
924         spin_lock(&dlm_domain_lock);
925         dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
926         /* XXX should we consider no dlm ctxt an error? */
927         if (dlm) {
928                 spin_lock(&dlm->spinlock);
929
930                 /* Alright, this node has officially joined our
931                  * domain. Set him in the map and clean up our
932                  * leftover join state. */
933                 BUG_ON(dlm->joining_node != assert->node_idx);
934
935                 if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
936                         mlog(0, "dlm recovery is ongoing, disallow join\n");
937                         spin_unlock(&dlm->spinlock);
938                         spin_unlock(&dlm_domain_lock);
939                         return -EAGAIN;
940                 }
941
942                 set_bit(assert->node_idx, dlm->domain_map);
943                 clear_bit(assert->node_idx, dlm->exit_domain_map);
944                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
945
946                 printk(KERN_NOTICE "o2dlm: Node %u joins domain %s ",
947                        assert->node_idx, dlm->name);
948                 __dlm_print_nodes(dlm);
949
950                 /* notify anything attached to the heartbeat events */
951                 dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
952
953                 spin_unlock(&dlm->spinlock);
954         }
955         spin_unlock(&dlm_domain_lock);
956
957         return 0;
958 }
959
960 static int dlm_match_regions(struct dlm_ctxt *dlm,
961                              struct dlm_query_region *qr,
962                              char *local, int locallen)
963 {
964         char *remote = qr->qr_regions;
965         char *l, *r;
966         int localnr, i, j, foundit;
967         int status = 0;
968
969         if (!o2hb_global_heartbeat_active()) {
970                 if (qr->qr_numregions) {
971                         mlog(ML_ERROR, "Domain %s: Joining node %d has global "
972                              "heartbeat enabled but local node %d does not\n",
973                              qr->qr_domain, qr->qr_node, dlm->node_num);
974                         status = -EINVAL;
975                 }
976                 goto bail;
977         }
978
979         if (o2hb_global_heartbeat_active() && !qr->qr_numregions) {
980                 mlog(ML_ERROR, "Domain %s: Local node %d has global "
981                      "heartbeat enabled but joining node %d does not\n",
982                      qr->qr_domain, dlm->node_num, qr->qr_node);
983                 status = -EINVAL;
984                 goto bail;
985         }
986
987         r = remote;
988         for (i = 0; i < qr->qr_numregions; ++i) {
989                 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, r);
990                 r += O2HB_MAX_REGION_NAME_LEN;
991         }
992
993         localnr = min(O2NM_MAX_REGIONS, locallen/O2HB_MAX_REGION_NAME_LEN);
994         localnr = o2hb_get_all_regions(local, (u8)localnr);
995
996         /* compare local regions with remote */
997         l = local;
998         for (i = 0; i < localnr; ++i) {
999                 foundit = 0;
1000                 r = remote;
1001                 for (j = 0; j < qr->qr_numregions; ++j) {
1002                         if (!memcmp(l, r, O2HB_MAX_REGION_NAME_LEN)) {
1003                                 foundit = 1;
1004                                 break;
1005                         }
1006                         r += O2HB_MAX_REGION_NAME_LEN;
1007                 }
1008                 if (!foundit) {
1009                         status = -EINVAL;
1010                         mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1011                              "in local node %d but not in joining node %d\n",
1012                              qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, l,
1013                              dlm->node_num, qr->qr_node);
1014                         goto bail;
1015                 }
1016                 l += O2HB_MAX_REGION_NAME_LEN;
1017         }
1018
1019         /* compare remote with local regions */
1020         r = remote;
1021         for (i = 0; i < qr->qr_numregions; ++i) {
1022                 foundit = 0;
1023                 l = local;
1024                 for (j = 0; j < localnr; ++j) {
1025                         if (!memcmp(r, l, O2HB_MAX_REGION_NAME_LEN)) {
1026                                 foundit = 1;
1027                                 break;
1028                         }
1029                         l += O2HB_MAX_REGION_NAME_LEN;
1030                 }
1031                 if (!foundit) {
1032                         status = -EINVAL;
1033                         mlog(ML_ERROR, "Domain %s: Region '%.*s' registered "
1034                              "in joining node %d but not in local node %d\n",
1035                              qr->qr_domain, O2HB_MAX_REGION_NAME_LEN, r,
1036                              qr->qr_node, dlm->node_num);
1037                         goto bail;
1038                 }
1039                 r += O2HB_MAX_REGION_NAME_LEN;
1040         }
1041
1042 bail:
1043         return status;
1044 }
1045
1046 static int dlm_send_regions(struct dlm_ctxt *dlm, unsigned long *node_map)
1047 {
1048         struct dlm_query_region *qr = NULL;
1049         int status, ret = 0, i;
1050         char *p;
1051
1052         if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1053                 goto bail;
1054
1055         qr = kzalloc(sizeof(struct dlm_query_region), GFP_KERNEL);
1056         if (!qr) {
1057                 ret = -ENOMEM;
1058                 mlog_errno(ret);
1059                 goto bail;
1060         }
1061
1062         qr->qr_node = dlm->node_num;
1063         qr->qr_namelen = strlen(dlm->name);
1064         memcpy(qr->qr_domain, dlm->name, qr->qr_namelen);
1065         /* if local hb, the numregions will be zero */
1066         if (o2hb_global_heartbeat_active())
1067                 qr->qr_numregions = o2hb_get_all_regions(qr->qr_regions,
1068                                                          O2NM_MAX_REGIONS);
1069
1070         p = qr->qr_regions;
1071         for (i = 0; i < qr->qr_numregions; ++i, p += O2HB_MAX_REGION_NAME_LEN)
1072                 mlog(0, "Region %.*s\n", O2HB_MAX_REGION_NAME_LEN, p);
1073
1074         i = -1;
1075         while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1076                                   i + 1)) < O2NM_MAX_NODES) {
1077                 if (i == dlm->node_num)
1078                         continue;
1079
1080                 mlog(0, "Sending regions to node %d\n", i);
1081
1082                 ret = o2net_send_message(DLM_QUERY_REGION, DLM_MOD_KEY, qr,
1083                                          sizeof(struct dlm_query_region),
1084                                          i, &status);
1085                 if (ret >= 0)
1086                         ret = status;
1087                 if (ret) {
1088                         mlog(ML_ERROR, "Region mismatch %d, node %d\n",
1089                              ret, i);
1090                         break;
1091                 }
1092         }
1093
1094 bail:
1095         kfree(qr);
1096         return ret;
1097 }
1098
1099 static int dlm_query_region_handler(struct o2net_msg *msg, u32 len,
1100                                     void *data, void **ret_data)
1101 {
1102         struct dlm_query_region *qr;
1103         struct dlm_ctxt *dlm = NULL;
1104         char *local = NULL;
1105         int status = 0;
1106
1107         qr = (struct dlm_query_region *) msg->buf;
1108
1109         mlog(0, "Node %u queries hb regions on domain %s\n", qr->qr_node,
1110              qr->qr_domain);
1111
1112         /* buffer used in dlm_match_regions() */
1113         local = kmalloc(sizeof(qr->qr_regions), GFP_KERNEL);
1114         if (!local)
1115                 return -ENOMEM;
1116
1117         status = -EINVAL;
1118
1119         spin_lock(&dlm_domain_lock);
1120         dlm = __dlm_lookup_domain_full(qr->qr_domain, qr->qr_namelen);
1121         if (!dlm) {
1122                 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1123                      "before join domain\n", qr->qr_node, qr->qr_domain);
1124                 goto out_domain_lock;
1125         }
1126
1127         spin_lock(&dlm->spinlock);
1128         if (dlm->joining_node != qr->qr_node) {
1129                 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1130                      "but joining node is %d\n", qr->qr_node, qr->qr_domain,
1131                      dlm->joining_node);
1132                 goto out_dlm_lock;
1133         }
1134
1135         /* Support for global heartbeat was added in 1.1 */
1136         if (dlm->dlm_locking_proto.pv_major == 1 &&
1137             dlm->dlm_locking_proto.pv_minor == 0) {
1138                 mlog(ML_ERROR, "Node %d queried hb regions on domain %s "
1139                      "but active dlm protocol is %d.%d\n", qr->qr_node,
1140                      qr->qr_domain, dlm->dlm_locking_proto.pv_major,
1141                      dlm->dlm_locking_proto.pv_minor);
1142                 goto out_dlm_lock;
1143         }
1144
1145         status = dlm_match_regions(dlm, qr, local, sizeof(qr->qr_regions));
1146
1147 out_dlm_lock:
1148         spin_unlock(&dlm->spinlock);
1149
1150 out_domain_lock:
1151         spin_unlock(&dlm_domain_lock);
1152
1153         kfree(local);
1154
1155         return status;
1156 }
1157
1158 static int dlm_match_nodes(struct dlm_ctxt *dlm, struct dlm_query_nodeinfo *qn)
1159 {
1160         struct o2nm_node *local;
1161         struct dlm_node_info *remote;
1162         int i, j;
1163         int status = 0;
1164
1165         for (j = 0; j < qn->qn_numnodes; ++j)
1166                 mlog(0, "Node %3d, %pI4:%u\n", qn->qn_nodes[j].ni_nodenum,
1167                      &(qn->qn_nodes[j].ni_ipv4_address),
1168                      ntohs(qn->qn_nodes[j].ni_ipv4_port));
1169
1170         for (i = 0; i < O2NM_MAX_NODES && !status; ++i) {
1171                 local = o2nm_get_node_by_num(i);
1172                 remote = NULL;
1173                 for (j = 0; j < qn->qn_numnodes; ++j) {
1174                         if (qn->qn_nodes[j].ni_nodenum == i) {
1175                                 remote = &(qn->qn_nodes[j]);
1176                                 break;
1177                         }
1178                 }
1179
1180                 if (!local && !remote)
1181                         continue;
1182
1183                 if ((local && !remote) || (!local && remote))
1184                         status = -EINVAL;
1185
1186                 if (!status &&
1187                     ((remote->ni_nodenum != local->nd_num) ||
1188                      (remote->ni_ipv4_port != local->nd_ipv4_port) ||
1189                      (remote->ni_ipv4_address != local->nd_ipv4_address)))
1190                         status = -EINVAL;
1191
1192                 if (status) {
1193                         if (remote && !local)
1194                                 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1195                                      "registered in joining node %d but not in "
1196                                      "local node %d\n", qn->qn_domain,
1197                                      remote->ni_nodenum,
1198                                      &(remote->ni_ipv4_address),
1199                                      ntohs(remote->ni_ipv4_port),
1200                                      qn->qn_nodenum, dlm->node_num);
1201                         if (local && !remote)
1202                                 mlog(ML_ERROR, "Domain %s: Node %d (%pI4:%u) "
1203                                      "registered in local node %d but not in "
1204                                      "joining node %d\n", qn->qn_domain,
1205                                      local->nd_num, &(local->nd_ipv4_address),
1206                                      ntohs(local->nd_ipv4_port),
1207                                      dlm->node_num, qn->qn_nodenum);
1208                         BUG_ON((!local && !remote));
1209                 }
1210
1211                 if (local)
1212                         o2nm_node_put(local);
1213         }
1214
1215         return status;
1216 }
1217
1218 static int dlm_send_nodeinfo(struct dlm_ctxt *dlm, unsigned long *node_map)
1219 {
1220         struct dlm_query_nodeinfo *qn = NULL;
1221         struct o2nm_node *node;
1222         int ret = 0, status, count, i;
1223
1224         if (find_next_bit(node_map, O2NM_MAX_NODES, 0) >= O2NM_MAX_NODES)
1225                 goto bail;
1226
1227         qn = kzalloc(sizeof(struct dlm_query_nodeinfo), GFP_KERNEL);
1228         if (!qn) {
1229                 ret = -ENOMEM;
1230                 mlog_errno(ret);
1231                 goto bail;
1232         }
1233
1234         for (i = 0, count = 0; i < O2NM_MAX_NODES; ++i) {
1235                 node = o2nm_get_node_by_num(i);
1236                 if (!node)
1237                         continue;
1238                 qn->qn_nodes[count].ni_nodenum = node->nd_num;
1239                 qn->qn_nodes[count].ni_ipv4_port = node->nd_ipv4_port;
1240                 qn->qn_nodes[count].ni_ipv4_address = node->nd_ipv4_address;
1241                 mlog(0, "Node %3d, %pI4:%u\n", node->nd_num,
1242                      &(node->nd_ipv4_address), ntohs(node->nd_ipv4_port));
1243                 ++count;
1244                 o2nm_node_put(node);
1245         }
1246
1247         qn->qn_nodenum = dlm->node_num;
1248         qn->qn_numnodes = count;
1249         qn->qn_namelen = strlen(dlm->name);
1250         memcpy(qn->qn_domain, dlm->name, qn->qn_namelen);
1251
1252         i = -1;
1253         while ((i = find_next_bit(node_map, O2NM_MAX_NODES,
1254                                   i + 1)) < O2NM_MAX_NODES) {
1255                 if (i == dlm->node_num)
1256                         continue;
1257
1258                 mlog(0, "Sending nodeinfo to node %d\n", i);
1259
1260                 ret = o2net_send_message(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
1261                                          qn, sizeof(struct dlm_query_nodeinfo),
1262                                          i, &status);
1263                 if (ret >= 0)
1264                         ret = status;
1265                 if (ret) {
1266                         mlog(ML_ERROR, "node mismatch %d, node %d\n", ret, i);
1267                         break;
1268                 }
1269         }
1270
1271 bail:
1272         kfree(qn);
1273         return ret;
1274 }
1275
1276 static int dlm_query_nodeinfo_handler(struct o2net_msg *msg, u32 len,
1277                                       void *data, void **ret_data)
1278 {
1279         struct dlm_query_nodeinfo *qn;
1280         struct dlm_ctxt *dlm = NULL;
1281         int locked = 0, status = -EINVAL;
1282
1283         qn = (struct dlm_query_nodeinfo *) msg->buf;
1284
1285         mlog(0, "Node %u queries nodes on domain %s\n", qn->qn_nodenum,
1286              qn->qn_domain);
1287
1288         spin_lock(&dlm_domain_lock);
1289         dlm = __dlm_lookup_domain_full(qn->qn_domain, qn->qn_namelen);
1290         if (!dlm) {
1291                 mlog(ML_ERROR, "Node %d queried nodes on domain %s before "
1292                      "join domain\n", qn->qn_nodenum, qn->qn_domain);
1293                 goto bail;
1294         }
1295
1296         spin_lock(&dlm->spinlock);
1297         locked = 1;
1298         if (dlm->joining_node != qn->qn_nodenum) {
1299                 mlog(ML_ERROR, "Node %d queried nodes on domain %s but "
1300                      "joining node is %d\n", qn->qn_nodenum, qn->qn_domain,
1301                      dlm->joining_node);
1302                 goto bail;
1303         }
1304
1305         /* Support for node query was added in 1.1 */
1306         if (dlm->dlm_locking_proto.pv_major == 1 &&
1307             dlm->dlm_locking_proto.pv_minor == 0) {
1308                 mlog(ML_ERROR, "Node %d queried nodes on domain %s "
1309                      "but active dlm protocol is %d.%d\n", qn->qn_nodenum,
1310                      qn->qn_domain, dlm->dlm_locking_proto.pv_major,
1311                      dlm->dlm_locking_proto.pv_minor);
1312                 goto bail;
1313         }
1314
1315         status = dlm_match_nodes(dlm, qn);
1316
1317 bail:
1318         if (locked)
1319                 spin_unlock(&dlm->spinlock);
1320         spin_unlock(&dlm_domain_lock);
1321
1322         return status;
1323 }
1324
1325 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
1326                                    void **ret_data)
1327 {
1328         struct dlm_cancel_join *cancel;
1329         struct dlm_ctxt *dlm = NULL;
1330
1331         cancel = (struct dlm_cancel_join *) msg->buf;
1332
1333         mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
1334                   cancel->domain);
1335
1336         spin_lock(&dlm_domain_lock);
1337         dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
1338
1339         if (dlm) {
1340                 spin_lock(&dlm->spinlock);
1341
1342                 /* Yikes, this guy wants to cancel his join. No
1343                  * problem, we simply clean up our join state. */
1344                 BUG_ON(dlm->joining_node != cancel->node_idx);
1345                 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1346
1347                 spin_unlock(&dlm->spinlock);
1348         }
1349         spin_unlock(&dlm_domain_lock);
1350
1351         return 0;
1352 }
1353
1354 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
1355                                     unsigned int node)
1356 {
1357         int status;
1358         struct dlm_cancel_join cancel_msg;
1359
1360         memset(&cancel_msg, 0, sizeof(cancel_msg));
1361         cancel_msg.node_idx = dlm->node_num;
1362         cancel_msg.name_len = strlen(dlm->name);
1363         memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
1364
1365         status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1366                                     &cancel_msg, sizeof(cancel_msg), node,
1367                                     NULL);
1368         if (status < 0) {
1369                 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1370                      "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1371                      node);
1372                 goto bail;
1373         }
1374
1375 bail:
1376         return status;
1377 }
1378
1379 /* map_size should be in bytes. */
1380 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
1381                                  unsigned long *node_map,
1382                                  unsigned int map_size)
1383 {
1384         int status, tmpstat;
1385         unsigned int node;
1386
1387         if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
1388                          sizeof(unsigned long))) {
1389                 mlog(ML_ERROR,
1390                      "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
1391                      map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
1392                 return -EINVAL;
1393         }
1394
1395         status = 0;
1396         node = -1;
1397         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1398                                      node + 1)) < O2NM_MAX_NODES) {
1399                 if (node == dlm->node_num)
1400                         continue;
1401
1402                 tmpstat = dlm_send_one_join_cancel(dlm, node);
1403                 if (tmpstat) {
1404                         mlog(ML_ERROR, "Error return %d cancelling join on "
1405                              "node %d\n", tmpstat, node);
1406                         if (!status)
1407                                 status = tmpstat;
1408                 }
1409         }
1410
1411         if (status)
1412                 mlog_errno(status);
1413         return status;
1414 }
1415
1416 static int dlm_request_join(struct dlm_ctxt *dlm,
1417                             int node,
1418                             enum dlm_query_join_response_code *response)
1419 {
1420         int status;
1421         struct dlm_query_join_request join_msg;
1422         struct dlm_query_join_packet packet;
1423         u32 join_resp;
1424
1425         mlog(0, "querying node %d\n", node);
1426
1427         memset(&join_msg, 0, sizeof(join_msg));
1428         join_msg.node_idx = dlm->node_num;
1429         join_msg.name_len = strlen(dlm->name);
1430         memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1431         join_msg.dlm_proto = dlm->dlm_locking_proto;
1432         join_msg.fs_proto = dlm->fs_locking_proto;
1433
1434         /* copy live node map to join message */
1435         byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1436
1437         status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1438                                     sizeof(join_msg), node, &join_resp);
1439         if (status < 0 && status != -ENOPROTOOPT) {
1440                 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1441                      "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1442                      node);
1443                 goto bail;
1444         }
1445         dlm_query_join_wire_to_packet(join_resp, &packet);
1446
1447         /* -ENOPROTOOPT from the net code means the other side isn't
1448             listening for our message type -- that's fine, it means
1449             his dlm isn't up, so we can consider him a 'yes' but not
1450             joined into the domain.  */
1451         if (status == -ENOPROTOOPT) {
1452                 status = 0;
1453                 *response = JOIN_OK_NO_MAP;
1454         } else {
1455                 *response = packet.code;
1456                 switch (packet.code) {
1457                 case JOIN_DISALLOW:
1458                 case JOIN_OK_NO_MAP:
1459                         break;
1460                 case JOIN_PROTOCOL_MISMATCH:
1461                         mlog(ML_NOTICE,
1462                              "This node requested DLM locking protocol %u.%u and "
1463                              "filesystem locking protocol %u.%u.  At least one of "
1464                              "the protocol versions on node %d is not compatible, "
1465                              "disconnecting\n",
1466                              dlm->dlm_locking_proto.pv_major,
1467                              dlm->dlm_locking_proto.pv_minor,
1468                              dlm->fs_locking_proto.pv_major,
1469                              dlm->fs_locking_proto.pv_minor,
1470                              node);
1471                         status = -EPROTO;
1472                         break;
1473                 case JOIN_OK:
1474                         /* Use the same locking protocol as the remote node */
1475                         dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1476                         dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1477                         mlog(0,
1478                              "Node %d responds JOIN_OK with DLM locking protocol "
1479                              "%u.%u and fs locking protocol %u.%u\n",
1480                              node,
1481                              dlm->dlm_locking_proto.pv_major,
1482                              dlm->dlm_locking_proto.pv_minor,
1483                              dlm->fs_locking_proto.pv_major,
1484                              dlm->fs_locking_proto.pv_minor);
1485                         break;
1486                 default:
1487                         status = -EINVAL;
1488                         mlog(ML_ERROR, "invalid response %d from node %u\n",
1489                              packet.code, node);
1490                         /* Reset response to JOIN_DISALLOW */
1491                         *response = JOIN_DISALLOW;
1492                         break;
1493                 }
1494         }
1495
1496         mlog(0, "status %d, node %d response is %d\n", status, node,
1497              *response);
1498
1499 bail:
1500         return status;
1501 }
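
/*
 * Editor's summary of dlm_request_join() outcomes (derived from the
 * switch above, no new behaviour):
 *
 *	-ENOPROTOOPT from o2net       -> status 0, *response = JOIN_OK_NO_MAP
 *	JOIN_DISALLOW, JOIN_OK_NO_MAP -> response passed through unchanged
 *	JOIN_PROTOCOL_MISMATCH        -> status -EPROTO
 *	JOIN_OK                       -> minor protocol versions lowered to
 *					 match the remote node
 *	unknown code                  -> status -EINVAL, *response forced
 *					 to JOIN_DISALLOW
 */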
1502
1503 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1504                                     unsigned int node)
1505 {
1506         int status;
1507         int ret;
1508         struct dlm_assert_joined assert_msg;
1509
1510         mlog(0, "Sending join assert to node %u\n", node);
1511
1512         memset(&assert_msg, 0, sizeof(assert_msg));
1513         assert_msg.node_idx = dlm->node_num;
1514         assert_msg.name_len = strlen(dlm->name);
1515         memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1516
1517         status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1518                                     &assert_msg, sizeof(assert_msg), node,
1519                                     &ret);
1520         if (status < 0)
1521                 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1522                      "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1523                      node);
1524         else
1525                 status = ret;
1526
1527         return status;
1528 }
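
/*
 * Note (editor's): when the send itself succeeds, the remote handler's
 * return code (ret) becomes the status, so a refusal by the other node
 * is reported to the caller the same way as a transport error.
 */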
1529
1530 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1531                                   unsigned long *node_map)
1532 {
1533         int status, node, live;
1534
1535         status = 0;
1536         node = -1;
1537         while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1538                                      node + 1)) < O2NM_MAX_NODES) {
1539                 if (node == dlm->node_num)
1540                         continue;
1541
1542                 do {
1543                         /* It is very important that this message be
1544                          * received so we spin until either the node
1545                          * has died or it gets the message. */
1546                         status = dlm_send_one_join_assert(dlm, node);
1547
1548                         spin_lock(&dlm->spinlock);
1549                         live = test_bit(node, dlm->live_nodes_map);
1550                         spin_unlock(&dlm->spinlock);
1551
1552                         if (status) {
1553                                 mlog(ML_ERROR, "Error return %d asserting "
1554                                      "join on node %d\n", status, node);
1555
1556                                 /* give us some time between errors... */
1557                                 if (live)
1558                                         msleep(DLM_DOMAIN_BACKOFF_MS);
1559                         }
1560                 } while (status && live);
1561         }
1562 }
1563
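/*
 * Snapshot of cluster state for one join attempt: live_map records the
 * nodes that were alive when the attempt started, yes_resp_map collects
 * the nodes that answered JOIN_OK.
 */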
1564 struct domain_join_ctxt {
1565         unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1566         unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1567 };
1568
1569 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1570                                    struct domain_join_ctxt *ctxt,
1571                                    enum dlm_query_join_response_code response)
1572 {
1573         int ret;
1574
1575         if (response == JOIN_DISALLOW) {
1576                 mlog(0, "Latest response of disallow -- should restart\n");
1577                 return 1;
1578         }
1579
1580         spin_lock(&dlm->spinlock);
1581         /* For now, we restart the process if the node maps have
1582          * changed at all */
1583         ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1584                      sizeof(dlm->live_nodes_map));
1585         spin_unlock(&dlm->spinlock);
1586
1587         if (ret)
1588                 mlog(0, "Node maps changed -- should restart\n");
1589
1590         return ret;
1591 }
1592
1593 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1594 {
1595         int status = 0, tmpstat, node;
1596         struct domain_join_ctxt *ctxt;
1597         enum dlm_query_join_response_code response = JOIN_DISALLOW;
1598
1599         mlog(0, "%p\n", dlm);
1600
1601         ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1602         if (!ctxt) {
1603                 status = -ENOMEM;
1604                 mlog_errno(status);
1605                 goto bail;
1606         }
1607
1608         /* group sem locking should work for us here -- we're already
1609          * registered for heartbeat events so filling this should be
1610          * atomic wrt getting those handlers called. */
1611         o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1612
1613         spin_lock(&dlm->spinlock);
1614         memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1615
1616         __dlm_set_joining_node(dlm, dlm->node_num);
1617
1618         spin_unlock(&dlm->spinlock);
1619
1620         node = -1;
1621         while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1622                                      node + 1)) < O2NM_MAX_NODES) {
1623                 if (node == dlm->node_num)
1624                         continue;
1625
1626                 status = dlm_request_join(dlm, node, &response);
1627                 if (status < 0) {
1628                         mlog_errno(status);
1629                         goto bail;
1630                 }
1631
1632                 /* Ok, either we got a response or the node doesn't have a
1633                  * dlm up. */
1634                 if (response == JOIN_OK)
1635                         set_bit(node, ctxt->yes_resp_map);
1636
1637                 if (dlm_should_restart_join(dlm, ctxt, response)) {
1638                         status = -EAGAIN;
1639                         goto bail;
1640                 }
1641         }
1642
1643         mlog(0, "Yay, done querying nodes!\n");
1644
1645         /* Yay, everyone agrees we can join the domain. My domain is
1646          * made up of all nodes that were put in the
1647          * yes_resp_map. Copy that into our domain map and send a join
1648          * assert message to clean up everyone else's state. */
1649         spin_lock(&dlm->spinlock);
1650         memcpy(dlm->domain_map, ctxt->yes_resp_map,
1651                sizeof(ctxt->yes_resp_map));
1652         set_bit(dlm->node_num, dlm->domain_map);
1653         spin_unlock(&dlm->spinlock);
1654
1655         /* Support for global heartbeat and node info was added in 1.1 */
1656         if (dlm->dlm_locking_proto.pv_major > 1 ||
1657             dlm->dlm_locking_proto.pv_minor > 0) {
1658                 status = dlm_send_nodeinfo(dlm, ctxt->yes_resp_map);
1659                 if (status) {
1660                         mlog_errno(status);
1661                         goto bail;
1662                 }
1663                 status = dlm_send_regions(dlm, ctxt->yes_resp_map);
1664                 if (status) {
1665                         mlog_errno(status);
1666                         goto bail;
1667                 }
1668         }
1669
1670         dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1671
1672         /* Joined state *must* be set before the joining node
1673          * information, otherwise the query_join handler may read no
1674          * current joiner but a state of NEW and tell joining nodes
1675          * we're not in the domain. */
1676         spin_lock(&dlm_domain_lock);
1677         dlm->dlm_state = DLM_CTXT_JOINED;
1678         dlm->num_joins++;
1679         spin_unlock(&dlm_domain_lock);
1680
1681 bail:
1682         spin_lock(&dlm->spinlock);
1683         __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1684         if (!status) {
1685                 printk(KERN_NOTICE "o2dlm: Joining domain %s ", dlm->name);
1686                 __dlm_print_nodes(dlm);
1687         }
1688         spin_unlock(&dlm->spinlock);
1689
1690         if (ctxt) {
1691                 /* Do we need to send a cancel message to any nodes? */
1692                 if (status < 0) {
1693                         tmpstat = dlm_send_join_cancels(dlm,
1694                                                         ctxt->yes_resp_map,
1695                                                         sizeof(ctxt->yes_resp_map));
1696                         if (tmpstat < 0)
1697                                 mlog_errno(tmpstat);
1698                 }
1699                 kfree(ctxt);
1700         }
1701
1702         mlog(0, "returning %d\n", status);
1703         return status;
1704 }
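
/*
 * Editor's summary of the join sequence implemented above (no new
 * behaviour, just a roadmap):
 *
 *	1. snapshot the live node map and mark ourselves as the joining
 *	   node
 *	2. send DLM_QUERY_JOIN_MSG to every other live node; restart with
 *	   -EAGAIN on JOIN_DISALLOW or if the live map changes
 *	3. copy the JOIN_OK responders into domain_map
 *	4. for locking protocol >= 1.1, push node info and heartbeat
 *	   regions to the responders
 *	5. send DLM_ASSERT_JOINED_MSG to each responder and move to
 *	   DLM_CTXT_JOINED
 *	6. on error, send DLM_CANCEL_JOIN_MSG to every node that said yes
 */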
1705
1706 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1707 {
1708         o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_up);
1709         o2hb_unregister_callback(dlm->name, &dlm->dlm_hb_down);
1710         o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1711 }
1712
1713 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1714 {
1715         int status;
1716
1717         mlog(0, "registering handlers.\n");
1718
1719         o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1720                             dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1721         o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1722                             dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1723
1724         status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_down);
1725         if (status)
1726                 goto bail;
1727
1728         status = o2hb_register_callback(dlm->name, &dlm->dlm_hb_up);
1729         if (status)
1730                 goto bail;
1731
1732         status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1733                                         sizeof(struct dlm_master_request),
1734                                         dlm_master_request_handler,
1735                                         dlm, NULL, &dlm->dlm_domain_handlers);
1736         if (status)
1737                 goto bail;
1738
1739         status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1740                                         sizeof(struct dlm_assert_master),
1741                                         dlm_assert_master_handler,
1742                                         dlm, dlm_assert_master_post_handler,
1743                                         &dlm->dlm_domain_handlers);
1744         if (status)
1745                 goto bail;
1746
1747         status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1748                                         sizeof(struct dlm_create_lock),
1749                                         dlm_create_lock_handler,
1750                                         dlm, NULL, &dlm->dlm_domain_handlers);
1751         if (status)
1752                 goto bail;
1753
1754         status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1755                                         DLM_CONVERT_LOCK_MAX_LEN,
1756                                         dlm_convert_lock_handler,
1757                                         dlm, NULL, &dlm->dlm_domain_handlers);
1758         if (status)
1759                 goto bail;
1760
1761         status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1762                                         DLM_UNLOCK_LOCK_MAX_LEN,
1763                                         dlm_unlock_lock_handler,
1764                                         dlm, NULL, &dlm->dlm_domain_handlers);
1765         if (status)
1766                 goto bail;
1767
1768         status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1769                                         DLM_PROXY_AST_MAX_LEN,
1770                                         dlm_proxy_ast_handler,
1771                                         dlm, NULL, &dlm->dlm_domain_handlers);
1772         if (status)
1773                 goto bail;
1774
1775         status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1776                                         sizeof(struct dlm_exit_domain),
1777                                         dlm_exit_domain_handler,
1778                                         dlm, NULL, &dlm->dlm_domain_handlers);
1779         if (status)
1780                 goto bail;
1781
1782         status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1783                                         sizeof(struct dlm_deref_lockres),
1784                                         dlm_deref_lockres_handler,
1785                                         dlm, NULL, &dlm->dlm_domain_handlers);
1786         if (status)
1787                 goto bail;
1788
1789         status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1790                                         sizeof(struct dlm_migrate_request),
1791                                         dlm_migrate_request_handler,
1792                                         dlm, NULL, &dlm->dlm_domain_handlers);
1793         if (status)
1794                 goto bail;
1795
1796         status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1797                                         DLM_MIG_LOCKRES_MAX_LEN,
1798                                         dlm_mig_lockres_handler,
1799                                         dlm, NULL, &dlm->dlm_domain_handlers);
1800         if (status)
1801                 goto bail;
1802
1803         status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1804                                         sizeof(struct dlm_master_requery),
1805                                         dlm_master_requery_handler,
1806                                         dlm, NULL, &dlm->dlm_domain_handlers);
1807         if (status)
1808                 goto bail;
1809
1810         status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1811                                         sizeof(struct dlm_lock_request),
1812                                         dlm_request_all_locks_handler,
1813                                         dlm, NULL, &dlm->dlm_domain_handlers);
1814         if (status)
1815                 goto bail;
1816
1817         status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1818                                         sizeof(struct dlm_reco_data_done),
1819                                         dlm_reco_data_done_handler,
1820                                         dlm, NULL, &dlm->dlm_domain_handlers);
1821         if (status)
1822                 goto bail;
1823
1824         status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1825                                         sizeof(struct dlm_begin_reco),
1826                                         dlm_begin_reco_handler,
1827                                         dlm, NULL, &dlm->dlm_domain_handlers);
1828         if (status)
1829                 goto bail;
1830
1831         status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1832                                         sizeof(struct dlm_finalize_reco),
1833                                         dlm_finalize_reco_handler,
1834                                         dlm, NULL, &dlm->dlm_domain_handlers);
1835         if (status)
1836                 goto bail;
1837
1838         status = o2net_register_handler(DLM_BEGIN_EXIT_DOMAIN_MSG, dlm->key,
1839                                         sizeof(struct dlm_exit_domain),
1840                                         dlm_begin_exit_domain_handler,
1841                                         dlm, NULL, &dlm->dlm_domain_handlers);
1842
1843 bail:
1844         if (status)
1845                 dlm_unregister_domain_handlers(dlm);
1846
1847         return status;
1848 }
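
/*
 * Editor's note: every per-domain message above is registered against
 * dlm->key and chained on dlm->dlm_domain_handlers, so that
 * dlm_unregister_domain_handlers() can tear them all down with a single
 * o2net_unregister_handler_list() call.  A hypothetical new message
 * type (names below are illustrative, not real API) would follow the
 * same pattern:
 *
 *	status = o2net_register_handler(DLM_SOME_NEW_MSG, dlm->key,
 *					sizeof(struct dlm_some_new_msg),
 *					dlm_some_new_handler,
 *					dlm, NULL, &dlm->dlm_domain_handlers);
 *	if (status)
 *		goto bail;
 */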
1849
1850 static int dlm_join_domain(struct dlm_ctxt *dlm)
1851 {
1852         int status;
1853         unsigned int backoff;
1854         unsigned int total_backoff = 0;
1855         char wq_name[O2NM_MAX_NAME_LEN];
1856
1857         BUG_ON(!dlm);
1858
1859         mlog(0, "Join domain %s\n", dlm->name);
1860
1861         status = dlm_register_domain_handlers(dlm);
1862         if (status) {
1863                 mlog_errno(status);
1864                 goto bail;
1865         }
1866
1867         status = dlm_launch_thread(dlm);
1868         if (status < 0) {
1869                 mlog_errno(status);
1870                 goto bail;
1871         }
1872
1873         status = dlm_launch_recovery_thread(dlm);
1874         if (status < 0) {
1875                 mlog_errno(status);
1876                 goto bail;
1877         }
1878
1879         status = dlm_debug_init(dlm);
1880         if (status < 0) {
1881                 mlog_errno(status);
1882                 goto bail;
1883         }
1884
1885         snprintf(wq_name, O2NM_MAX_NAME_LEN, "dlm_wq-%s", dlm->name);
1886         dlm->dlm_worker = create_singlethread_workqueue(wq_name);
1887         if (!dlm->dlm_worker) {
1888                 status = -ENOMEM;
1889                 mlog_errno(status);
1890                 goto bail;
1891         }
1892
1893         do {
1894                 status = dlm_try_to_join_domain(dlm);
1895
1896                 /* If we're racing another node to the join, then we
1897                  * need to back off temporarily and let them
1898                  * complete. */
1899 #define DLM_JOIN_TIMEOUT_MSECS  90000
1900                 if (status == -EAGAIN) {
1901                         if (signal_pending(current)) {
1902                                 status = -ERESTARTSYS;
1903                                 goto bail;
1904                         }
1905
1906                         if (total_backoff > DLM_JOIN_TIMEOUT_MSECS) {
1907                                 status = -ERESTARTSYS;
1908                                 mlog(ML_NOTICE, "Timed out joining dlm domain "
1909                                      "%s after %u msecs\n", dlm->name,
1910                                      total_backoff);
1911                                 goto bail;
1912                         }
1913
1914                         /*
1915                          * <chip> After you!
1916                          * <dale> No, after you!
1917                          * <chip> I insist!
1918                          * <dale> But you first!
1919                          * ...
1920                          */
1921                         backoff = (unsigned int)(jiffies & 0x3);
1922                         backoff *= DLM_DOMAIN_BACKOFF_MS;
1923                         total_backoff += backoff;
1924                         mlog(0, "backoff %d\n", backoff);
1925                         msleep(backoff);
1926                 }
1927         } while (status == -EAGAIN);
1928
1929         if (status < 0) {
1930                 mlog_errno(status);
1931                 goto bail;
1932         }
1933
1934         status = 0;
1935 bail:
1936         wake_up(&dlm_domain_events);
1937
1938         if (status) {
1939                 dlm_unregister_domain_handlers(dlm);
1940                 dlm_debug_shutdown(dlm);
1941                 dlm_complete_thread(dlm);
1942                 dlm_complete_recovery_thread(dlm);
1943                 dlm_destroy_dlm_worker(dlm);
1944         }
1945
1946         return status;
1947 }
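
/*
 * Backoff note (editor's): each -EAGAIN retry sleeps
 * (jiffies & 0x3) * DLM_DOMAIN_BACKOFF_MS, i.e. zero to three backoff
 * units, and the loop gives up with -ERESTARTSYS once total_backoff
 * exceeds DLM_JOIN_TIMEOUT_MSECS (90 seconds).
 */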
1948
1949 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1950                                 u32 key)
1951 {
1952         int i;
1953         int ret;
1954         struct dlm_ctxt *dlm = NULL;
1955
1956         dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1957         if (!dlm) {
1958                 ret = -ENOMEM;
1959                 mlog_errno(ret);
1960                 goto leave;
1961         }
1962
1963         dlm->name = kstrdup(domain, GFP_KERNEL);
1964         if (dlm->name == NULL) {
1965                 ret = -ENOMEM;
1966                 mlog_errno(ret);
1967                 goto leave;
1968         }
1969
1970         dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1971         if (!dlm->lockres_hash) {
1972                 ret = -ENOMEM;
1973                 mlog_errno(ret);
1974                 goto leave;
1975         }
1976
1977         for (i = 0; i < DLM_HASH_BUCKETS; i++)
1978                 INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1979
1980         dlm->master_hash = (struct hlist_head **)
1981                                 dlm_alloc_pagevec(DLM_HASH_PAGES);
1982         if (!dlm->master_hash) {
1983                 ret = -ENOMEM;
1984                 mlog_errno(ret);
1985                 goto leave;
1986         }
1987
1988         for (i = 0; i < DLM_HASH_BUCKETS; i++)
1989                 INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1990
1991         dlm->key = key;
1992         dlm->node_num = o2nm_this_node();
1993
1994         ret = dlm_create_debugfs_subroot(dlm);
1995         if (ret < 0)
1996                 goto leave;
1997
1998         spin_lock_init(&dlm->spinlock);
1999         spin_lock_init(&dlm->master_lock);
2000         spin_lock_init(&dlm->ast_lock);
2001         spin_lock_init(&dlm->track_lock);
2002         INIT_LIST_HEAD(&dlm->list);
2003         INIT_LIST_HEAD(&dlm->dirty_list);
2004         INIT_LIST_HEAD(&dlm->reco.resources);
2005         INIT_LIST_HEAD(&dlm->reco.node_data);
2006         INIT_LIST_HEAD(&dlm->purge_list);
2007         INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
2008         INIT_LIST_HEAD(&dlm->tracking_list);
2009         dlm->reco.state = 0;
2010
2011         INIT_LIST_HEAD(&dlm->pending_asts);
2012         INIT_LIST_HEAD(&dlm->pending_basts);
2013
2014         mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
2015                   dlm->recovery_map, &(dlm->recovery_map[0]));
2016
2017         memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
2018         memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
2019         memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
2020
2021         dlm->dlm_thread_task = NULL;
2022         dlm->dlm_reco_thread_task = NULL;
2023         dlm->dlm_worker = NULL;
2024         init_waitqueue_head(&dlm->dlm_thread_wq);
2025         init_waitqueue_head(&dlm->dlm_reco_thread_wq);
2026         init_waitqueue_head(&dlm->reco.event);
2027         init_waitqueue_head(&dlm->ast_wq);
2028         init_waitqueue_head(&dlm->migration_wq);
2029         INIT_LIST_HEAD(&dlm->mle_hb_events);
2030
2031         dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
2032         init_waitqueue_head(&dlm->dlm_join_events);
2033
2034         dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
2035         dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
2036
2037         atomic_set(&dlm->res_tot_count, 0);
2038         atomic_set(&dlm->res_cur_count, 0);
2039         for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
2040                 atomic_set(&dlm->mle_tot_count[i], 0);
2041                 atomic_set(&dlm->mle_cur_count[i], 0);
2042         }
2043
2044         spin_lock_init(&dlm->work_lock);
2045         INIT_LIST_HEAD(&dlm->work_list);
2046         INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
2047
2048         kref_init(&dlm->dlm_refs);
2049         dlm->dlm_state = DLM_CTXT_NEW;
2050
2051         INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
2052
2053         mlog(0, "context init: refcount %u\n",
2054                   atomic_read(&dlm->dlm_refs.refcount));
2055
2056 leave:
2057         if (ret < 0 && dlm) {
2058                 if (dlm->master_hash)
2059                         dlm_free_pagevec((void **)dlm->master_hash,
2060                                         DLM_HASH_PAGES);
2061
2062                 if (dlm->lockres_hash)
2063                         dlm_free_pagevec((void **)dlm->lockres_hash,
2064                                         DLM_HASH_PAGES);
2065
2066                 kfree(dlm->name);
2067                 kfree(dlm);
2068                 dlm = NULL;
2069         }
2070         return dlm;
2071 }
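
/*
 * Editor's note: dlm_alloc_ctxt() only builds the in-memory context
 * (hashes, lists, waitqueues, debugfs subroot) and leaves it in
 * DLM_CTXT_NEW; it touches neither the network nor other nodes.  The
 * actual handshake happens later in dlm_join_domain().
 */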
2072
2073 /*
2074  * Compare a requested locking protocol version against the current one.
2075  *
2076  * If the major numbers are different, they are incompatible.
2077  * If the current minor is greater than the request, they are incompatible.
2078  * If the current minor is less than or equal to the request, they are
2079  * compatible, and the requester should run at the current minor version.
2080  */
2081 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
2082                                 struct dlm_protocol_version *request)
2083 {
2084         if (existing->pv_major != request->pv_major)
2085                 return 1;
2086
2087         if (existing->pv_minor > request->pv_minor)
2088                 return 1;
2089
2090         if (existing->pv_minor < request->pv_minor)
2091                 request->pv_minor = existing->pv_minor;
2092
2093         return 0;
2094 }
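
/*
 * Worked example (editor's): existing 1.2 vs. requested 1.4 is
 * compatible and lowers the request to 1.2; existing 1.4 vs. requested
 * 1.2 is incompatible (returns 1); differing majors such as 1.x vs. 2.x
 * are always incompatible.
 */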
2095
2096 /*
2097  * dlm_register_domain: one-time setup per "domain".
2098  *
2099  * The filesystem passes in the requested locking version via proto.
2100  * If registration was successful, proto will contain the negotiated
2101  * locking protocol.
2102  */
2103 struct dlm_ctxt * dlm_register_domain(const char *domain,
2104                                u32 key,
2105                                struct dlm_protocol_version *fs_proto)
2106 {
2107         int ret;
2108         struct dlm_ctxt *dlm = NULL;
2109         struct dlm_ctxt *new_ctxt = NULL;
2110
2111         if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
2112                 ret = -ENAMETOOLONG;
2113                 mlog(ML_ERROR, "domain name too long\n");
2114                 goto leave;
2115         }
2116
2117         mlog(0, "register called for domain \"%s\"\n", domain);
2118
2119 retry:
2120         dlm = NULL;
2121         if (signal_pending(current)) {
2122                 ret = -ERESTARTSYS;
2123                 mlog_errno(ret);
2124                 goto leave;
2125         }
2126
2127         spin_lock(&dlm_domain_lock);
2128
2129         dlm = __dlm_lookup_domain(domain);
2130         if (dlm) {
2131                 if (dlm->dlm_state != DLM_CTXT_JOINED) {
2132                         spin_unlock(&dlm_domain_lock);
2133
2134                         mlog(0, "This ctxt is not joined yet!\n");
2135                         wait_event_interruptible(dlm_domain_events,
2136                                                  dlm_wait_on_domain_helper(
2137                                                          domain));
2138                         goto retry;
2139                 }
2140
2141                 if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
2142                         spin_unlock(&dlm_domain_lock);
2143                         mlog(ML_ERROR,
2144                              "Requested locking protocol version is not "
2145                              "compatible with already registered domain "
2146                              "\"%s\"\n", domain);
2147                         ret = -EPROTO;
2148                         goto leave;
2149                 }
2150
2151                 __dlm_get(dlm);
2152                 dlm->num_joins++;
2153
2154                 spin_unlock(&dlm_domain_lock);
2155
2156                 ret = 0;
2157                 goto leave;
2158         }
2159
2160         /* doesn't exist */
2161         if (!new_ctxt) {
2162                 spin_unlock(&dlm_domain_lock);
2163
2164                 new_ctxt = dlm_alloc_ctxt(domain, key);
2165                 if (new_ctxt)
2166                         goto retry;
2167
2168                 ret = -ENOMEM;
2169                 mlog_errno(ret);
2170                 goto leave;
2171         }
2172
2173         /* a little variable switch-a-roo here... */
2174         dlm = new_ctxt;
2175         new_ctxt = NULL;
2176
2177         /* add the new domain */
2178         list_add_tail(&dlm->list, &dlm_domains);
2179         spin_unlock(&dlm_domain_lock);
2180
2181         /*
2182          * Pass the locking protocol version into the join.  If the join
2183          * succeeds, it will have the negotiated protocol set.
2184          */
2185         dlm->dlm_locking_proto = dlm_protocol;
2186         dlm->fs_locking_proto = *fs_proto;
2187
2188         ret = dlm_join_domain(dlm);
2189         if (ret) {
2190                 mlog_errno(ret);
2191                 dlm_put(dlm);
2192                 goto leave;
2193         }
2194
2195         /* Tell the caller what locking protocol we negotiated */
2196         *fs_proto = dlm->fs_locking_proto;
2197
2198         ret = 0;
2199 leave:
2200         if (new_ctxt)
2201                 dlm_free_ctxt_mem(new_ctxt);
2202
2203         if (ret < 0)
2204                 dlm = ERR_PTR(ret);
2205
2206         return dlm;
2207 }
2208 EXPORT_SYMBOL_GPL(dlm_register_domain);
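
/*
 * Usage sketch (editor's assumption, mirroring how the ocfs2 stack
 * calls this API; the domain name, key and error handling below are
 * illustrative only):
 *
 *	struct dlm_protocol_version fs_proto = {
 *		.pv_major = 1, .pv_minor = 0,
 *	};
 *	struct dlm_ctxt *dlm;
 *
 *	dlm = dlm_register_domain("my-domain-uuid", my_key, &fs_proto);
 *	if (IS_ERR(dlm))
 *		return PTR_ERR(dlm);
 *
 * On success fs_proto holds the negotiated minor version; the domain is
 * released again with dlm_unregister_domain(dlm).
 */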
2209
2210 static LIST_HEAD(dlm_join_handlers);
2211
2212 static void dlm_unregister_net_handlers(void)
2213 {
2214         o2net_unregister_handler_list(&dlm_join_handlers);
2215 }
2216
2217 static int dlm_register_net_handlers(void)
2218 {
2219         int status = 0;
2220
2221         status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
2222                                         sizeof(struct dlm_query_join_request),
2223                                         dlm_query_join_handler,
2224                                         NULL, NULL, &dlm_join_handlers);
2225         if (status)
2226                 goto bail;
2227
2228         status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
2229                                         sizeof(struct dlm_assert_joined),
2230                                         dlm_assert_joined_handler,
2231                                         NULL, NULL, &dlm_join_handlers);
2232         if (status)
2233                 goto bail;
2234
2235         status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
2236                                         sizeof(struct dlm_cancel_join),
2237                                         dlm_cancel_join_handler,
2238                                         NULL, NULL, &dlm_join_handlers);
2239         if (status)
2240                 goto bail;
2241
2242         status = o2net_register_handler(DLM_QUERY_REGION, DLM_MOD_KEY,
2243                                         sizeof(struct dlm_query_region),
2244                                         dlm_query_region_handler,
2245                                         NULL, NULL, &dlm_join_handlers);
2246
2247         if (status)
2248                 goto bail;
2249
2250         status = o2net_register_handler(DLM_QUERY_NODEINFO, DLM_MOD_KEY,
2251                                         sizeof(struct dlm_query_nodeinfo),
2252                                         dlm_query_nodeinfo_handler,
2253                                         NULL, NULL, &dlm_join_handlers);
2254 bail:
2255         if (status < 0)
2256                 dlm_unregister_net_handlers();
2257
2258         return status;
2259 }
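
/*
 * Editor's note: the handlers above are module-wide and keyed on
 * DLM_MOD_KEY because they service join traffic for domains that may
 * not exist locally yet; per-domain messages are registered separately
 * in dlm_register_domain_handlers() against dlm->key.
 */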
2260
2261 /* Domain eviction callback handling.
2262  *
2263  * The file system requires notification of node death *before* the
2264  * dlm completes its recovery work, otherwise it could acquire locks
2265  * on resources that still require recovery. Since the dlm can evict
2266  * a node from its domain *before* heartbeat fires, a similar
2267  * mechanism is required. */
2268
2269 /* Eviction is not expected to happen often, so a per-domain lock is
2270  * not necessary. Eviction callbacks are allowed to sleep for short
2271  * periods of time. */
2272 static DECLARE_RWSEM(dlm_callback_sem);
2273
2274 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
2275                                         int node_num)
2276 {
2277         struct dlm_eviction_cb *cb;
2278
2279         down_read(&dlm_callback_sem);
2280         list_for_each_entry(cb, &dlm->dlm_eviction_callbacks, ec_item) {
2281                 cb->ec_func(node_num, cb->ec_data);
2282         }
2283         up_read(&dlm_callback_sem);
2284 }
2285
2286 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
2287                            dlm_eviction_func *f,
2288                            void *data)
2289 {
2290         INIT_LIST_HEAD(&cb->ec_item);
2291         cb->ec_func = f;
2292         cb->ec_data = data;
2293 }
2294 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
2295
2296 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
2297                               struct dlm_eviction_cb *cb)
2298 {
2299         down_write(&dlm_callback_sem);
2300         list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
2301         up_write(&dlm_callback_sem);
2302 }
2303 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
2304
2305 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
2306 {
2307         down_write(&dlm_callback_sem);
2308         list_del_init(&cb->ec_item);
2309         up_write(&dlm_callback_sem);
2310 }
2311 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
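
/*
 * Usage sketch (editor's assumption; the function and variable names
 * are illustrative only).  A file system that needs early node-death
 * notification registers a callback once per domain:
 *
 *	static void myfs_node_down(int node_num, void *data)
 *	{
 *		struct super_block *sb = data;
 *		...
 *	}
 *
 *	dlm_setup_eviction_cb(&cb, myfs_node_down, sb);
 *	dlm_register_eviction_cb(dlm, &cb);
 *
 * and drops it with dlm_unregister_eviction_cb(&cb) before the domain
 * goes away.  Callbacks may sleep briefly (see dlm_callback_sem above).
 */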
2312
2313 static int __init dlm_init(void)
2314 {
2315         int status;
2316
2317         status = dlm_init_mle_cache();
2318         if (status) {
2319                 mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
2320                 goto error;
2321         }
2322
2323         status = dlm_init_master_caches();
2324         if (status) {
2325                 mlog(ML_ERROR, "Could not create o2dlm_lockres and "
2326                      "o2dlm_lockname slabcaches\n");
2327                 goto error;
2328         }
2329
2330         status = dlm_init_lock_cache();
2331         if (status) {
2332                 mlog(ML_ERROR, "Could not create o2dlm_lock slabcache\n");
2333                 goto error;
2334         }
2335
2336         status = dlm_register_net_handlers();
2337         if (status) {
2338                 mlog(ML_ERROR, "Unable to register network handlers\n");
2339                 goto error;
2340         }
2341
2342         status = dlm_create_debugfs_root();
2343         if (status)
2344                 goto error;
2345
2346         return 0;
2347 error:
2348         dlm_unregister_net_handlers();
2349         dlm_destroy_lock_cache();
2350         dlm_destroy_master_caches();
2351         dlm_destroy_mle_cache();
2352         return -1;
2353 }
2354
2355 static void __exit dlm_exit(void)
2356 {
2357         dlm_destroy_debugfs_root();
2358         dlm_unregister_net_handlers();
2359         dlm_destroy_lock_cache();
2360         dlm_destroy_master_caches();
2361         dlm_destroy_mle_cache();
2362 }
2363
2364 MODULE_AUTHOR("Oracle");
2365 MODULE_LICENSE("GPL");
2366 MODULE_DESCRIPTION("OCFS2 Distributed Lock Management");
2367
2368 module_init(dlm_init);
2369 module_exit(dlm_exit);