GNU Linux-libre 4.9.337-gnu1
[releases.git] / drivers / staging / lustre / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include "../include/obd_class.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/lustre_kernelcomm.h"
42
43 spinlock_t obd_types_lock; /* held in this file around every walk/update of the obd_types list */
44
45 static struct kmem_cache *obd_device_cachep; /* slab cache for struct obd_device */
46 struct kmem_cache *obdo_cachep; /* slab cache for struct obdo */
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep; /* slab cache sized for struct obd_import */
49
50 static struct list_head      obd_zombie_imports;
51 static struct list_head      obd_zombie_exports;
52 static spinlock_t  obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp); /* called on the final class_export_put() */
55 static void obd_zombie_import_add(struct obd_import *imp); /* called on the final class_import_put() */
56
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c); /* called when exports/imports are destroyed; not assigned in this part of the file */
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
59
60 /*
61  * support functions: we could use inter-module communication, but this
62  * is more portable to other OS's
63  */
64 static struct obd_device *obd_device_alloc(void)
65 {
66         struct obd_device *obd;
67
68         obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
69         if (obd)
70                 obd->obd_magic = OBD_DEVICE_MAGIC;
71         return obd;
72 }
73
74 static void obd_device_free(struct obd_device *obd)
75 {
76         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
78         if (obd->obd_namespace) {
79                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80                        obd, obd->obd_namespace, obd->obd_force);
81                 LBUG();
82         }
83         lu_ref_fini(&obd->obd_reference);
84         kmem_cache_free(obd_device_cachep, obd);
85 }
86
87 static struct obd_type *class_search_type(const char *name)
88 {
89         struct list_head *tmp;
90         struct obd_type *type;
91
92         spin_lock(&obd_types_lock);
93         list_for_each(tmp, &obd_types) {
94                 type = list_entry(tmp, struct obd_type, typ_chain);
95                 if (strcmp(type->typ_name, name) == 0) {
96                         spin_unlock(&obd_types_lock);
97                         return type;
98                 }
99         }
100         spin_unlock(&obd_types_lock);
101         return NULL;
102 }
103
104 static struct obd_type *class_get_type(const char *name)
105 {
106         struct obd_type *type = class_search_type(name);
107
108         if (!type) {
109                 const char *modname = name;
110
111                 if (!request_module("%s", modname)) {
112                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113                         type = class_search_type(name);
114                 } else {
115                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
116                                            modname);
117                 }
118         }
119         if (type) {
120                 spin_lock(&type->obd_type_lock);
121                 type->typ_refcnt++;
122                 try_module_get(type->typ_dt_ops->owner);
123                 spin_unlock(&type->obd_type_lock);
124         }
125         return type;
126 }
127
128 void class_put_type(struct obd_type *type)
129 {
130         LASSERT(type);
131         spin_lock(&type->obd_type_lock);
132         type->typ_refcnt--;
133         module_put(type->typ_dt_ops->owner);
134         spin_unlock(&type->obd_type_lock);
135 }
136
137 #define CLASS_MAX_NAME 1024
138
139 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
140                         const char *name,
141                         struct lu_device_type *ldt)
142 {
143         struct obd_type *type;
144         int rc;
145
146         /* sanity check */
147         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
148
149         if (class_search_type(name)) {
150                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
151                 return -EEXIST;
152         }
153
154         rc = -ENOMEM;
155         type = kzalloc(sizeof(*type), GFP_NOFS);
156         if (!type)
157                 return rc;
158
159         type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
160         type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
161         type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
162
163         if (!type->typ_dt_ops ||
164             !type->typ_md_ops ||
165             !type->typ_name)
166                 goto failed;
167
168         *type->typ_dt_ops = *dt_ops;
169         /* md_ops is optional */
170         if (md_ops)
171                 *type->typ_md_ops = *md_ops;
172         strcpy(type->typ_name, name);
173         spin_lock_init(&type->obd_type_lock);
174
175         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
176                                                     debugfs_lustre_root,
177                                                     NULL, type);
178         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
179                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
180                                              : -ENOMEM;
181                 type->typ_debugfs_entry = NULL;
182                 goto failed;
183         }
184
185         type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
186         if (!type->typ_kobj) {
187                 rc = -ENOMEM;
188                 goto failed;
189         }
190
191         if (ldt) {
192                 type->typ_lu = ldt;
193                 rc = lu_device_type_init(ldt);
194                 if (rc != 0)
195                         goto failed;
196         }
197
198         spin_lock(&obd_types_lock);
199         list_add(&type->typ_chain, &obd_types);
200         spin_unlock(&obd_types_lock);
201
202         return 0;
203
204  failed:
205         if (type->typ_kobj)
206                 kobject_put(type->typ_kobj);
207         kfree(type->typ_name);
208         kfree(type->typ_md_ops);
209         kfree(type->typ_dt_ops);
210         kfree(type);
211         return rc;
212 }
213 EXPORT_SYMBOL(class_register_type);
214
215 int class_unregister_type(const char *name)
216 {
217         struct obd_type *type = class_search_type(name);
218
219         if (!type) {
220                 CERROR("unknown obd type\n");
221                 return -EINVAL;
222         }
223
224         if (type->typ_refcnt) {
225                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
226                 /* This is a bad situation, let's make the best of it */
227                 /* Remove ops, but leave the name for debugging */
228                 kfree(type->typ_dt_ops);
229                 kfree(type->typ_md_ops);
230                 return -EBUSY;
231         }
232
233         if (type->typ_kobj)
234                 kobject_put(type->typ_kobj);
235
236         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
237                 ldebugfs_remove(&type->typ_debugfs_entry);
238
239         if (type->typ_lu)
240                 lu_device_type_fini(type->typ_lu);
241
242         spin_lock(&obd_types_lock);
243         list_del(&type->typ_chain);
244         spin_unlock(&obd_types_lock);
245         kfree(type->typ_name);
246         kfree(type->typ_dt_ops);
247         kfree(type->typ_md_ops);
248         kfree(type);
249         return 0;
250 } /* class_unregister_type */
251 EXPORT_SYMBOL(class_unregister_type);
252
253 /**
254  * Create a new obd device.
255  *
256  * Find an empty slot in ::obd_devs[], create a new obd device in it.
257  *
258  * \param[in] type_name obd device type string.
259  * \param[in] name      obd device name.
260  *
261  * \retval NULL if create fails, otherwise return the obd device
262  *       pointer created.
263  */
264 struct obd_device *class_newdev(const char *type_name, const char *name)
265 {
266         struct obd_device *result = NULL;
267         struct obd_device *newdev;
268         struct obd_type *type = NULL;
269         int i;
270         int new_obd_minor = 0;
271
272         if (strlen(name) >= MAX_OBD_NAME) {
273                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
274                 return ERR_PTR(-EINVAL);
275         }
276
277         type = class_get_type(type_name);
278         if (!type) {
279                 CERROR("OBD: unknown type: %s\n", type_name);
280                 return ERR_PTR(-ENODEV);
281         }
282
283         newdev = obd_device_alloc();
284         if (!newdev) {
285                 result = ERR_PTR(-ENOMEM);
286                 goto out_type;
287         }
288
289         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
290
291         write_lock(&obd_dev_lock);
292         for (i = 0; i < class_devno_max(); i++) {
293                 struct obd_device *obd = class_num2obd(i);
294
295                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
296                         CERROR("Device %s already exists at %d, won't add\n",
297                                name, i);
298                         if (result) {
299                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
300                                          "%p obd_magic %08x != %08x\n", result,
301                                          result->obd_magic, OBD_DEVICE_MAGIC);
302                                 LASSERTF(result->obd_minor == new_obd_minor,
303                                          "%p obd_minor %d != %d\n", result,
304                                          result->obd_minor, new_obd_minor);
305
306                                 obd_devs[result->obd_minor] = NULL;
307                                 result->obd_name[0] = '\0';
308                          }
309                         result = ERR_PTR(-EEXIST);
310                         break;
311                 }
312                 if (!result && !obd) {
313                         result = newdev;
314                         result->obd_minor = i;
315                         new_obd_minor = i;
316                         result->obd_type = type;
317                         strncpy(result->obd_name, name,
318                                 sizeof(result->obd_name) - 1);
319                         obd_devs[i] = result;
320                 }
321         }
322         write_unlock(&obd_dev_lock);
323
324         if (!result && i >= class_devno_max()) {
325                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
326                        class_devno_max());
327                 result = ERR_PTR(-EOVERFLOW);
328                 goto out;
329         }
330
331         if (IS_ERR(result))
332                 goto out;
333
334         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
335                result->obd_name, result);
336
337         return result;
338 out:
339         obd_device_free(newdev);
340 out_type:
341         class_put_type(type);
342         return result;
343 }
344
345 void class_release_dev(struct obd_device *obd)
346 {
347         struct obd_type *obd_type = obd->obd_type;
348
349         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
353         LASSERT(obd_type);
354
355         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
356                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
357
358         write_lock(&obd_dev_lock);
359         obd_devs[obd->obd_minor] = NULL;
360         write_unlock(&obd_dev_lock);
361         obd_device_free(obd);
362
363         class_put_type(obd_type);
364 }
365
366 int class_name2dev(const char *name)
367 {
368         int i;
369
370         if (!name)
371                 return -1;
372
373         read_lock(&obd_dev_lock);
374         for (i = 0; i < class_devno_max(); i++) {
375                 struct obd_device *obd = class_num2obd(i);
376
377                 if (obd && strcmp(name, obd->obd_name) == 0) {
378                         /* Make sure we finished attaching before we give
379                          * out any references
380                          */
381                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382                         if (obd->obd_attached) {
383                                 read_unlock(&obd_dev_lock);
384                                 return i;
385                         }
386                         break;
387                 }
388         }
389         read_unlock(&obd_dev_lock);
390
391         return -1;
392 }
393
394 struct obd_device *class_name2obd(const char *name)
395 {
396         int dev = class_name2dev(name);
397
398         if (dev < 0 || dev > class_devno_max())
399                 return NULL;
400         return class_num2obd(dev);
401 }
402 EXPORT_SYMBOL(class_name2obd);
403
404 int class_uuid2dev(struct obd_uuid *uuid)
405 {
406         int i;
407
408         read_lock(&obd_dev_lock);
409         for (i = 0; i < class_devno_max(); i++) {
410                 struct obd_device *obd = class_num2obd(i);
411
412                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
413                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414                         read_unlock(&obd_dev_lock);
415                         return i;
416                 }
417         }
418         read_unlock(&obd_dev_lock);
419
420         return -1;
421 }
422
423 /**
424  * Get obd device from ::obd_devs[]
425  *
426  * \param num [in] array index
427  *
428  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
429  *       otherwise return the obd device there.
430  */
431 struct obd_device *class_num2obd(int num)
432 {
433         struct obd_device *obd = NULL;
434
435         if (num < class_devno_max()) {
436                 obd = obd_devs[num];
437                 if (!obd)
438                         return NULL;
439
440                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
441                          "%p obd_magic %08x != %08x\n",
442                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
443                 LASSERTF(obd->obd_minor == num,
444                          "%p obd_minor %0d != %0d\n",
445                          obd, obd->obd_minor, num);
446         }
447
448         return obd;
449 }
450
451 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
452  * specified, then only the client with that uuid is returned,
453  * otherwise any client connected to the tgt is returned.
454  */
455 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
456                                          const char *typ_name,
457                                          struct obd_uuid *grp_uuid)
458 {
459         int i;
460
461         read_lock(&obd_dev_lock);
462         for (i = 0; i < class_devno_max(); i++) {
463                 struct obd_device *obd = class_num2obd(i);
464
465                 if (!obd)
466                         continue;
467                 if ((strncmp(obd->obd_type->typ_name, typ_name,
468                              strlen(typ_name)) == 0)) {
469                         if (obd_uuid_equals(tgt_uuid,
470                                             &obd->u.cli.cl_target_uuid) &&
471                             ((grp_uuid) ? obd_uuid_equals(grp_uuid,
472                                                          &obd->obd_uuid) : 1)) {
473                                 read_unlock(&obd_dev_lock);
474                                 return obd;
475                         }
476                 }
477         }
478         read_unlock(&obd_dev_lock);
479
480         return NULL;
481 }
482 EXPORT_SYMBOL(class_find_client_obd);
483
484 /* Iterate the obd_device list looking devices have grp_uuid. Start
485  * searching at *next, and if a device is found, the next index to look
486  * at is saved in *next. If next is NULL, then the first matching device
487  * will always be returned.
488  */
489 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
490 {
491         int i;
492
493         if (!next)
494                 i = 0;
495         else if (*next >= 0 && *next < class_devno_max())
496                 i = *next;
497         else
498                 return NULL;
499
500         read_lock(&obd_dev_lock);
501         for (; i < class_devno_max(); i++) {
502                 struct obd_device *obd = class_num2obd(i);
503
504                 if (!obd)
505                         continue;
506                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
507                         if (next)
508                                 *next = i + 1;
509                         read_unlock(&obd_dev_lock);
510                         return obd;
511                 }
512         }
513         read_unlock(&obd_dev_lock);
514
515         return NULL;
516 }
517 EXPORT_SYMBOL(class_devices_in_group);
518
519 /**
520  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
521  * adjust sptlrpc settings accordingly.
522  */
523 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
524 {
525         struct obd_device  *obd;
526         const char       *type;
527         int              i, rc = 0, rc2;
528
529         LASSERT(namelen > 0);
530
531         read_lock(&obd_dev_lock);
532         for (i = 0; i < class_devno_max(); i++) {
533                 obd = class_num2obd(i);
534
535                 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
536                         continue;
537
538                 /* only notify mdc, osc, mdt, ost */
539                 type = obd->obd_type->typ_name;
540                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
541                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
542                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
543                     strcmp(type, LUSTRE_OST_NAME) != 0)
544                         continue;
545
546                 if (strncmp(obd->obd_name, fsname, namelen))
547                         continue;
548
549                 class_incref(obd, __func__, obd);
550                 read_unlock(&obd_dev_lock);
551                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
552                                          sizeof(KEY_SPTLRPC_CONF),
553                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
554                 rc = rc ? rc : rc2;
555                 class_decref(obd, __func__, obd);
556                 read_lock(&obd_dev_lock);
557         }
558         read_unlock(&obd_dev_lock);
559         return rc;
560 }
561 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
562
563 void obd_cleanup_caches(void)
564 {
565         kmem_cache_destroy(obd_device_cachep);
566         obd_device_cachep = NULL;
567         kmem_cache_destroy(obdo_cachep);
568         obdo_cachep = NULL;
569         kmem_cache_destroy(import_cachep);
570         import_cachep = NULL;
571 }
572
573 int obd_init_caches(void)
574 {
575         LASSERT(!obd_device_cachep);
576         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
577                                               sizeof(struct obd_device),
578                                               0, 0, NULL);
579         if (!obd_device_cachep)
580                 goto out;
581
582         LASSERT(!obdo_cachep);
583         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
584                                         0, 0, NULL);
585         if (!obdo_cachep)
586                 goto out;
587
588         LASSERT(!import_cachep);
589         import_cachep = kmem_cache_create("ll_import_cache",
590                                           sizeof(struct obd_import),
591                                           0, 0, NULL);
592         if (!import_cachep)
593                 goto out;
594
595         return 0;
596  out:
597         obd_cleanup_caches();
598         return -ENOMEM;
599 }
600
601 /* map connection to client */
602 struct obd_export *class_conn2export(struct lustre_handle *conn)
603 {
604         struct obd_export *export;
605
606         if (!conn) {
607                 CDEBUG(D_CACHE, "looking for null handle\n");
608                 return NULL;
609         }
610
611         if (conn->cookie == -1) {  /* this means assign a new connection */
612                 CDEBUG(D_CACHE, "want a new connection\n");
613                 return NULL;
614         }
615
616         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
617         export = class_handle2object(conn->cookie, NULL);
618         return export;
619 }
620 EXPORT_SYMBOL(class_conn2export);
621
622 struct obd_device *class_exp2obd(struct obd_export *exp)
623 {
624         if (exp)
625                 return exp->exp_obd;
626         return NULL;
627 }
628 EXPORT_SYMBOL(class_exp2obd);
629
630 struct obd_import *class_exp2cliimp(struct obd_export *exp)
631 {
632         struct obd_device *obd = exp->exp_obd;
633
634         if (!obd)
635                 return NULL;
636         return obd->u.cli.cl_import;
637 }
638 EXPORT_SYMBOL(class_exp2cliimp);
639
640 /* Export management functions */
641 static void class_export_destroy(struct obd_export *exp)
642 {
643         struct obd_device *obd = exp->exp_obd;
644
645         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
646         LASSERT(obd);
647
648         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
649                exp->exp_client_uuid.uuid, obd->obd_name);
650
651         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
652         if (exp->exp_connection)
653                 ptlrpc_put_connection_superhack(exp->exp_connection);
654
655         LASSERT(list_empty(&exp->exp_outstanding_replies));
656         LASSERT(list_empty(&exp->exp_uncommitted_replies));
657         LASSERT(list_empty(&exp->exp_req_replay_queue));
658         LASSERT(list_empty(&exp->exp_hp_rpcs));
659         obd_destroy_export(exp);
660         class_decref(obd, "export", exp);
661
662         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
663 }
664
665 static void export_handle_addref(void *export)
666 {
667         class_export_get(export);
668 }
669
670 static struct portals_handle_ops export_handle_ops = {
671         .hop_addref = export_handle_addref,
672         .hop_free   = NULL,
673 };
674
675 struct obd_export *class_export_get(struct obd_export *exp)
676 {
677         atomic_inc(&exp->exp_refcount);
678         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
679                atomic_read(&exp->exp_refcount));
680         return exp;
681 }
682 EXPORT_SYMBOL(class_export_get);
683
684 void class_export_put(struct obd_export *exp)
685 {
686         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
687         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
688                atomic_read(&exp->exp_refcount) - 1);
689
690         if (atomic_dec_and_test(&exp->exp_refcount)) {
691                 LASSERT(!list_empty(&exp->exp_obd_chain));
692                 CDEBUG(D_IOCTL, "final put %p/%s\n",
693                        exp, exp->exp_client_uuid.uuid);
694
695                 /* release nid stat refererence */
696                 lprocfs_exp_cleanup(exp);
697
698                 obd_zombie_export_add(exp);
699         }
700 }
701 EXPORT_SYMBOL(class_export_put);
702
703 /* Creates a new export, adds it to the hash table, and returns a
704  * pointer to it. The refcount is 2: one for the hash reference, and
705  * one for the pointer returned by this function.
706  */
707 struct obd_export *class_new_export(struct obd_device *obd,
708                                     struct obd_uuid *cluuid)
709 {
710         struct obd_export *export;
711         struct cfs_hash *hash = NULL;
712         int rc = 0;
713
714         export = kzalloc(sizeof(*export), GFP_NOFS);
715         if (!export)
716                 return ERR_PTR(-ENOMEM);
717
718         export->exp_conn_cnt = 0;
719         export->exp_lock_hash = NULL;
720         export->exp_flock_hash = NULL;
721         atomic_set(&export->exp_refcount, 2);
722         atomic_set(&export->exp_rpc_count, 0);
723         atomic_set(&export->exp_cb_count, 0);
724         atomic_set(&export->exp_locks_count, 0);
725 #if LUSTRE_TRACKS_LOCK_EXP_REFS
726         INIT_LIST_HEAD(&export->exp_locks_list);
727         spin_lock_init(&export->exp_locks_list_guard);
728 #endif
729         atomic_set(&export->exp_replay_count, 0);
730         export->exp_obd = obd;
731         INIT_LIST_HEAD(&export->exp_outstanding_replies);
732         spin_lock_init(&export->exp_uncommitted_replies_lock);
733         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
734         INIT_LIST_HEAD(&export->exp_req_replay_queue);
735         INIT_LIST_HEAD(&export->exp_handle.h_link);
736         INIT_LIST_HEAD(&export->exp_hp_rpcs);
737         class_handle_hash(&export->exp_handle, &export_handle_ops);
738         spin_lock_init(&export->exp_lock);
739         spin_lock_init(&export->exp_rpc_lock);
740         INIT_HLIST_NODE(&export->exp_uuid_hash);
741         spin_lock_init(&export->exp_bl_list_lock);
742         INIT_LIST_HEAD(&export->exp_bl_list);
743
744         export->exp_sp_peer = LUSTRE_SP_ANY;
745         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
746         export->exp_client_uuid = *cluuid;
747         obd_init_export(export);
748
749         spin_lock(&obd->obd_dev_lock);
750         /* shouldn't happen, but might race */
751         if (obd->obd_stopping) {
752                 rc = -ENODEV;
753                 goto exit_unlock;
754         }
755
756         hash = cfs_hash_getref(obd->obd_uuid_hash);
757         if (!hash) {
758                 rc = -ENODEV;
759                 goto exit_unlock;
760         }
761         spin_unlock(&obd->obd_dev_lock);
762
763         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
764                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
765                 if (rc != 0) {
766                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767                                       obd->obd_name, cluuid->uuid, rc);
768                         rc = -EALREADY;
769                         goto exit_err;
770                 }
771         }
772
773         spin_lock(&obd->obd_dev_lock);
774         if (obd->obd_stopping) {
775                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
776                 rc = -ENODEV;
777                 goto exit_unlock;
778         }
779
780         class_incref(obd, "export", export);
781         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
782         export->exp_obd->obd_num_exports++;
783         spin_unlock(&obd->obd_dev_lock);
784         cfs_hash_putref(hash);
785         return export;
786
787 exit_unlock:
788         spin_unlock(&obd->obd_dev_lock);
789 exit_err:
790         if (hash)
791                 cfs_hash_putref(hash);
792         class_handle_unhash(&export->exp_handle);
793         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
794         obd_destroy_export(export);
795         kfree(export);
796         return ERR_PTR(rc);
797 }
798 EXPORT_SYMBOL(class_new_export);
799
800 void class_unlink_export(struct obd_export *exp)
801 {
802         class_handle_unhash(&exp->exp_handle);
803
804         spin_lock(&exp->exp_obd->obd_dev_lock);
805         /* delete an uuid-export hashitem from hashtables */
806         if (!hlist_unhashed(&exp->exp_uuid_hash))
807                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
808                              &exp->exp_client_uuid,
809                              &exp->exp_uuid_hash);
810
811         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
812         exp->exp_obd->obd_num_exports--;
813         spin_unlock(&exp->exp_obd->obd_dev_lock);
814         class_export_put(exp);
815 }
816
817 /* Import management functions */
818 static void class_import_destroy(struct obd_import *imp)
819 {
820         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
821                imp->imp_obd->obd_name);
822
823         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
824
825         ptlrpc_put_connection_superhack(imp->imp_connection);
826
827         while (!list_empty(&imp->imp_conn_list)) {
828                 struct obd_import_conn *imp_conn;
829
830                 imp_conn = list_entry(imp->imp_conn_list.next,
831                                       struct obd_import_conn, oic_item);
832                 list_del_init(&imp_conn->oic_item);
833                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
834                 kfree(imp_conn);
835         }
836
837         LASSERT(!imp->imp_sec);
838         class_decref(imp->imp_obd, "import", imp);
839         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
840 }
841
842 static void import_handle_addref(void *import)
843 {
844         class_import_get(import);
845 }
846
847 static struct portals_handle_ops import_handle_ops = {
848         .hop_addref = import_handle_addref,
849         .hop_free   = NULL,
850 };
851
852 struct obd_import *class_import_get(struct obd_import *import)
853 {
854         atomic_inc(&import->imp_refcount);
855         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
856                atomic_read(&import->imp_refcount),
857                import->imp_obd->obd_name);
858         return import;
859 }
860 EXPORT_SYMBOL(class_import_get);
861
862 void class_import_put(struct obd_import *imp)
863 {
864         LASSERT(list_empty(&imp->imp_zombie_chain));
865         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
866
867         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
868                atomic_read(&imp->imp_refcount) - 1,
869                imp->imp_obd->obd_name);
870
871         if (atomic_dec_and_test(&imp->imp_refcount)) {
872                 CDEBUG(D_INFO, "final put import %p\n", imp);
873                 obd_zombie_import_add(imp);
874         }
875
876         /* catch possible import put race */
877         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
878 }
879 EXPORT_SYMBOL(class_import_put);
880
881 static void init_imp_at(struct imp_at *at)
882 {
883         int i;
884
885         at_init(&at->iat_net_latency, 0, 0);
886         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
887                 /* max service estimates are tracked on the server side, so
888                  * don't use the AT history here, just use the last reported
889                  * val. (But keep hist for proc histogram, worst_ever)
890                  */
891                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
892                         AT_FLG_NOHIST);
893         }
894 }
895
896 struct obd_import *class_new_import(struct obd_device *obd)
897 {
898         struct obd_import *imp;
899
900         imp = kzalloc(sizeof(*imp), GFP_NOFS);
901         if (!imp)
902                 return NULL;
903
904         INIT_LIST_HEAD(&imp->imp_pinger_chain);
905         INIT_LIST_HEAD(&imp->imp_zombie_chain);
906         INIT_LIST_HEAD(&imp->imp_replay_list);
907         INIT_LIST_HEAD(&imp->imp_sending_list);
908         INIT_LIST_HEAD(&imp->imp_delayed_list);
909         INIT_LIST_HEAD(&imp->imp_committed_list);
910         imp->imp_replay_cursor = &imp->imp_committed_list;
911         spin_lock_init(&imp->imp_lock);
912         imp->imp_last_success_conn = 0;
913         imp->imp_state = LUSTRE_IMP_NEW;
914         imp->imp_obd = class_incref(obd, "import", imp);
915         mutex_init(&imp->imp_sec_mutex);
916         init_waitqueue_head(&imp->imp_recovery_waitq);
917
918         atomic_set(&imp->imp_refcount, 2);
919         atomic_set(&imp->imp_unregistering, 0);
920         atomic_set(&imp->imp_inflight, 0);
921         atomic_set(&imp->imp_replay_inflight, 0);
922         atomic_set(&imp->imp_inval_count, 0);
923         INIT_LIST_HEAD(&imp->imp_conn_list);
924         INIT_LIST_HEAD(&imp->imp_handle.h_link);
925         class_handle_hash(&imp->imp_handle, &import_handle_ops);
926         init_imp_at(&imp->imp_at);
927
928         /* the default magic is V2, will be used in connect RPC, and
929          * then adjusted according to the flags in request/reply.
930          */
931         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
932
933         return imp;
934 }
935 EXPORT_SYMBOL(class_new_import);
936
937 void class_destroy_import(struct obd_import *import)
938 {
939         LASSERT(import);
940         LASSERT(import != LP_POISON);
941
942         class_handle_unhash(&import->imp_handle);
943
944         spin_lock(&import->imp_lock);
945         import->imp_generation++;
946         spin_unlock(&import->imp_lock);
947         class_import_put(import);
948 }
949 EXPORT_SYMBOL(class_destroy_import);
950
951 #if LUSTRE_TRACKS_LOCK_EXP_REFS
952
953 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
954 {
955         spin_lock(&exp->exp_locks_list_guard);
956
957         LASSERT(lock->l_exp_refs_nr >= 0);
958
959         if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
960                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
961                               exp, lock, lock->l_exp_refs_target);
962         }
963         if ((lock->l_exp_refs_nr++) == 0) {
964                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
965                 lock->l_exp_refs_target = exp;
966         }
967         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
968                lock, exp, lock->l_exp_refs_nr);
969         spin_unlock(&exp->exp_locks_list_guard);
970 }
971
972 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
973 {
974         spin_lock(&exp->exp_locks_list_guard);
975         LASSERT(lock->l_exp_refs_nr > 0);
976         if (lock->l_exp_refs_target != exp) {
977                 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
978                               lock, lock->l_exp_refs_target, exp);
979         }
980         if (-- lock->l_exp_refs_nr == 0) {
981                 list_del_init(&lock->l_exp_refs_link);
982                 lock->l_exp_refs_target = NULL;
983         }
984         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
985                lock, exp, lock->l_exp_refs_nr);
986         spin_unlock(&exp->exp_locks_list_guard);
987 }
988 #endif
989
990 /* A connection defines an export context in which preallocation can
991  * be managed. This releases the export pointer reference, and returns
992  * the export handle, so the export refcount is 1 when this function
993  * returns.
994  */
995 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
996                   struct obd_uuid *cluuid)
997 {
998         struct obd_export *export;
999
1000         LASSERT(conn);
1001         LASSERT(obd);
1002         LASSERT(cluuid);
1003
1004         export = class_new_export(obd, cluuid);
1005         if (IS_ERR(export))
1006                 return PTR_ERR(export);
1007
1008         conn->cookie = export->exp_handle.h_cookie;
1009         class_export_put(export);
1010
1011         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1012                cluuid->uuid, conn->cookie);
1013         return 0;
1014 }
1015 EXPORT_SYMBOL(class_connect);
1016
1017 /* This function removes 1-3 references from the export:
1018  * 1 - for export pointer passed
1019  * and if disconnect really need
1020  * 2 - removing from hash
1021  * 3 - in client_unlink_export
1022  * The export pointer passed to this function can destroyed
1023  */
1024 int class_disconnect(struct obd_export *export)
1025 {
1026         int already_disconnected;
1027
1028         if (!export) {
1029                 CWARN("attempting to free NULL export %p\n", export);
1030                 return -EINVAL;
1031         }
1032
1033         spin_lock(&export->exp_lock);
1034         already_disconnected = export->exp_disconnected;
1035         export->exp_disconnected = 1;
1036         spin_unlock(&export->exp_lock);
1037
1038         /* class_cleanup(), abort_recovery(), and class_fail_export()
1039          * all end up in here, and if any of them race we shouldn't
1040          * call extra class_export_puts().
1041          */
1042         if (already_disconnected)
1043                 goto no_disconn;
1044
1045         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1046                export->exp_handle.h_cookie);
1047
1048         class_unlink_export(export);
1049 no_disconn:
1050         class_export_put(export);
1051         return 0;
1052 }
1053 EXPORT_SYMBOL(class_disconnect);
1054
1055 void class_fail_export(struct obd_export *exp)
1056 {
1057         int rc, already_failed;
1058
1059         spin_lock(&exp->exp_lock);
1060         already_failed = exp->exp_failed;
1061         exp->exp_failed = 1;
1062         spin_unlock(&exp->exp_lock);
1063
1064         if (already_failed) {
1065                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1066                        exp, exp->exp_client_uuid.uuid);
1067                 return;
1068         }
1069
1070         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1071                exp, exp->exp_client_uuid.uuid);
1072
1073         if (obd_dump_on_timeout)
1074                 libcfs_debug_dumplog();
1075
1076         /* need for safe call CDEBUG after obd_disconnect */
1077         class_export_get(exp);
1078
1079         /* Most callers into obd_disconnect are removing their own reference
1080          * (request, for example) in addition to the one from the hash table.
1081          * We don't have such a reference here, so make one.
1082          */
1083         class_export_get(exp);
1084         rc = obd_disconnect(exp);
1085         if (rc)
1086                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1087         else
1088                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1089                        exp, exp->exp_client_uuid.uuid);
1090         class_export_put(exp);
1091 }
1092 EXPORT_SYMBOL(class_fail_export);
1093
1094 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback taking an export; unset (NULL) by default.
 * NOTE(review): presumably installed elsewhere to dump an export's lock
 * references when LUSTRE_TRACKS_LOCK_EXP_REFS is enabled - confirm.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
1096 #endif
1097
/* Total number of zombie imports and exports queued for destruction.
 * Modified and read under obd_zombie_impexp_lock.
 */
static int zombies_count;
1100
1101 /**
1102  * kill zombie imports and exports
1103  */
1104 static void obd_zombie_impexp_cull(void)
1105 {
1106         struct obd_import *import;
1107         struct obd_export *export;
1108
1109         do {
1110                 spin_lock(&obd_zombie_impexp_lock);
1111
1112                 import = NULL;
1113                 if (!list_empty(&obd_zombie_imports)) {
1114                         import = list_entry(obd_zombie_imports.next,
1115                                             struct obd_import,
1116                                             imp_zombie_chain);
1117                         list_del_init(&import->imp_zombie_chain);
1118                 }
1119
1120                 export = NULL;
1121                 if (!list_empty(&obd_zombie_exports)) {
1122                         export = list_entry(obd_zombie_exports.next,
1123                                             struct obd_export,
1124                                             exp_obd_chain);
1125                         list_del_init(&export->exp_obd_chain);
1126                 }
1127
1128                 spin_unlock(&obd_zombie_impexp_lock);
1129
1130                 if (import) {
1131                         class_import_destroy(import);
1132                         spin_lock(&obd_zombie_impexp_lock);
1133                         zombies_count--;
1134                         spin_unlock(&obd_zombie_impexp_lock);
1135                 }
1136
1137                 if (export) {
1138                         class_export_destroy(export);
1139                         spin_lock(&obd_zombie_impexp_lock);
1140                         zombies_count--;
1141                         spin_unlock(&obd_zombie_impexp_lock);
1142                 }
1143
1144                 cond_resched();
1145         } while (import || export);
1146 }
1147
/* State shared between the zombie thread and its controllers. */

/* completed by the zombie thread once it has started running */
static struct completion        obd_zombie_start;
/* completed by the zombie thread when it leaves its main loop */
static struct completion        obd_zombie_stop;
/* flag bits, accessed with set_bit()/test_bit() */
static unsigned long            obd_zombie_flags;
/* the zombie thread and obd_zombie_barrier() callers both sleep here */
static wait_queue_head_t                obd_zombie_waitq;
/* pid of the zombie thread; 0 until the thread records it */
static pid_t                    obd_zombie_pid;

/* Flags kept in obd_zombie_flags. The value is passed to set_bit() and
 * test_bit() as a bit number, not OR-ed in as a mask.
 */
enum {
        OBD_ZOMBIE_STOP         = 0x0001,
};
1157
1158 /**
1159  * check for work for kill zombie import/export thread.
1160  */
1161 static int obd_zombie_impexp_check(void *arg)
1162 {
1163         int rc;
1164
1165         spin_lock(&obd_zombie_impexp_lock);
1166         rc = (zombies_count == 0) &&
1167              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1168         spin_unlock(&obd_zombie_impexp_lock);
1169
1170         return rc;
1171 }
1172
1173 /**
1174  * Add export to the obd_zombie thread and notify it.
1175  */
1176 static void obd_zombie_export_add(struct obd_export *exp)
1177 {
1178         spin_lock(&exp->exp_obd->obd_dev_lock);
1179         LASSERT(!list_empty(&exp->exp_obd_chain));
1180         list_del_init(&exp->exp_obd_chain);
1181         spin_unlock(&exp->exp_obd->obd_dev_lock);
1182         spin_lock(&obd_zombie_impexp_lock);
1183         zombies_count++;
1184         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1185         spin_unlock(&obd_zombie_impexp_lock);
1186
1187         obd_zombie_impexp_notify();
1188 }
1189
1190 /**
1191  * Add import to the obd_zombie thread and notify it.
1192  */
1193 static void obd_zombie_import_add(struct obd_import *imp)
1194 {
1195         LASSERT(!imp->imp_sec);
1196         spin_lock(&obd_zombie_impexp_lock);
1197         LASSERT(list_empty(&imp->imp_zombie_chain));
1198         zombies_count++;
1199         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1200         spin_unlock(&obd_zombie_impexp_lock);
1201
1202         obd_zombie_impexp_notify();
1203 }
1204
1205 /**
1206  * notify import/export destroy thread about new zombie.
1207  */
1208 static void obd_zombie_impexp_notify(void)
1209 {
1210         /*
1211          * Make sure obd_zombie_impexp_thread get this notification.
1212          * It is possible this signal only get by obd_zombie_barrier, and
1213          * barrier gulps this notification and sleeps away and hangs ensues
1214          */
1215         wake_up_all(&obd_zombie_waitq);
1216 }
1217
1218 /**
1219  * check whether obd_zombie is idle
1220  */
1221 static int obd_zombie_is_idle(void)
1222 {
1223         int rc;
1224
1225         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1226         spin_lock(&obd_zombie_impexp_lock);
1227         rc = (zombies_count == 0);
1228         spin_unlock(&obd_zombie_impexp_lock);
1229         return rc;
1230 }
1231
1232 /**
1233  * wait when obd_zombie import/export queues become empty
1234  */
1235 void obd_zombie_barrier(void)
1236 {
1237         struct l_wait_info lwi = { 0 };
1238
1239         if (obd_zombie_pid == current_pid())
1240                 /* don't wait for myself */
1241                 return;
1242         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1243 }
1244 EXPORT_SYMBOL(obd_zombie_barrier);
1245
1246 /**
1247  * destroy zombie export/import thread.
1248  */
1249 static int obd_zombie_impexp_thread(void *unused)
1250 {
1251         unshare_fs_struct();
1252         complete(&obd_zombie_start);
1253
1254         obd_zombie_pid = current_pid();
1255
1256         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1257                 struct l_wait_info lwi = { 0 };
1258
1259                 l_wait_event(obd_zombie_waitq,
1260                              !obd_zombie_impexp_check(NULL), &lwi);
1261                 obd_zombie_impexp_cull();
1262
1263                 /*
1264                  * Notify obd_zombie_barrier callers that queues
1265                  * may be empty.
1266                  */
1267                 wake_up(&obd_zombie_waitq);
1268         }
1269
1270         complete(&obd_zombie_stop);
1271
1272         return 0;
1273 }
1274
1275 /**
1276  * start destroy zombie import/export thread
1277  */
1278 int obd_zombie_impexp_init(void)
1279 {
1280         struct task_struct *task;
1281
1282         INIT_LIST_HEAD(&obd_zombie_imports);
1283         INIT_LIST_HEAD(&obd_zombie_exports);
1284         spin_lock_init(&obd_zombie_impexp_lock);
1285         init_completion(&obd_zombie_start);
1286         init_completion(&obd_zombie_stop);
1287         init_waitqueue_head(&obd_zombie_waitq);
1288         obd_zombie_pid = 0;
1289
1290         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1291         if (IS_ERR(task))
1292                 return PTR_ERR(task);
1293
1294         wait_for_completion(&obd_zombie_start);
1295         return 0;
1296 }
1297
1298 /**
1299  * stop destroy zombie import/export thread
1300  */
1301 void obd_zombie_impexp_stop(void)
1302 {
1303         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1304         obd_zombie_impexp_notify();
1305         wait_for_completion(&obd_zombie_stop);
1306 }
1307
/* One task waiting for an RPC slot in obd_get_request_slot(). */
struct obd_request_slot_waiter {
        /* link on the client's cl_loi_read_list; empty once a slot is granted */
        struct list_head        orsw_entry;
        /* the waiting task sleeps on this queue */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false by the waiter; when found true the wait ends
         * with -EINTR. NOTE(review): nothing in this part of the file sets
         * it to true - confirm who does.
         */
        bool                    orsw_signaled;
};
1313
1314 static bool obd_request_slot_avail(struct client_obd *cli,
1315                                    struct obd_request_slot_waiter *orsw)
1316 {
1317         bool avail;
1318
1319         spin_lock(&cli->cl_loi_list_lock);
1320         avail = !!list_empty(&orsw->orsw_entry);
1321         spin_unlock(&cli->cl_loi_list_lock);
1322
1323         return avail;
1324 };
1325
1326 /*
1327  * For network flow control, the RPC sponsor needs to acquire a credit
1328  * before sending the RPC. The credits count for a connection is defined
1329  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1330  * the subsequent RPC sponsors need to wait until others released their
1331  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1332  */
1333 int obd_get_request_slot(struct client_obd *cli)
1334 {
1335         struct obd_request_slot_waiter orsw;
1336         struct l_wait_info lwi;
1337         int rc;
1338
1339         spin_lock(&cli->cl_loi_list_lock);
1340         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1341                 cli->cl_r_in_flight++;
1342                 spin_unlock(&cli->cl_loi_list_lock);
1343                 return 0;
1344         }
1345
1346         init_waitqueue_head(&orsw.orsw_waitq);
1347         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1348         orsw.orsw_signaled = false;
1349         spin_unlock(&cli->cl_loi_list_lock);
1350
1351         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1352         rc = l_wait_event(orsw.orsw_waitq,
1353                           obd_request_slot_avail(cli, &orsw) ||
1354                           orsw.orsw_signaled,
1355                           &lwi);
1356
1357         /*
1358          * Here, we must take the lock to avoid the on-stack 'orsw' to be
1359          * freed but other (such as obd_put_request_slot) is using it.
1360          */
1361         spin_lock(&cli->cl_loi_list_lock);
1362         if (rc) {
1363                 if (!orsw.orsw_signaled) {
1364                         if (list_empty(&orsw.orsw_entry))
1365                                 cli->cl_r_in_flight--;
1366                         else
1367                                 list_del(&orsw.orsw_entry);
1368                 }
1369         }
1370
1371         if (orsw.orsw_signaled) {
1372                 LASSERT(list_empty(&orsw.orsw_entry));
1373
1374                 rc = -EINTR;
1375         }
1376         spin_unlock(&cli->cl_loi_list_lock);
1377
1378         return rc;
1379 }
1380 EXPORT_SYMBOL(obd_get_request_slot);
1381
1382 void obd_put_request_slot(struct client_obd *cli)
1383 {
1384         struct obd_request_slot_waiter *orsw;
1385
1386         spin_lock(&cli->cl_loi_list_lock);
1387         cli->cl_r_in_flight--;
1388
1389         /* If there is free slot, wakeup the first waiter. */
1390         if (!list_empty(&cli->cl_loi_read_list) &&
1391             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1392                 orsw = list_entry(cli->cl_loi_read_list.next,
1393                                   struct obd_request_slot_waiter, orsw_entry);
1394                 list_del_init(&orsw->orsw_entry);
1395                 cli->cl_r_in_flight++;
1396                 wake_up(&orsw->orsw_waitq);
1397         }
1398         spin_unlock(&cli->cl_loi_list_lock);
1399 }
1400 EXPORT_SYMBOL(obd_put_request_slot);
1401
1402 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1403 {
1404         return cli->cl_max_rpcs_in_flight;
1405 }
1406 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1407
1408 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1409 {
1410         struct obd_request_slot_waiter *orsw;
1411         __u32 old;
1412         int diff;
1413         int i;
1414
1415         if (max > OBD_MAX_RIF_MAX || max < 1)
1416                 return -ERANGE;
1417
1418         spin_lock(&cli->cl_loi_list_lock);
1419         old = cli->cl_max_rpcs_in_flight;
1420         cli->cl_max_rpcs_in_flight = max;
1421         diff = max - old;
1422
1423         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1424         for (i = 0; i < diff; i++) {
1425                 if (list_empty(&cli->cl_loi_read_list))
1426                         break;
1427
1428                 orsw = list_entry(cli->cl_loi_read_list.next,
1429                                   struct obd_request_slot_waiter, orsw_entry);
1430                 list_del_init(&orsw->orsw_entry);
1431                 cli->cl_r_in_flight++;
1432                 wake_up(&orsw->orsw_waitq);
1433         }
1434         spin_unlock(&cli->cl_loi_list_lock);
1435
1436         return 0;
1437 }
1438 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);