GNU Linux-libre 4.9.309-gnu1
[releases.git] / fs / ocfs2 / cluster / heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 #include <linux/debugfs.h>
37 #include <linux/slab.h>
38 #include <linux/bitmap.h>
39 #include <linux/ktime.h>
40 #include "heartbeat.h"
41 #include "tcp.h"
42 #include "nodemanager.h"
43 #include "quorum.h"
44
45 #include "masklog.h"
46
47
48 /*
49  * The first heartbeat pass had one global thread that would serialize all hb
50  * callback calls.  This global serializing sem should only be removed once
51  * we've made sure that all callees can deal with being called concurrently
52  * from multiple hb region threads.
53  */
54 static DECLARE_RWSEM(o2hb_callback_sem);
55
56 /*
57  * multiple hb threads are watching multiple regions.  A node is live
58  * whenever any of the threads sees activity from the node in its region.
59  */
60 static DEFINE_SPINLOCK(o2hb_live_lock);
61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
63 static LIST_HEAD(o2hb_node_events);
64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
65
66 /*
67  * In global heartbeat, we maintain a series of region bitmaps.
68  *      - o2hb_region_bitmap allows us to limit the region number to max region.
69  *      - o2hb_live_region_bitmap tracks live regions (seen steady iterations).
70  *      - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
71  *              heartbeat on it.
72  *      - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
73  */
74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
78
79 #define O2HB_DB_TYPE_LIVENODES          0
80 #define O2HB_DB_TYPE_LIVEREGIONS        1
81 #define O2HB_DB_TYPE_QUORUMREGIONS      2
82 #define O2HB_DB_TYPE_FAILEDREGIONS      3
83 #define O2HB_DB_TYPE_REGION_LIVENODES   4
84 #define O2HB_DB_TYPE_REGION_NUMBER      5
85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME        6
86 #define O2HB_DB_TYPE_REGION_PINNED      7
/*
 * Descriptor for one debug buffer.
 *
 * NOTE(review): the field meanings below are inferred from their names and
 * from the O2HB_DB_TYPE_* constants defined just above; the code that fills
 * and reads this structure is not visible here - confirm against it.
 */
struct o2hb_debug_buf {
	int db_type;	/* presumably one of the O2HB_DB_TYPE_* values */
	int db_size;	/* presumably the size of the object behind db_data */
	int db_len;	/* presumably a length/count used when formatting */
	void *db_data;	/* opaque pointer to the data being exposed */
};
93
94 static struct o2hb_debug_buf *o2hb_db_livenodes;
95 static struct o2hb_debug_buf *o2hb_db_liveregions;
96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
97 static struct o2hb_debug_buf *o2hb_db_failedregions;
98
99 #define O2HB_DEBUG_DIR                  "o2hb"
100 #define O2HB_DEBUG_LIVENODES            "livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS          "live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS        "quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS        "failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER        "num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME  "elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED        "pinned"
107
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
113
114 static LIST_HEAD(o2hb_all_regions);
115
/*
 * One callback list head per callback type (O2HB_NUM_CB entries).
 * o2hb_fire_callbacks() walks ->list and invokes every registered
 * struct o2hb_callback_func found on it.
 */
static struct o2hb_callback {
	struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];
119
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
121
122 #define O2HB_DEFAULT_BLOCK_BITS       9
123
/*
 * Heartbeat modes.  O2HB_HEARTBEAT_NUM_MODES is not a mode itself; it is
 * the number of valid modes and sizes the description table below.
 */
enum o2hb_heartbeat_modes {
	O2HB_HEARTBEAT_LOCAL		= 0,
	O2HB_HEARTBEAT_GLOBAL,
	O2HB_HEARTBEAT_NUM_MODES,
};

/* Human-readable name of each heartbeat mode, indexed by mode value. */
char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
	[O2HB_HEARTBEAT_LOCAL]	= "local",
	[O2HB_HEARTBEAT_GLOBAL]	= "global",
};
134
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
137
138 /*
139  * o2hb_dependent_users tracks the number of registered callbacks that depend
140  * on heartbeat. o2net and o2dlm are two entities that register this callback.
141  * However only o2dlm depends on the heartbeat. It does not want the heartbeat
142  * to stop while a dlm domain is still active.
143  */
144 unsigned int o2hb_dependent_users;
145
146 /*
147  * In global heartbeat mode, all regions are pinned if there are one or more
148  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149  * regions are unpinned if the region count exceeds the cut off or the number
150  * of dependent users falls to zero.
151  */
152 #define O2HB_PIN_CUT_OFF                3
153
154 /*
155  * In local heartbeat mode, we assume the dlm domain name to be the same as
156  * region uuid. This is true for domains created for the file system but not
157  * necessarily true for userdlm domains. This is a known limitation.
158  *
159  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160  * works for both file system and userdlm domains.
161  */
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
164
165 /* Only sets a new threshold if there are no active regions.
166  *
167  * No locking or otherwise interesting code is required for reading
168  * o2hb_dead_threshold as it can't change once regions are active and
169  * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
171 {
172         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173                 spin_lock(&o2hb_live_lock);
174                 if (list_empty(&o2hb_all_regions))
175                         o2hb_dead_threshold = threshold;
176                 spin_unlock(&o2hb_live_lock);
177         }
178 }
179
180 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
181 {
182         int ret = -1;
183
184         if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185                 spin_lock(&o2hb_live_lock);
186                 if (list_empty(&o2hb_all_regions)) {
187                         o2hb_heartbeat_mode = hb_mode;
188                         ret = 0;
189                 }
190                 spin_unlock(&o2hb_live_lock);
191         }
192
193         return ret;
194 }
195
/*
 * A pending node up/down notification.  Events are filled in and appended
 * to o2hb_node_events by o2hb_queue_node_event() and consumed in order by
 * o2hb_run_event_list().
 */
struct o2hb_node_event {
	struct list_head        hn_item;	/* link in o2hb_node_events */
	enum o2hb_callback_type hn_event_type;	/* selects the callback list to fire */
	struct o2nm_node        *hn_node;	/* may be NULL only for O2HB_NODE_DOWN_CB */
	int                     hn_node_num;	/* node number passed to the callbacks */
};
202
/*
 * Per-node heartbeat slot of a region; a region's hr_slots array is indexed
 * by node number.
 */
struct o2hb_disk_slot {
	/* the heartbeat block backing this slot */
	struct o2hb_disk_heartbeat_block *ds_raw_block;
	/* node number this slot belongs to */
	u8                      ds_node_num;
	/* compared against hb_seq by o2hb_check_own_slot(); 0 = none yet */
	u64                     ds_last_time;
	/* compared against hb_generation by o2hb_check_own_slot() */
	u64                     ds_last_generation;
	/* NOTE(review): presumably counts samples with an unchanged block - confirm */
	u16                     ds_equal_samples;
	/* NOTE(review): presumably counts samples with a changed block - confirm */
	u16                     ds_changed_samples;
	/* link in o2hb_live_slots[ds_node_num]; empty when not live */
	struct list_head        ds_live_item;
};
212
/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, who cleans up the region */
struct o2hb_region {
	/* configfs item; its name is used as the region name in log messages */
	struct config_item      hr_item;

	/* NOTE(review): presumably the link in o2hb_all_regions - confirm */
	struct list_head        hr_all_item;
	/* one-bit state flags */
	unsigned                hr_unclean_stop:1,
				hr_aborted_start:1,
				hr_item_pinned:1,
				hr_item_dropped:1,
				hr_node_deleted:1;

	/* protected by the hr_callback_sem */
	/* NOTE(review): no hr_callback_sem is visible in this file section;
	 * presumably o2hb_callback_sem is meant - confirm */
	struct task_struct      *hr_task;

	unsigned int            hr_blocks;
	/* first heartbeat block on the device, in units of hr_block_bytes */
	unsigned long long      hr_start_block;

	/* log2 of the block size / block size in bytes (used as crc length) */
	unsigned int            hr_block_bits;
	unsigned int            hr_block_bytes;

	unsigned int            hr_slots_per_page;
	unsigned int            hr_num_pages;

	/* pages holding the slot data; page index is slot / hr_slots_per_page */
	struct page             **hr_slot_data;
	/* block device the heartbeat bios are issued against */
	struct block_device     *hr_bdev;
	/* per-node slots, indexed by node number */
	struct o2hb_disk_slot   *hr_slots;

	/* live node map of this region */
	unsigned long           hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	/* bit index of this region in the global o2hb_*_region_bitmap maps */
	unsigned int            hr_region_num;

	/* debugfs entries and the buffers behind them */
	struct dentry           *hr_debug_dir;
	struct dentry           *hr_debug_livenodes;
	struct dentry           *hr_debug_regnum;
	struct dentry           *hr_debug_elapsed_time;
	struct dentry           *hr_debug_pinned;
	struct o2hb_debug_buf   *hr_db_livenodes;
	struct o2hb_debug_buf   *hr_db_regnum;
	struct o2hb_debug_buf   *hr_db_elapsed_time;
	struct o2hb_debug_buf   *hr_db_pinned;

	/* let the person setting up hb wait for it to return until it
	 * has reached a 'steady' state.  This will be fixed when we have
	 * a more complete api that doesn't lead to this sort of fragility. */
	atomic_t                hr_steady_iterations;

	/* terminate o2hb thread if it does not reach steady state
	 * (hr_steady_iterations == 0) within hr_unsteady_iterations */
	atomic_t                hr_unsteady_iterations;

	/* device name, printed in log messages */
	char                    hr_dev_name[BDEVNAME_SIZE];

	unsigned int            hr_timeout_ms;

	/* randomized as the region goes up and down so that a node
	 * recognizes a node going up and down in one iteration */
	u64                     hr_generation;

	/* write timeout work; runs o2hb_write_timeout() when it expires */
	struct delayed_work     hr_write_timeout_work;
	/* jiffies value used to report how long the write timeout took */
	unsigned long           hr_last_timeout_start;

	/* negotiate timer, used to negotiate extending hb timeout. */
	struct delayed_work     hr_nego_timeout_work;
	/* nodes that have asked for a timeout extension in this round */
	unsigned long           hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];

	/* Used during o2hb_check_slot to hold a copy of the block
	 * being checked because we temporarily have to zero out the
	 * crc field. */
	struct o2hb_disk_heartbeat_block *hr_tmp_block;

	/* Message key for negotiate timeout message. */
	unsigned int            hr_key;
	struct list_head        hr_handler_list;

	/* last hb status, 0 for success, other value for error. */
	int                     hr_last_hb_status;
};
291
/*
 * Completion tracking for a batch of heartbeat bios.
 *
 * wc_num_reqs starts at 1 (see o2hb_bio_wait_init()) and is incremented
 * once per submitted bio; each bio completion and the waiter itself drop
 * one count, and whoever brings it to zero completes wc_io_complete.
 */
struct o2hb_bio_wait_ctxt {
	atomic_t          wc_num_reqs;		/* outstanding bios + 1 for the waiter */
	struct completion wc_io_complete;	/* completed when wc_num_reqs hits 0 */
	int               wc_error;		/* last bio error seen, 0 if none */
};
297
/* Delay before timeout negotiation starts: half of the write timeout. */
#define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)

/* Message types used for timeout negotiation. */
enum {
	/* sent to the master node to request a timeout extension */
	O2HB_NEGO_TIMEOUT_MSG = 1,
	/* sent by the master node to approve the extension */
	O2HB_NEGO_APPROVE_MSG = 2,
};

/* Payload of a negotiation message: the sender's node number. */
struct o2hb_nego_msg {
	u8 node_num;
};
308
309 static void o2hb_write_timeout(struct work_struct *work)
310 {
311         int failed, quorum;
312         struct o2hb_region *reg =
313                 container_of(work, struct o2hb_region,
314                              hr_write_timeout_work.work);
315
316         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
317              "milliseconds\n", reg->hr_dev_name,
318              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
319
320         if (o2hb_global_heartbeat_active()) {
321                 spin_lock(&o2hb_live_lock);
322                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
323                         set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
324                 failed = bitmap_weight(o2hb_failed_region_bitmap,
325                                         O2NM_MAX_REGIONS);
326                 quorum = bitmap_weight(o2hb_quorum_region_bitmap,
327                                         O2NM_MAX_REGIONS);
328                 spin_unlock(&o2hb_live_lock);
329
330                 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
331                      quorum, failed);
332
333                 /*
334                  * Fence if the number of failed regions >= half the number
335                  * of  quorum regions
336                  */
337                 if ((failed << 1) < quorum)
338                         return;
339         }
340
341         o2quo_disk_timeout();
342 }
343
344 static void o2hb_arm_timeout(struct o2hb_region *reg)
345 {
346         /* Arm writeout only after thread reaches steady state */
347         if (atomic_read(&reg->hr_steady_iterations) != 0)
348                 return;
349
350         mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
351              O2HB_MAX_WRITE_TIMEOUT_MS);
352
353         if (o2hb_global_heartbeat_active()) {
354                 spin_lock(&o2hb_live_lock);
355                 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
356                 spin_unlock(&o2hb_live_lock);
357         }
358         cancel_delayed_work(&reg->hr_write_timeout_work);
359         schedule_delayed_work(&reg->hr_write_timeout_work,
360                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
361
362         cancel_delayed_work(&reg->hr_nego_timeout_work);
363         /* negotiate timeout must be less than write timeout. */
364         schedule_delayed_work(&reg->hr_nego_timeout_work,
365                               msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
366         memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
367 }
368
369 static void o2hb_disarm_timeout(struct o2hb_region *reg)
370 {
371         cancel_delayed_work_sync(&reg->hr_write_timeout_work);
372         cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
373 }
374
375 static int o2hb_send_nego_msg(int key, int type, u8 target)
376 {
377         struct o2hb_nego_msg msg;
378         int status, ret;
379
380         msg.node_num = o2nm_this_node();
381 again:
382         ret = o2net_send_message(type, key, &msg, sizeof(msg),
383                         target, &status);
384
385         if (ret == -EAGAIN || ret == -ENOMEM) {
386                 msleep(100);
387                 goto again;
388         }
389
390         return ret;
391 }
392
393 static void o2hb_nego_timeout(struct work_struct *work)
394 {
395         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
396         int master_node, i, ret;
397         struct o2hb_region *reg;
398
399         reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
400         /* don't negotiate timeout if last hb failed since it is very
401          * possible io failed. Should let write timeout fence self.
402          */
403         if (reg->hr_last_hb_status)
404                 return;
405
406         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
407         /* lowest node as master node to make negotiate decision. */
408         master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
409
410         if (master_node == o2nm_this_node()) {
411                 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
412                         printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
413                                 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
414                                 config_item_name(&reg->hr_item), reg->hr_dev_name);
415                         set_bit(master_node, reg->hr_nego_node_bitmap);
416                 }
417                 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
418                                 sizeof(reg->hr_nego_node_bitmap))) {
419                         /* check negotiate bitmap every second to do timeout
420                          * approve decision.
421                          */
422                         schedule_delayed_work(&reg->hr_nego_timeout_work,
423                                 msecs_to_jiffies(1000));
424
425                         return;
426                 }
427
428                 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
429                         config_item_name(&reg->hr_item), reg->hr_dev_name);
430                 /* approve negotiate timeout request. */
431                 o2hb_arm_timeout(reg);
432
433                 i = -1;
434                 while ((i = find_next_bit(live_node_bitmap,
435                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
436                         if (i == master_node)
437                                 continue;
438
439                         mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
440                         ret = o2hb_send_nego_msg(reg->hr_key,
441                                         O2HB_NEGO_APPROVE_MSG, i);
442                         if (ret)
443                                 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
444                                         i, ret);
445                 }
446         } else {
447                 /* negotiate timeout with master node. */
448                 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
449                         o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
450                         reg->hr_dev_name, master_node);
451                 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
452                                 master_node);
453                 if (ret)
454                         mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
455                                 master_node, ret);
456         }
457 }
458
459 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
460                                 void **ret_data)
461 {
462         struct o2hb_region *reg = data;
463         struct o2hb_nego_msg *nego_msg;
464
465         nego_msg = (struct o2hb_nego_msg *)msg->buf;
466         printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
467                 nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
468         if (nego_msg->node_num < O2NM_MAX_NODES)
469                 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
470         else
471                 mlog(ML_ERROR, "got nego timeout message from bad node.\n");
472
473         return 0;
474 }
475
476 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
477                                 void **ret_data)
478 {
479         struct o2hb_region *reg = data;
480
481         printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
482                 config_item_name(&reg->hr_item), reg->hr_dev_name);
483         o2hb_arm_timeout(reg);
484         return 0;
485 }
486
487 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
488 {
489         atomic_set(&wc->wc_num_reqs, 1);
490         init_completion(&wc->wc_io_complete);
491         wc->wc_error = 0;
492 }
493
494 /* Used in error paths too */
495 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
496                                      unsigned int num)
497 {
498         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
499          * good news is that the fast path only completes one at a time */
500         while(num--) {
501                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
502                         BUG_ON(num > 0);
503                         complete(&wc->wc_io_complete);
504                 }
505         }
506 }
507
508 static void o2hb_wait_on_io(struct o2hb_region *reg,
509                             struct o2hb_bio_wait_ctxt *wc)
510 {
511         o2hb_bio_wait_dec(wc, 1);
512         wait_for_completion(&wc->wc_io_complete);
513 }
514
515 static void o2hb_bio_end_io(struct bio *bio)
516 {
517         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
518
519         if (bio->bi_error) {
520                 mlog(ML_ERROR, "IO Error %d\n", bio->bi_error);
521                 wc->wc_error = bio->bi_error;
522         }
523
524         o2hb_bio_wait_dec(wc, 1);
525         bio_put(bio);
526 }
527
528 /* Setup a Bio to cover I/O against num_slots slots starting at
529  * start_slot. */
530 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
531                                       struct o2hb_bio_wait_ctxt *wc,
532                                       unsigned int *current_slot,
533                                       unsigned int max_slots, int op,
534                                       int op_flags)
535 {
536         int len, current_page;
537         unsigned int vec_len, vec_start;
538         unsigned int bits = reg->hr_block_bits;
539         unsigned int spp = reg->hr_slots_per_page;
540         unsigned int cs = *current_slot;
541         struct bio *bio;
542         struct page *page;
543
544         /* Testing has shown this allocation to take long enough under
545          * GFP_KERNEL that the local node can get fenced. It would be
546          * nicest if we could pre-allocate these bios and avoid this
547          * all together. */
548         bio = bio_alloc(GFP_ATOMIC, 16);
549         if (!bio) {
550                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
551                 bio = ERR_PTR(-ENOMEM);
552                 goto bail;
553         }
554
555         /* Must put everything in 512 byte sectors for the bio... */
556         bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
557         bio->bi_bdev = reg->hr_bdev;
558         bio->bi_private = wc;
559         bio->bi_end_io = o2hb_bio_end_io;
560         bio_set_op_attrs(bio, op, op_flags);
561
562         vec_start = (cs << bits) % PAGE_SIZE;
563         while(cs < max_slots) {
564                 current_page = cs / spp;
565                 page = reg->hr_slot_data[current_page];
566
567                 vec_len = min(PAGE_SIZE - vec_start,
568                               (max_slots-cs) * (PAGE_SIZE/spp) );
569
570                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
571                      current_page, vec_len, vec_start);
572
573                 len = bio_add_page(bio, page, vec_len, vec_start);
574                 if (len != vec_len) break;
575
576                 cs += vec_len / (PAGE_SIZE/spp);
577                 vec_start = 0;
578         }
579
580 bail:
581         *current_slot = cs;
582         return bio;
583 }
584
585 static int o2hb_read_slots(struct o2hb_region *reg,
586                            unsigned int max_slots)
587 {
588         unsigned int current_slot=0;
589         int status;
590         struct o2hb_bio_wait_ctxt wc;
591         struct bio *bio;
592
593         o2hb_bio_wait_init(&wc);
594
595         while(current_slot < max_slots) {
596                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots,
597                                          REQ_OP_READ, 0);
598                 if (IS_ERR(bio)) {
599                         status = PTR_ERR(bio);
600                         mlog_errno(status);
601                         goto bail_and_wait;
602                 }
603
604                 atomic_inc(&wc.wc_num_reqs);
605                 submit_bio(bio);
606         }
607
608         status = 0;
609
610 bail_and_wait:
611         o2hb_wait_on_io(reg, &wc);
612         if (wc.wc_error && !status)
613                 status = wc.wc_error;
614
615         return status;
616 }
617
618 static int o2hb_issue_node_write(struct o2hb_region *reg,
619                                  struct o2hb_bio_wait_ctxt *write_wc)
620 {
621         int status;
622         unsigned int slot;
623         struct bio *bio;
624
625         o2hb_bio_wait_init(write_wc);
626
627         slot = o2nm_this_node();
628
629         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE,
630                                  WRITE_SYNC);
631         if (IS_ERR(bio)) {
632                 status = PTR_ERR(bio);
633                 mlog_errno(status);
634                 goto bail;
635         }
636
637         atomic_inc(&write_wc->wc_num_reqs);
638         submit_bio(bio);
639
640         status = 0;
641 bail:
642         return status;
643 }
644
645 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
646                                      struct o2hb_disk_heartbeat_block *hb_block)
647 {
648         __le32 old_cksum;
649         u32 ret;
650
651         /* We want to compute the block crc with a 0 value in the
652          * hb_cksum field. Save it off here and replace after the
653          * crc. */
654         old_cksum = hb_block->hb_cksum;
655         hb_block->hb_cksum = 0;
656
657         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
658
659         hb_block->hb_cksum = old_cksum;
660
661         return ret;
662 }
663
664 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
665 {
666         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
667              "cksum = 0x%x, generation 0x%llx\n",
668              (long long)le64_to_cpu(hb_block->hb_seq),
669              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
670              (long long)le64_to_cpu(hb_block->hb_generation));
671 }
672
673 static int o2hb_verify_crc(struct o2hb_region *reg,
674                            struct o2hb_disk_heartbeat_block *hb_block)
675 {
676         u32 read, computed;
677
678         read = le32_to_cpu(hb_block->hb_cksum);
679         computed = o2hb_compute_block_crc_le(reg, hb_block);
680
681         return read == computed;
682 }
683
684 /*
685  * Compare the slot data with what we wrote in the last iteration.
686  * If the match fails, print an appropriate error message. This is to
687  * detect errors like... another node hearting on the same slot,
688  * flaky device that is losing writes, etc.
689  * Returns 1 if check succeeds, 0 otherwise.
690  */
691 static int o2hb_check_own_slot(struct o2hb_region *reg)
692 {
693         struct o2hb_disk_slot *slot;
694         struct o2hb_disk_heartbeat_block *hb_block;
695         char *errstr;
696
697         slot = &reg->hr_slots[o2nm_this_node()];
698         /* Don't check on our 1st timestamp */
699         if (!slot->ds_last_time)
700                 return 0;
701
702         hb_block = slot->ds_raw_block;
703         if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
704             le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
705             hb_block->hb_node == slot->ds_node_num)
706                 return 1;
707
708 #define ERRSTR1         "Another node is heartbeating on device"
709 #define ERRSTR2         "Heartbeat generation mismatch on device"
710 #define ERRSTR3         "Heartbeat sequence mismatch on device"
711
712         if (hb_block->hb_node != slot->ds_node_num)
713                 errstr = ERRSTR1;
714         else if (le64_to_cpu(hb_block->hb_generation) !=
715                  slot->ds_last_generation)
716                 errstr = ERRSTR2;
717         else
718                 errstr = ERRSTR3;
719
720         mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
721              "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
722              slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
723              (unsigned long long)slot->ds_last_time, hb_block->hb_node,
724              (unsigned long long)le64_to_cpu(hb_block->hb_generation),
725              (unsigned long long)le64_to_cpu(hb_block->hb_seq));
726
727         return 0;
728 }
729
730 static inline void o2hb_prepare_block(struct o2hb_region *reg,
731                                       u64 generation)
732 {
733         int node_num;
734         u64 cputime;
735         struct o2hb_disk_slot *slot;
736         struct o2hb_disk_heartbeat_block *hb_block;
737
738         node_num = o2nm_this_node();
739         slot = &reg->hr_slots[node_num];
740
741         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
742         memset(hb_block, 0, reg->hr_block_bytes);
743         /* TODO: time stuff */
744         cputime = CURRENT_TIME.tv_sec;
745         if (!cputime)
746                 cputime = 1;
747
748         hb_block->hb_seq = cpu_to_le64(cputime);
749         hb_block->hb_node = node_num;
750         hb_block->hb_generation = cpu_to_le64(generation);
751         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
752
753         /* This step must always happen last! */
754         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
755                                                                    hb_block));
756
757         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
758              (long long)generation,
759              le32_to_cpu(hb_block->hb_cksum));
760 }
761
762 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
763                                 struct o2nm_node *node,
764                                 int idx)
765 {
766         struct o2hb_callback_func *f;
767
768         list_for_each_entry(f, &hbcall->list, hc_item) {
769                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
770                 (f->hc_func)(node, idx, f->hc_data);
771         }
772 }
773
774 /* Will run the list in order until we process the passed event */
775 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
776 {
777         struct o2hb_callback *hbcall;
778         struct o2hb_node_event *event;
779
780         /* Holding callback sem assures we don't alter the callback
781          * lists when doing this, and serializes ourselves with other
782          * processes wanting callbacks. */
783         down_write(&o2hb_callback_sem);
784
785         spin_lock(&o2hb_live_lock);
786         while (!list_empty(&o2hb_node_events)
787                && !list_empty(&queued_event->hn_item)) {
788                 event = list_entry(o2hb_node_events.next,
789                                    struct o2hb_node_event,
790                                    hn_item);
791                 list_del_init(&event->hn_item);
792                 spin_unlock(&o2hb_live_lock);
793
794                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
795                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
796                      event->hn_node_num);
797
798                 hbcall = hbcall_from_type(event->hn_event_type);
799
800                 /* We should *never* have gotten on to the list with a
801                  * bad type... This isn't something that we should try
802                  * to recover from. */
803                 BUG_ON(IS_ERR(hbcall));
804
805                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
806
807                 spin_lock(&o2hb_live_lock);
808         }
809         spin_unlock(&o2hb_live_lock);
810
811         up_write(&o2hb_callback_sem);
812 }
813
814 static void o2hb_queue_node_event(struct o2hb_node_event *event,
815                                   enum o2hb_callback_type type,
816                                   struct o2nm_node *node,
817                                   int node_num)
818 {
819         assert_spin_locked(&o2hb_live_lock);
820
821         BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
822
823         event->hn_event_type = type;
824         event->hn_node = node;
825         event->hn_node_num = node_num;
826
827         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
828              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
829
830         list_add_tail(&event->hn_item, &o2hb_node_events);
831 }
832
833 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
834 {
835         struct o2hb_node_event event =
836                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
837         struct o2nm_node *node;
838         int queued = 0;
839
840         node = o2nm_get_node_by_num(slot->ds_node_num);
841         if (!node)
842                 return;
843
844         spin_lock(&o2hb_live_lock);
845         if (!list_empty(&slot->ds_live_item)) {
846                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
847                      slot->ds_node_num);
848
849                 list_del_init(&slot->ds_live_item);
850
851                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
852                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
853
854                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
855                                               slot->ds_node_num);
856                         queued = 1;
857                 }
858         }
859         spin_unlock(&o2hb_live_lock);
860
861         if (queued)
862                 o2hb_run_event_list(&event);
863
864         o2nm_node_put(node);
865 }
866
867 static void o2hb_set_quorum_device(struct o2hb_region *reg)
868 {
869         if (!o2hb_global_heartbeat_active())
870                 return;
871
872         /* Prevent race with o2hb_heartbeat_group_drop_item() */
873         if (kthread_should_stop())
874                 return;
875
876         /* Tag region as quorum only after thread reaches steady state */
877         if (atomic_read(&reg->hr_steady_iterations) != 0)
878                 return;
879
880         spin_lock(&o2hb_live_lock);
881
882         if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
883                 goto unlock;
884
885         /*
886          * A region can be added to the quorum only when it sees all
887          * live nodes heartbeat on it. In other words, the region has been
888          * added to all nodes.
889          */
890         if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
891                    sizeof(o2hb_live_node_bitmap)))
892                 goto unlock;
893
894         printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
895                config_item_name(&reg->hr_item), reg->hr_dev_name);
896
897         set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
898
899         /*
900          * If global heartbeat active, unpin all regions if the
901          * region count > CUT_OFF
902          */
903         if (bitmap_weight(o2hb_quorum_region_bitmap,
904                            O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
905                 o2hb_region_unpin(NULL);
906 unlock:
907         spin_unlock(&o2hb_live_lock);
908 }
909
910 static int o2hb_check_slot(struct o2hb_region *reg,
911                            struct o2hb_disk_slot *slot)
912 {
913         int changed = 0, gen_changed = 0;
914         struct o2hb_node_event event =
915                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
916         struct o2nm_node *node;
917         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
918         u64 cputime;
919         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
920         unsigned int slot_dead_ms;
921         int tmp;
922         int queued = 0;
923
924         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
925
926         /*
927          * If a node is no longer configured but is still in the livemap, we
928          * may need to clear that bit from the livemap.
929          */
930         node = o2nm_get_node_by_num(slot->ds_node_num);
931         if (!node) {
932                 spin_lock(&o2hb_live_lock);
933                 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
934                 spin_unlock(&o2hb_live_lock);
935                 if (!tmp)
936                         return 0;
937         }
938
939         if (!o2hb_verify_crc(reg, hb_block)) {
940                 /* all paths from here will drop o2hb_live_lock for
941                  * us. */
942                 spin_lock(&o2hb_live_lock);
943
944                 /* Don't print an error on the console in this case -
945                  * a freshly formatted heartbeat area will not have a
946                  * crc set on it. */
947                 if (list_empty(&slot->ds_live_item))
948                         goto out;
949
950                 /* The node is live but pushed out a bad crc. We
951                  * consider it a transient miss but don't populate any
952                  * other values as they may be junk. */
953                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
954                      slot->ds_node_num, reg->hr_dev_name);
955                 o2hb_dump_slot(hb_block);
956
957                 slot->ds_equal_samples++;
958                 goto fire_callbacks;
959         }
960
961         /* we don't care if these wrap.. the state transitions below
962          * clear at the right places */
963         cputime = le64_to_cpu(hb_block->hb_seq);
964         if (slot->ds_last_time != cputime)
965                 slot->ds_changed_samples++;
966         else
967                 slot->ds_equal_samples++;
968         slot->ds_last_time = cputime;
969
970         /* The node changed heartbeat generations. We assume this to
971          * mean it dropped off but came back before we timed out. We
972          * want to consider it down for the time being but don't want
973          * to lose any changed_samples state we might build up to
974          * considering it live again. */
975         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
976                 gen_changed = 1;
977                 slot->ds_equal_samples = 0;
978                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
979                      "to 0x%llx)\n", slot->ds_node_num,
980                      (long long)slot->ds_last_generation,
981                      (long long)le64_to_cpu(hb_block->hb_generation));
982         }
983
984         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
985
986         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
987              "seq %llu last %llu changed %u equal %u\n",
988              slot->ds_node_num, (long long)slot->ds_last_generation,
989              le32_to_cpu(hb_block->hb_cksum),
990              (unsigned long long)le64_to_cpu(hb_block->hb_seq),
991              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
992              slot->ds_equal_samples);
993
994         spin_lock(&o2hb_live_lock);
995
996 fire_callbacks:
997         /* dead nodes only come to life after some number of
998          * changes at any time during their dead time */
999         if (list_empty(&slot->ds_live_item) &&
1000             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
1001                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
1002                      slot->ds_node_num, (long long)slot->ds_last_generation);
1003
1004                 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1005
1006                 /* first on the list generates a callback */
1007                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1008                         mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
1009                              "bitmap\n", slot->ds_node_num);
1010                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1011
1012                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
1013                                               slot->ds_node_num);
1014
1015                         changed = 1;
1016                         queued = 1;
1017                 }
1018
1019                 list_add_tail(&slot->ds_live_item,
1020                               &o2hb_live_slots[slot->ds_node_num]);
1021
1022                 slot->ds_equal_samples = 0;
1023
1024                 /* We want to be sure that all nodes agree on the
1025                  * number of milliseconds before a node will be
1026                  * considered dead. The self-fencing timeout is
1027                  * computed from this value, and a discrepancy might
1028                  * result in heartbeat calling a node dead when it
1029                  * hasn't self-fenced yet. */
1030                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1031                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
1032                         /* TODO: Perhaps we can fail the region here. */
1033                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
1034                              "of %u ms, but our count is %u ms.\n"
1035                              "Please double check your configuration values "
1036                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1037                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
1038                              dead_ms);
1039                 }
1040                 goto out;
1041         }
1042
1043         /* if the list is dead, we're done.. */
1044         if (list_empty(&slot->ds_live_item))
1045                 goto out;
1046
1047         /* live nodes only go dead after enough consequtive missed
1048          * samples..  reset the missed counter whenever we see
1049          * activity */
1050         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1051                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
1052                      slot->ds_node_num);
1053
1054                 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1055
1056                 /* last off the live_slot generates a callback */
1057                 list_del_init(&slot->ds_live_item);
1058                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1059                         mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1060                              "nodes bitmap\n", slot->ds_node_num);
1061                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1062
1063                         /* node can be null */
1064                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1065                                               node, slot->ds_node_num);
1066
1067                         changed = 1;
1068                         queued = 1;
1069                 }
1070
1071                 /* We don't clear this because the node is still
1072                  * actually writing new blocks. */
1073                 if (!gen_changed)
1074                         slot->ds_changed_samples = 0;
1075                 goto out;
1076         }
1077         if (slot->ds_changed_samples) {
1078                 slot->ds_changed_samples = 0;
1079                 slot->ds_equal_samples = 0;
1080         }
1081 out:
1082         spin_unlock(&o2hb_live_lock);
1083
1084         if (queued)
1085                 o2hb_run_event_list(&event);
1086
1087         if (node)
1088                 o2nm_node_put(node);
1089         return changed;
1090 }
1091
/*
 * Return the index of the highest set bit in the first @numbits bits
 * of @nodes, as reported by find_last_bit().  The caller in this file
 * treats a result >= O2NM_MAX_NODES as "no node found".
 */
static int o2hb_highest_node(unsigned long *nodes, int numbits)
{
	int highest = find_last_bit(nodes, numbits);

	return highest;
}
1096
1097 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1098 {
1099         int i, ret, highest_node;
1100         int membership_change = 0, own_slot_ok = 0;
1101         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1102         unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1103         struct o2hb_bio_wait_ctxt write_wc;
1104
1105         ret = o2nm_configured_node_map(configured_nodes,
1106                                        sizeof(configured_nodes));
1107         if (ret) {
1108                 mlog_errno(ret);
1109                 goto bail;
1110         }
1111
1112         /*
1113          * If a node is not configured but is in the livemap, we still need
1114          * to read the slot so as to be able to remove it from the livemap.
1115          */
1116         o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1117         i = -1;
1118         while ((i = find_next_bit(live_node_bitmap,
1119                                   O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1120                 set_bit(i, configured_nodes);
1121         }
1122
1123         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1124         if (highest_node >= O2NM_MAX_NODES) {
1125                 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1126                 ret = -EINVAL;
1127                 goto bail;
1128         }
1129
1130         /* No sense in reading the slots of nodes that don't exist
1131          * yet. Of course, if the node definitions have holes in them
1132          * then we're reading an empty slot anyway... Consider this
1133          * best-effort. */
1134         ret = o2hb_read_slots(reg, highest_node + 1);
1135         if (ret < 0) {
1136                 mlog_errno(ret);
1137                 goto bail;
1138         }
1139
1140         /* With an up to date view of the slots, we can check that no
1141          * other node has been improperly configured to heartbeat in
1142          * our slot. */
1143         own_slot_ok = o2hb_check_own_slot(reg);
1144
1145         /* fill in the proper info for our next heartbeat */
1146         o2hb_prepare_block(reg, reg->hr_generation);
1147
1148         ret = o2hb_issue_node_write(reg, &write_wc);
1149         if (ret < 0) {
1150                 mlog_errno(ret);
1151                 goto bail;
1152         }
1153
1154         i = -1;
1155         while((i = find_next_bit(configured_nodes,
1156                                  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1157                 membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1158         }
1159
1160         /*
1161          * We have to be sure we've advertised ourselves on disk
1162          * before we can go to steady state.  This ensures that
1163          * people we find in our steady state have seen us.
1164          */
1165         o2hb_wait_on_io(reg, &write_wc);
1166         if (write_wc.wc_error) {
1167                 /* Do not re-arm the write timeout on I/O error - we
1168                  * can't be sure that the new block ever made it to
1169                  * disk */
1170                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1171                      write_wc.wc_error, reg->hr_dev_name);
1172                 ret = write_wc.wc_error;
1173                 goto bail;
1174         }
1175
1176         /* Skip disarming the timeout if own slot has stale/bad data */
1177         if (own_slot_ok) {
1178                 o2hb_set_quorum_device(reg);
1179                 o2hb_arm_timeout(reg);
1180                 reg->hr_last_timeout_start = jiffies;
1181         }
1182
1183 bail:
1184         /* let the person who launched us know when things are steady */
1185         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1186                 if (!ret && own_slot_ok && !membership_change) {
1187                         if (atomic_dec_and_test(&reg->hr_steady_iterations))
1188                                 wake_up(&o2hb_steady_queue);
1189                 }
1190         }
1191
1192         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1193                 if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1194                         printk(KERN_NOTICE "o2hb: Unable to stabilize "
1195                                "heartbeart on region %s (%s)\n",
1196                                config_item_name(&reg->hr_item),
1197                                reg->hr_dev_name);
1198                         atomic_set(&reg->hr_steady_iterations, 0);
1199                         reg->hr_aborted_start = 1;
1200                         wake_up(&o2hb_steady_queue);
1201                         ret = -EIO;
1202                 }
1203         }
1204
1205         return ret;
1206 }
1207
1208 /*
1209  * we ride the region ref that the region dir holds.  before the region
1210  * dir is removed and drops it ref it will wait to tear down this
1211  * thread.
1212  */
1213 static int o2hb_thread(void *data)
1214 {
1215         int i, ret;
1216         struct o2hb_region *reg = data;
1217         struct o2hb_bio_wait_ctxt write_wc;
1218         ktime_t before_hb, after_hb;
1219         unsigned int elapsed_msec;
1220
1221         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1222
1223         set_user_nice(current, MIN_NICE);
1224
1225         /* Pin node */
1226         ret = o2nm_depend_this_node();
1227         if (ret) {
1228                 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1229                 reg->hr_node_deleted = 1;
1230                 wake_up(&o2hb_steady_queue);
1231                 return 0;
1232         }
1233
1234         while (!kthread_should_stop() &&
1235                !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1236                 /* We track the time spent inside
1237                  * o2hb_do_disk_heartbeat so that we avoid more than
1238                  * hr_timeout_ms between disk writes. On busy systems
1239                  * this should result in a heartbeat which is less
1240                  * likely to time itself out. */
1241                 before_hb = ktime_get_real();
1242
1243                 ret = o2hb_do_disk_heartbeat(reg);
1244                 reg->hr_last_hb_status = ret;
1245
1246                 after_hb = ktime_get_real();
1247
1248                 elapsed_msec = (unsigned int)
1249                                 ktime_ms_delta(after_hb, before_hb);
1250
1251                 mlog(ML_HEARTBEAT,
1252                      "start = %lld, end = %lld, msec = %u, ret = %d\n",
1253                      before_hb.tv64, after_hb.tv64, elapsed_msec, ret);
1254
1255                 if (!kthread_should_stop() &&
1256                     elapsed_msec < reg->hr_timeout_ms) {
1257                         /* the kthread api has blocked signals for us so no
1258                          * need to record the return value. */
1259                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1260                 }
1261         }
1262
1263         o2hb_disarm_timeout(reg);
1264
1265         /* unclean stop is only used in very bad situation */
1266         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1267                 o2hb_shutdown_slot(&reg->hr_slots[i]);
1268
1269         /* Explicit down notification - avoid forcing the other nodes
1270          * to timeout on this region when we could just as easily
1271          * write a clear generation - thus indicating to them that
1272          * this node has left this region.
1273          */
1274         if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1275                 o2hb_prepare_block(reg, 0);
1276                 ret = o2hb_issue_node_write(reg, &write_wc);
1277                 if (ret == 0)
1278                         o2hb_wait_on_io(reg, &write_wc);
1279                 else
1280                         mlog_errno(ret);
1281         }
1282
1283         /* Unpin node */
1284         o2nm_undepend_this_node();
1285
1286         mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1287
1288         return 0;
1289 }
1290
1291 #ifdef CONFIG_DEBUG_FS
1292 static int o2hb_debug_open(struct inode *inode, struct file *file)
1293 {
1294         struct o2hb_debug_buf *db = inode->i_private;
1295         struct o2hb_region *reg;
1296         unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1297         unsigned long lts;
1298         char *buf = NULL;
1299         int i = -1;
1300         int out = 0;
1301
1302         /* max_nodes should be the largest bitmap we pass here */
1303         BUG_ON(sizeof(map) < db->db_size);
1304
1305         buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1306         if (!buf)
1307                 goto bail;
1308
1309         switch (db->db_type) {
1310         case O2HB_DB_TYPE_LIVENODES:
1311         case O2HB_DB_TYPE_LIVEREGIONS:
1312         case O2HB_DB_TYPE_QUORUMREGIONS:
1313         case O2HB_DB_TYPE_FAILEDREGIONS:
1314                 spin_lock(&o2hb_live_lock);
1315                 memcpy(map, db->db_data, db->db_size);
1316                 spin_unlock(&o2hb_live_lock);
1317                 break;
1318
1319         case O2HB_DB_TYPE_REGION_LIVENODES:
1320                 spin_lock(&o2hb_live_lock);
1321                 reg = (struct o2hb_region *)db->db_data;
1322                 memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1323                 spin_unlock(&o2hb_live_lock);
1324                 break;
1325
1326         case O2HB_DB_TYPE_REGION_NUMBER:
1327                 reg = (struct o2hb_region *)db->db_data;
1328                 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1329                                 reg->hr_region_num);
1330                 goto done;
1331
1332         case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1333                 reg = (struct o2hb_region *)db->db_data;
1334                 lts = reg->hr_last_timeout_start;
1335                 /* If 0, it has never been set before */
1336                 if (lts)
1337                         lts = jiffies_to_msecs(jiffies - lts);
1338                 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1339                 goto done;
1340
1341         case O2HB_DB_TYPE_REGION_PINNED:
1342                 reg = (struct o2hb_region *)db->db_data;
1343                 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1344                                 !!reg->hr_item_pinned);
1345                 goto done;
1346
1347         default:
1348                 goto done;
1349         }
1350
1351         while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1352                 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1353         out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1354
1355 done:
1356         i_size_write(inode, out);
1357
1358         file->private_data = buf;
1359
1360         return 0;
1361 bail:
1362         return -ENOMEM;
1363 }
1364
1365 static int o2hb_debug_release(struct inode *inode, struct file *file)
1366 {
1367         kfree(file->private_data);
1368         return 0;
1369 }
1370
1371 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1372                                  size_t nbytes, loff_t *ppos)
1373 {
1374         return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1375                                        i_size_read(file->f_mapping->host));
1376 }
1377 #else
/* Stub used when CONFIG_DEBUG_FS is not set: nothing to open. */
static int o2hb_debug_open(struct inode *inode, struct file *file)
{
	return 0;
}
/* Stub used when CONFIG_DEBUG_FS is not set: nothing to release. */
static int o2hb_debug_release(struct inode *inode, struct file *file)
{
	return 0;
}
1386 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1387                                size_t nbytes, loff_t *ppos)
1388 {
1389         return 0;
1390 }
1391 #endif  /* CONFIG_DEBUG_FS */
1392
1393 static const struct file_operations o2hb_debug_fops = {
1394         .open =         o2hb_debug_open,
1395         .release =      o2hb_debug_release,
1396         .read =         o2hb_debug_read,
1397         .llseek =       generic_file_llseek,
1398 };
1399
1400 void o2hb_exit(void)
1401 {
1402         debugfs_remove(o2hb_debug_failedregions);
1403         debugfs_remove(o2hb_debug_quorumregions);
1404         debugfs_remove(o2hb_debug_liveregions);
1405         debugfs_remove(o2hb_debug_livenodes);
1406         debugfs_remove(o2hb_debug_dir);
1407         kfree(o2hb_db_livenodes);
1408         kfree(o2hb_db_liveregions);
1409         kfree(o2hb_db_quorumregions);
1410         kfree(o2hb_db_failedregions);
1411 }
1412
1413 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1414                                         struct o2hb_debug_buf **db, int db_len,
1415                                         int type, int size, int len, void *data)
1416 {
1417         *db = kmalloc(db_len, GFP_KERNEL);
1418         if (!*db)
1419                 return NULL;
1420
1421         (*db)->db_type = type;
1422         (*db)->db_size = size;
1423         (*db)->db_len = len;
1424         (*db)->db_data = data;
1425
1426         return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1427                                    &o2hb_debug_fops);
1428 }
1429
1430 static int o2hb_debug_init(void)
1431 {
1432         int ret = -ENOMEM;
1433
1434         o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1435         if (!o2hb_debug_dir) {
1436                 mlog_errno(ret);
1437                 goto bail;
1438         }
1439
1440         o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1441                                                  o2hb_debug_dir,
1442                                                  &o2hb_db_livenodes,
1443                                                  sizeof(*o2hb_db_livenodes),
1444                                                  O2HB_DB_TYPE_LIVENODES,
1445                                                  sizeof(o2hb_live_node_bitmap),
1446                                                  O2NM_MAX_NODES,
1447                                                  o2hb_live_node_bitmap);
1448         if (!o2hb_debug_livenodes) {
1449                 mlog_errno(ret);
1450                 goto bail;
1451         }
1452
1453         o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1454                                                    o2hb_debug_dir,
1455                                                    &o2hb_db_liveregions,
1456                                                    sizeof(*o2hb_db_liveregions),
1457                                                    O2HB_DB_TYPE_LIVEREGIONS,
1458                                                    sizeof(o2hb_live_region_bitmap),
1459                                                    O2NM_MAX_REGIONS,
1460                                                    o2hb_live_region_bitmap);
1461         if (!o2hb_debug_liveregions) {
1462                 mlog_errno(ret);
1463                 goto bail;
1464         }
1465
1466         o2hb_debug_quorumregions =
1467                         o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1468                                           o2hb_debug_dir,
1469                                           &o2hb_db_quorumregions,
1470                                           sizeof(*o2hb_db_quorumregions),
1471                                           O2HB_DB_TYPE_QUORUMREGIONS,
1472                                           sizeof(o2hb_quorum_region_bitmap),
1473                                           O2NM_MAX_REGIONS,
1474                                           o2hb_quorum_region_bitmap);
1475         if (!o2hb_debug_quorumregions) {
1476                 mlog_errno(ret);
1477                 goto bail;
1478         }
1479
1480         o2hb_debug_failedregions =
1481                         o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1482                                           o2hb_debug_dir,
1483                                           &o2hb_db_failedregions,
1484                                           sizeof(*o2hb_db_failedregions),
1485                                           O2HB_DB_TYPE_FAILEDREGIONS,
1486                                           sizeof(o2hb_failed_region_bitmap),
1487                                           O2NM_MAX_REGIONS,
1488                                           o2hb_failed_region_bitmap);
1489         if (!o2hb_debug_failedregions) {
1490                 mlog_errno(ret);
1491                 goto bail;
1492         }
1493
1494         ret = 0;
1495 bail:
1496         if (ret)
1497                 o2hb_exit();
1498
1499         return ret;
1500 }
1501
1502 int o2hb_init(void)
1503 {
1504         int i;
1505
1506         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1507                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1508
1509         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1510                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
1511
1512         INIT_LIST_HEAD(&o2hb_node_events);
1513
1514         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1515         memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1516         memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1517         memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1518         memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1519
1520         o2hb_dependent_users = 0;
1521
1522         return o2hb_debug_init();
1523 }
1524
1525 /* if we're already in a callback then we're already serialized by the sem */
1526 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1527                                              unsigned bytes)
1528 {
1529         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1530
1531         memcpy(map, &o2hb_live_node_bitmap, bytes);
1532 }
1533
1534 /*
1535  * get a map of all nodes that are heartbeating in any regions
1536  */
1537 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1538 {
1539         /* callers want to serialize this map and callbacks so that they
1540          * can trust that they don't miss nodes coming to the party */
1541         down_read(&o2hb_callback_sem);
1542         spin_lock(&o2hb_live_lock);
1543         o2hb_fill_node_map_from_callback(map, bytes);
1544         spin_unlock(&o2hb_live_lock);
1545         up_read(&o2hb_callback_sem);
1546 }
1547 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1548
1549 /*
1550  * heartbeat configfs bits.  The heartbeat set is a default set under
1551  * the cluster set in nodemanager.c.
1552  */
1553
1554 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1555 {
1556         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1557 }
1558
1559 /* drop_item only drops its ref after killing the thread, nothing should
1560  * be using the region anymore.  this has to clean up any state that
1561  * attributes might have built up. */
1562 static void o2hb_region_release(struct config_item *item)
1563 {
1564         int i;
1565         struct page *page;
1566         struct o2hb_region *reg = to_o2hb_region(item);
1567
1568         mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1569
1570         kfree(reg->hr_tmp_block);
1571
1572         if (reg->hr_slot_data) {
1573                 for (i = 0; i < reg->hr_num_pages; i++) {
1574                         page = reg->hr_slot_data[i];
1575                         if (page)
1576                                 __free_page(page);
1577                 }
1578                 kfree(reg->hr_slot_data);
1579         }
1580
1581         if (reg->hr_bdev)
1582                 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1583
1584         kfree(reg->hr_slots);
1585
1586         debugfs_remove(reg->hr_debug_livenodes);
1587         debugfs_remove(reg->hr_debug_regnum);
1588         debugfs_remove(reg->hr_debug_elapsed_time);
1589         debugfs_remove(reg->hr_debug_pinned);
1590         debugfs_remove(reg->hr_debug_dir);
1591         kfree(reg->hr_db_livenodes);
1592         kfree(reg->hr_db_regnum);
1593         kfree(reg->hr_db_elapsed_time);
1594         kfree(reg->hr_db_pinned);
1595
1596         spin_lock(&o2hb_live_lock);
1597         list_del(&reg->hr_all_item);
1598         spin_unlock(&o2hb_live_lock);
1599
1600         o2net_unregister_handler_list(&reg->hr_handler_list);
1601         kfree(reg);
1602 }
1603
1604 static int o2hb_read_block_input(struct o2hb_region *reg,
1605                                  const char *page,
1606                                  unsigned long *ret_bytes,
1607                                  unsigned int *ret_bits)
1608 {
1609         unsigned long bytes;
1610         char *p = (char *)page;
1611
1612         bytes = simple_strtoul(p, &p, 0);
1613         if (!p || (*p && (*p != '\n')))
1614                 return -EINVAL;
1615
1616         /* Heartbeat and fs min / max block sizes are the same. */
1617         if (bytes > 4096 || bytes < 512)
1618                 return -ERANGE;
1619         if (hweight16(bytes) != 1)
1620                 return -EINVAL;
1621
1622         if (ret_bytes)
1623                 *ret_bytes = bytes;
1624         if (ret_bits)
1625                 *ret_bits = ffs(bytes) - 1;
1626
1627         return 0;
1628 }
1629
1630 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1631                                             char *page)
1632 {
1633         return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1634 }
1635
1636 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1637                                              const char *page,
1638                                              size_t count)
1639 {
1640         struct o2hb_region *reg = to_o2hb_region(item);
1641         int status;
1642         unsigned long block_bytes;
1643         unsigned int block_bits;
1644
1645         if (reg->hr_bdev)
1646                 return -EINVAL;
1647
1648         status = o2hb_read_block_input(reg, page, &block_bytes,
1649                                        &block_bits);
1650         if (status)
1651                 return status;
1652
1653         reg->hr_block_bytes = (unsigned int)block_bytes;
1654         reg->hr_block_bits = block_bits;
1655
1656         return count;
1657 }
1658
1659 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1660                                             char *page)
1661 {
1662         return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1663 }
1664
1665 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1666                                              const char *page,
1667                                              size_t count)
1668 {
1669         struct o2hb_region *reg = to_o2hb_region(item);
1670         unsigned long long tmp;
1671         char *p = (char *)page;
1672
1673         if (reg->hr_bdev)
1674                 return -EINVAL;
1675
1676         tmp = simple_strtoull(p, &p, 0);
1677         if (!p || (*p && (*p != '\n')))
1678                 return -EINVAL;
1679
1680         reg->hr_start_block = tmp;
1681
1682         return count;
1683 }
1684
1685 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1686 {
1687         return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1688 }
1689
1690 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1691                                         const char *page,
1692                                         size_t count)
1693 {
1694         struct o2hb_region *reg = to_o2hb_region(item);
1695         unsigned long tmp;
1696         char *p = (char *)page;
1697
1698         if (reg->hr_bdev)
1699                 return -EINVAL;
1700
1701         tmp = simple_strtoul(p, &p, 0);
1702         if (!p || (*p && (*p != '\n')))
1703                 return -EINVAL;
1704
1705         if (tmp > O2NM_MAX_NODES || tmp == 0)
1706                 return -ERANGE;
1707
1708         reg->hr_blocks = (unsigned int)tmp;
1709
1710         return count;
1711 }
1712
1713 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1714 {
1715         unsigned int ret = 0;
1716
1717         if (to_o2hb_region(item)->hr_bdev)
1718                 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1719
1720         return ret;
1721 }
1722
1723 static void o2hb_init_region_params(struct o2hb_region *reg)
1724 {
1725         reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1726         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1727
1728         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1729              reg->hr_start_block, reg->hr_blocks);
1730         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1731              reg->hr_block_bytes, reg->hr_block_bits);
1732         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1733         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1734 }
1735
1736 static int o2hb_map_slot_data(struct o2hb_region *reg)
1737 {
1738         int i, j;
1739         unsigned int last_slot;
1740         unsigned int spp = reg->hr_slots_per_page;
1741         struct page *page;
1742         char *raw;
1743         struct o2hb_disk_slot *slot;
1744
1745         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1746         if (reg->hr_tmp_block == NULL)
1747                 return -ENOMEM;
1748
1749         reg->hr_slots = kcalloc(reg->hr_blocks,
1750                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1751         if (reg->hr_slots == NULL)
1752                 return -ENOMEM;
1753
1754         for(i = 0; i < reg->hr_blocks; i++) {
1755                 slot = &reg->hr_slots[i];
1756                 slot->ds_node_num = i;
1757                 INIT_LIST_HEAD(&slot->ds_live_item);
1758                 slot->ds_raw_block = NULL;
1759         }
1760
1761         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1762         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1763                            "at %u blocks per page\n",
1764              reg->hr_num_pages, reg->hr_blocks, spp);
1765
1766         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1767                                     GFP_KERNEL);
1768         if (!reg->hr_slot_data)
1769                 return -ENOMEM;
1770
1771         for(i = 0; i < reg->hr_num_pages; i++) {
1772                 page = alloc_page(GFP_KERNEL);
1773                 if (!page)
1774                         return -ENOMEM;
1775
1776                 reg->hr_slot_data[i] = page;
1777
1778                 last_slot = i * spp;
1779                 raw = page_address(page);
1780                 for (j = 0;
1781                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1782                      j++) {
1783                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1784
1785                         slot = &reg->hr_slots[j + last_slot];
1786                         slot->ds_raw_block =
1787                                 (struct o2hb_disk_heartbeat_block *) raw;
1788
1789                         raw += reg->hr_block_bytes;
1790                 }
1791         }
1792
1793         return 0;
1794 }
1795
1796 /* Read in all the slots available and populate the tracking
1797  * structures so that we can start with a baseline idea of what's
1798  * there. */
1799 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1800 {
1801         int ret, i;
1802         struct o2hb_disk_slot *slot;
1803         struct o2hb_disk_heartbeat_block *hb_block;
1804
1805         ret = o2hb_read_slots(reg, reg->hr_blocks);
1806         if (ret)
1807                 goto out;
1808
1809         /* We only want to get an idea of the values initially in each
1810          * slot, so we do no verification - o2hb_check_slot will
1811          * actually determine if each configured slot is valid and
1812          * whether any values have changed. */
1813         for(i = 0; i < reg->hr_blocks; i++) {
1814                 slot = &reg->hr_slots[i];
1815                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1816
1817                 /* Only fill the values that o2hb_check_slot uses to
1818                  * determine changing slots */
1819                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1820                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1821         }
1822
1823 out:
1824         return ret;
1825 }
1826
1827 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1828 static ssize_t o2hb_region_dev_store(struct config_item *item,
1829                                      const char *page,
1830                                      size_t count)
1831 {
1832         struct o2hb_region *reg = to_o2hb_region(item);
1833         struct task_struct *hb_task;
1834         long fd;
1835         int sectsize;
1836         char *p = (char *)page;
1837         struct fd f;
1838         struct inode *inode;
1839         ssize_t ret = -EINVAL;
1840         int live_threshold;
1841
1842         if (reg->hr_bdev)
1843                 goto out;
1844
1845         /* We can't heartbeat without having had our node number
1846          * configured yet. */
1847         if (o2nm_this_node() == O2NM_MAX_NODES)
1848                 goto out;
1849
1850         fd = simple_strtol(p, &p, 0);
1851         if (!p || (*p && (*p != '\n')))
1852                 goto out;
1853
1854         if (fd < 0 || fd >= INT_MAX)
1855                 goto out;
1856
1857         f = fdget(fd);
1858         if (f.file == NULL)
1859                 goto out;
1860
1861         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1862             reg->hr_block_bytes == 0)
1863                 goto out2;
1864
1865         inode = igrab(f.file->f_mapping->host);
1866         if (inode == NULL)
1867                 goto out2;
1868
1869         if (!S_ISBLK(inode->i_mode))
1870                 goto out3;
1871
1872         reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1873         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1874         if (ret) {
1875                 reg->hr_bdev = NULL;
1876                 goto out3;
1877         }
1878         inode = NULL;
1879
1880         bdevname(reg->hr_bdev, reg->hr_dev_name);
1881
1882         sectsize = bdev_logical_block_size(reg->hr_bdev);
1883         if (sectsize != reg->hr_block_bytes) {
1884                 mlog(ML_ERROR,
1885                      "blocksize %u incorrect for device, expected %d",
1886                      reg->hr_block_bytes, sectsize);
1887                 ret = -EINVAL;
1888                 goto out3;
1889         }
1890
1891         o2hb_init_region_params(reg);
1892
1893         /* Generation of zero is invalid */
1894         do {
1895                 get_random_bytes(&reg->hr_generation,
1896                                  sizeof(reg->hr_generation));
1897         } while (reg->hr_generation == 0);
1898
1899         ret = o2hb_map_slot_data(reg);
1900         if (ret) {
1901                 mlog_errno(ret);
1902                 goto out3;
1903         }
1904
1905         ret = o2hb_populate_slot_data(reg);
1906         if (ret) {
1907                 mlog_errno(ret);
1908                 goto out3;
1909         }
1910
1911         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1912         INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1913
1914         /*
1915          * A node is considered live after it has beat LIVE_THRESHOLD
1916          * times.  We're not steady until we've given them a chance
1917          * _after_ our first read.
1918          * The default threshold is bare minimum so as to limit the delay
1919          * during mounts. For global heartbeat, the threshold doubled for the
1920          * first region.
1921          */
1922         live_threshold = O2HB_LIVE_THRESHOLD;
1923         if (o2hb_global_heartbeat_active()) {
1924                 spin_lock(&o2hb_live_lock);
1925                 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1926                         live_threshold <<= 1;
1927                 spin_unlock(&o2hb_live_lock);
1928         }
1929         ++live_threshold;
1930         atomic_set(&reg->hr_steady_iterations, live_threshold);
1931         /* unsteady_iterations is triple the steady_iterations */
1932         atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1933
1934         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1935                               reg->hr_item.ci_name);
1936         if (IS_ERR(hb_task)) {
1937                 ret = PTR_ERR(hb_task);
1938                 mlog_errno(ret);
1939                 goto out3;
1940         }
1941
1942         spin_lock(&o2hb_live_lock);
1943         reg->hr_task = hb_task;
1944         spin_unlock(&o2hb_live_lock);
1945
1946         ret = wait_event_interruptible(o2hb_steady_queue,
1947                                 atomic_read(&reg->hr_steady_iterations) == 0 ||
1948                                 reg->hr_node_deleted);
1949         if (ret) {
1950                 atomic_set(&reg->hr_steady_iterations, 0);
1951                 reg->hr_aborted_start = 1;
1952         }
1953
1954         if (reg->hr_aborted_start) {
1955                 ret = -EIO;
1956                 goto out3;
1957         }
1958
1959         if (reg->hr_node_deleted) {
1960                 ret = -EINVAL;
1961                 goto out3;
1962         }
1963
1964         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1965         spin_lock(&o2hb_live_lock);
1966         hb_task = reg->hr_task;
1967         if (o2hb_global_heartbeat_active())
1968                 set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1969         spin_unlock(&o2hb_live_lock);
1970
1971         if (hb_task)
1972                 ret = count;
1973         else
1974                 ret = -EIO;
1975
1976         if (hb_task && o2hb_global_heartbeat_active())
1977                 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1978                        config_item_name(&reg->hr_item), reg->hr_dev_name);
1979
1980 out3:
1981         iput(inode);
1982 out2:
1983         fdput(f);
1984 out:
1985         if (ret < 0) {
1986                 if (reg->hr_bdev) {
1987                         blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1988                         reg->hr_bdev = NULL;
1989                 }
1990         }
1991         return ret;
1992 }
1993
1994 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1995 {
1996         struct o2hb_region *reg = to_o2hb_region(item);
1997         pid_t pid = 0;
1998
1999         spin_lock(&o2hb_live_lock);
2000         if (reg->hr_task)
2001                 pid = task_pid_nr(reg->hr_task);
2002         spin_unlock(&o2hb_live_lock);
2003
2004         if (!pid)
2005                 return 0;
2006
2007         return sprintf(page, "%u\n", pid);
2008 }
2009
/*
 * configfs attributes of a heartbeat region.  Each entry is backed by the
 * o2hb_region_<name>_show()/_store() handlers defined above; "pid" is
 * read-only and has only a show handler.
 */
CONFIGFS_ATTR(o2hb_region_, block_bytes);
CONFIGFS_ATTR(o2hb_region_, start_block);
CONFIGFS_ATTR(o2hb_region_, blocks);
CONFIGFS_ATTR(o2hb_region_, dev);
CONFIGFS_ATTR_RO(o2hb_region_, pid);

/* NULL-terminated list of the attributes exposed for every region */
static struct configfs_attribute *o2hb_region_attrs[] = {
	&o2hb_region_attr_block_bytes,
	&o2hb_region_attr_start_block,
	&o2hb_region_attr_blocks,
	&o2hb_region_attr_dev,
	&o2hb_region_attr_pid,
	NULL,
};

/* item operations: o2hb_region_release() is the region's release hook */
static struct configfs_item_operations o2hb_region_item_ops = {
	.release		= o2hb_region_release,
};

/* config_item type tying a region to its operations and attributes */
static struct config_item_type o2hb_region_type = {
	.ct_item_ops	= &o2hb_region_item_ops,
	.ct_attrs	= o2hb_region_attrs,
	.ct_owner	= THIS_MODULE,
};
2034
/* heartbeat set */

/*
 * Wrapper around the heartbeat config_group.  It currently carries nothing
 * besides the embedded group itself.
 */
struct o2hb_heartbeat_group {
	struct config_group hs_group;
	/* placeholder: no further members yet */
};
2041
2042 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
2043 {
2044         return group ?
2045                 container_of(group, struct o2hb_heartbeat_group, hs_group)
2046                 : NULL;
2047 }
2048
2049 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2050 {
2051         int ret = -ENOMEM;
2052
2053         reg->hr_debug_dir =
2054                 debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2055         if (!reg->hr_debug_dir) {
2056                 mlog_errno(ret);
2057                 goto bail;
2058         }
2059
2060         reg->hr_debug_livenodes =
2061                         o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2062                                           reg->hr_debug_dir,
2063                                           &(reg->hr_db_livenodes),
2064                                           sizeof(*(reg->hr_db_livenodes)),
2065                                           O2HB_DB_TYPE_REGION_LIVENODES,
2066                                           sizeof(reg->hr_live_node_bitmap),
2067                                           O2NM_MAX_NODES, reg);
2068         if (!reg->hr_debug_livenodes) {
2069                 mlog_errno(ret);
2070                 goto bail;
2071         }
2072
2073         reg->hr_debug_regnum =
2074                         o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2075                                           reg->hr_debug_dir,
2076                                           &(reg->hr_db_regnum),
2077                                           sizeof(*(reg->hr_db_regnum)),
2078                                           O2HB_DB_TYPE_REGION_NUMBER,
2079                                           0, O2NM_MAX_NODES, reg);
2080         if (!reg->hr_debug_regnum) {
2081                 mlog_errno(ret);
2082                 goto bail;
2083         }
2084
2085         reg->hr_debug_elapsed_time =
2086                         o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2087                                           reg->hr_debug_dir,
2088                                           &(reg->hr_db_elapsed_time),
2089                                           sizeof(*(reg->hr_db_elapsed_time)),
2090                                           O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2091                                           0, 0, reg);
2092         if (!reg->hr_debug_elapsed_time) {
2093                 mlog_errno(ret);
2094                 goto bail;
2095         }
2096
2097         reg->hr_debug_pinned =
2098                         o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2099                                           reg->hr_debug_dir,
2100                                           &(reg->hr_db_pinned),
2101                                           sizeof(*(reg->hr_db_pinned)),
2102                                           O2HB_DB_TYPE_REGION_PINNED,
2103                                           0, 0, reg);
2104         if (!reg->hr_debug_pinned) {
2105                 mlog_errno(ret);
2106                 goto bail;
2107         }
2108
2109         ret = 0;
2110 bail:
2111         return ret;
2112 }
2113
2114 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2115                                                           const char *name)
2116 {
2117         struct o2hb_region *reg = NULL;
2118         int ret;
2119
2120         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2121         if (reg == NULL)
2122                 return ERR_PTR(-ENOMEM);
2123
2124         if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2125                 ret = -ENAMETOOLONG;
2126                 goto free;
2127         }
2128
2129         spin_lock(&o2hb_live_lock);
2130         reg->hr_region_num = 0;
2131         if (o2hb_global_heartbeat_active()) {
2132                 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2133                                                          O2NM_MAX_REGIONS);
2134                 if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2135                         spin_unlock(&o2hb_live_lock);
2136                         ret = -EFBIG;
2137                         goto free;
2138                 }
2139                 set_bit(reg->hr_region_num, o2hb_region_bitmap);
2140         }
2141         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2142         spin_unlock(&o2hb_live_lock);
2143
2144         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2145
2146         /* this is the same way to generate msg key as dlm, for local heartbeat,
2147          * name is also the same, so make initial crc value different to avoid
2148          * message key conflict.
2149          */
2150         reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2151                 name, strlen(name));
2152         INIT_LIST_HEAD(&reg->hr_handler_list);
2153         ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2154                         sizeof(struct o2hb_nego_msg),
2155                         o2hb_nego_timeout_handler,
2156                         reg, NULL, &reg->hr_handler_list);
2157         if (ret)
2158                 goto remove_item;
2159
2160         ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2161                         sizeof(struct o2hb_nego_msg),
2162                         o2hb_nego_approve_handler,
2163                         reg, NULL, &reg->hr_handler_list);
2164         if (ret)
2165                 goto unregister_handler;
2166
2167         ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2168         if (ret) {
2169                 config_item_put(&reg->hr_item);
2170                 goto unregister_handler;
2171         }
2172
2173         return &reg->hr_item;
2174
2175 unregister_handler:
2176         o2net_unregister_handler_list(&reg->hr_handler_list);
2177 remove_item:
2178         spin_lock(&o2hb_live_lock);
2179         list_del(&reg->hr_all_item);
2180         if (o2hb_global_heartbeat_active())
2181                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2182         spin_unlock(&o2hb_live_lock);
2183 free:
2184         kfree(reg);
2185         return ERR_PTR(ret);
2186 }
2187
2188 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2189                                            struct config_item *item)
2190 {
2191         struct task_struct *hb_task;
2192         struct o2hb_region *reg = to_o2hb_region(item);
2193         int quorum_region = 0;
2194
2195         /* stop the thread when the user removes the region dir */
2196         spin_lock(&o2hb_live_lock);
2197         hb_task = reg->hr_task;
2198         reg->hr_task = NULL;
2199         reg->hr_item_dropped = 1;
2200         spin_unlock(&o2hb_live_lock);
2201
2202         if (hb_task)
2203                 kthread_stop(hb_task);
2204
2205         if (o2hb_global_heartbeat_active()) {
2206                 spin_lock(&o2hb_live_lock);
2207                 clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2208                 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2209                 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2210                         quorum_region = 1;
2211                 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2212                 spin_unlock(&o2hb_live_lock);
2213                 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2214                        ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2215                         "stopped" : "start aborted"), config_item_name(item),
2216                        reg->hr_dev_name);
2217         }
2218
2219         /*
2220          * If we're racing a dev_write(), we need to wake them.  They will
2221          * check reg->hr_task
2222          */
2223         if (atomic_read(&reg->hr_steady_iterations) != 0) {
2224                 reg->hr_aborted_start = 1;
2225                 atomic_set(&reg->hr_steady_iterations, 0);
2226                 wake_up(&o2hb_steady_queue);
2227         }
2228
2229         config_item_put(item);
2230
2231         if (!o2hb_global_heartbeat_active() || !quorum_region)
2232                 return;
2233
2234         /*
2235          * If global heartbeat active and there are dependent users,
2236          * pin all regions if quorum region count <= CUT_OFF
2237          */
2238         spin_lock(&o2hb_live_lock);
2239
2240         if (!o2hb_dependent_users)
2241                 goto unlock;
2242
2243         if (bitmap_weight(o2hb_quorum_region_bitmap,
2244                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2245                 o2hb_region_pin(NULL);
2246
2247 unlock:
2248         spin_unlock(&o2hb_live_lock);
2249 }
2250
2251 static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item,
2252                 char *page)
2253 {
2254         return sprintf(page, "%u\n", o2hb_dead_threshold);
2255 }
2256
2257 static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item,
2258                 const char *page, size_t count)
2259 {
2260         unsigned long tmp;
2261         char *p = (char *)page;
2262
2263         tmp = simple_strtoul(p, &p, 10);
2264         if (!p || (*p && (*p != '\n')))
2265                 return -EINVAL;
2266
2267         /* this will validate ranges for us. */
2268         o2hb_dead_threshold_set((unsigned int) tmp);
2269
2270         return count;
2271 }
2272
2273 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2274                 char *page)
2275 {
2276         return sprintf(page, "%s\n",
2277                        o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2278 }
2279
2280 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2281                 const char *page, size_t count)
2282 {
2283         unsigned int i;
2284         int ret;
2285         size_t len;
2286
2287         len = (page[count - 1] == '\n') ? count - 1 : count;
2288         if (!len)
2289                 return -EINVAL;
2290
2291         for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2292                 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2293                         continue;
2294
2295                 ret = o2hb_global_heartbeat_mode_set(i);
2296                 if (!ret)
2297                         printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2298                                o2hb_heartbeat_mode_desc[i]);
2299                 return count;
2300         }
2301
2302         return -EINVAL;
2303
2304 }
2305
/*
 * configfs attributes of the heartbeat group itself, backed by the
 * o2hb_heartbeat_group_<name>_show()/_store() handlers defined above.
 */
CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold);
CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);

/* NULL-terminated list of the group's attributes */
static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
	&o2hb_heartbeat_group_attr_dead_threshold,
	&o2hb_heartbeat_group_attr_mode,
	NULL,
};

/* group operations: regions are created by make_item and removed by drop_item */
static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
	.make_item	= o2hb_heartbeat_group_make_item,
	.drop_item	= o2hb_heartbeat_group_drop_item,
};

/* config_item type tying the heartbeat group to its operations and attributes */
static struct config_item_type o2hb_heartbeat_group_type = {
	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
	.ct_attrs	= o2hb_heartbeat_group_attrs,
	.ct_owner	= THIS_MODULE,
};
2325
2326 /* this is just here to avoid touching group in heartbeat.h which the
2327  * entire damn world #includes */
2328 struct config_group *o2hb_alloc_hb_set(void)
2329 {
2330         struct o2hb_heartbeat_group *hs = NULL;
2331         struct config_group *ret = NULL;
2332
2333         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2334         if (hs == NULL)
2335                 goto out;
2336
2337         config_group_init_type_name(&hs->hs_group, "heartbeat",
2338                                     &o2hb_heartbeat_group_type);
2339
2340         ret = &hs->hs_group;
2341 out:
2342         if (ret == NULL)
2343                 kfree(hs);
2344         return ret;
2345 }
2346
/* Free a heartbeat group obtained from o2hb_alloc_hb_set(); NULL is fine. */
void o2hb_free_hb_set(struct config_group *group)
{
	kfree(to_o2hb_heartbeat_group(group));
}
2352
2353 /* hb callback registration and issuing */
2354
2355 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2356 {
2357         if (type == O2HB_NUM_CB)
2358                 return ERR_PTR(-EINVAL);
2359
2360         return &o2hb_callbacks[type];
2361 }
2362
2363 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2364                          enum o2hb_callback_type type,
2365                          o2hb_cb_func *func,
2366                          void *data,
2367                          int priority)
2368 {
2369         INIT_LIST_HEAD(&hc->hc_item);
2370         hc->hc_func = func;
2371         hc->hc_data = data;
2372         hc->hc_priority = priority;
2373         hc->hc_type = type;
2374         hc->hc_magic = O2HB_CB_MAGIC;
2375 }
2376 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2377
2378 /*
2379  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2380  * In global heartbeat mode, region_uuid passed is NULL.
2381  *
2382  * In local, we only pin the matching region. In global we pin all the active
2383  * regions.
2384  */
2385 static int o2hb_region_pin(const char *region_uuid)
2386 {
2387         int ret = 0, found = 0;
2388         struct o2hb_region *reg;
2389         char *uuid;
2390
2391         assert_spin_locked(&o2hb_live_lock);
2392
2393         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2394                 if (reg->hr_item_dropped)
2395                         continue;
2396
2397                 uuid = config_item_name(&reg->hr_item);
2398
2399                 /* local heartbeat */
2400                 if (region_uuid) {
2401                         if (strcmp(region_uuid, uuid))
2402                                 continue;
2403                         found = 1;
2404                 }
2405
2406                 if (reg->hr_item_pinned || reg->hr_item_dropped)
2407                         goto skip_pin;
2408
2409                 /* Ignore ENOENT only for local hb (userdlm domain) */
2410                 ret = o2nm_depend_item(&reg->hr_item);
2411                 if (!ret) {
2412                         mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2413                         reg->hr_item_pinned = 1;
2414                 } else {
2415                         if (ret == -ENOENT && found)
2416                                 ret = 0;
2417                         else {
2418                                 mlog(ML_ERROR, "Pin region %s fails with %d\n",
2419                                      uuid, ret);
2420                                 break;
2421                         }
2422                 }
2423 skip_pin:
2424                 if (found)
2425                         break;
2426         }
2427
2428         return ret;
2429 }
2430
2431 /*
2432  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2433  * In global heartbeat mode, region_uuid passed is NULL.
2434  *
2435  * In local, we only unpin the matching region. In global we unpin all the
2436  * active regions.
2437  */
2438 static void o2hb_region_unpin(const char *region_uuid)
2439 {
2440         struct o2hb_region *reg;
2441         char *uuid;
2442         int found = 0;
2443
2444         assert_spin_locked(&o2hb_live_lock);
2445
2446         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2447                 if (reg->hr_item_dropped)
2448                         continue;
2449
2450                 uuid = config_item_name(&reg->hr_item);
2451                 if (region_uuid) {
2452                         if (strcmp(region_uuid, uuid))
2453                                 continue;
2454                         found = 1;
2455                 }
2456
2457                 if (reg->hr_item_pinned) {
2458                         mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2459                         o2nm_undepend_item(&reg->hr_item);
2460                         reg->hr_item_pinned = 0;
2461                 }
2462                 if (found)
2463                         break;
2464         }
2465 }
2466
2467 static int o2hb_region_inc_user(const char *region_uuid)
2468 {
2469         int ret = 0;
2470
2471         spin_lock(&o2hb_live_lock);
2472
2473         /* local heartbeat */
2474         if (!o2hb_global_heartbeat_active()) {
2475             ret = o2hb_region_pin(region_uuid);
2476             goto unlock;
2477         }
2478
2479         /*
2480          * if global heartbeat active and this is the first dependent user,
2481          * pin all regions if quorum region count <= CUT_OFF
2482          */
2483         o2hb_dependent_users++;
2484         if (o2hb_dependent_users > 1)
2485                 goto unlock;
2486
2487         if (bitmap_weight(o2hb_quorum_region_bitmap,
2488                            O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2489                 ret = o2hb_region_pin(NULL);
2490
2491 unlock:
2492         spin_unlock(&o2hb_live_lock);
2493         return ret;
2494 }
2495
2496 void o2hb_region_dec_user(const char *region_uuid)
2497 {
2498         spin_lock(&o2hb_live_lock);
2499
2500         /* local heartbeat */
2501         if (!o2hb_global_heartbeat_active()) {
2502             o2hb_region_unpin(region_uuid);
2503             goto unlock;
2504         }
2505
2506         /*
2507          * if global heartbeat active and there are no dependent users,
2508          * unpin all quorum regions
2509          */
2510         o2hb_dependent_users--;
2511         if (!o2hb_dependent_users)
2512                 o2hb_region_unpin(NULL);
2513
2514 unlock:
2515         spin_unlock(&o2hb_live_lock);
2516 }
2517
2518 int o2hb_register_callback(const char *region_uuid,
2519                            struct o2hb_callback_func *hc)
2520 {
2521         struct o2hb_callback_func *f;
2522         struct o2hb_callback *hbcall;
2523         int ret;
2524
2525         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2526         BUG_ON(!list_empty(&hc->hc_item));
2527
2528         hbcall = hbcall_from_type(hc->hc_type);
2529         if (IS_ERR(hbcall)) {
2530                 ret = PTR_ERR(hbcall);
2531                 goto out;
2532         }
2533
2534         if (region_uuid) {
2535                 ret = o2hb_region_inc_user(region_uuid);
2536                 if (ret) {
2537                         mlog_errno(ret);
2538                         goto out;
2539                 }
2540         }
2541
2542         down_write(&o2hb_callback_sem);
2543
2544         list_for_each_entry(f, &hbcall->list, hc_item) {
2545                 if (hc->hc_priority < f->hc_priority) {
2546                         list_add_tail(&hc->hc_item, &f->hc_item);
2547                         break;
2548                 }
2549         }
2550         if (list_empty(&hc->hc_item))
2551                 list_add_tail(&hc->hc_item, &hbcall->list);
2552
2553         up_write(&o2hb_callback_sem);
2554         ret = 0;
2555 out:
2556         mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2557              ret, __builtin_return_address(0), hc);
2558         return ret;
2559 }
2560 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2561
2562 void o2hb_unregister_callback(const char *region_uuid,
2563                               struct o2hb_callback_func *hc)
2564 {
2565         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2566
2567         mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2568              __builtin_return_address(0), hc);
2569
2570         /* XXX Can this happen _with_ a region reference? */
2571         if (list_empty(&hc->hc_item))
2572                 return;
2573
2574         if (region_uuid)
2575                 o2hb_region_dec_user(region_uuid);
2576
2577         down_write(&o2hb_callback_sem);
2578
2579         list_del_init(&hc->hc_item);
2580
2581         up_write(&o2hb_callback_sem);
2582 }
2583 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2584
2585 int o2hb_check_node_heartbeating(u8 node_num)
2586 {
2587         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2588
2589         o2hb_fill_node_map(testing_map, sizeof(testing_map));
2590         if (!test_bit(node_num, testing_map)) {
2591                 mlog(ML_HEARTBEAT,
2592                      "node (%u) does not have heartbeating enabled.\n",
2593                      node_num);
2594                 return 0;
2595         }
2596
2597         return 1;
2598 }
2599 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2600
2601 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2602 {
2603         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2604
2605         spin_lock(&o2hb_live_lock);
2606         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2607         spin_unlock(&o2hb_live_lock);
2608         if (!test_bit(node_num, testing_map)) {
2609                 mlog(ML_HEARTBEAT,
2610                      "node (%u) does not have heartbeating enabled.\n",
2611                      node_num);
2612                 return 0;
2613         }
2614
2615         return 1;
2616 }
2617 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2618
2619 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2620 {
2621         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2622
2623         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2624         if (!test_bit(node_num, testing_map)) {
2625                 mlog(ML_HEARTBEAT,
2626                      "node (%u) does not have heartbeating enabled.\n",
2627                      node_num);
2628                 return 0;
2629         }
2630
2631         return 1;
2632 }
2633 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2634
2635 /* Makes sure our local node is configured with a node number, and is
2636  * heartbeating. */
2637 int o2hb_check_local_node_heartbeating(void)
2638 {
2639         u8 node_num;
2640
2641         /* if this node was set then we have networking */
2642         node_num = o2nm_this_node();
2643         if (node_num == O2NM_MAX_NODES) {
2644                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2645                 return 0;
2646         }
2647
2648         return o2hb_check_node_heartbeating(node_num);
2649 }
2650 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2651
2652 /*
2653  * this is just a hack until we get the plumbing which flips file systems
2654  * read only and drops the hb ref instead of killing the node dead.
2655  */
2656 void o2hb_stop_all_regions(void)
2657 {
2658         struct o2hb_region *reg;
2659
2660         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2661
2662         spin_lock(&o2hb_live_lock);
2663
2664         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2665                 reg->hr_unclean_stop = 1;
2666
2667         spin_unlock(&o2hb_live_lock);
2668 }
2669 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2670
2671 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2672 {
2673         struct o2hb_region *reg;
2674         int numregs = 0;
2675         char *p;
2676
2677         spin_lock(&o2hb_live_lock);
2678
2679         p = region_uuids;
2680         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2681                 if (reg->hr_item_dropped)
2682                         continue;
2683
2684                 mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2685                 if (numregs < max_regions) {
2686                         memcpy(p, config_item_name(&reg->hr_item),
2687                                O2HB_MAX_REGION_NAME_LEN);
2688                         p += O2HB_MAX_REGION_NAME_LEN;
2689                 }
2690                 numregs++;
2691         }
2692
2693         spin_unlock(&o2hb_live_lock);
2694
2695         return numregs;
2696 }
2697 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2698
2699 int o2hb_global_heartbeat_active(void)
2700 {
2701         return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2702 }
2703 EXPORT_SYMBOL(o2hb_global_heartbeat_active);