1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ldlm/ldlm_lockd.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LDLM
39
40 #include "../../include/linux/libcfs/libcfs.h"
41 #include "../include/lustre_dlm.h"
42 #include "../include/obd_class.h"
43 #include <linux/list.h>
44 #include "ldlm_internal.h"
45
46 static int ldlm_num_threads;
47 module_param(ldlm_num_threads, int, 0444);
48 MODULE_PARM_DESC(ldlm_num_threads, "number of DLM service threads to start");
49
50 static char *ldlm_cpts;
51 module_param(ldlm_cpts, charp, 0444);
52 MODULE_PARM_DESC(ldlm_cpts, "CPU partitions ldlm threads should run on");
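/*
 * Both parameters are 0444 (read-only) module options, so they can only be
 * set at module load time.  A hypothetical example, assuming this file is
 * built into the ptlrpc module as in the staging tree, with made-up values:
 *
 *   modprobe ptlrpc ldlm_num_threads=8 ldlm_cpts="[0-3]"
 */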
53
54 static struct mutex     ldlm_ref_mutex;
55 static int ldlm_refcount;
56
57 static struct kobject *ldlm_kobj;
58 struct kset *ldlm_ns_kset;
59 static struct kset *ldlm_svc_kset;
60
61 struct ldlm_cb_async_args {
62         struct ldlm_cb_set_arg *ca_set_arg;
63         struct ldlm_lock       *ca_lock;
64 };
65
66 /* LDLM state */
67
68 static struct ldlm_state *ldlm_state;
69
70 #define ELT_STOPPED   0
71 #define ELT_READY     1
72 #define ELT_TERMINATE 2
73
74 struct ldlm_bl_pool {
75         spinlock_t              blp_lock;
76
77         /*
78          * blp_prio_list is used for callbacks that should be handled
79          * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
80          * see bug 13843
81          */
82         struct list_head        blp_prio_list;
83
84         /*
85          * blp_list is used for all other callbacks which are likely
86          * to take longer to process.
87          */
88         struct list_head        blp_list;
89
90         wait_queue_head_t       blp_waitq;
91         struct completion       blp_comp;
92         atomic_t                blp_num_threads;
93         atomic_t                blp_busy_threads;
94         int                     blp_min_threads;
95         int                     blp_max_threads;
96 };
97
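/*
 * One unit of work for the blocking-callback threads: either a single lock
 * (blwi_lock) or a list of blwi_count locks to cancel (blwi_head), plus the
 * completion used to synchronize with the caller in the non-async case.
 */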
98 struct ldlm_bl_work_item {
99         struct list_head        blwi_entry;
100         struct ldlm_namespace   *blwi_ns;
101         struct ldlm_lock_desc   blwi_ld;
102         struct ldlm_lock        *blwi_lock;
103         struct list_head        blwi_head;
104         int                     blwi_count;
105         struct completion       blwi_comp;
106         enum ldlm_cancel_flags  blwi_flags;
107         int                     blwi_mem_pressure;
108 };
109
110 /**
111  * Callback handler for receiving incoming blocking ASTs.
112  *
113  * This can only happen on the client side.
114  */
115 void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
116                              struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
117 {
118         int do_ast;
119
120         LDLM_DEBUG(lock, "client blocking AST callback handler");
121
122         lock_res_and_lock(lock);
123         ldlm_set_cbpending(lock);
124
125         if (ldlm_is_cancel_on_block(lock))
126                 ldlm_set_cancel(lock);
127
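        /*
         * Run the blocking AST right away only if the lock is no longer in
         * use locally; otherwise it will be cancelled later, once the last
         * reader/writer reference is dropped.
         */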
128         do_ast = !lock->l_readers && !lock->l_writers;
129         unlock_res_and_lock(lock);
130
131         if (do_ast) {
132                 CDEBUG(D_DLMTRACE,
133                        "Lock %p already unused, calling callback (%p)\n", lock,
134                        lock->l_blocking_ast);
135                 if (lock->l_blocking_ast)
136                         lock->l_blocking_ast(lock, ld, lock->l_ast_data,
137                                              LDLM_CB_BLOCKING);
138         } else {
139                 CDEBUG(D_DLMTRACE,
140                        "Lock %p is referenced, will be cancelled later\n",
141                        lock);
142         }
143
144         LDLM_DEBUG(lock, "client blocking callback handler END");
145         LDLM_LOCK_RELEASE(lock);
146 }
147
148 /**
149  * Callback handler for receiving incoming completion ASTs.
150  *
151  * This only can happen on client side.
152  * This can only happen on the client side.
153 static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
154                                     struct ldlm_namespace *ns,
155                                     struct ldlm_request *dlm_req,
156                                     struct ldlm_lock *lock)
157 {
158         int lvb_len;
159         LIST_HEAD(ast_list);
160         int rc = 0;
161
162         LDLM_DEBUG(lock, "client completion callback handler START");
163
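        /*
         * Fault injection: under OBD_FAIL_LDLM_CANCEL_BL_CB_RACE, wait up to
         * one second for the lock to be granted or destroyed, widening the
         * window for the cancel vs. blocking-callback race that is forced in
         * ldlm_callback_handler().
         */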
164         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
165                 int to = cfs_time_seconds(1);
166
167                 while (to > 0) {
168                         set_current_state(TASK_INTERRUPTIBLE);
169                         schedule_timeout(to);
170                         if (lock->l_granted_mode == lock->l_req_mode ||
171                             ldlm_is_destroyed(lock))
172                                 break;
173                 }
174         }
175
176         lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT);
177         if (lvb_len < 0) {
178                 LDLM_ERROR(lock, "Failed to get lvb_len, rc = %d", lvb_len);
179                 rc = lvb_len;
180                 goto out;
181         } else if (lvb_len > 0) {
182                 if (lock->l_lvb_len > 0) {
183                         /* for extent lock, lvb contains ost_lvb{}. */
184                         LASSERT(lock->l_lvb_data);
185
186                         if (unlikely(lock->l_lvb_len < lvb_len)) {
187                                 LDLM_ERROR(lock, "Replied LVB is larger than expected: expected = %d, replied = %d",
188                                            lock->l_lvb_len, lvb_len);
189                                 rc = -EINVAL;
190                                 goto out;
191                         }
192                 } else if (ldlm_has_layout(lock)) { /* for layout lock, lvb has
193                                                      * variable length
194                                                      */
195                         void *lvb_data;
196
197                         lvb_data = kzalloc(lvb_len, GFP_NOFS);
198                         if (!lvb_data) {
199                                 LDLM_ERROR(lock, "No memory: %d.\n", lvb_len);
200                                 rc = -ENOMEM;
201                                 goto out;
202                         }
203
204                         lock_res_and_lock(lock);
205                         LASSERT(!lock->l_lvb_data);
206                         lock->l_lvb_type = LVB_T_LAYOUT;
207                         lock->l_lvb_data = lvb_data;
208                         lock->l_lvb_len = lvb_len;
209                         unlock_res_and_lock(lock);
210                 }
211         }
212
213         lock_res_and_lock(lock);
214         if (ldlm_is_destroyed(lock) ||
215             lock->l_granted_mode == lock->l_req_mode) {
216                 /* bug 11300: the lock has already been granted */
217                 unlock_res_and_lock(lock);
218                 LDLM_DEBUG(lock, "Double grant race happened");
219                 rc = 0;
220                 goto out;
221         }
222
223         /* If we receive the completion AST before the actual enqueue returned,
224          * then we might need to switch lock modes, resources, or extents.
225          */
226         if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {
227                 lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;
228                 LDLM_DEBUG(lock, "completion AST, new lock mode");
229         }
230
231         if (lock->l_resource->lr_type != LDLM_PLAIN) {
232                 ldlm_convert_policy_to_local(req->rq_export,
233                                           dlm_req->lock_desc.l_resource.lr_type,
234                                           &dlm_req->lock_desc.l_policy_data,
235                                           &lock->l_policy_data);
236                 LDLM_DEBUG(lock, "completion AST, new policy data");
237         }
238
239         ldlm_resource_unlink_lock(lock);
240         if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,
241                    &lock->l_resource->lr_name,
242                    sizeof(lock->l_resource->lr_name)) != 0) {
243                 unlock_res_and_lock(lock);
244                 rc = ldlm_lock_change_resource(ns, lock,
245                                 &dlm_req->lock_desc.l_resource.lr_name);
246                 if (rc < 0) {
247                         LDLM_ERROR(lock, "Failed to allocate resource");
248                         goto out;
249                 }
250                 LDLM_DEBUG(lock, "completion AST, new resource");
251                 CERROR("change resource!\n");
252                 lock_res_and_lock(lock);
253         }
254
255         if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {
256                 /* BL_AST locks are not needed in LRU.
257                  * Let ldlm_cancel_lru() be fast.
258                  */
259                 ldlm_lock_remove_from_lru(lock);
260                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
261                 LDLM_DEBUG(lock, "completion AST includes blocking AST");
262         }
263
264         if (lock->l_lvb_len > 0) {
265                 rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_CLIENT,
266                                    lock->l_lvb_data, lvb_len);
267                 if (rc < 0) {
268                         unlock_res_and_lock(lock);
269                         goto out;
270                 }
271         }
272
273         ldlm_grant_lock(lock, &ast_list);
274         unlock_res_and_lock(lock);
275
276         LDLM_DEBUG(lock, "callback handler finished, about to run_ast_work");
277
278         /* Let the enqueue call osc_lock_upcall() and initialize l_ast_data */
279         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
280
281         ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
282
283         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
284                           lock);
285         goto out;
286
287 out:
288         if (rc < 0) {
289                 lock_res_and_lock(lock);
290                 ldlm_set_failed(lock);
291                 unlock_res_and_lock(lock);
292                 wake_up(&lock->l_waitq);
293         }
294         LDLM_LOCK_RELEASE(lock);
295 }
296
297 /**
298  * Callback handler for receiving incoming glimpse ASTs.
299  *
300  * This can only happen on the client side.  After handling the glimpse AST
301  * we also consider dropping the lock here if it is unused locally for a
302  * long time.
303  */
304 static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
305                                     struct ldlm_namespace *ns,
306                                     struct ldlm_request *dlm_req,
307                                     struct ldlm_lock *lock)
308 {
309         int rc = -ENOSYS;
310
311         LDLM_DEBUG(lock, "client glimpse AST callback handler");
312
313         if (lock->l_glimpse_ast)
314                 rc = lock->l_glimpse_ast(lock, req);
315
316         if (req->rq_repmsg) {
317                 ptlrpc_reply(req);
318         } else {
319                 req->rq_status = rc;
320                 ptlrpc_error(req);
321         }
322
323         lock_res_and_lock(lock);
324         if (lock->l_granted_mode == LCK_PW &&
325             !lock->l_readers && !lock->l_writers &&
326             cfs_time_after(cfs_time_current(),
327                            cfs_time_add(lock->l_last_used,
328                                         cfs_time_seconds(10)))) {
329                 unlock_res_and_lock(lock);
330                 if (ldlm_bl_to_thread_lock(ns, NULL, lock))
331                         ldlm_handle_bl_callback(ns, NULL, lock);
332
333                 return;
334         }
335         unlock_res_and_lock(lock);
336         LDLM_LOCK_RELEASE(lock);
337 }
338
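/*
 * Send a reply with status \a rc for a callback request, packing the reply
 * buffer first if that has not been done yet.  A no-op if the request does
 * not expect a reply.
 */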
339 static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
340 {
341         if (req->rq_no_reply)
342                 return 0;
343
344         req->rq_status = rc;
345         if (!req->rq_packed_final) {
346                 rc = lustre_pack_reply(req, 1, NULL, NULL);
347                 if (rc)
348                         return rc;
349         }
350         return ptlrpc_reply(req);
351 }
352
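/*
 * Hand a prepared work item to the blocking-callback thread pool.
 * LDLM_FL_DISCARD_DATA requests go on the priority list, everything else on
 * the regular list.  Unless LCF_ASYNC was requested, wait for a worker to
 * complete the item before returning.
 */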
353 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
354                                enum ldlm_cancel_flags cancel_flags)
355 {
356         struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
357
358         spin_lock(&blp->blp_lock);
359         if (blwi->blwi_lock && ldlm_is_discard_data(blwi->blwi_lock)) {
360                 /* add LDLM_FL_DISCARD_DATA requests to the priority list */
361                 list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
362         } else {
363                 /* other blocking callbacks are added to the regular list */
364                 list_add_tail(&blwi->blwi_entry, &blp->blp_list);
365         }
366         spin_unlock(&blp->blp_lock);
367
368         wake_up(&blp->blp_waitq);
369
370         /* cannot check blwi->blwi_flags as blwi could already have been freed
371          * in LCF_ASYNC mode
372          */
373         if (!(cancel_flags & LCF_ASYNC))
374                 wait_for_completion(&blwi->blwi_comp);
375
376         return 0;
377 }
378
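/*
 * Fill in a work item: it either takes over the list of \a count cancels or
 * references the single lock \a lock, and notes whether it was created under
 * memory pressure so the worker thread can propagate that state.
 */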
379 static inline void init_blwi(struct ldlm_bl_work_item *blwi,
380                              struct ldlm_namespace *ns,
381                              struct ldlm_lock_desc *ld,
382                              struct list_head *cancels, int count,
383                              struct ldlm_lock *lock,
384                              enum ldlm_cancel_flags cancel_flags)
385 {
386         init_completion(&blwi->blwi_comp);
387         INIT_LIST_HEAD(&blwi->blwi_head);
388
389         if (memory_pressure_get())
390                 blwi->blwi_mem_pressure = 1;
391
392         blwi->blwi_ns = ns;
393         blwi->blwi_flags = cancel_flags;
394         if (ld)
395                 blwi->blwi_ld = *ld;
396         if (count) {
397                 list_add(&blwi->blwi_head, cancels);
398                 list_del_init(cancels);
399                 blwi->blwi_count = count;
400         } else {
401                 blwi->blwi_lock = lock;
402         }
403 }
404
405 /**
406  * Queues a list of locks \a cancels containing \a count locks
407  * for later processing by a blocking thread.  If \a count is zero,
408  * then the lock referenced as \a lock is queued instead.
409  *
410  * The blocking thread will then invoke the lock's ->l_blocking_ast callback.
411  * If adding to the list fails, an error is returned and the caller is
412  * expected to call ->l_blocking_ast itself.
413  */
414 static int ldlm_bl_to_thread(struct ldlm_namespace *ns,
415                              struct ldlm_lock_desc *ld,
416                              struct ldlm_lock *lock,
417                              struct list_head *cancels, int count,
418                              enum ldlm_cancel_flags cancel_flags)
419 {
420         if (cancels && count == 0)
421                 return 0;
422
423         if (cancel_flags & LCF_ASYNC) {
424                 struct ldlm_bl_work_item *blwi;
425
426                 blwi = kzalloc(sizeof(*blwi), GFP_NOFS);
427                 if (!blwi)
428                         return -ENOMEM;
429                 init_blwi(blwi, ns, ld, cancels, count, lock, cancel_flags);
430
431                 return __ldlm_bl_to_thread(blwi, cancel_flags);
432         } else {
433                 /* if it is a synchronous call, do the minimum memory allocation,
434                  * as it could be triggered from the kernel shrinker
435                  */
436                 struct ldlm_bl_work_item blwi;
437
438                 memset(&blwi, 0, sizeof(blwi));
439                 init_blwi(&blwi, ns, ld, cancels, count, lock, cancel_flags);
440                 return __ldlm_bl_to_thread(&blwi, cancel_flags);
441         }
442 }
443
444 int ldlm_bl_to_thread_lock(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
445                            struct ldlm_lock *lock)
446 {
447         return ldlm_bl_to_thread(ns, ld, lock, NULL, 0, LCF_ASYNC);
448 }
449
450 int ldlm_bl_to_thread_list(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
451                            struct list_head *cancels, int count,
452                            enum ldlm_cancel_flags cancel_flags)
453 {
454         return ldlm_bl_to_thread(ns, ld, NULL, cancels, count, cancel_flags);
455 }
456
457 /* Setinfo coming from the server (e.g. MDT) to the client (e.g. MDC)! */
458 static int ldlm_handle_setinfo(struct ptlrpc_request *req)
459 {
460         struct obd_device *obd = req->rq_export->exp_obd;
461         char *key;
462         void *val;
463         int keylen, vallen;
464         int rc = -ENOSYS;
465
466         DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
467
468         req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
469
470         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
471         if (!key) {
472                 DEBUG_REQ(D_IOCTL, req, "no set_info key");
473                 return -EFAULT;
474         }
475         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
476                                       RCL_CLIENT);
477         val = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
478         if (!val) {
479                 DEBUG_REQ(D_IOCTL, req, "no set_info val");
480                 return -EFAULT;
481         }
482         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
483                                       RCL_CLIENT);
484
485         /* We are responsible for swabbing contents of val */
486
487         if (KEY_IS(KEY_HSM_COPYTOOL_SEND))
488                 /* Pass it on to mdc (the "export" in this case) */
489                 rc = obd_set_info_async(req->rq_svc_thread->t_env,
490                                         req->rq_export,
491                                         sizeof(KEY_HSM_COPYTOOL_SEND),
492                                         KEY_HSM_COPYTOOL_SEND,
493                                         vallen, val, NULL);
494         else
495                 DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
496
497         return rc;
498 }
499
500 static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
501                                         const char *msg, int rc,
502                                         const struct lustre_handle *handle)
503 {
504         DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
505                   "%s: [nid %s] [rc %d] [lock %#llx]",
506                   msg, libcfs_id2str(req->rq_peer), rc,
507                   handle ? handle->cookie : 0);
508         if (req->rq_no_reply)
509                 CWARN("No reply was sent, possibly the cause of bug 21636.\n");
510         else if (rc)
511                 CWARN("Sending the reply failed, possibly the cause of bug 21636.\n");
512 }
513
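/* Quota check callback: unpack the obd_quotactl sent by the server and
 * record the (byte-order corrected) status in the client's cl_qchk_stat.
 */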
514 static int ldlm_handle_qc_callback(struct ptlrpc_request *req)
515 {
516         struct obd_quotactl *oqctl;
517         struct client_obd *cli = &req->rq_export->exp_obd->u.cli;
518
519         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
520         if (!oqctl) {
521                 CERROR("Can't unpack obd_quotactl\n");
522                 return -EPROTO;
523         }
524
525         oqctl->qc_stat = ptlrpc_status_ntoh(oqctl->qc_stat);
526
527         cli->cl_qchk_stat = oqctl->qc_stat;
528         return 0;
529 }
530
531 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
532 static int ldlm_callback_handler(struct ptlrpc_request *req)
533 {
534         struct ldlm_namespace *ns;
535         struct ldlm_request *dlm_req;
536         struct ldlm_lock *lock;
537         int rc;
538
539         /* Requests arrive in sender's byte order.  The ptlrpc service
540          * handler has already checked and, if necessary, byte-swapped the
541          * incoming request message body, but I am responsible for the
542          * message buffers.
543          */
544
545         /* do nothing for sec context finalize */
546         if (lustre_msg_get_opc(req->rq_reqmsg) == SEC_CTX_FINI)
547                 return 0;
548
549         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
550
551         if (!req->rq_export) {
552                 rc = ldlm_callback_reply(req, -ENOTCONN);
553                 ldlm_callback_errmsg(req, "Operate on unconnected server",
554                                      rc, NULL);
555                 return 0;
556         }
557
558         LASSERT(req->rq_export->exp_obd);
559
560         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
561         case LDLM_BL_CALLBACK:
562                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
563                         if (cfs_fail_err)
564                                 ldlm_callback_reply(req, -(int)cfs_fail_err);
565                         return 0;
566                 }
567                 break;
568         case LDLM_CP_CALLBACK:
569                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
570                         return 0;
571                 break;
572         case LDLM_GL_CALLBACK:
573                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
574                         return 0;
575                 break;
576         case LDLM_SET_INFO:
577                 rc = ldlm_handle_setinfo(req);
578                 ldlm_callback_reply(req, rc);
579                 return 0;
580         case OBD_QC_CALLBACK:
581                 req_capsule_set(&req->rq_pill, &RQF_QC_CALLBACK);
582                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_QC_CALLBACK_NET))
583                         return 0;
584                 rc = ldlm_handle_qc_callback(req);
585                 ldlm_callback_reply(req, rc);
586                 return 0;
587         default:
588                 CERROR("unknown opcode %u\n",
589                        lustre_msg_get_opc(req->rq_reqmsg));
590                 ldlm_callback_reply(req, -EPROTO);
591                 return 0;
592         }
593
594         ns = req->rq_export->exp_obd->obd_namespace;
595         LASSERT(ns);
596
597         req_capsule_set(&req->rq_pill, &RQF_LDLM_CALLBACK);
598
599         dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
600         if (!dlm_req) {
601                 rc = ldlm_callback_reply(req, -EPROTO);
602                 ldlm_callback_errmsg(req, "Operate without parameter", rc,
603                                      NULL);
604                 return 0;
605         }
606
607         /* Force a known safe race, send a cancel to the server for a lock
608          * which the server has already started a blocking callback on.
609          */
610         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
611             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
612                 rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
613                 if (rc < 0)
614                         CERROR("ldlm_cli_cancel: %d\n", rc);
615         }
616
617         lock = ldlm_handle2lock_long(&dlm_req->lock_handle[0], 0);
618         if (!lock) {
619                 CDEBUG(D_DLMTRACE, "callback on lock %#llx - lock disappeared\n",
620                        dlm_req->lock_handle[0].cookie);
621                 rc = ldlm_callback_reply(req, -EINVAL);
622                 ldlm_callback_errmsg(req, "Operate with invalid parameter", rc,
623                                      &dlm_req->lock_handle[0]);
624                 return 0;
625         }
626
627         if (ldlm_is_fail_loc(lock) &&
628             lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
629                 OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
630
631         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
632         lock_res_and_lock(lock);
633         lock->l_flags |= ldlm_flags_from_wire(dlm_req->lock_flags &
634                                               LDLM_FL_AST_MASK);
635         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
636                 /* If somebody cancels the lock while the cache is already dropped,
637                  * or the lock failed before the cp_ast was received on the client,
638                  * we can tell the server we have no lock. Otherwise, we should
639                  * send the cancel after dropping the cache.
640                  */
641                 if ((ldlm_is_canceling(lock) && ldlm_is_bl_done(lock)) ||
642                     ldlm_is_failed(lock)) {
643                         LDLM_DEBUG(lock,
644                                    "callback on lock %#llx - lock disappeared",
645                                    dlm_req->lock_handle[0].cookie);
646                         unlock_res_and_lock(lock);
647                         LDLM_LOCK_RELEASE(lock);
648                         rc = ldlm_callback_reply(req, -EINVAL);
649                         ldlm_callback_errmsg(req, "Operate on stale lock", rc,
650                                              &dlm_req->lock_handle[0]);
651                         return 0;
652                 }
653                 /* BL_AST locks are not needed in LRU.
654                  * Let ldlm_cancel_lru() be fast.
655                  */
656                 ldlm_lock_remove_from_lru(lock);
657                 ldlm_set_bl_ast(lock);
658         }
659         unlock_res_and_lock(lock);
660
661         /* We want the ost thread to get this reply so that it can respond
662          * to ost requests (write cache writeback) that might be triggered
663          * in the callback.
664          *
665          * But we'd also like to be able to indicate in the reply that we're
666          * cancelling right now, because it's unused, or have an intent result
667          * in the reply, so we might have to push the responsibility for sending
668          * the reply down into the AST handlers, alas.
669          */
670
671         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
672         case LDLM_BL_CALLBACK:
673                 CDEBUG(D_INODE, "blocking ast\n");
674                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
675                 if (!ldlm_is_cancel_on_block(lock)) {
676                         rc = ldlm_callback_reply(req, 0);
677                         if (req->rq_no_reply || rc)
678                                 ldlm_callback_errmsg(req, "Normal process", rc,
679                                                      &dlm_req->lock_handle[0]);
680                 }
681                 if (ldlm_bl_to_thread_lock(ns, &dlm_req->lock_desc, lock))
682                         ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
683                 break;
684         case LDLM_CP_CALLBACK:
685                 CDEBUG(D_INODE, "completion ast\n");
686                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
687                 ldlm_callback_reply(req, 0);
688                 ldlm_handle_cp_callback(req, ns, dlm_req, lock);
689                 break;
690         case LDLM_GL_CALLBACK:
691                 CDEBUG(D_INODE, "glimpse ast\n");
692                 req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
693                 ldlm_handle_gl_callback(req, ns, dlm_req, lock);
694                 break;
695         default:
696                 LBUG();                  /* checked above */
697         }
698
699         return 0;
700 }
701
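/*
 * Dequeue the next work item for a blocking thread.  The priority list is
 * normally preferred, but an entry from the regular blp_list is taken at
 * least once every blp_num_threads dequeues so that it cannot be starved.
 */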
702 static struct ldlm_bl_work_item *ldlm_bl_get_work(struct ldlm_bl_pool *blp)
703 {
704         struct ldlm_bl_work_item *blwi = NULL;
705         static unsigned int num_bl;
706
707         spin_lock(&blp->blp_lock);
708         /* process a request from the blp_list at least every blp_num_threads */
709         if (!list_empty(&blp->blp_list) &&
710             (list_empty(&blp->blp_prio_list) || num_bl == 0))
711                 blwi = list_entry(blp->blp_list.next,
712                                   struct ldlm_bl_work_item, blwi_entry);
713         else
714                 if (!list_empty(&blp->blp_prio_list))
715                         blwi = list_entry(blp->blp_prio_list.next,
716                                           struct ldlm_bl_work_item,
717                                           blwi_entry);
718
719         if (blwi) {
720                 if (++num_bl >= atomic_read(&blp->blp_num_threads))
721                         num_bl = 0;
722                 list_del(&blwi->blwi_entry);
723         }
724         spin_unlock(&blp->blp_lock);
725
726         return blwi;
727 }
728
729 /* This only contains temporary data until the thread starts */
730 struct ldlm_bl_thread_data {
731         char                    bltd_name[CFS_CURPROC_COMM_MAX];
732         struct ldlm_bl_pool     *bltd_blp;
733         struct completion       bltd_comp;
734         int                     bltd_num;
735 };
736
737 static int ldlm_bl_thread_main(void *arg);
738
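/*
 * Start one more blocking-callback thread and wait until it has registered
 * itself with the pool; bltd lives on this stack, so the new thread must not
 * touch it after completing bltd_comp.
 */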
739 static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
740 {
741         struct ldlm_bl_thread_data bltd = { .bltd_blp = blp };
742         struct task_struct *task;
743
744         init_completion(&bltd.bltd_comp);
745         bltd.bltd_num = atomic_read(&blp->blp_num_threads);
746         snprintf(bltd.bltd_name, sizeof(bltd.bltd_name),
747                  "ldlm_bl_%02d", bltd.bltd_num);
748         task = kthread_run(ldlm_bl_thread_main, &bltd, "%s", bltd.bltd_name);
749         if (IS_ERR(task)) {
750                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %ld\n",
751                        atomic_read(&blp->blp_num_threads), PTR_ERR(task));
752                 return PTR_ERR(task);
753         }
754         wait_for_completion(&bltd.bltd_comp);
755
756         return 0;
757 }
758
759 /**
760  * Main blocking requests processing thread.
761  *
762  * Callers put locks into its queue by calling ldlm_bl_to_thread.
763  * This thread eventually makes the actual calls to ->l_blocking_ast
764  * for the queued locks.
765  */
766 static int ldlm_bl_thread_main(void *arg)
767 {
768         struct ldlm_bl_pool *blp;
769
770         {
771                 struct ldlm_bl_thread_data *bltd = arg;
772
773                 blp = bltd->bltd_blp;
774
775                 atomic_inc(&blp->blp_num_threads);
776                 atomic_inc(&blp->blp_busy_threads);
777
778                 complete(&bltd->bltd_comp);
779                 /* cannot use bltd after this, it is only on caller's stack */
780         }
781
782         while (1) {
783                 struct l_wait_info lwi = { 0 };
784                 struct ldlm_bl_work_item *blwi = NULL;
785                 int busy;
786
787                 blwi = ldlm_bl_get_work(blp);
788
789                 if (!blwi) {
790                         atomic_dec(&blp->blp_busy_threads);
791                         l_wait_event_exclusive(blp->blp_waitq,
792                                                (blwi = ldlm_bl_get_work(blp)),
793                                                &lwi);
794                         busy = atomic_inc_return(&blp->blp_busy_threads);
795                 } else {
796                         busy = atomic_read(&blp->blp_busy_threads);
797                 }
798
799                 if (!blwi->blwi_ns)
800                         /* added by ldlm_cleanup() */
801                         break;
802
803                 /* Not fatal if this races and we end up with a few too many threads */
804                 if (unlikely(busy < blp->blp_max_threads &&
805                              busy >= atomic_read(&blp->blp_num_threads) &&
806                              !blwi->blwi_mem_pressure))
807                         /* discard the return value, we tried */
808                         ldlm_bl_thread_start(blp);
809
810                 if (blwi->blwi_mem_pressure)
811                         memory_pressure_set();
812
813                 if (blwi->blwi_count) {
814                         int count;
815                         /* In the special case where we cancel LRU locks
816                          * asynchronously, the list of locks is passed in here.
817                          * The locks are thus marked LDLM_FL_CANCELING, but NOT
818                          * cancelled locally yet.
819                          */
820                         count = ldlm_cli_cancel_list_local(&blwi->blwi_head,
821                                                            blwi->blwi_count,
822                                                            LCF_BL_AST);
823                         ldlm_cli_cancel_list(&blwi->blwi_head, count, NULL,
824                                              blwi->blwi_flags);
825                 } else {
826                         ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
827                                                 blwi->blwi_lock);
828                 }
829                 if (blwi->blwi_mem_pressure)
830                         memory_pressure_clr();
831
832                 if (blwi->blwi_flags & LCF_ASYNC)
833                         kfree(blwi);
834                 else
835                         complete(&blwi->blwi_comp);
836         }
837
838         atomic_dec(&blp->blp_busy_threads);
839         atomic_dec(&blp->blp_num_threads);
840         complete(&blp->blp_comp);
841         return 0;
842 }
843
844 static int ldlm_setup(void);
845 static int ldlm_cleanup(void);
846
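/* Take a reference on the LDLM subsystem; the first reference triggers
 * ldlm_setup().
 */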
847 int ldlm_get_ref(void)
848 {
849         int rc = 0;
850
851         mutex_lock(&ldlm_ref_mutex);
852         if (++ldlm_refcount == 1) {
853                 rc = ldlm_setup();
854                 if (rc)
855                         ldlm_refcount--;
856         }
857         mutex_unlock(&ldlm_ref_mutex);
858
859         return rc;
860 }
861 EXPORT_SYMBOL(ldlm_get_ref);
862
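/* Drop a reference on the LDLM subsystem; the last reference triggers
 * ldlm_cleanup(), and the reference is kept if the cleanup fails.
 */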
863 void ldlm_put_ref(void)
864 {
865         mutex_lock(&ldlm_ref_mutex);
866         if (ldlm_refcount == 1) {
867                 int rc = ldlm_cleanup();
868
869                 if (rc)
870                         CERROR("ldlm_cleanup failed: %d\n", rc);
871                 else
872                         ldlm_refcount--;
873         } else {
874                 ldlm_refcount--;
875         }
876         mutex_unlock(&ldlm_ref_mutex);
877 }
878 EXPORT_SYMBOL(ldlm_put_ref);
879
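/*
 * sysfs knob controlling whether unused locks are cancelled before replay
 * during recovery.  With the attribute group registered below it appears
 * under the /sys/fs/lustre/ldlm root, e.g.:
 *
 *   echo 0 > /sys/fs/lustre/ldlm/cancel_unused_locks_before_replay
 */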
880 static ssize_t cancel_unused_locks_before_replay_show(struct kobject *kobj,
881                                                       struct attribute *attr,
882                                                       char *buf)
883 {
884         return sprintf(buf, "%d\n", ldlm_cancel_unused_locks_before_replay);
885 }
886
887 static ssize_t cancel_unused_locks_before_replay_store(struct kobject *kobj,
888                                                        struct attribute *attr,
889                                                        const char *buffer,
890                                                        size_t count)
891 {
892         int rc;
893         unsigned long val;
894
895         rc = kstrtoul(buffer, 10, &val);
896         if (rc)
897                 return rc;
898
899         ldlm_cancel_unused_locks_before_replay = val;
900
901         return count;
902 }
903 LUSTRE_RW_ATTR(cancel_unused_locks_before_replay);
904
905 /* These are for root of /sys/fs/lustre/ldlm */
906 static struct attribute *ldlm_attrs[] = {
907         &lustre_attr_cancel_unused_locks_before_replay.attr,
908         NULL,
909 };
910
911 static struct attribute_group ldlm_attr_group = {
912         .attrs = ldlm_attrs,
913 };
914
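/*
 * Bring up the client-side LDLM state: sysfs and debugfs entries, the
 * "ldlm_cbd" callback request service, the blocking-callback thread pool and
 * the LDLM pools.  On failure everything set up so far is undone through
 * ldlm_cleanup().
 */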
915 static int ldlm_setup(void)
916 {
917         static struct ptlrpc_service_conf       conf;
918         struct ldlm_bl_pool                     *blp = NULL;
919         int rc = 0;
920         int i;
921
922         if (ldlm_state)
923                 return -EALREADY;
924
925         ldlm_state = kzalloc(sizeof(*ldlm_state), GFP_NOFS);
926         if (!ldlm_state)
927                 return -ENOMEM;
928
929         ldlm_kobj = kobject_create_and_add("ldlm", lustre_kobj);
930         if (!ldlm_kobj) {
931                 rc = -ENOMEM;
932                 goto out;
933         }
934
935         rc = sysfs_create_group(ldlm_kobj, &ldlm_attr_group);
936         if (rc)
937                 goto out;
938
939         ldlm_ns_kset = kset_create_and_add("namespaces", NULL, ldlm_kobj);
940         if (!ldlm_ns_kset) {
941                 rc = -ENOMEM;
942                 goto out;
943         }
944
945         ldlm_svc_kset = kset_create_and_add("services", NULL, ldlm_kobj);
946         if (!ldlm_svc_kset) {
947                 rc = -ENOMEM;
948                 goto out;
949         }
950
951         rc = ldlm_debugfs_setup();
952         if (rc != 0)
953                 goto out;
954
955         memset(&conf, 0, sizeof(conf));
956         conf = (typeof(conf)) {
957                 .psc_name               = "ldlm_cbd",
958                 .psc_watchdog_factor    = 2,
959                 .psc_buf                = {
960                         .bc_nbufs               = LDLM_CLIENT_NBUFS,
961                         .bc_buf_size            = LDLM_BUFSIZE,
962                         .bc_req_max_size        = LDLM_MAXREQSIZE,
963                         .bc_rep_max_size        = LDLM_MAXREPSIZE,
964                         .bc_req_portal          = LDLM_CB_REQUEST_PORTAL,
965                         .bc_rep_portal          = LDLM_CB_REPLY_PORTAL,
966                 },
967                 .psc_thr                = {
968                         .tc_thr_name            = "ldlm_cb",
969                         .tc_thr_factor          = LDLM_THR_FACTOR,
970                         .tc_nthrs_init          = LDLM_NTHRS_INIT,
971                         .tc_nthrs_base          = LDLM_NTHRS_BASE,
972                         .tc_nthrs_max           = LDLM_NTHRS_MAX,
973                         .tc_nthrs_user          = ldlm_num_threads,
974                         .tc_cpu_affinity        = 1,
975                         .tc_ctx_tags            = LCT_MD_THREAD | LCT_DT_THREAD,
976                 },
977                 .psc_cpt                = {
978                         .cc_pattern             = ldlm_cpts,
979                 },
980                 .psc_ops                = {
981                         .so_req_handler         = ldlm_callback_handler,
982                 },
983         };
984         ldlm_state->ldlm_cb_service =
985                         ptlrpc_register_service(&conf, ldlm_svc_kset,
986                                                 ldlm_svc_debugfs_dir);
987         if (IS_ERR(ldlm_state->ldlm_cb_service)) {
988                 CERROR("failed to start service\n");
989                 rc = PTR_ERR(ldlm_state->ldlm_cb_service);
990                 ldlm_state->ldlm_cb_service = NULL;
991                 goto out;
992         }
993
994         blp = kzalloc(sizeof(*blp), GFP_NOFS);
995         if (!blp) {
996                 rc = -ENOMEM;
997                 goto out;
998         }
999         ldlm_state->ldlm_bl_pool = blp;
1000
1001         spin_lock_init(&blp->blp_lock);
1002         INIT_LIST_HEAD(&blp->blp_list);
1003         INIT_LIST_HEAD(&blp->blp_prio_list);
1004         init_waitqueue_head(&blp->blp_waitq);
1005         atomic_set(&blp->blp_num_threads, 0);
1006         atomic_set(&blp->blp_busy_threads, 0);
1007
1008         if (ldlm_num_threads == 0) {
1009                 blp->blp_min_threads = LDLM_NTHRS_INIT;
1010                 blp->blp_max_threads = LDLM_NTHRS_MAX;
1011         } else {
1012                 blp->blp_min_threads = min_t(int, LDLM_NTHRS_MAX,
1013                                              max_t(int, LDLM_NTHRS_INIT,
1014                                                    ldlm_num_threads));
1015
1016                 blp->blp_max_threads = blp->blp_min_threads;
1017         }
1018
1019         for (i = 0; i < blp->blp_min_threads; i++) {
1020                 rc = ldlm_bl_thread_start(blp);
1021                 if (rc < 0)
1022                         goto out;
1023         }
1024
1025         rc = ldlm_pools_init();
1026         if (rc) {
1027                 CERROR("Failed to initialize LDLM pools: %d\n", rc);
1028                 goto out;
1029         }
1030         return 0;
1031
1032  out:
1033         ldlm_cleanup();
1034         return rc;
1035 }
1036
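/*
 * Tear the LDLM state down again.  Refuses to run while namespaces still
 * exist.  The blocking threads are stopped by queueing one work item with a
 * NULL blwi_ns per remaining thread, which makes ldlm_bl_thread_main() exit.
 */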
1037 static int ldlm_cleanup(void)
1038 {
1039         if (!list_empty(ldlm_namespace_list(LDLM_NAMESPACE_SERVER)) ||
1040             !list_empty(ldlm_namespace_list(LDLM_NAMESPACE_CLIENT))) {
1041                 CERROR("ldlm still has namespaces; clean these up first.\n");
1042                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_SERVER, D_DLMTRACE);
1043                 ldlm_dump_all_namespaces(LDLM_NAMESPACE_CLIENT, D_DLMTRACE);
1044                 return -EBUSY;
1045         }
1046
1047         ldlm_pools_fini();
1048
1049         if (ldlm_state->ldlm_bl_pool) {
1050                 struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
1051
1052                 while (atomic_read(&blp->blp_num_threads) > 0) {
1053                         struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
1054
1055                         init_completion(&blp->blp_comp);
1056
1057                         spin_lock(&blp->blp_lock);
1058                         list_add_tail(&blwi.blwi_entry, &blp->blp_list);
1059                         wake_up(&blp->blp_waitq);
1060                         spin_unlock(&blp->blp_lock);
1061
1062                         wait_for_completion(&blp->blp_comp);
1063                 }
1064
1065                 kfree(blp);
1066         }
1067
1068         if (ldlm_state->ldlm_cb_service)
1069                 ptlrpc_unregister_service(ldlm_state->ldlm_cb_service);
1070
1071         if (ldlm_ns_kset)
1072                 kset_unregister(ldlm_ns_kset);
1073         if (ldlm_svc_kset)
1074                 kset_unregister(ldlm_svc_kset);
1075         if (ldlm_kobj)
1076                 kobject_put(ldlm_kobj);
1077
1078         ldlm_debugfs_cleanup();
1079
1080         kfree(ldlm_state);
1081         ldlm_state = NULL;
1082
1083         return 0;
1084 }
1085
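/*
 * Create the slab caches for resources, locks (freed via RCU) and interval
 * nodes; they are destroyed again in ldlm_exit().
 */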
1086 int ldlm_init(void)
1087 {
1088         mutex_init(&ldlm_ref_mutex);
1089         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_SERVER));
1090         mutex_init(ldlm_namespace_lock(LDLM_NAMESPACE_CLIENT));
1091         ldlm_resource_slab = kmem_cache_create("ldlm_resources",
1092                                                sizeof(struct ldlm_resource), 0,
1093                                                SLAB_HWCACHE_ALIGN, NULL);
1094         if (!ldlm_resource_slab)
1095                 return -ENOMEM;
1096
1097         ldlm_lock_slab = kmem_cache_create("ldlm_locks",
1098                                            sizeof(struct ldlm_lock), 0,
1099                                            SLAB_HWCACHE_ALIGN |
1100                                            SLAB_DESTROY_BY_RCU, NULL);
1101         if (!ldlm_lock_slab) {
1102                 kmem_cache_destroy(ldlm_resource_slab);
1103                 return -ENOMEM;
1104         }
1105
1106         ldlm_interval_slab = kmem_cache_create("interval_node",
1107                                                sizeof(struct ldlm_interval),
1108                                                0, SLAB_HWCACHE_ALIGN, NULL);
1109         if (!ldlm_interval_slab) {
1110                 kmem_cache_destroy(ldlm_resource_slab);
1111                 kmem_cache_destroy(ldlm_lock_slab);
1112                 return -ENOMEM;
1113         }
1114 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1115         class_export_dump_hook = ldlm_dump_export_locks;
1116 #endif
1117         return 0;
1118 }
1119
1120 void ldlm_exit(void)
1121 {
1122         if (ldlm_refcount)
1123                 CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
1124         kmem_cache_destroy(ldlm_resource_slab);
1125         /* ldlm_lock_put() uses RCU to call ldlm_lock_free(), so we need to call
1126          * synchronize_rcu() to wait for a grace period to elapse, so that
1127          * ldlm_lock_free() gets a chance to be called.
1128          */
1129         synchronize_rcu();
1130         kmem_cache_destroy(ldlm_lock_slab);
1131         kmem_cache_destroy(ldlm_interval_slab);
1132 }