4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions; they provide some generic
35 * infrastructure for managing object devices.
38 #define DEBUG_SUBSYSTEM S_CLASS
39 #include "../include/obd_class.h"
40 #include "../include/lprocfs_status.h"
41 #include "../include/lustre_kernelcomm.h"
43 spinlock_t obd_types_lock;
45 static struct kmem_cache *obd_device_cachep;
46 struct kmem_cache *obdo_cachep;
47 EXPORT_SYMBOL(obdo_cachep);
48 static struct kmem_cache *import_cachep;
50 static struct list_head obd_zombie_imports;
51 static struct list_head obd_zombie_exports;
52 static spinlock_t obd_zombie_impexp_lock;
53 static void obd_zombie_impexp_notify(void);
54 static void obd_zombie_export_add(struct obd_export *exp);
55 static void obd_zombie_import_add(struct obd_import *imp);
57 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
58 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
61 * support functions: we could use inter-module communication, but this
62 * is more portable to other OS's
64 static struct obd_device *obd_device_alloc(void)
66 struct obd_device *obd;
68 obd = kmem_cache_zalloc(obd_device_cachep, GFP_NOFS);
70 obd->obd_magic = OBD_DEVICE_MAGIC;
74 static void obd_device_free(struct obd_device *obd)
76 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
77 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
78 if (obd->obd_namespace) {
79 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
80 obd, obd->obd_namespace, obd->obd_force);
83 lu_ref_fini(&obd->obd_reference);
84 kmem_cache_free(obd_device_cachep, obd);
87 static struct obd_type *class_search_type(const char *name)
89 struct list_head *tmp;
90 struct obd_type *type;
92 spin_lock(&obd_types_lock);
93 list_for_each(tmp, &obd_types) {
94 type = list_entry(tmp, struct obd_type, typ_chain);
95 if (strcmp(type->typ_name, name) == 0) {
96 spin_unlock(&obd_types_lock);
100 spin_unlock(&obd_types_lock);
104 static struct obd_type *class_get_type(const char *name)
106 struct obd_type *type = class_search_type(name);
109 const char *modname = name;
111 if (!request_module("%s", modname)) {
112 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
113 type = class_search_type(name);
115 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
120 spin_lock(&type->obd_type_lock);
122 try_module_get(type->typ_dt_ops->owner);
123 spin_unlock(&type->obd_type_lock);
128 void class_put_type(struct obd_type *type)
131 spin_lock(&type->obd_type_lock);
133 module_put(type->typ_dt_ops->owner);
134 spin_unlock(&type->obd_type_lock);
137 #define CLASS_MAX_NAME 1024
139 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
141 struct lu_device_type *ldt)
143 struct obd_type *type;
147 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
149 if (class_search_type(name)) {
150 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
155 type = kzalloc(sizeof(*type), GFP_NOFS);
159 type->typ_dt_ops = kzalloc(sizeof(*type->typ_dt_ops), GFP_NOFS);
160 type->typ_md_ops = kzalloc(sizeof(*type->typ_md_ops), GFP_NOFS);
161 type->typ_name = kzalloc(strlen(name) + 1, GFP_NOFS);
163 if (!type->typ_dt_ops ||
168 *type->typ_dt_ops = *dt_ops;
169 /* md_ops is optional */
171 *type->typ_md_ops = *md_ops;
172 strcpy(type->typ_name, name);
173 spin_lock_init(&type->obd_type_lock);
175 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
178 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
179 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
181 type->typ_debugfs_entry = NULL;
185 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
186 if (!type->typ_kobj) {
193 rc = lu_device_type_init(ldt);
198 spin_lock(&obd_types_lock);
199 list_add(&type->typ_chain, &obd_types);
200 spin_unlock(&obd_types_lock);
206 kobject_put(type->typ_kobj);
207 kfree(type->typ_name);
208 kfree(type->typ_md_ops);
209 kfree(type->typ_dt_ops);
213 EXPORT_SYMBOL(class_register_type);
215 int class_unregister_type(const char *name)
217 struct obd_type *type = class_search_type(name);
220 CERROR("unknown obd type\n");
224 if (type->typ_refcnt) {
225 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
226 /* This is a bad situation, let's make the best of it */
227 /* Remove ops, but leave the name for debugging */
228 kfree(type->typ_dt_ops);
229 kfree(type->typ_md_ops);
234 kobject_put(type->typ_kobj);
236 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
237 ldebugfs_remove(&type->typ_debugfs_entry);
240 lu_device_type_fini(type->typ_lu);
242 spin_lock(&obd_types_lock);
243 list_del(&type->typ_chain);
244 spin_unlock(&obd_types_lock);
245 kfree(type->typ_name);
246 kfree(type->typ_dt_ops);
247 kfree(type->typ_md_ops);
250 } /* class_unregister_type */
251 EXPORT_SYMBOL(class_unregister_type);
254 * Create a new obd device.
256 * Find an empty slot in ::obd_devs[], create a new obd device in it.
258 * \param[in] type_name obd device type string.
259 * \param[in] name obd device name.
261 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
264 struct obd_device *class_newdev(const char *type_name, const char *name)
266 struct obd_device *result = NULL;
267 struct obd_device *newdev;
268 struct obd_type *type = NULL;
270 int new_obd_minor = 0;
272 if (strlen(name) >= MAX_OBD_NAME) {
273 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
274 return ERR_PTR(-EINVAL);
277 type = class_get_type(type_name);
279 CERROR("OBD: unknown type: %s\n", type_name);
280 return ERR_PTR(-ENODEV);
283 newdev = obd_device_alloc();
285 result = ERR_PTR(-ENOMEM);
289 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
291 write_lock(&obd_dev_lock);
292 for (i = 0; i < class_devno_max(); i++) {
293 struct obd_device *obd = class_num2obd(i);
295 if (obd && (strcmp(name, obd->obd_name) == 0)) {
296 CERROR("Device %s already exists at %d, won't add\n",
299 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
300 "%p obd_magic %08x != %08x\n", result,
301 result->obd_magic, OBD_DEVICE_MAGIC);
302 LASSERTF(result->obd_minor == new_obd_minor,
303 "%p obd_minor %d != %d\n", result,
304 result->obd_minor, new_obd_minor);
306 obd_devs[result->obd_minor] = NULL;
307 result->obd_name[0] = '\0';
309 result = ERR_PTR(-EEXIST);
312 if (!result && !obd) {
314 result->obd_minor = i;
316 result->obd_type = type;
317 strncpy(result->obd_name, name,
318 sizeof(result->obd_name) - 1);
319 obd_devs[i] = result;
322 write_unlock(&obd_dev_lock);
324 if (!result && i >= class_devno_max()) {
325 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
327 result = ERR_PTR(-EOVERFLOW);
334 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
335 result->obd_name, result);
339 obd_device_free(newdev);
341 class_put_type(type);
345 void class_release_dev(struct obd_device *obd)
347 struct obd_type *obd_type = obd->obd_type;
349 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
350 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
351 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
352 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
356 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
358 write_lock(&obd_dev_lock);
359 obd_devs[obd->obd_minor] = NULL;
360 write_unlock(&obd_dev_lock);
361 obd_device_free(obd);
363 class_put_type(obd_type);
366 int class_name2dev(const char *name)
373 read_lock(&obd_dev_lock);
374 for (i = 0; i < class_devno_max(); i++) {
375 struct obd_device *obd = class_num2obd(i);
377 if (obd && strcmp(name, obd->obd_name) == 0) {
378 /* Make sure we finished attaching before we give
381 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382 if (obd->obd_attached) {
383 read_unlock(&obd_dev_lock);
389 read_unlock(&obd_dev_lock);
394 struct obd_device *class_name2obd(const char *name)
396 int dev = class_name2dev(name);
398 if (dev < 0 || dev > class_devno_max())
400 return class_num2obd(dev);
402 EXPORT_SYMBOL(class_name2obd);
404 int class_uuid2dev(struct obd_uuid *uuid)
408 read_lock(&obd_dev_lock);
409 for (i = 0; i < class_devno_max(); i++) {
410 struct obd_device *obd = class_num2obd(i);
412 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
413 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414 read_unlock(&obd_dev_lock);
418 read_unlock(&obd_dev_lock);
424 * Get obd device from ::obd_devs[]
426 * \param num [in] array index
428 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
429 * otherwise return the obd device there.
431 struct obd_device *class_num2obd(int num)
433 struct obd_device *obd = NULL;
435 if (num < class_devno_max()) {
440 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
441 "%p obd_magic %08x != %08x\n",
442 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
443 LASSERTF(obd->obd_minor == num,
444 "%p obd_minor %0d != %0d\n",
445 obd, obd->obd_minor, num);
451 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
452 * specified, then only the client with that uuid is returned,
453 * otherwise any client connected to the tgt is returned.
455 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
456 const char *typ_name,
457 struct obd_uuid *grp_uuid)
461 read_lock(&obd_dev_lock);
462 for (i = 0; i < class_devno_max(); i++) {
463 struct obd_device *obd = class_num2obd(i);
467 if ((strncmp(obd->obd_type->typ_name, typ_name,
468 strlen(typ_name)) == 0)) {
469 if (obd_uuid_equals(tgt_uuid,
470 &obd->u.cli.cl_target_uuid) &&
471 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
472 &obd->obd_uuid) : 1)) {
473 read_unlock(&obd_dev_lock);
478 read_unlock(&obd_dev_lock);
482 EXPORT_SYMBOL(class_find_client_obd);
484 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
485 * searching at *next, and if a device is found, the next index to look
486 * at is saved in *next. If next is NULL, then the first matching device
487 * will always be returned.
489 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
495 else if (*next >= 0 && *next < class_devno_max())
500 read_lock(&obd_dev_lock);
501 for (; i < class_devno_max(); i++) {
502 struct obd_device *obd = class_num2obd(i);
506 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
509 read_unlock(&obd_dev_lock);
513 read_unlock(&obd_dev_lock);
517 EXPORT_SYMBOL(class_devices_in_group);
520 * Notify that the sptlrpc log for \a fsname has changed, so that every
521 * relevant OBD adjusts its sptlrpc settings accordingly.
523 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
525 struct obd_device *obd;
529 LASSERT(namelen > 0);
531 read_lock(&obd_dev_lock);
532 for (i = 0; i < class_devno_max(); i++) {
533 obd = class_num2obd(i);
535 if (!obd || obd->obd_set_up == 0 || obd->obd_stopping)
538 /* only notify mdc, osc, mdt, ost */
539 type = obd->obd_type->typ_name;
540 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
541 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
542 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
543 strcmp(type, LUSTRE_OST_NAME) != 0)
546 if (strncmp(obd->obd_name, fsname, namelen))
549 class_incref(obd, __func__, obd);
550 read_unlock(&obd_dev_lock);
551 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
552 sizeof(KEY_SPTLRPC_CONF),
553 KEY_SPTLRPC_CONF, 0, NULL, NULL);
555 class_decref(obd, __func__, obd);
556 read_lock(&obd_dev_lock);
558 read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
563 void obd_cleanup_caches(void)
565 kmem_cache_destroy(obd_device_cachep);
566 obd_device_cachep = NULL;
567 kmem_cache_destroy(obdo_cachep);
569 kmem_cache_destroy(import_cachep);
570 import_cachep = NULL;
573 int obd_init_caches(void)
575 LASSERT(!obd_device_cachep);
576 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
577 sizeof(struct obd_device),
579 if (!obd_device_cachep)
582 LASSERT(!obdo_cachep);
583 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
588 LASSERT(!import_cachep);
589 import_cachep = kmem_cache_create("ll_import_cache",
590 sizeof(struct obd_import),
597 obd_cleanup_caches();
601 /* map connection to client */
602 struct obd_export *class_conn2export(struct lustre_handle *conn)
604 struct obd_export *export;
607 CDEBUG(D_CACHE, "looking for null handle\n");
611 if (conn->cookie == -1) { /* this means assign a new connection */
612 CDEBUG(D_CACHE, "want a new connection\n");
616 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
617 export = class_handle2object(conn->cookie, NULL);
620 EXPORT_SYMBOL(class_conn2export);
622 struct obd_device *class_exp2obd(struct obd_export *exp)
628 EXPORT_SYMBOL(class_exp2obd);
630 struct obd_import *class_exp2cliimp(struct obd_export *exp)
632 struct obd_device *obd = exp->exp_obd;
636 return obd->u.cli.cl_import;
638 EXPORT_SYMBOL(class_exp2cliimp);
640 /* Export management functions */
641 static void class_export_destroy(struct obd_export *exp)
643 struct obd_device *obd = exp->exp_obd;
645 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
648 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
649 exp->exp_client_uuid.uuid, obd->obd_name);
651 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
652 if (exp->exp_connection)
653 ptlrpc_put_connection_superhack(exp->exp_connection);
655 LASSERT(list_empty(&exp->exp_outstanding_replies));
656 LASSERT(list_empty(&exp->exp_uncommitted_replies));
657 LASSERT(list_empty(&exp->exp_req_replay_queue));
658 LASSERT(list_empty(&exp->exp_hp_rpcs));
659 obd_destroy_export(exp);
660 class_decref(obd, "export", exp);
662 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
665 static void export_handle_addref(void *export)
667 class_export_get(export);
670 static struct portals_handle_ops export_handle_ops = {
671 .hop_addref = export_handle_addref,
675 struct obd_export *class_export_get(struct obd_export *exp)
677 atomic_inc(&exp->exp_refcount);
678 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
679 atomic_read(&exp->exp_refcount));
682 EXPORT_SYMBOL(class_export_get);
684 void class_export_put(struct obd_export *exp)
686 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
687 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
688 atomic_read(&exp->exp_refcount) - 1);
690 if (atomic_dec_and_test(&exp->exp_refcount)) {
691 LASSERT(!list_empty(&exp->exp_obd_chain));
692 CDEBUG(D_IOCTL, "final put %p/%s\n",
693 exp, exp->exp_client_uuid.uuid);
695 /* release nid stat reference */
696 lprocfs_exp_cleanup(exp);
698 obd_zombie_export_add(exp);
701 EXPORT_SYMBOL(class_export_put);
703 /* Creates a new export, adds it to the hash table, and returns a
704 * pointer to it. The refcount is 2: one for the hash reference, and
705 * one for the pointer returned by this function.
707 struct obd_export *class_new_export(struct obd_device *obd,
708 struct obd_uuid *cluuid)
710 struct obd_export *export;
711 struct cfs_hash *hash = NULL;
714 export = kzalloc(sizeof(*export), GFP_NOFS);
716 return ERR_PTR(-ENOMEM);
718 export->exp_conn_cnt = 0;
719 export->exp_lock_hash = NULL;
720 export->exp_flock_hash = NULL;
721 atomic_set(&export->exp_refcount, 2);
722 atomic_set(&export->exp_rpc_count, 0);
723 atomic_set(&export->exp_cb_count, 0);
724 atomic_set(&export->exp_locks_count, 0);
725 #if LUSTRE_TRACKS_LOCK_EXP_REFS
726 INIT_LIST_HEAD(&export->exp_locks_list);
727 spin_lock_init(&export->exp_locks_list_guard);
729 atomic_set(&export->exp_replay_count, 0);
730 export->exp_obd = obd;
731 INIT_LIST_HEAD(&export->exp_outstanding_replies);
732 spin_lock_init(&export->exp_uncommitted_replies_lock);
733 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
734 INIT_LIST_HEAD(&export->exp_req_replay_queue);
735 INIT_LIST_HEAD(&export->exp_handle.h_link);
736 INIT_LIST_HEAD(&export->exp_hp_rpcs);
737 class_handle_hash(&export->exp_handle, &export_handle_ops);
738 spin_lock_init(&export->exp_lock);
739 spin_lock_init(&export->exp_rpc_lock);
740 INIT_HLIST_NODE(&export->exp_uuid_hash);
741 spin_lock_init(&export->exp_bl_list_lock);
742 INIT_LIST_HEAD(&export->exp_bl_list);
744 export->exp_sp_peer = LUSTRE_SP_ANY;
745 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
746 export->exp_client_uuid = *cluuid;
747 obd_init_export(export);
749 spin_lock(&obd->obd_dev_lock);
750 /* shouldn't happen, but might race */
751 if (obd->obd_stopping) {
756 hash = cfs_hash_getref(obd->obd_uuid_hash);
761 spin_unlock(&obd->obd_dev_lock);
763 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
764 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
766 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767 obd->obd_name, cluuid->uuid, rc);
773 spin_lock(&obd->obd_dev_lock);
774 if (obd->obd_stopping) {
775 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
780 class_incref(obd, "export", export);
781 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
782 export->exp_obd->obd_num_exports++;
783 spin_unlock(&obd->obd_dev_lock);
784 cfs_hash_putref(hash);
788 spin_unlock(&obd->obd_dev_lock);
791 cfs_hash_putref(hash);
792 class_handle_unhash(&export->exp_handle);
793 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
794 obd_destroy_export(export);
798 EXPORT_SYMBOL(class_new_export);
800 void class_unlink_export(struct obd_export *exp)
802 class_handle_unhash(&exp->exp_handle);
804 spin_lock(&exp->exp_obd->obd_dev_lock);
805 /* delete the uuid-export hash item from the hash table */
806 if (!hlist_unhashed(&exp->exp_uuid_hash))
807 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
808 &exp->exp_client_uuid,
809 &exp->exp_uuid_hash);
811 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
812 exp->exp_obd->obd_num_exports--;
813 spin_unlock(&exp->exp_obd->obd_dev_lock);
814 class_export_put(exp);
817 /* Import management functions */
818 static void class_import_destroy(struct obd_import *imp)
820 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
821 imp->imp_obd->obd_name);
823 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
825 ptlrpc_put_connection_superhack(imp->imp_connection);
827 while (!list_empty(&imp->imp_conn_list)) {
828 struct obd_import_conn *imp_conn;
830 imp_conn = list_entry(imp->imp_conn_list.next,
831 struct obd_import_conn, oic_item);
832 list_del_init(&imp_conn->oic_item);
833 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
837 LASSERT(!imp->imp_sec);
838 class_decref(imp->imp_obd, "import", imp);
839 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
842 static void import_handle_addref(void *import)
844 class_import_get(import);
847 static struct portals_handle_ops import_handle_ops = {
848 .hop_addref = import_handle_addref,
852 struct obd_import *class_import_get(struct obd_import *import)
854 atomic_inc(&import->imp_refcount);
855 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
856 atomic_read(&import->imp_refcount),
857 import->imp_obd->obd_name);
860 EXPORT_SYMBOL(class_import_get);
862 void class_import_put(struct obd_import *imp)
864 LASSERT(list_empty(&imp->imp_zombie_chain));
865 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
867 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
868 atomic_read(&imp->imp_refcount) - 1,
869 imp->imp_obd->obd_name);
871 if (atomic_dec_and_test(&imp->imp_refcount)) {
872 CDEBUG(D_INFO, "final put import %p\n", imp);
873 obd_zombie_import_add(imp);
876 /* catch possible import put race */
877 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
879 EXPORT_SYMBOL(class_import_put);
881 static void init_imp_at(struct imp_at *at)
885 at_init(&at->iat_net_latency, 0, 0);
886 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
887 /* max service estimates are tracked on the server side, so
888 * don't use the AT history here, just use the last reported
889 * val. (But keep hist for proc histogram, worst_ever)
891 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
896 struct obd_import *class_new_import(struct obd_device *obd)
898 struct obd_import *imp;
900 imp = kzalloc(sizeof(*imp), GFP_NOFS);
904 INIT_LIST_HEAD(&imp->imp_pinger_chain);
905 INIT_LIST_HEAD(&imp->imp_zombie_chain);
906 INIT_LIST_HEAD(&imp->imp_replay_list);
907 INIT_LIST_HEAD(&imp->imp_sending_list);
908 INIT_LIST_HEAD(&imp->imp_delayed_list);
909 INIT_LIST_HEAD(&imp->imp_committed_list);
910 imp->imp_replay_cursor = &imp->imp_committed_list;
911 spin_lock_init(&imp->imp_lock);
912 imp->imp_last_success_conn = 0;
913 imp->imp_state = LUSTRE_IMP_NEW;
914 imp->imp_obd = class_incref(obd, "import", imp);
915 mutex_init(&imp->imp_sec_mutex);
916 init_waitqueue_head(&imp->imp_recovery_waitq);
918 atomic_set(&imp->imp_refcount, 2);
919 atomic_set(&imp->imp_unregistering, 0);
920 atomic_set(&imp->imp_inflight, 0);
921 atomic_set(&imp->imp_replay_inflight, 0);
922 atomic_set(&imp->imp_inval_count, 0);
923 INIT_LIST_HEAD(&imp->imp_conn_list);
924 INIT_LIST_HEAD(&imp->imp_handle.h_link);
925 class_handle_hash(&imp->imp_handle, &import_handle_ops);
926 init_imp_at(&imp->imp_at);
928 /* the default magic is V2, will be used in connect RPC, and
929 * then adjusted according to the flags in request/reply.
931 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
935 EXPORT_SYMBOL(class_new_import);
937 void class_destroy_import(struct obd_import *import)
940 LASSERT(import != LP_POISON);
942 class_handle_unhash(&import->imp_handle);
944 spin_lock(&import->imp_lock);
945 import->imp_generation++;
946 spin_unlock(&import->imp_lock);
947 class_import_put(import);
949 EXPORT_SYMBOL(class_destroy_import);
951 #if LUSTRE_TRACKS_LOCK_EXP_REFS
953 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
955 spin_lock(&exp->exp_locks_list_guard);
957 LASSERT(lock->l_exp_refs_nr >= 0);
959 if (lock->l_exp_refs_target && lock->l_exp_refs_target != exp) {
960 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
961 exp, lock, lock->l_exp_refs_target);
963 if ((lock->l_exp_refs_nr++) == 0) {
964 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
965 lock->l_exp_refs_target = exp;
967 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
968 lock, exp, lock->l_exp_refs_nr);
969 spin_unlock(&exp->exp_locks_list_guard);
972 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
974 spin_lock(&exp->exp_locks_list_guard);
975 LASSERT(lock->l_exp_refs_nr > 0);
976 if (lock->l_exp_refs_target != exp) {
977 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
978 lock, lock->l_exp_refs_target, exp);
980 if (-- lock->l_exp_refs_nr == 0) {
981 list_del_init(&lock->l_exp_refs_link);
982 lock->l_exp_refs_target = NULL;
984 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
985 lock, exp, lock->l_exp_refs_nr);
986 spin_unlock(&exp->exp_locks_list_guard);
990 /* A connection defines an export context in which preallocation can
991 * be managed. This releases the export pointer reference, and returns
992 * the export handle, so the export refcount is 1 when this function
995 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
996 struct obd_uuid *cluuid)
998 struct obd_export *export;
1004 export = class_new_export(obd, cluuid);
1006 return PTR_ERR(export);
1008 conn->cookie = export->exp_handle.h_cookie;
1009 class_export_put(export);
1011 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1012 cluuid->uuid, conn->cookie);
1015 EXPORT_SYMBOL(class_connect);
1017 /* This function removes 1-3 references from the export:
1018 * 1 - for the export pointer passed in
1019 * and, if a disconnect is really needed,
1020 * 2 - for removing it from the hash
1021 * 3 - in class_unlink_export
1022 * The export pointer passed to this function can be destroyed
1024 int class_disconnect(struct obd_export *export)
1026 int already_disconnected;
1029 CWARN("attempting to free NULL export %p\n", export);
1033 spin_lock(&export->exp_lock);
1034 already_disconnected = export->exp_disconnected;
1035 export->exp_disconnected = 1;
1036 spin_unlock(&export->exp_lock);
1038 /* class_cleanup(), abort_recovery(), and class_fail_export()
1039 * all end up in here, and if any of them race we shouldn't
1040 * call extra class_export_puts().
1042 if (already_disconnected)
1045 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1046 export->exp_handle.h_cookie);
1048 class_unlink_export(export);
1050 class_export_put(export);
1053 EXPORT_SYMBOL(class_disconnect);
1055 void class_fail_export(struct obd_export *exp)
1057 int rc, already_failed;
1059 spin_lock(&exp->exp_lock);
1060 already_failed = exp->exp_failed;
1061 exp->exp_failed = 1;
1062 spin_unlock(&exp->exp_lock);
1064 if (already_failed) {
1065 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1066 exp, exp->exp_client_uuid.uuid);
1070 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1071 exp, exp->exp_client_uuid.uuid);
1073 if (obd_dump_on_timeout)
1074 libcfs_debug_dumplog();
1076 /* needed so that CDEBUG can safely be called after obd_disconnect */
1077 class_export_get(exp);
1079 /* Most callers into obd_disconnect are removing their own reference
1080 * (request, for example) in addition to the one from the hash table.
1081 * We don't have such a reference here, so make one.
1083 class_export_get(exp);
1084 rc = obd_disconnect(exp);
1086 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1088 CDEBUG(D_HA, "disconnected export %p/%s\n",
1089 exp, exp->exp_client_uuid.uuid);
1090 class_export_put(exp);
1092 EXPORT_SYMBOL(class_fail_export);
1094 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1095 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1098 /* Total number of zombies to be destroyed */
1099 static int zombies_count;
1102 * kill zombie imports and exports
1104 static void obd_zombie_impexp_cull(void)
1106 struct obd_import *import;
1107 struct obd_export *export;
1110 spin_lock(&obd_zombie_impexp_lock);
1113 if (!list_empty(&obd_zombie_imports)) {
1114 import = list_entry(obd_zombie_imports.next,
1117 list_del_init(&import->imp_zombie_chain);
1121 if (!list_empty(&obd_zombie_exports)) {
1122 export = list_entry(obd_zombie_exports.next,
1125 list_del_init(&export->exp_obd_chain);
1128 spin_unlock(&obd_zombie_impexp_lock);
1131 class_import_destroy(import);
1132 spin_lock(&obd_zombie_impexp_lock);
1134 spin_unlock(&obd_zombie_impexp_lock);
1138 class_export_destroy(export);
1139 spin_lock(&obd_zombie_impexp_lock);
1141 spin_unlock(&obd_zombie_impexp_lock);
1145 } while (import || export);
1148 static struct completion obd_zombie_start;
1149 static struct completion obd_zombie_stop;
1150 static unsigned long obd_zombie_flags;
1151 static wait_queue_head_t obd_zombie_waitq;
1152 static pid_t obd_zombie_pid;
1155 OBD_ZOMBIE_STOP = 0x0001,
1159 * check whether the zombie import/export destroy thread has work to do.
1161 static int obd_zombie_impexp_check(void *arg)
1165 spin_lock(&obd_zombie_impexp_lock);
1166 rc = (zombies_count == 0) &&
1167 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1168 spin_unlock(&obd_zombie_impexp_lock);
1174 * Add export to the obd_zombie thread and notify it.
1176 static void obd_zombie_export_add(struct obd_export *exp)
1178 spin_lock(&exp->exp_obd->obd_dev_lock);
1179 LASSERT(!list_empty(&exp->exp_obd_chain));
1180 list_del_init(&exp->exp_obd_chain);
1181 spin_unlock(&exp->exp_obd->obd_dev_lock);
1182 spin_lock(&obd_zombie_impexp_lock);
1184 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1185 spin_unlock(&obd_zombie_impexp_lock);
1187 obd_zombie_impexp_notify();
1191 * Add import to the obd_zombie thread and notify it.
1193 static void obd_zombie_import_add(struct obd_import *imp)
1195 LASSERT(!imp->imp_sec);
1196 spin_lock(&obd_zombie_impexp_lock);
1197 LASSERT(list_empty(&imp->imp_zombie_chain));
1199 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1200 spin_unlock(&obd_zombie_impexp_lock);
1202 obd_zombie_impexp_notify();
1206 * notify import/export destroy thread about new zombie.
1208 static void obd_zombie_impexp_notify(void)
1211 * Make sure obd_zombie_impexp_thread gets this notification.
1212 * It is possible that this signal is received only by obd_zombie_barrier;
1213 * the barrier then gulps the notification and sleeps again, and a hang ensues
1215 wake_up_all(&obd_zombie_waitq);
1219 * check whether obd_zombie is idle
1221 static int obd_zombie_is_idle(void)
1225 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1226 spin_lock(&obd_zombie_impexp_lock);
1227 rc = (zombies_count == 0);
1228 spin_unlock(&obd_zombie_impexp_lock);
1233 * wait until the obd_zombie import/export queues become empty
1235 void obd_zombie_barrier(void)
1237 struct l_wait_info lwi = { 0 };
1239 if (obd_zombie_pid == current_pid())
1240 /* don't wait for myself */
1242 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1244 EXPORT_SYMBOL(obd_zombie_barrier);
1247 * thread that destroys zombie exports/imports.
1249 static int obd_zombie_impexp_thread(void *unused)
1251 unshare_fs_struct();
1252 complete(&obd_zombie_start);
1254 obd_zombie_pid = current_pid();
1256 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1257 struct l_wait_info lwi = { 0 };
1259 l_wait_event(obd_zombie_waitq,
1260 !obd_zombie_impexp_check(NULL), &lwi);
1261 obd_zombie_impexp_cull();
1264 * Notify obd_zombie_barrier callers that queues
1267 wake_up(&obd_zombie_waitq);
1270 complete(&obd_zombie_stop);
1276 * start the thread that destroys zombie imports/exports
1278 int obd_zombie_impexp_init(void)
1280 struct task_struct *task;
1282 INIT_LIST_HEAD(&obd_zombie_imports);
1283 INIT_LIST_HEAD(&obd_zombie_exports);
1284 spin_lock_init(&obd_zombie_impexp_lock);
1285 init_completion(&obd_zombie_start);
1286 init_completion(&obd_zombie_stop);
1287 init_waitqueue_head(&obd_zombie_waitq);
1290 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1292 return PTR_ERR(task);
1294 wait_for_completion(&obd_zombie_start);
1299 * stop the thread that destroys zombie imports/exports
1301 void obd_zombie_impexp_stop(void)
1303 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1304 obd_zombie_impexp_notify();
1305 wait_for_completion(&obd_zombie_stop);
1308 struct obd_request_slot_waiter {
1309 struct list_head orsw_entry;
1310 wait_queue_head_t orsw_waitq;
1314 static bool obd_request_slot_avail(struct client_obd *cli,
1315 struct obd_request_slot_waiter *orsw)
1319 spin_lock(&cli->cl_loi_list_lock);
1320 avail = !!list_empty(&orsw->orsw_entry);
1321 spin_unlock(&cli->cl_loi_list_lock);
1327 * For network flow control, the RPC sponsor needs to acquire a credit
1328 * before sending the RPC. The credits count for a connection is defined
1329 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1330 * the subsequent RPC sponsors need to wait until others release their
1331 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
1333 int obd_get_request_slot(struct client_obd *cli)
1335 struct obd_request_slot_waiter orsw;
1336 struct l_wait_info lwi;
1339 spin_lock(&cli->cl_loi_list_lock);
1340 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1341 cli->cl_r_in_flight++;
1342 spin_unlock(&cli->cl_loi_list_lock);
1346 init_waitqueue_head(&orsw.orsw_waitq);
1347 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1348 orsw.orsw_signaled = false;
1349 spin_unlock(&cli->cl_loi_list_lock);
1351 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1352 rc = l_wait_event(orsw.orsw_waitq,
1353 obd_request_slot_avail(cli, &orsw) ||
1358 * Here, we must take the lock to prevent the on-stack 'orsw' from being
1359 * freed while others (such as obd_put_request_slot) are still using it.
1361 spin_lock(&cli->cl_loi_list_lock);
1363 if (!orsw.orsw_signaled) {
1364 if (list_empty(&orsw.orsw_entry))
1365 cli->cl_r_in_flight--;
1367 list_del(&orsw.orsw_entry);
1371 if (orsw.orsw_signaled) {
1372 LASSERT(list_empty(&orsw.orsw_entry));
1376 spin_unlock(&cli->cl_loi_list_lock);
1380 EXPORT_SYMBOL(obd_get_request_slot);
1382 void obd_put_request_slot(struct client_obd *cli)
1384 struct obd_request_slot_waiter *orsw;
1386 spin_lock(&cli->cl_loi_list_lock);
1387 cli->cl_r_in_flight--;
1389 /* If there is a free slot, wake up the first waiter. */
1390 if (!list_empty(&cli->cl_loi_read_list) &&
1391 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1392 orsw = list_entry(cli->cl_loi_read_list.next,
1393 struct obd_request_slot_waiter, orsw_entry);
1394 list_del_init(&orsw->orsw_entry);
1395 cli->cl_r_in_flight++;
1396 wake_up(&orsw->orsw_waitq);
1398 spin_unlock(&cli->cl_loi_list_lock);
1400 EXPORT_SYMBOL(obd_put_request_slot);
1402 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1404 return cli->cl_max_rpcs_in_flight;
1406 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1408 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1410 struct obd_request_slot_waiter *orsw;
1415 if (max > OBD_MAX_RIF_MAX || max < 1)
1418 spin_lock(&cli->cl_loi_list_lock);
1419 old = cli->cl_max_rpcs_in_flight;
1420 cli->cl_max_rpcs_in_flight = max;
1423 /* We increased max_rpcs_in_flight, so wake up some waiters. */
1424 for (i = 0; i < diff; i++) {
1425 if (list_empty(&cli->cl_loi_read_list))
1428 orsw = list_entry(cli->cl_loi_read_list.next,
1429 struct obd_request_slot_waiter, orsw_entry);
1430 list_del_init(&orsw->orsw_entry);
1431 cli->cl_r_in_flight++;
1432 wake_up(&orsw->orsw_waitq);
1434 spin_unlock(&cli->cl_loi_list_lock);
1438 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);