GNU Linux-libre 4.14.266-gnu1
[releases.git] / drivers / staging / lustre / lustre / lmv / lmv_obd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_LMV
34 #include <linux/slab.h>
35 #include <linux/module.h>
36 #include <linux/init.h>
37 #include <linux/pagemap.h>
38 #include <linux/mm.h>
39 #include <asm/div64.h>
40 #include <linux/seq_file.h>
41 #include <linux/namei.h>
42 #include <linux/uaccess.h>
43
44 #include <obd_support.h>
45 #include <lustre_net.h>
46 #include <obd_class.h>
47 #include <lustre_lmv.h>
48 #include <lprocfs_status.h>
49 #include <cl_object.h>
50 #include <lustre_fid.h>
51 #include <uapi/linux/lustre/lustre_ioctl.h>
52 #include <lustre_kernelcomm.h>
53 #include "lmv_internal.h"
54
55 static int lmv_check_connect(struct obd_device *obd);
56
57 static void lmv_activate_target(struct lmv_obd *lmv,
58                                 struct lmv_tgt_desc *tgt,
59                                 int activate)
60 {
61         if (tgt->ltd_active == activate)
62                 return;
63
64         tgt->ltd_active = activate;
65         lmv->desc.ld_active_tgt_count += (activate ? 1 : -1);
66         tgt->ltd_exp->exp_obd->obd_inactive = !activate;
67 }
68
69 /**
70  * Error codes:
71  *
72  *  -EINVAL  : UUID can't be found in the LMV's target list
73  *  -ENOTCONN: The UUID is found, but the target connection is bad (!)
74  *  -EBADF   : The UUID is found, but the OBD of the wrong type (!)
75  */
76 static int lmv_set_mdc_active(struct lmv_obd *lmv, const struct obd_uuid *uuid,
77                               int activate)
78 {
79         struct lmv_tgt_desc *tgt = NULL;
80         struct obd_device      *obd;
81         u32                  i;
82         int                  rc = 0;
83
84         CDEBUG(D_INFO, "Searching in lmv %p for uuid %s (activate=%d)\n",
85                lmv, uuid->uuid, activate);
86
87         spin_lock(&lmv->lmv_lock);
88         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
89                 tgt = lmv->tgts[i];
90                 if (!tgt || !tgt->ltd_exp)
91                         continue;
92
93                 CDEBUG(D_INFO, "Target idx %d is %s conn %#llx\n", i,
94                        tgt->ltd_uuid.uuid, tgt->ltd_exp->exp_handle.h_cookie);
95
96                 if (obd_uuid_equals(uuid, &tgt->ltd_uuid))
97                         break;
98         }
99
100         if (i == lmv->desc.ld_tgt_count) {
101                 rc = -EINVAL;
102                 goto out_lmv_lock;
103         }
104
105         obd = class_exp2obd(tgt->ltd_exp);
106         if (!obd) {
107                 rc = -ENOTCONN;
108                 goto out_lmv_lock;
109         }
110
111         CDEBUG(D_INFO, "Found OBD %s=%s device %d (%p) type %s at LMV idx %d\n",
112                obd->obd_name, obd->obd_uuid.uuid, obd->obd_minor, obd,
113                obd->obd_type->typ_name, i);
114         LASSERT(strcmp(obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0);
115
116         if (tgt->ltd_active == activate) {
117                 CDEBUG(D_INFO, "OBD %p already %sactive!\n", obd,
118                        activate ? "" : "in");
119                 goto out_lmv_lock;
120         }
121
122         CDEBUG(D_INFO, "Marking OBD %p %sactive\n", obd,
123                activate ? "" : "in");
124         lmv_activate_target(lmv, tgt, activate);
125
126  out_lmv_lock:
127         spin_unlock(&lmv->lmv_lock);
128         return rc;
129 }
130
131 static struct obd_uuid *lmv_get_uuid(struct obd_export *exp)
132 {
133         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
134         struct lmv_tgt_desc *tgt = lmv->tgts[0];
135
136         return tgt ? obd_get_uuid(tgt->ltd_exp) : NULL;
137 }
138
139 static int lmv_notify(struct obd_device *obd, struct obd_device *watched,
140                       enum obd_notify_event ev, void *data)
141 {
142         struct obd_connect_data *conn_data;
143         struct lmv_obd    *lmv = &obd->u.lmv;
144         struct obd_uuid  *uuid;
145         int                   rc = 0;
146
147         if (strcmp(watched->obd_type->typ_name, LUSTRE_MDC_NAME)) {
148                 CERROR("unexpected notification of %s %s!\n",
149                        watched->obd_type->typ_name,
150                        watched->obd_name);
151                 return -EINVAL;
152         }
153
154         uuid = &watched->u.cli.cl_target_uuid;
155         if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
156                 /*
157                  * Set MDC as active before notifying the observer, so the
158                  * observer can use the MDC normally.
159                  */
160                 rc = lmv_set_mdc_active(lmv, uuid,
161                                         ev == OBD_NOTIFY_ACTIVE);
162                 if (rc) {
163                         CERROR("%sactivation of %s failed: %d\n",
164                                ev == OBD_NOTIFY_ACTIVE ? "" : "de",
165                                uuid->uuid, rc);
166                         return rc;
167                 }
168         } else if (ev == OBD_NOTIFY_OCD) {
169                 conn_data = &watched->u.cli.cl_import->imp_connect_data;
170                 /*
171                  * XXX: Make sure that ocd_connect_flags from all targets are
172                  * the same. Otherwise one of MDTs runs wrong version or
173                  * something like this.  --umka
174                  */
175                 obd->obd_self_export->exp_connect_data = *conn_data;
176         }
177
178         /*
179          * Pass the notification up the chain.
180          */
181         if (obd->obd_observer)
182                 rc = obd_notify(obd->obd_observer, watched, ev, data);
183
184         return rc;
185 }
186
187 static int lmv_connect(const struct lu_env *env,
188                        struct obd_export **pexp, struct obd_device *obd,
189                        struct obd_uuid *cluuid, struct obd_connect_data *data,
190                        void *localdata)
191 {
192         struct lmv_obd  *lmv = &obd->u.lmv;
193         struct lustre_handle  conn = { 0 };
194         struct obd_export *exp;
195         int                 rc = 0;
196
197         rc = class_connect(&conn, obd, cluuid);
198         if (rc) {
199                 CERROR("class_connection() returned %d\n", rc);
200                 return rc;
201         }
202
203         exp = class_conn2export(&conn);
204
205         lmv->connected = 0;
206         lmv->cluuid = *cluuid;
207         lmv->conn_data = *data;
208
209         lmv->lmv_tgts_kobj = kobject_create_and_add("target_obds",
210                                                     &obd->obd_kobj);
211         rc = lmv_check_connect(obd);
212         if (rc)
213                 goto out_sysfs;
214
215         *pexp = exp;
216
217         return rc;
218
219 out_sysfs:
220         if (lmv->lmv_tgts_kobj)
221                 kobject_put(lmv->lmv_tgts_kobj);
222
223         class_disconnect(exp);
224
225         return rc;
226 }
227
228 static int lmv_init_ea_size(struct obd_export *exp, u32 easize, u32 def_easize)
229 {
230         struct obd_device   *obd = exp->exp_obd;
231         struct lmv_obd      *lmv = &obd->u.lmv;
232         u32 i;
233         int               rc = 0;
234         int               change = 0;
235
236         if (lmv->max_easize < easize) {
237                 lmv->max_easize = easize;
238                 change = 1;
239         }
240         if (lmv->max_def_easize < def_easize) {
241                 lmv->max_def_easize = def_easize;
242                 change = 1;
243         }
244
245         if (change == 0)
246                 return 0;
247
248         if (lmv->connected == 0)
249                 return 0;
250
251         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
252                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
253
254                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active) {
255                         CWARN("%s: NULL export for %d\n", obd->obd_name, i);
256                         continue;
257                 }
258
259                 rc = md_init_ea_size(tgt->ltd_exp, easize, def_easize);
260                 if (rc) {
261                         CERROR("%s: obd_init_ea_size() failed on MDT target %d: rc = %d\n",
262                                obd->obd_name, i, rc);
263                         break;
264                 }
265         }
266         return rc;
267 }
268
269 #define MAX_STRING_SIZE 128
270
271 static int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
272 {
273         struct lmv_obd    *lmv = &obd->u.lmv;
274         struct obd_uuid  *cluuid = &lmv->cluuid;
275         struct obd_uuid   lmv_mdc_uuid = { "LMV_MDC_UUID" };
276         struct obd_device       *mdc_obd;
277         struct obd_export       *mdc_exp;
278         struct lu_fld_target     target;
279         int                   rc;
280
281         mdc_obd = class_find_client_obd(&tgt->ltd_uuid, LUSTRE_MDC_NAME,
282                                         &obd->obd_uuid);
283         if (!mdc_obd) {
284                 CERROR("target %s not attached\n", tgt->ltd_uuid.uuid);
285                 return -EINVAL;
286         }
287
288         CDEBUG(D_CONFIG, "connect to %s(%s) - %s, %s FOR %s\n",
289                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
290                tgt->ltd_uuid.uuid, obd->obd_uuid.uuid, cluuid->uuid);
291
292         if (!mdc_obd->obd_set_up) {
293                 CERROR("target %s is not set up\n", tgt->ltd_uuid.uuid);
294                 return -EINVAL;
295         }
296
297         rc = obd_connect(NULL, &mdc_exp, mdc_obd, &lmv_mdc_uuid,
298                          &lmv->conn_data, NULL);
299         if (rc) {
300                 CERROR("target %s connect error %d\n", tgt->ltd_uuid.uuid, rc);
301                 return rc;
302         }
303
304         /*
305          * Init fid sequence client for this mdc and add new fld target.
306          */
307         rc = obd_fid_init(mdc_obd, mdc_exp, LUSTRE_SEQ_METADATA);
308         if (rc)
309                 return rc;
310
311         target.ft_srv = NULL;
312         target.ft_exp = mdc_exp;
313         target.ft_idx = tgt->ltd_idx;
314
315         fld_client_add_target(&lmv->lmv_fld, &target);
316
317         rc = obd_register_observer(mdc_obd, obd);
318         if (rc) {
319                 obd_disconnect(mdc_exp);
320                 CERROR("target %s register_observer error %d\n",
321                        tgt->ltd_uuid.uuid, rc);
322                 return rc;
323         }
324
325         if (obd->obd_observer) {
326                 /*
327                  * Tell the observer about the new target.
328                  */
329                 rc = obd_notify(obd->obd_observer, mdc_exp->exp_obd,
330                                 OBD_NOTIFY_ACTIVE,
331                                 (void *)(tgt - lmv->tgts[0]));
332                 if (rc) {
333                         obd_disconnect(mdc_exp);
334                         return rc;
335                 }
336         }
337
338         tgt->ltd_active = 1;
339         tgt->ltd_exp = mdc_exp;
340         lmv->desc.ld_active_tgt_count++;
341
342         md_init_ea_size(tgt->ltd_exp, lmv->max_easize, lmv->max_def_easize);
343
344         CDEBUG(D_CONFIG, "Connected to %s(%s) successfully (%d)\n",
345                mdc_obd->obd_name, mdc_obd->obd_uuid.uuid,
346                atomic_read(&obd->obd_refcount));
347
348         if (lmv->lmv_tgts_kobj)
349                 /* Even if we failed to create the link, that's fine */
350                 rc = sysfs_create_link(lmv->lmv_tgts_kobj, &mdc_obd->obd_kobj,
351                                        mdc_obd->obd_name);
352         return 0;
353 }
354
355 static void lmv_del_target(struct lmv_obd *lmv, int index)
356 {
357         if (!lmv->tgts[index])
358                 return;
359
360         kfree(lmv->tgts[index]);
361         lmv->tgts[index] = NULL;
362 }
363
364 static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp,
365                           __u32 index, int gen)
366 {
367         struct lmv_obd      *lmv = &obd->u.lmv;
368         struct obd_device *mdc_obd;
369         struct lmv_tgt_desc *tgt;
370         int orig_tgt_count = 0;
371         int               rc = 0;
372
373         CDEBUG(D_CONFIG, "Target uuid: %s. index %d\n", uuidp->uuid, index);
374
375         mdc_obd = class_find_client_obd(uuidp, LUSTRE_MDC_NAME,
376                                         &obd->obd_uuid);
377         if (!mdc_obd) {
378                 CERROR("%s: Target %s not attached: rc = %d\n",
379                        obd->obd_name, uuidp->uuid, -EINVAL);
380                 return -EINVAL;
381         }
382
383         mutex_lock(&lmv->lmv_init_mutex);
384
385         if ((index < lmv->tgts_size) && lmv->tgts[index]) {
386                 tgt = lmv->tgts[index];
387                 CERROR("%s: UUID %s already assigned at LOV target index %d: rc = %d\n",
388                        obd->obd_name,
389                        obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST);
390                 mutex_unlock(&lmv->lmv_init_mutex);
391                 return -EEXIST;
392         }
393
394         if (index >= lmv->tgts_size) {
395                 /* We need to reallocate the lmv target array. */
396                 struct lmv_tgt_desc **newtgts, **old = NULL;
397                 __u32 newsize = 1;
398                 __u32 oldsize = 0;
399
400                 while (newsize < index + 1)
401                         newsize <<= 1;
402                 newtgts = kcalloc(newsize, sizeof(*newtgts), GFP_NOFS);
403                 if (!newtgts) {
404                         mutex_unlock(&lmv->lmv_init_mutex);
405                         return -ENOMEM;
406                 }
407
408                 if (lmv->tgts_size) {
409                         memcpy(newtgts, lmv->tgts,
410                                sizeof(*newtgts) * lmv->tgts_size);
411                         old = lmv->tgts;
412                         oldsize = lmv->tgts_size;
413                 }
414
415                 lmv->tgts = newtgts;
416                 lmv->tgts_size = newsize;
417                 smp_rmb();
418                 kfree(old);
419
420                 CDEBUG(D_CONFIG, "tgts: %p size: %d\n", lmv->tgts,
421                        lmv->tgts_size);
422         }
423
424         tgt = kzalloc(sizeof(*tgt), GFP_NOFS);
425         if (!tgt) {
426                 mutex_unlock(&lmv->lmv_init_mutex);
427                 return -ENOMEM;
428         }
429
430         mutex_init(&tgt->ltd_fid_mutex);
431         tgt->ltd_idx = index;
432         tgt->ltd_uuid = *uuidp;
433         tgt->ltd_active = 0;
434         lmv->tgts[index] = tgt;
435         if (index >= lmv->desc.ld_tgt_count) {
436                 orig_tgt_count = lmv->desc.ld_tgt_count;
437                 lmv->desc.ld_tgt_count = index + 1;
438         }
439
440         if (!lmv->connected) {
441                 /* lmv_check_connect() will connect this target. */
442                 mutex_unlock(&lmv->lmv_init_mutex);
443                 return rc;
444         }
445
446         /* Otherwise let's connect it ourselves */
447         mutex_unlock(&lmv->lmv_init_mutex);
448         rc = lmv_connect_mdc(obd, tgt);
449         if (rc) {
450                 spin_lock(&lmv->lmv_lock);
451                 if (lmv->desc.ld_tgt_count == index + 1)
452                         lmv->desc.ld_tgt_count = orig_tgt_count;
453                 memset(tgt, 0, sizeof(*tgt));
454                 spin_unlock(&lmv->lmv_lock);
455         } else {
456                 int easize = sizeof(struct lmv_stripe_md) +
457                              lmv->desc.ld_tgt_count * sizeof(struct lu_fid);
458                 lmv_init_ea_size(obd->obd_self_export, easize, 0);
459         }
460
461         return rc;
462 }
463
464 static int lmv_check_connect(struct obd_device *obd)
465 {
466         struct lmv_obd       *lmv = &obd->u.lmv;
467         struct lmv_tgt_desc  *tgt;
468         u32 i;
469         int                rc;
470         int                easize;
471
472         if (lmv->connected)
473                 return 0;
474
475         mutex_lock(&lmv->lmv_init_mutex);
476         if (lmv->connected) {
477                 mutex_unlock(&lmv->lmv_init_mutex);
478                 return 0;
479         }
480
481         if (lmv->desc.ld_tgt_count == 0) {
482                 mutex_unlock(&lmv->lmv_init_mutex);
483                 CERROR("%s: no targets configured.\n", obd->obd_name);
484                 return -EINVAL;
485         }
486
487         LASSERT(lmv->tgts);
488
489         if (!lmv->tgts[0]) {
490                 mutex_unlock(&lmv->lmv_init_mutex);
491                 CERROR("%s: no target configured for index 0.\n",
492                        obd->obd_name);
493                 return -EINVAL;
494         }
495
496         CDEBUG(D_CONFIG, "Time to connect %s to %s\n",
497                lmv->cluuid.uuid, obd->obd_name);
498
499         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
500                 tgt = lmv->tgts[i];
501                 if (!tgt)
502                         continue;
503                 rc = lmv_connect_mdc(obd, tgt);
504                 if (rc)
505                         goto out_disc;
506         }
507
508         lmv->connected = 1;
509         easize = lmv_mds_md_size(lmv->desc.ld_tgt_count, LMV_MAGIC);
510         lmv_init_ea_size(obd->obd_self_export, easize, 0);
511         mutex_unlock(&lmv->lmv_init_mutex);
512         return 0;
513
514  out_disc:
515         while (i-- > 0) {
516                 int rc2;
517
518                 tgt = lmv->tgts[i];
519                 if (!tgt)
520                         continue;
521                 tgt->ltd_active = 0;
522                 if (tgt->ltd_exp) {
523                         --lmv->desc.ld_active_tgt_count;
524                         rc2 = obd_disconnect(tgt->ltd_exp);
525                         if (rc2) {
526                                 CERROR("LMV target %s disconnect on MDC idx %d: error %d\n",
527                                        tgt->ltd_uuid.uuid, i, rc2);
528                         }
529                 }
530         }
531
532         mutex_unlock(&lmv->lmv_init_mutex);
533         return rc;
534 }
535
536 static int lmv_disconnect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
537 {
538         struct lmv_obd   *lmv = &obd->u.lmv;
539         struct obd_device      *mdc_obd;
540         int                  rc;
541
542         mdc_obd = class_exp2obd(tgt->ltd_exp);
543
544         if (mdc_obd) {
545                 mdc_obd->obd_force = obd->obd_force;
546                 mdc_obd->obd_fail = obd->obd_fail;
547                 mdc_obd->obd_no_recov = obd->obd_no_recov;
548
549                 if (lmv->lmv_tgts_kobj)
550                         sysfs_remove_link(lmv->lmv_tgts_kobj,
551                                           mdc_obd->obd_name);
552         }
553
554         rc = obd_fid_fini(tgt->ltd_exp->exp_obd);
555         if (rc)
556                 CERROR("Can't finalize fids factory\n");
557
558         CDEBUG(D_INFO, "Disconnected from %s(%s) successfully\n",
559                tgt->ltd_exp->exp_obd->obd_name,
560                tgt->ltd_exp->exp_obd->obd_uuid.uuid);
561
562         obd_register_observer(tgt->ltd_exp->exp_obd, NULL);
563         rc = obd_disconnect(tgt->ltd_exp);
564         if (rc) {
565                 if (tgt->ltd_active) {
566                         CERROR("Target %s disconnect error %d\n",
567                                tgt->ltd_uuid.uuid, rc);
568                 }
569         }
570
571         lmv_activate_target(lmv, tgt, 0);
572         tgt->ltd_exp = NULL;
573         return 0;
574 }
575
576 static int lmv_disconnect(struct obd_export *exp)
577 {
578         struct obd_device     *obd = class_exp2obd(exp);
579         struct lmv_obd  *lmv = &obd->u.lmv;
580         int                 rc;
581         u32 i;
582
583         if (!lmv->tgts)
584                 goto out_local;
585
586         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
587                 if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
588                         continue;
589
590                 lmv_disconnect_mdc(obd, lmv->tgts[i]);
591         }
592
593         if (lmv->lmv_tgts_kobj)
594                 kobject_put(lmv->lmv_tgts_kobj);
595
596 out_local:
597         /*
598          * This is the case when no real connection is established by
599          * lmv_check_connect().
600          */
601         if (!lmv->connected)
602                 class_export_put(exp);
603         rc = class_disconnect(exp);
604         lmv->connected = 0;
605         return rc;
606 }
607
608 static int lmv_fid2path(struct obd_export *exp, int len, void *karg,
609                         void __user *uarg)
610 {
611         struct obd_device       *obddev = class_exp2obd(exp);
612         struct lmv_obd          *lmv = &obddev->u.lmv;
613         struct getinfo_fid2path *gf;
614         struct lmv_tgt_desc     *tgt;
615         struct getinfo_fid2path *remote_gf = NULL;
616         int                     remote_gf_size = 0;
617         int                     rc;
618
619         gf = karg;
620         tgt = lmv_find_target(lmv, &gf->gf_fid);
621         if (IS_ERR(tgt))
622                 return PTR_ERR(tgt);
623
624 repeat_fid2path:
625         rc = obd_iocontrol(OBD_IOC_FID2PATH, tgt->ltd_exp, len, gf, uarg);
626         if (rc != 0 && rc != -EREMOTE)
627                 goto out_fid2path;
628
629         /* If remote_gf != NULL, it means just building the
630          * path on the remote MDT, copy this path segment to gf
631          */
632         if (remote_gf) {
633                 struct getinfo_fid2path *ori_gf;
634                 char *ptr;
635
636                 ori_gf = karg;
637                 if (strlen(ori_gf->gf_path) + 1 +
638                     strlen(gf->gf_path) + 1 > ori_gf->gf_pathlen) {
639                         rc = -EOVERFLOW;
640                         goto out_fid2path;
641                 }
642
643                 ptr = ori_gf->gf_path;
644
645                 memmove(ptr + strlen(gf->gf_path) + 1, ptr,
646                         strlen(ori_gf->gf_path));
647
648                 strcpy(ptr, gf->gf_path);
649                 ptr += strlen(gf->gf_path);
650                 *ptr = '/';
651         }
652
653         CDEBUG(D_INFO, "%s: get path %s " DFID " rec: %llu ln: %u\n",
654                tgt->ltd_exp->exp_obd->obd_name,
655                gf->gf_path, PFID(&gf->gf_fid), gf->gf_recno,
656                gf->gf_linkno);
657
658         if (rc == 0)
659                 goto out_fid2path;
660
661         /* sigh, has to go to another MDT to do path building further */
662         if (!remote_gf) {
663                 remote_gf_size = sizeof(*remote_gf) + PATH_MAX;
664                 remote_gf = kzalloc(remote_gf_size, GFP_NOFS);
665                 if (!remote_gf) {
666                         rc = -ENOMEM;
667                         goto out_fid2path;
668                 }
669                 remote_gf->gf_pathlen = PATH_MAX;
670         }
671
672         if (!fid_is_sane(&gf->gf_fid)) {
673                 CERROR("%s: invalid FID " DFID ": rc = %d\n",
674                        tgt->ltd_exp->exp_obd->obd_name,
675                        PFID(&gf->gf_fid), -EINVAL);
676                 rc = -EINVAL;
677                 goto out_fid2path;
678         }
679
680         tgt = lmv_find_target(lmv, &gf->gf_fid);
681         if (IS_ERR(tgt)) {
682                 rc = -EINVAL;
683                 goto out_fid2path;
684         }
685
686         remote_gf->gf_fid = gf->gf_fid;
687         remote_gf->gf_recno = -1;
688         remote_gf->gf_linkno = -1;
689         memset(remote_gf->gf_path, 0, remote_gf->gf_pathlen);
690         gf = remote_gf;
691         goto repeat_fid2path;
692
693 out_fid2path:
694         kfree(remote_gf);
695         return rc;
696 }
697
698 static int lmv_hsm_req_count(struct lmv_obd *lmv,
699                              const struct hsm_user_request *hur,
700                              const struct lmv_tgt_desc *tgt_mds)
701 {
702         u32 i, nr = 0;
703         struct lmv_tgt_desc    *curr_tgt;
704
705         /* count how many requests must be sent to the given target */
706         for (i = 0; i < hur->hur_request.hr_itemcount; i++) {
707                 curr_tgt = lmv_find_target(lmv, &hur->hur_user_item[i].hui_fid);
708                 if (IS_ERR(curr_tgt))
709                         return PTR_ERR(curr_tgt);
710                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid))
711                         nr++;
712         }
713         return nr;
714 }
715
716 static int lmv_hsm_req_build(struct lmv_obd *lmv,
717                              struct hsm_user_request *hur_in,
718                              const struct lmv_tgt_desc *tgt_mds,
719                              struct hsm_user_request *hur_out)
720 {
721         int                     i, nr_out;
722         struct lmv_tgt_desc    *curr_tgt;
723
724         /* build the hsm_user_request for the given target */
725         hur_out->hur_request = hur_in->hur_request;
726         nr_out = 0;
727         for (i = 0; i < hur_in->hur_request.hr_itemcount; i++) {
728                 curr_tgt = lmv_find_target(lmv,
729                                            &hur_in->hur_user_item[i].hui_fid);
730                 if (IS_ERR(curr_tgt))
731                         return PTR_ERR(curr_tgt);
732                 if (obd_uuid_equals(&curr_tgt->ltd_uuid, &tgt_mds->ltd_uuid)) {
733                         hur_out->hur_user_item[nr_out] =
734                                 hur_in->hur_user_item[i];
735                         nr_out++;
736                 }
737         }
738         hur_out->hur_request.hr_itemcount = nr_out;
739         memcpy(hur_data(hur_out), hur_data(hur_in),
740                hur_in->hur_request.hr_data_len);
741
742         return 0;
743 }
744
745 static int lmv_hsm_ct_unregister(struct lmv_obd *lmv, unsigned int cmd, int len,
746                                  struct lustre_kernelcomm *lk,
747                                  void __user *uarg)
748 {
749         __u32 i;
750
751         /* unregister request (call from llapi_hsm_copytool_fini) */
752         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
753                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
754
755                 if (!tgt || !tgt->ltd_exp)
756                         continue;
757
758                 /* best effort: try to clean as much as possible
759                  * (continue on error)
760                  */
761                 obd_iocontrol(cmd, lmv->tgts[i]->ltd_exp, len, lk, uarg);
762         }
763
764         /* Whatever the result, remove copytool from kuc groups.
765          * Unreached coordinators will get EPIPE on next requests
766          * and will unregister automatically.
767          */
768         return libcfs_kkuc_group_rem(lk->lk_uid, lk->lk_group);
769 }
770
771 static int lmv_hsm_ct_register(struct lmv_obd *lmv, unsigned int cmd, int len,
772                                struct lustre_kernelcomm *lk, void __user *uarg)
773 {
774         struct file *filp;
775         __u32 i, j;
776         int err, rc = 0;
777         bool any_set = false;
778         struct kkuc_ct_data kcd = { 0 };
779
780         /* All or nothing: try to register to all MDS.
781          * In case of failure, unregister from previous MDS,
782          * except if it because of inactive target.
783          */
784         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
785                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
786
787                 if (!tgt || !tgt->ltd_exp)
788                         continue;
789
790                 err = obd_iocontrol(cmd, tgt->ltd_exp, len, lk, uarg);
791                 if (err) {
792                         if (tgt->ltd_active) {
793                                 /* permanent error */
794                                 CERROR("error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
795                                        tgt->ltd_uuid.uuid, i, cmd, err);
796                                 rc = err;
797                                 lk->lk_flags |= LK_FLG_STOP;
798                                 /* unregister from previous MDS */
799                                 for (j = 0; j < i; j++) {
800                                         tgt = lmv->tgts[j];
801
802                                         if (!tgt || !tgt->ltd_exp)
803                                                 continue;
804                                         obd_iocontrol(cmd, tgt->ltd_exp, len,
805                                                       lk, uarg);
806                                 }
807                                 return rc;
808                         }
809                         /* else: transient error.
810                          * kuc will register to the missing MDT when it is back
811                          */
812                 } else {
813                         any_set = true;
814                 }
815         }
816
817         if (!any_set)
818                 /* no registration done: return error */
819                 return -ENOTCONN;
820
821         /* at least one registration done, with no failure */
822         filp = fget(lk->lk_wfd);
823         if (!filp)
824                 return -EBADF;
825
826         kcd.kcd_magic = KKUC_CT_DATA_MAGIC;
827         kcd.kcd_uuid = lmv->cluuid;
828         kcd.kcd_archive = lk->lk_data;
829
830         rc = libcfs_kkuc_group_add(filp, lk->lk_uid, lk->lk_group,
831                                    &kcd, sizeof(kcd));
832         if (rc)
833                 fput(filp);
834
835         return rc;
836 }
837
838 static int lmv_iocontrol(unsigned int cmd, struct obd_export *exp,
839                          int len, void *karg, void __user *uarg)
840 {
841         struct obd_device    *obddev = class_exp2obd(exp);
842         struct lmv_obd       *lmv = &obddev->u.lmv;
843         struct lmv_tgt_desc *tgt = NULL;
844         u32 i = 0;
845         int                rc = 0;
846         int                set = 0;
847         u32 count = lmv->desc.ld_tgt_count;
848
849         if (count == 0)
850                 return -ENOTTY;
851
852         switch (cmd) {
853         case IOC_OBD_STATFS: {
854                 struct obd_ioctl_data *data = karg;
855                 struct obd_device *mdc_obd;
856                 struct obd_statfs stat_buf = {0};
857                 __u32 index;
858
859                 memcpy(&index, data->ioc_inlbuf2, sizeof(__u32));
860                 if (index >= count)
861                         return -ENODEV;
862
863                 tgt = lmv->tgts[index];
864                 if (!tgt || !tgt->ltd_active)
865                         return -ENODATA;
866
867                 mdc_obd = class_exp2obd(tgt->ltd_exp);
868                 if (!mdc_obd)
869                         return -EINVAL;
870
871                 /* copy UUID */
872                 if (copy_to_user(data->ioc_pbuf2, obd2cli_tgt(mdc_obd),
873                                  min((int)data->ioc_plen2,
874                                      (int)sizeof(struct obd_uuid))))
875                         return -EFAULT;
876
877                 rc = obd_statfs(NULL, tgt->ltd_exp, &stat_buf,
878                                 cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
879                                 0);
880                 if (rc)
881                         return rc;
882                 if (copy_to_user(data->ioc_pbuf1, &stat_buf,
883                                  min((int)data->ioc_plen1,
884                                      (int)sizeof(stat_buf))))
885                         return -EFAULT;
886                 break;
887         }
888         case OBD_IOC_QUOTACTL: {
889                 struct if_quotactl *qctl = karg;
890                 struct obd_quotactl *oqctl;
891
892                 if (qctl->qc_valid == QC_MDTIDX) {
893                         if (count <= qctl->qc_idx)
894                                 return -EINVAL;
895
896                         tgt = lmv->tgts[qctl->qc_idx];
897                         if (!tgt || !tgt->ltd_exp)
898                                 return -EINVAL;
899                 } else if (qctl->qc_valid == QC_UUID) {
900                         for (i = 0; i < count; i++) {
901                                 tgt = lmv->tgts[i];
902                                 if (!tgt)
903                                         continue;
904                                 if (!obd_uuid_equals(&tgt->ltd_uuid,
905                                                      &qctl->obd_uuid))
906                                         continue;
907
908                                 if (!tgt->ltd_exp)
909                                         return -EINVAL;
910
911                                 break;
912                         }
913                 } else {
914                         return -EINVAL;
915                 }
916
917                 if (i >= count)
918                         return -EAGAIN;
919
920                 LASSERT(tgt && tgt->ltd_exp);
921                 oqctl = kzalloc(sizeof(*oqctl), GFP_NOFS);
922                 if (!oqctl)
923                         return -ENOMEM;
924
925                 QCTL_COPY(oqctl, qctl);
926                 rc = obd_quotactl(tgt->ltd_exp, oqctl);
927                 if (rc == 0) {
928                         QCTL_COPY(qctl, oqctl);
929                         qctl->qc_valid = QC_MDTIDX;
930                         qctl->obd_uuid = tgt->ltd_uuid;
931                 }
932                 kfree(oqctl);
933                 break;
934         }
935         case OBD_IOC_CHANGELOG_SEND:
936         case OBD_IOC_CHANGELOG_CLEAR: {
937                 struct ioc_changelog *icc = karg;
938
939                 if (icc->icc_mdtindex >= count)
940                         return -ENODEV;
941
942                 tgt = lmv->tgts[icc->icc_mdtindex];
943                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
944                         return -ENODEV;
945                 rc = obd_iocontrol(cmd, tgt->ltd_exp, sizeof(*icc), icc, NULL);
946                 break;
947         }
948         case LL_IOC_GET_CONNECT_FLAGS: {
949                 tgt = lmv->tgts[0];
950
951                 if (!tgt || !tgt->ltd_exp)
952                         return -ENODATA;
953                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
954                 break;
955         }
956         case LL_IOC_FID2MDTIDX: {
957                 struct lu_fid *fid = karg;
958                 int mdt_index;
959
960                 rc = lmv_fld_lookup(lmv, fid, &mdt_index);
961                 if (rc)
962                         return rc;
963
964                 /*
965                  * Note: this is from llite(see ll_dir_ioctl()), @uarg does not
966                  * point to user space memory for FID2MDTIDX.
967                  */
968                 *(__u32 *)uarg = mdt_index;
969                 break;
970         }
971         case OBD_IOC_FID2PATH: {
972                 rc = lmv_fid2path(exp, len, karg, uarg);
973                 break;
974         }
975         case LL_IOC_HSM_STATE_GET:
976         case LL_IOC_HSM_STATE_SET:
977         case LL_IOC_HSM_ACTION: {
978                 struct md_op_data       *op_data = karg;
979
980                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
981                 if (IS_ERR(tgt))
982                         return PTR_ERR(tgt);
983
984                 if (!tgt->ltd_exp)
985                         return -EINVAL;
986
987                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
988                 break;
989         }
990         case LL_IOC_HSM_PROGRESS: {
991                 const struct hsm_progress_kernel *hpk = karg;
992
993                 tgt = lmv_find_target(lmv, &hpk->hpk_fid);
994                 if (IS_ERR(tgt))
995                         return PTR_ERR(tgt);
996                 rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
997                 break;
998         }
999         case LL_IOC_HSM_REQUEST: {
1000                 struct hsm_user_request *hur = karg;
1001                 unsigned int reqcount = hur->hur_request.hr_itemcount;
1002
1003                 if (reqcount == 0)
1004                         return 0;
1005
1006                 /* if the request is about a single fid
1007                  * or if there is a single MDS, no need to split
1008                  * the request.
1009                  */
1010                 if (reqcount == 1 || count == 1) {
1011                         tgt = lmv_find_target(lmv,
1012                                               &hur->hur_user_item[0].hui_fid);
1013                         if (IS_ERR(tgt))
1014                                 return PTR_ERR(tgt);
1015                         rc = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1016                 } else {
1017                         /* split fid list to their respective MDS */
1018                         for (i = 0; i < count; i++) {
1019                                 struct hsm_user_request *req;
1020                                 size_t reqlen;
1021                                 int nr, rc1;
1022
1023                                 tgt = lmv->tgts[i];
1024                                 if (!tgt || !tgt->ltd_exp)
1025                                         continue;
1026
1027                                 nr = lmv_hsm_req_count(lmv, hur, tgt);
1028                                 if (nr < 0)
1029                                         return nr;
1030                                 if (nr == 0) /* nothing for this MDS */
1031                                         continue;
1032
1033                                 /* build a request with fids for this MDS */
1034                                 reqlen = offsetof(typeof(*hur),
1035                                                   hur_user_item[nr])
1036                                          + hur->hur_request.hr_data_len;
1037                                 req = libcfs_kvzalloc(reqlen, GFP_NOFS);
1038                                 if (!req)
1039                                         return -ENOMEM;
1040
1041                                 rc1 = lmv_hsm_req_build(lmv, hur, tgt, req);
1042                                 if (rc1 < 0)
1043                                         goto hsm_req_err;
1044
1045                                 rc1 = obd_iocontrol(cmd, tgt->ltd_exp, reqlen,
1046                                                     req, uarg);
1047 hsm_req_err:
1048                                 if (rc1 != 0 && rc == 0)
1049                                         rc = rc1;
1050                                 kvfree(req);
1051                         }
1052                 }
1053                 break;
1054         }
1055         case LL_IOC_LOV_SWAP_LAYOUTS: {
1056                 struct md_op_data       *op_data = karg;
1057                 struct lmv_tgt_desc     *tgt1, *tgt2;
1058
1059                 tgt1 = lmv_find_target(lmv, &op_data->op_fid1);
1060                 if (IS_ERR(tgt1))
1061                         return PTR_ERR(tgt1);
1062
1063                 tgt2 = lmv_find_target(lmv, &op_data->op_fid2);
1064                 if (IS_ERR(tgt2))
1065                         return PTR_ERR(tgt2);
1066
1067                 if (!tgt1->ltd_exp || !tgt2->ltd_exp)
1068                         return -EINVAL;
1069
1070                 /* only files on same MDT can have their layouts swapped */
1071                 if (tgt1->ltd_idx != tgt2->ltd_idx)
1072                         return -EPERM;
1073
1074                 rc = obd_iocontrol(cmd, tgt1->ltd_exp, len, karg, uarg);
1075                 break;
1076         }
1077         case LL_IOC_HSM_CT_START: {
1078                 struct lustre_kernelcomm *lk = karg;
1079
1080                 if (lk->lk_flags & LK_FLG_STOP)
1081                         rc = lmv_hsm_ct_unregister(lmv, cmd, len, lk, uarg);
1082                 else
1083                         rc = lmv_hsm_ct_register(lmv, cmd, len, lk, uarg);
1084                 break;
1085         }
1086         default:
1087                 for (i = 0; i < count; i++) {
1088                         struct obd_device *mdc_obd;
1089                         int err;
1090
1091                         tgt = lmv->tgts[i];
1092                         if (!tgt || !tgt->ltd_exp)
1093                                 continue;
1094                         /* ll_umount_begin() sets force flag but for lmv, not
1095                          * mdc. Let's pass it through
1096                          */
1097                         mdc_obd = class_exp2obd(tgt->ltd_exp);
1098                         mdc_obd->obd_force = obddev->obd_force;
1099                         err = obd_iocontrol(cmd, tgt->ltd_exp, len, karg, uarg);
1100                         if (err) {
1101                                 if (tgt->ltd_active) {
1102                                         CERROR("%s: error: iocontrol MDC %s on MDTidx %d cmd %x: err = %d\n",
1103                                                lmv2obd_dev(lmv)->obd_name,
1104                                                tgt->ltd_uuid.uuid, i, cmd, err);
1105                                         if (!rc)
1106                                                 rc = err;
1107                                 }
1108                         } else {
1109                                 set = 1;
1110                         }
1111                 }
1112                 if (!set && !rc)
1113                         rc = -EIO;
1114         }
1115         return rc;
1116 }
1117
1118 /**
1119  * This is _inode_ placement policy function (not name).
1120  */
1121 static int lmv_placement_policy(struct obd_device *obd,
1122                                 struct md_op_data *op_data, u32 *mds)
1123 {
1124         struct lmv_obd    *lmv = &obd->u.lmv;
1125
1126         LASSERT(mds);
1127
1128         if (lmv->desc.ld_tgt_count == 1) {
1129                 *mds = 0;
1130                 return 0;
1131         }
1132
1133         if (op_data->op_default_stripe_offset != -1) {
1134                 *mds = op_data->op_default_stripe_offset;
1135                 return 0;
1136         }
1137
1138         /**
1139          * If stripe_offset is provided during setdirstripe
1140          * (setdirstripe -i xx), xx MDS will be chosen.
1141          */
1142         if (op_data->op_cli_flags & CLI_SET_MEA && op_data->op_data) {
1143                 struct lmv_user_md *lum;
1144
1145                 lum = op_data->op_data;
1146                 if (le32_to_cpu(lum->lum_stripe_offset) != (__u32)-1) {
1147                         *mds = le32_to_cpu(lum->lum_stripe_offset);
1148                 } else {
1149                         /*
1150                          * -1 means default, which will be in the same MDT with
1151                          * the stripe
1152                          */
1153                         *mds = op_data->op_mds;
1154                         lum->lum_stripe_offset = cpu_to_le32(op_data->op_mds);
1155                 }
1156         } else {
1157                 /*
1158                  * Allocate new fid on target according to operation type and
1159                  * parent home mds.
1160                  */
1161                 *mds = op_data->op_mds;
1162         }
1163
1164         return 0;
1165 }
1166
1167 int __lmv_fid_alloc(struct lmv_obd *lmv, struct lu_fid *fid, u32 mds)
1168 {
1169         struct lmv_tgt_desc     *tgt;
1170         int                      rc;
1171
1172         tgt = lmv_get_target(lmv, mds, NULL);
1173         if (IS_ERR(tgt))
1174                 return PTR_ERR(tgt);
1175
1176         /*
1177          * New seq alloc and FLD setup should be atomic. Otherwise we may find
1178          * on server that seq in new allocated fid is not yet known.
1179          */
1180         mutex_lock(&tgt->ltd_fid_mutex);
1181
1182         if (tgt->ltd_active == 0 || !tgt->ltd_exp) {
1183                 rc = -ENODEV;
1184                 goto out;
1185         }
1186
1187         /*
1188          * Asking underlaying tgt layer to allocate new fid.
1189          */
1190         rc = obd_fid_alloc(NULL, tgt->ltd_exp, fid, NULL);
1191         if (rc > 0) {
1192                 LASSERT(fid_is_sane(fid));
1193                 rc = 0;
1194         }
1195
1196 out:
1197         mutex_unlock(&tgt->ltd_fid_mutex);
1198         return rc;
1199 }
1200
1201 int lmv_fid_alloc(const struct lu_env *env, struct obd_export *exp,
1202                   struct lu_fid *fid, struct md_op_data *op_data)
1203 {
1204         struct obd_device     *obd = class_exp2obd(exp);
1205         struct lmv_obd  *lmv = &obd->u.lmv;
1206         u32                    mds = 0;
1207         int                 rc;
1208
1209         LASSERT(op_data);
1210         LASSERT(fid);
1211
1212         rc = lmv_placement_policy(obd, op_data, &mds);
1213         if (rc) {
1214                 CERROR("Can't get target for allocating fid, rc %d\n",
1215                        rc);
1216                 return rc;
1217         }
1218
1219         rc = __lmv_fid_alloc(lmv, fid, mds);
1220         if (rc) {
1221                 CERROR("Can't alloc new fid, rc %d\n", rc);
1222                 return rc;
1223         }
1224
1225         return rc;
1226 }
1227
1228 static int lmv_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
1229 {
1230         struct lmv_obd       *lmv = &obd->u.lmv;
1231         struct lprocfs_static_vars  lvars = { NULL };
1232         struct lmv_desc     *desc;
1233         int                      rc;
1234
1235         if (LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
1236                 CERROR("LMV setup requires a descriptor\n");
1237                 return -EINVAL;
1238         }
1239
1240         desc = (struct lmv_desc *)lustre_cfg_buf(lcfg, 1);
1241         if (sizeof(*desc) > LUSTRE_CFG_BUFLEN(lcfg, 1)) {
1242                 CERROR("Lmv descriptor size wrong: %d > %d\n",
1243                        (int)sizeof(*desc), LUSTRE_CFG_BUFLEN(lcfg, 1));
1244                 return -EINVAL;
1245         }
1246
1247         lmv->tgts_size = 32U;
1248         lmv->tgts = kcalloc(lmv->tgts_size, sizeof(*lmv->tgts), GFP_NOFS);
1249         if (!lmv->tgts)
1250                 return -ENOMEM;
1251
1252         obd_str2uuid(&lmv->desc.ld_uuid, desc->ld_uuid.uuid);
1253         lmv->desc.ld_tgt_count = 0;
1254         lmv->desc.ld_active_tgt_count = 0;
1255         lmv->max_def_easize = 0;
1256         lmv->max_easize = 0;
1257
1258         spin_lock_init(&lmv->lmv_lock);
1259         mutex_init(&lmv->lmv_init_mutex);
1260
1261         lprocfs_lmv_init_vars(&lvars);
1262
1263         lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
1264         rc = ldebugfs_seq_create(obd->obd_debugfs_entry, "target_obd",
1265                                  0444, &lmv_proc_target_fops, obd);
1266         if (rc)
1267                 CWARN("%s: error adding LMV target_obd file: rc = %d\n",
1268                       obd->obd_name, rc);
1269         rc = fld_client_init(&lmv->lmv_fld, obd->obd_name,
1270                              LUSTRE_CLI_FLD_HASH_DHT);
1271         if (rc) {
1272                 CERROR("Can't init FLD, err %d\n", rc);
1273                 goto out;
1274         }
1275
1276         return 0;
1277
1278 out:
1279         return rc;
1280 }
1281
1282 static int lmv_cleanup(struct obd_device *obd)
1283 {
1284         struct lmv_obd   *lmv = &obd->u.lmv;
1285
1286         fld_client_fini(&lmv->lmv_fld);
1287         if (lmv->tgts) {
1288                 int i;
1289
1290                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1291                         if (!lmv->tgts[i])
1292                                 continue;
1293                         lmv_del_target(lmv, i);
1294                 }
1295                 kfree(lmv->tgts);
1296                 lmv->tgts_size = 0;
1297         }
1298         return 0;
1299 }
1300
1301 static int lmv_process_config(struct obd_device *obd, u32 len, void *buf)
1302 {
1303         struct lustre_cfg       *lcfg = buf;
1304         struct obd_uuid         obd_uuid;
1305         int                     gen;
1306         __u32                   index;
1307         int                     rc;
1308
1309         switch (lcfg->lcfg_command) {
1310         case LCFG_ADD_MDC:
1311                 /* modify_mdc_tgts add 0:lustre-clilmv  1:lustre-MDT0000_UUID
1312                  * 2:0  3:1  4:lustre-MDT0000-mdc_UUID
1313                  */
1314                 if (LUSTRE_CFG_BUFLEN(lcfg, 1) > sizeof(obd_uuid.uuid)) {
1315                         rc = -EINVAL;
1316                         goto out;
1317                 }
1318
1319                 obd_str2uuid(&obd_uuid,  lustre_cfg_buf(lcfg, 1));
1320
1321                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%u", &index) != 1) {
1322                         rc = -EINVAL;
1323                         goto out;
1324                 }
1325                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1) {
1326                         rc = -EINVAL;
1327                         goto out;
1328                 }
1329                 rc = lmv_add_target(obd, &obd_uuid, index, gen);
1330                 goto out;
1331         default:
1332                 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
1333                 rc = -EINVAL;
1334                 goto out;
1335         }
1336 out:
1337         return rc;
1338 }
1339
1340 static int lmv_statfs(const struct lu_env *env, struct obd_export *exp,
1341                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
1342 {
1343         struct obd_device     *obd = class_exp2obd(exp);
1344         struct lmv_obd  *lmv = &obd->u.lmv;
1345         struct obd_statfs     *temp;
1346         int                 rc = 0;
1347         u32 i;
1348
1349         temp = kzalloc(sizeof(*temp), GFP_NOFS);
1350         if (!temp)
1351                 return -ENOMEM;
1352
1353         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1354                 if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1355                         continue;
1356
1357                 rc = obd_statfs(env, lmv->tgts[i]->ltd_exp, temp,
1358                                 max_age, flags);
1359                 if (rc) {
1360                         CERROR("can't stat MDS #%d (%s), error %d\n", i,
1361                                lmv->tgts[i]->ltd_exp->exp_obd->obd_name,
1362                                rc);
1363                         goto out_free_temp;
1364                 }
1365
1366                 if (i == 0) {
1367                         *osfs = *temp;
1368                         /* If the statfs is from mount, it will needs
1369                          * retrieve necessary information from MDT0.
1370                          * i.e. mount does not need the merged osfs
1371                          * from all of MDT.
1372                          * And also clients can be mounted as long as
1373                          * MDT0 is in service
1374                          */
1375                         if (flags & OBD_STATFS_FOR_MDT0)
1376                                 goto out_free_temp;
1377                 } else {
1378                         osfs->os_bavail += temp->os_bavail;
1379                         osfs->os_blocks += temp->os_blocks;
1380                         osfs->os_ffree += temp->os_ffree;
1381                         osfs->os_files += temp->os_files;
1382                 }
1383         }
1384
1385 out_free_temp:
1386         kfree(temp);
1387         return rc;
1388 }
1389
1390 static int lmv_getstatus(struct obd_export *exp,
1391                          struct lu_fid *fid)
1392 {
1393         struct obd_device    *obd = exp->exp_obd;
1394         struct lmv_obd       *lmv = &obd->u.lmv;
1395
1396         return md_getstatus(lmv->tgts[0]->ltd_exp, fid);
1397 }
1398
1399 static int lmv_getxattr(struct obd_export *exp, const struct lu_fid *fid,
1400                         u64 valid, const char *name,
1401                         const char *input, int input_size, int output_size,
1402                         int flags, struct ptlrpc_request **request)
1403 {
1404         struct obd_device      *obd = exp->exp_obd;
1405         struct lmv_obd   *lmv = &obd->u.lmv;
1406         struct lmv_tgt_desc    *tgt;
1407
1408         tgt = lmv_find_target(lmv, fid);
1409         if (IS_ERR(tgt))
1410                 return PTR_ERR(tgt);
1411
1412         return md_getxattr(tgt->ltd_exp, fid, valid, name, input,
1413                          input_size, output_size, flags, request);
1414 }
1415
1416 static int lmv_setxattr(struct obd_export *exp, const struct lu_fid *fid,
1417                         u64 valid, const char *name,
1418                         const char *input, int input_size, int output_size,
1419                         int flags, __u32 suppgid,
1420                         struct ptlrpc_request **request)
1421 {
1422         struct obd_device      *obd = exp->exp_obd;
1423         struct lmv_obd   *lmv = &obd->u.lmv;
1424         struct lmv_tgt_desc    *tgt;
1425
1426         tgt = lmv_find_target(lmv, fid);
1427         if (IS_ERR(tgt))
1428                 return PTR_ERR(tgt);
1429
1430         return md_setxattr(tgt->ltd_exp, fid, valid, name, input,
1431                          input_size, output_size, flags, suppgid,
1432                          request);
1433 }
1434
1435 static int lmv_getattr(struct obd_export *exp, struct md_op_data *op_data,
1436                        struct ptlrpc_request **request)
1437 {
1438         struct obd_device       *obd = exp->exp_obd;
1439         struct lmv_obd    *lmv = &obd->u.lmv;
1440         struct lmv_tgt_desc     *tgt;
1441
1442         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1443         if (IS_ERR(tgt))
1444                 return PTR_ERR(tgt);
1445
1446         if (op_data->op_flags & MF_GET_MDT_IDX) {
1447                 op_data->op_mds = tgt->ltd_idx;
1448                 return 0;
1449         }
1450
1451         return md_getattr(tgt->ltd_exp, op_data, request);
1452 }
1453
1454 static int lmv_null_inode(struct obd_export *exp, const struct lu_fid *fid)
1455 {
1456         struct obd_device   *obd = exp->exp_obd;
1457         struct lmv_obd      *lmv = &obd->u.lmv;
1458         u32 i;
1459
1460         CDEBUG(D_INODE, "CBDATA for " DFID "\n", PFID(fid));
1461
1462         /*
1463          * With DNE every object can have two locks in different namespaces:
1464          * lookup lock in space of MDT storing direntry and update/open lock in
1465          * space of MDT storing inode.
1466          */
1467         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
1468                 if (!lmv->tgts[i] || !lmv->tgts[i]->ltd_exp)
1469                         continue;
1470                 md_null_inode(lmv->tgts[i]->ltd_exp, fid);
1471         }
1472
1473         return 0;
1474 }
1475
1476 static int lmv_close(struct obd_export *exp, struct md_op_data *op_data,
1477                      struct md_open_data *mod, struct ptlrpc_request **request)
1478 {
1479         struct obd_device     *obd = exp->exp_obd;
1480         struct lmv_obd  *lmv = &obd->u.lmv;
1481         struct lmv_tgt_desc   *tgt;
1482
1483         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1484         if (IS_ERR(tgt))
1485                 return PTR_ERR(tgt);
1486
1487         CDEBUG(D_INODE, "CLOSE " DFID "\n", PFID(&op_data->op_fid1));
1488         return md_close(tgt->ltd_exp, op_data, mod, request);
1489 }
1490
1491 /**
1492  * Choosing the MDT by name or FID in @op_data.
1493  * For non-striped directory, it will locate MDT by fid.
1494  * For striped-directory, it will locate MDT by name. And also
1495  * it will reset op_fid1 with the FID of the chosen stripe.
1496  **/
1497 static struct lmv_tgt_desc *
1498 lmv_locate_target_for_name(struct lmv_obd *lmv, struct lmv_stripe_md *lsm,
1499                            const char *name, int namelen, struct lu_fid *fid,
1500                            u32 *mds)
1501 {
1502         const struct lmv_oinfo *oinfo;
1503         struct lmv_tgt_desc *tgt;
1504
1505         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NAME_HASH)) {
1506                 if (cfs_fail_val >= lsm->lsm_md_stripe_count)
1507                         return ERR_PTR(-EBADF);
1508                 oinfo = &lsm->lsm_md_oinfo[cfs_fail_val];
1509         } else {
1510                 oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
1511                 if (IS_ERR(oinfo))
1512                         return ERR_CAST(oinfo);
1513         }
1514
1515         if (fid)
1516                 *fid = oinfo->lmo_fid;
1517         if (mds)
1518                 *mds = oinfo->lmo_mds;
1519
1520         tgt = lmv_get_target(lmv, oinfo->lmo_mds, NULL);
1521
1522         CDEBUG(D_INFO, "locate on mds %u " DFID "\n", oinfo->lmo_mds,
1523                PFID(&oinfo->lmo_fid));
1524         return tgt;
1525 }
1526
1527 /**
1528  * Locate mds by fid or name
1529  *
1530  * For striped directory (lsm != NULL), it will locate the stripe
1531  * by name hash (see lsm_name_to_stripe_info()). Note: if the hash_type
1532  * is unknown, it will return -EBADFD, and lmv_intent_lookup might need
1533  * walk through all of stripes to locate the entry.
1534  *
1535  * For normal direcotry, it will locate MDS by FID directly.
1536  * \param[in] lmv       LMV device
1537  * \param[in] op_data   client MD stack parameters, name, namelen
1538  *                      mds_num etc.
1539  * \param[in] fid       object FID used to locate MDS.
1540  *
1541  * retval               pointer to the lmv_tgt_desc if succeed.
1542  *                      ERR_PTR(errno) if failed.
1543  */
1544 struct lmv_tgt_desc*
1545 lmv_locate_mds(struct lmv_obd *lmv, struct md_op_data *op_data,
1546                struct lu_fid *fid)
1547 {
1548         struct lmv_stripe_md *lsm = op_data->op_mea1;
1549         struct lmv_tgt_desc *tgt;
1550
1551         /*
1552          * During creating VOLATILE file, it should honor the mdt
1553          * index if the file under striped dir is being restored, see
1554          * ct_restore().
1555          */
1556         if (op_data->op_bias & MDS_CREATE_VOLATILE &&
1557             (int)op_data->op_mds != -1) {
1558                 int i;
1559
1560                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
1561                 if (IS_ERR(tgt))
1562                         return tgt;
1563
1564                 if (lsm) {
1565                         /* refill the right parent fid */
1566                         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
1567                                 struct lmv_oinfo *oinfo;
1568
1569                                 oinfo = &lsm->lsm_md_oinfo[i];
1570                                 if (oinfo->lmo_mds == op_data->op_mds) {
1571                                         *fid = oinfo->lmo_fid;
1572                                         break;
1573                                 }
1574                         }
1575
1576                         if (i == lsm->lsm_md_stripe_count)
1577                                 *fid = lsm->lsm_md_oinfo[0].lmo_fid;
1578                 }
1579
1580                 return tgt;
1581         }
1582
1583         if (!lsm || !op_data->op_namelen) {
1584                 tgt = lmv_find_target(lmv, fid);
1585                 if (IS_ERR(tgt))
1586                         return tgt;
1587
1588                 op_data->op_mds = tgt->ltd_idx;
1589
1590                 return tgt;
1591         }
1592
1593         return lmv_locate_target_for_name(lmv, lsm, op_data->op_name,
1594                                           op_data->op_namelen, fid,
1595                                           &op_data->op_mds);
1596 }
1597
1598 static int lmv_create(struct obd_export *exp, struct md_op_data *op_data,
1599                       const void *data, size_t datalen, umode_t mode,
1600                       uid_t uid, gid_t gid, cfs_cap_t cap_effective,
1601                       __u64 rdev, struct ptlrpc_request **request)
1602 {
1603         struct obd_device       *obd = exp->exp_obd;
1604         struct lmv_obd    *lmv = &obd->u.lmv;
1605         struct lmv_tgt_desc     *tgt;
1606         int                   rc;
1607
1608         if (!lmv->desc.ld_active_tgt_count)
1609                 return -EIO;
1610
1611         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1612         if (IS_ERR(tgt))
1613                 return PTR_ERR(tgt);
1614
1615         CDEBUG(D_INODE, "CREATE name '%.*s' on " DFID " -> mds #%x\n",
1616                (int)op_data->op_namelen, op_data->op_name,
1617                PFID(&op_data->op_fid1), op_data->op_mds);
1618
1619         rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1620         if (rc)
1621                 return rc;
1622
1623         if (exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE) {
1624                 /*
1625                  * Send the create request to the MDT where the object
1626                  * will be located
1627                  */
1628                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
1629                 if (IS_ERR(tgt))
1630                         return PTR_ERR(tgt);
1631
1632                 op_data->op_mds = tgt->ltd_idx;
1633         } else {
1634                 CDEBUG(D_CONFIG, "Server doesn't support striped dirs\n");
1635         }
1636
1637         CDEBUG(D_INODE, "CREATE obj " DFID " -> mds #%x\n",
1638                PFID(&op_data->op_fid1), op_data->op_mds);
1639
1640         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1641         rc = md_create(tgt->ltd_exp, op_data, data, datalen, mode, uid, gid,
1642                        cap_effective, rdev, request);
1643
1644         if (rc == 0) {
1645                 if (!*request)
1646                         return rc;
1647                 CDEBUG(D_INODE, "Created - " DFID "\n", PFID(&op_data->op_fid2));
1648         }
1649         return rc;
1650 }
1651
1652 static int
1653 lmv_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
1654             const union ldlm_policy_data *policy,
1655             struct lookup_intent *it, struct md_op_data *op_data,
1656             struct lustre_handle *lockh, __u64 extra_lock_flags)
1657 {
1658         struct obd_device       *obd = exp->exp_obd;
1659         struct lmv_obd     *lmv = &obd->u.lmv;
1660         struct lmv_tgt_desc      *tgt;
1661
1662         CDEBUG(D_INODE, "ENQUEUE '%s' on " DFID "\n",
1663                LL_IT2STR(it), PFID(&op_data->op_fid1));
1664
1665         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1666         if (IS_ERR(tgt))
1667                 return PTR_ERR(tgt);
1668
1669         CDEBUG(D_INODE, "ENQUEUE '%s' on " DFID " -> mds #%u\n",
1670                LL_IT2STR(it), PFID(&op_data->op_fid1), tgt->ltd_idx);
1671
1672         return md_enqueue(tgt->ltd_exp, einfo, policy, it, op_data, lockh,
1673                         extra_lock_flags);
1674 }
1675
1676 static int
1677 lmv_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
1678                  struct ptlrpc_request **preq)
1679 {
1680         struct ptlrpc_request   *req = NULL;
1681         struct obd_device       *obd = exp->exp_obd;
1682         struct lmv_obd    *lmv = &obd->u.lmv;
1683         struct lmv_tgt_desc     *tgt;
1684         struct mdt_body  *body;
1685         int                   rc;
1686
1687         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
1688         if (IS_ERR(tgt))
1689                 return PTR_ERR(tgt);
1690
1691         CDEBUG(D_INODE, "GETATTR_NAME for %*s on " DFID " -> mds #%u\n",
1692                (int)op_data->op_namelen, op_data->op_name,
1693                PFID(&op_data->op_fid1), tgt->ltd_idx);
1694
1695         rc = md_getattr_name(tgt->ltd_exp, op_data, preq);
1696         if (rc != 0)
1697                 return rc;
1698
1699         body = req_capsule_server_get(&(*preq)->rq_pill, &RMF_MDT_BODY);
1700         if (body->mbo_valid & OBD_MD_MDS) {
1701                 struct lu_fid rid = body->mbo_fid1;
1702
1703                 CDEBUG(D_INODE, "Request attrs for " DFID "\n",
1704                        PFID(&rid));
1705
1706                 tgt = lmv_find_target(lmv, &rid);
1707                 if (IS_ERR(tgt)) {
1708                         ptlrpc_req_finished(*preq);
1709                         *preq = NULL;
1710                         return PTR_ERR(tgt);
1711                 }
1712
1713                 op_data->op_fid1 = rid;
1714                 op_data->op_valid |= OBD_MD_FLCROSSREF;
1715                 op_data->op_namelen = 0;
1716                 op_data->op_name = NULL;
1717                 rc = md_getattr_name(tgt->ltd_exp, op_data, &req);
1718                 ptlrpc_req_finished(*preq);
1719                 *preq = req;
1720         }
1721
1722         return rc;
1723 }
1724
/*
 * Map one MF_MDC_CANCEL_FIDn flag to the address of the matching op_fidN
 * member of @op_data; evaluates to NULL for any other value of @fl.
 *
 * Both arguments are parenthesized so that expression arguments (e.g.
 * "a | b" or "ptr + 1") bind as a unit. They may be evaluated more than
 * once, so do not pass expressions with side effects.
 */
#define md_op_data_fid(op_data, fl)				\
	((fl) == MF_MDC_CANCEL_FID1 ? &(op_data)->op_fid1 :	\
	 (fl) == MF_MDC_CANCEL_FID2 ? &(op_data)->op_fid2 :	\
	 (fl) == MF_MDC_CANCEL_FID3 ? &(op_data)->op_fid3 :	\
	 (fl) == MF_MDC_CANCEL_FID4 ? &(op_data)->op_fid4 :	\
	 NULL)
1731
1732 static int lmv_early_cancel(struct obd_export *exp, struct lmv_tgt_desc *tgt,
1733                             struct md_op_data *op_data, int op_tgt,
1734                             enum ldlm_mode mode, int bits, int flag)
1735 {
1736         struct lu_fid     *fid = md_op_data_fid(op_data, flag);
1737         struct obd_device      *obd = exp->exp_obd;
1738         struct lmv_obd   *lmv = &obd->u.lmv;
1739         union ldlm_policy_data policy = { { 0 } };
1740         int                  rc = 0;
1741
1742         if (!fid_is_sane(fid))
1743                 return 0;
1744
1745         if (!tgt) {
1746                 tgt = lmv_find_target(lmv, fid);
1747                 if (IS_ERR(tgt))
1748                         return PTR_ERR(tgt);
1749         }
1750
1751         if (tgt->ltd_idx != op_tgt) {
1752                 CDEBUG(D_INODE, "EARLY_CANCEL on " DFID "\n", PFID(fid));
1753                 policy.l_inodebits.bits = bits;
1754                 rc = md_cancel_unused(tgt->ltd_exp, fid, &policy,
1755                                       mode, LCF_ASYNC, NULL);
1756         } else {
1757                 CDEBUG(D_INODE,
1758                        "EARLY_CANCEL skip operation target %d on " DFID "\n",
1759                        op_tgt, PFID(fid));
1760                 op_data->op_flags |= flag;
1761                 rc = 0;
1762         }
1763
1764         return rc;
1765 }
1766
1767 /*
1768  * llite passes fid of an target inode in op_data->op_fid1 and id of directory in
1769  * op_data->op_fid2
1770  */
1771 static int lmv_link(struct obd_export *exp, struct md_op_data *op_data,
1772                     struct ptlrpc_request **request)
1773 {
1774         struct obd_device       *obd = exp->exp_obd;
1775         struct lmv_obd    *lmv = &obd->u.lmv;
1776         struct lmv_tgt_desc     *tgt;
1777         int                   rc;
1778
1779         LASSERT(op_data->op_namelen != 0);
1780
1781         CDEBUG(D_INODE, "LINK " DFID ":%*s to " DFID "\n",
1782                PFID(&op_data->op_fid2), (int)op_data->op_namelen,
1783                op_data->op_name, PFID(&op_data->op_fid1));
1784
1785         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1786         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1787         op_data->op_cap = cfs_curproc_cap_pack();
1788         if (op_data->op_mea2) {
1789                 struct lmv_stripe_md *lsm = op_data->op_mea2;
1790                 const struct lmv_oinfo *oinfo;
1791
1792                 oinfo = lsm_name_to_stripe_info(lsm, op_data->op_name,
1793                                                 op_data->op_namelen);
1794                 if (IS_ERR(oinfo))
1795                         return PTR_ERR(oinfo);
1796
1797                 op_data->op_fid2 = oinfo->lmo_fid;
1798         }
1799
1800         tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
1801         if (IS_ERR(tgt))
1802                 return PTR_ERR(tgt);
1803
1804         /*
1805          * Cancel UPDATE lock on child (fid1).
1806          */
1807         op_data->op_flags |= MF_MDC_CANCEL_FID2;
1808         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
1809                               MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
1810         if (rc != 0)
1811                 return rc;
1812
1813         return md_link(tgt->ltd_exp, op_data, request);
1814 }
1815
1816 static int lmv_rename(struct obd_export *exp, struct md_op_data *op_data,
1817                       const char *old, size_t oldlen,
1818                       const char *new, size_t newlen,
1819                       struct ptlrpc_request **request)
1820 {
1821         struct obd_device       *obd = exp->exp_obd;
1822         struct lmv_obd    *lmv = &obd->u.lmv;
1823         struct obd_export *target_exp;
1824         struct lmv_tgt_desc     *src_tgt;
1825         struct lmv_tgt_desc *tgt_tgt;
1826         struct mdt_body *body;
1827         int                     rc;
1828
1829         LASSERT(oldlen != 0);
1830
1831         CDEBUG(D_INODE, "RENAME %.*s in " DFID ":%d to %.*s in " DFID ":%d\n",
1832                (int)oldlen, old, PFID(&op_data->op_fid1),
1833                op_data->op_mea1 ? op_data->op_mea1->lsm_md_stripe_count : 0,
1834                (int)newlen, new, PFID(&op_data->op_fid2),
1835                op_data->op_mea2 ? op_data->op_mea2->lsm_md_stripe_count : 0);
1836
1837         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
1838         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
1839         op_data->op_cap = cfs_curproc_cap_pack();
1840
1841         if (op_data->op_cli_flags & CLI_MIGRATE) {
1842                 LASSERTF(fid_is_sane(&op_data->op_fid3), "invalid FID " DFID "\n",
1843                          PFID(&op_data->op_fid3));
1844
1845                 if (op_data->op_mea1) {
1846                         struct lmv_stripe_md *lsm = op_data->op_mea1;
1847                         struct lmv_tgt_desc *tmp;
1848
1849                         /* Fix the parent fid for striped dir */
1850                         tmp = lmv_locate_target_for_name(lmv, lsm, old,
1851                                                          oldlen,
1852                                                          &op_data->op_fid1,
1853                                                          NULL);
1854                         if (IS_ERR(tmp))
1855                                 return PTR_ERR(tmp);
1856                 }
1857
1858                 rc = lmv_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
1859                 if (rc)
1860                         return rc;
1861                 src_tgt = lmv_find_target(lmv, &op_data->op_fid3);
1862                 if (IS_ERR(src_tgt))
1863                         return PTR_ERR(src_tgt);
1864
1865                 target_exp = src_tgt->ltd_exp;
1866         } else {
1867                 if (op_data->op_mea1) {
1868                         struct lmv_stripe_md *lsm = op_data->op_mea1;
1869
1870                         src_tgt = lmv_locate_target_for_name(lmv, lsm, old,
1871                                                              oldlen,
1872                                                              &op_data->op_fid1,
1873                                                              &op_data->op_mds);
1874                 } else {
1875                         src_tgt = lmv_find_target(lmv, &op_data->op_fid1);
1876                 }
1877                 if (IS_ERR(src_tgt))
1878                         return PTR_ERR(src_tgt);
1879
1880                 if (op_data->op_mea2) {
1881                         struct lmv_stripe_md *lsm = op_data->op_mea2;
1882
1883                         tgt_tgt = lmv_locate_target_for_name(lmv, lsm, new,
1884                                                              newlen,
1885                                                              &op_data->op_fid2,
1886                                                              &op_data->op_mds);
1887                 } else {
1888                         tgt_tgt = lmv_find_target(lmv, &op_data->op_fid2);
1889                 }
1890                 if (IS_ERR(tgt_tgt))
1891                         return PTR_ERR(tgt_tgt);
1892
1893                 target_exp = tgt_tgt->ltd_exp;
1894         }
1895
1896         /*
1897          * LOOKUP lock on src child (fid3) should also be cancelled for
1898          * src_tgt in mdc_rename.
1899          */
1900         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
1901
1902         /*
1903          * Cancel UPDATE locks on tgt parent (fid2), tgt_tgt is its
1904          * own target.
1905          */
1906         rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1907                               LCK_EX, MDS_INODELOCK_UPDATE,
1908                               MF_MDC_CANCEL_FID2);
1909         if (rc)
1910                 return rc;
1911         /*
1912          * Cancel LOOKUP locks on source child (fid3) for parent tgt_tgt.
1913          */
1914         if (fid_is_sane(&op_data->op_fid3)) {
1915                 struct lmv_tgt_desc *tgt;
1916
1917                 tgt = lmv_find_target(lmv, &op_data->op_fid1);
1918                 if (IS_ERR(tgt))
1919                         return PTR_ERR(tgt);
1920
1921                 /* Cancel LOOKUP lock on its parent */
1922                 rc = lmv_early_cancel(exp, tgt, op_data, src_tgt->ltd_idx,
1923                                       LCK_EX, MDS_INODELOCK_LOOKUP,
1924                                       MF_MDC_CANCEL_FID3);
1925                 if (rc)
1926                         return rc;
1927
1928                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1929                                       LCK_EX, MDS_INODELOCK_FULL,
1930                                       MF_MDC_CANCEL_FID3);
1931                 if (rc)
1932                         return rc;
1933         }
1934
1935 retry_rename:
1936         /*
1937          * Cancel all the locks on tgt child (fid4).
1938          */
1939         if (fid_is_sane(&op_data->op_fid4)) {
1940                 struct lmv_tgt_desc *tgt;
1941
1942                 rc = lmv_early_cancel(exp, NULL, op_data, src_tgt->ltd_idx,
1943                                       LCK_EX, MDS_INODELOCK_FULL,
1944                                       MF_MDC_CANCEL_FID4);
1945                 if (rc)
1946                         return rc;
1947
1948                 tgt = lmv_find_target(lmv, &op_data->op_fid4);
1949                 if (IS_ERR(tgt))
1950                         return PTR_ERR(tgt);
1951
1952                 /*
1953                  * Since the target child might be destroyed, and it might
1954                  * become orphan, and we can only check orphan on the local
1955                  * MDT right now, so we send rename request to the MDT where
1956                  * target child is located. If target child does not exist,
1957                  * then it will send the request to the target parent
1958                  */
1959                 target_exp = tgt->ltd_exp;
1960         }
1961
1962         rc = md_rename(target_exp, op_data, old, oldlen, new, newlen, request);
1963         if (rc && rc != -EREMOTE)
1964                 return rc;
1965
1966         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
1967         if (!body)
1968                 return -EPROTO;
1969
1970         /* Not cross-ref case, just get out of here. */
1971         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
1972                 return rc;
1973
1974         CDEBUG(D_INODE, "%s: try rename to another MDT for " DFID "\n",
1975                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
1976
1977         op_data->op_fid4 = body->mbo_fid1;
1978         ptlrpc_req_finished(*request);
1979         *request = NULL;
1980         goto retry_rename;
1981 }
1982
1983 static int lmv_setattr(struct obd_export *exp, struct md_op_data *op_data,
1984                        void *ea, size_t ealen, struct ptlrpc_request **request)
1985 {
1986         struct obd_device       *obd = exp->exp_obd;
1987         struct lmv_obd    *lmv = &obd->u.lmv;
1988         struct lmv_tgt_desc     *tgt;
1989
1990         CDEBUG(D_INODE, "SETATTR for " DFID ", valid 0x%x\n",
1991                PFID(&op_data->op_fid1), op_data->op_attr.ia_valid);
1992
1993         op_data->op_flags |= MF_MDC_CANCEL_FID1;
1994         tgt = lmv_find_target(lmv, &op_data->op_fid1);
1995         if (IS_ERR(tgt))
1996                 return PTR_ERR(tgt);
1997
1998         return md_setattr(tgt->ltd_exp, op_data, ea, ealen, request);
1999 }
2000
2001 static int lmv_sync(struct obd_export *exp, const struct lu_fid *fid,
2002                     struct ptlrpc_request **request)
2003 {
2004         struct obd_device        *obd = exp->exp_obd;
2005         struct lmv_obd      *lmv = &obd->u.lmv;
2006         struct lmv_tgt_desc       *tgt;
2007
2008         tgt = lmv_find_target(lmv, fid);
2009         if (IS_ERR(tgt))
2010                 return PTR_ERR(tgt);
2011
2012         return md_sync(tgt->ltd_exp, fid, request);
2013 }
2014
2015 /**
2016  * Get current minimum entry from striped directory
2017  *
2018  * This function will search the dir entry, whose hash value is the
2019  * closest(>=) to @hash_offset, from all of sub-stripes, and it is
2020  * only being called for striped directory.
2021  *
2022  * \param[in] exp               export of LMV
2023  * \param[in] op_data           parameters transferred beween client MD stack
2024  *                              stripe_information will be included in this
2025  *                              parameter
2026  * \param[in] cb_op             ldlm callback being used in enqueue in
2027  *                              mdc_read_page
2028  * \param[in] hash_offset       the hash value, which is used to locate
2029  *                              minum(closet) dir entry
2030  * \param[in|out] stripe_offset the caller use this to indicate the stripe
2031  *                              index of last entry, so to avoid hash conflict
2032  *                              between stripes. It will also be used to
2033  *                              return the stripe index of current dir entry.
2034  * \param[in|out] entp          the minum entry and it also is being used
2035  *                              to input the last dir entry to resolve the
2036  *                              hash conflict
2037  *
2038  * \param[out] ppage            the page which holds the minum entry
2039  *
2040  * \retval                      = 0 get the entry successfully
2041  *                              negative errno (< 0) does not get the entry
2042  */
2043 static int lmv_get_min_striped_entry(struct obd_export *exp,
2044                                      struct md_op_data *op_data,
2045                                      struct md_callback *cb_op,
2046                                      __u64 hash_offset, int *stripe_offset,
2047                                      struct lu_dirent **entp,
2048                                      struct page **ppage)
2049 {
2050         struct lmv_stripe_md *lsm = op_data->op_mea1;
2051         struct obd_device *obd = exp->exp_obd;
2052         struct lmv_obd *lmv = &obd->u.lmv;
2053         struct lu_dirent *min_ent = NULL;
2054         struct page *min_page = NULL;
2055         struct lmv_tgt_desc *tgt;
2056         int stripe_count;
2057         int min_idx = 0;
2058         int rc = 0;
2059         int i;
2060
2061         stripe_count = lsm->lsm_md_stripe_count;
2062         for (i = 0; i < stripe_count; i++) {
2063                 __u64 stripe_hash = hash_offset;
2064                 struct lu_dirent *ent = NULL;
2065                 struct page *page = NULL;
2066                 struct lu_dirpage *dp;
2067
2068                 tgt = lmv_get_target(lmv, lsm->lsm_md_oinfo[i].lmo_mds, NULL);
2069                 if (IS_ERR(tgt)) {
2070                         rc = PTR_ERR(tgt);
2071                         goto out;
2072                 }
2073
2074                 /*
2075                  * op_data will be shared by each stripe, so we need
2076                  * reset these value for each stripe
2077                  */
2078                 op_data->op_fid1 = lsm->lsm_md_oinfo[i].lmo_fid;
2079                 op_data->op_fid2 = lsm->lsm_md_oinfo[i].lmo_fid;
2080                 op_data->op_data = lsm->lsm_md_oinfo[i].lmo_root;
2081 next:
2082                 rc = md_read_page(tgt->ltd_exp, op_data, cb_op, stripe_hash,
2083                                   &page);
2084                 if (rc)
2085                         goto out;
2086
2087                 dp = page_address(page);
2088                 for (ent = lu_dirent_start(dp); ent;
2089                      ent = lu_dirent_next(ent)) {
2090                         /* Skip dummy entry */
2091                         if (!le16_to_cpu(ent->lde_namelen))
2092                                 continue;
2093
2094                         if (le64_to_cpu(ent->lde_hash) < hash_offset)
2095                                 continue;
2096
2097                         if (le64_to_cpu(ent->lde_hash) == hash_offset &&
2098                             (*entp == ent || i < *stripe_offset))
2099                                 continue;
2100
2101                         /* skip . and .. for other stripes */
2102                         if (i && (!strncmp(ent->lde_name, ".",
2103                                            le16_to_cpu(ent->lde_namelen)) ||
2104                                   !strncmp(ent->lde_name, "..",
2105                                            le16_to_cpu(ent->lde_namelen))))
2106                                 continue;
2107                         break;
2108                 }
2109
2110                 if (!ent) {
2111                         stripe_hash = le64_to_cpu(dp->ldp_hash_end);
2112
2113                         kunmap(page);
2114                         put_page(page);
2115                         page = NULL;
2116
2117                         /*
2118                          * reach the end of current stripe, go to next stripe
2119                          */
2120                         if (stripe_hash == MDS_DIR_END_OFF)
2121                                 continue;
2122                         else
2123                                 goto next;
2124                 }
2125
2126                 if (min_ent) {
2127                         if (le64_to_cpu(min_ent->lde_hash) >
2128                             le64_to_cpu(ent->lde_hash)) {
2129                                 min_ent = ent;
2130                                 kunmap(min_page);
2131                                 put_page(min_page);
2132                                 min_idx = i;
2133                                 min_page = page;
2134                         } else {
2135                                 kunmap(page);
2136                                 put_page(page);
2137                                 page = NULL;
2138                         }
2139                 } else {
2140                         min_ent = ent;
2141                         min_page = page;
2142                         min_idx = i;
2143                 }
2144         }
2145
2146 out:
2147         if (*ppage) {
2148                 kunmap(*ppage);
2149                 put_page(*ppage);
2150         }
2151         *stripe_offset = min_idx;
2152         *entp = min_ent;
2153         *ppage = min_page;
2154         return rc;
2155 }
2156
2157 /**
2158  * Build dir entry page from a striped directory
2159  *
2160  * This function gets one entry by @offset from a striped directory. It will
2161  * read entries from all of stripes, and choose one closest to the required
2162  * offset(&offset). A few notes
2163  * 1. skip . and .. for non-zero stripes, because there can only have one .
2164  * and .. in a directory.
2165  * 2. op_data will be shared by all of stripes, instead of allocating new
2166  * one, so need to restore before reusing.
2167  * 3. release the entry page if that is not being chosen.
2168  *
2169  * \param[in] exp       obd export refer to LMV
2170  * \param[in] op_data   hold those MD parameters of read_entry
2171  * \param[in] cb_op     ldlm callback being used in enqueue in mdc_read_entry
2172  * \param[out] ldp      the entry being read
2173  * \param[out] ppage    the page holding the entry. Note: because the entry
2174  *                      will be accessed in upper layer, so we need hold the
2175  *                      page until the usages of entry is finished, see
2176  *                      ll_dir_entry_next.
2177  *
2178  * retval               =0 if get entry successfully
2179  *                      <0 cannot get entry
2180  */
2181 static int lmv_read_striped_page(struct obd_export *exp,
2182                                  struct md_op_data *op_data,
2183                                  struct md_callback *cb_op,
2184                                  __u64 offset, struct page **ppage)
2185 {
2186         struct inode *master_inode = op_data->op_data;
2187         struct lu_fid master_fid = op_data->op_fid1;
2188         __u64 hash_offset = offset;
2189         __u32 ldp_flags;
2190         struct page *min_ent_page = NULL;
2191         struct page *ent_page = NULL;
2192         struct lu_dirent *min_ent = NULL;
2193         struct lu_dirent *last_ent;
2194         struct lu_dirent *ent;
2195         struct lu_dirpage *dp;
2196         size_t left_bytes;
2197         int ent_idx = 0;
2198         void *area;
2199         int rc;
2200
2201         /*
2202          * Allocate a page and read entries from all of stripes and fill
2203          * the page by hash order
2204          */
2205         ent_page = alloc_page(GFP_KERNEL);
2206         if (!ent_page)
2207                 return -ENOMEM;
2208
2209         /* Initialize the entry page */
2210         dp = kmap(ent_page);
2211         memset(dp, 0, sizeof(*dp));
2212         dp->ldp_hash_start = cpu_to_le64(offset);
2213         ldp_flags = LDF_COLLIDE;
2214
2215         area = dp + 1;
2216         left_bytes = PAGE_SIZE - sizeof(*dp);
2217         ent = area;
2218         last_ent = ent;
2219         do {
2220                 __u16 ent_size;
2221
2222                 /* Find the minum entry from all sub-stripes */
2223                 rc = lmv_get_min_striped_entry(exp, op_data, cb_op, hash_offset,
2224                                                &ent_idx, &min_ent,
2225                                                &min_ent_page);
2226                 if (rc)
2227                         goto out;
2228
2229                 /*
2230                  * If it can not get minum entry, it means it already reaches
2231                  * the end of this directory
2232                  */
2233                 if (!min_ent) {
2234                         last_ent->lde_reclen = 0;
2235                         hash_offset = MDS_DIR_END_OFF;
2236                         goto out;
2237                 }
2238
2239                 ent_size = le16_to_cpu(min_ent->lde_reclen);
2240
2241                 /*
2242                  * the last entry lde_reclen is 0, but it might not
2243                  * the end of this entry of this temporay entry
2244                  */
2245                 if (!ent_size)
2246                         ent_size = lu_dirent_calc_size(
2247                                         le16_to_cpu(min_ent->lde_namelen),
2248                                         le32_to_cpu(min_ent->lde_attrs));
2249                 if (ent_size > left_bytes) {
2250                         last_ent->lde_reclen = cpu_to_le16(0);
2251                         hash_offset = le64_to_cpu(min_ent->lde_hash);
2252                         goto out;
2253                 }
2254
2255                 memcpy(ent, min_ent, ent_size);
2256
2257                 /*
2258                  * Replace . with master FID and Replace .. with the parent FID
2259                  * of master object
2260                  */
2261                 if (!strncmp(ent->lde_name, ".",
2262                              le16_to_cpu(ent->lde_namelen)) &&
2263                     le16_to_cpu(ent->lde_namelen) == 1)
2264                         fid_cpu_to_le(&ent->lde_fid, &master_fid);
2265                 else if (!strncmp(ent->lde_name, "..",
2266                                   le16_to_cpu(ent->lde_namelen)) &&
2267                          le16_to_cpu(ent->lde_namelen) == 2)
2268                         fid_cpu_to_le(&ent->lde_fid, &op_data->op_fid3);
2269
2270                 left_bytes -= ent_size;
2271                 ent->lde_reclen = cpu_to_le16(ent_size);
2272                 last_ent = ent;
2273                 ent = (void *)ent + ent_size;
2274                 hash_offset = le64_to_cpu(min_ent->lde_hash);
2275                 if (hash_offset == MDS_DIR_END_OFF) {
2276                         last_ent->lde_reclen = 0;
2277                         break;
2278                 }
2279         } while (1);
2280 out:
2281         if (min_ent_page) {
2282                 kunmap(min_ent_page);
2283                 put_page(min_ent_page);
2284         }
2285
2286         if (unlikely(rc)) {
2287                 __free_page(ent_page);
2288                 ent_page = NULL;
2289         } else {
2290                 if (ent == area)
2291                         ldp_flags |= LDF_EMPTY;
2292                 dp->ldp_flags |= cpu_to_le32(ldp_flags);
2293                 dp->ldp_hash_end = cpu_to_le64(hash_offset);
2294         }
2295
2296         /*
2297          * We do not want to allocate md_op_data during each
2298          * dir entry reading, so op_data will be shared by every stripe,
2299          * then we need to restore it back to original value before
2300          * return to the upper layer
2301          */
2302         op_data->op_fid1 = master_fid;
2303         op_data->op_fid2 = master_fid;
2304         op_data->op_data = master_inode;
2305
2306         *ppage = ent_page;
2307
2308         return rc;
2309 }
2310
2311 static int lmv_read_page(struct obd_export *exp, struct md_op_data *op_data,
2312                          struct md_callback *cb_op, __u64 offset,
2313                          struct page **ppage)
2314 {
2315         struct lmv_stripe_md *lsm = op_data->op_mea1;
2316         struct obd_device *obd = exp->exp_obd;
2317         struct lmv_obd *lmv = &obd->u.lmv;
2318         struct lmv_tgt_desc *tgt;
2319
2320         if (unlikely(lsm))
2321                 return lmv_read_striped_page(exp, op_data, cb_op, offset, ppage);
2322
2323         tgt = lmv_find_target(lmv, &op_data->op_fid1);
2324         if (IS_ERR(tgt))
2325                 return PTR_ERR(tgt);
2326
2327         return md_read_page(tgt->ltd_exp, op_data, cb_op, offset, ppage);
2328 }
2329
2330 /**
2331  * Unlink a file/directory
2332  *
2333  * Unlink a file or directory under the parent dir. The unlink request
2334  * usually will be sent to the MDT where the child is located, but if
2335  * the client does not have the child FID then request will be sent to the
2336  * MDT where the parent is located.
2337  *
2338  * If the parent is a striped directory then it also needs to locate which
2339  * stripe the name of the child is located, and replace the parent FID
2340  * (@op->op_fid1) with the stripe FID. Note: if the stripe is unknown,
2341  * it will walk through all of sub-stripes until the child is being
2342  * unlinked finally.
2343  *
2344  * \param[in] exp       export refer to LMV
2345  * \param[in] op_data   different parameters transferred between client
2346  *                      MD stacks, name, namelen, FIDs etc.
2347  *                      op_fid1 is the parent FID, op_fid2 is the child
2348  *                      FID.
2349  * \param[out] request point to the request of unlink.
2350  *
2351  * retval               0 if succeed
2352  *                      negative errno if failed.
2353  */
2354 static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
2355                       struct ptlrpc_request **request)
2356 {
2357         struct lmv_stripe_md *lsm = op_data->op_mea1;
2358         struct obd_device    *obd = exp->exp_obd;
2359         struct lmv_obd    *lmv = &obd->u.lmv;
2360         struct lmv_tgt_desc *parent_tgt = NULL;
2361         struct lmv_tgt_desc     *tgt = NULL;
2362         struct mdt_body         *body;
2363         int stripe_index = 0;
2364         int                  rc;
2365
2366 retry_unlink:
2367         /* For striped dir, we need to locate the parent as well */
2368         if (lsm) {
2369                 struct lmv_tgt_desc *tmp;
2370
2371                 LASSERT(op_data->op_name && op_data->op_namelen);
2372
2373                 tmp = lmv_locate_target_for_name(lmv, lsm,
2374                                                  op_data->op_name,
2375                                                  op_data->op_namelen,
2376                                                  &op_data->op_fid1,
2377                                                  &op_data->op_mds);
2378
2379                 /*
2380                  * return -EBADFD means unknown hash type, might
2381                  * need try all sub-stripe here
2382                  */
2383                 if (IS_ERR(tmp) && PTR_ERR(tmp) != -EBADFD)
2384                         return PTR_ERR(tmp);
2385
2386                 /*
2387                  * Note: both migrating dir and unknown hash dir need to
2388                  * try all of sub-stripes, so we need start search the
2389                  * name from stripe 0, but migrating dir is already handled
2390                  * inside lmv_locate_target_for_name(), so we only check
2391                  * unknown hash type directory here
2392                  */
2393                 if (!lmv_is_known_hash_type(lsm->lsm_md_hash_type)) {
2394                         struct lmv_oinfo *oinfo;
2395
2396                         oinfo = &lsm->lsm_md_oinfo[stripe_index];
2397
2398                         op_data->op_fid1 = oinfo->lmo_fid;
2399                         op_data->op_mds = oinfo->lmo_mds;
2400                 }
2401         }
2402
2403 try_next_stripe:
2404         /* Send unlink requests to the MDT where the child is located */
2405         if (likely(!fid_is_zero(&op_data->op_fid2)))
2406                 tgt = lmv_find_target(lmv, &op_data->op_fid2);
2407         else if (lsm)
2408                 tgt = lmv_get_target(lmv, op_data->op_mds, NULL);
2409         else
2410                 tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2411
2412         if (IS_ERR(tgt))
2413                 return PTR_ERR(tgt);
2414
2415         op_data->op_fsuid = from_kuid(&init_user_ns, current_fsuid());
2416         op_data->op_fsgid = from_kgid(&init_user_ns, current_fsgid());
2417         op_data->op_cap = cfs_curproc_cap_pack();
2418
2419         /*
2420          * If child's fid is given, cancel unused locks for it if it is from
2421          * another export than parent.
2422          *
2423          * LOOKUP lock for child (fid3) should also be cancelled on parent
2424          * tgt_tgt in mdc_unlink().
2425          */
2426         op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
2427
2428         /*
2429          * Cancel FULL locks on child (fid3).
2430          */
2431         parent_tgt = lmv_find_target(lmv, &op_data->op_fid1);
2432         if (IS_ERR(parent_tgt))
2433                 return PTR_ERR(parent_tgt);
2434
2435         if (parent_tgt != tgt) {
2436                 rc = lmv_early_cancel(exp, parent_tgt, op_data, tgt->ltd_idx,
2437                                       LCK_EX, MDS_INODELOCK_LOOKUP,
2438                                       MF_MDC_CANCEL_FID3);
2439         }
2440
2441         rc = lmv_early_cancel(exp, NULL, op_data, tgt->ltd_idx, LCK_EX,
2442                               MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
2443         if (rc != 0)
2444                 return rc;
2445
2446         CDEBUG(D_INODE, "unlink with fid=" DFID "/" DFID " -> mds #%u\n",
2447                PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
2448
2449         rc = md_unlink(tgt->ltd_exp, op_data, request);
2450         if (rc != 0 && rc != -EREMOTE  && rc != -ENOENT)
2451                 return rc;
2452
2453         /* Try next stripe if it is needed. */
2454         if (rc == -ENOENT && lsm && lmv_need_try_all_stripes(lsm)) {
2455                 struct lmv_oinfo *oinfo;
2456
2457                 stripe_index++;
2458                 if (stripe_index >= lsm->lsm_md_stripe_count)
2459                         return rc;
2460
2461                 oinfo = &lsm->lsm_md_oinfo[stripe_index];
2462
2463                 op_data->op_fid1 = oinfo->lmo_fid;
2464                 op_data->op_mds = oinfo->lmo_mds;
2465
2466                 ptlrpc_req_finished(*request);
2467                 *request = NULL;
2468
2469                 goto try_next_stripe;
2470         }
2471
2472         body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
2473         if (!body)
2474                 return -EPROTO;
2475
2476         /* Not cross-ref case, just get out of here. */
2477         if (likely(!(body->mbo_valid & OBD_MD_MDS)))
2478                 return rc;
2479
2480         CDEBUG(D_INODE, "%s: try unlink to another MDT for " DFID "\n",
2481                exp->exp_obd->obd_name, PFID(&body->mbo_fid1));
2482
2483         /* This is a remote object, try remote MDT, Note: it may
2484          * try more than 1 time here, Considering following case
2485          * /mnt/lustre is root on MDT0, remote1 is on MDT1
2486          * 1. Initially A does not know where remote1 is, it send
2487          *    unlink RPC to MDT0, MDT0 return -EREMOTE, it will
2488          *    resend unlink RPC to MDT1 (retry 1st time).
2489          *
2490          * 2. During the unlink RPC in flight,
2491          *    client B mv /mnt/lustre/remote1 /mnt/lustre/remote2
2492          *    and create new remote1, but on MDT0
2493          *
2494          * 3. MDT1 get unlink RPC(from A), then do remote lock on
2495          *    /mnt/lustre, then lookup get fid of remote1, and find
2496          *    it is remote dir again, and replay -EREMOTE again.
2497          *
2498          * 4. Then A will resend unlink RPC to MDT0. (retry 2nd times).
2499          *
2500          * In theory, it might try unlimited time here, but it should
2501          * be very rare case.
2502          */
2503         op_data->op_fid2 = body->mbo_fid1;
2504         ptlrpc_req_finished(*request);
2505         *request = NULL;
2506
2507         goto retry_unlink;
2508 }
2509
2510 static int lmv_precleanup(struct obd_device *obd)
2511 {
2512         fld_client_debugfs_fini(&obd->u.lmv.lmv_fld);
2513         lprocfs_obd_cleanup(obd);
2514         return 0;
2515 }
2516
2517 /**
2518  * Get by key a value associated with a LMV device.
2519  *
2520  * Dispatch request to lower-layer devices as needed.
2521  *
2522  * \param[in]  env      execution environment for this thread
2523  * \param[in]  exp      export for the LMV device
2524  * \param[in]  keylen   length of key identifier
2525  * \param[in]  key      identifier of key to get value for
2526  * \param[in]  vallen   size of \a val
2527  * \param[out] val      pointer to storage location for value
2528  *
2529  * \retval 0            on success
2530  * \retval negative     negated errno on failure
2531  */
2532 static int lmv_get_info(const struct lu_env *env, struct obd_export *exp,
2533                         __u32 keylen, void *key, __u32 *vallen, void *val)
2534 {
2535         struct obd_device       *obd;
2536         struct lmv_obd    *lmv;
2537         int                   rc = 0;
2538
2539         obd = class_exp2obd(exp);
2540         if (!obd) {
2541                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2542                        exp->exp_handle.h_cookie);
2543                 return -EINVAL;
2544         }
2545
2546         lmv = &obd->u.lmv;
2547         if (keylen >= strlen("remote_flag") && !strcmp(key, "remote_flag")) {
2548                 int i;
2549
2550                 LASSERT(*vallen == sizeof(__u32));
2551                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2552                         struct lmv_tgt_desc *tgt = lmv->tgts[i];
2553
2554                         /*
2555                          * All tgts should be connected when this gets called.
2556                          */
2557                         if (!tgt || !tgt->ltd_exp)
2558                                 continue;
2559
2560                         if (!obd_get_info(env, tgt->ltd_exp, keylen, key,
2561                                           vallen, val))
2562                                 return 0;
2563                 }
2564                 return -EINVAL;
2565         } else if (KEY_IS(KEY_MAX_EASIZE) ||
2566                    KEY_IS(KEY_DEFAULT_EASIZE) ||
2567                    KEY_IS(KEY_CONN_DATA)) {
2568                 /*
2569                  * Forwarding this request to first MDS, it should know LOV
2570                  * desc.
2571                  */
2572                 rc = obd_get_info(env, lmv->tgts[0]->ltd_exp, keylen, key,
2573                                   vallen, val);
2574                 if (!rc && KEY_IS(KEY_CONN_DATA))
2575                         exp->exp_connect_data = *(struct obd_connect_data *)val;
2576                 return rc;
2577         } else if (KEY_IS(KEY_TGT_COUNT)) {
2578                 *((int *)val) = lmv->desc.ld_tgt_count;
2579                 return 0;
2580         }
2581
2582         CDEBUG(D_IOCTL, "Invalid key\n");
2583         return -EINVAL;
2584 }
2585
2586 /**
2587  * Asynchronously set by key a value associated with a LMV device.
2588  *
2589  * Dispatch request to lower-layer devices as needed.
2590  *
2591  * \param[in] env       execution environment for this thread
2592  * \param[in] exp       export for the LMV device
2593  * \param[in] keylen    length of key identifier
2594  * \param[in] key       identifier of key to store value for
2595  * \param[in] vallen    size of value to store
2596  * \param[in] val       pointer to data to be stored
2597  * \param[in] set       optional list of related ptlrpc requests
2598  *
2599  * \retval 0            on success
2600  * \retval negative     negated errno on failure
2601  */
2602 static int lmv_set_info_async(const struct lu_env *env, struct obd_export *exp,
2603                               u32 keylen, void *key, u32 vallen,
2604                               void *val, struct ptlrpc_request_set *set)
2605 {
2606         struct lmv_tgt_desc    *tgt;
2607         struct obd_device      *obd;
2608         struct lmv_obd   *lmv;
2609         int rc = 0;
2610
2611         obd = class_exp2obd(exp);
2612         if (!obd) {
2613                 CDEBUG(D_IOCTL, "Invalid client cookie %#llx\n",
2614                        exp->exp_handle.h_cookie);
2615                 return -EINVAL;
2616         }
2617         lmv = &obd->u.lmv;
2618
2619         if (KEY_IS(KEY_READ_ONLY) || KEY_IS(KEY_FLUSH_CTX) ||
2620             KEY_IS(KEY_DEFAULT_EASIZE)) {
2621                 int i, err = 0;
2622
2623                 for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2624                         tgt = lmv->tgts[i];
2625
2626                         if (!tgt || !tgt->ltd_exp)
2627                                 continue;
2628
2629                         err = obd_set_info_async(env, tgt->ltd_exp,
2630                                                  keylen, key, vallen, val, set);
2631                         if (err && rc == 0)
2632                                 rc = err;
2633                 }
2634
2635                 return rc;
2636         }
2637
2638         return -EINVAL;
2639 }
2640
2641 static int lmv_unpack_md_v1(struct obd_export *exp, struct lmv_stripe_md *lsm,
2642                             const struct lmv_mds_md_v1 *lmm1)
2643 {
2644         struct lmv_obd *lmv = &exp->exp_obd->u.lmv;
2645         int stripe_count;
2646         int rc = 0;
2647         int cplen;
2648         int i;
2649
2650         lsm->lsm_md_magic = le32_to_cpu(lmm1->lmv_magic);
2651         lsm->lsm_md_stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2652         lsm->lsm_md_master_mdt_index = le32_to_cpu(lmm1->lmv_master_mdt_index);
2653         if (OBD_FAIL_CHECK(OBD_FAIL_UNKNOWN_LMV_STRIPE))
2654                 lsm->lsm_md_hash_type = LMV_HASH_TYPE_UNKNOWN;
2655         else
2656                 lsm->lsm_md_hash_type = le32_to_cpu(lmm1->lmv_hash_type);
2657         lsm->lsm_md_layout_version = le32_to_cpu(lmm1->lmv_layout_version);
2658         cplen = strlcpy(lsm->lsm_md_pool_name, lmm1->lmv_pool_name,
2659                         sizeof(lsm->lsm_md_pool_name));
2660
2661         if (cplen >= sizeof(lsm->lsm_md_pool_name))
2662                 return -E2BIG;
2663
2664         CDEBUG(D_INFO, "unpack lsm count %d, master %d hash_type %d layout_version %d\n",
2665                lsm->lsm_md_stripe_count, lsm->lsm_md_master_mdt_index,
2666                lsm->lsm_md_hash_type, lsm->lsm_md_layout_version);
2667
2668         stripe_count = le32_to_cpu(lmm1->lmv_stripe_count);
2669         for (i = 0; i < stripe_count; i++) {
2670                 fid_le_to_cpu(&lsm->lsm_md_oinfo[i].lmo_fid,
2671                               &lmm1->lmv_stripe_fids[i]);
2672                 rc = lmv_fld_lookup(lmv, &lsm->lsm_md_oinfo[i].lmo_fid,
2673                                     &lsm->lsm_md_oinfo[i].lmo_mds);
2674                 if (rc)
2675                         return rc;
2676                 CDEBUG(D_INFO, "unpack fid #%d " DFID "\n", i,
2677                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid));
2678         }
2679
2680         return rc;
2681 }
2682
2683 static int lmv_unpackmd(struct obd_export *exp, struct lmv_stripe_md **lsmp,
2684                         const union lmv_mds_md *lmm, size_t lmm_size)
2685 {
2686         struct lmv_stripe_md *lsm;
2687         bool allocated = false;
2688         int lsm_size, rc;
2689
2690         LASSERT(lsmp);
2691
2692         lsm = *lsmp;
2693         /* Free memmd */
2694         if (lsm && !lmm) {
2695                 int i;
2696
2697                 for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
2698                         /*
2699                          * For migrating inode, the master stripe and master
2700                          * object will be the same, so do not need iput, see
2701                          * ll_update_lsm_md
2702                          */
2703                         if (!(lsm->lsm_md_hash_type & LMV_HASH_FLAG_MIGRATION &&
2704                               !i) && lsm->lsm_md_oinfo[i].lmo_root)
2705                                 iput(lsm->lsm_md_oinfo[i].lmo_root);
2706                 }
2707
2708                 kvfree(lsm);
2709                 *lsmp = NULL;
2710                 return 0;
2711         }
2712
2713         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_STRIPE)
2714                 return -EPERM;
2715
2716         /* Unpack memmd */
2717         if (le32_to_cpu(lmm->lmv_magic) != LMV_MAGIC_V1 &&
2718             le32_to_cpu(lmm->lmv_magic) != LMV_USER_MAGIC) {
2719                 CERROR("%s: invalid lmv magic %x: rc = %d\n",
2720                        exp->exp_obd->obd_name, le32_to_cpu(lmm->lmv_magic),
2721                        -EIO);
2722                 return -EIO;
2723         }
2724
2725         if (le32_to_cpu(lmm->lmv_magic) == LMV_MAGIC_V1)
2726                 lsm_size = lmv_stripe_md_size(lmv_mds_md_stripe_count_get(lmm));
2727         else
2728                 /**
2729                  * Unpack default dirstripe(lmv_user_md) to lmv_stripe_md,
2730                  * stripecount should be 0 then.
2731                  */
2732                 lsm_size = lmv_stripe_md_size(0);
2733
2734         if (!lsm) {
2735                 lsm = libcfs_kvzalloc(lsm_size, GFP_NOFS);
2736                 if (!lsm)
2737                         return -ENOMEM;
2738                 allocated = true;
2739                 *lsmp = lsm;
2740         }
2741
2742         switch (le32_to_cpu(lmm->lmv_magic)) {
2743         case LMV_MAGIC_V1:
2744                 rc = lmv_unpack_md_v1(exp, lsm, &lmm->lmv_md_v1);
2745                 break;
2746         default:
2747                 CERROR("%s: unrecognized magic %x\n", exp->exp_obd->obd_name,
2748                        le32_to_cpu(lmm->lmv_magic));
2749                 rc = -EINVAL;
2750                 break;
2751         }
2752
2753         if (rc && allocated) {
2754                 kvfree(lsm);
2755                 *lsmp = NULL;
2756                 lsm_size = rc;
2757         }
2758         return lsm_size;
2759 }
2760
2761 void lmv_free_memmd(struct lmv_stripe_md *lsm)
2762 {
2763         lmv_unpackmd(NULL, &lsm, NULL, 0);
2764 }
2765 EXPORT_SYMBOL(lmv_free_memmd);
2766
2767 static int lmv_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
2768                              union ldlm_policy_data *policy,
2769                              enum ldlm_mode mode, enum ldlm_cancel_flags flags,
2770                              void *opaque)
2771 {
2772         struct obd_device       *obd = exp->exp_obd;
2773         struct lmv_obd    *lmv = &obd->u.lmv;
2774         int                   rc = 0;
2775         int                   err;
2776         u32 i;
2777
2778         LASSERT(fid);
2779
2780         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2781                 struct lmv_tgt_desc *tgt = lmv->tgts[i];
2782
2783                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
2784                         continue;
2785
2786                 err = md_cancel_unused(tgt->ltd_exp, fid, policy, mode, flags,
2787                                        opaque);
2788                 if (!rc)
2789                         rc = err;
2790         }
2791         return rc;
2792 }
2793
2794 static int lmv_set_lock_data(struct obd_export *exp,
2795                              const struct lustre_handle *lockh,
2796                              void *data, __u64 *bits)
2797 {
2798         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2799         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2800
2801         if (!tgt || !tgt->ltd_exp)
2802                 return -EINVAL;
2803
2804         return md_set_lock_data(tgt->ltd_exp, lockh, data, bits);
2805 }
2806
2807 static enum ldlm_mode lmv_lock_match(struct obd_export *exp, __u64 flags,
2808                                      const struct lu_fid *fid,
2809                                      enum ldlm_type type,
2810                                      union ldlm_policy_data *policy,
2811                                      enum ldlm_mode mode,
2812                                      struct lustre_handle *lockh)
2813 {
2814         struct obd_device       *obd = exp->exp_obd;
2815         struct lmv_obd    *lmv = &obd->u.lmv;
2816         enum ldlm_mode        rc;
2817         int tgt;
2818         u32 i;
2819
2820         CDEBUG(D_INODE, "Lock match for " DFID "\n", PFID(fid));
2821
2822         /*
2823          * With DNE every object can have two locks in different namespaces:
2824          * lookup lock in space of MDT storing direntry and update/open lock in
2825          * space of MDT storing inode.  Try the MDT that the FID maps to first,
2826          * since this can be easily found, and only try others if that fails.
2827          */
2828         for (i = 0, tgt = lmv_find_target_index(lmv, fid);
2829              i < lmv->desc.ld_tgt_count;
2830              i++, tgt = (tgt + 1) % lmv->desc.ld_tgt_count) {
2831                 if (tgt < 0) {
2832                         CDEBUG(D_HA, "%s: " DFID " is inaccessible: rc = %d\n",
2833                                obd->obd_name, PFID(fid), tgt);
2834                         tgt = 0;
2835                 }
2836
2837                 if (!lmv->tgts[tgt] || !lmv->tgts[tgt]->ltd_exp ||
2838                     !lmv->tgts[tgt]->ltd_active)
2839                         continue;
2840
2841                 rc = md_lock_match(lmv->tgts[tgt]->ltd_exp, flags, fid,
2842                                    type, policy, mode, lockh);
2843                 if (rc)
2844                         return rc;
2845         }
2846
2847         return 0;
2848 }
2849
2850 static int lmv_get_lustre_md(struct obd_export *exp,
2851                              struct ptlrpc_request *req,
2852                              struct obd_export *dt_exp,
2853                              struct obd_export *md_exp,
2854                              struct lustre_md *md)
2855 {
2856         struct lmv_obd    *lmv = &exp->exp_obd->u.lmv;
2857         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2858
2859         if (!tgt || !tgt->ltd_exp)
2860                 return -EINVAL;
2861         return md_get_lustre_md(tgt->ltd_exp, req, dt_exp, md_exp, md);
2862 }
2863
2864 static int lmv_free_lustre_md(struct obd_export *exp, struct lustre_md *md)
2865 {
2866         struct obd_device       *obd = exp->exp_obd;
2867         struct lmv_obd    *lmv = &obd->u.lmv;
2868         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2869
2870         if (md->lmv) {
2871                 lmv_free_memmd(md->lmv);
2872                 md->lmv = NULL;
2873         }
2874         if (!tgt || !tgt->ltd_exp)
2875                 return -EINVAL;
2876         return md_free_lustre_md(tgt->ltd_exp, md);
2877 }
2878
2879 static int lmv_set_open_replay_data(struct obd_export *exp,
2880                                     struct obd_client_handle *och,
2881                                     struct lookup_intent *it)
2882 {
2883         struct obd_device       *obd = exp->exp_obd;
2884         struct lmv_obd    *lmv = &obd->u.lmv;
2885         struct lmv_tgt_desc     *tgt;
2886
2887         tgt = lmv_find_target(lmv, &och->och_fid);
2888         if (IS_ERR(tgt))
2889                 return PTR_ERR(tgt);
2890
2891         return md_set_open_replay_data(tgt->ltd_exp, och, it);
2892 }
2893
2894 static int lmv_clear_open_replay_data(struct obd_export *exp,
2895                                       struct obd_client_handle *och)
2896 {
2897         struct obd_device       *obd = exp->exp_obd;
2898         struct lmv_obd    *lmv = &obd->u.lmv;
2899         struct lmv_tgt_desc     *tgt;
2900
2901         tgt = lmv_find_target(lmv, &och->och_fid);
2902         if (IS_ERR(tgt))
2903                 return PTR_ERR(tgt);
2904
2905         return md_clear_open_replay_data(tgt->ltd_exp, och);
2906 }
2907
2908 static int lmv_intent_getattr_async(struct obd_export *exp,
2909                                     struct md_enqueue_info *minfo)
2910 {
2911         struct md_op_data       *op_data = &minfo->mi_data;
2912         struct obd_device       *obd = exp->exp_obd;
2913         struct lmv_obd    *lmv = &obd->u.lmv;
2914         struct lmv_tgt_desc *ptgt = NULL;
2915         struct lmv_tgt_desc *ctgt = NULL;
2916
2917         if (!fid_is_sane(&op_data->op_fid2))
2918                 return -EINVAL;
2919
2920         ptgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
2921         if (IS_ERR(ptgt))
2922                 return PTR_ERR(ptgt);
2923
2924         ctgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
2925         if (IS_ERR(ctgt))
2926                 return PTR_ERR(ctgt);
2927
2928         /*
2929          * if child is on remote MDT, we need 2 async RPCs to fetch both LOOKUP
2930          * lock on parent, and UPDATE lock on child MDT, which makes all
2931          * complicated. Considering remote dir is rare case, and not supporting
2932          * it in statahead won't cause any issue, drop its support for now.
2933          */
2934         if (ptgt != ctgt)
2935                 return -ENOTSUPP;
2936
2937         return md_intent_getattr_async(ptgt->ltd_exp, minfo);
2938 }
2939
2940 static int lmv_revalidate_lock(struct obd_export *exp, struct lookup_intent *it,
2941                                struct lu_fid *fid, __u64 *bits)
2942 {
2943         struct obd_device       *obd = exp->exp_obd;
2944         struct lmv_obd    *lmv = &obd->u.lmv;
2945         struct lmv_tgt_desc     *tgt;
2946
2947         tgt = lmv_find_target(lmv, fid);
2948         if (IS_ERR(tgt))
2949                 return PTR_ERR(tgt);
2950
2951         return md_revalidate_lock(tgt->ltd_exp, it, fid, bits);
2952 }
2953
2954 static int
2955 lmv_get_fid_from_lsm(struct obd_export *exp,
2956                      const struct lmv_stripe_md *lsm,
2957                      const char *name, int namelen, struct lu_fid *fid)
2958 {
2959         const struct lmv_oinfo *oinfo;
2960
2961         LASSERT(lsm);
2962         oinfo = lsm_name_to_stripe_info(lsm, name, namelen);
2963         if (IS_ERR(oinfo))
2964                 return PTR_ERR(oinfo);
2965
2966         *fid = oinfo->lmo_fid;
2967
2968         return 0;
2969 }
2970
2971 /**
2972  * For lmv, only need to send request to master MDT, and the master MDT will
2973  * process with other slave MDTs. The only exception is Q_GETOQUOTA for which
2974  * we directly fetch data from the slave MDTs.
2975  */
2976 static int lmv_quotactl(struct obd_device *unused, struct obd_export *exp,
2977                         struct obd_quotactl *oqctl)
2978 {
2979         struct obd_device   *obd = class_exp2obd(exp);
2980         struct lmv_obd      *lmv = &obd->u.lmv;
2981         struct lmv_tgt_desc *tgt = lmv->tgts[0];
2982         int rc = 0;
2983         __u64 curspace = 0, curinodes = 0;
2984         u32 i;
2985
2986         if (!tgt || !tgt->ltd_exp || !tgt->ltd_active ||
2987             !lmv->desc.ld_tgt_count) {
2988                 CERROR("master lmv inactive\n");
2989                 return -EIO;
2990         }
2991
2992         if (oqctl->qc_cmd != Q_GETOQUOTA)
2993                 return obd_quotactl(tgt->ltd_exp, oqctl);
2994
2995         for (i = 0; i < lmv->desc.ld_tgt_count; i++) {
2996                 int err;
2997
2998                 tgt = lmv->tgts[i];
2999
3000                 if (!tgt || !tgt->ltd_exp || !tgt->ltd_active)
3001                         continue;
3002
3003                 err = obd_quotactl(tgt->ltd_exp, oqctl);
3004                 if (err) {
3005                         CERROR("getquota on mdt %d failed. %d\n", i, err);
3006                         if (!rc)
3007                                 rc = err;
3008                 } else {
3009                         curspace += oqctl->qc_dqblk.dqb_curspace;
3010                         curinodes += oqctl->qc_dqblk.dqb_curinodes;
3011                 }
3012         }
3013         oqctl->qc_dqblk.dqb_curspace = curspace;
3014         oqctl->qc_dqblk.dqb_curinodes = curinodes;
3015
3016         return rc;
3017 }
3018
3019 static int lmv_merge_attr(struct obd_export *exp,
3020                           const struct lmv_stripe_md *lsm,
3021                           struct cl_attr *attr,
3022                           ldlm_blocking_callback cb_blocking)
3023 {
3024         int rc, i;
3025
3026         rc = lmv_revalidate_slaves(exp, lsm, cb_blocking, 0);
3027         if (rc < 0)
3028                 return rc;
3029
3030         for (i = 0; i < lsm->lsm_md_stripe_count; i++) {
3031                 struct inode *inode = lsm->lsm_md_oinfo[i].lmo_root;
3032
3033                 CDEBUG(D_INFO, "" DFID " size %llu, blocks %llu nlink %u, atime %lu ctime %lu, mtime %lu.\n",
3034                        PFID(&lsm->lsm_md_oinfo[i].lmo_fid),
3035                        i_size_read(inode), (unsigned long long)inode->i_blocks,
3036                        inode->i_nlink, LTIME_S(inode->i_atime),
3037                        LTIME_S(inode->i_ctime), LTIME_S(inode->i_mtime));
3038
3039                 /* for slave stripe, it needs to subtract nlink for . and .. */
3040                 if (i)
3041                         attr->cat_nlink += inode->i_nlink - 2;
3042                 else
3043                         attr->cat_nlink = inode->i_nlink;
3044
3045                 attr->cat_size += i_size_read(inode);
3046                 attr->cat_blocks += inode->i_blocks;
3047
3048                 if (attr->cat_atime < LTIME_S(inode->i_atime))
3049                         attr->cat_atime = LTIME_S(inode->i_atime);
3050
3051                 if (attr->cat_ctime < LTIME_S(inode->i_ctime))
3052                         attr->cat_ctime = LTIME_S(inode->i_ctime);
3053
3054                 if (attr->cat_mtime < LTIME_S(inode->i_mtime))
3055                         attr->cat_mtime = LTIME_S(inode->i_mtime);
3056         }
3057         return 0;
3058 }
3059
3060 static struct obd_ops lmv_obd_ops = {
3061         .owner          = THIS_MODULE,
3062         .setup          = lmv_setup,
3063         .cleanup        = lmv_cleanup,
3064         .precleanup     = lmv_precleanup,
3065         .process_config = lmv_process_config,
3066         .connect        = lmv_connect,
3067         .disconnect     = lmv_disconnect,
3068         .statfs         = lmv_statfs,
3069         .get_info       = lmv_get_info,
3070         .set_info_async = lmv_set_info_async,
3071         .notify         = lmv_notify,
3072         .get_uuid       = lmv_get_uuid,
3073         .iocontrol      = lmv_iocontrol,
3074         .quotactl       = lmv_quotactl
3075 };
3076
3077 static struct md_ops lmv_md_ops = {
3078         .getstatus              = lmv_getstatus,
3079         .null_inode             = lmv_null_inode,
3080         .close                  = lmv_close,
3081         .create                 = lmv_create,
3082         .enqueue                = lmv_enqueue,
3083         .getattr                = lmv_getattr,
3084         .getxattr               = lmv_getxattr,
3085         .getattr_name           = lmv_getattr_name,
3086         .intent_lock            = lmv_intent_lock,
3087         .link                   = lmv_link,
3088         .rename                 = lmv_rename,
3089         .setattr                = lmv_setattr,
3090         .setxattr               = lmv_setxattr,
3091         .sync                   = lmv_sync,
3092         .read_page              = lmv_read_page,
3093         .unlink                 = lmv_unlink,
3094         .init_ea_size           = lmv_init_ea_size,
3095         .cancel_unused          = lmv_cancel_unused,
3096         .set_lock_data          = lmv_set_lock_data,
3097         .lock_match             = lmv_lock_match,
3098         .get_lustre_md          = lmv_get_lustre_md,
3099         .free_lustre_md         = lmv_free_lustre_md,
3100         .merge_attr             = lmv_merge_attr,
3101         .set_open_replay_data   = lmv_set_open_replay_data,
3102         .clear_open_replay_data = lmv_clear_open_replay_data,
3103         .intent_getattr_async   = lmv_intent_getattr_async,
3104         .revalidate_lock        = lmv_revalidate_lock,
3105         .get_fid_from_lsm       = lmv_get_fid_from_lsm,
3106         .unpackmd               = lmv_unpackmd,
3107 };
3108
3109 static int __init lmv_init(void)
3110 {
3111         struct lprocfs_static_vars lvars;
3112
3113         lprocfs_lmv_init_vars(&lvars);
3114
3115         return class_register_type(&lmv_obd_ops, &lmv_md_ops,
3116                                  LUSTRE_LMV_NAME, NULL);
3117 }
3118
3119 static void lmv_exit(void)
3120 {
3121         class_unregister_type(LUSTRE_LMV_NAME);
3122 }
3123
3124 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
3125 MODULE_DESCRIPTION("Lustre Logical Metadata Volume");
3126 MODULE_VERSION(LUSTRE_VERSION_STRING);
3127 MODULE_LICENSE("GPL");
3128
3129 module_init(lmv_init);
3130 module_exit(lmv_exit);