GNU Linux-libre 4.14.266-gnu1
[releases.git] / fs / dlm / lock.c
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12
13 /* Central locking logic has four stages:
14
15    dlm_lock()
16    dlm_unlock()
17
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "lowcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89                                     struct dlm_message *ms);
90 static int receive_extralen(struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void del_timeout(struct dlm_lkb *lkb);
93 static void toss_rsb(struct kref *kref);
94
95 /*
96  * Lock compatibilty matrix - thanks Steve
97  * UN = Unlocked state. Not really a state, used as a flag
98  * PD = Padding. Used to make the matrix a nice power of two in size
99  * Other states are the same as the VMS DLM.
100  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
101  */
102
103 static const int __dlm_compat_matrix[8][8] = {
104       /* UN NL CR CW PR PW EX PD */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
106         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
107         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
108         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
109         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
110         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
111         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
112         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
113 };
114
115 /*
116  * This defines the direction of transfer of LVB data.
117  * Granted mode is the row; requested mode is the column.
118  * Usage: matrix[grmode+1][rqmode+1]
119  * 1 = LVB is returned to the caller
120  * 0 = LVB is written to the resource
121  * -1 = nothing happens to the LVB
122  */
123
124 const int dlm_lvb_operations[8][8] = {
125         /* UN   NL  CR  CW  PR  PW  EX  PD*/
126         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
127         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
128         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
129         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
130         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
131         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
133         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
134 };
135
136 #define modes_compat(gr, rq) \
137         __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
138
139 int dlm_modes_compat(int mode1, int mode2)
140 {
141         return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
142 }
143
144 /*
145  * Compatibility matrix for conversions with QUECVT set.
146  * Granted mode is the row; requested mode is the column.
147  * Usage: matrix[grmode+1][rqmode+1]
148  */
149
150 static const int __quecvt_compat_matrix[8][8] = {
151       /* UN NL CR CW PR PW EX PD */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
153         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
154         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
155         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
156         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
157         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
158         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
159         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
160 };
161
162 void dlm_print_lkb(struct dlm_lkb *lkb)
163 {
164         printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
165                "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
166                lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
167                lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
168                lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
169                (unsigned long long)lkb->lkb_recover_seq);
170 }
171
172 static void dlm_print_rsb(struct dlm_rsb *r)
173 {
174         printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
175                "rlc %d name %s\n",
176                r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
177                r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
178                r->res_name);
179 }
180
181 void dlm_dump_rsb(struct dlm_rsb *r)
182 {
183         struct dlm_lkb *lkb;
184
185         dlm_print_rsb(r);
186
187         printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
188                list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
189         printk(KERN_ERR "rsb lookup list\n");
190         list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
191                 dlm_print_lkb(lkb);
192         printk(KERN_ERR "rsb grant queue:\n");
193         list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
194                 dlm_print_lkb(lkb);
195         printk(KERN_ERR "rsb convert queue:\n");
196         list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
197                 dlm_print_lkb(lkb);
198         printk(KERN_ERR "rsb wait queue:\n");
199         list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
200                 dlm_print_lkb(lkb);
201 }
202
203 /* Threads cannot use the lockspace while it's being recovered */
204
205 static inline void dlm_lock_recovery(struct dlm_ls *ls)
206 {
207         down_read(&ls->ls_in_recovery);
208 }
209
210 void dlm_unlock_recovery(struct dlm_ls *ls)
211 {
212         up_read(&ls->ls_in_recovery);
213 }
214
215 int dlm_lock_recovery_try(struct dlm_ls *ls)
216 {
217         return down_read_trylock(&ls->ls_in_recovery);
218 }
219
220 static inline int can_be_queued(struct dlm_lkb *lkb)
221 {
222         return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
223 }
224
225 static inline int force_blocking_asts(struct dlm_lkb *lkb)
226 {
227         return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
228 }
229
230 static inline int is_demoted(struct dlm_lkb *lkb)
231 {
232         return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
233 }
234
235 static inline int is_altmode(struct dlm_lkb *lkb)
236 {
237         return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
238 }
239
240 static inline int is_granted(struct dlm_lkb *lkb)
241 {
242         return (lkb->lkb_status == DLM_LKSTS_GRANTED);
243 }
244
245 static inline int is_remote(struct dlm_rsb *r)
246 {
247         DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
248         return !!r->res_nodeid;
249 }
250
251 static inline int is_process_copy(struct dlm_lkb *lkb)
252 {
253         return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258         return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263         if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264             (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265                 return 1;
266         return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271         return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276         return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281         return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286         return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
287                                   DLM_IFL_OVERLAP_CANCEL));
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292         if (is_master_copy(lkb))
293                 return;
294
295         del_timeout(lkb);
296
297         DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
298
299         /* if the operation was a cancel, then return -DLM_ECANCEL, if a
300            timeout caused the cancel then return -ETIMEDOUT */
301         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
302                 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
303                 rv = -ETIMEDOUT;
304         }
305
306         if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
307                 lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
308                 rv = -EDEADLK;
309         }
310
311         dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
312 }
313
314 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
315 {
316         queue_cast(r, lkb,
317                    is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
318 }
319
320 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
321 {
322         if (is_master_copy(lkb)) {
323                 send_bast(r, lkb, rqmode);
324         } else {
325                 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
326         }
327 }
328
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332
333 /* This is only called to add a reference when the code already holds
334    a valid reference to the rsb, so there's no need for locking. */
335
336 static inline void hold_rsb(struct dlm_rsb *r)
337 {
338         kref_get(&r->res_ref);
339 }
340
341 void dlm_hold_rsb(struct dlm_rsb *r)
342 {
343         hold_rsb(r);
344 }
345
346 /* When all references to the rsb are gone it's transferred to
347    the tossed list for later disposal. */
348
349 static void put_rsb(struct dlm_rsb *r)
350 {
351         struct dlm_ls *ls = r->res_ls;
352         uint32_t bucket = r->res_bucket;
353
354         spin_lock(&ls->ls_rsbtbl[bucket].lock);
355         kref_put(&r->res_ref, toss_rsb);
356         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
357 }
358
359 void dlm_put_rsb(struct dlm_rsb *r)
360 {
361         put_rsb(r);
362 }
363
364 static int pre_rsb_struct(struct dlm_ls *ls)
365 {
366         struct dlm_rsb *r1, *r2;
367         int count = 0;
368
369         spin_lock(&ls->ls_new_rsb_spin);
370         if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
371                 spin_unlock(&ls->ls_new_rsb_spin);
372                 return 0;
373         }
374         spin_unlock(&ls->ls_new_rsb_spin);
375
376         r1 = dlm_allocate_rsb(ls);
377         r2 = dlm_allocate_rsb(ls);
378
379         spin_lock(&ls->ls_new_rsb_spin);
380         if (r1) {
381                 list_add(&r1->res_hashchain, &ls->ls_new_rsb);
382                 ls->ls_new_rsb_count++;
383         }
384         if (r2) {
385                 list_add(&r2->res_hashchain, &ls->ls_new_rsb);
386                 ls->ls_new_rsb_count++;
387         }
388         count = ls->ls_new_rsb_count;
389         spin_unlock(&ls->ls_new_rsb_spin);
390
391         if (!count)
392                 return -ENOMEM;
393         return 0;
394 }
395
396 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
397    unlock any spinlocks, go back and call pre_rsb_struct again.
398    Otherwise, take an rsb off the list and return it. */
399
400 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
401                           struct dlm_rsb **r_ret)
402 {
403         struct dlm_rsb *r;
404         int count;
405
406         spin_lock(&ls->ls_new_rsb_spin);
407         if (list_empty(&ls->ls_new_rsb)) {
408                 count = ls->ls_new_rsb_count;
409                 spin_unlock(&ls->ls_new_rsb_spin);
410                 log_debug(ls, "find_rsb retry %d %d %s",
411                           count, dlm_config.ci_new_rsb_count, name);
412                 return -EAGAIN;
413         }
414
415         r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
416         list_del(&r->res_hashchain);
417         /* Convert the empty list_head to a NULL rb_node for tree usage: */
418         memset(&r->res_hashnode, 0, sizeof(struct rb_node));
419         ls->ls_new_rsb_count--;
420         spin_unlock(&ls->ls_new_rsb_spin);
421
422         r->res_ls = ls;
423         r->res_length = len;
424         memcpy(r->res_name, name, len);
425         mutex_init(&r->res_mutex);
426
427         INIT_LIST_HEAD(&r->res_lookup);
428         INIT_LIST_HEAD(&r->res_grantqueue);
429         INIT_LIST_HEAD(&r->res_convertqueue);
430         INIT_LIST_HEAD(&r->res_waitqueue);
431         INIT_LIST_HEAD(&r->res_root_list);
432         INIT_LIST_HEAD(&r->res_recover_list);
433
434         *r_ret = r;
435         return 0;
436 }
437
438 static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
439 {
440         char maxname[DLM_RESNAME_MAXLEN];
441
442         memset(maxname, 0, DLM_RESNAME_MAXLEN);
443         memcpy(maxname, name, nlen);
444         return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
445 }
446
447 int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
448                         struct dlm_rsb **r_ret)
449 {
450         struct rb_node *node = tree->rb_node;
451         struct dlm_rsb *r;
452         int rc;
453
454         while (node) {
455                 r = rb_entry(node, struct dlm_rsb, res_hashnode);
456                 rc = rsb_cmp(r, name, len);
457                 if (rc < 0)
458                         node = node->rb_left;
459                 else if (rc > 0)
460                         node = node->rb_right;
461                 else
462                         goto found;
463         }
464         *r_ret = NULL;
465         return -EBADR;
466
467  found:
468         *r_ret = r;
469         return 0;
470 }
471
472 static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
473 {
474         struct rb_node **newn = &tree->rb_node;
475         struct rb_node *parent = NULL;
476         int rc;
477
478         while (*newn) {
479                 struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
480                                                res_hashnode);
481
482                 parent = *newn;
483                 rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
484                 if (rc < 0)
485                         newn = &parent->rb_left;
486                 else if (rc > 0)
487                         newn = &parent->rb_right;
488                 else {
489                         log_print("rsb_insert match");
490                         dlm_dump_rsb(rsb);
491                         dlm_dump_rsb(cur);
492                         return -EEXIST;
493                 }
494         }
495
496         rb_link_node(&rsb->res_hashnode, parent, newn);
497         rb_insert_color(&rsb->res_hashnode, tree);
498         return 0;
499 }
500
501 /*
502  * Find rsb in rsbtbl and potentially create/add one
503  *
504  * Delaying the release of rsb's has a similar benefit to applications keeping
505  * NL locks on an rsb, but without the guarantee that the cached master value
506  * will still be valid when the rsb is reused.  Apps aren't always smart enough
507  * to keep NL locks on an rsb that they may lock again shortly; this can lead
508  * to excessive master lookups and removals if we don't delay the release.
509  *
510  * Searching for an rsb means looking through both the normal list and toss
511  * list.  When found on the toss list the rsb is moved to the normal list with
512  * ref count of 1; when found on normal list the ref count is incremented.
513  *
514  * rsb's on the keep list are being used locally and refcounted.
515  * rsb's on the toss list are not being used locally, and are not refcounted.
516  *
517  * The toss list rsb's were either
518  * - previously used locally but not any more (were on keep list, then
519  *   moved to toss list when last refcount dropped)
520  * - created and put on toss list as a directory record for a lookup
521  *   (we are the dir node for the res, but are not using the res right now,
522  *   but some other node is)
523  *
524  * The purpose of find_rsb() is to return a refcounted rsb for local use.
525  * So, if the given rsb is on the toss list, it is moved to the keep list
526  * before being returned.
527  *
528  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
529  * more refcounts exist, so the rsb is moved from the keep list to the
530  * toss list.
531  *
532  * rsb's on both keep and toss lists are used for doing a name to master
533  * lookups.  rsb's that are in use locally (and being refcounted) are on
534  * the keep list, rsb's that are not in use locally (not refcounted) and
535  * only exist for name/master lookups are on the toss list.
536  *
537  * rsb's on the toss list who's dir_nodeid is not local can have stale
538  * name/master mappings.  So, remote requests on such rsb's can potentially
539  * return with an error, which means the mapping is stale and needs to
540  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
541  * first_lkid is to keep only a single outstanding request on an rsb
542  * while that rsb has a potentially stale master.)
543  */
544
545 static int find_rsb_dir(struct dlm_ls *ls, char *name, int len,
546                         uint32_t hash, uint32_t b,
547                         int dir_nodeid, int from_nodeid,
548                         unsigned int flags, struct dlm_rsb **r_ret)
549 {
550         struct dlm_rsb *r = NULL;
551         int our_nodeid = dlm_our_nodeid();
552         int from_local = 0;
553         int from_other = 0;
554         int from_dir = 0;
555         int create = 0;
556         int error;
557
558         if (flags & R_RECEIVE_REQUEST) {
559                 if (from_nodeid == dir_nodeid)
560                         from_dir = 1;
561                 else
562                         from_other = 1;
563         } else if (flags & R_REQUEST) {
564                 from_local = 1;
565         }
566
567         /*
568          * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
569          * from_nodeid has sent us a lock in dlm_recover_locks, believing
570          * we're the new master.  Our local recovery may not have set
571          * res_master_nodeid to our_nodeid yet, so allow either.  Don't
572          * create the rsb; dlm_recover_process_copy() will handle EBADR
573          * by resending.
574          *
575          * If someone sends us a request, we are the dir node, and we do
576          * not find the rsb anywhere, then recreate it.  This happens if
577          * someone sends us a request after we have removed/freed an rsb
578          * from our toss list.  (They sent a request instead of lookup
579          * because they are using an rsb from their toss list.)
580          */
581
582         if (from_local || from_dir ||
583             (from_other && (dir_nodeid == our_nodeid))) {
584                 create = 1;
585         }
586
587  retry:
588         if (create) {
589                 error = pre_rsb_struct(ls);
590                 if (error < 0)
591                         goto out;
592         }
593
594         spin_lock(&ls->ls_rsbtbl[b].lock);
595
596         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
597         if (error)
598                 goto do_toss;
599         
600         /*
601          * rsb is active, so we can't check master_nodeid without lock_rsb.
602          */
603
604         kref_get(&r->res_ref);
605         error = 0;
606         goto out_unlock;
607
608
609  do_toss:
610         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
611         if (error)
612                 goto do_new;
613
614         /*
615          * rsb found inactive (master_nodeid may be out of date unless
616          * we are the dir_nodeid or were the master)  No other thread
617          * is using this rsb because it's on the toss list, so we can
618          * look at or update res_master_nodeid without lock_rsb.
619          */
620
621         if ((r->res_master_nodeid != our_nodeid) && from_other) {
622                 /* our rsb was not master, and another node (not the dir node)
623                    has sent us a request */
624                 log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
625                           from_nodeid, r->res_master_nodeid, dir_nodeid,
626                           r->res_name);
627                 error = -ENOTBLK;
628                 goto out_unlock;
629         }
630
631         if ((r->res_master_nodeid != our_nodeid) && from_dir) {
632                 /* don't think this should ever happen */
633                 log_error(ls, "find_rsb toss from_dir %d master %d",
634                           from_nodeid, r->res_master_nodeid);
635                 dlm_print_rsb(r);
636                 /* fix it and go on */
637                 r->res_master_nodeid = our_nodeid;
638                 r->res_nodeid = 0;
639                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
640                 r->res_first_lkid = 0;
641         }
642
643         if (from_local && (r->res_master_nodeid != our_nodeid)) {
644                 /* Because we have held no locks on this rsb,
645                    res_master_nodeid could have become stale. */
646                 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
647                 r->res_first_lkid = 0;
648         }
649
650         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
651         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
652         goto out_unlock;
653
654
655  do_new:
656         /*
657          * rsb not found
658          */
659
660         if (error == -EBADR && !create)
661                 goto out_unlock;
662
663         error = get_rsb_struct(ls, name, len, &r);
664         if (error == -EAGAIN) {
665                 spin_unlock(&ls->ls_rsbtbl[b].lock);
666                 goto retry;
667         }
668         if (error)
669                 goto out_unlock;
670
671         r->res_hash = hash;
672         r->res_bucket = b;
673         r->res_dir_nodeid = dir_nodeid;
674         kref_init(&r->res_ref);
675
676         if (from_dir) {
677                 /* want to see how often this happens */
678                 log_debug(ls, "find_rsb new from_dir %d recreate %s",
679                           from_nodeid, r->res_name);
680                 r->res_master_nodeid = our_nodeid;
681                 r->res_nodeid = 0;
682                 goto out_add;
683         }
684
685         if (from_other && (dir_nodeid != our_nodeid)) {
686                 /* should never happen */
687                 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
688                           from_nodeid, dir_nodeid, our_nodeid, r->res_name);
689                 dlm_free_rsb(r);
690                 r = NULL;
691                 error = -ENOTBLK;
692                 goto out_unlock;
693         }
694
695         if (from_other) {
696                 log_debug(ls, "find_rsb new from_other %d dir %d %s",
697                           from_nodeid, dir_nodeid, r->res_name);
698         }
699
700         if (dir_nodeid == our_nodeid) {
701                 /* When we are the dir nodeid, we can set the master
702                    node immediately */
703                 r->res_master_nodeid = our_nodeid;
704                 r->res_nodeid = 0;
705         } else {
706                 /* set_master will send_lookup to dir_nodeid */
707                 r->res_master_nodeid = 0;
708                 r->res_nodeid = -1;
709         }
710
711  out_add:
712         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
713  out_unlock:
714         spin_unlock(&ls->ls_rsbtbl[b].lock);
715  out:
716         *r_ret = r;
717         return error;
718 }
719
720 /* During recovery, other nodes can send us new MSTCPY locks (from
721    dlm_recover_locks) before we've made ourself master (in
722    dlm_recover_masters). */
723
724 static int find_rsb_nodir(struct dlm_ls *ls, char *name, int len,
725                           uint32_t hash, uint32_t b,
726                           int dir_nodeid, int from_nodeid,
727                           unsigned int flags, struct dlm_rsb **r_ret)
728 {
729         struct dlm_rsb *r = NULL;
730         int our_nodeid = dlm_our_nodeid();
731         int recover = (flags & R_RECEIVE_RECOVER);
732         int error;
733
734  retry:
735         error = pre_rsb_struct(ls);
736         if (error < 0)
737                 goto out;
738
739         spin_lock(&ls->ls_rsbtbl[b].lock);
740
741         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
742         if (error)
743                 goto do_toss;
744
745         /*
746          * rsb is active, so we can't check master_nodeid without lock_rsb.
747          */
748
749         kref_get(&r->res_ref);
750         goto out_unlock;
751
752
753  do_toss:
754         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
755         if (error)
756                 goto do_new;
757
758         /*
759          * rsb found inactive. No other thread is using this rsb because
760          * it's on the toss list, so we can look at or update
761          * res_master_nodeid without lock_rsb.
762          */
763
764         if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
765                 /* our rsb is not master, and another node has sent us a
766                    request; this should never happen */
767                 log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
768                           from_nodeid, r->res_master_nodeid, dir_nodeid);
769                 dlm_print_rsb(r);
770                 error = -ENOTBLK;
771                 goto out_unlock;
772         }
773
774         if (!recover && (r->res_master_nodeid != our_nodeid) &&
775             (dir_nodeid == our_nodeid)) {
776                 /* our rsb is not master, and we are dir; may as well fix it;
777                    this should never happen */
778                 log_error(ls, "find_rsb toss our %d master %d dir %d",
779                           our_nodeid, r->res_master_nodeid, dir_nodeid);
780                 dlm_print_rsb(r);
781                 r->res_master_nodeid = our_nodeid;
782                 r->res_nodeid = 0;
783         }
784
785         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
786         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
787         goto out_unlock;
788
789
790  do_new:
791         /*
792          * rsb not found
793          */
794
795         error = get_rsb_struct(ls, name, len, &r);
796         if (error == -EAGAIN) {
797                 spin_unlock(&ls->ls_rsbtbl[b].lock);
798                 goto retry;
799         }
800         if (error)
801                 goto out_unlock;
802
803         r->res_hash = hash;
804         r->res_bucket = b;
805         r->res_dir_nodeid = dir_nodeid;
806         r->res_master_nodeid = dir_nodeid;
807         r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
808         kref_init(&r->res_ref);
809
810         error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
811  out_unlock:
812         spin_unlock(&ls->ls_rsbtbl[b].lock);
813  out:
814         *r_ret = r;
815         return error;
816 }
817
818 static int find_rsb(struct dlm_ls *ls, char *name, int len, int from_nodeid,
819                     unsigned int flags, struct dlm_rsb **r_ret)
820 {
821         uint32_t hash, b;
822         int dir_nodeid;
823
824         if (len > DLM_RESNAME_MAXLEN)
825                 return -EINVAL;
826
827         hash = jhash(name, len, 0);
828         b = hash & (ls->ls_rsbtbl_size - 1);
829
830         dir_nodeid = dlm_hash2nodeid(ls, hash);
831
832         if (dlm_no_directory(ls))
833                 return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
834                                       from_nodeid, flags, r_ret);
835         else
836                 return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
837                                       from_nodeid, flags, r_ret);
838 }
839
840 /* we have received a request and found that res_master_nodeid != our_nodeid,
841    so we need to return an error or make ourself the master */
842
843 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
844                                   int from_nodeid)
845 {
846         if (dlm_no_directory(ls)) {
847                 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
848                           from_nodeid, r->res_master_nodeid,
849                           r->res_dir_nodeid);
850                 dlm_print_rsb(r);
851                 return -ENOTBLK;
852         }
853
854         if (from_nodeid != r->res_dir_nodeid) {
855                 /* our rsb is not master, and another node (not the dir node)
856                    has sent us a request.  this is much more common when our
857                    master_nodeid is zero, so limit debug to non-zero.  */
858
859                 if (r->res_master_nodeid) {
860                         log_debug(ls, "validate master from_other %d master %d "
861                                   "dir %d first %x %s", from_nodeid,
862                                   r->res_master_nodeid, r->res_dir_nodeid,
863                                   r->res_first_lkid, r->res_name);
864                 }
865                 return -ENOTBLK;
866         } else {
867                 /* our rsb is not master, but the dir nodeid has sent us a
868                    request; this could happen with master 0 / res_nodeid -1 */
869
870                 if (r->res_master_nodeid) {
871                         log_error(ls, "validate master from_dir %d master %d "
872                                   "first %x %s",
873                                   from_nodeid, r->res_master_nodeid,
874                                   r->res_first_lkid, r->res_name);
875                 }
876
877                 r->res_master_nodeid = dlm_our_nodeid();
878                 r->res_nodeid = 0;
879                 return 0;
880         }
881 }
882
883 /*
884  * We're the dir node for this res and another node wants to know the
885  * master nodeid.  During normal operation (non recovery) this is only
886  * called from receive_lookup(); master lookups when the local node is
887  * the dir node are done by find_rsb().
888  *
889  * normal operation, we are the dir node for a resource
890  * . _request_lock
891  * . set_master
892  * . send_lookup
893  * . receive_lookup
894  * . dlm_master_lookup flags 0
895  *
896  * recover directory, we are rebuilding dir for all resources
897  * . dlm_recover_directory
898  * . dlm_rcom_names
899  *   remote node sends back the rsb names it is master of and we are dir of
900  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
901  *   we either create new rsb setting remote node as master, or find existing
902  *   rsb and set master to be the remote node.
903  *
904  * recover masters, we are finding the new master for resources
905  * . dlm_recover_masters
906  * . recover_master
907  * . dlm_send_rcom_lookup
908  * . receive_rcom_lookup
909  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
910  */
911
912 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, char *name, int len,
913                       unsigned int flags, int *r_nodeid, int *result)
914 {
915         struct dlm_rsb *r = NULL;
916         uint32_t hash, b;
917         int from_master = (flags & DLM_LU_RECOVER_DIR);
918         int fix_master = (flags & DLM_LU_RECOVER_MASTER);
919         int our_nodeid = dlm_our_nodeid();
920         int dir_nodeid, error, toss_list = 0;
921
922         if (len > DLM_RESNAME_MAXLEN)
923                 return -EINVAL;
924
925         if (from_nodeid == our_nodeid) {
926                 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
927                           our_nodeid, flags);
928                 return -EINVAL;
929         }
930
931         hash = jhash(name, len, 0);
932         b = hash & (ls->ls_rsbtbl_size - 1);
933
934         dir_nodeid = dlm_hash2nodeid(ls, hash);
935         if (dir_nodeid != our_nodeid) {
936                 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
937                           from_nodeid, dir_nodeid, our_nodeid, hash,
938                           ls->ls_num_nodes);
939                 *r_nodeid = -1;
940                 return -EINVAL;
941         }
942
943  retry:
944         error = pre_rsb_struct(ls);
945         if (error < 0)
946                 return error;
947
948         spin_lock(&ls->ls_rsbtbl[b].lock);
949         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
950         if (!error) {
951                 /* because the rsb is active, we need to lock_rsb before
952                    checking/changing re_master_nodeid */
953
954                 hold_rsb(r);
955                 spin_unlock(&ls->ls_rsbtbl[b].lock);
956                 lock_rsb(r);
957                 goto found;
958         }
959
960         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
961         if (error)
962                 goto not_found;
963
964         /* because the rsb is inactive (on toss list), it's not refcounted
965            and lock_rsb is not used, but is protected by the rsbtbl lock */
966
967         toss_list = 1;
968  found:
969         if (r->res_dir_nodeid != our_nodeid) {
970                 /* should not happen, but may as well fix it and carry on */
971                 log_error(ls, "dlm_master_lookup res_dir %d our %d %s",
972                           r->res_dir_nodeid, our_nodeid, r->res_name);
973                 r->res_dir_nodeid = our_nodeid;
974         }
975
976         if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
977                 /* Recovery uses this function to set a new master when
978                    the previous master failed.  Setting NEW_MASTER will
979                    force dlm_recover_masters to call recover_master on this
980                    rsb even though the res_nodeid is no longer removed. */
981
982                 r->res_master_nodeid = from_nodeid;
983                 r->res_nodeid = from_nodeid;
984                 rsb_set_flag(r, RSB_NEW_MASTER);
985
986                 if (toss_list) {
987                         /* I don't think we should ever find it on toss list. */
988                         log_error(ls, "dlm_master_lookup fix_master on toss");
989                         dlm_dump_rsb(r);
990                 }
991         }
992
993         if (from_master && (r->res_master_nodeid != from_nodeid)) {
994                 /* this will happen if from_nodeid became master during
995                    a previous recovery cycle, and we aborted the previous
996                    cycle before recovering this master value */
997
998                 log_limit(ls, "dlm_master_lookup from_master %d "
999                           "master_nodeid %d res_nodeid %d first %x %s",
1000                           from_nodeid, r->res_master_nodeid, r->res_nodeid,
1001                           r->res_first_lkid, r->res_name);
1002
1003                 if (r->res_master_nodeid == our_nodeid) {
1004                         log_error(ls, "from_master %d our_master", from_nodeid);
1005                         dlm_dump_rsb(r);
1006                         dlm_send_rcom_lookup_dump(r, from_nodeid);
1007                         goto out_found;
1008                 }
1009
1010                 r->res_master_nodeid = from_nodeid;
1011                 r->res_nodeid = from_nodeid;
1012                 rsb_set_flag(r, RSB_NEW_MASTER);
1013         }
1014
1015         if (!r->res_master_nodeid) {
1016                 /* this will happen if recovery happens while we're looking
1017                    up the master for this rsb */
1018
1019                 log_debug(ls, "dlm_master_lookup master 0 to %d first %x %s",
1020                           from_nodeid, r->res_first_lkid, r->res_name);
1021                 r->res_master_nodeid = from_nodeid;
1022                 r->res_nodeid = from_nodeid;
1023         }
1024
1025         if (!from_master && !fix_master &&
1026             (r->res_master_nodeid == from_nodeid)) {
1027                 /* this can happen when the master sends remove, the dir node
1028                    finds the rsb on the keep list and ignores the remove,
1029                    and the former master sends a lookup */
1030
1031                 log_limit(ls, "dlm_master_lookup from master %d flags %x "
1032                           "first %x %s", from_nodeid, flags,
1033                           r->res_first_lkid, r->res_name);
1034         }
1035
1036  out_found:
1037         *r_nodeid = r->res_master_nodeid;
1038         if (result)
1039                 *result = DLM_LU_MATCH;
1040
1041         if (toss_list) {
1042                 r->res_toss_time = jiffies;
1043                 /* the rsb was inactive (on toss list) */
1044                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1045         } else {
1046                 /* the rsb was active */
1047                 unlock_rsb(r);
1048                 put_rsb(r);
1049         }
1050         return 0;
1051
1052  not_found:
1053         error = get_rsb_struct(ls, name, len, &r);
1054         if (error == -EAGAIN) {
1055                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1056                 goto retry;
1057         }
1058         if (error)
1059                 goto out_unlock;
1060
1061         r->res_hash = hash;
1062         r->res_bucket = b;
1063         r->res_dir_nodeid = our_nodeid;
1064         r->res_master_nodeid = from_nodeid;
1065         r->res_nodeid = from_nodeid;
1066         kref_init(&r->res_ref);
1067         r->res_toss_time = jiffies;
1068
1069         error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1070         if (error) {
1071                 /* should never happen */
1072                 dlm_free_rsb(r);
1073                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1074                 goto retry;
1075         }
1076
1077         if (result)
1078                 *result = DLM_LU_ADD;
1079         *r_nodeid = from_nodeid;
1080         error = 0;
1081  out_unlock:
1082         spin_unlock(&ls->ls_rsbtbl[b].lock);
1083         return error;
1084 }
1085
1086 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1087 {
1088         struct rb_node *n;
1089         struct dlm_rsb *r;
1090         int i;
1091
1092         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1093                 spin_lock(&ls->ls_rsbtbl[i].lock);
1094                 for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1095                         r = rb_entry(n, struct dlm_rsb, res_hashnode);
1096                         if (r->res_hash == hash)
1097                                 dlm_dump_rsb(r);
1098                 }
1099                 spin_unlock(&ls->ls_rsbtbl[i].lock);
1100         }
1101 }
1102
1103 void dlm_dump_rsb_name(struct dlm_ls *ls, char *name, int len)
1104 {
1105         struct dlm_rsb *r = NULL;
1106         uint32_t hash, b;
1107         int error;
1108
1109         hash = jhash(name, len, 0);
1110         b = hash & (ls->ls_rsbtbl_size - 1);
1111
1112         spin_lock(&ls->ls_rsbtbl[b].lock);
1113         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1114         if (!error)
1115                 goto out_dump;
1116
1117         error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1118         if (error)
1119                 goto out;
1120  out_dump:
1121         dlm_dump_rsb(r);
1122  out:
1123         spin_unlock(&ls->ls_rsbtbl[b].lock);
1124 }
1125
1126 static void toss_rsb(struct kref *kref)
1127 {
1128         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1129         struct dlm_ls *ls = r->res_ls;
1130
1131         DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1132         kref_init(&r->res_ref);
1133         rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1134         rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1135         r->res_toss_time = jiffies;
1136         ls->ls_rsbtbl[r->res_bucket].flags |= DLM_RTF_SHRINK;
1137         if (r->res_lvbptr) {
1138                 dlm_free_lvb(r->res_lvbptr);
1139                 r->res_lvbptr = NULL;
1140         }
1141 }
1142
1143 /* See comment for unhold_lkb */
1144
1145 static void unhold_rsb(struct dlm_rsb *r)
1146 {
1147         int rv;
1148         rv = kref_put(&r->res_ref, toss_rsb);
1149         DLM_ASSERT(!rv, dlm_dump_rsb(r););
1150 }
1151
1152 static void kill_rsb(struct kref *kref)
1153 {
1154         struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1155
1156         /* All work is done after the return from kref_put() so we
1157            can release the write_lock before the remove and free. */
1158
1159         DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1160         DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1161         DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1162         DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1163         DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1164         DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1165 }
1166
1167 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1168    The rsb must exist as long as any lkb's for it do. */
1169
1170 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1171 {
1172         hold_rsb(r);
1173         lkb->lkb_resource = r;
1174 }
1175
1176 static void detach_lkb(struct dlm_lkb *lkb)
1177 {
1178         if (lkb->lkb_resource) {
1179                 put_rsb(lkb->lkb_resource);
1180                 lkb->lkb_resource = NULL;
1181         }
1182 }
1183
1184 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1185 {
1186         struct dlm_lkb *lkb;
1187         int rv;
1188
1189         lkb = dlm_allocate_lkb(ls);
1190         if (!lkb)
1191                 return -ENOMEM;
1192
1193         lkb->lkb_nodeid = -1;
1194         lkb->lkb_grmode = DLM_LOCK_IV;
1195         kref_init(&lkb->lkb_ref);
1196         INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1197         INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1198         INIT_LIST_HEAD(&lkb->lkb_time_list);
1199         INIT_LIST_HEAD(&lkb->lkb_cb_list);
1200         mutex_init(&lkb->lkb_cb_mutex);
1201         INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1202
1203         idr_preload(GFP_NOFS);
1204         spin_lock(&ls->ls_lkbidr_spin);
1205         rv = idr_alloc(&ls->ls_lkbidr, lkb, 1, 0, GFP_NOWAIT);
1206         if (rv >= 0)
1207                 lkb->lkb_id = rv;
1208         spin_unlock(&ls->ls_lkbidr_spin);
1209         idr_preload_end();
1210
1211         if (rv < 0) {
1212                 log_error(ls, "create_lkb idr error %d", rv);
1213                 dlm_free_lkb(lkb);
1214                 return rv;
1215         }
1216
1217         *lkb_ret = lkb;
1218         return 0;
1219 }
1220
1221 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1222 {
1223         struct dlm_lkb *lkb;
1224
1225         spin_lock(&ls->ls_lkbidr_spin);
1226         lkb = idr_find(&ls->ls_lkbidr, lkid);
1227         if (lkb)
1228                 kref_get(&lkb->lkb_ref);
1229         spin_unlock(&ls->ls_lkbidr_spin);
1230
1231         *lkb_ret = lkb;
1232         return lkb ? 0 : -ENOENT;
1233 }
1234
1235 static void kill_lkb(struct kref *kref)
1236 {
1237         struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1238
1239         /* All work is done after the return from kref_put() so we
1240            can release the write_lock before the detach_lkb */
1241
1242         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1243 }
1244
1245 /* __put_lkb() is used when an lkb may not have an rsb attached to
1246    it so we need to provide the lockspace explicitly */
1247
1248 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1249 {
1250         uint32_t lkid = lkb->lkb_id;
1251
1252         spin_lock(&ls->ls_lkbidr_spin);
1253         if (kref_put(&lkb->lkb_ref, kill_lkb)) {
1254                 idr_remove(&ls->ls_lkbidr, lkid);
1255                 spin_unlock(&ls->ls_lkbidr_spin);
1256
1257                 detach_lkb(lkb);
1258
1259                 /* for local/process lkbs, lvbptr points to caller's lksb */
1260                 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1261                         dlm_free_lvb(lkb->lkb_lvbptr);
1262                 dlm_free_lkb(lkb);
1263                 return 1;
1264         } else {
1265                 spin_unlock(&ls->ls_lkbidr_spin);
1266                 return 0;
1267         }
1268 }
1269
1270 int dlm_put_lkb(struct dlm_lkb *lkb)
1271 {
1272         struct dlm_ls *ls;
1273
1274         DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1275         DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1276
1277         ls = lkb->lkb_resource->res_ls;
1278         return __put_lkb(ls, lkb);
1279 }
1280
1281 /* This is only called to add a reference when the code already holds
1282    a valid reference to the lkb, so there's no need for locking. */
1283
1284 static inline void hold_lkb(struct dlm_lkb *lkb)
1285 {
1286         kref_get(&lkb->lkb_ref);
1287 }
1288
1289 /* This is called when we need to remove a reference and are certain
1290    it's not the last ref.  e.g. del_lkb is always called between a
1291    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1292    put_lkb would work fine, but would involve unnecessary locking */
1293
1294 static inline void unhold_lkb(struct dlm_lkb *lkb)
1295 {
1296         int rv;
1297         rv = kref_put(&lkb->lkb_ref, kill_lkb);
1298         DLM_ASSERT(!rv, dlm_print_lkb(lkb););
1299 }
1300
1301 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1302                             int mode)
1303 {
1304         struct dlm_lkb *lkb = NULL;
1305
1306         list_for_each_entry(lkb, head, lkb_statequeue)
1307                 if (lkb->lkb_rqmode < mode)
1308                         break;
1309
1310         __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
1311 }
1312
1313 /* add/remove lkb to rsb's grant/convert/wait queue */
1314
1315 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1316 {
1317         kref_get(&lkb->lkb_ref);
1318
1319         DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1320
1321         lkb->lkb_timestamp = ktime_get();
1322
1323         lkb->lkb_status = status;
1324
1325         switch (status) {
1326         case DLM_LKSTS_WAITING:
1327                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1328                         list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1329                 else
1330                         list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1331                 break;
1332         case DLM_LKSTS_GRANTED:
1333                 /* convention says granted locks kept in order of grmode */
1334                 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1335                                 lkb->lkb_grmode);
1336                 break;
1337         case DLM_LKSTS_CONVERT:
1338                 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1339                         list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1340                 else
1341                         list_add_tail(&lkb->lkb_statequeue,
1342                                       &r->res_convertqueue);
1343                 break;
1344         default:
1345                 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1346         }
1347 }
1348
1349 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1350 {
1351         lkb->lkb_status = 0;
1352         list_del(&lkb->lkb_statequeue);
1353         unhold_lkb(lkb);
1354 }
1355
1356 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1357 {
1358         hold_lkb(lkb);
1359         del_lkb(r, lkb);
1360         add_lkb(r, lkb, sts);
1361         unhold_lkb(lkb);
1362 }
1363
1364 static int msg_reply_type(int mstype)
1365 {
1366         switch (mstype) {
1367         case DLM_MSG_REQUEST:
1368                 return DLM_MSG_REQUEST_REPLY;
1369         case DLM_MSG_CONVERT:
1370                 return DLM_MSG_CONVERT_REPLY;
1371         case DLM_MSG_UNLOCK:
1372                 return DLM_MSG_UNLOCK_REPLY;
1373         case DLM_MSG_CANCEL:
1374                 return DLM_MSG_CANCEL_REPLY;
1375         case DLM_MSG_LOOKUP:
1376                 return DLM_MSG_LOOKUP_REPLY;
1377         }
1378         return -1;
1379 }
1380
1381 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
1382 {
1383         int i;
1384
1385         for (i = 0; i < num_nodes; i++) {
1386                 if (!warned[i]) {
1387                         warned[i] = nodeid;
1388                         return 0;
1389                 }
1390                 if (warned[i] == nodeid)
1391                         return 1;
1392         }
1393         return 0;
1394 }
1395
1396 void dlm_scan_waiters(struct dlm_ls *ls)
1397 {
1398         struct dlm_lkb *lkb;
1399         s64 us;
1400         s64 debug_maxus = 0;
1401         u32 debug_scanned = 0;
1402         u32 debug_expired = 0;
1403         int num_nodes = 0;
1404         int *warned = NULL;
1405
1406         if (!dlm_config.ci_waitwarn_us)
1407                 return;
1408
1409         mutex_lock(&ls->ls_waiters_mutex);
1410
1411         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1412                 if (!lkb->lkb_wait_time)
1413                         continue;
1414
1415                 debug_scanned++;
1416
1417                 us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
1418
1419                 if (us < dlm_config.ci_waitwarn_us)
1420                         continue;
1421
1422                 lkb->lkb_wait_time = 0;
1423
1424                 debug_expired++;
1425                 if (us > debug_maxus)
1426                         debug_maxus = us;
1427
1428                 if (!num_nodes) {
1429                         num_nodes = ls->ls_num_nodes;
1430                         warned = kcalloc(num_nodes, sizeof(int), GFP_KERNEL);
1431                 }
1432                 if (!warned)
1433                         continue;
1434                 if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
1435                         continue;
1436
1437                 log_error(ls, "waitwarn %x %lld %d us check connection to "
1438                           "node %d", lkb->lkb_id, (long long)us,
1439                           dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
1440         }
1441         mutex_unlock(&ls->ls_waiters_mutex);
1442         kfree(warned);
1443
1444         if (debug_expired)
1445                 log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
1446                           debug_scanned, debug_expired,
1447                           dlm_config.ci_waitwarn_us, (long long)debug_maxus);
1448 }
1449
1450 /* add/remove lkb from global waiters list of lkb's waiting for
1451    a reply from a remote node */
1452
1453 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1454 {
1455         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1456         int error = 0;
1457
1458         mutex_lock(&ls->ls_waiters_mutex);
1459
1460         if (is_overlap_unlock(lkb) ||
1461             (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1462                 error = -EINVAL;
1463                 goto out;
1464         }
1465
1466         if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1467                 switch (mstype) {
1468                 case DLM_MSG_UNLOCK:
1469                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1470                         break;
1471                 case DLM_MSG_CANCEL:
1472                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1473                         break;
1474                 default:
1475                         error = -EBUSY;
1476                         goto out;
1477                 }
1478                 lkb->lkb_wait_count++;
1479                 hold_lkb(lkb);
1480
1481                 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1482                           lkb->lkb_id, lkb->lkb_wait_type, mstype,
1483                           lkb->lkb_wait_count, lkb->lkb_flags);
1484                 goto out;
1485         }
1486
1487         DLM_ASSERT(!lkb->lkb_wait_count,
1488                    dlm_print_lkb(lkb);
1489                    printk("wait_count %d\n", lkb->lkb_wait_count););
1490
1491         lkb->lkb_wait_count++;
1492         lkb->lkb_wait_type = mstype;
1493         lkb->lkb_wait_time = ktime_get();
1494         lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1495         hold_lkb(lkb);
1496         list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1497  out:
1498         if (error)
1499                 log_error(ls, "addwait error %x %d flags %x %d %d %s",
1500                           lkb->lkb_id, error, lkb->lkb_flags, mstype,
1501                           lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1502         mutex_unlock(&ls->ls_waiters_mutex);
1503         return error;
1504 }
1505
1506 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1507    list as part of process_requestqueue (e.g. a lookup that has an optimized
1508    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1509    set RESEND and dlm_recover_waiters_post() */
1510
1511 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1512                                 struct dlm_message *ms)
1513 {
1514         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1515         int overlap_done = 0;
1516
1517         if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
1518                 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1519                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
1520                 overlap_done = 1;
1521                 goto out_del;
1522         }
1523
1524         if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
1525                 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1526                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1527                 overlap_done = 1;
1528                 goto out_del;
1529         }
1530
1531         /* Cancel state was preemptively cleared by a successful convert,
1532            see next comment, nothing to do. */
1533
1534         if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1535             (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1536                 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1537                           lkb->lkb_id, lkb->lkb_wait_type);
1538                 return -1;
1539         }
1540
1541         /* Remove for the convert reply, and premptively remove for the
1542            cancel reply.  A convert has been granted while there's still
1543            an outstanding cancel on it (the cancel is moot and the result
1544            in the cancel reply should be 0).  We preempt the cancel reply
1545            because the app gets the convert result and then can follow up
1546            with another op, like convert.  This subsequent op would see the
1547            lingering state of the cancel and fail with -EBUSY. */
1548
1549         if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1550             (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
1551             is_overlap_cancel(lkb) && ms && !ms->m_result) {
1552                 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1553                           lkb->lkb_id);
1554                 lkb->lkb_wait_type = 0;
1555                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1556                 lkb->lkb_wait_count--;
1557                 goto out_del;
1558         }
1559
1560         /* N.B. type of reply may not always correspond to type of original
1561            msg due to lookup->request optimization, verify others? */
1562
1563         if (lkb->lkb_wait_type) {
1564                 lkb->lkb_wait_type = 0;
1565                 goto out_del;
1566         }
1567
1568         log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1569                   lkb->lkb_id, ms ? ms->m_header.h_nodeid : 0, lkb->lkb_remid,
1570                   mstype, lkb->lkb_flags);
1571         return -1;
1572
1573  out_del:
1574         /* the force-unlock/cancel has completed and we haven't recvd a reply
1575            to the op that was in progress prior to the unlock/cancel; we
1576            give up on any reply to the earlier op.  FIXME: not sure when/how
1577            this would happen */
1578
1579         if (overlap_done && lkb->lkb_wait_type) {
1580                 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1581                           lkb->lkb_id, mstype, lkb->lkb_wait_type);
1582                 lkb->lkb_wait_count--;
1583                 lkb->lkb_wait_type = 0;
1584         }
1585
1586         DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1587
1588         lkb->lkb_flags &= ~DLM_IFL_RESEND;
1589         lkb->lkb_wait_count--;
1590         if (!lkb->lkb_wait_count)
1591                 list_del_init(&lkb->lkb_wait_reply);
1592         unhold_lkb(lkb);
1593         return 0;
1594 }
1595
1596 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1597 {
1598         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1599         int error;
1600
1601         mutex_lock(&ls->ls_waiters_mutex);
1602         error = _remove_from_waiters(lkb, mstype, NULL);
1603         mutex_unlock(&ls->ls_waiters_mutex);
1604         return error;
1605 }
1606
1607 /* Handles situations where we might be processing a "fake" or "stub" reply in
1608    which we can't try to take waiters_mutex again. */
1609
1610 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1611 {
1612         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1613         int error;
1614
1615         if (ms->m_flags != DLM_IFL_STUB_MS)
1616                 mutex_lock(&ls->ls_waiters_mutex);
1617         error = _remove_from_waiters(lkb, ms->m_type, ms);
1618         if (ms->m_flags != DLM_IFL_STUB_MS)
1619                 mutex_unlock(&ls->ls_waiters_mutex);
1620         return error;
1621 }
1622
1623 /* If there's an rsb for the same resource being removed, ensure
1624    that the remove message is sent before the new lookup message.
1625    It should be rare to need a delay here, but if not, then it may
1626    be worthwhile to add a proper wait mechanism rather than a delay. */
1627
1628 static void wait_pending_remove(struct dlm_rsb *r)
1629 {
1630         struct dlm_ls *ls = r->res_ls;
1631  restart:
1632         spin_lock(&ls->ls_remove_spin);
1633         if (ls->ls_remove_len &&
1634             !rsb_cmp(r, ls->ls_remove_name, ls->ls_remove_len)) {
1635                 log_debug(ls, "delay lookup for remove dir %d %s",
1636                           r->res_dir_nodeid, r->res_name);
1637                 spin_unlock(&ls->ls_remove_spin);
1638                 msleep(1);
1639                 goto restart;
1640         }
1641         spin_unlock(&ls->ls_remove_spin);
1642 }
1643
1644 /*
1645  * ls_remove_spin protects ls_remove_name and ls_remove_len which are
1646  * read by other threads in wait_pending_remove.  ls_remove_names
1647  * and ls_remove_lens are only used by the scan thread, so they do
1648  * not need protection.
1649  */
1650
1651 static void shrink_bucket(struct dlm_ls *ls, int b)
1652 {
1653         struct rb_node *n, *next;
1654         struct dlm_rsb *r;
1655         char *name;
1656         int our_nodeid = dlm_our_nodeid();
1657         int remote_count = 0;
1658         int need_shrink = 0;
1659         int i, len, rv;
1660
1661         memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1662
1663         spin_lock(&ls->ls_rsbtbl[b].lock);
1664
1665         if (!(ls->ls_rsbtbl[b].flags & DLM_RTF_SHRINK)) {
1666                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1667                 return;
1668         }
1669
1670         for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1671                 next = rb_next(n);
1672                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
1673
1674                 /* If we're the directory record for this rsb, and
1675                    we're not the master of it, then we need to wait
1676                    for the master node to send us a dir remove for
1677                    before removing the dir record. */
1678
1679                 if (!dlm_no_directory(ls) &&
1680                     (r->res_master_nodeid != our_nodeid) &&
1681                     (dlm_dir_nodeid(r) == our_nodeid)) {
1682                         continue;
1683                 }
1684
1685                 need_shrink = 1;
1686
1687                 if (!time_after_eq(jiffies, r->res_toss_time +
1688                                    dlm_config.ci_toss_secs * HZ)) {
1689                         continue;
1690                 }
1691
1692                 if (!dlm_no_directory(ls) &&
1693                     (r->res_master_nodeid == our_nodeid) &&
1694                     (dlm_dir_nodeid(r) != our_nodeid)) {
1695
1696                         /* We're the master of this rsb but we're not
1697                            the directory record, so we need to tell the
1698                            dir node to remove the dir record. */
1699
1700                         ls->ls_remove_lens[remote_count] = r->res_length;
1701                         memcpy(ls->ls_remove_names[remote_count], r->res_name,
1702                                DLM_RESNAME_MAXLEN);
1703                         remote_count++;
1704
1705                         if (remote_count >= DLM_REMOVE_NAMES_MAX)
1706                                 break;
1707                         continue;
1708                 }
1709
1710                 if (!kref_put(&r->res_ref, kill_rsb)) {
1711                         log_error(ls, "tossed rsb in use %s", r->res_name);
1712                         continue;
1713                 }
1714
1715                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1716                 dlm_free_rsb(r);
1717         }
1718
1719         if (need_shrink)
1720                 ls->ls_rsbtbl[b].flags |= DLM_RTF_SHRINK;
1721         else
1722                 ls->ls_rsbtbl[b].flags &= ~DLM_RTF_SHRINK;
1723         spin_unlock(&ls->ls_rsbtbl[b].lock);
1724
1725         /*
1726          * While searching for rsb's to free, we found some that require
1727          * remote removal.  We leave them in place and find them again here
1728          * so there is a very small gap between removing them from the toss
1729          * list and sending the removal.  Keeping this gap small is
1730          * important to keep us (the master node) from being out of sync
1731          * with the remote dir node for very long.
1732          *
1733          * From the time the rsb is removed from toss until just after
1734          * send_remove, the rsb name is saved in ls_remove_name.  A new
1735          * lookup checks this to ensure that a new lookup message for the
1736          * same resource name is not sent just before the remove message.
1737          */
1738
1739         for (i = 0; i < remote_count; i++) {
1740                 name = ls->ls_remove_names[i];
1741                 len = ls->ls_remove_lens[i];
1742
1743                 spin_lock(&ls->ls_rsbtbl[b].lock);
1744                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1745                 if (rv) {
1746                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1747                         log_debug(ls, "remove_name not toss %s", name);
1748                         continue;
1749                 }
1750
1751                 if (r->res_master_nodeid != our_nodeid) {
1752                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1753                         log_debug(ls, "remove_name master %d dir %d our %d %s",
1754                                   r->res_master_nodeid, r->res_dir_nodeid,
1755                                   our_nodeid, name);
1756                         continue;
1757                 }
1758
1759                 if (r->res_dir_nodeid == our_nodeid) {
1760                         /* should never happen */
1761                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1762                         log_error(ls, "remove_name dir %d master %d our %d %s",
1763                                   r->res_dir_nodeid, r->res_master_nodeid,
1764                                   our_nodeid, name);
1765                         continue;
1766                 }
1767
1768                 if (!time_after_eq(jiffies, r->res_toss_time +
1769                                    dlm_config.ci_toss_secs * HZ)) {
1770                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1771                         log_debug(ls, "remove_name toss_time %lu now %lu %s",
1772                                   r->res_toss_time, jiffies, name);
1773                         continue;
1774                 }
1775
1776                 if (!kref_put(&r->res_ref, kill_rsb)) {
1777                         spin_unlock(&ls->ls_rsbtbl[b].lock);
1778                         log_error(ls, "remove_name in use %s", name);
1779                         continue;
1780                 }
1781
1782                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1783
1784                 /* block lookup of same name until we've sent remove */
1785                 spin_lock(&ls->ls_remove_spin);
1786                 ls->ls_remove_len = len;
1787                 memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
1788                 spin_unlock(&ls->ls_remove_spin);
1789                 spin_unlock(&ls->ls_rsbtbl[b].lock);
1790
1791                 send_remove(r);
1792
1793                 /* allow lookup of name again */
1794                 spin_lock(&ls->ls_remove_spin);
1795                 ls->ls_remove_len = 0;
1796                 memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
1797                 spin_unlock(&ls->ls_remove_spin);
1798
1799                 dlm_free_rsb(r);
1800         }
1801 }
1802
1803 void dlm_scan_rsbs(struct dlm_ls *ls)
1804 {
1805         int i;
1806
1807         for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1808                 shrink_bucket(ls, i);
1809                 if (dlm_locking_stopped(ls))
1810                         break;
1811                 cond_resched();
1812         }
1813 }
1814
1815 static void add_timeout(struct dlm_lkb *lkb)
1816 {
1817         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1818
1819         if (is_master_copy(lkb))
1820                 return;
1821
1822         if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1823             !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1824                 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1825                 goto add_it;
1826         }
1827         if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1828                 goto add_it;
1829         return;
1830
1831  add_it:
1832         DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1833         mutex_lock(&ls->ls_timeout_mutex);
1834         hold_lkb(lkb);
1835         list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1836         mutex_unlock(&ls->ls_timeout_mutex);
1837 }
1838
1839 static void del_timeout(struct dlm_lkb *lkb)
1840 {
1841         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1842
1843         mutex_lock(&ls->ls_timeout_mutex);
1844         if (!list_empty(&lkb->lkb_time_list)) {
1845                 list_del_init(&lkb->lkb_time_list);
1846                 unhold_lkb(lkb);
1847         }
1848         mutex_unlock(&ls->ls_timeout_mutex);
1849 }
1850
1851 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1852    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1853    and then lock rsb because of lock ordering in add_timeout.  We may need
1854    to specify some special timeout-related bits in the lkb that are just to
1855    be accessed under the timeout_mutex. */
1856
1857 void dlm_scan_timeout(struct dlm_ls *ls)
1858 {
1859         struct dlm_rsb *r;
1860         struct dlm_lkb *lkb;
1861         int do_cancel, do_warn;
1862         s64 wait_us;
1863
1864         for (;;) {
1865                 if (dlm_locking_stopped(ls))
1866                         break;
1867
1868                 do_cancel = 0;
1869                 do_warn = 0;
1870                 mutex_lock(&ls->ls_timeout_mutex);
1871                 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1872
1873                         wait_us = ktime_to_us(ktime_sub(ktime_get(),
1874                                                         lkb->lkb_timestamp));
1875
1876                         if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1877                             wait_us >= (lkb->lkb_timeout_cs * 10000))
1878                                 do_cancel = 1;
1879
1880                         if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1881                             wait_us >= dlm_config.ci_timewarn_cs * 10000)
1882                                 do_warn = 1;
1883
1884                         if (!do_cancel && !do_warn)
1885                                 continue;
1886                         hold_lkb(lkb);
1887                         break;
1888                 }
1889                 mutex_unlock(&ls->ls_timeout_mutex);
1890
1891                 if (!do_cancel && !do_warn)
1892                         break;
1893
1894                 r = lkb->lkb_resource;
1895                 hold_rsb(r);
1896                 lock_rsb(r);
1897
1898                 if (do_warn) {
1899                         /* clear flag so we only warn once */
1900                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1901                         if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1902                                 del_timeout(lkb);
1903                         dlm_timeout_warn(lkb);
1904                 }
1905
1906                 if (do_cancel) {
1907                         log_debug(ls, "timeout cancel %x node %d %s",
1908                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1909                         lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1910                         lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1911                         del_timeout(lkb);
1912                         _cancel_lock(r, lkb);
1913                 }
1914
1915                 unlock_rsb(r);
1916                 unhold_rsb(r);
1917                 dlm_put_lkb(lkb);
1918         }
1919 }
1920
1921 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1922    dlm_recoverd before checking/setting ls_recover_begin. */
1923
1924 void dlm_adjust_timeouts(struct dlm_ls *ls)
1925 {
1926         struct dlm_lkb *lkb;
1927         u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1928
1929         ls->ls_recover_begin = 0;
1930         mutex_lock(&ls->ls_timeout_mutex);
1931         list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1932                 lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1933         mutex_unlock(&ls->ls_timeout_mutex);
1934
1935         if (!dlm_config.ci_waitwarn_us)
1936                 return;
1937
1938         mutex_lock(&ls->ls_waiters_mutex);
1939         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1940                 if (ktime_to_us(lkb->lkb_wait_time))
1941                         lkb->lkb_wait_time = ktime_get();
1942         }
1943         mutex_unlock(&ls->ls_waiters_mutex);
1944 }
1945
1946 /* lkb is master or local copy */
1947
1948 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1949 {
1950         int b, len = r->res_ls->ls_lvblen;
1951
1952         /* b=1 lvb returned to caller
1953            b=0 lvb written to rsb or invalidated
1954            b=-1 do nothing */
1955
1956         b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1957
1958         if (b == 1) {
1959                 if (!lkb->lkb_lvbptr)
1960                         return;
1961
1962                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963                         return;
1964
1965                 if (!r->res_lvbptr)
1966                         return;
1967
1968                 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1969                 lkb->lkb_lvbseq = r->res_lvbseq;
1970
1971         } else if (b == 0) {
1972                 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1973                         rsb_set_flag(r, RSB_VALNOTVALID);
1974                         return;
1975                 }
1976
1977                 if (!lkb->lkb_lvbptr)
1978                         return;
1979
1980                 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1981                         return;
1982
1983                 if (!r->res_lvbptr)
1984                         r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1985
1986                 if (!r->res_lvbptr)
1987                         return;
1988
1989                 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1990                 r->res_lvbseq++;
1991                 lkb->lkb_lvbseq = r->res_lvbseq;
1992                 rsb_clear_flag(r, RSB_VALNOTVALID);
1993         }
1994
1995         if (rsb_flag(r, RSB_VALNOTVALID))
1996                 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1997 }
1998
1999 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2000 {
2001         if (lkb->lkb_grmode < DLM_LOCK_PW)
2002                 return;
2003
2004         if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
2005                 rsb_set_flag(r, RSB_VALNOTVALID);
2006                 return;
2007         }
2008
2009         if (!lkb->lkb_lvbptr)
2010                 return;
2011
2012         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2013                 return;
2014
2015         if (!r->res_lvbptr)
2016                 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
2017
2018         if (!r->res_lvbptr)
2019                 return;
2020
2021         memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2022         r->res_lvbseq++;
2023         rsb_clear_flag(r, RSB_VALNOTVALID);
2024 }
2025
2026 /* lkb is process copy (pc) */
2027
2028 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2029                             struct dlm_message *ms)
2030 {
2031         int b;
2032
2033         if (!lkb->lkb_lvbptr)
2034                 return;
2035
2036         if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
2037                 return;
2038
2039         b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
2040         if (b == 1) {
2041                 int len = receive_extralen(ms);
2042                 if (len > r->res_ls->ls_lvblen)
2043                         len = r->res_ls->ls_lvblen;
2044                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2045                 lkb->lkb_lvbseq = ms->m_lvbseq;
2046         }
2047 }
2048
2049 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2050    remove_lock -- used for unlock, removes lkb from granted
2051    revert_lock -- used for cancel, moves lkb from convert to granted
2052    grant_lock  -- used for request and convert, adds lkb to granted or
2053                   moves lkb from convert or waiting to granted
2054
2055    Each of these is used for master or local copy lkb's.  There is
2056    also a _pc() variation used to make the corresponding change on
2057    a process copy (pc) lkb. */
2058
2059 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2060 {
2061         del_lkb(r, lkb);
2062         lkb->lkb_grmode = DLM_LOCK_IV;
2063         /* this unhold undoes the original ref from create_lkb()
2064            so this leads to the lkb being freed */
2065         unhold_lkb(lkb);
2066 }
2067
2068 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2069 {
2070         set_lvb_unlock(r, lkb);
2071         _remove_lock(r, lkb);
2072 }
2073
2074 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075 {
2076         _remove_lock(r, lkb);
2077 }
2078
2079 /* returns: 0 did nothing
2080             1 moved lock to granted
2081            -1 removed lock */
2082
2083 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2084 {
2085         int rv = 0;
2086
2087         lkb->lkb_rqmode = DLM_LOCK_IV;
2088
2089         switch (lkb->lkb_status) {
2090         case DLM_LKSTS_GRANTED:
2091                 break;
2092         case DLM_LKSTS_CONVERT:
2093                 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2094                 rv = 1;
2095                 break;
2096         case DLM_LKSTS_WAITING:
2097                 del_lkb(r, lkb);
2098                 lkb->lkb_grmode = DLM_LOCK_IV;
2099                 /* this unhold undoes the original ref from create_lkb()
2100                    so this leads to the lkb being freed */
2101                 unhold_lkb(lkb);
2102                 rv = -1;
2103                 break;
2104         default:
2105                 log_print("invalid status for revert %d", lkb->lkb_status);
2106         }
2107         return rv;
2108 }
2109
2110 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2111 {
2112         return revert_lock(r, lkb);
2113 }
2114
2115 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2116 {
2117         if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2118                 lkb->lkb_grmode = lkb->lkb_rqmode;
2119                 if (lkb->lkb_status)
2120                         move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2121                 else
2122                         add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2123         }
2124
2125         lkb->lkb_rqmode = DLM_LOCK_IV;
2126         lkb->lkb_highbast = 0;
2127 }
2128
2129 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2130 {
2131         set_lvb_lock(r, lkb);
2132         _grant_lock(r, lkb);
2133 }
2134
2135 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2136                           struct dlm_message *ms)
2137 {
2138         set_lvb_lock_pc(r, lkb, ms);
2139         _grant_lock(r, lkb);
2140 }
2141
2142 /* called by grant_pending_locks() which means an async grant message must
2143    be sent to the requesting node in addition to granting the lock if the
2144    lkb belongs to a remote node. */
2145
2146 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2147 {
2148         grant_lock(r, lkb);
2149         if (is_master_copy(lkb))
2150                 send_grant(r, lkb);
2151         else
2152                 queue_cast(r, lkb, 0);
2153 }
2154
2155 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2156    change the granted/requested modes.  We're munging things accordingly in
2157    the process copy.
2158    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2159    conversion deadlock
2160    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2161    compatible with other granted locks */
2162
2163 static void munge_demoted(struct dlm_lkb *lkb)
2164 {
2165         if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2166                 log_print("munge_demoted %x invalid modes gr %d rq %d",
2167                           lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2168                 return;
2169         }
2170
2171         lkb->lkb_grmode = DLM_LOCK_NL;
2172 }
2173
2174 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
2175 {
2176         if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
2177             ms->m_type != DLM_MSG_GRANT) {
2178                 log_print("munge_altmode %x invalid reply type %d",
2179                           lkb->lkb_id, ms->m_type);
2180                 return;
2181         }
2182
2183         if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2184                 lkb->lkb_rqmode = DLM_LOCK_PR;
2185         else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2186                 lkb->lkb_rqmode = DLM_LOCK_CW;
2187         else {
2188                 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2189                 dlm_print_lkb(lkb);
2190         }
2191 }
2192
2193 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2194 {
2195         struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2196                                            lkb_statequeue);
2197         if (lkb->lkb_id == first->lkb_id)
2198                 return 1;
2199
2200         return 0;
2201 }
2202
2203 /* Check if the given lkb conflicts with another lkb on the queue. */
2204
2205 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2206 {
2207         struct dlm_lkb *this;
2208
2209         list_for_each_entry(this, head, lkb_statequeue) {
2210                 if (this == lkb)
2211                         continue;
2212                 if (!modes_compat(this, lkb))
2213                         return 1;
2214         }
2215         return 0;
2216 }
2217
2218 /*
2219  * "A conversion deadlock arises with a pair of lock requests in the converting
2220  * queue for one resource.  The granted mode of each lock blocks the requested
2221  * mode of the other lock."
2222  *
2223  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2224  * convert queue from being granted, then deadlk/demote lkb.
2225  *
2226  * Example:
2227  * Granted Queue: empty
2228  * Convert Queue: NL->EX (first lock)
2229  *                PR->EX (second lock)
2230  *
2231  * The first lock can't be granted because of the granted mode of the second
2232  * lock and the second lock can't be granted because it's not first in the
2233  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2234  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2235  * flag set and return DEMOTED in the lksb flags.
2236  *
2237  * Originally, this function detected conv-deadlk in a more limited scope:
2238  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2239  * - if lkb1 was the first entry in the queue (not just earlier), and was
2240  *   blocked by the granted mode of lkb2, and there was nothing on the
2241  *   granted queue preventing lkb1 from being granted immediately, i.e.
2242  *   lkb2 was the only thing preventing lkb1 from being granted.
2243  *
2244  * That second condition meant we'd only say there was conv-deadlk if
2245  * resolving it (by demotion) would lead to the first lock on the convert
2246  * queue being granted right away.  It allowed conversion deadlocks to exist
2247  * between locks on the convert queue while they couldn't be granted anyway.
2248  *
2249  * Now, we detect and take action on conversion deadlocks immediately when
2250  * they're created, even if they may not be immediately consequential.  If
2251  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2252  * mode that would prevent lkb1's conversion from being granted, we do a
2253  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2254  * I think this means that the lkb_is_ahead condition below should always
2255  * be zero, i.e. there will never be conv-deadlk between two locks that are
2256  * both already on the convert queue.
2257  */
2258
2259 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2260 {
2261         struct dlm_lkb *lkb1;
2262         int lkb_is_ahead = 0;
2263
2264         list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2265                 if (lkb1 == lkb2) {
2266                         lkb_is_ahead = 1;
2267                         continue;
2268                 }
2269
2270                 if (!lkb_is_ahead) {
2271                         if (!modes_compat(lkb2, lkb1))
2272                                 return 1;
2273                 } else {
2274                         if (!modes_compat(lkb2, lkb1) &&
2275                             !modes_compat(lkb1, lkb2))
2276                                 return 1;
2277                 }
2278         }
2279         return 0;
2280 }
2281
2282 /*
2283  * Return 1 if the lock can be granted, 0 otherwise.
2284  * Also detect and resolve conversion deadlocks.
2285  *
2286  * lkb is the lock to be granted
2287  *
2288  * now is 1 if the function is being called in the context of the
2289  * immediate request, it is 0 if called later, after the lock has been
2290  * queued.
2291  *
2292  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2293  * after recovery.
2294  *
2295  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2296  */
2297
2298 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2299                            int recover)
2300 {
2301         int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2302
2303         /*
2304          * 6-10: Version 5.4 introduced an option to address the phenomenon of
2305          * a new request for a NL mode lock being blocked.
2306          *
2307          * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2308          * request, then it would be granted.  In essence, the use of this flag
2309          * tells the Lock Manager to expedite theis request by not considering
2310          * what may be in the CONVERTING or WAITING queues...  As of this
2311          * writing, the EXPEDITE flag can be used only with new requests for NL
2312          * mode locks.  This flag is not valid for conversion requests.
2313          *
2314          * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2315          * conversion or used with a non-NL requested mode.  We also know an
2316          * EXPEDITE request is always granted immediately, so now must always
2317          * be 1.  The full condition to grant an expedite request: (now &&
2318          * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2319          * therefore be shortened to just checking the flag.
2320          */
2321
2322         if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2323                 return 1;
2324
2325         /*
2326          * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2327          * added to the remaining conditions.
2328          */
2329
2330         if (queue_conflict(&r->res_grantqueue, lkb))
2331                 return 0;
2332
2333         /*
2334          * 6-3: By default, a conversion request is immediately granted if the
2335          * requested mode is compatible with the modes of all other granted
2336          * locks
2337          */
2338
2339         if (queue_conflict(&r->res_convertqueue, lkb))
2340                 return 0;
2341
2342         /*
2343          * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2344          * locks for a recovered rsb, on which lkb's have been rebuilt.
2345          * The lkb's may have been rebuilt on the queues in a different
2346          * order than they were in on the previous master.  So, granting
2347          * queued conversions in order after recovery doesn't make sense
2348          * since the order hasn't been preserved anyway.  The new order
2349          * could also have created a new "in place" conversion deadlock.
2350          * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2351          * After recovery, there would be no granted locks, and possibly
2352          * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2353          * recovery, grant conversions without considering order.
2354          */
2355
2356         if (conv && recover)
2357                 return 1;
2358
2359         /*
2360          * 6-5: But the default algorithm for deciding whether to grant or
2361          * queue conversion requests does not by itself guarantee that such
2362          * requests are serviced on a "first come first serve" basis.  This, in
2363          * turn, can lead to a phenomenon known as "indefinate postponement".
2364          *
2365          * 6-7: This issue is dealt with by using the optional QUECVT flag with
2366          * the system service employed to request a lock conversion.  This flag
2367          * forces certain conversion requests to be queued, even if they are
2368          * compatible with the granted modes of other locks on the same
2369          * resource.  Thus, the use of this flag results in conversion requests
2370          * being ordered on a "first come first servce" basis.
2371          *
2372          * DCT: This condition is all about new conversions being able to occur
2373          * "in place" while the lock remains on the granted queue (assuming
2374          * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2375          * doesn't _have_ to go onto the convert queue where it's processed in
2376          * order.  The "now" variable is necessary to distinguish converts
2377          * being received and processed for the first time now, because once a
2378          * convert is moved to the conversion queue the condition below applies
2379          * requiring fifo granting.
2380          */
2381
2382         if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2383                 return 1;
2384
2385         /*
2386          * Even if the convert is compat with all granted locks,
2387          * QUECVT forces it behind other locks on the convert queue.
2388          */
2389
2390         if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2391                 if (list_empty(&r->res_convertqueue))
2392                         return 1;
2393                 else
2394                         return 0;
2395         }
2396
2397         /*
2398          * The NOORDER flag is set to avoid the standard vms rules on grant
2399          * order.
2400          */
2401
2402         if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2403                 return 1;
2404
2405         /*
2406          * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2407          * granted until all other conversion requests ahead of it are granted
2408          * and/or canceled.
2409          */
2410
2411         if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2412                 return 1;
2413
2414         /*
2415          * 6-4: By default, a new request is immediately granted only if all
2416          * three of the following conditions are satisfied when the request is
2417          * issued:
2418          * - The queue of ungranted conversion requests for the resource is
2419          *   empty.
2420          * - The queue of ungranted new requests for the resource is empty.
2421          * - The mode of the new request is compatible with the most
2422          *   restrictive mode of all granted locks on the resource.
2423          */
2424
2425         if (now && !conv && list_empty(&r->res_convertqueue) &&
2426             list_empty(&r->res_waitqueue))
2427                 return 1;
2428
2429         /*
2430          * 6-4: Once a lock request is in the queue of ungranted new requests,
2431          * it cannot be granted until the queue of ungranted conversion
2432          * requests is empty, all ungranted new requests ahead of it are
2433          * granted and/or canceled, and it is compatible with the granted mode
2434          * of the most restrictive lock granted on the resource.
2435          */
2436
2437         if (!now && !conv && list_empty(&r->res_convertqueue) &&
2438             first_in_list(lkb, &r->res_waitqueue))
2439                 return 1;
2440
2441         return 0;
2442 }
2443
2444 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2445                           int recover, int *err)
2446 {
2447         int rv;
2448         int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2449         int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2450
2451         if (err)
2452                 *err = 0;
2453
2454         rv = _can_be_granted(r, lkb, now, recover);
2455         if (rv)
2456                 goto out;
2457
2458         /*
2459          * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2460          * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2461          * cancels one of the locks.
2462          */
2463
2464         if (is_convert && can_be_queued(lkb) &&
2465             conversion_deadlock_detect(r, lkb)) {
2466                 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2467                         lkb->lkb_grmode = DLM_LOCK_NL;
2468                         lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
2469                 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2470                         if (err)
2471                                 *err = -EDEADLK;
2472                         else {
2473                                 log_print("can_be_granted deadlock %x now %d",
2474                                           lkb->lkb_id, now);
2475                                 dlm_dump_rsb(r);
2476                         }
2477                 }
2478                 goto out;
2479         }
2480
2481         /*
2482          * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2483          * to grant a request in a mode other than the normal rqmode.  It's a
2484          * simple way to provide a big optimization to applications that can
2485          * use them.
2486          */
2487
2488         if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2489                 alt = DLM_LOCK_PR;
2490         else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2491                 alt = DLM_LOCK_CW;
2492
2493         if (alt) {
2494                 lkb->lkb_rqmode = alt;
2495                 rv = _can_be_granted(r, lkb, now, 0);
2496                 if (rv)
2497                         lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
2498                 else
2499                         lkb->lkb_rqmode = rqmode;
2500         }
2501  out:
2502         return rv;
2503 }
2504
2505 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
2506    for locks pending on the convert list.  Once verified (watch for these
2507    log_prints), we should be able to just call _can_be_granted() and not
2508    bother with the demote/deadlk cases here (and there's no easy way to deal
2509    with a deadlk here, we'd have to generate something like grant_lock with
2510    the deadlk error.) */
2511
2512 /* Returns the highest requested mode of all blocked conversions; sets
2513    cw if there's a blocked conversion to DLM_LOCK_CW. */
2514
2515 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2516                                  unsigned int *count)
2517 {
2518         struct dlm_lkb *lkb, *s;
2519         int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2520         int hi, demoted, quit, grant_restart, demote_restart;
2521         int deadlk;
2522
2523         quit = 0;
2524  restart:
2525         grant_restart = 0;
2526         demote_restart = 0;
2527         hi = DLM_LOCK_IV;
2528
2529         list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2530                 demoted = is_demoted(lkb);
2531                 deadlk = 0;
2532
2533                 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2534                         grant_lock_pending(r, lkb);
2535                         grant_restart = 1;
2536                         if (count)
2537                                 (*count)++;
2538                         continue;
2539                 }
2540
2541                 if (!demoted && is_demoted(lkb)) {
2542                         log_print("WARN: pending demoted %x node %d %s",
2543                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2544                         demote_restart = 1;
2545                         continue;
2546                 }
2547
2548                 if (deadlk) {
2549                         log_print("WARN: pending deadlock %x node %d %s",
2550                                   lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2551                         dlm_dump_rsb(r);
2552                         continue;
2553                 }
2554
2555                 hi = max_t(int, lkb->lkb_rqmode, hi);
2556
2557                 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2558                         *cw = 1;
2559         }
2560
2561         if (grant_restart)
2562                 goto restart;
2563         if (demote_restart && !quit) {
2564                 quit = 1;
2565                 goto restart;
2566         }
2567
2568         return max_t(int, high, hi);
2569 }
2570
2571 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2572                               unsigned int *count)
2573 {
2574         struct dlm_lkb *lkb, *s;
2575
2576         list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2577                 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2578                         grant_lock_pending(r, lkb);
2579                         if (count)
2580                                 (*count)++;
2581                 } else {
2582                         high = max_t(int, lkb->lkb_rqmode, high);
2583                         if (lkb->lkb_rqmode == DLM_LOCK_CW)
2584                                 *cw = 1;
2585                 }
2586         }
2587
2588         return high;
2589 }
2590
2591 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2592    on either the convert or waiting queue.
2593    high is the largest rqmode of all locks blocked on the convert or
2594    waiting queue. */
2595
2596 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2597 {
2598         if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2599                 if (gr->lkb_highbast < DLM_LOCK_EX)
2600                         return 1;
2601                 return 0;
2602         }
2603
2604         if (gr->lkb_highbast < high &&
2605             !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2606                 return 1;
2607         return 0;
2608 }
2609
2610 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2611 {
2612         struct dlm_lkb *lkb, *s;
2613         int high = DLM_LOCK_IV;
2614         int cw = 0;
2615
2616         if (!is_master(r)) {
2617                 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2618                 dlm_dump_rsb(r);
2619                 return;
2620         }
2621
2622         high = grant_pending_convert(r, high, &cw, count);
2623         high = grant_pending_wait(r, high, &cw, count);
2624
2625         if (high == DLM_LOCK_IV)
2626                 return;
2627
2628         /*
2629          * If there are locks left on the wait/convert queue then send blocking
2630          * ASTs to granted locks based on the largest requested mode (high)
2631          * found above.
2632          */
2633
2634         list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2635                 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2636                         if (cw && high == DLM_LOCK_PR &&
2637                             lkb->lkb_grmode == DLM_LOCK_PR)
2638                                 queue_bast(r, lkb, DLM_LOCK_CW);
2639                         else
2640                                 queue_bast(r, lkb, high);
2641                         lkb->lkb_highbast = high;
2642                 }
2643         }
2644 }
2645
2646 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2647 {
2648         if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2649             (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2650                 if (gr->lkb_highbast < DLM_LOCK_EX)
2651                         return 1;
2652                 return 0;
2653         }
2654
2655         if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2656                 return 1;
2657         return 0;
2658 }
2659
2660 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2661                             struct dlm_lkb *lkb)
2662 {
2663         struct dlm_lkb *gr;
2664
2665         list_for_each_entry(gr, head, lkb_statequeue) {
2666                 /* skip self when sending basts to convertqueue */
2667                 if (gr == lkb)
2668                         continue;
2669                 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2670                         queue_bast(r, gr, lkb->lkb_rqmode);
2671                         gr->lkb_highbast = lkb->lkb_rqmode;
2672                 }
2673         }
2674 }
2675
2676 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2677 {
2678         send_bast_queue(r, &r->res_grantqueue, lkb);
2679 }
2680
2681 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2682 {
2683         send_bast_queue(r, &r->res_grantqueue, lkb);
2684         send_bast_queue(r, &r->res_convertqueue, lkb);
2685 }
2686
2687 /* set_master(r, lkb) -- set the master nodeid of a resource
2688
2689    The purpose of this function is to set the nodeid field in the given
2690    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2691    known, it can just be copied to the lkb and the function will return
2692    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2693    before it can be copied to the lkb.
2694
2695    When the rsb nodeid is being looked up remotely, the initial lkb
2696    causing the lookup is kept on the ls_waiters list waiting for the
2697    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2698    on the rsb's res_lookup list until the master is verified.
2699
2700    Return values:
2701    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2702    1: the rsb master is not available and the lkb has been placed on
2703       a wait queue
2704 */
2705
2706 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2707 {
2708         int our_nodeid = dlm_our_nodeid();
2709
2710         if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2711                 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2712                 r->res_first_lkid = lkb->lkb_id;
2713                 lkb->lkb_nodeid = r->res_nodeid;
2714                 return 0;
2715         }
2716
2717         if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2718                 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2719                 return 1;
2720         }
2721
2722         if (r->res_master_nodeid == our_nodeid) {
2723                 lkb->lkb_nodeid = 0;
2724                 return 0;
2725         }
2726
2727         if (r->res_master_nodeid) {
2728                 lkb->lkb_nodeid = r->res_master_nodeid;
2729                 return 0;
2730         }
2731
2732         if (dlm_dir_nodeid(r) == our_nodeid) {
2733                 /* This is a somewhat unusual case; find_rsb will usually
2734                    have set res_master_nodeid when dir nodeid is local, but
2735                    there are cases where we become the dir node after we've
2736                    past find_rsb and go through _request_lock again.
2737                    confirm_master() or process_lookup_list() needs to be
2738                    called after this. */
2739                 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2740                           lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2741                           r->res_name);
2742                 r->res_master_nodeid = our_nodeid;
2743                 r->res_nodeid = 0;
2744                 lkb->lkb_nodeid = 0;
2745                 return 0;
2746         }
2747
2748         wait_pending_remove(r);
2749
2750         r->res_first_lkid = lkb->lkb_id;
2751         send_lookup(r, lkb);
2752         return 1;
2753 }
2754
2755 static void process_lookup_list(struct dlm_rsb *r)
2756 {
2757         struct dlm_lkb *lkb, *safe;
2758
2759         list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2760                 list_del_init(&lkb->lkb_rsb_lookup);
2761                 _request_lock(r, lkb);
2762                 schedule();
2763         }
2764 }
2765
2766 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2767
2768 static void confirm_master(struct dlm_rsb *r, int error)
2769 {
2770         struct dlm_lkb *lkb;
2771
2772         if (!r->res_first_lkid)
2773                 return;
2774
2775         switch (error) {
2776         case 0:
2777         case -EINPROGRESS:
2778                 r->res_first_lkid = 0;
2779                 process_lookup_list(r);
2780                 break;
2781
2782         case -EAGAIN:
2783         case -EBADR:
2784         case -ENOTBLK:
2785                 /* the remote request failed and won't be retried (it was
2786                    a NOQUEUE, or has been canceled/unlocked); make a waiting
2787                    lkb the first_lkid */
2788
2789                 r->res_first_lkid = 0;
2790
2791                 if (!list_empty(&r->res_lookup)) {
2792                         lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2793                                          lkb_rsb_lookup);
2794                         list_del_init(&lkb->lkb_rsb_lookup);
2795                         r->res_first_lkid = lkb->lkb_id;
2796                         _request_lock(r, lkb);
2797                 }
2798                 break;
2799
2800         default:
2801                 log_error(r->res_ls, "confirm_master unknown error %d", error);
2802         }
2803 }
2804
2805 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2806                          int namelen, unsigned long timeout_cs,
2807                          void (*ast) (void *astparam),
2808                          void *astparam,
2809                          void (*bast) (void *astparam, int mode),
2810                          struct dlm_args *args)
2811 {
2812         int rv = -EINVAL;
2813
2814         /* check for invalid arg usage */
2815
2816         if (mode < 0 || mode > DLM_LOCK_EX)
2817                 goto out;
2818
2819         if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2820                 goto out;
2821
2822         if (flags & DLM_LKF_CANCEL)
2823                 goto out;
2824
2825         if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2826                 goto out;
2827
2828         if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2829                 goto out;
2830
2831         if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2832                 goto out;
2833
2834         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2835                 goto out;
2836
2837         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2838                 goto out;
2839
2840         if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2841                 goto out;
2842
2843         if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2844                 goto out;
2845
2846         if (!ast || !lksb)
2847                 goto out;
2848
2849         if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2850                 goto out;
2851
2852         if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2853                 goto out;
2854
2855         /* these args will be copied to the lkb in validate_lock_args,
2856            it cannot be done now because when converting locks, fields in
2857            an active lkb cannot be modified before locking the rsb */
2858
2859         args->flags = flags;
2860         args->astfn = ast;
2861         args->astparam = astparam;
2862         args->bastfn = bast;
2863         args->timeout = timeout_cs;
2864         args->mode = mode;
2865         args->lksb = lksb;
2866         rv = 0;
2867  out:
2868         return rv;
2869 }
2870
2871 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2872 {
2873         if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2874                       DLM_LKF_FORCEUNLOCK))
2875                 return -EINVAL;
2876
2877         if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2878                 return -EINVAL;
2879
2880         args->flags = flags;
2881         args->astparam = astarg;
2882         return 0;
2883 }
2884
2885 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2886                               struct dlm_args *args)
2887 {
2888         int rv = -EINVAL;
2889
2890         if (args->flags & DLM_LKF_CONVERT) {
2891                 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2892                         goto out;
2893
2894                 if (args->flags & DLM_LKF_QUECVT &&
2895                     !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2896                         goto out;
2897
2898                 rv = -EBUSY;
2899                 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2900                         goto out;
2901
2902                 if (lkb->lkb_wait_type)
2903                         goto out;
2904
2905                 if (is_overlap(lkb))
2906                         goto out;
2907         }
2908
2909         lkb->lkb_exflags = args->flags;
2910         lkb->lkb_sbflags = 0;
2911         lkb->lkb_astfn = args->astfn;
2912         lkb->lkb_astparam = args->astparam;
2913         lkb->lkb_bastfn = args->bastfn;
2914         lkb->lkb_rqmode = args->mode;
2915         lkb->lkb_lksb = args->lksb;
2916         lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2917         lkb->lkb_ownpid = (int) current->pid;
2918         lkb->lkb_timeout_cs = args->timeout;
2919         rv = 0;
2920  out:
2921         if (rv)
2922                 log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2923                           rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2924                           lkb->lkb_status, lkb->lkb_wait_type,
2925                           lkb->lkb_resource->res_name);
2926         return rv;
2927 }
2928
2929 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2930    for success */
2931
2932 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2933    because there may be a lookup in progress and it's valid to do
2934    cancel/unlockf on it */
2935
2936 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2937 {
2938         struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2939         int rv = -EINVAL;
2940
2941         if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2942                 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2943                 dlm_print_lkb(lkb);
2944                 goto out;
2945         }
2946
2947         /* an lkb may still exist even though the lock is EOL'ed due to a
2948            cancel, unlock or failed noqueue request; an app can't use these
2949            locks; return same error as if the lkid had not been found at all */
2950
2951         if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2952                 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2953                 rv = -ENOENT;
2954                 goto out;
2955         }
2956
2957         /* an lkb may be waiting for an rsb lookup to complete where the
2958            lookup was initiated by another lock */
2959
2960         if (!list_empty(&lkb->lkb_rsb_lookup)) {
2961                 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2962                         log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2963                         list_del_init(&lkb->lkb_rsb_lookup);
2964                         queue_cast(lkb->lkb_resource, lkb,
2965                                    args->flags & DLM_LKF_CANCEL ?
2966                                    -DLM_ECANCEL : -DLM_EUNLOCK);
2967                         unhold_lkb(lkb); /* undoes create_lkb() */
2968                 }
2969                 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2970                 rv = -EBUSY;
2971                 goto out;
2972         }
2973
2974         /* cancel not allowed with another cancel/unlock in progress */
2975
2976         if (args->flags & DLM_LKF_CANCEL) {
2977                 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2978                         goto out;
2979
2980                 if (is_overlap(lkb))
2981                         goto out;
2982
2983                 /* don't let scand try to do a cancel */
2984                 del_timeout(lkb);
2985
2986                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2987                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2988                         rv = -EBUSY;
2989                         goto out;
2990                 }
2991
2992                 /* there's nothing to cancel */
2993                 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2994                     !lkb->lkb_wait_type) {
2995                         rv = -EBUSY;
2996                         goto out;
2997                 }
2998
2999                 switch (lkb->lkb_wait_type) {
3000                 case DLM_MSG_LOOKUP:
3001                 case DLM_MSG_REQUEST:
3002                         lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
3003                         rv = -EBUSY;
3004                         goto out;
3005                 case DLM_MSG_UNLOCK:
3006                 case DLM_MSG_CANCEL:
3007                         goto out;
3008                 }
3009                 /* add_to_waiters() will set OVERLAP_CANCEL */
3010                 goto out_ok;
3011         }
3012
3013         /* do we need to allow a force-unlock if there's a normal unlock
3014            already in progress?  in what conditions could the normal unlock
3015            fail such that we'd want to send a force-unlock to be sure? */
3016
3017         if (args->flags & DLM_LKF_FORCEUNLOCK) {
3018                 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
3019                         goto out;
3020
3021                 if (is_overlap_unlock(lkb))
3022                         goto out;
3023
3024                 /* don't let scand try to do a cancel */
3025                 del_timeout(lkb);
3026
3027                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
3028                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3029                         rv = -EBUSY;
3030                         goto out;
3031                 }
3032
3033                 switch (lkb->lkb_wait_type) {
3034                 case DLM_MSG_LOOKUP:
3035                 case DLM_MSG_REQUEST:
3036                         lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
3037                         rv = -EBUSY;
3038                         goto out;
3039                 case DLM_MSG_UNLOCK:
3040                         goto out;
3041                 }
3042                 /* add_to_waiters() will set OVERLAP_UNLOCK */
3043                 goto out_ok;
3044         }
3045
3046         /* normal unlock not allowed if there's any op in progress */
3047         rv = -EBUSY;
3048         if (lkb->lkb_wait_type || lkb->lkb_wait_count)
3049                 goto out;
3050
3051  out_ok:
3052         /* an overlapping op shouldn't blow away exflags from other op */
3053         lkb->lkb_exflags |= args->flags;
3054         lkb->lkb_sbflags = 0;
3055         lkb->lkb_astparam = args->astparam;
3056         rv = 0;
3057  out:
3058         if (rv)
3059                 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
3060                           lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
3061                           args->flags, lkb->lkb_wait_type,
3062                           lkb->lkb_resource->res_name);
3063         return rv;
3064 }
3065
3066 /*
3067  * Four stage 4 varieties:
3068  * do_request(), do_convert(), do_unlock(), do_cancel()
3069  * These are called on the master node for the given lock and
3070  * from the central locking logic.
3071  */
3072
3073 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074 {
3075         int error = 0;
3076
3077         if (can_be_granted(r, lkb, 1, 0, NULL)) {
3078                 grant_lock(r, lkb);
3079                 queue_cast(r, lkb, 0);
3080                 goto out;
3081         }
3082
3083         if (can_be_queued(lkb)) {
3084                 error = -EINPROGRESS;
3085                 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3086                 add_timeout(lkb);
3087                 goto out;
3088         }
3089
3090         error = -EAGAIN;
3091         queue_cast(r, lkb, -EAGAIN);
3092  out:
3093         return error;
3094 }
3095
3096 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3097                                int error)
3098 {
3099         switch (error) {
3100         case -EAGAIN:
3101                 if (force_blocking_asts(lkb))
3102                         send_blocking_asts_all(r, lkb);
3103                 break;
3104         case -EINPROGRESS:
3105                 send_blocking_asts(r, lkb);
3106                 break;
3107         }
3108 }
3109
3110 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3111 {
3112         int error = 0;
3113         int deadlk = 0;
3114
3115         /* changing an existing lock may allow others to be granted */
3116
3117         if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3118                 grant_lock(r, lkb);
3119                 queue_cast(r, lkb, 0);
3120                 goto out;
3121         }
3122
3123         /* can_be_granted() detected that this lock would block in a conversion
3124            deadlock, so we leave it on the granted queue and return EDEADLK in
3125            the ast for the convert. */
3126
3127         if (deadlk) {
3128                 /* it's left on the granted queue */
3129                 revert_lock(r, lkb);
3130                 queue_cast(r, lkb, -EDEADLK);
3131                 error = -EDEADLK;
3132                 goto out;
3133         }
3134
3135         /* is_demoted() means the can_be_granted() above set the grmode
3136            to NL, and left us on the granted queue.  This auto-demotion
3137            (due to CONVDEADLK) might mean other locks, and/or this lock, are
3138            now grantable.  We have to try to grant other converting locks
3139            before we try again to grant this one. */
3140
3141         if (is_demoted(lkb)) {
3142                 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3143                 if (_can_be_granted(r, lkb, 1, 0)) {
3144                         grant_lock(r, lkb);
3145                         queue_cast(r, lkb, 0);
3146                         goto out;
3147                 }
3148                 /* else fall through and move to convert queue */
3149         }
3150
3151         if (can_be_queued(lkb)) {
3152                 error = -EINPROGRESS;
3153                 del_lkb(r, lkb);
3154                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3155                 add_timeout(lkb);
3156                 goto out;
3157         }
3158
3159         error = -EAGAIN;
3160         queue_cast(r, lkb, -EAGAIN);
3161  out:
3162         return error;
3163 }
3164
3165 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166                                int error)
3167 {
3168         switch (error) {
3169         case 0:
3170                 grant_pending_locks(r, NULL);
3171                 /* grant_pending_locks also sends basts */
3172                 break;
3173         case -EAGAIN:
3174                 if (force_blocking_asts(lkb))
3175                         send_blocking_asts_all(r, lkb);
3176                 break;
3177         case -EINPROGRESS:
3178                 send_blocking_asts(r, lkb);
3179                 break;
3180         }
3181 }
3182
3183 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3184 {
3185         remove_lock(r, lkb);
3186         queue_cast(r, lkb, -DLM_EUNLOCK);
3187         return -DLM_EUNLOCK;
3188 }
3189
3190 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3191                               int error)
3192 {
3193         grant_pending_locks(r, NULL);
3194 }
3195
3196 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3197
3198 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3199 {
3200         int error;
3201
3202         error = revert_lock(r, lkb);
3203         if (error) {
3204                 queue_cast(r, lkb, -DLM_ECANCEL);
3205                 return -DLM_ECANCEL;
3206         }
3207         return 0;
3208 }
3209
3210 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3211                               int error)
3212 {
3213         if (error)
3214                 grant_pending_locks(r, NULL);
3215 }
3216
3217 /*
3218  * Four stage 3 varieties:
3219  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3220  */
3221
3222 /* add a new lkb to a possibly new rsb, called by requesting process */
3223
3224 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3225 {
3226         int error;
3227
3228         /* set_master: sets lkb nodeid from r */
3229
3230         error = set_master(r, lkb);
3231         if (error < 0)
3232                 goto out;
3233         if (error) {
3234                 error = 0;
3235                 goto out;
3236         }
3237
3238         if (is_remote(r)) {
3239                 /* receive_request() calls do_request() on remote node */
3240                 error = send_request(r, lkb);
3241         } else {
3242                 error = do_request(r, lkb);
3243                 /* for remote locks the request_reply is sent
3244                    between do_request and do_request_effects */
3245                 do_request_effects(r, lkb, error);
3246         }
3247  out:
3248         return error;
3249 }
3250
3251 /* change some property of an existing lkb, e.g. mode */
3252
3253 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3254 {
3255         int error;
3256
3257         if (is_remote(r)) {
3258                 /* receive_convert() calls do_convert() on remote node */
3259                 error = send_convert(r, lkb);
3260         } else {
3261                 error = do_convert(r, lkb);
3262                 /* for remote locks the convert_reply is sent
3263                    between do_convert and do_convert_effects */
3264                 do_convert_effects(r, lkb, error);
3265         }
3266
3267         return error;
3268 }
3269
3270 /* remove an existing lkb from the granted queue */
3271
3272 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3273 {
3274         int error;
3275
3276         if (is_remote(r)) {
3277                 /* receive_unlock() calls do_unlock() on remote node */
3278                 error = send_unlock(r, lkb);
3279         } else {
3280                 error = do_unlock(r, lkb);
3281                 /* for remote locks the unlock_reply is sent
3282                    between do_unlock and do_unlock_effects */
3283                 do_unlock_effects(r, lkb, error);
3284         }
3285
3286         return error;
3287 }
3288
3289 /* remove an existing lkb from the convert or wait queue */
3290
3291 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3292 {
3293         int error;
3294
3295         if (is_remote(r)) {
3296                 /* receive_cancel() calls do_cancel() on remote node */
3297                 error = send_cancel(r, lkb);
3298         } else {
3299                 error = do_cancel(r, lkb);
3300                 /* for remote locks the cancel_reply is sent
3301                    between do_cancel and do_cancel_effects */
3302                 do_cancel_effects(r, lkb, error);
3303         }
3304
3305         return error;
3306 }
3307
3308 /*
3309  * Four stage 2 varieties:
3310  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3311  */
3312
3313 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
3314                         int len, struct dlm_args *args)
3315 {
3316         struct dlm_rsb *r;
3317         int error;
3318
3319         error = validate_lock_args(ls, lkb, args);
3320         if (error)
3321                 return error;
3322
3323         error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3324         if (error)
3325                 return error;
3326
3327         lock_rsb(r);
3328
3329         attach_lkb(r, lkb);
3330         lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3331
3332         error = _request_lock(r, lkb);
3333
3334         unlock_rsb(r);
3335         put_rsb(r);
3336         return error;
3337 }
3338
3339 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3340                         struct dlm_args *args)
3341 {
3342         struct dlm_rsb *r;
3343         int error;
3344
3345         r = lkb->lkb_resource;
3346
3347         hold_rsb(r);
3348         lock_rsb(r);
3349
3350         error = validate_lock_args(ls, lkb, args);
3351         if (error)
3352                 goto out;
3353
3354         error = _convert_lock(r, lkb);
3355  out:
3356         unlock_rsb(r);
3357         put_rsb(r);
3358         return error;
3359 }
3360
3361 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3362                        struct dlm_args *args)
3363 {
3364         struct dlm_rsb *r;
3365         int error;
3366
3367         r = lkb->lkb_resource;
3368
3369         hold_rsb(r);
3370         lock_rsb(r);
3371
3372         error = validate_unlock_args(lkb, args);
3373         if (error)
3374                 goto out;
3375
3376         error = _unlock_lock(r, lkb);
3377  out:
3378         unlock_rsb(r);
3379         put_rsb(r);
3380         return error;
3381 }
3382
3383 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3384                        struct dlm_args *args)
3385 {
3386         struct dlm_rsb *r;
3387         int error;
3388
3389         r = lkb->lkb_resource;
3390
3391         hold_rsb(r);
3392         lock_rsb(r);
3393
3394         error = validate_unlock_args(lkb, args);
3395         if (error)
3396                 goto out;
3397
3398         error = _cancel_lock(r, lkb);
3399  out:
3400         unlock_rsb(r);
3401         put_rsb(r);
3402         return error;
3403 }
3404
3405 /*
3406  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3407  */
3408
3409 int dlm_lock(dlm_lockspace_t *lockspace,
3410              int mode,
3411              struct dlm_lksb *lksb,
3412              uint32_t flags,
3413              void *name,
3414              unsigned int namelen,
3415              uint32_t parent_lkid,
3416              void (*ast) (void *astarg),
3417              void *astarg,
3418              void (*bast) (void *astarg, int mode))
3419 {
3420         struct dlm_ls *ls;
3421         struct dlm_lkb *lkb;
3422         struct dlm_args args;
3423         int error, convert = flags & DLM_LKF_CONVERT;
3424
3425         ls = dlm_find_lockspace_local(lockspace);
3426         if (!ls)
3427                 return -EINVAL;
3428
3429         dlm_lock_recovery(ls);
3430
3431         if (convert)
3432                 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3433         else
3434                 error = create_lkb(ls, &lkb);
3435
3436         if (error)
3437                 goto out;
3438
3439         error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
3440                               astarg, bast, &args);
3441         if (error)
3442                 goto out_put;
3443
3444         if (convert)
3445                 error = convert_lock(ls, lkb, &args);
3446         else
3447                 error = request_lock(ls, lkb, name, namelen, &args);
3448
3449         if (error == -EINPROGRESS)
3450                 error = 0;
3451  out_put:
3452         if (convert || error)
3453                 __put_lkb(ls, lkb);
3454         if (error == -EAGAIN || error == -EDEADLK)
3455                 error = 0;
3456  out:
3457         dlm_unlock_recovery(ls);
3458         dlm_put_lockspace(ls);
3459         return error;
3460 }
3461
3462 int dlm_unlock(dlm_lockspace_t *lockspace,
3463                uint32_t lkid,
3464                uint32_t flags,
3465                struct dlm_lksb *lksb,
3466                void *astarg)
3467 {
3468         struct dlm_ls *ls;
3469         struct dlm_lkb *lkb;
3470         struct dlm_args args;
3471         int error;
3472
3473         ls = dlm_find_lockspace_local(lockspace);
3474         if (!ls)
3475                 return -EINVAL;
3476
3477         dlm_lock_recovery(ls);
3478
3479         error = find_lkb(ls, lkid, &lkb);
3480         if (error)
3481                 goto out;
3482
3483         error = set_unlock_args(flags, astarg, &args);
3484         if (error)
3485                 goto out_put;
3486
3487         if (flags & DLM_LKF_CANCEL)
3488                 error = cancel_lock(ls, lkb, &args);
3489         else
3490                 error = unlock_lock(ls, lkb, &args);
3491
3492         if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3493                 error = 0;
3494         if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3495                 error = 0;
3496  out_put:
3497         dlm_put_lkb(lkb);
3498  out:
3499         dlm_unlock_recovery(ls);
3500         dlm_put_lockspace(ls);
3501         return error;
3502 }
3503
3504 /*
3505  * send/receive routines for remote operations and replies
3506  *
3507  * send_args
3508  * send_common
3509  * send_request                 receive_request
3510  * send_convert                 receive_convert
3511  * send_unlock                  receive_unlock
3512  * send_cancel                  receive_cancel
3513  * send_grant                   receive_grant
3514  * send_bast                    receive_bast
3515  * send_lookup                  receive_lookup
3516  * send_remove                  receive_remove
3517  *
3518  *                              send_common_reply
3519  * receive_request_reply        send_request_reply
3520  * receive_convert_reply        send_convert_reply
3521  * receive_unlock_reply         send_unlock_reply
3522  * receive_cancel_reply         send_cancel_reply
3523  * receive_lookup_reply         send_lookup_reply
3524  */
3525
3526 static int _create_message(struct dlm_ls *ls, int mb_len,
3527                            int to_nodeid, int mstype,
3528                            struct dlm_message **ms_ret,
3529                            struct dlm_mhandle **mh_ret)
3530 {
3531         struct dlm_message *ms;
3532         struct dlm_mhandle *mh;
3533         char *mb;
3534
3535         /* get_buffer gives us a message handle (mh) that we need to
3536            pass into lowcomms_commit and a message buffer (mb) that we
3537            write our data into */
3538
3539         mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
3540         if (!mh)
3541                 return -ENOBUFS;
3542
3543         memset(mb, 0, mb_len);
3544
3545         ms = (struct dlm_message *) mb;
3546
3547         ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3548         ms->m_header.h_lockspace = ls->ls_global_id;
3549         ms->m_header.h_nodeid = dlm_our_nodeid();
3550         ms->m_header.h_length = mb_len;
3551         ms->m_header.h_cmd = DLM_MSG;
3552
3553         ms->m_type = mstype;
3554
3555         *mh_ret = mh;
3556         *ms_ret = ms;
3557         return 0;
3558 }
3559
3560 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3561                           int to_nodeid, int mstype,
3562                           struct dlm_message **ms_ret,
3563                           struct dlm_mhandle **mh_ret)
3564 {
3565         int mb_len = sizeof(struct dlm_message);
3566
3567         switch (mstype) {
3568         case DLM_MSG_REQUEST:
3569         case DLM_MSG_LOOKUP:
3570         case DLM_MSG_REMOVE:
3571                 mb_len += r->res_length;
3572                 break;
3573         case DLM_MSG_CONVERT:
3574         case DLM_MSG_UNLOCK:
3575         case DLM_MSG_REQUEST_REPLY:
3576         case DLM_MSG_CONVERT_REPLY:
3577         case DLM_MSG_GRANT:
3578                 if (lkb && lkb->lkb_lvbptr)
3579                         mb_len += r->res_ls->ls_lvblen;
3580                 break;
3581         }
3582
3583         return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3584                                ms_ret, mh_ret);
3585 }
3586
3587 /* further lowcomms enhancements or alternate implementations may make
3588    the return value from this function useful at some point */
3589
3590 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
3591 {
3592         dlm_message_out(ms);
3593         dlm_lowcomms_commit_buffer(mh);
3594         return 0;
3595 }
3596
3597 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3598                       struct dlm_message *ms)
3599 {
3600         ms->m_nodeid   = lkb->lkb_nodeid;
3601         ms->m_pid      = lkb->lkb_ownpid;
3602         ms->m_lkid     = lkb->lkb_id;
3603         ms->m_remid    = lkb->lkb_remid;
3604         ms->m_exflags  = lkb->lkb_exflags;
3605         ms->m_sbflags  = lkb->lkb_sbflags;
3606         ms->m_flags    = lkb->lkb_flags;
3607         ms->m_lvbseq   = lkb->lkb_lvbseq;
3608         ms->m_status   = lkb->lkb_status;
3609         ms->m_grmode   = lkb->lkb_grmode;
3610         ms->m_rqmode   = lkb->lkb_rqmode;
3611         ms->m_hash     = r->res_hash;
3612
3613         /* m_result and m_bastmode are set from function args,
3614            not from lkb fields */
3615
3616         if (lkb->lkb_bastfn)
3617                 ms->m_asts |= DLM_CB_BAST;
3618         if (lkb->lkb_astfn)
3619                 ms->m_asts |= DLM_CB_CAST;
3620
3621         /* compare with switch in create_message; send_remove() doesn't
3622            use send_args() */
3623
3624         switch (ms->m_type) {
3625         case DLM_MSG_REQUEST:
3626         case DLM_MSG_LOOKUP:
3627                 memcpy(ms->m_extra, r->res_name, r->res_length);
3628                 break;
3629         case DLM_MSG_CONVERT:
3630         case DLM_MSG_UNLOCK:
3631         case DLM_MSG_REQUEST_REPLY:
3632         case DLM_MSG_CONVERT_REPLY:
3633         case DLM_MSG_GRANT:
3634                 if (!lkb->lkb_lvbptr)
3635                         break;
3636                 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3637                 break;
3638         }
3639 }
3640
3641 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3642 {
3643         struct dlm_message *ms;
3644         struct dlm_mhandle *mh;
3645         int to_nodeid, error;
3646
3647         to_nodeid = r->res_nodeid;
3648
3649         error = add_to_waiters(lkb, mstype, to_nodeid);
3650         if (error)
3651                 return error;
3652
3653         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3654         if (error)
3655                 goto fail;
3656
3657         send_args(r, lkb, ms);
3658
3659         error = send_message(mh, ms);
3660         if (error)
3661                 goto fail;
3662         return 0;
3663
3664  fail:
3665         remove_from_waiters(lkb, msg_reply_type(mstype));
3666         return error;
3667 }
3668
3669 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 {
3671         return send_common(r, lkb, DLM_MSG_REQUEST);
3672 }
3673
3674 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3675 {
3676         int error;
3677
3678         error = send_common(r, lkb, DLM_MSG_CONVERT);
3679
3680         /* down conversions go without a reply from the master */
3681         if (!error && down_conversion(lkb)) {
3682                 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3683                 r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
3684                 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3685                 r->res_ls->ls_stub_ms.m_result = 0;
3686                 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
3687         }
3688
3689         return error;
3690 }
3691
3692 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3693    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3694    that the master is still correct. */
3695
3696 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3697 {
3698         return send_common(r, lkb, DLM_MSG_UNLOCK);
3699 }
3700
3701 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3702 {
3703         return send_common(r, lkb, DLM_MSG_CANCEL);
3704 }
3705
3706 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3707 {
3708         struct dlm_message *ms;
3709         struct dlm_mhandle *mh;
3710         int to_nodeid, error;
3711
3712         to_nodeid = lkb->lkb_nodeid;
3713
3714         error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3715         if (error)
3716                 goto out;
3717
3718         send_args(r, lkb, ms);
3719
3720         ms->m_result = 0;
3721
3722         error = send_message(mh, ms);
3723  out:
3724         return error;
3725 }
3726
3727 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3728 {
3729         struct dlm_message *ms;
3730         struct dlm_mhandle *mh;
3731         int to_nodeid, error;
3732
3733         to_nodeid = lkb->lkb_nodeid;
3734
3735         error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3736         if (error)
3737                 goto out;
3738
3739         send_args(r, lkb, ms);
3740
3741         ms->m_bastmode = mode;
3742
3743         error = send_message(mh, ms);
3744  out:
3745         return error;
3746 }
3747
3748 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3749 {
3750         struct dlm_message *ms;
3751         struct dlm_mhandle *mh;
3752         int to_nodeid, error;
3753
3754         to_nodeid = dlm_dir_nodeid(r);
3755
3756         error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3757         if (error)
3758                 return error;
3759
3760         error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3761         if (error)
3762                 goto fail;
3763
3764         send_args(r, lkb, ms);
3765
3766         error = send_message(mh, ms);
3767         if (error)
3768                 goto fail;
3769         return 0;
3770
3771  fail:
3772         remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3773         return error;
3774 }
3775
3776 static int send_remove(struct dlm_rsb *r)
3777 {
3778         struct dlm_message *ms;
3779         struct dlm_mhandle *mh;
3780         int to_nodeid, error;
3781
3782         to_nodeid = dlm_dir_nodeid(r);
3783
3784         error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3785         if (error)
3786                 goto out;
3787
3788         memcpy(ms->m_extra, r->res_name, r->res_length);
3789         ms->m_hash = r->res_hash;
3790
3791         error = send_message(mh, ms);
3792  out:
3793         return error;
3794 }
3795
3796 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3797                              int mstype, int rv)
3798 {
3799         struct dlm_message *ms;
3800         struct dlm_mhandle *mh;
3801         int to_nodeid, error;
3802
3803         to_nodeid = lkb->lkb_nodeid;
3804
3805         error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3806         if (error)
3807                 goto out;
3808
3809         send_args(r, lkb, ms);
3810
3811         ms->m_result = rv;
3812
3813         error = send_message(mh, ms);
3814  out:
3815         return error;
3816 }
3817
3818 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3819 {
3820         return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3821 }
3822
3823 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3824 {
3825         return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3826 }
3827
3828 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3829 {
3830         return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3831 }
3832
3833 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3834 {
3835         return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3836 }
3837
3838 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3839                              int ret_nodeid, int rv)
3840 {
3841         struct dlm_rsb *r = &ls->ls_stub_rsb;
3842         struct dlm_message *ms;
3843         struct dlm_mhandle *mh;
3844         int error, nodeid = ms_in->m_header.h_nodeid;
3845
3846         error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3847         if (error)
3848                 goto out;
3849
3850         ms->m_lkid = ms_in->m_lkid;
3851         ms->m_result = rv;
3852         ms->m_nodeid = ret_nodeid;
3853
3854         error = send_message(mh, ms);
3855  out:
3856         return error;
3857 }
3858
3859 /* which args we save from a received message depends heavily on the type
3860    of message, unlike the send side where we can safely send everything about
3861    the lkb for any type of message */
3862
3863 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3864 {
3865         lkb->lkb_exflags = ms->m_exflags;
3866         lkb->lkb_sbflags = ms->m_sbflags;
3867         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3868                          (ms->m_flags & 0x0000FFFF);
3869 }
3870
3871 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3872 {
3873         if (ms->m_flags == DLM_IFL_STUB_MS)
3874                 return;
3875
3876         lkb->lkb_sbflags = ms->m_sbflags;
3877         lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3878                          (ms->m_flags & 0x0000FFFF);
3879 }
3880
3881 static int receive_extralen(struct dlm_message *ms)
3882 {
3883         return (ms->m_header.h_length - sizeof(struct dlm_message));
3884 }
3885
3886 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3887                        struct dlm_message *ms)
3888 {
3889         int len;
3890
3891         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3892                 if (!lkb->lkb_lvbptr)
3893                         lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3894                 if (!lkb->lkb_lvbptr)
3895                         return -ENOMEM;
3896                 len = receive_extralen(ms);
3897                 if (len > ls->ls_lvblen)
3898                         len = ls->ls_lvblen;
3899                 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3900         }
3901         return 0;
3902 }
3903
3904 static void fake_bastfn(void *astparam, int mode)
3905 {
3906         log_print("fake_bastfn should not be called");
3907 }
3908
3909 static void fake_astfn(void *astparam)
3910 {
3911         log_print("fake_astfn should not be called");
3912 }
3913
3914 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3915                                 struct dlm_message *ms)
3916 {
3917         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3918         lkb->lkb_ownpid = ms->m_pid;
3919         lkb->lkb_remid = ms->m_lkid;
3920         lkb->lkb_grmode = DLM_LOCK_IV;
3921         lkb->lkb_rqmode = ms->m_rqmode;
3922
3923         lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3924         lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3925
3926         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3927                 /* lkb was just created so there won't be an lvb yet */
3928                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3929                 if (!lkb->lkb_lvbptr)
3930                         return -ENOMEM;
3931         }
3932
3933         return 0;
3934 }
3935
3936 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3937                                 struct dlm_message *ms)
3938 {
3939         if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3940                 return -EBUSY;
3941
3942         if (receive_lvb(ls, lkb, ms))
3943                 return -ENOMEM;
3944
3945         lkb->lkb_rqmode = ms->m_rqmode;
3946         lkb->lkb_lvbseq = ms->m_lvbseq;
3947
3948         return 0;
3949 }
3950
3951 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3952                                struct dlm_message *ms)
3953 {
3954         if (receive_lvb(ls, lkb, ms))
3955                 return -ENOMEM;
3956         return 0;
3957 }
3958
3959 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3960    uses to send a reply and that the remote end uses to process the reply. */
3961
3962 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3963 {
3964         struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3965         lkb->lkb_nodeid = ms->m_header.h_nodeid;
3966         lkb->lkb_remid = ms->m_lkid;
3967 }
3968
3969 /* This is called after the rsb is locked so that we can safely inspect
3970    fields in the lkb. */
3971
3972 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3973 {
3974         int from = ms->m_header.h_nodeid;
3975         int error = 0;
3976
3977         /* currently mixing of user/kernel locks are not supported */
3978         if (ms->m_flags & DLM_IFL_USER && ~lkb->lkb_flags & DLM_IFL_USER) {
3979                 log_error(lkb->lkb_resource->res_ls,
3980                           "got user dlm message for a kernel lock");
3981                 error = -EINVAL;
3982                 goto out;
3983         }
3984
3985         switch (ms->m_type) {
3986         case DLM_MSG_CONVERT:
3987         case DLM_MSG_UNLOCK:
3988         case DLM_MSG_CANCEL:
3989                 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3990                         error = -EINVAL;
3991                 break;
3992
3993         case DLM_MSG_CONVERT_REPLY:
3994         case DLM_MSG_UNLOCK_REPLY:
3995         case DLM_MSG_CANCEL_REPLY:
3996         case DLM_MSG_GRANT:
3997         case DLM_MSG_BAST:
3998                 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3999                         error = -EINVAL;
4000                 break;
4001
4002         case DLM_MSG_REQUEST_REPLY:
4003                 if (!is_process_copy(lkb))
4004                         error = -EINVAL;
4005                 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
4006                         error = -EINVAL;
4007                 break;
4008
4009         default:
4010                 error = -EINVAL;
4011         }
4012
4013 out:
4014         if (error)
4015                 log_error(lkb->lkb_resource->res_ls,
4016                           "ignore invalid message %d from %d %x %x %x %d",
4017                           ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
4018                           lkb->lkb_flags, lkb->lkb_nodeid);
4019         return error;
4020 }
4021
4022 static void send_repeat_remove(struct dlm_ls *ls, char *ms_name, int len)
4023 {
4024         char name[DLM_RESNAME_MAXLEN + 1];
4025         struct dlm_message *ms;
4026         struct dlm_mhandle *mh;
4027         struct dlm_rsb *r;
4028         uint32_t hash, b;
4029         int rv, dir_nodeid;
4030
4031         memset(name, 0, sizeof(name));
4032         memcpy(name, ms_name, len);
4033
4034         hash = jhash(name, len, 0);
4035         b = hash & (ls->ls_rsbtbl_size - 1);
4036
4037         dir_nodeid = dlm_hash2nodeid(ls, hash);
4038
4039         log_error(ls, "send_repeat_remove dir %d %s", dir_nodeid, name);
4040
4041         spin_lock(&ls->ls_rsbtbl[b].lock);
4042         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4043         if (!rv) {
4044                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4045                 log_error(ls, "repeat_remove on keep %s", name);
4046                 return;
4047         }
4048
4049         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4050         if (!rv) {
4051                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4052                 log_error(ls, "repeat_remove on toss %s", name);
4053                 return;
4054         }
4055
4056         /* use ls->remove_name2 to avoid conflict with shrink? */
4057
4058         spin_lock(&ls->ls_remove_spin);
4059         ls->ls_remove_len = len;
4060         memcpy(ls->ls_remove_name, name, DLM_RESNAME_MAXLEN);
4061         spin_unlock(&ls->ls_remove_spin);
4062         spin_unlock(&ls->ls_rsbtbl[b].lock);
4063
4064         rv = _create_message(ls, sizeof(struct dlm_message) + len,
4065                              dir_nodeid, DLM_MSG_REMOVE, &ms, &mh);
4066         if (rv)
4067                 return;
4068
4069         memcpy(ms->m_extra, name, len);
4070         ms->m_hash = hash;
4071
4072         send_message(mh, ms);
4073
4074         spin_lock(&ls->ls_remove_spin);
4075         ls->ls_remove_len = 0;
4076         memset(ls->ls_remove_name, 0, DLM_RESNAME_MAXLEN);
4077         spin_unlock(&ls->ls_remove_spin);
4078 }
4079
4080 static int receive_request(struct dlm_ls *ls, struct dlm_message *ms)
4081 {
4082         struct dlm_lkb *lkb;
4083         struct dlm_rsb *r;
4084         int from_nodeid;
4085         int error, namelen = 0;
4086
4087         from_nodeid = ms->m_header.h_nodeid;
4088
4089         error = create_lkb(ls, &lkb);
4090         if (error)
4091                 goto fail;
4092
4093         receive_flags(lkb, ms);
4094         lkb->lkb_flags |= DLM_IFL_MSTCPY;
4095         error = receive_request_args(ls, lkb, ms);
4096         if (error) {
4097                 __put_lkb(ls, lkb);
4098                 goto fail;
4099         }
4100
4101         /* The dir node is the authority on whether we are the master
4102            for this rsb or not, so if the master sends us a request, we should
4103            recreate the rsb if we've destroyed it.   This race happens when we
4104            send a remove message to the dir node at the same time that the dir
4105            node sends us a request for the rsb. */
4106
4107         namelen = receive_extralen(ms);
4108
4109         error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4110                          R_RECEIVE_REQUEST, &r);
4111         if (error) {
4112                 __put_lkb(ls, lkb);
4113                 goto fail;
4114         }
4115
4116         lock_rsb(r);
4117
4118         if (r->res_master_nodeid != dlm_our_nodeid()) {
4119                 error = validate_master_nodeid(ls, r, from_nodeid);
4120                 if (error) {
4121                         unlock_rsb(r);
4122                         put_rsb(r);
4123                         __put_lkb(ls, lkb);
4124                         goto fail;
4125                 }
4126         }
4127
4128         attach_lkb(r, lkb);
4129         error = do_request(r, lkb);
4130         send_request_reply(r, lkb, error);
4131         do_request_effects(r, lkb, error);
4132
4133         unlock_rsb(r);
4134         put_rsb(r);
4135
4136         if (error == -EINPROGRESS)
4137                 error = 0;
4138         if (error)
4139                 dlm_put_lkb(lkb);
4140         return 0;
4141
4142  fail:
4143         /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4144            and do this receive_request again from process_lookup_list once
4145            we get the lookup reply.  This would avoid a many repeated
4146            ENOTBLK request failures when the lookup reply designating us
4147            as master is delayed. */
4148
4149         /* We could repeatedly return -EBADR here if our send_remove() is
4150            delayed in being sent/arriving/being processed on the dir node.
4151            Another node would repeatedly lookup up the master, and the dir
4152            node would continue returning our nodeid until our send_remove
4153            took effect.
4154
4155            We send another remove message in case our previous send_remove
4156            was lost/ignored/missed somehow. */
4157
4158         if (error != -ENOTBLK) {
4159                 log_limit(ls, "receive_request %x from %d %d",
4160                           ms->m_lkid, from_nodeid, error);
4161         }
4162
4163         if (namelen && error == -EBADR) {
4164                 send_repeat_remove(ls, ms->m_extra, namelen);
4165                 msleep(1000);
4166         }
4167
4168         setup_stub_lkb(ls, ms);
4169         send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4170         return error;
4171 }
4172
4173 static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
4174 {
4175         struct dlm_lkb *lkb;
4176         struct dlm_rsb *r;
4177         int error, reply = 1;
4178
4179         error = find_lkb(ls, ms->m_remid, &lkb);
4180         if (error)
4181                 goto fail;
4182
4183         if (lkb->lkb_remid != ms->m_lkid) {
4184                 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4185                           "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4186                           (unsigned long long)lkb->lkb_recover_seq,
4187                           ms->m_header.h_nodeid, ms->m_lkid);
4188                 error = -ENOENT;
4189                 dlm_put_lkb(lkb);
4190                 goto fail;
4191         }
4192
4193         r = lkb->lkb_resource;
4194
4195         hold_rsb(r);
4196         lock_rsb(r);
4197
4198         error = validate_message(lkb, ms);
4199         if (error)
4200                 goto out;
4201
4202         receive_flags(lkb, ms);
4203
4204         error = receive_convert_args(ls, lkb, ms);
4205         if (error) {
4206                 send_convert_reply(r, lkb, error);
4207                 goto out;
4208         }
4209
4210         reply = !down_conversion(lkb);
4211
4212         error = do_convert(r, lkb);
4213         if (reply)
4214                 send_convert_reply(r, lkb, error);
4215         do_convert_effects(r, lkb, error);
4216  out:
4217         unlock_rsb(r);
4218         put_rsb(r);
4219         dlm_put_lkb(lkb);
4220         return 0;
4221
4222  fail:
4223         setup_stub_lkb(ls, ms);
4224         send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4225         return error;
4226 }
4227
4228 static int receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
4229 {
4230         struct dlm_lkb *lkb;
4231         struct dlm_rsb *r;
4232         int error;
4233
4234         error = find_lkb(ls, ms->m_remid, &lkb);
4235         if (error)
4236                 goto fail;
4237
4238         if (lkb->lkb_remid != ms->m_lkid) {
4239                 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4240                           lkb->lkb_id, lkb->lkb_remid,
4241                           ms->m_header.h_nodeid, ms->m_lkid);
4242                 error = -ENOENT;
4243                 dlm_put_lkb(lkb);
4244                 goto fail;
4245         }
4246
4247         r = lkb->lkb_resource;
4248
4249         hold_rsb(r);
4250         lock_rsb(r);
4251
4252         error = validate_message(lkb, ms);
4253         if (error)
4254                 goto out;
4255
4256         receive_flags(lkb, ms);
4257
4258         error = receive_unlock_args(ls, lkb, ms);
4259         if (error) {
4260                 send_unlock_reply(r, lkb, error);
4261                 goto out;
4262         }
4263
4264         error = do_unlock(r, lkb);
4265         send_unlock_reply(r, lkb, error);
4266         do_unlock_effects(r, lkb, error);
4267  out:
4268         unlock_rsb(r);
4269         put_rsb(r);
4270         dlm_put_lkb(lkb);
4271         return 0;
4272
4273  fail:
4274         setup_stub_lkb(ls, ms);
4275         send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4276         return error;
4277 }
4278
4279 static int receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
4280 {
4281         struct dlm_lkb *lkb;
4282         struct dlm_rsb *r;
4283         int error;
4284
4285         error = find_lkb(ls, ms->m_remid, &lkb);
4286         if (error)
4287                 goto fail;
4288
4289         receive_flags(lkb, ms);
4290
4291         r = lkb->lkb_resource;
4292
4293         hold_rsb(r);
4294         lock_rsb(r);
4295
4296         error = validate_message(lkb, ms);
4297         if (error)
4298                 goto out;
4299
4300         error = do_cancel(r, lkb);
4301         send_cancel_reply(r, lkb, error);
4302         do_cancel_effects(r, lkb, error);
4303  out:
4304         unlock_rsb(r);
4305         put_rsb(r);
4306         dlm_put_lkb(lkb);
4307         return 0;
4308
4309  fail:
4310         setup_stub_lkb(ls, ms);
4311         send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
4312         return error;
4313 }
4314
4315 static int receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
4316 {
4317         struct dlm_lkb *lkb;
4318         struct dlm_rsb *r;
4319         int error;
4320
4321         error = find_lkb(ls, ms->m_remid, &lkb);
4322         if (error)
4323                 return error;
4324
4325         r = lkb->lkb_resource;
4326
4327         hold_rsb(r);
4328         lock_rsb(r);
4329
4330         error = validate_message(lkb, ms);
4331         if (error)
4332                 goto out;
4333
4334         receive_flags_reply(lkb, ms);
4335         if (is_altmode(lkb))
4336                 munge_altmode(lkb, ms);
4337         grant_lock_pc(r, lkb, ms);
4338         queue_cast(r, lkb, 0);
4339  out:
4340         unlock_rsb(r);
4341         put_rsb(r);
4342         dlm_put_lkb(lkb);
4343         return 0;
4344 }
4345
4346 static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
4347 {
4348         struct dlm_lkb *lkb;
4349         struct dlm_rsb *r;
4350         int error;
4351
4352         error = find_lkb(ls, ms->m_remid, &lkb);
4353         if (error)
4354                 return error;
4355
4356         r = lkb->lkb_resource;
4357
4358         hold_rsb(r);
4359         lock_rsb(r);
4360
4361         error = validate_message(lkb, ms);
4362         if (error)
4363                 goto out;
4364
4365         queue_bast(r, lkb, ms->m_bastmode);
4366         lkb->lkb_highbast = ms->m_bastmode;
4367  out:
4368         unlock_rsb(r);
4369         put_rsb(r);
4370         dlm_put_lkb(lkb);
4371         return 0;
4372 }
4373
4374 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
4375 {
4376         int len, error, ret_nodeid, from_nodeid, our_nodeid;
4377
4378         from_nodeid = ms->m_header.h_nodeid;
4379         our_nodeid = dlm_our_nodeid();
4380
4381         len = receive_extralen(ms);
4382
4383         error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4384                                   &ret_nodeid, NULL);
4385
4386         /* Optimization: we're master so treat lookup as a request */
4387         if (!error && ret_nodeid == our_nodeid) {
4388                 receive_request(ls, ms);
4389                 return;
4390         }
4391         send_lookup_reply(ls, ms, ret_nodeid, error);
4392 }
4393
4394 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
4395 {
4396         char name[DLM_RESNAME_MAXLEN+1];
4397         struct dlm_rsb *r;
4398         uint32_t hash, b;
4399         int rv, len, dir_nodeid, from_nodeid;
4400
4401         from_nodeid = ms->m_header.h_nodeid;
4402
4403         len = receive_extralen(ms);
4404
4405         if (len > DLM_RESNAME_MAXLEN) {
4406                 log_error(ls, "receive_remove from %d bad len %d",
4407                           from_nodeid, len);
4408                 return;
4409         }
4410
4411         dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
4412         if (dir_nodeid != dlm_our_nodeid()) {
4413                 log_error(ls, "receive_remove from %d bad nodeid %d",
4414                           from_nodeid, dir_nodeid);
4415                 return;
4416         }
4417
4418         /* Look for name on rsbtbl.toss, if it's there, kill it.
4419            If it's on rsbtbl.keep, it's being used, and we should ignore this
4420            message.  This is an expected race between the dir node sending a
4421            request to the master node at the same time as the master node sends
4422            a remove to the dir node.  The resolution to that race is for the
4423            dir node to ignore the remove message, and the master node to
4424            recreate the master rsb when it gets a request from the dir node for
4425            an rsb it doesn't have. */
4426
4427         memset(name, 0, sizeof(name));
4428         memcpy(name, ms->m_extra, len);
4429
4430         hash = jhash(name, len, 0);
4431         b = hash & (ls->ls_rsbtbl_size - 1);
4432
4433         spin_lock(&ls->ls_rsbtbl[b].lock);
4434
4435         rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4436         if (rv) {
4437                 /* verify the rsb is on keep list per comment above */
4438                 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4439                 if (rv) {
4440                         /* should not happen */
4441                         log_error(ls, "receive_remove from %d not found %s",
4442                                   from_nodeid, name);
4443                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4444                         return;
4445                 }
4446                 if (r->res_master_nodeid != from_nodeid) {
4447                         /* should not happen */
4448                         log_error(ls, "receive_remove keep from %d master %d",
4449                                   from_nodeid, r->res_master_nodeid);
4450                         dlm_print_rsb(r);
4451                         spin_unlock(&ls->ls_rsbtbl[b].lock);
4452                         return;
4453                 }
4454
4455                 log_debug(ls, "receive_remove from %d master %d first %x %s",
4456                           from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4457                           name);
4458                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4459                 return;
4460         }
4461
4462         if (r->res_master_nodeid != from_nodeid) {
4463                 log_error(ls, "receive_remove toss from %d master %d",
4464                           from_nodeid, r->res_master_nodeid);
4465                 dlm_print_rsb(r);
4466                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4467                 return;
4468         }
4469
4470         if (kref_put(&r->res_ref, kill_rsb)) {
4471                 rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4472                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4473                 dlm_free_rsb(r);
4474         } else {
4475                 log_error(ls, "receive_remove from %d rsb ref error",
4476                           from_nodeid);
4477                 dlm_print_rsb(r);
4478                 spin_unlock(&ls->ls_rsbtbl[b].lock);
4479         }
4480 }
4481
4482 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
4483 {
4484         do_purge(ls, ms->m_nodeid, ms->m_pid);
4485 }
4486
4487 static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
4488 {
4489         struct dlm_lkb *lkb;
4490         struct dlm_rsb *r;
4491         int error, mstype, result;
4492         int from_nodeid = ms->m_header.h_nodeid;
4493
4494         error = find_lkb(ls, ms->m_remid, &lkb);
4495         if (error)
4496                 return error;
4497
4498         r = lkb->lkb_resource;
4499         hold_rsb(r);
4500         lock_rsb(r);
4501
4502         error = validate_message(lkb, ms);
4503         if (error)
4504                 goto out;
4505
4506         mstype = lkb->lkb_wait_type;
4507         error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4508         if (error) {
4509                 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4510                           lkb->lkb_id, from_nodeid, ms->m_lkid, ms->m_result);
4511                 dlm_dump_rsb(r);
4512                 goto out;
4513         }
4514
4515         /* Optimization: the dir node was also the master, so it took our
4516            lookup as a request and sent request reply instead of lookup reply */
4517         if (mstype == DLM_MSG_LOOKUP) {
4518                 r->res_master_nodeid = from_nodeid;
4519                 r->res_nodeid = from_nodeid;
4520                 lkb->lkb_nodeid = from_nodeid;
4521         }
4522
4523         /* this is the value returned from do_request() on the master */
4524         result = ms->m_result;
4525
4526         switch (result) {
4527         case -EAGAIN:
4528                 /* request would block (be queued) on remote master */
4529                 queue_cast(r, lkb, -EAGAIN);
4530                 confirm_master(r, -EAGAIN);
4531                 unhold_lkb(lkb); /* undoes create_lkb() */
4532                 break;
4533
4534         case -EINPROGRESS:
4535         case 0:
4536                 /* request was queued or granted on remote master */
4537                 receive_flags_reply(lkb, ms);
4538                 lkb->lkb_remid = ms->m_lkid;
4539                 if (is_altmode(lkb))
4540                         munge_altmode(lkb, ms);
4541                 if (result) {
4542                         add_lkb(r, lkb, DLM_LKSTS_WAITING);
4543                         add_timeout(lkb);
4544                 } else {
4545                         grant_lock_pc(r, lkb, ms);
4546                         queue_cast(r, lkb, 0);
4547                 }
4548                 confirm_master(r, result);
4549                 break;
4550
4551         case -EBADR:
4552         case -ENOTBLK:
4553                 /* find_rsb failed to find rsb or rsb wasn't master */
4554                 log_limit(ls, "receive_request_reply %x from %d %d "
4555                           "master %d dir %d first %x %s", lkb->lkb_id,
4556                           from_nodeid, result, r->res_master_nodeid,
4557                           r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4558
4559                 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4560                     r->res_master_nodeid != dlm_our_nodeid()) {
4561                         /* cause _request_lock->set_master->send_lookup */
4562                         r->res_master_nodeid = 0;
4563                         r->res_nodeid = -1;
4564                         lkb->lkb_nodeid = -1;
4565                 }
4566
4567                 if (is_overlap(lkb)) {
4568                         /* we'll ignore error in cancel/unlock reply */
4569                         queue_cast_overlap(r, lkb);
4570                         confirm_master(r, result);
4571                         unhold_lkb(lkb); /* undoes create_lkb() */
4572                 } else {
4573                         _request_lock(r, lkb);
4574
4575                         if (r->res_master_nodeid == dlm_our_nodeid())
4576                                 confirm_master(r, 0);
4577                 }
4578                 break;
4579
4580         default:
4581                 log_error(ls, "receive_request_reply %x error %d",
4582                           lkb->lkb_id, result);
4583         }
4584
4585         if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
4586                 log_debug(ls, "receive_request_reply %x result %d unlock",
4587                           lkb->lkb_id, result);
4588                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4589                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4590                 send_unlock(r, lkb);
4591         } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
4592                 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4593                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4594                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4595                 send_cancel(r, lkb);
4596         } else {
4597                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4598                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4599         }
4600  out:
4601         unlock_rsb(r);
4602         put_rsb(r);
4603         dlm_put_lkb(lkb);
4604         return 0;
4605 }
4606
4607 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4608                                     struct dlm_message *ms)
4609 {
4610         /* this is the value returned from do_convert() on the master */
4611         switch (ms->m_result) {
4612         case -EAGAIN:
4613                 /* convert would block (be queued) on remote master */
4614                 queue_cast(r, lkb, -EAGAIN);
4615                 break;
4616
4617         case -EDEADLK:
4618                 receive_flags_reply(lkb, ms);
4619                 revert_lock_pc(r, lkb);
4620                 queue_cast(r, lkb, -EDEADLK);
4621                 break;
4622
4623         case -EINPROGRESS:
4624                 /* convert was queued on remote master */
4625                 receive_flags_reply(lkb, ms);
4626                 if (is_demoted(lkb))
4627                         munge_demoted(lkb);
4628                 del_lkb(r, lkb);
4629                 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4630                 add_timeout(lkb);
4631                 break;
4632
4633         case 0:
4634                 /* convert was granted on remote master */
4635                 receive_flags_reply(lkb, ms);
4636                 if (is_demoted(lkb))
4637                         munge_demoted(lkb);
4638                 grant_lock_pc(r, lkb, ms);
4639                 queue_cast(r, lkb, 0);
4640                 break;
4641
4642         default:
4643                 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4644                           lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
4645                           ms->m_result);
4646                 dlm_print_rsb(r);
4647                 dlm_print_lkb(lkb);
4648         }
4649 }
4650
4651 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4652 {
4653         struct dlm_rsb *r = lkb->lkb_resource;
4654         int error;
4655
4656         hold_rsb(r);
4657         lock_rsb(r);
4658
4659         error = validate_message(lkb, ms);
4660         if (error)
4661                 goto out;
4662
4663         /* stub reply can happen with waiters_mutex held */
4664         error = remove_from_waiters_ms(lkb, ms);
4665         if (error)
4666                 goto out;
4667
4668         __receive_convert_reply(r, lkb, ms);
4669  out:
4670         unlock_rsb(r);
4671         put_rsb(r);
4672 }
4673
4674 static int receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
4675 {
4676         struct dlm_lkb *lkb;
4677         int error;
4678
4679         error = find_lkb(ls, ms->m_remid, &lkb);
4680         if (error)
4681                 return error;
4682
4683         _receive_convert_reply(lkb, ms);
4684         dlm_put_lkb(lkb);
4685         return 0;
4686 }
4687
4688 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4689 {
4690         struct dlm_rsb *r = lkb->lkb_resource;
4691         int error;
4692
4693         hold_rsb(r);
4694         lock_rsb(r);
4695
4696         error = validate_message(lkb, ms);
4697         if (error)
4698                 goto out;
4699
4700         /* stub reply can happen with waiters_mutex held */
4701         error = remove_from_waiters_ms(lkb, ms);
4702         if (error)
4703                 goto out;
4704
4705         /* this is the value returned from do_unlock() on the master */
4706
4707         switch (ms->m_result) {
4708         case -DLM_EUNLOCK:
4709                 receive_flags_reply(lkb, ms);
4710                 remove_lock_pc(r, lkb);
4711                 queue_cast(r, lkb, -DLM_EUNLOCK);
4712                 break;
4713         case -ENOENT:
4714                 break;
4715         default:
4716                 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4717                           lkb->lkb_id, ms->m_result);
4718         }
4719  out:
4720         unlock_rsb(r);
4721         put_rsb(r);
4722 }
4723
4724 static int receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
4725 {
4726         struct dlm_lkb *lkb;
4727         int error;
4728
4729         error = find_lkb(ls, ms->m_remid, &lkb);
4730         if (error)
4731                 return error;
4732
4733         _receive_unlock_reply(lkb, ms);
4734         dlm_put_lkb(lkb);
4735         return 0;
4736 }
4737
4738 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
4739 {
4740         struct dlm_rsb *r = lkb->lkb_resource;
4741         int error;
4742
4743         hold_rsb(r);
4744         lock_rsb(r);
4745
4746         error = validate_message(lkb, ms);
4747         if (error)
4748                 goto out;
4749
4750         /* stub reply can happen with waiters_mutex held */
4751         error = remove_from_waiters_ms(lkb, ms);
4752         if (error)
4753                 goto out;
4754
4755         /* this is the value returned from do_cancel() on the master */
4756
4757         switch (ms->m_result) {
4758         case -DLM_ECANCEL:
4759                 receive_flags_reply(lkb, ms);
4760                 revert_lock_pc(r, lkb);
4761                 queue_cast(r, lkb, -DLM_ECANCEL);
4762                 break;
4763         case 0:
4764                 break;
4765         default:
4766                 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4767                           lkb->lkb_id, ms->m_result);
4768         }
4769  out:
4770         unlock_rsb(r);
4771         put_rsb(r);
4772 }
4773
4774 static int receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
4775 {
4776         struct dlm_lkb *lkb;
4777         int error;
4778
4779         error = find_lkb(ls, ms->m_remid, &lkb);
4780         if (error)
4781                 return error;
4782
4783         _receive_cancel_reply(lkb, ms);
4784         dlm_put_lkb(lkb);
4785         return 0;
4786 }
4787
4788 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
4789 {
4790         struct dlm_lkb *lkb;
4791         struct dlm_rsb *r;
4792         int error, ret_nodeid;
4793         int do_lookup_list = 0;
4794
4795         error = find_lkb(ls, ms->m_lkid, &lkb);
4796         if (error) {
4797                 log_error(ls, "receive_lookup_reply no lkid %x", ms->m_lkid);
4798                 return;
4799         }
4800
4801         /* ms->m_result is the value returned by dlm_master_lookup on dir node
4802            FIXME: will a non-zero error ever be returned? */
4803
4804         r = lkb->lkb_resource;
4805         hold_rsb(r);
4806         lock_rsb(r);
4807
4808         error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4809         if (error)
4810                 goto out;
4811
4812         ret_nodeid = ms->m_nodeid;
4813
4814         /* We sometimes receive a request from the dir node for this
4815            rsb before we've received the dir node's loookup_reply for it.
4816            The request from the dir node implies we're the master, so we set
4817            ourself as master in receive_request_reply, and verify here that
4818            we are indeed the master. */
4819
4820         if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4821                 /* This should never happen */
4822                 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4823                           "master %d dir %d our %d first %x %s",
4824                           lkb->lkb_id, ms->m_header.h_nodeid, ret_nodeid,
4825                           r->res_master_nodeid, r->res_dir_nodeid,
4826                           dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4827         }
4828
4829         if (ret_nodeid == dlm_our_nodeid()) {
4830                 r->res_master_nodeid = ret_nodeid;
4831                 r->res_nodeid = 0;
4832                 do_lookup_list = 1;
4833                 r->res_first_lkid = 0;
4834         } else if (ret_nodeid == -1) {
4835                 /* the remote node doesn't believe it's the dir node */
4836                 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4837                           lkb->lkb_id, ms->m_header.h_nodeid);
4838                 r->res_master_nodeid = 0;
4839                 r->res_nodeid = -1;
4840                 lkb->lkb_nodeid = -1;
4841         } else {
4842                 /* set_master() will set lkb_nodeid from r */
4843                 r->res_master_nodeid = ret_nodeid;
4844                 r->res_nodeid = ret_nodeid;
4845         }
4846
4847         if (is_overlap(lkb)) {
4848                 log_debug(ls, "receive_lookup_reply %x unlock %x",
4849                           lkb->lkb_id, lkb->lkb_flags);
4850                 queue_cast_overlap(r, lkb);
4851                 unhold_lkb(lkb); /* undoes create_lkb() */
4852                 goto out_list;
4853         }
4854
4855         _request_lock(r, lkb);
4856
4857  out_list:
4858         if (do_lookup_list)
4859                 process_lookup_list(r);
4860  out:
4861         unlock_rsb(r);
4862         put_rsb(r);
4863         dlm_put_lkb(lkb);
4864 }
4865
4866 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4867                              uint32_t saved_seq)
4868 {
4869         int error = 0, noent = 0;
4870
4871         if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
4872                 log_limit(ls, "receive %d from non-member %d %x %x %d",
4873                           ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
4874                           ms->m_remid, ms->m_result);
4875                 return;
4876         }
4877
4878         switch (ms->m_type) {
4879
4880         /* messages sent to a master node */
4881
4882         case DLM_MSG_REQUEST:
4883                 error = receive_request(ls, ms);
4884                 break;
4885
4886         case DLM_MSG_CONVERT:
4887                 error = receive_convert(ls, ms);
4888                 break;
4889
4890         case DLM_MSG_UNLOCK:
4891                 error = receive_unlock(ls, ms);
4892                 break;
4893
4894         case DLM_MSG_CANCEL:
4895                 noent = 1;
4896                 error = receive_cancel(ls, ms);
4897                 break;
4898
4899         /* messages sent from a master node (replies to above) */
4900
4901         case DLM_MSG_REQUEST_REPLY:
4902                 error = receive_request_reply(ls, ms);
4903                 break;
4904
4905         case DLM_MSG_CONVERT_REPLY:
4906                 error = receive_convert_reply(ls, ms);
4907                 break;
4908
4909         case DLM_MSG_UNLOCK_REPLY:
4910                 error = receive_unlock_reply(ls, ms);
4911                 break;
4912
4913         case DLM_MSG_CANCEL_REPLY:
4914                 error = receive_cancel_reply(ls, ms);
4915                 break;
4916
4917         /* messages sent from a master node (only two types of async msg) */
4918
4919         case DLM_MSG_GRANT:
4920                 noent = 1;
4921                 error = receive_grant(ls, ms);
4922                 break;
4923
4924         case DLM_MSG_BAST:
4925                 noent = 1;
4926                 error = receive_bast(ls, ms);
4927                 break;
4928
4929         /* messages sent to a dir node */
4930
4931         case DLM_MSG_LOOKUP:
4932                 receive_lookup(ls, ms);
4933                 break;
4934
4935         case DLM_MSG_REMOVE:
4936                 receive_remove(ls, ms);
4937                 break;
4938
4939         /* messages sent from a dir node (remove has no reply) */
4940
4941         case DLM_MSG_LOOKUP_REPLY:
4942                 receive_lookup_reply(ls, ms);
4943                 break;
4944
4945         /* other messages */
4946
4947         case DLM_MSG_PURGE:
4948                 receive_purge(ls, ms);
4949                 break;
4950
4951         default:
4952                 log_error(ls, "unknown message type %d", ms->m_type);
4953         }
4954
4955         /*
4956          * When checking for ENOENT, we're checking the result of
4957          * find_lkb(m_remid):
4958          *
4959          * The lock id referenced in the message wasn't found.  This may
4960          * happen in normal usage for the async messages and cancel, so
4961          * only use log_debug for them.
4962          *
4963          * Some errors are expected and normal.
4964          */
4965
4966         if (error == -ENOENT && noent) {
4967                 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4968                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4969                           ms->m_lkid, saved_seq);
4970         } else if (error == -ENOENT) {
4971                 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4972                           ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
4973                           ms->m_lkid, saved_seq);
4974
4975                 if (ms->m_type == DLM_MSG_CONVERT)
4976                         dlm_dump_rsb_hash(ls, ms->m_hash);
4977         }
4978
4979         if (error == -EINVAL) {
4980                 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4981                           "saved_seq %u",
4982                           ms->m_type, ms->m_header.h_nodeid,
4983                           ms->m_lkid, ms->m_remid, saved_seq);
4984         }
4985 }
4986
4987 /* If the lockspace is in recovery mode (locking stopped), then normal
4988    messages are saved on the requestqueue for processing after recovery is
4989    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4990    messages off the requestqueue before we process new ones. This occurs right
4991    after recovery completes when we transition from saving all messages on
4992    requestqueue, to processing all the saved messages, to processing new
4993    messages as they arrive. */
4994
4995 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4996                                 int nodeid)
4997 {
4998         if (dlm_locking_stopped(ls)) {
4999                 /* If we were a member of this lockspace, left, and rejoined,
5000                    other nodes may still be sending us messages from the
5001                    lockspace generation before we left. */
5002                 if (!ls->ls_generation) {
5003                         log_limit(ls, "receive %d from %d ignore old gen",
5004                                   ms->m_type, nodeid);
5005                         return;
5006                 }
5007
5008                 dlm_add_requestqueue(ls, nodeid, ms);
5009         } else {
5010                 dlm_wait_requestqueue(ls);
5011                 _receive_message(ls, ms, 0);
5012         }
5013 }
5014
5015 /* This is called by dlm_recoverd to process messages that were saved on
5016    the requestqueue. */
5017
5018 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms,
5019                                uint32_t saved_seq)
5020 {
5021         _receive_message(ls, ms, saved_seq);
5022 }
5023
5024 /* This is called by the midcomms layer when something is received for
5025    the lockspace.  It could be either a MSG (normal message sent as part of
5026    standard locking activity) or an RCOM (recovery message sent as part of
5027    lockspace recovery). */
5028
5029 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
5030 {
5031         struct dlm_header *hd = &p->header;
5032         struct dlm_ls *ls;
5033         int type = 0;
5034
5035         switch (hd->h_cmd) {
5036         case DLM_MSG:
5037                 dlm_message_in(&p->message);
5038                 type = p->message.m_type;
5039                 break;
5040         case DLM_RCOM:
5041                 dlm_rcom_in(&p->rcom);
5042                 type = p->rcom.rc_type;
5043                 break;
5044         default:
5045                 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
5046                 return;
5047         }
5048
5049         if (hd->h_nodeid != nodeid) {
5050                 log_print("invalid h_nodeid %d from %d lockspace %x",
5051                           hd->h_nodeid, nodeid, hd->h_lockspace);
5052                 return;
5053         }
5054
5055         ls = dlm_find_lockspace_global(hd->h_lockspace);
5056         if (!ls) {
5057                 if (dlm_config.ci_log_debug) {
5058                         printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
5059                                 "%u from %d cmd %d type %d\n",
5060                                 hd->h_lockspace, nodeid, hd->h_cmd, type);
5061                 }
5062
5063                 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
5064                         dlm_send_ls_not_ready(nodeid, &p->rcom);
5065                 return;
5066         }
5067
5068         /* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5069            be inactive (in this ls) before transitioning to recovery mode */
5070
5071         down_read(&ls->ls_recv_active);
5072         if (hd->h_cmd == DLM_MSG)
5073                 dlm_receive_message(ls, &p->message, nodeid);
5074         else
5075                 dlm_receive_rcom(ls, &p->rcom, nodeid);
5076         up_read(&ls->ls_recv_active);
5077
5078         dlm_put_lockspace(ls);
5079 }
5080
5081 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5082                                    struct dlm_message *ms_stub)
5083 {
5084         if (middle_conversion(lkb)) {
5085                 hold_lkb(lkb);
5086                 memset(ms_stub, 0, sizeof(struct dlm_message));
5087                 ms_stub->m_flags = DLM_IFL_STUB_MS;
5088                 ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
5089                 ms_stub->m_result = -EINPROGRESS;
5090                 ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5091                 _receive_convert_reply(lkb, ms_stub);
5092
5093                 /* Same special case as in receive_rcom_lock_args() */
5094                 lkb->lkb_grmode = DLM_LOCK_IV;
5095                 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5096                 unhold_lkb(lkb);
5097
5098         } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5099                 lkb->lkb_flags |= DLM_IFL_RESEND;
5100         }
5101
5102         /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5103            conversions are async; there's no reply from the remote master */
5104 }
5105
5106 /* A waiting lkb needs recovery if the master node has failed, or
5107    the master node is changing (only when no directory is used) */
5108
5109 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5110                                  int dir_nodeid)
5111 {
5112         if (dlm_no_directory(ls))
5113                 return 1;
5114
5115         if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5116                 return 1;
5117
5118         return 0;
5119 }
5120
5121 /* Recovery for locks that are waiting for replies from nodes that are now
5122    gone.  We can just complete unlocks and cancels by faking a reply from the
5123    dead node.  Requests and up-conversions we flag to be resent after
5124    recovery.  Down-conversions can just be completed with a fake reply like
5125    unlocks.  Conversions between PR and CW need special attention. */
5126
5127 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5128 {
5129         struct dlm_lkb *lkb, *safe;
5130         struct dlm_message *ms_stub;
5131         int wait_type, stub_unlock_result, stub_cancel_result;
5132         int dir_nodeid;
5133
5134         ms_stub = kmalloc(sizeof(*ms_stub), GFP_KERNEL);
5135         if (!ms_stub)
5136                 return;
5137
5138         mutex_lock(&ls->ls_waiters_mutex);
5139
5140         list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5141
5142                 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5143
5144                 /* exclude debug messages about unlocks because there can be so
5145                    many and they aren't very interesting */
5146
5147                 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5148                         log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5149                                   "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5150                                   lkb->lkb_id,
5151                                   lkb->lkb_remid,
5152                                   lkb->lkb_wait_type,
5153                                   lkb->lkb_resource->res_nodeid,
5154                                   lkb->lkb_nodeid,
5155                                   lkb->lkb_wait_nodeid,
5156                                   dir_nodeid);
5157                 }
5158
5159                 /* all outstanding lookups, regardless of destination  will be
5160                    resent after recovery is done */
5161
5162                 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5163                         lkb->lkb_flags |= DLM_IFL_RESEND;
5164                         continue;
5165                 }
5166
5167                 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5168                         continue;
5169
5170                 wait_type = lkb->lkb_wait_type;
5171                 stub_unlock_result = -DLM_EUNLOCK;
5172                 stub_cancel_result = -DLM_ECANCEL;
5173
5174                 /* Main reply may have been received leaving a zero wait_type,
5175                    but a reply for the overlapping op may not have been
5176                    received.  In that case we need to fake the appropriate
5177                    reply for the overlap op. */
5178
5179                 if (!wait_type) {
5180                         if (is_overlap_cancel(lkb)) {
5181                                 wait_type = DLM_MSG_CANCEL;
5182                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5183                                         stub_cancel_result = 0;
5184                         }
5185                         if (is_overlap_unlock(lkb)) {
5186                                 wait_type = DLM_MSG_UNLOCK;
5187                                 if (lkb->lkb_grmode == DLM_LOCK_IV)
5188                                         stub_unlock_result = -ENOENT;
5189                         }
5190
5191                         log_debug(ls, "rwpre overlap %x %x %d %d %d",
5192                                   lkb->lkb_id, lkb->lkb_flags, wait_type,
5193                                   stub_cancel_result, stub_unlock_result);
5194                 }
5195
5196                 switch (wait_type) {
5197
5198                 case DLM_MSG_REQUEST:
5199                         lkb->lkb_flags |= DLM_IFL_RESEND;
5200                         break;
5201
5202                 case DLM_MSG_CONVERT:
5203                         recover_convert_waiter(ls, lkb, ms_stub);
5204                         break;
5205
5206                 case DLM_MSG_UNLOCK:
5207                         hold_lkb(lkb);
5208                         memset(ms_stub, 0, sizeof(struct dlm_message));
5209                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5210                         ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
5211                         ms_stub->m_result = stub_unlock_result;
5212                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5213                         _receive_unlock_reply(lkb, ms_stub);
5214                         dlm_put_lkb(lkb);
5215                         break;
5216
5217                 case DLM_MSG_CANCEL:
5218                         hold_lkb(lkb);
5219                         memset(ms_stub, 0, sizeof(struct dlm_message));
5220                         ms_stub->m_flags = DLM_IFL_STUB_MS;
5221                         ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
5222                         ms_stub->m_result = stub_cancel_result;
5223                         ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
5224                         _receive_cancel_reply(lkb, ms_stub);
5225                         dlm_put_lkb(lkb);
5226                         break;
5227
5228                 default:
5229                         log_error(ls, "invalid lkb wait_type %d %d",
5230                                   lkb->lkb_wait_type, wait_type);
5231                 }
5232                 schedule();
5233         }
5234         mutex_unlock(&ls->ls_waiters_mutex);
5235         kfree(ms_stub);
5236 }
5237
5238 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5239 {
5240         struct dlm_lkb *lkb;
5241         int found = 0;
5242
5243         mutex_lock(&ls->ls_waiters_mutex);
5244         list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
5245                 if (lkb->lkb_flags & DLM_IFL_RESEND) {
5246                         hold_lkb(lkb);
5247                         found = 1;
5248                         break;
5249                 }
5250         }
5251         mutex_unlock(&ls->ls_waiters_mutex);
5252
5253         if (!found)
5254                 lkb = NULL;
5255         return lkb;
5256 }
5257
5258 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5259    master or dir-node for r.  Processing the lkb may result in it being placed
5260    back on waiters. */
5261
5262 /* We do this after normal locking has been enabled and any saved messages
5263    (in requestqueue) have been processed.  We should be confident that at
5264    this point we won't get or process a reply to any of these waiting
5265    operations.  But, new ops may be coming in on the rsbs/locks here from
5266    userspace or remotely. */
5267
5268 /* there may have been an overlap unlock/cancel prior to recovery or after
5269    recovery.  if before, the lkb may still have a pos wait_count; if after, the
5270    overlap flag would just have been set and nothing new sent.  we can be
5271    confident here than any replies to either the initial op or overlap ops
5272    prior to recovery have been received. */
5273
5274 int dlm_recover_waiters_post(struct dlm_ls *ls)
5275 {
5276         struct dlm_lkb *lkb;
5277         struct dlm_rsb *r;
5278         int error = 0, mstype, err, oc, ou;
5279
5280         while (1) {
5281                 if (dlm_locking_stopped(ls)) {
5282                         log_debug(ls, "recover_waiters_post aborted");
5283                         error = -EINTR;
5284                         break;
5285                 }
5286
5287                 lkb = find_resend_waiter(ls);
5288                 if (!lkb)
5289                         break;
5290
5291                 r = lkb->lkb_resource;
5292                 hold_rsb(r);
5293                 lock_rsb(r);
5294
5295                 mstype = lkb->lkb_wait_type;
5296                 oc = is_overlap_cancel(lkb);
5297                 ou = is_overlap_unlock(lkb);
5298                 err = 0;
5299
5300                 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5301                           "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5302                           "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5303                           r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5304                           dlm_dir_nodeid(r), oc, ou);
5305
5306                 /* At this point we assume that we won't get a reply to any
5307                    previous op or overlap op on this lock.  First, do a big
5308                    remove_from_waiters() for all previous ops. */
5309
5310                 lkb->lkb_flags &= ~DLM_IFL_RESEND;
5311                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
5312                 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
5313                 lkb->lkb_wait_type = 0;
5314                 lkb->lkb_wait_count = 0;
5315                 mutex_lock(&ls->ls_waiters_mutex);
5316                 list_del_init(&lkb->lkb_wait_reply);
5317                 mutex_unlock(&ls->ls_waiters_mutex);
5318                 unhold_lkb(lkb); /* for waiters list */
5319
5320                 if (oc || ou) {
5321                         /* do an unlock or cancel instead of resending */
5322                         switch (mstype) {
5323                         case DLM_MSG_LOOKUP:
5324                         case DLM_MSG_REQUEST:
5325                                 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5326                                                         -DLM_ECANCEL);
5327                                 unhold_lkb(lkb); /* undoes create_lkb() */
5328                                 break;
5329                         case DLM_MSG_CONVERT:
5330                                 if (oc) {
5331                                         queue_cast(r, lkb, -DLM_ECANCEL);
5332                                 } else {
5333                                         lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5334                                         _unlock_lock(r, lkb);
5335                                 }
5336                                 break;
5337                         default:
5338                                 err = 1;
5339                         }
5340                 } else {
5341                         switch (mstype) {
5342                         case DLM_MSG_LOOKUP:
5343                         case DLM_MSG_REQUEST:
5344                                 _request_lock(r, lkb);
5345                                 if (is_master(r))
5346                                         confirm_master(r, 0);
5347                                 break;
5348                         case DLM_MSG_CONVERT:
5349                                 _convert_lock(r, lkb);
5350                                 break;
5351                         default:
5352                                 err = 1;
5353                         }
5354                 }
5355
5356                 if (err) {
5357                         log_error(ls, "waiter %x msg %d r_nodeid %d "
5358                                   "dir_nodeid %d overlap %d %d",
5359                                   lkb->lkb_id, mstype, r->res_nodeid,
5360                                   dlm_dir_nodeid(r), oc, ou);
5361                 }
5362                 unlock_rsb(r);
5363                 put_rsb(r);
5364                 dlm_put_lkb(lkb);
5365         }
5366
5367         return error;
5368 }
5369
5370 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5371                               struct list_head *list)
5372 {
5373         struct dlm_lkb *lkb, *safe;
5374
5375         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5376                 if (!is_master_copy(lkb))
5377                         continue;
5378
5379                 /* don't purge lkbs we've added in recover_master_copy for
5380                    the current recovery seq */
5381
5382                 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5383                         continue;
5384
5385                 del_lkb(r, lkb);
5386
5387                 /* this put should free the lkb */
5388                 if (!dlm_put_lkb(lkb))
5389                         log_error(ls, "purged mstcpy lkb not released");
5390         }
5391 }
5392
5393 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5394 {
5395         struct dlm_ls *ls = r->res_ls;
5396
5397         purge_mstcpy_list(ls, r, &r->res_grantqueue);
5398         purge_mstcpy_list(ls, r, &r->res_convertqueue);
5399         purge_mstcpy_list(ls, r, &r->res_waitqueue);
5400 }
5401
5402 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5403                             struct list_head *list,
5404                             int nodeid_gone, unsigned int *count)
5405 {
5406         struct dlm_lkb *lkb, *safe;
5407
5408         list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5409                 if (!is_master_copy(lkb))
5410                         continue;
5411
5412                 if ((lkb->lkb_nodeid == nodeid_gone) ||
5413                     dlm_is_removed(ls, lkb->lkb_nodeid)) {
5414
5415                         /* tell recover_lvb to invalidate the lvb
5416                            because a node holding EX/PW failed */
5417                         if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5418                             (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5419                                 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5420                         }
5421
5422                         del_lkb(r, lkb);
5423
5424                         /* this put should free the lkb */
5425                         if (!dlm_put_lkb(lkb))
5426                                 log_error(ls, "purged dead lkb not released");
5427
5428                         rsb_set_flag(r, RSB_RECOVER_GRANT);
5429
5430                         (*count)++;
5431                 }
5432         }
5433 }
5434
5435 /* Get rid of locks held by nodes that are gone. */
5436
5437 void dlm_recover_purge(struct dlm_ls *ls)
5438 {
5439         struct dlm_rsb *r;
5440         struct dlm_member *memb;
5441         int nodes_count = 0;
5442         int nodeid_gone = 0;
5443         unsigned int lkb_count = 0;
5444
5445         /* cache one removed nodeid to optimize the common
5446            case of a single node removed */
5447
5448         list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5449                 nodes_count++;
5450                 nodeid_gone = memb->nodeid;
5451         }
5452
5453         if (!nodes_count)
5454                 return;
5455
5456         down_write(&ls->ls_root_sem);
5457         list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5458                 hold_rsb(r);
5459                 lock_rsb(r);
5460                 if (is_master(r)) {
5461                         purge_dead_list(ls, r, &r->res_grantqueue,
5462                                         nodeid_gone, &lkb_count);
5463                         purge_dead_list(ls, r, &r->res_convertqueue,
5464                                         nodeid_gone, &lkb_count);
5465                         purge_dead_list(ls, r, &r->res_waitqueue,
5466                                         nodeid_gone, &lkb_count);
5467                 }
5468                 unlock_rsb(r);
5469                 unhold_rsb(r);
5470                 cond_resched();
5471         }
5472         up_write(&ls->ls_root_sem);
5473
5474         if (lkb_count)
5475                 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5476                           lkb_count, nodes_count);
5477 }
5478
5479 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5480 {
5481         struct rb_node *n;
5482         struct dlm_rsb *r;
5483
5484         spin_lock(&ls->ls_rsbtbl[bucket].lock);
5485         for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5486                 r = rb_entry(n, struct dlm_rsb, res_hashnode);
5487
5488                 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5489                         continue;
5490                 if (!is_master(r)) {
5491                         rsb_clear_flag(r, RSB_RECOVER_GRANT);
5492                         continue;
5493                 }
5494                 hold_rsb(r);
5495                 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5496                 return r;
5497         }
5498         spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5499         return NULL;
5500 }
5501
5502 /*
5503  * Attempt to grant locks on resources that we are the master of.
5504  * Locks may have become grantable during recovery because locks
5505  * from departed nodes have been purged (or not rebuilt), allowing
5506  * previously blocked locks to now be granted.  The subset of rsb's
5507  * we are interested in are those with lkb's on either the convert or
5508  * waiting queues.
5509  *
5510  * Simplest would be to go through each master rsb and check for non-empty
5511  * convert or waiting queues, and attempt to grant on those rsbs.
5512  * Checking the queues requires lock_rsb, though, for which we'd need
5513  * to release the rsbtbl lock.  This would make iterating through all
5514  * rsb's very inefficient.  So, we rely on earlier recovery routines
5515  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5516  * locks for.
5517  */
5518
5519 void dlm_recover_grant(struct dlm_ls *ls)
5520 {
5521         struct dlm_rsb *r;
5522         int bucket = 0;
5523         unsigned int count = 0;
5524         unsigned int rsb_count = 0;
5525         unsigned int lkb_count = 0;
5526
5527         while (1) {
5528                 r = find_grant_rsb(ls, bucket);
5529                 if (!r) {
5530                         if (bucket == ls->ls_rsbtbl_size - 1)
5531                                 break;
5532                         bucket++;
5533                         continue;
5534                 }
5535                 rsb_count++;
5536                 count = 0;
5537                 lock_rsb(r);
5538                 /* the RECOVER_GRANT flag is checked in the grant path */
5539                 grant_pending_locks(r, &count);
5540                 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5541                 lkb_count += count;
5542                 confirm_master(r, 0);
5543                 unlock_rsb(r);
5544                 put_rsb(r);
5545                 cond_resched();
5546         }
5547
5548         if (lkb_count)
5549                 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5550                           lkb_count, rsb_count);
5551 }
5552
5553 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5554                                          uint32_t remid)
5555 {
5556         struct dlm_lkb *lkb;
5557
5558         list_for_each_entry(lkb, head, lkb_statequeue) {
5559                 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5560                         return lkb;
5561         }
5562         return NULL;
5563 }
5564
5565 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5566                                     uint32_t remid)
5567 {
5568         struct dlm_lkb *lkb;
5569
5570         lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5571         if (lkb)
5572                 return lkb;
5573         lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5574         if (lkb)
5575                 return lkb;
5576         lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5577         if (lkb)
5578                 return lkb;
5579         return NULL;
5580 }
5581
5582 /* needs at least dlm_rcom + rcom_lock */
5583 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5584                                   struct dlm_rsb *r, struct dlm_rcom *rc)
5585 {
5586         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5587
5588         lkb->lkb_nodeid = rc->rc_header.h_nodeid;
5589         lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5590         lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5591         lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5592         lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
5593         lkb->lkb_flags |= DLM_IFL_MSTCPY;
5594         lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5595         lkb->lkb_rqmode = rl->rl_rqmode;
5596         lkb->lkb_grmode = rl->rl_grmode;
5597         /* don't set lkb_status because add_lkb wants to itself */
5598
5599         lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5600         lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5601
5602         if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5603                 int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
5604                          sizeof(struct rcom_lock);
5605                 if (lvblen > ls->ls_lvblen)
5606                         return -EINVAL;
5607                 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5608                 if (!lkb->lkb_lvbptr)
5609                         return -ENOMEM;
5610                 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5611         }
5612
5613         /* Conversions between PR and CW (middle modes) need special handling.
5614            The real granted mode of these converting locks cannot be determined
5615            until all locks have been rebuilt on the rsb (recover_conversion) */
5616
5617         if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5618             middle_conversion(lkb)) {
5619                 rl->rl_status = DLM_LKSTS_CONVERT;
5620                 lkb->lkb_grmode = DLM_LOCK_IV;
5621                 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5622         }
5623
5624         return 0;
5625 }
5626
5627 /* This lkb may have been recovered in a previous aborted recovery so we need
5628    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5629    If so we just send back a standard reply.  If not, we create a new lkb with
5630    the given values and send back our lkid.  We send back our lkid by sending
5631    back the rcom_lock struct we got but with the remid field filled in. */
5632
5633 /* needs at least dlm_rcom + rcom_lock */
5634 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5635 {
5636         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5637         struct dlm_rsb *r;
5638         struct dlm_lkb *lkb;
5639         uint32_t remid = 0;
5640         int from_nodeid = rc->rc_header.h_nodeid;
5641         int error;
5642
5643         if (rl->rl_parent_lkid) {
5644                 error = -EOPNOTSUPP;
5645                 goto out;
5646         }
5647
5648         remid = le32_to_cpu(rl->rl_lkid);
5649
5650         /* In general we expect the rsb returned to be R_MASTER, but we don't
5651            have to require it.  Recovery of masters on one node can overlap
5652            recovery of locks on another node, so one node can send us MSTCPY
5653            locks before we've made ourselves master of this rsb.  We can still
5654            add new MSTCPY locks that we receive here without any harm; when
5655            we make ourselves master, dlm_recover_masters() won't touch the
5656            MSTCPY locks we've received early. */
5657
5658         error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5659                          from_nodeid, R_RECEIVE_RECOVER, &r);
5660         if (error)
5661                 goto out;
5662
5663         lock_rsb(r);
5664
5665         if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5666                 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5667                           from_nodeid, remid);
5668                 error = -EBADR;
5669                 goto out_unlock;
5670         }
5671
5672         lkb = search_remid(r, from_nodeid, remid);
5673         if (lkb) {
5674                 error = -EEXIST;
5675                 goto out_remid;
5676         }
5677
5678         error = create_lkb(ls, &lkb);
5679         if (error)
5680                 goto out_unlock;
5681
5682         error = receive_rcom_lock_args(ls, lkb, r, rc);
5683         if (error) {
5684                 __put_lkb(ls, lkb);
5685                 goto out_unlock;
5686         }
5687
5688         attach_lkb(r, lkb);
5689         add_lkb(r, lkb, rl->rl_status);
5690         error = 0;
5691         ls->ls_recover_locks_in++;
5692
5693         if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5694                 rsb_set_flag(r, RSB_RECOVER_GRANT);
5695
5696  out_remid:
5697         /* this is the new value returned to the lock holder for
5698            saving in its process-copy lkb */
5699         rl->rl_remid = cpu_to_le32(lkb->lkb_id);
5700
5701         lkb->lkb_recover_seq = ls->ls_recover_seq;
5702
5703  out_unlock:
5704         unlock_rsb(r);
5705         put_rsb(r);
5706  out:
5707         if (error && error != -EEXIST)
5708                 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5709                           from_nodeid, remid, error);
5710         rl->rl_result = cpu_to_le32(error);
5711         return error;
5712 }
5713
5714 /* needs at least dlm_rcom + rcom_lock */
5715 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
5716 {
5717         struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5718         struct dlm_rsb *r;
5719         struct dlm_lkb *lkb;
5720         uint32_t lkid, remid;
5721         int error, result;
5722
5723         lkid = le32_to_cpu(rl->rl_lkid);
5724         remid = le32_to_cpu(rl->rl_remid);
5725         result = le32_to_cpu(rl->rl_result);
5726
5727         error = find_lkb(ls, lkid, &lkb);
5728         if (error) {
5729                 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5730                           lkid, rc->rc_header.h_nodeid, remid, result);
5731                 return error;
5732         }
5733
5734         r = lkb->lkb_resource;
5735         hold_rsb(r);
5736         lock_rsb(r);
5737
5738         if (!is_process_copy(lkb)) {
5739                 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5740                           lkid, rc->rc_header.h_nodeid, remid, result);
5741                 dlm_dump_rsb(r);
5742                 unlock_rsb(r);
5743                 put_rsb(r);
5744                 dlm_put_lkb(lkb);
5745                 return -EINVAL;
5746         }
5747
5748         switch (result) {
5749         case -EBADR:
5750                 /* There's a chance the new master received our lock before
5751                    dlm_recover_master_reply(), this wouldn't happen if we did
5752                    a barrier between recover_masters and recover_locks. */
5753
5754                 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5755                           lkid, rc->rc_header.h_nodeid, remid, result);
5756         
5757                 dlm_send_rcom_lock(r, lkb);
5758                 goto out;
5759         case -EEXIST:
5760         case 0:
5761                 lkb->lkb_remid = remid;
5762                 break;
5763         default:
5764                 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5765                           lkid, rc->rc_header.h_nodeid, remid, result);
5766         }
5767
5768         /* an ack for dlm_recover_locks() which waits for replies from
5769            all the locks it sends to new masters */
5770         dlm_recovered_lock(r);
5771  out:
5772         unlock_rsb(r);
5773         put_rsb(r);
5774         dlm_put_lkb(lkb);
5775
5776         return 0;
5777 }
5778
5779 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5780                      int mode, uint32_t flags, void *name, unsigned int namelen,
5781                      unsigned long timeout_cs)
5782 {
5783         struct dlm_lkb *lkb;
5784         struct dlm_args args;
5785         int error;
5786
5787         dlm_lock_recovery(ls);
5788
5789         error = create_lkb(ls, &lkb);
5790         if (error) {
5791                 kfree(ua);
5792                 goto out;
5793         }
5794
5795         if (flags & DLM_LKF_VALBLK) {
5796                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5797                 if (!ua->lksb.sb_lvbptr) {
5798                         kfree(ua);
5799                         __put_lkb(ls, lkb);
5800                         error = -ENOMEM;
5801                         goto out;
5802                 }
5803         }
5804         error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
5805                               fake_astfn, ua, fake_bastfn, &args);
5806         if (error) {
5807                 kfree(ua->lksb.sb_lvbptr);
5808                 ua->lksb.sb_lvbptr = NULL;
5809                 kfree(ua);
5810                 __put_lkb(ls, lkb);
5811                 goto out;
5812         }
5813
5814         /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5815            When DLM_IFL_USER is set, the dlm knows that this is a userspace
5816            lock and that lkb_astparam is the dlm_user_args structure. */
5817         lkb->lkb_flags |= DLM_IFL_USER;
5818         error = request_lock(ls, lkb, name, namelen, &args);
5819
5820         switch (error) {
5821         case 0:
5822                 break;
5823         case -EINPROGRESS:
5824                 error = 0;
5825                 break;
5826         case -EAGAIN:
5827                 error = 0;
5828                 /* fall through */
5829         default:
5830                 __put_lkb(ls, lkb);
5831                 goto out;
5832         }
5833
5834         /* add this new lkb to the per-process list of locks */
5835         spin_lock(&ua->proc->locks_spin);
5836         hold_lkb(lkb);
5837         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5838         spin_unlock(&ua->proc->locks_spin);
5839  out:
5840         dlm_unlock_recovery(ls);
5841         return error;
5842 }
5843
5844 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5845                      int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
5846                      unsigned long timeout_cs)
5847 {
5848         struct dlm_lkb *lkb;
5849         struct dlm_args args;
5850         struct dlm_user_args *ua;
5851         int error;
5852
5853         dlm_lock_recovery(ls);
5854
5855         error = find_lkb(ls, lkid, &lkb);
5856         if (error)
5857                 goto out;
5858
5859         /* user can change the params on its lock when it converts it, or
5860            add an lvb that didn't exist before */
5861
5862         ua = lkb->lkb_ua;
5863
5864         if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5865                 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5866                 if (!ua->lksb.sb_lvbptr) {
5867                         error = -ENOMEM;
5868                         goto out_put;
5869                 }
5870         }
5871         if (lvb_in && ua->lksb.sb_lvbptr)
5872                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5873
5874         ua->xid = ua_tmp->xid;
5875         ua->castparam = ua_tmp->castparam;
5876         ua->castaddr = ua_tmp->castaddr;
5877         ua->bastparam = ua_tmp->bastparam;
5878         ua->bastaddr = ua_tmp->bastaddr;
5879         ua->user_lksb = ua_tmp->user_lksb;
5880
5881         error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
5882                               fake_astfn, ua, fake_bastfn, &args);
5883         if (error)
5884                 goto out_put;
5885
5886         error = convert_lock(ls, lkb, &args);
5887
5888         if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5889                 error = 0;
5890  out_put:
5891         dlm_put_lkb(lkb);
5892  out:
5893         dlm_unlock_recovery(ls);
5894         kfree(ua_tmp);
5895         return error;
5896 }
5897
5898 /*
5899  * The caller asks for an orphan lock on a given resource with a given mode.
5900  * If a matching lock exists, it's moved to the owner's list of locks and
5901  * the lkid is returned.
5902  */
5903
5904 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5905                      int mode, uint32_t flags, void *name, unsigned int namelen,
5906                      unsigned long timeout_cs, uint32_t *lkid)
5907 {
5908         struct dlm_lkb *lkb;
5909         struct dlm_user_args *ua;
5910         int found_other_mode = 0;
5911         int found = 0;
5912         int rv = 0;
5913
5914         mutex_lock(&ls->ls_orphans_mutex);
5915         list_for_each_entry(lkb, &ls->ls_orphans, lkb_ownqueue) {
5916                 if (lkb->lkb_resource->res_length != namelen)
5917                         continue;
5918                 if (memcmp(lkb->lkb_resource->res_name, name, namelen))
5919                         continue;
5920                 if (lkb->lkb_grmode != mode) {
5921                         found_other_mode = 1;
5922                         continue;
5923                 }
5924
5925                 found = 1;
5926                 list_del_init(&lkb->lkb_ownqueue);
5927                 lkb->lkb_flags &= ~DLM_IFL_ORPHAN;
5928                 *lkid = lkb->lkb_id;
5929                 break;
5930         }
5931         mutex_unlock(&ls->ls_orphans_mutex);
5932
5933         if (!found && found_other_mode) {
5934                 rv = -EAGAIN;
5935                 goto out;
5936         }
5937
5938         if (!found) {
5939                 rv = -ENOENT;
5940                 goto out;
5941         }
5942
5943         lkb->lkb_exflags = flags;
5944         lkb->lkb_ownpid = (int) current->pid;
5945
5946         ua = lkb->lkb_ua;
5947
5948         ua->proc = ua_tmp->proc;
5949         ua->xid = ua_tmp->xid;
5950         ua->castparam = ua_tmp->castparam;
5951         ua->castaddr = ua_tmp->castaddr;
5952         ua->bastparam = ua_tmp->bastparam;
5953         ua->bastaddr = ua_tmp->bastaddr;
5954         ua->user_lksb = ua_tmp->user_lksb;
5955
5956         /*
5957          * The lkb reference from the ls_orphans list was not
5958          * removed above, and is now considered the reference
5959          * for the proc locks list.
5960          */
5961
5962         spin_lock(&ua->proc->locks_spin);
5963         list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5964         spin_unlock(&ua->proc->locks_spin);
5965  out:
5966         kfree(ua_tmp);
5967         return rv;
5968 }
5969
5970 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5971                     uint32_t flags, uint32_t lkid, char *lvb_in)
5972 {
5973         struct dlm_lkb *lkb;
5974         struct dlm_args args;
5975         struct dlm_user_args *ua;
5976         int error;
5977
5978         dlm_lock_recovery(ls);
5979
5980         error = find_lkb(ls, lkid, &lkb);
5981         if (error)
5982                 goto out;
5983
5984         ua = lkb->lkb_ua;
5985
5986         if (lvb_in && ua->lksb.sb_lvbptr)
5987                 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5988         if (ua_tmp->castparam)
5989                 ua->castparam = ua_tmp->castparam;
5990         ua->user_lksb = ua_tmp->user_lksb;
5991
5992         error = set_unlock_args(flags, ua, &args);
5993         if (error)
5994                 goto out_put;
5995
5996         error = unlock_lock(ls, lkb, &args);
5997
5998         if (error == -DLM_EUNLOCK)
5999                 error = 0;
6000         /* from validate_unlock_args() */
6001         if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
6002                 error = 0;
6003         if (error)
6004                 goto out_put;
6005
6006         spin_lock(&ua->proc->locks_spin);
6007         /* dlm_user_add_cb() may have already taken lkb off the proc list */
6008         if (!list_empty(&lkb->lkb_ownqueue))
6009                 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
6010         spin_unlock(&ua->proc->locks_spin);
6011  out_put:
6012         dlm_put_lkb(lkb);
6013  out:
6014         dlm_unlock_recovery(ls);
6015         kfree(ua_tmp);
6016         return error;
6017 }
6018
6019 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
6020                     uint32_t flags, uint32_t lkid)
6021 {
6022         struct dlm_lkb *lkb;
6023         struct dlm_args args;
6024         struct dlm_user_args *ua;
6025         int error;
6026
6027         dlm_lock_recovery(ls);
6028
6029         error = find_lkb(ls, lkid, &lkb);
6030         if (error)
6031                 goto out;
6032
6033         ua = lkb->lkb_ua;
6034         if (ua_tmp->castparam)
6035                 ua->castparam = ua_tmp->castparam;
6036         ua->user_lksb = ua_tmp->user_lksb;
6037
6038         error = set_unlock_args(flags, ua, &args);
6039         if (error)
6040                 goto out_put;
6041
6042         error = cancel_lock(ls, lkb, &args);
6043
6044         if (error == -DLM_ECANCEL)
6045                 error = 0;
6046         /* from validate_unlock_args() */
6047         if (error == -EBUSY)
6048                 error = 0;
6049  out_put:
6050         dlm_put_lkb(lkb);
6051  out:
6052         dlm_unlock_recovery(ls);
6053         kfree(ua_tmp);
6054         return error;
6055 }
6056
6057 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6058 {
6059         struct dlm_lkb *lkb;
6060         struct dlm_args args;
6061         struct dlm_user_args *ua;
6062         struct dlm_rsb *r;
6063         int error;
6064
6065         dlm_lock_recovery(ls);
6066
6067         error = find_lkb(ls, lkid, &lkb);
6068         if (error)
6069                 goto out;
6070
6071         ua = lkb->lkb_ua;
6072
6073         error = set_unlock_args(flags, ua, &args);
6074         if (error)
6075                 goto out_put;
6076
6077         /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6078
6079         r = lkb->lkb_resource;
6080         hold_rsb(r);
6081         lock_rsb(r);
6082
6083         error = validate_unlock_args(lkb, &args);
6084         if (error)
6085                 goto out_r;
6086         lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
6087
6088         error = _cancel_lock(r, lkb);
6089  out_r:
6090         unlock_rsb(r);
6091         put_rsb(r);
6092
6093         if (error == -DLM_ECANCEL)
6094                 error = 0;
6095         /* from validate_unlock_args() */
6096         if (error == -EBUSY)
6097                 error = 0;
6098  out_put:
6099         dlm_put_lkb(lkb);
6100  out:
6101         dlm_unlock_recovery(ls);
6102         return error;
6103 }
6104
6105 /* lkb's that are removed from the waiters list by revert are just left on the
6106    orphans list with the granted orphan locks, to be freed by purge */
6107
6108 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6109 {
6110         struct dlm_args args;
6111         int error;
6112
6113         hold_lkb(lkb); /* reference for the ls_orphans list */
6114         mutex_lock(&ls->ls_orphans_mutex);
6115         list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6116         mutex_unlock(&ls->ls_orphans_mutex);
6117
6118         set_unlock_args(0, lkb->lkb_ua, &args);
6119
6120         error = cancel_lock(ls, lkb, &args);
6121         if (error == -DLM_ECANCEL)
6122                 error = 0;
6123         return error;
6124 }
6125
6126 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6127    granted.  Regardless of what rsb queue the lock is on, it's removed and
6128    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6129    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6130
6131 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6132 {
6133         struct dlm_args args;
6134         int error;
6135
6136         set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6137                         lkb->lkb_ua, &args);
6138
6139         error = unlock_lock(ls, lkb, &args);
6140         if (error == -DLM_EUNLOCK)
6141                 error = 0;
6142         return error;
6143 }
6144
6145 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6146    (which does lock_rsb) due to deadlock with receiving a message that does
6147    lock_rsb followed by dlm_user_add_cb() */
6148
6149 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6150                                      struct dlm_user_proc *proc)
6151 {
6152         struct dlm_lkb *lkb = NULL;
6153
6154         mutex_lock(&ls->ls_clear_proc_locks);
6155         if (list_empty(&proc->locks))
6156                 goto out;
6157
6158         lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6159         list_del_init(&lkb->lkb_ownqueue);
6160
6161         if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6162                 lkb->lkb_flags |= DLM_IFL_ORPHAN;
6163         else
6164                 lkb->lkb_flags |= DLM_IFL_DEAD;
6165  out:
6166         mutex_unlock(&ls->ls_clear_proc_locks);
6167         return lkb;
6168 }
6169
6170 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6171    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6172    which we clear here. */
6173
6174 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6175    list, and no more device_writes should add lkb's to proc->locks list; so we
6176    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6177    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6178    them ourself. */
6179
6180 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6181 {
6182         struct dlm_lkb *lkb, *safe;
6183
6184         dlm_lock_recovery(ls);
6185
6186         while (1) {
6187                 lkb = del_proc_lock(ls, proc);
6188                 if (!lkb)
6189                         break;
6190                 del_timeout(lkb);
6191                 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6192                         orphan_proc_lock(ls, lkb);
6193                 else
6194                         unlock_proc_lock(ls, lkb);
6195
6196                 /* this removes the reference for the proc->locks list
6197                    added by dlm_user_request, it may result in the lkb
6198                    being freed */
6199
6200                 dlm_put_lkb(lkb);
6201         }
6202
6203         mutex_lock(&ls->ls_clear_proc_locks);
6204
6205         /* in-progress unlocks */
6206         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6207                 list_del_init(&lkb->lkb_ownqueue);
6208                 lkb->lkb_flags |= DLM_IFL_DEAD;
6209                 dlm_put_lkb(lkb);
6210         }
6211
6212         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6213                 memset(&lkb->lkb_callbacks, 0,
6214                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6215                 list_del_init(&lkb->lkb_cb_list);
6216                 dlm_put_lkb(lkb);
6217         }
6218
6219         mutex_unlock(&ls->ls_clear_proc_locks);
6220         dlm_unlock_recovery(ls);
6221 }
6222
6223 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6224 {
6225         struct dlm_lkb *lkb, *safe;
6226
6227         while (1) {
6228                 lkb = NULL;
6229                 spin_lock(&proc->locks_spin);
6230                 if (!list_empty(&proc->locks)) {
6231                         lkb = list_entry(proc->locks.next, struct dlm_lkb,
6232                                          lkb_ownqueue);
6233                         list_del_init(&lkb->lkb_ownqueue);
6234                 }
6235                 spin_unlock(&proc->locks_spin);
6236
6237                 if (!lkb)
6238                         break;
6239
6240                 lkb->lkb_flags |= DLM_IFL_DEAD;
6241                 unlock_proc_lock(ls, lkb);
6242                 dlm_put_lkb(lkb); /* ref from proc->locks list */
6243         }
6244
6245         spin_lock(&proc->locks_spin);
6246         list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6247                 list_del_init(&lkb->lkb_ownqueue);
6248                 lkb->lkb_flags |= DLM_IFL_DEAD;
6249                 dlm_put_lkb(lkb);
6250         }
6251         spin_unlock(&proc->locks_spin);
6252
6253         spin_lock(&proc->asts_spin);
6254         list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6255                 memset(&lkb->lkb_callbacks, 0,
6256                        sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
6257                 list_del_init(&lkb->lkb_cb_list);
6258                 dlm_put_lkb(lkb);
6259         }
6260         spin_unlock(&proc->asts_spin);
6261 }
6262
6263 /* pid of 0 means purge all orphans */
6264
6265 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6266 {
6267         struct dlm_lkb *lkb, *safe;
6268
6269         mutex_lock(&ls->ls_orphans_mutex);
6270         list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6271                 if (pid && lkb->lkb_ownpid != pid)
6272                         continue;
6273                 unlock_proc_lock(ls, lkb);
6274                 list_del_init(&lkb->lkb_ownqueue);
6275                 dlm_put_lkb(lkb);
6276         }
6277         mutex_unlock(&ls->ls_orphans_mutex);
6278 }
6279
6280 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6281 {
6282         struct dlm_message *ms;
6283         struct dlm_mhandle *mh;
6284         int error;
6285
6286         error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6287                                 DLM_MSG_PURGE, &ms, &mh);
6288         if (error)
6289                 return error;
6290         ms->m_nodeid = nodeid;
6291         ms->m_pid = pid;
6292
6293         return send_message(mh, ms);
6294 }
6295
6296 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6297                    int nodeid, int pid)
6298 {
6299         int error = 0;
6300
6301         if (nodeid && (nodeid != dlm_our_nodeid())) {
6302                 error = send_purge(ls, nodeid, pid);
6303         } else {
6304                 dlm_lock_recovery(ls);
6305                 if (pid == current->pid)
6306                         purge_proc_locks(ls, proc);
6307                 else
6308                         do_purge(ls, nodeid, pid);
6309                 dlm_unlock_recovery(ls);
6310         }
6311         return error;
6312 }
6313