GNU Linux-libre 4.9.337-gnu1
[releases.git] / drivers / staging / lustre / lustre / mdc / mdc_locks.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 # include <linux/module.h>
36
37 #include "../include/lustre_intent.h"
38 #include "../include/obd.h"
39 #include "../include/obd_class.h"
40 #include "../include/lustre_dlm.h"
41 #include "../include/lustre_fid.h"      /* fid_res_name_eq() */
42 #include "../include/lustre_mdc.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre_req_layout.h"
45 #include "mdc_internal.h"
46
47 struct mdc_getattr_args {
48         struct obd_export          *ga_exp;
49         struct md_enqueue_info      *ga_minfo;
50         struct ldlm_enqueue_info    *ga_einfo;
51 };
52
53 int it_open_error(int phase, struct lookup_intent *it)
54 {
55         if (it_disposition(it, DISP_OPEN_LEASE)) {
56                 if (phase >= DISP_OPEN_LEASE)
57                         return it->it_status;
58                 else
59                         return 0;
60         }
61         if (it_disposition(it, DISP_OPEN_OPEN)) {
62                 if (phase >= DISP_OPEN_OPEN)
63                         return it->it_status;
64                 else
65                         return 0;
66         }
67
68         if (it_disposition(it, DISP_OPEN_CREATE)) {
69                 if (phase >= DISP_OPEN_CREATE)
70                         return it->it_status;
71                 else
72                         return 0;
73         }
74
75         if (it_disposition(it, DISP_LOOKUP_EXECD)) {
76                 if (phase >= DISP_LOOKUP_EXECD)
77                         return it->it_status;
78                 else
79                         return 0;
80         }
81
82         if (it_disposition(it, DISP_IT_EXECD)) {
83                 if (phase >= DISP_IT_EXECD)
84                         return it->it_status;
85                 else
86                         return 0;
87         }
88         CERROR("it disp: %X, status: %d\n", it->it_disposition,
89                it->it_status);
90         LBUG();
91         return 0;
92 }
93 EXPORT_SYMBOL(it_open_error);
94
95 /* this must be called on a lockh that is known to have a referenced lock */
96 int mdc_set_lock_data(struct obd_export *exp, const struct lustre_handle *lockh,
97                       void *data, __u64 *bits)
98 {
99         struct ldlm_lock *lock;
100         struct inode *new_inode = data;
101
102         if (bits)
103                 *bits = 0;
104
105         if (!lustre_handle_is_used(lockh))
106                 return 0;
107
108         lock = ldlm_handle2lock(lockh);
109
110         LASSERT(lock);
111         lock_res_and_lock(lock);
112         if (lock->l_resource->lr_lvb_inode &&
113             lock->l_resource->lr_lvb_inode != data) {
114                 struct inode *old_inode = lock->l_resource->lr_lvb_inode;
115
116                 LASSERTF(old_inode->i_state & I_FREEING,
117                          "Found existing inode %p/%lu/%u state %lu in lock: setting data to %p/%lu/%u\n",
118                          old_inode, old_inode->i_ino, old_inode->i_generation,
119                          old_inode->i_state, new_inode, new_inode->i_ino,
120                          new_inode->i_generation);
121         }
122         lock->l_resource->lr_lvb_inode = new_inode;
123         if (bits)
124                 *bits = lock->l_policy_data.l_inodebits.bits;
125
126         unlock_res_and_lock(lock);
127         LDLM_LOCK_PUT(lock);
128
129         return 0;
130 }
131
132 enum ldlm_mode mdc_lock_match(struct obd_export *exp, __u64 flags,
133                               const struct lu_fid *fid, enum ldlm_type type,
134                               ldlm_policy_data_t *policy, enum ldlm_mode mode,
135                               struct lustre_handle *lockh)
136 {
137         struct ldlm_res_id res_id;
138         enum ldlm_mode rc;
139
140         fid_build_reg_res_name(fid, &res_id);
141         /* LU-4405: Clear bits not supported by server */
142         policy->l_inodebits.bits &= exp_connect_ibits(exp);
143         rc = ldlm_lock_match(class_exp2obd(exp)->obd_namespace, flags,
144                              &res_id, type, policy, mode, lockh, 0);
145         return rc;
146 }
147
148 int mdc_cancel_unused(struct obd_export *exp,
149                       const struct lu_fid *fid,
150                       ldlm_policy_data_t *policy,
151                       enum ldlm_mode mode,
152                       enum ldlm_cancel_flags flags,
153                       void *opaque)
154 {
155         struct ldlm_res_id res_id;
156         struct obd_device *obd = class_exp2obd(exp);
157         int rc;
158
159         fid_build_reg_res_name(fid, &res_id);
160         rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
161                                              policy, mode, flags, opaque);
162         return rc;
163 }
164
165 int mdc_null_inode(struct obd_export *exp,
166                    const struct lu_fid *fid)
167 {
168         struct ldlm_res_id res_id;
169         struct ldlm_resource *res;
170         struct ldlm_namespace *ns = class_exp2obd(exp)->obd_namespace;
171
172         LASSERTF(ns, "no namespace passed\n");
173
174         fid_build_reg_res_name(fid, &res_id);
175
176         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
177         if (IS_ERR(res))
178                 return 0;
179
180         lock_res(res);
181         res->lr_lvb_inode = NULL;
182         unlock_res(res);
183
184         ldlm_resource_putref(res);
185         return 0;
186 }
187
188 static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc)
189 {
190         /* Don't hold error requests for replay. */
191         if (req->rq_replay) {
192                 spin_lock(&req->rq_lock);
193                 req->rq_replay = 0;
194                 spin_unlock(&req->rq_lock);
195         }
196         if (rc && req->rq_transno != 0) {
197                 DEBUG_REQ(D_ERROR, req, "transno returned on error rc %d", rc);
198                 LBUG();
199         }
200 }
201
202 /* Save a large LOV EA into the request buffer so that it is available
203  * for replay.  We don't do this in the initial request because the
204  * original request doesn't need this buffer (at most it sends just the
205  * lov_mds_md) and it is a waste of RAM/bandwidth to send the empty
206  * buffer and may also be difficult to allocate and save a very large
207  * request buffer for each open. (bug 5707)
208  *
209  * OOM here may cause recovery failure if lmm is needed (only for the
210  * original open if the MDS crashed just when this client also OOM'd)
211  * but this is incredibly unlikely, and questionable whether the client
212  * could do MDS recovery under OOM anyways...
213  */
214 static void mdc_realloc_openmsg(struct ptlrpc_request *req,
215                                 struct mdt_body *body)
216 {
217         int     rc;
218
219         /* FIXME: remove this explicit offset. */
220         rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
221                                         body->mbo_eadatasize);
222         if (rc) {
223                 CERROR("Can't enlarge segment %d size to %d\n",
224                        DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
225                 body->mbo_valid &= ~OBD_MD_FLEASIZE;
226                 body->mbo_eadatasize = 0;
227         }
228 }
229
230 static struct ptlrpc_request *
231 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
232                      struct md_op_data *op_data)
233 {
234         struct ptlrpc_request *req;
235         struct obd_device     *obddev = class_exp2obd(exp);
236         struct ldlm_intent    *lit;
237         const void *lmm = op_data->op_data;
238         u32 lmmsize = op_data->op_data_size;
239         LIST_HEAD(cancels);
240         int                 count = 0;
241         int                 mode;
242         int                 rc;
243
244         it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
245
246         /* XXX: openlock is not cancelled for cross-refs. */
247         /* If inode is known, cancel conflicting OPEN locks. */
248         if (fid_is_sane(&op_data->op_fid2)) {
249                 if (it->it_flags & MDS_OPEN_LEASE) { /* try to get lease */
250                         if (it->it_flags & FMODE_WRITE)
251                                 mode = LCK_EX;
252                         else
253                                 mode = LCK_PR;
254                 } else {
255                         if (it->it_flags & (FMODE_WRITE | MDS_OPEN_TRUNC))
256                                 mode = LCK_CW;
257                         else if (it->it_flags & __FMODE_EXEC)
258                                 mode = LCK_PR;
259                         else
260                                 mode = LCK_CR;
261                 }
262                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
263                                                 &cancels, mode,
264                                                 MDS_INODELOCK_OPEN);
265         }
266
267         /* If CREATE, cancel parent's UPDATE lock. */
268         if (it->it_op & IT_CREAT)
269                 mode = LCK_EX;
270         else
271                 mode = LCK_CR;
272         count += mdc_resource_get_unused(exp, &op_data->op_fid1,
273                                          &cancels, mode,
274                                          MDS_INODELOCK_UPDATE);
275
276         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
277                                    &RQF_LDLM_INTENT_OPEN);
278         if (!req) {
279                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
280                 return ERR_PTR(-ENOMEM);
281         }
282
283         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
284                              op_data->op_namelen + 1);
285         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
286                              max(lmmsize, obddev->u.cli.cl_default_mds_easize));
287
288         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
289         if (rc < 0) {
290                 ptlrpc_request_free(req);
291                 return ERR_PTR(rc);
292         }
293
294         spin_lock(&req->rq_lock);
295         req->rq_replay = req->rq_import->imp_replayable;
296         spin_unlock(&req->rq_lock);
297
298         /* pack the intent */
299         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
300         lit->opc = (__u64)it->it_op;
301
302         /* pack the intended request */
303         mdc_open_pack(req, op_data, it->it_create_mode, 0, it->it_flags, lmm,
304                       lmmsize);
305
306         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
307                              obddev->u.cli.cl_max_mds_easize);
308
309         ptlrpc_request_set_replen(req);
310         return req;
311 }
312
313 static struct ptlrpc_request *
314 mdc_intent_getxattr_pack(struct obd_export *exp,
315                          struct lookup_intent *it,
316                          struct md_op_data *op_data)
317 {
318         struct ptlrpc_request   *req;
319         struct ldlm_intent      *lit;
320         int rc, count = 0;
321         u32 maxdata;
322         LIST_HEAD(cancels);
323
324         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
325                                    &RQF_LDLM_INTENT_GETXATTR);
326         if (!req)
327                 return ERR_PTR(-ENOMEM);
328
329         rc = ldlm_prep_enqueue_req(exp, req, &cancels, count);
330         if (rc) {
331                 ptlrpc_request_free(req);
332                 return ERR_PTR(rc);
333         }
334
335         /* pack the intent */
336         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
337         lit->opc = IT_GETXATTR;
338
339         maxdata = class_exp2cliimp(exp)->imp_connect_data.ocd_max_easize;
340
341         /* pack the intended request */
342         mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid, maxdata, -1,
343                       0);
344
345         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_SERVER, maxdata);
346
347         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS, RCL_SERVER, maxdata);
348
349         req_capsule_set_size(&req->rq_pill, &RMF_EAVALS_LENS,
350                              RCL_SERVER, maxdata);
351
352         ptlrpc_request_set_replen(req);
353
354         return req;
355 }
356
357 static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
358                                                      struct lookup_intent *it,
359                                                      struct md_op_data *op_data)
360 {
361         struct ptlrpc_request *req;
362         struct obd_device     *obddev = class_exp2obd(exp);
363         struct ldlm_intent    *lit;
364         int                 rc;
365
366         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
367                                    &RQF_LDLM_INTENT_UNLINK);
368         if (!req)
369                 return ERR_PTR(-ENOMEM);
370
371         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
372                              op_data->op_namelen + 1);
373
374         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 return ERR_PTR(rc);
378         }
379
380         /* pack the intent */
381         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
382         lit->opc = (__u64)it->it_op;
383
384         /* pack the intended request */
385         mdc_unlink_pack(req, op_data);
386
387         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
388                              obddev->u.cli.cl_default_mds_easize);
389         req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
390                              obddev->u.cli.cl_default_mds_cookiesize);
391         ptlrpc_request_set_replen(req);
392         return req;
393 }
394
395 static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
396                                                       struct lookup_intent *it,
397                                                      struct md_op_data *op_data)
398 {
399         struct ptlrpc_request *req;
400         struct obd_device     *obddev = class_exp2obd(exp);
401         u64                    valid = OBD_MD_FLGETATTR | OBD_MD_FLEASIZE |
402                                        OBD_MD_FLMODEASIZE | OBD_MD_FLDIREA |
403                                        OBD_MD_MEA | OBD_MD_FLACL;
404         struct ldlm_intent    *lit;
405         int                 rc;
406         u32 easize;
407
408         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
409                                    &RQF_LDLM_INTENT_GETATTR);
410         if (!req)
411                 return ERR_PTR(-ENOMEM);
412
413         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
414                              op_data->op_namelen + 1);
415
416         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
417         if (rc) {
418                 ptlrpc_request_free(req);
419                 return ERR_PTR(rc);
420         }
421
422         /* pack the intent */
423         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
424         lit->opc = (__u64)it->it_op;
425
426         if (obddev->u.cli.cl_default_mds_easize > 0)
427                 easize = obddev->u.cli.cl_default_mds_easize;
428         else
429                 easize = obddev->u.cli.cl_max_mds_easize;
430
431         /* pack the intended request */
432         mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
433
434         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
435         ptlrpc_request_set_replen(req);
436         return req;
437 }
438
439 static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
440                                                      struct lookup_intent *it,
441                                                      struct md_op_data *unused)
442 {
443         struct obd_device     *obd = class_exp2obd(exp);
444         struct ptlrpc_request *req;
445         struct ldlm_intent    *lit;
446         struct layout_intent  *layout;
447         int rc;
448
449         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
450                                    &RQF_LDLM_INTENT_LAYOUT);
451         if (!req)
452                 return ERR_PTR(-ENOMEM);
453
454         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, 0);
455         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
456         if (rc) {
457                 ptlrpc_request_free(req);
458                 return ERR_PTR(rc);
459         }
460
461         /* pack the intent */
462         lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
463         lit->opc = (__u64)it->it_op;
464
465         /* pack the layout intent request */
466         layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
467         /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
468          * set for replication
469          */
470         layout->li_opc = LAYOUT_INTENT_ACCESS;
471
472         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
473                              obd->u.cli.cl_default_mds_easize);
474         ptlrpc_request_set_replen(req);
475         return req;
476 }
477
478 static struct ptlrpc_request *
479 mdc_enqueue_pack(struct obd_export *exp, int lvb_len)
480 {
481         struct ptlrpc_request *req;
482         int rc;
483
484         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
485         if (!req)
486                 return ERR_PTR(-ENOMEM);
487
488         rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
489         if (rc) {
490                 ptlrpc_request_free(req);
491                 return ERR_PTR(rc);
492         }
493
494         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
495         ptlrpc_request_set_replen(req);
496         return req;
497 }
498
499 static int mdc_finish_enqueue(struct obd_export *exp,
500                               struct ptlrpc_request *req,
501                               struct ldlm_enqueue_info *einfo,
502                               struct lookup_intent *it,
503                               struct lustre_handle *lockh,
504                               int rc)
505 {
506         struct req_capsule  *pill = &req->rq_pill;
507         struct ldlm_request *lockreq;
508         struct ldlm_reply   *lockrep;
509         struct ldlm_lock    *lock;
510         void            *lvb_data = NULL;
511         u32 lvb_len = 0;
512
513         LASSERT(rc >= 0);
514         /* Similarly, if we're going to replay this request, we don't want to
515          * actually get a lock, just perform the intent.
516          */
517         if (req->rq_transno || req->rq_replay) {
518                 lockreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
519                 lockreq->lock_flags |= ldlm_flags_to_wire(LDLM_FL_INTENT_ONLY);
520         }
521
522         if (rc == ELDLM_LOCK_ABORTED) {
523                 einfo->ei_mode = 0;
524                 memset(lockh, 0, sizeof(*lockh));
525                 rc = 0;
526         } else { /* rc = 0 */
527                 lock = ldlm_handle2lock(lockh);
528
529                 /* If the server gave us back a different lock mode, we should
530                  * fix up our variables.
531                  */
532                 if (lock->l_req_mode != einfo->ei_mode) {
533                         ldlm_lock_addref(lockh, lock->l_req_mode);
534                         ldlm_lock_decref(lockh, einfo->ei_mode);
535                         einfo->ei_mode = lock->l_req_mode;
536                 }
537                 LDLM_LOCK_PUT(lock);
538         }
539
540         lockrep = req_capsule_server_get(pill, &RMF_DLM_REP);
541
542         it->it_disposition = (int)lockrep->lock_policy_res1;
543         it->it_status = (int)lockrep->lock_policy_res2;
544         it->it_lock_mode = einfo->ei_mode;
545         it->it_lock_handle = lockh->cookie;
546         it->it_request = req;
547
548         /* Technically speaking rq_transno must already be zero if
549          * it_status is in error, so the check is a bit redundant
550          */
551         if ((!req->rq_transno || it->it_status < 0) && req->rq_replay)
552                 mdc_clear_replay_flag(req, it->it_status);
553
554         /* If we're doing an IT_OPEN which did not result in an actual
555          * successful open, then we need to remove the bit which saves
556          * this request for unconditional replay.
557          *
558          * It's important that we do this first!  Otherwise we might exit the
559          * function without doing so, and try to replay a failed create
560          * (bug 3440)
561          */
562         if (it->it_op & IT_OPEN && req->rq_replay &&
563             (!it_disposition(it, DISP_OPEN_OPEN) || it->it_status != 0))
564                 mdc_clear_replay_flag(req, it->it_status);
565
566         DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d",
567                   it->it_op, it->it_disposition, it->it_status);
568
569         /* We know what to expect, so we do any byte flipping required here */
570         if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) {
571                 struct mdt_body *body;
572
573                 body = req_capsule_server_get(pill, &RMF_MDT_BODY);
574                 if (!body) {
575                         CERROR("Can't swab mdt_body\n");
576                         return -EPROTO;
577                 }
578
579                 if (it_disposition(it, DISP_OPEN_OPEN) &&
580                     !it_open_error(DISP_OPEN_OPEN, it)) {
581                         /*
582                          * If this is a successful OPEN request, we need to set
583                          * replay handler and data early, so that if replay
584                          * happens immediately after swabbing below, new reply
585                          * is swabbed by that handler correctly.
586                          */
587                         mdc_set_open_replay_data(NULL, NULL, it);
588                 }
589
590                 if ((body->mbo_valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) {
591                         void *eadata;
592
593                         mdc_update_max_ea_from_body(exp, body);
594
595                         /*
596                          * The eadata is opaque; just check that it is there.
597                          * Eventually, obd_unpackmd() will check the contents.
598                          */
599                         eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
600                                                               body->mbo_eadatasize);
601                         if (!eadata)
602                                 return -EPROTO;
603
604                         /* save lvb data and length in case this is for layout
605                          * lock
606                          */
607                         lvb_data = eadata;
608                         lvb_len = body->mbo_eadatasize;
609
610                         /*
611                          * We save the reply LOV EA in case we have to replay a
612                          * create for recovery.  If we didn't allocate a large
613                          * enough request buffer above we need to reallocate it
614                          * here to hold the actual LOV EA.
615                          *
616                          * To not save LOV EA if request is not going to replay
617                          * (for example error one).
618                          */
619                         if ((it->it_op & IT_OPEN) && req->rq_replay) {
620                                 void *lmm;
621
622                                 if (req_capsule_get_size(pill, &RMF_EADATA,
623                                                          RCL_CLIENT) <
624                                     body->mbo_eadatasize)
625                                         mdc_realloc_openmsg(req, body);
626                                 else
627                                         req_capsule_shrink(pill, &RMF_EADATA,
628                                                            body->mbo_eadatasize,
629                                                            RCL_CLIENT);
630
631                                 req_capsule_set_size(pill, &RMF_EADATA,
632                                                      RCL_CLIENT,
633                                                      body->mbo_eadatasize);
634
635                                 lmm = req_capsule_client_get(pill, &RMF_EADATA);
636                                 if (lmm)
637                                         memcpy(lmm, eadata, body->mbo_eadatasize);
638                         }
639                 }
640         } else if (it->it_op & IT_LAYOUT) {
641                 /* maybe the lock was granted right away and layout
642                  * is packed into RMF_DLM_LVB of req
643                  */
644                 lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
645                 if (lvb_len > 0) {
646                         lvb_data = req_capsule_server_sized_get(pill,
647                                                                 &RMF_DLM_LVB,
648                                                                 lvb_len);
649                         if (!lvb_data)
650                                 return -EPROTO;
651                 }
652         }
653
654         /* fill in stripe data for layout lock */
655         lock = ldlm_handle2lock(lockh);
656         if (lock && ldlm_has_layout(lock) && lvb_data) {
657                 void *lmm;
658
659                 LDLM_DEBUG(lock, "layout lock returned by: %s, lvb_len: %d",
660                            ldlm_it2str(it->it_op), lvb_len);
661
662                 lmm = libcfs_kvzalloc(lvb_len, GFP_NOFS);
663                 if (!lmm) {
664                         LDLM_LOCK_PUT(lock);
665                         return -ENOMEM;
666                 }
667                 memcpy(lmm, lvb_data, lvb_len);
668
669                 /* install lvb_data */
670                 lock_res_and_lock(lock);
671                 if (!lock->l_lvb_data) {
672                         lock->l_lvb_type = LVB_T_LAYOUT;
673                         lock->l_lvb_data = lmm;
674                         lock->l_lvb_len = lvb_len;
675                         lmm = NULL;
676                 }
677                 unlock_res_and_lock(lock);
678                 if (lmm)
679                         kvfree(lmm);
680         }
681         if (lock)
682                 LDLM_LOCK_PUT(lock);
683
684         return rc;
685 }
686
687 /* We always reserve enough space in the reply packet for a stripe MD, because
688  * we don't know in advance the file type.
689  */
690 int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
691                 const ldlm_policy_data_t *policy,
692                 struct lookup_intent *it, struct md_op_data *op_data,
693                 struct lustre_handle *lockh, u64 extra_lock_flags)
694 {
695         static const ldlm_policy_data_t lookup_policy = {
696                 .l_inodebits = { MDS_INODELOCK_LOOKUP }
697         };
698         static const ldlm_policy_data_t update_policy = {
699                 .l_inodebits = { MDS_INODELOCK_UPDATE }
700         };
701         static const ldlm_policy_data_t layout_policy = {
702                 .l_inodebits = { MDS_INODELOCK_LAYOUT }
703         };
704         static const ldlm_policy_data_t getxattr_policy = {
705                 .l_inodebits = { MDS_INODELOCK_XATTR }
706         };
707         struct obd_device *obddev = class_exp2obd(exp);
708         struct ptlrpc_request *req = NULL;
709         u64 flags, saved_flags = extra_lock_flags;
710         struct ldlm_res_id res_id;
711         int generation, resends = 0;
712         struct ldlm_reply *lockrep;
713         enum lvb_type lvb_type = LVB_T_NONE;
714         int rc;
715
716         LASSERTF(!it || einfo->ei_type == LDLM_IBITS, "lock type %d\n",
717                  einfo->ei_type);
718         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
719
720         if (it) {
721                 LASSERT(!policy);
722
723                 saved_flags |= LDLM_FL_HAS_INTENT;
724                 if (it->it_op & (IT_OPEN | IT_UNLINK | IT_GETATTR | IT_READDIR))
725                         policy = &update_policy;
726                 else if (it->it_op & IT_LAYOUT)
727                         policy = &layout_policy;
728                 else if (it->it_op & (IT_GETXATTR | IT_SETXATTR))
729                         policy = &getxattr_policy;
730                 else
731                         policy = &lookup_policy;
732         }
733
734         generation = obddev->u.cli.cl_import->imp_generation;
735 resend:
736         flags = saved_flags;
737         if (!it) {
738                 /* The only way right now is FLOCK. */
739                 LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
740                          einfo->ei_type);
741                 res_id.name[3] = LDLM_FLOCK;
742         } else if (it->it_op & IT_OPEN) {
743                 req = mdc_intent_open_pack(exp, it, op_data);
744         } else if (it->it_op & IT_UNLINK) {
745                 req = mdc_intent_unlink_pack(exp, it, op_data);
746         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
747                 req = mdc_intent_getattr_pack(exp, it, op_data);
748         } else if (it->it_op & IT_READDIR) {
749                 req = mdc_enqueue_pack(exp, 0);
750         } else if (it->it_op & IT_LAYOUT) {
751                 if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
752                         return -EOPNOTSUPP;
753                 req = mdc_intent_layout_pack(exp, it, op_data);
754                 lvb_type = LVB_T_LAYOUT;
755         } else if (it->it_op & IT_GETXATTR) {
756                 req = mdc_intent_getxattr_pack(exp, it, op_data);
757         } else {
758                 LBUG();
759                 return -EINVAL;
760         }
761
762         if (IS_ERR(req))
763                 return PTR_ERR(req);
764
765         if (req && it && it->it_op & IT_CREAT)
766                 /* ask ptlrpc not to resend on EINPROGRESS since we have our own
767                  * retry logic
768                  */
769                 req->rq_no_retry_einprogress = 1;
770
771         if (resends) {
772                 req->rq_generation_set = 1;
773                 req->rq_import_generation = generation;
774                 req->rq_sent = ktime_get_real_seconds() + resends;
775         }
776
777         /* It is important to obtain rpc_lock first (if applicable), so that
778          * threads that are serialised with rpc_lock are not polluting our
779          * rpcs in flight counter. We do not do flock request limiting, though
780          */
781         if (it) {
782                 mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
783                 rc = obd_get_request_slot(&obddev->u.cli);
784                 if (rc != 0) {
785                         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
786                         mdc_clear_replay_flag(req, 0);
787                         ptlrpc_req_finished(req);
788                         return rc;
789                 }
790         }
791
792         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
793                               0, lvb_type, lockh, 0);
794         if (!it) {
795                 /* For flock requests we immediately return without further
796                  * delay and let caller deal with the rest, since rest of
797                  * this function metadata processing makes no sense for flock
798                  * requests anyway. But in case of problem during comms with
799                  * Server (ETIMEDOUT) or any signal/kill attempt (EINTR), we
800                  * can not rely on caller and this mainly for F_UNLCKs
801                  * (explicits or automatically generated by Kernel to clean
802                  * current FLocks upon exit) that can't be trashed
803                  */
804                 if (((rc == -EINTR) || (rc == -ETIMEDOUT)) &&
805                     (einfo->ei_type == LDLM_FLOCK) &&
806                     (einfo->ei_mode == LCK_NL))
807                         goto resend;
808                 return rc;
809         }
810
811         obd_put_request_slot(&obddev->u.cli);
812         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
813
814         if (rc < 0) {
815                 CDEBUG(D_INFO, "%s: ldlm_cli_enqueue failed: rc = %d\n",
816                        obddev->obd_name, rc);
817
818                 mdc_clear_replay_flag(req, rc);
819                 ptlrpc_req_finished(req);
820                 return rc;
821         }
822
823         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
824
825         lockrep->lock_policy_res2 =
826                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
827
828         /* Retry the create infinitely when we get -EINPROGRESS from
829          * server. This is required by the new quota design.
830          */
831         if (it->it_op & IT_CREAT &&
832             (int)lockrep->lock_policy_res2 == -EINPROGRESS) {
833                 mdc_clear_replay_flag(req, rc);
834                 ptlrpc_req_finished(req);
835                 resends++;
836
837                 CDEBUG(D_HA, "%s: resend:%d op:%d "DFID"/"DFID"\n",
838                        obddev->obd_name, resends, it->it_op,
839                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
840
841                 if (generation == obddev->u.cli.cl_import->imp_generation) {
842                         goto resend;
843                 } else {
844                         CDEBUG(D_HA, "resend cross eviction\n");
845                         return -EIO;
846                 }
847         }
848
849         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
850         if (rc < 0) {
851                 if (lustre_handle_is_used(lockh)) {
852                         ldlm_lock_decref(lockh, einfo->ei_mode);
853                         memset(lockh, 0, sizeof(*lockh));
854                 }
855                 ptlrpc_req_finished(req);
856
857                 it->it_lock_handle = 0;
858                 it->it_lock_mode = 0;
859                 it->it_request = NULL;
860         }
861
862         return rc;
863 }
864
865 static int mdc_finish_intent_lock(struct obd_export *exp,
866                                   struct ptlrpc_request *request,
867                                   struct md_op_data *op_data,
868                                   struct lookup_intent *it,
869                                   struct lustre_handle *lockh)
870 {
871         struct lustre_handle old_lock;
872         struct mdt_body *mdt_body;
873         struct ldlm_lock *lock;
874         int rc;
875
876         LASSERT(request != LP_POISON);
877         LASSERT(request->rq_repmsg != LP_POISON);
878
879         if (it->it_op & IT_READDIR)
880                 return 0;
881
882         if (!it_disposition(it, DISP_IT_EXECD)) {
883                 /* The server failed before it even started executing the
884                  * intent, i.e. because it couldn't unpack the request.
885                  */
886                 LASSERT(it->it_status != 0);
887                 return it->it_status;
888         }
889         rc = it_open_error(DISP_IT_EXECD, it);
890         if (rc)
891                 return rc;
892
893         mdt_body = req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY);
894         LASSERT(mdt_body);      /* mdc_enqueue checked */
895
896         rc = it_open_error(DISP_LOOKUP_EXECD, it);
897         if (rc)
898                 return rc;
899
900         /* keep requests around for the multiple phases of the call
901          * this shows the DISP_XX must guarantee we make it into the call
902          */
903         if (!it_disposition(it, DISP_ENQ_CREATE_REF) &&
904             it_disposition(it, DISP_OPEN_CREATE) &&
905             !it_open_error(DISP_OPEN_CREATE, it)) {
906                 it_set_disposition(it, DISP_ENQ_CREATE_REF);
907                 ptlrpc_request_addref(request); /* balanced in ll_create_node */
908         }
909         if (!it_disposition(it, DISP_ENQ_OPEN_REF) &&
910             it_disposition(it, DISP_OPEN_OPEN) &&
911             !it_open_error(DISP_OPEN_OPEN, it)) {
912                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
913                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
914                 /* BUG 11546 - eviction in the middle of open rpc processing */
915                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
916         }
917
918         if (it->it_op & IT_CREAT) {
919                 /* XXX this belongs in ll_create_it */
920         } else if (it->it_op == IT_OPEN) {
921                 LASSERT(!it_disposition(it, DISP_OPEN_CREATE));
922         } else {
923                 LASSERT(it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT));
924         }
925
926         /* If we already have a matching lock, then cancel the new
927          * one.  We have to set the data here instead of in
928          * mdc_enqueue, because we need to use the child's inode as
929          * the l_ast_data to match, and that's not available until
930          * intent_finish has performed the iget().)
931          */
932         lock = ldlm_handle2lock(lockh);
933         if (lock) {
934                 ldlm_policy_data_t policy = lock->l_policy_data;
935
936                 LDLM_DEBUG(lock, "matching against this");
937
938                 LASSERTF(fid_res_name_eq(&mdt_body->mbo_fid1,
939                                          &lock->l_resource->lr_name),
940                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
941                          PLDLMRES(lock->l_resource), PFID(&mdt_body->mbo_fid1));
942                 LDLM_LOCK_PUT(lock);
943
944                 memcpy(&old_lock, lockh, sizeof(*lockh));
945                 if (ldlm_lock_match(NULL, LDLM_FL_BLOCK_GRANTED, NULL,
946                                     LDLM_IBITS, &policy, LCK_NL,
947                                     &old_lock, 0)) {
948                         ldlm_lock_decref_and_cancel(lockh,
949                                                     it->it_lock_mode);
950                         memcpy(lockh, &old_lock, sizeof(old_lock));
951                         it->it_lock_handle = lockh->cookie;
952                 }
953         }
954         CDEBUG(D_DENTRY,
955                "D_IT dentry %.*s intent: %s status %d disp %x rc %d\n",
956                (int)op_data->op_namelen, op_data->op_name,
957                ldlm_it2str(it->it_op), it->it_status, it->it_disposition, rc);
958         return rc;
959 }
960
961 int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
962                         struct lu_fid *fid, __u64 *bits)
963 {
964         /* We could just return 1 immediately, but since we should only
965          * be called in revalidate_it if we already have a lock, let's
966          * verify that.
967          */
968         struct ldlm_res_id res_id;
969         struct lustre_handle lockh;
970         ldlm_policy_data_t policy;
971         enum ldlm_mode mode;
972
973         if (it->it_lock_handle) {
974                 lockh.cookie = it->it_lock_handle;
975                 mode = ldlm_revalidate_lock_handle(&lockh, bits);
976         } else {
977                 fid_build_reg_res_name(fid, &res_id);
978                 switch (it->it_op) {
979                 case IT_GETATTR:
980                         /* File attributes are held under multiple bits:
981                          * nlink is under lookup lock, size and times are
982                          * under UPDATE lock and recently we've also got
983                          * a separate permissions lock for owner/group/acl that
984                          * were protected by lookup lock before.
985                          * Getattr must provide all of that information,
986                          * so we need to ensure we have all of those locks.
987                          * Unfortunately, if the bits are split across multiple
988                          * locks, there's no easy way to match all of them here,
989                          * so an extra RPC would be performed to fetch all
990                          * of those bits at once for now.
991                          */
992                         /* For new MDTs(> 2.4), UPDATE|PERM should be enough,
993                          * but for old MDTs (< 2.4), permission is covered
994                          * by LOOKUP lock, so it needs to match all bits here.
995                          */
996                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE |
997                                                   MDS_INODELOCK_LOOKUP |
998                                                   MDS_INODELOCK_PERM;
999                         break;
1000                 case IT_READDIR:
1001                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1002                         break;
1003                 case IT_LAYOUT:
1004                         policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
1005                         break;
1006                 default:
1007                         policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1008                         break;
1009                 }
1010
1011                 mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
1012                                       LDLM_IBITS, &policy,
1013                                       LCK_CR | LCK_CW | LCK_PR | LCK_PW,
1014                                       &lockh);
1015         }
1016
1017         if (mode) {
1018                 it->it_lock_handle = lockh.cookie;
1019                 it->it_lock_mode = mode;
1020         } else {
1021                 it->it_lock_handle = 0;
1022                 it->it_lock_mode = 0;
1023         }
1024
1025         return !!mode;
1026 }
1027
1028 /*
1029  * This long block is all about fixing up the lock and request state
1030  * so that it is correct as of the moment _before_ the operation was
1031  * applied; that way, the VFS will think that everything is normal and
1032  * call Lustre's regular VFS methods.
1033  *
1034  * If we're performing a creation, that means that unless the creation
1035  * failed with EEXIST, we should fake up a negative dentry.
1036  *
1037  * For everything else, we want to lookup to succeed.
1038  *
1039  * One additional note: if CREATE or OPEN succeeded, we add an extra
1040  * reference to the request because we need to keep it around until
1041  * ll_create/ll_open gets called.
1042  *
1043  * The server will return to us, in it_disposition, an indication of
1044  * exactly what it_status refers to.
1045  *
1046  * If DISP_OPEN_OPEN is set, then it_status refers to the open() call,
1047  * otherwise if DISP_OPEN_CREATE is set, then it status is the
1048  * creation failure mode.  In either case, one of DISP_LOOKUP_NEG or
1049  * DISP_LOOKUP_POS will be set, indicating whether the child lookup
1050  * was successful.
1051  *
1052  * Else, if DISP_LOOKUP_EXECD then it_status is the rc of the
1053  * child lookup.
1054  */
1055 int mdc_intent_lock(struct obd_export *exp, struct md_op_data *op_data,
1056                     struct lookup_intent *it, struct ptlrpc_request **reqp,
1057                     ldlm_blocking_callback cb_blocking, __u64 extra_lock_flags)
1058 {
1059         struct ldlm_enqueue_info einfo = {
1060                 .ei_type        = LDLM_IBITS,
1061                 .ei_mode        = it_to_lock_mode(it),
1062                 .ei_cb_bl       = cb_blocking,
1063                 .ei_cb_cp       = ldlm_completion_ast,
1064         };
1065         struct lustre_handle lockh;
1066         int rc = 0;
1067
1068         LASSERT(it);
1069
1070         CDEBUG(D_DLMTRACE, "(name: %.*s,"DFID") in obj "DFID
1071                 ", intent: %s flags %#Lo\n", (int)op_data->op_namelen,
1072                 op_data->op_name, PFID(&op_data->op_fid2),
1073                 PFID(&op_data->op_fid1), ldlm_it2str(it->it_op),
1074                 it->it_flags);
1075
1076         lockh.cookie = 0;
1077         if (fid_is_sane(&op_data->op_fid2) &&
1078             (it->it_op & (IT_LOOKUP | IT_GETATTR | IT_READDIR))) {
1079                 /* We could just return 1 immediately, but since we should only
1080                  * be called in revalidate_it if we already have a lock, let's
1081                  * verify that.
1082                  */
1083                 it->it_lock_handle = 0;
1084                 rc = mdc_revalidate_lock(exp, it, &op_data->op_fid2, NULL);
1085                 /* Only return failure if it was not GETATTR by cfid
1086                  * (from inode_revalidate)
1087                  */
1088                 if (rc || op_data->op_namelen != 0)
1089                         return rc;
1090         }
1091
1092         /* For case if upper layer did not alloc fid, do it now. */
1093         if (!fid_is_sane(&op_data->op_fid2) && it->it_op & IT_CREAT) {
1094                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1095                 if (rc < 0) {
1096                         CERROR("Can't alloc new fid, rc %d\n", rc);
1097                         return rc;
1098                 }
1099         }
1100         rc = mdc_enqueue(exp, &einfo, NULL, it, op_data, &lockh,
1101                          extra_lock_flags);
1102         if (rc < 0)
1103                 return rc;
1104
1105         *reqp = it->it_request;
1106         rc = mdc_finish_intent_lock(exp, *reqp, op_data, it, &lockh);
1107         return rc;
1108 }
1109
1110 static int mdc_intent_getattr_async_interpret(const struct lu_env *env,
1111                                               struct ptlrpc_request *req,
1112                                               void *args, int rc)
1113 {
1114         struct mdc_getattr_args  *ga = args;
1115         struct obd_export       *exp = ga->ga_exp;
1116         struct md_enqueue_info   *minfo = ga->ga_minfo;
1117         struct ldlm_enqueue_info *einfo = ga->ga_einfo;
1118         struct lookup_intent     *it;
1119         struct lustre_handle     *lockh;
1120         struct obd_device       *obddev;
1121         struct ldlm_reply        *lockrep;
1122         __u64                flags = LDLM_FL_HAS_INTENT;
1123
1124         it    = &minfo->mi_it;
1125         lockh = &minfo->mi_lockh;
1126
1127         obddev = class_exp2obd(exp);
1128
1129         obd_put_request_slot(&obddev->u.cli);
1130         if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
1131                 rc = -ETIMEDOUT;
1132
1133         rc = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, 1, einfo->ei_mode,
1134                                    &flags, NULL, 0, lockh, rc);
1135         if (rc < 0) {
1136                 CERROR("ldlm_cli_enqueue_fini: %d\n", rc);
1137                 mdc_clear_replay_flag(req, rc);
1138                 goto out;
1139         }
1140
1141         lockrep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
1142
1143         lockrep->lock_policy_res2 =
1144                 ptlrpc_status_ntoh(lockrep->lock_policy_res2);
1145
1146         rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
1147         if (rc)
1148                 goto out;
1149
1150         rc = mdc_finish_intent_lock(exp, req, &minfo->mi_data, it, lockh);
1151
1152 out:
1153         kfree(einfo);
1154         minfo->mi_cb(req, minfo, rc);
1155         return 0;
1156 }
1157
1158 int mdc_intent_getattr_async(struct obd_export *exp,
1159                              struct md_enqueue_info *minfo,
1160                              struct ldlm_enqueue_info *einfo)
1161 {
1162         struct md_op_data       *op_data = &minfo->mi_data;
1163         struct lookup_intent    *it = &minfo->mi_it;
1164         struct ptlrpc_request   *req;
1165         struct mdc_getattr_args *ga;
1166         struct obd_device       *obddev = class_exp2obd(exp);
1167         struct ldlm_res_id       res_id;
1168         /*XXX: Both MDS_INODELOCK_LOOKUP and MDS_INODELOCK_UPDATE are needed
1169          *     for statahead currently. Consider CMD in future, such two bits
1170          *     maybe managed by different MDS, should be adjusted then.
1171          */
1172         ldlm_policy_data_t       policy = {
1173                                         .l_inodebits = { MDS_INODELOCK_LOOKUP |
1174                                                          MDS_INODELOCK_UPDATE }
1175                                  };
1176         int                   rc = 0;
1177         __u64               flags = LDLM_FL_HAS_INTENT;
1178
1179         CDEBUG(D_DLMTRACE,
1180                "name: %.*s in inode " DFID ", intent: %s flags %#Lo\n",
1181                (int)op_data->op_namelen, op_data->op_name,
1182                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
1183
1184         fid_build_reg_res_name(&op_data->op_fid1, &res_id);
1185         req = mdc_intent_getattr_pack(exp, it, op_data);
1186         if (IS_ERR(req))
1187                 return PTR_ERR(req);
1188
1189         rc = obd_get_request_slot(&obddev->u.cli);
1190         if (rc != 0) {
1191                 ptlrpc_req_finished(req);
1192                 return rc;
1193         }
1194
1195         rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, &policy, &flags, NULL,
1196                               0, LVB_T_NONE, &minfo->mi_lockh, 1);
1197         if (rc < 0) {
1198                 obd_put_request_slot(&obddev->u.cli);
1199                 ptlrpc_req_finished(req);
1200                 return rc;
1201         }
1202
1203         CLASSERT(sizeof(*ga) <= sizeof(req->rq_async_args));
1204         ga = ptlrpc_req_async_args(req);
1205         ga->ga_exp = exp;
1206         ga->ga_minfo = minfo;
1207         ga->ga_einfo = einfo;
1208
1209         req->rq_interpret_reply = mdc_intent_getattr_async_interpret;
1210         ptlrpcd_add_req(req);
1211
1212         return 0;
1213 }