4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
33 #define DEBUG_SUBSYSTEM S_MDC
35 # include <linux/module.h>
37 #include "../include/lustre_intent.h"
38 #include "../include/obd.h"
39 #include "../include/obd_class.h"
40 #include "../include/lustre_dlm.h"
41 #include "../include/lustre_fid.h" /* fid_res_name_eq() */
42 #include "../include/lustre_mdc.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre_req_layout.h"
45 #include "mdc_internal.h"
/*
 * Context saved across an asynchronous getattr enqueue RPC and unpacked
 * again in mdc_intent_getattr_async_interpret().
 * NOTE(review): the closing "};" is not visible in this excerpt; the
 * paste appears truncated.
 */
47 struct mdc_getattr_args {
48 struct obd_export *ga_exp; /* export the async RPC was sent on */
49 struct md_enqueue_info *ga_minfo; /* caller's enqueue info (intent + lock handle) */
50 struct ldlm_enqueue_info *ga_einfo; /* LDLM enqueue parameters to finish with */
/*
 * Report the error status of an intent for a given open phase.
 * Dispositions are tested from the latest phase (OPEN_LEASE) back to the
 * earliest (IT_EXECD); for each disposition set on @it, if the requested
 * @phase is at or past it the intent's status is the answer for that phase.
 * NOTE(review): the return statements inside each branch are elided in
 * this excerpt — presumably "return it->it_status" / "return 0"; confirm
 * against the full source.
 */
53 int it_open_error(int phase, struct lookup_intent *it)
55 if (it_disposition(it, DISP_OPEN_LEASE)) {
56 if (phase >= DISP_OPEN_LEASE)
61 if (it_disposition(it, DISP_OPEN_OPEN)) {
62 if (phase >= DISP_OPEN_OPEN)
68 if (it_disposition(it, DISP_OPEN_CREATE)) {
69 if (phase >= DISP_OPEN_CREATE)
75 if (it_disposition(it, DISP_LOOKUP_EXECD)) {
76 if (phase >= DISP_LOOKUP_EXECD)
82 if (it_disposition(it, DISP_IT_EXECD)) {
83 if (phase >= DISP_IT_EXECD)
/* Reaching here means no disposition matched the phase: log the raw state. */
88 CERROR("it disp: %X, status: %d\n", it->it_disposition,
93 EXPORT_SYMBOL(it_open_error);
95 /* this must be called on a lockh that is known to have a referenced lock */
/*
 * Attach @data (an inode) to the resource of the lock behind @lockh as
 * its LVB inode, and optionally report the lock's inodebits via @bits.
 * If the resource already points at a different inode, that inode must
 * be in I_FREEING state (it is being evicted) — asserted below.
 */
96 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
97 void *data, __u64 *bits)
99 struct ldlm_lock *lock;
100 struct inode *new_inode = data;
/* Nothing to do for an unused (zero) handle. */
105 if (!lustre_handle_is_used(lockh))
108 lock = ldlm_handle2lock(lockh);
/* lr_lvb_inode is protected by the resource lock. */
111 lock_res_and_lock(lock);
112 if (lock->l_resource->lr_lvb_inode &&
113 lock->l_resource->lr_lvb_inode != data) {
114 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
/* A stale inode may only be replaced while it is being freed. */
116 LASSERTF(old_inode->i_state & I_FREEING,
117 "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
118 old_inode, old_inode->i_ino, old_inode->i_generation,
119 old_inode->i_state, new_inode, new_inode->i_ino,
120 new_inode->i_generation);
122 lock->l_resource->lr_lvb_inode = new_inode;
/* NOTE(review): an "if (bits)" guard is presumably elided above this line. */
124 *bits = lock->l_policy_data.l_inodebits.bits;
126 unlock_res_and_lock(lock);
/*
 * Look for an already-granted DLM lock on @fid matching @type/@policy/@mode
 * in this export's namespace; on success the handle is stored in @lockh and
 * the matched mode is returned.
 */
132 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
133 const struct lu_fid *fid, enum ldlm_type type,
134 ldlm_policy_data_t *policy, enum ldlm_mode mode,
135 struct lustre_handle *lockh)
137 struct ldlm_res_id res_id;
140 fid_build_reg_res_name(fid, &res_id);
141 /* LU-4405: Clear bits not supported by server */
142 policy->l_inodebits.bits &= exp_connect_ibits(exp);
143 rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
144 &res_id, type, policy, mode, lockh, 0);
/*
 * Cancel all unused locks on the resource named by @fid that match
 * @policy/@mode, with the given cancel @flags. @opaque is passed through
 * to the cancellation iterator for caller-side filtering.
 */
148 int mdc_cancel_unused(struct obd_export *exp,
149 const struct lu_fid *fid,
150 ldlm_policy_data_t *policy,
152 enum ldlm_cancel_flags flags,
155 struct ldlm_res_id res_id;
156 struct obd_device *obd = class_exp2obd(exp);
159 fid_build_reg_res_name(fid, &res_id);
160 rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
161 policy, mode, flags, opaque);
/*
 * Detach the cached LVB inode from the DLM resource for @fid, if the
 * resource exists. Called when the inode is going away so the resource
 * does not keep a dangling pointer.
 */
165 int mdc_null_inode(struct obd_export *exp,
166 const struct lu_fid *fid)
168 struct ldlm_res_id res_id;
169 struct ldlm_resource *res;
170 struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
172 LASSERTF(ns, "no namespace passed\n");
174 fid_build_reg_res_name(fid, &res_id);
/* Lookup only (create == 0): a missing resource means nothing to clear. */
176 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
181 res->lr_lvb_inode = NULL;
184 ldlm_resource_putref(res);
/*
 * Drop the replay flag from a request that completed with an error so it
 * is not held for recovery replay; complain if the server assigned a
 * transno to a failed request (that combination should not happen).
 */
188 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
190 /* Don't hold error requests for replay. */
191 if (req->rq_replay) {
/* rq_replay is protected by rq_lock; the clear itself is elided here. */
192 spin_lock(&req->rq_lock);
194 spin_unlock(&req->rq_lock);
196 if (rc && req->rq_transno != 0) {
197 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
202 /* Save a large LOV EA into the request buffer so that it is available
203 * for replay. We don't do this in the initial request because the
204 * original request doesn't need this buffer (at most it sends just the
205 * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
206 * buffer and may also be difficult to allocate and save a very large
207 * request buffer for each open. (bug 5707)
209 * OOM here may cause recovery failure if lmm is needed (only for the
210 * original open if the MDS crashed just when this client also OOM'd)
211 * but this is incredibly unlikely, and questionable whether the client
212 * could do MDS recovery under OOM anyways...
 */
214 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
215 struct mdt_body *body)
219 /* FIXME: remove this explicit offset. */
220 rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
221 body->mbo_eadatasize);
/* On failure, drop the EA from the body rather than failing the open. */
223 CERROR("Can't enlarge segment %d size to %d\n",
224 DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
225 body->mbo_valid &= ~OBD_MD_FLEASIZE;
226 body->mbo_eadatasize = 0;
/*
 * Build an LDLM_INTENT_OPEN request for @it/@op_data: collect conflicting
 * locks to cancel (OPEN locks on the child fid, UPDATE lock on the parent
 * for CREATE), allocate the request, mark it replayable when the import
 * is, and pack the intent plus the open body and EA data.
 * Returns the prepared request, or ERR_PTR on allocation failure.
 */
230 static struct ptlrpc_request *
231 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
232 struct md_op_data *op_data)
234 struct ptlrpc_request *req;
235 struct obd_device *obddev = class_exp2obd(exp);
236 struct ldlm_intent *lit;
237 const void *lmm = op_data->op_data;
238 u32 lmmsize = op_data->op_data_size;
/* Force a regular-file type into the create mode. */
244 it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
246 /* XXX: openlock is not cancelled for cross-refs. */
247 /* If inode is known, cancel conflicting OPEN locks. */
248 if (fid_is_sane(&op_data->op_fid2)) {
249 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
250 if (it->it_flags & FMODE_WRITE)
255 if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
257 else if (it->it_flags & __FMODE_EXEC)
262 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
267 /* If CREATE, cancel parent's UPDATE lock. */
268 if (it->it_op & IT_CREAT)
272 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
274 MDS_INODELOCK_UPDATE);
276 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
277 &RQF_LDLM_INTENT_OPEN);
/* Allocation failed: return the gathered cancel list and bail out. */
279 ldlm_lock_list_put(&cancels, l_bl_ast, count);
280 return ERR_PTR(-ENOMEM);
283 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
284 op_data->op_namelen + 1);
285 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
286 max(lmmsize, obddev->u.cli.cl_default_mds_easize));
288 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
290 ptlrpc_request_free(req);
/* Opens are replayed if the import supports recovery. */
294 spin_lock(&req->rq_lock);
295 req->rq_replay = req->rq_import->imp_replayable;
296 spin_unlock(&req->rq_lock);
298 /* pack the intent */
299 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
300 lit->opc = (__u64)it->it_op;
302 /* pack the intended request */
303 mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
/* Reserve the maximum EA size in the reply: file type is unknown yet. */
306 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
307 obddev->u.cli.cl_max_mds_easize);
309 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETXATTR request: allocate the request, pack the
 * IT_GETXATTR intent and an mdt body sized by the server's advertised
 * maximum EA size, and reserve matching reply buffers for the xattr
 * names, values, and value lengths.
 */
313 static struct ptlrpc_request *
314 mdc_intent_getxattr_pack(struct obd_export *exp,
315 struct lookup_intent *it,
316 struct md_op_data *op_data)
318 struct ptlrpc_request *req;
319 struct ldlm_intent *lit;
324 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
325 &RQF_LDLM_INTENT_GETXATTR);
327 return ERR_PTR(-ENOMEM);
329 rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
331 ptlrpc_request_free(req);
335 /* pack the intent */
336 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
337 lit->opc = IT_GETXATTR;
/* Size reply buffers by what the server said it can return. */
339 maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
341 /* pack the intended request */
342 mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
345 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
347 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
349 req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
350 RCL_SERVER, maxdata);
352 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_UNLINK request: pack the intent opcode and the
 * unlink body, and reserve reply space for the returned MD and cookie
 * data using the client's default sizes.
 */
357 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
358 struct lookup_intent *it,
359 struct md_op_data *op_data)
361 struct ptlrpc_request *req;
362 struct obd_device *obddev = class_exp2obd(exp);
363 struct ldlm_intent *lit;
366 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367 &RQF_LDLM_INTENT_UNLINK);
369 return ERR_PTR(-ENOMEM);
371 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
372 op_data->op_namelen + 1);
374 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
376 ptlrpc_request_free(req);
380 /* pack the intent */
381 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
382 lit->opc = (__u64)it->it_op;
384 /* pack the intended request */
385 mdc_unlink_pack(req, op_data);
387 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
388 obddev->u.cli.cl_default_mds_easize);
389 req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
390 obddev->u.cli.cl_default_mds_cookiesize);
391 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_GETATTR request asking for the full attribute set
 * (getattr, EA size, mode/EA, dir EA, MEA, ACL). Reply EA buffer is the
 * client default EA size when set, otherwise the maximum.
 */
395 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
396 struct lookup_intent *it,
397 struct md_op_data *op_data)
399 struct ptlrpc_request *req;
400 struct obd_device *obddev = class_exp2obd(exp);
401 u64 valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
402 OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
403 OBD_MD_MEA | OBD_MD_FLACL;
404 struct ldlm_intent *lit;
408 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409 &RQF_LDLM_INTENT_GETATTR);
411 return ERR_PTR(-ENOMEM);
413 req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414 op_data->op_namelen + 1);
416 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
418 ptlrpc_request_free(req);
422 /* pack the intent */
423 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424 lit->opc = (__u64)it->it_op;
/* Prefer the tuned default EA size; fall back to the maximum. */
426 if (obddev->u.cli.cl_default_mds_easize > 0)
427 easize = obddev->u.cli.cl_default_mds_easize;
429 easize = obddev->u.cli.cl_max_mds_easize;
431 /* pack the intended request */
432 mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
434 req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
435 ptlrpc_request_set_replen(req);
/*
 * Build an LDLM_INTENT_LAYOUT request: pack the intent opcode plus a
 * layout_intent body (generic LAYOUT_INTENT_ACCESS operation), and
 * reserve LVB reply space for the layout using the default EA size.
 * @unused: the md_op_data is not needed for layout intents.
 */
439 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
440 struct lookup_intent *it,
441 struct md_op_data *unused)
443 struct obd_device *obd = class_exp2obd(exp);
444 struct ptlrpc_request *req;
445 struct ldlm_intent *lit;
446 struct layout_intent *layout;
449 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
450 &RQF_LDLM_INTENT_LAYOUT);
452 return ERR_PTR(-ENOMEM);
/* No client EA data is sent with a layout intent. */
454 req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
455 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
457 ptlrpc_request_free(req);
461 /* pack the intent */
462 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463 lit->opc = (__u64)it->it_op;
465 /* pack the layout intent request */
466 layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
467 /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
468 * set for replication
 */
470 layout->li_opc = LAYOUT_INTENT_ACCESS;
472 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
473 obd->u.cli.cl_default_mds_easize);
474 ptlrpc_request_set_replen(req);
/*
 * Build a plain LDLM_ENQUEUE request (no intent) with an LVB reply
 * buffer of @lvb_len bytes. Used for enqueues such as IT_READDIR where
 * no intent body is required.
 */
478 static struct ptlrpc_request *
479 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
481 struct ptlrpc_request *req;
484 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
486 return ERR_PTR(-ENOMEM);
488 rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
490 ptlrpc_request_free(req);
494 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
495 ptlrpc_request_set_replen(req);
/*
 * Post-process a completed intent enqueue: fix up lock mode if the
 * server granted a different one, copy the server's disposition/status
 * into @it, manage the replay flag, swab and validate the reply body,
 * save a returned LOV EA into the request for open replay, and install
 * layout LVB data on the lock for IT_LAYOUT.
 * NOTE(review): many statements (returns, else-arms, GOTO labels) are
 * elided in this excerpt; the flow annotations below are inferred from
 * the visible lines only.
 */
499 static int mdc_finish_enqueue(struct obd_export *exp,
500 struct ptlrpc_request *req,
501 struct ldlm_enqueue_info *einfo,
502 struct lookup_intent *it,
503 struct lustre_handle *lockh,
506 struct req_capsule *pill = &req->rq_pill;
507 struct ldlm_request *lockreq;
508 struct ldlm_reply *lockrep;
509 struct ldlm_lock *lock;
510 void *lvb_data = NULL;
514 /* Similarly, if we're going to replay this request, we don't want to
515 * actually get a lock, just perform the intent.
 */
517 if (req->rq_transno || req->rq_replay) {
518 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
519 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
/* Aborted enqueue: the intent ran but no lock was granted. */
522 if (rc == ELDLM_LOCK_ABORTED) {
524 memset(lockh, 0, sizeof(*lockh));
526 } else { /* rc = 0 */
527 lock = ldlm_handle2lock(lockh);
529 /* If the server gave us back a different lock mode, we should
530 * fix up our variables.
 */
532 if (lock->l_req_mode != einfo->ei_mode) {
533 ldlm_lock_addref(lockh, lock->l_req_mode);
534 ldlm_lock_decref(lockh, einfo->ei_mode);
535 einfo->ei_mode = lock->l_req_mode;
540 lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
/* Mirror the server's verdict into the intent for the upper layers. */
542 it->it_disposition = (int)lockrep->lock_policy_res1;
543 it->it_status = (int)lockrep->lock_policy_res2;
544 it->it_lock_mode = einfo->ei_mode;
545 it->it_lock_handle = lockh->cookie;
546 it->it_request = req;
548 /* Technically speaking rq_transno must already be zero if
549 * it_status is in error, so the check is a bit redundant
 */
551 if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
552 mdc_clear_replay_flag(req, it->it_status);
554 /* If we're doing an IT_OPEN which did not result in an actual
555 * successful open, then we need to remove the bit which saves
556 * this request for unconditional replay.
558 * It's important that we do this first! Otherwise we might exit the
559 * function without doing so, and try to replay a failed create
 */
562 if (it->it_op & IT_OPEN && req->rq_replay &&
563 (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
564 mdc_clear_replay_flag(req, it->it_status);
566 DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
567 it->it_op, it->it_disposition, it->it_status);
569 /* We know what to expect, so we do any byte flipping required here */
570 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
571 struct mdt_body *body;
573 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
575 CERROR("Can't swab mdt_body\n");
579 if (it_disposition(it, DISP_OPEN_OPEN) &&
580 !it_open_error(DISP_OPEN_OPEN, it)) {
/*
582 * If this is a successful OPEN request, we need to set
583 * replay handler and data early, so that if replay
584 * happens immediately after swabbing below, new reply
585 * is swabbed by that handler correctly.
 */
587 mdc_set_open_replay_data(NULL, NULL, it);
590 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
593 mdc_update_max_ea_from_body(exp, body);
/*
596 * The eadata is opaque; just check that it is there.
597 * Eventually, obd_unpackmd() will check the contents.
 */
599 eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
600 body->mbo_eadatasize);
604 /* save lvb data and length in case this is for layout
 */
608 lvb_len = body->mbo_eadatasize;
/*
611 * We save the reply LOV EA in case we have to replay a
612 * create for recovery. If we didn't allocate a large
613 * enough request buffer above we need to reallocate it
614 * here to hold the actual LOV EA.
616 * To not save LOV EA if request is not going to replay
617 * (for example error one).
 */
619 if ((it->it_op & IT_OPEN) && req->rq_replay) {
622 if (req_capsule_get_size(pill, &RMF_EADATA,
624 body->mbo_eadatasize)
625 mdc_realloc_openmsg(req, body);
627 req_capsule_shrink(pill, &RMF_EADATA,
628 body->mbo_eadatasize,
631 req_capsule_set_size(pill, &RMF_EADATA,
633 body->mbo_eadatasize);
635 lmm = req_capsule_client_get(pill, &RMF_EADATA);
637 memcpy(lmm, eadata, body->mbo_eadatasize);
640 } else if (it->it_op & IT_LAYOUT) {
641 /* maybe the lock was granted right away and layout
642 * is packed into RMF_DLM_LVB of req
 */
644 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
646 lvb_data = req_capsule_server_sized_get(pill,
654 /* fill in stripe data for layout lock */
655 lock = ldlm_handle2lock(lockh);
656 if (lock && ldlm_has_layout(lock) && lvb_data) {
659 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
660 ldlm_it2str(it->it_op), lvb_len);
662 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
667 memcpy(lmm, lvb_data, lvb_len);
669 /* install lvb_data */
670 lock_res_and_lock(lock);
/* Only install if no LVB is attached yet; otherwise the copy is dropped. */
671 if (!lock->l_lvb_data) {
672 lock->l_lvb_type = LVB_T_LAYOUT;
673 lock->l_lvb_data = lmm;
674 lock->l_lvb_len = lvb_len;
677 unlock_res_and_lock(lock);
687 /* We always reserve enough space in the reply packet for a stripe MD, because
688 * we don't know in advance the file type.
 */
/*
 * Main entry for intent-based DLM enqueues to the MDS. Chooses the
 * inodebits policy from the intent op, builds the matching intent
 * request, throttles through the rpc_lock and the request-slot limiter,
 * sends the enqueue, retries IT_CREAT forever on -EINPROGRESS (quota
 * design) within the same import generation, and finishes via
 * mdc_finish_enqueue(). Flock enqueues short-circuit most of this.
 * NOTE(review): several statements (returns, resend label, else-arms)
 * are elided in this excerpt.
 */
690 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
691 const ldlm_policy_data_t *policy,
692 struct lookup_intent *it, struct md_op_data *op_data,
693 struct lustre_handle *lockh, u64 extra_lock_flags)
695 static const ldlm_policy_data_t lookup_policy = {
696 .l_inodebits = { MDS_INODELOCK_LOOKUP }
698 static const ldlm_policy_data_t update_policy = {
699 .l_inodebits = { MDS_INODELOCK_UPDATE }
701 static const ldlm_policy_data_t layout_policy = {
702 .l_inodebits = { MDS_INODELOCK_LAYOUT }
704 static const ldlm_policy_data_t getxattr_policy = {
705 .l_inodebits = { MDS_INODELOCK_XATTR }
707 struct obd_device *obddev = class_exp2obd(exp);
708 struct ptlrpc_request *req = NULL;
709 u64 flags, saved_flags = extra_lock_flags;
710 struct ldlm_res_id res_id;
711 int generation, resends = 0;
712 struct ldlm_reply *lockrep;
713 enum lvb_type lvb_type = LVB_T_NONE;
/* Intent enqueues are always inodebits locks. */
716 LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
718 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
/* Select the inodebits policy implied by the intent operation. */
723 saved_flags |= LDLM_FL_HAS_INTENT;
724 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
725 policy = &update_policy;
726 else if (it->it_op & IT_LAYOUT)
727 policy = &layout_policy;
728 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
729 policy = &getxattr_policy;
731 policy = &lookup_policy;
/* Remember the import generation to detect eviction across resends. */
734 generation = obddev->u.cli.cl_import->imp_generation;
738 /* The only way right now is FLOCK. */
739 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
741 res_id.name[3] = LDLM_FLOCK;
742 } else if (it->it_op & IT_OPEN) {
743 req = mdc_intent_open_pack(exp, it, op_data);
744 } else if (it->it_op & IT_UNLINK) {
745 req = mdc_intent_unlink_pack(exp, it, op_data);
746 } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
747 req = mdc_intent_getattr_pack(exp, it, op_data);
748 } else if (it->it_op & IT_READDIR) {
749 req = mdc_enqueue_pack(exp, 0);
750 } else if (it->it_op & IT_LAYOUT) {
751 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
753 req = mdc_intent_layout_pack(exp, it, op_data);
754 lvb_type = LVB_T_LAYOUT;
755 } else if (it->it_op & IT_GETXATTR) {
756 req = mdc_intent_getxattr_pack(exp, it, op_data);
765 if (req && it && it->it_op & IT_CREAT)
766 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
 * retry loop below */
769 req->rq_no_retry_einprogress = 1;
/* On a resend, pin the request to the recorded import generation. */
772 req->rq_generation_set = 1;
773 req->rq_import_generation = generation;
774 req->rq_sent = ktime_get_real_seconds() + resends;
777 /* It is important to obtain rpc_lock first (if applicable), so that
778 * threads that are serialised with rpc_lock are not polluting our
779 * rpcs in flight counter. We do not do flock request limiting, though
 */
782 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
783 rc = obd_get_request_slot(&obddev->u.cli);
/* Slot acquisition failed: release the rpc_lock and the request. */
785 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
786 mdc_clear_replay_flag(req, 0);
787 ptlrpc_req_finished(req);
792 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
793 0, lvb_type, lockh, 0);
795 /* For flock requests we immediately return without further
796 * delay and let caller deal with the rest, since rest of
797 * this function metadata processing makes no sense for flock
798 * requests anyway. But in case of problem during comms with
799 * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
800 * can not rely on caller and this mainly for F_UNLCKs
801 * (explicits or automatically generated by Kernel to clean
802 * current FLocks upon exit) that can't be trashed
 */
804 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
805 (einfo->ei_type == LDLM_FLOCK) &&
806 (einfo->ei_mode == LCK_NL))
/* Done with the throttles regardless of enqueue outcome. */
811 obd_put_request_slot(&obddev->u.cli);
812 mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
815 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
816 obddev->obd_name, rc);
818 mdc_clear_replay_flag(req, rc);
819 ptlrpc_req_finished(req);
823 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
/* Convert the server's status into host-order errno space. */
825 lockrep->lock_policy_res2 =
826 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
828 /* Retry the create infinitely when we get -EINPROGRESS from
829 * server. This is required by the new quota design.
 */
831 if (it->it_op & IT_CREAT &&
832 (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
833 mdc_clear_replay_flag(req, rc);
834 ptlrpc_req_finished(req);
837 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
838 obddev->obd_name, resends, it->it_op,
839 PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
/* Only resend within the same import generation; eviction aborts. */
841 if (generation == obddev->u.cli.cl_import->imp_generation) {
844 CDEBUG(D_HA, "resend cross eviction\n");
849 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
/* Finish failed: drop any granted lock reference and the request. */
851 if (lustre_handle_is_used(lockh)) {
852 ldlm_lock_decref(lockh, einfo->ei_mode);
853 memset(lockh, 0, sizeof(*lockh));
855 ptlrpc_req_finished(req);
/* Clear the intent's lock state so callers do not reuse stale handles. */
857 it->it_lock_handle = 0;
858 it->it_lock_mode = 0;
859 it->it_request = NULL;
/*
 * Translate the result of an intent enqueue into the state the VFS
 * expects: propagate intent-phase errors, take extra request references
 * for successful CREATE/OPEN (released later by ll_create_node /
 * ll_file_open), and if an equivalent lock already exists, cancel the
 * new one and keep the old handle in the intent.
 * NOTE(review): several statements are elided in this excerpt.
 */
865 static int mdc_finish_intent_lock(struct obd_export *exp,
866 struct ptlrpc_request *request,
867 struct md_op_data *op_data,
868 struct lookup_intent *it,
869 struct lustre_handle *lockh)
871 struct lustre_handle old_lock;
872 struct mdt_body *mdt_body;
873 struct ldlm_lock *lock;
876 LASSERT(request != LP_POISON);
877 LASSERT(request->rq_repmsg != LP_POISON);
/* Readdir enqueues carry no intent result to process. */
879 if (it->it_op & IT_READDIR)
882 if (!it_disposition(it, DISP_IT_EXECD)) {
883 /* The server failed before it even started executing the
884 * intent, i.e. because it couldn't unpack the request.
 */
886 LASSERT(it->it_status != 0);
887 return it->it_status;
889 rc = it_open_error(DISP_IT_EXECD, it);
893 mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
894 LASSERT(mdt_body); /* mdc_enqueue checked */
896 rc = it_open_error(DISP_LOOKUP_EXECD, it);
900 /* keep requests around for the multiple phases of the call
901 * this shows the DISP_XX must guarantee we make it into the call
 */
903 if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
904 it_disposition(it, DISP_OPEN_CREATE) &&
905 !it_open_error(DISP_OPEN_CREATE, it)) {
906 it_set_disposition(it, DISP_ENQ_CREATE_REF);
907 ptlrpc_request_addref(request); /* balanced in ll_create_node */
909 if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
910 it_disposition(it, DISP_OPEN_OPEN) &&
911 !it_open_error(DISP_OPEN_OPEN, it)) {
912 it_set_disposition(it, DISP_ENQ_OPEN_REF);
913 ptlrpc_request_addref(request); /* balanced in ll_file_open */
914 /* BUG 11546 - eviction in the middle of open rpc processing */
915 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
918 if (it->it_op & IT_CREAT) {
919 /* XXX this belongs in ll_create_it */
920 } else if (it->it_op == IT_OPEN) {
921 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
923 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
926 /* If we already have a matching lock, then cancel the new
927 * one. We have to set the data here instead of in
928 * mdc_enqueue, because we need to use the child's inode as
929 * the l_ast_data to match, and that's not available until
930 * intent_finish has performed the iget().)
 */
932 lock = ldlm_handle2lock(lockh);
934 ldlm_policy_data_t policy = lock->l_policy_data;
936 LDLM_DEBUG(lock, "matching against this");
/* The granted lock's resource must name the fid the server returned. */
938 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
939 &lock->l_resource->lr_name),
940 "Lock res_id: "DLDLMRES", fid: "DFID"\n",
941 PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
944 memcpy(&old_lock, lockh, sizeof(*lockh));
945 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
946 LDLM_IBITS, &policy, LCK_NL,
/* A pre-existing lock matched: drop the new one, keep the old handle. */
948 ldlm_lock_decref_and_cancel(lockh,
950 memcpy(lockh, &old_lock, sizeof(old_lock));
951 it->it_lock_handle = lockh->cookie;
955 "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
956 (int)op_data->op_namelen, op_data->op_name,
957 ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
/*
 * Check whether a usable lock already covers @fid for this intent. If
 * the intent already carries a lock handle, revalidate it; otherwise
 * pick the inodebits required by the intent op and try to match an
 * existing granted lock. On success the intent's lock handle and mode
 * are filled in; on failure they are cleared.
 */
961 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
962 struct lu_fid *fid, __u64 *bits)
964 /* We could just return 1 immediately, but since we should only
965 * be called in revalidate_it if we already have a lock, let's
 * verify it. */
968 struct ldlm_res_id res_id;
969 struct lustre_handle lockh;
970 ldlm_policy_data_t policy;
/* A handle already stored in the intent takes the fast path. */
973 if (it->it_lock_handle) {
974 lockh.cookie = it->it_lock_handle;
975 mode = ldlm_revalidate_lock_handle(&lockh, bits);
977 fid_build_reg_res_name(fid, &res_id);
980 /* File attributes are held under multiple bits:
981 * nlink is under lookup lock, size and times are
982 * under UPDATE lock and recently we've also got
983 * a separate permissions lock for owner/group/acl that
984 * were protected by lookup lock before.
985 * Getattr must provide all of that information,
986 * so we need to ensure we have all of those locks.
987 * Unfortunately, if the bits are split across multiple
988 * locks, there's no easy way to match all of them here,
989 * so an extra RPC would be performed to fetch all
990 * of those bits at once for now.
 */
992 /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
993 * but for old MDTs (< 2.4), permission is covered
994 * by LOOKUP lock, so it needs to match all bits here.
 */
996 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
997 MDS_INODELOCK_LOOKUP |
1001 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1004 policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1007 policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
/* Accept any useful mode of an existing granted lock. */
1011 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1012 LDLM_IBITS, &policy,
1013 LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1018 it->it_lock_handle = lockh.cookie;
1019 it->it_lock_mode = mode;
/* No usable lock found: clear the intent's lock state. */
1021 it->it_lock_handle = 0;
1022 it->it_lock_mode = 0;
/*
1029 * This long block is all about fixing up the lock and request state
1030 * so that it is correct as of the moment _before_ the operation was
1031 * applied; that way, the VFS will think that everything is normal and
1032 * call Lustre's regular VFS methods.
1034 * If we're performing a creation, that means that unless the creation
1035 * failed with EEXIST, we should fake up a negative dentry.
1037 * For everything else, we want to lookup to succeed.
1039 * One additional note: if CREATE or OPEN succeeded, we add an extra
1040 * reference to the request because we need to keep it around until
1041 * ll_create/ll_open gets called.
1043 * The server will return to us, in it_disposition, an indication of
1044 * exactly what it_status refers to.
1046 * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1047 * otherwise if DISP_OPEN_CREATE is set, then it status is the
1048 * creation failure mode. In either case, one of DISP_LOOKUP_NEG or
1049 * DISP_LOOKUP_POS will be set, indicating whether the child lookup
 * succeeded.
1052 * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
 * lookup.
 */
1055 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1056 struct lookup_intent *it, struct ptlrpc_request **reqp,
1057 ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1059 struct ldlm_enqueue_info einfo = {
1060 .ei_type = LDLM_IBITS,
1061 .ei_mode = it_to_lock_mode(it),
1062 .ei_cb_bl = cb_blocking,
1063 .ei_cb_cp = ldlm_completion_ast,
1065 struct lustre_handle lockh;
1070 CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1071 ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
1072 op_data->op_name, PFID(&op_data->op_fid2),
1073 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
/* Try revalidating an existing lock first when the child fid is known. */
1077 if (fid_is_sane(&op_data->op_fid2) &&
1078 (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1079 /* We could just return 1 immediately, but since we should only
1080 * be called in revalidate_it if we already have a lock, let's
 * verify it. */
1083 it->it_lock_handle = 0;
1084 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1085 /* Only return failure if it was not GETATTR by cfid
1086 * (from inode_revalidate)
 */
1088 if (rc || op_data->op_namelen != 0)
1092 /* For case if upper layer did not alloc fid, do it now. */
1093 if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1094 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1096 CERROR("Can't alloc new fid, rc %d\n", rc);
1100 rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
/* Hand the intent's request back to the caller and finish the lock. */
1105 *reqp = it->it_request;
1106 rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
/*
 * Reply interpreter for asynchronous intent-getattr enqueues: release
 * the request slot, finish the LDLM enqueue, post-process via
 * mdc_finish_enqueue()/mdc_finish_intent_lock(), and invoke the
 * caller's completion callback with the final status.
 */
1110 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1111 struct ptlrpc_request *req,
1114 struct mdc_getattr_args *ga = args;
1115 struct obd_export *exp = ga->ga_exp;
1116 struct md_enqueue_info *minfo = ga->ga_minfo;
1117 struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1118 struct lookup_intent *it;
1119 struct lustre_handle *lockh;
1120 struct obd_device *obddev;
1121 struct ldlm_reply *lockrep;
1122 __u64 flags = LDLM_FL_HAS_INTENT;
1125 lockh = &minfo->mi_lockh;
1127 obddev = class_exp2obd(exp);
/* The slot taken in mdc_intent_getattr_async() is released here. */
1129 obd_put_request_slot(&obddev->u.cli);
1130 if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1133 rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1134 &flags, NULL, 0, lockh, rc);
1136 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1137 mdc_clear_replay_flag(req, rc);
1141 lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
/* Convert the server's status into host-order errno space. */
1143 lockrep->lock_policy_res2 =
1144 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1146 rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1150 rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
/* Notify the statahead caller with the final result. */
1154 minfo->mi_cb(req, minfo, rc);
/*
 * Issue an intent-getattr enqueue asynchronously (used by statahead):
 * build the getattr intent request, take a request slot, start the
 * enqueue with async == 1, stash the completion context in the request,
 * and queue it on ptlrpcd. The reply is handled by
 * mdc_intent_getattr_async_interpret().
 */
1158 int mdc_intent_getattr_async(struct obd_export *exp,
1159 struct md_enqueue_info *minfo,
1160 struct ldlm_enqueue_info *einfo)
1162 struct md_op_data *op_data = &minfo->mi_data;
1163 struct lookup_intent *it = &minfo->mi_it;
1164 struct ptlrpc_request *req;
1165 struct mdc_getattr_args *ga;
1166 struct obd_device *obddev = class_exp2obd(exp);
1167 struct ldlm_res_id res_id;
1168 /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1169 * for statahead currently. Consider CMD in future, such two bits
1170 * maybe managed by different MDS, should be adjusted then.
 */
1172 ldlm_policy_data_t policy = {
1173 .l_inodebits = { MDS_INODELOCK_LOOKUP |
1174 MDS_INODELOCK_UPDATE }
1177 __u64 flags = LDLM_FL_HAS_INTENT;
1180 "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
1181 (int)op_data->op_namelen, op_data->op_name,
1182 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1184 fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1185 req = mdc_intent_getattr_pack(exp, it, op_data);
1187 return PTR_ERR(req);
/* Slot is released by the interpret callback on completion. */
1189 rc = obd_get_request_slot(&obddev->u.cli);
1191 ptlrpc_req_finished(req);
/* async == 1: ldlm_cli_enqueue only starts the RPC here. */
1195 rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1196 0, LVB_T_NONE, &minfo->mi_lockh, 1);
1198 obd_put_request_slot(&obddev->u.cli);
1199 ptlrpc_req_finished(req);
/* Stash the completion context in the request's async args. */
1203 CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1204 ga = ptlrpc_req_async_args(req);
1206 ga->ga_minfo = minfo;
1207 ga->ga_einfo = einfo;
1209 req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1210 ptlrpcd_add_req(req);