GNU Linux-libre 4.9.309-gnu1
[releases.git] / drivers / staging / lustre / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  * Lustre is a trademark of Seagate, Inc.
28  *
29  * lnet/lnet/net_fault.c
30  *
31  * Lustre network fault simulation
32  *
33  * Author: liang.zhen@intel.com
34  */
35
36 #define DEBUG_SUBSYSTEM S_LNET
37
38 #include "../../include/linux/lnet/lib-lnet.h"
39 #include "../../include/linux/lnet/lnetctl.h"
40
41 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
42                                  LNET_GET_BIT | LNET_REPLY_BIT)
43
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	unsigned long		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
65
66 static bool
67 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
68 {
69         if (nid == msg_nid || nid == LNET_NID_ANY)
70                 return true;
71
72         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
73                 return false;
74
75         /* 255.255.255.255@net is wildcard for all addresses in a network */
76         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
77 }
78
79 static bool
80 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
81                       lnet_nid_t dst, unsigned int type, unsigned int portal)
82 {
83         if (!lnet_fault_nid_match(attr->fa_src, src) ||
84             !lnet_fault_nid_match(attr->fa_dst, dst))
85                 return false;
86
87         if (!(attr->fa_msg_mask & (1 << type)))
88                 return false;
89
90         /**
91          * NB: ACK and REPLY have no portal, but they should have been
92          * rejected by message mask
93          */
94         if (attr->fa_ptl_mask && /* has portal filter */
95             !(attr->fa_ptl_mask & (1ULL << portal)))
96                 return false;
97
98         return true;
99 }
100
101 static int
102 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
103 {
104         if (!attr->fa_msg_mask)
105                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
106
107         if (!attr->fa_ptl_mask) /* no portal filter */
108                 return 0;
109
110         /* NB: only PUT and GET can be filtered if portal filter has been set */
111         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
112         if (!attr->fa_msg_mask) {
113                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
114                        attr->fa_msg_mask);
115                 return -EINVAL;
116         }
117         return 0;
118 }
119
120 static void
121 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
122 {
123         /* NB: fs_counter is NOT updated by this function */
124         switch (type) {
125         case LNET_MSG_PUT:
126                 stat->fs_put++;
127                 return;
128         case LNET_MSG_ACK:
129                 stat->fs_ack++;
130                 return;
131         case LNET_MSG_GET:
132                 stat->fs_get++;
133                 return;
134         case LNET_MSG_REPLY:
135                 stat->fs_reply++;
136                 return;
137         }
138 }
139
140 /**
141  * LNet message drop simulation
142  */
143
144 /**
145  * Add a new drop rule to LNet
146  * There is no check for duplicated drop rule, all rules will be checked for
147  * incoming message.
148  */
149 static int
150 lnet_drop_rule_add(struct lnet_fault_attr *attr)
151 {
152         struct lnet_drop_rule *rule;
153
154         if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
155                 CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
156                        attr->u.drop.da_rate, attr->u.drop.da_interval);
157                 return -EINVAL;
158         }
159
160         if (lnet_fault_attr_validate(attr))
161                 return -EINVAL;
162
163         CFS_ALLOC_PTR(rule);
164         if (!rule)
165                 return -ENOMEM;
166
167         spin_lock_init(&rule->dr_lock);
168
169         rule->dr_attr = *attr;
170         if (attr->u.drop.da_interval) {
171                 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
172                 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
173                                                     attr->u.drop.da_interval);
174         } else {
175                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
176         }
177
178         lnet_net_lock(LNET_LOCK_EX);
179         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
180         lnet_net_unlock(LNET_LOCK_EX);
181
182         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
183                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
184                attr->u.drop.da_rate, attr->u.drop.da_interval);
185         return 0;
186 }
187
188 /**
189  * Remove matched drop rules from lnet, all rules that can match \a src and
190  * \a dst will be removed.
191  * If \a src is zero, then all rules have \a dst as destination will be remove
192  * If \a dst is zero, then all rules have \a src as source will be removed
193  * If both of them are zero, all rules will be removed
194  */
195 static int
196 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
197 {
198         struct lnet_drop_rule *rule;
199         struct lnet_drop_rule *tmp;
200         struct list_head zombies;
201         int n = 0;
202
203         INIT_LIST_HEAD(&zombies);
204
205         lnet_net_lock(LNET_LOCK_EX);
206         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
207                 if (rule->dr_attr.fa_src != src && src)
208                         continue;
209
210                 if (rule->dr_attr.fa_dst != dst && dst)
211                         continue;
212
213                 list_move(&rule->dr_link, &zombies);
214         }
215         lnet_net_unlock(LNET_LOCK_EX);
216
217         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
218                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
219                        libcfs_nid2str(rule->dr_attr.fa_src),
220                        libcfs_nid2str(rule->dr_attr.fa_dst),
221                        rule->dr_attr.u.drop.da_rate,
222                        rule->dr_attr.u.drop.da_interval);
223
224                 list_del(&rule->dr_link);
225                 CFS_FREE_PTR(rule);
226                 n++;
227         }
228
229         return n;
230 }
231
232 /**
233  * List drop rule at position of \a pos
234  */
235 static int
236 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
237                     struct lnet_fault_stat *stat)
238 {
239         struct lnet_drop_rule *rule;
240         int cpt;
241         int i = 0;
242         int rc = -ENOENT;
243
244         cpt = lnet_net_lock_current();
245         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
246                 if (i++ < pos)
247                         continue;
248
249                 spin_lock(&rule->dr_lock);
250                 *attr = rule->dr_attr;
251                 *stat = rule->dr_stat;
252                 spin_unlock(&rule->dr_lock);
253                 rc = 0;
254                 break;
255         }
256
257         lnet_net_unlock(cpt);
258         return rc;
259 }
260
261 /**
262  * reset counters for all drop rules
263  */
264 static void
265 lnet_drop_rule_reset(void)
266 {
267         struct lnet_drop_rule *rule;
268         int cpt;
269
270         cpt = lnet_net_lock_current();
271
272         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
273                 struct lnet_fault_attr *attr = &rule->dr_attr;
274
275                 spin_lock(&rule->dr_lock);
276
277                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
278                 if (attr->u.drop.da_rate) {
279                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
280                 } else {
281                         rule->dr_drop_time = cfs_time_shift(cfs_rand() %
282                                                 attr->u.drop.da_interval);
283                         rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
284                 }
285                 spin_unlock(&rule->dr_lock);
286         }
287
288         lnet_net_unlock(cpt);
289 }
290
291 /**
292  * check source/destination NID, portal, message type and drop rate,
293  * decide whether should drop this message or not
294  */
295 static bool
296 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
297                 lnet_nid_t dst, unsigned int type, unsigned int portal)
298 {
299         struct lnet_fault_attr *attr = &rule->dr_attr;
300         bool drop;
301
302         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
303                 return false;
304
305         /* match this rule, check drop rate now */
306         spin_lock(&rule->dr_lock);
307         if (rule->dr_drop_time) { /* time based drop */
308                 unsigned long now = cfs_time_current();
309
310                 rule->dr_stat.fs_count++;
311                 drop = cfs_time_aftereq(now, rule->dr_drop_time);
312                 if (drop) {
313                         if (cfs_time_after(now, rule->dr_time_base))
314                                 rule->dr_time_base = now;
315
316                         rule->dr_drop_time = rule->dr_time_base +
317                                              cfs_time_seconds(cfs_rand() %
318                                                 attr->u.drop.da_interval);
319                         rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
320
321                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
322                                libcfs_nid2str(attr->fa_src),
323                                libcfs_nid2str(attr->fa_dst),
324                                rule->dr_drop_time);
325                 }
326
327         } else { /* rate based drop */
328                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
329
330                 if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
331                         rule->dr_drop_at = rule->dr_stat.fs_count +
332                                            cfs_rand() % attr->u.drop.da_rate;
333                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
334                                libcfs_nid2str(attr->fa_src),
335                                libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
336                 }
337         }
338
339         if (drop) { /* drop this message, update counters */
340                 lnet_fault_stat_inc(&rule->dr_stat, type);
341                 rule->dr_stat.u.drop.ds_dropped++;
342         }
343
344         spin_unlock(&rule->dr_lock);
345         return drop;
346 }
347
348 /**
349  * Check if message from \a src to \a dst can match any existed drop rule
350  */
351 bool
352 lnet_drop_rule_match(lnet_hdr_t *hdr)
353 {
354         struct lnet_drop_rule *rule;
355         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
356         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
357         unsigned int typ = le32_to_cpu(hdr->type);
358         unsigned int ptl = -1;
359         bool drop = false;
360         int cpt;
361
362         /**
363          * NB: if Portal is specified, then only PUT and GET will be
364          * filtered by drop rule
365          */
366         if (typ == LNET_MSG_PUT)
367                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
368         else if (typ == LNET_MSG_GET)
369                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
370
371         cpt = lnet_net_lock_current();
372         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
373                 drop = drop_rule_match(rule, src, dst, typ, ptl);
374                 if (drop)
375                         break;
376         }
377
378         lnet_net_unlock(cpt);
379         return drop;
380 }
381
382 /**
383  * LNet Delay Simulation
384  */
385 /** timestamp (second) to send delayed message */
386 #define msg_delay_send           msg_ev.hdr_data
387
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	unsigned long		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long		dl_time_base;
	/** jiffies to send the next delayed message; -1 means timer disarmed */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
419
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here waiting for scheduled rules */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here for daemon start/stop */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
436
437 static struct delay_daemon_data delay_dd;
438
static unsigned long
round_timeout(unsigned long timeout)
{
	/* round a jiffies value up to the next whole second, in jiffies */
	unsigned int sec = (unsigned int)cfs_duration_sec(cfs_time_sub(timeout, 0));

	return cfs_time_seconds(sec + 1);
}
445
446 static void
447 delay_rule_decref(struct lnet_delay_rule *rule)
448 {
449         if (atomic_dec_and_test(&rule->dl_refcount)) {
450                 LASSERT(list_empty(&rule->dl_sched_link));
451                 LASSERT(list_empty(&rule->dl_msg_list));
452                 LASSERT(list_empty(&rule->dl_link));
453
454                 CFS_FREE_PTR(rule);
455         }
456 }
457
458 /**
459  * check source/destination NID, portal, message type and delay rate,
460  * decide whether should delay this message or not
461  */
462 static bool
463 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
464                  lnet_nid_t dst, unsigned int type, unsigned int portal,
465                  struct lnet_msg *msg)
466 {
467         struct lnet_fault_attr *attr = &rule->dl_attr;
468         bool delay;
469
470         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
471                 return false;
472
473         /* match this rule, check delay rate now */
474         spin_lock(&rule->dl_lock);
475         if (rule->dl_delay_time) { /* time based delay */
476                 unsigned long now = cfs_time_current();
477
478                 rule->dl_stat.fs_count++;
479                 delay = cfs_time_aftereq(now, rule->dl_delay_time);
480                 if (delay) {
481                         if (cfs_time_after(now, rule->dl_time_base))
482                                 rule->dl_time_base = now;
483
484                         rule->dl_delay_time = rule->dl_time_base +
485                                              cfs_time_seconds(cfs_rand() %
486                                                 attr->u.delay.la_interval);
487                         rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
488
489                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
490                                libcfs_nid2str(attr->fa_src),
491                                libcfs_nid2str(attr->fa_dst),
492                                rule->dl_delay_time);
493                 }
494
495         } else { /* rate based delay */
496                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
497                 /* generate the next random rate sequence */
498                 if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
499                         rule->dl_delay_at = rule->dl_stat.fs_count +
500                                             cfs_rand() % attr->u.delay.la_rate;
501                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
502                                libcfs_nid2str(attr->fa_src),
503                                libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
504                 }
505         }
506
507         if (!delay) {
508                 spin_unlock(&rule->dl_lock);
509                 return false;
510         }
511
512         /* delay this message, update counters */
513         lnet_fault_stat_inc(&rule->dl_stat, type);
514         rule->dl_stat.u.delay.ls_delayed++;
515
516         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
517         msg->msg_delay_send = round_timeout(
518                         cfs_time_shift(attr->u.delay.la_latency));
519         if (rule->dl_msg_send == -1) {
520                 rule->dl_msg_send = msg->msg_delay_send;
521                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
522         }
523
524         spin_unlock(&rule->dl_lock);
525         return true;
526 }
527
528 /**
529  * check if \a msg can match any Delay Rule, receiving of this message
530  * will be delayed if there is a match.
531  */
532 bool
533 lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
534 {
535         struct lnet_delay_rule *rule;
536         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
537         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
538         unsigned int typ = le32_to_cpu(hdr->type);
539         unsigned int ptl = -1;
540
541         /* NB: called with hold of lnet_net_lock */
542
543         /**
544          * NB: if Portal is specified, then only PUT and GET will be
545          * filtered by delay rule
546          */
547         if (typ == LNET_MSG_PUT)
548                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
549         else if (typ == LNET_MSG_GET)
550                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
551
552         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
553                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
554                         return true;
555         }
556
557         return false;
558 }
559
/**
 * Move expired delayed messages of \a rule (or every delayed message, when
 * \a all is true) onto \a msg_list, and re-arm or disarm the rule's timer
 * for whatever remains queued.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	/* cheap unlocked check: nothing is due yet and caller only wants
	 * expired messages */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	/* NB: messages are appended on arrival with a fixed per-rule latency,
	 * so dl_msg_list is ordered by send time; stop at the first one that
	 * is not yet due */
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* no pending message: disarm the timer */
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/*
		 * dequeued some timedout messages, update timer for the
		 * next delayed message on rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
597
/**
 * Resume (or, when \a drop is true, cancel) every message on \a msg_list:
 * local messages are re-parsed, routed messages are forwarded; anything
 * that fails is dropped and finalized.
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* rule removed on shutdown: cancel the message */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through: message handled, move on */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* message could not be delivered: drop and finalize it */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}
643
644 /**
645  * Process delayed messages for scheduled rules
646  * This function can either be called by delay_rule_daemon, or by lnet_finalise
647  */
648 void
649 lnet_delay_rule_check(void)
650 {
651         struct lnet_delay_rule *rule;
652         struct list_head msgs;
653
654         INIT_LIST_HEAD(&msgs);
655         while (1) {
656                 if (list_empty(&delay_dd.dd_sched_rules))
657                         break;
658
659                 spin_lock_bh(&delay_dd.dd_lock);
660                 if (list_empty(&delay_dd.dd_sched_rules)) {
661                         spin_unlock_bh(&delay_dd.dd_lock);
662                         break;
663                 }
664
665                 rule = list_entry(delay_dd.dd_sched_rules.next,
666                                   struct lnet_delay_rule, dl_sched_link);
667                 list_del_init(&rule->dl_sched_link);
668                 spin_unlock_bh(&delay_dd.dd_lock);
669
670                 delayed_msg_check(rule, false, &msgs);
671                 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
672         }
673
674         if (!list_empty(&msgs))
675                 delayed_msg_process(&msgs, false);
676 }
677
678 /** daemon thread to handle delayed messages */
679 static int
680 lnet_delay_rule_daemon(void *arg)
681 {
682         delay_dd.dd_running = 1;
683         wake_up(&delay_dd.dd_ctl_waitq);
684
685         while (delay_dd.dd_running) {
686                 wait_event_interruptible(delay_dd.dd_waitq,
687                                          !delay_dd.dd_running ||
688                                          !list_empty(&delay_dd.dd_sched_rules));
689                 lnet_delay_rule_check();
690         }
691
692         /* in case more rules have been enqueued after my last check */
693         lnet_delay_rule_check();
694         delay_dd.dd_stopped = 1;
695         wake_up(&delay_dd.dd_ctl_waitq);
696
697         return 0;
698 }
699
700 static void
701 delay_timer_cb(unsigned long arg)
702 {
703         struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
704
705         spin_lock_bh(&delay_dd.dd_lock);
706         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
707                 atomic_inc(&rule->dl_refcount);
708                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
709                 wake_up(&delay_dd.dd_waitq);
710         }
711         spin_unlock_bh(&delay_dd.dd_lock);
712 }
713
714 /**
715  * Add a new delay rule to LNet
716  * There is no check for duplicated delay rule, all rules will be checked for
717  * incoming message.
718  */
719 int
720 lnet_delay_rule_add(struct lnet_fault_attr *attr)
721 {
722         struct lnet_delay_rule *rule;
723         int rc = 0;
724
725         if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
726                 CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
727                        attr->u.delay.la_rate, attr->u.delay.la_interval);
728                 return -EINVAL;
729         }
730
731         if (!attr->u.delay.la_latency) {
732                 CDEBUG(D_NET, "delay latency cannot be zero\n");
733                 return -EINVAL;
734         }
735
736         if (lnet_fault_attr_validate(attr))
737                 return -EINVAL;
738
739         CFS_ALLOC_PTR(rule);
740         if (!rule)
741                 return -ENOMEM;
742
743         mutex_lock(&delay_dd.dd_mutex);
744         if (!delay_dd.dd_running) {
745                 struct task_struct *task;
746
747                 /**
748                  *  NB: although LND threads will process delayed message
749                  * in lnet_finalize, but there is no guarantee that LND
750                  * threads will be waken up if no other message needs to
751                  * be handled.
752                  * Only one daemon thread, performance is not the concern
753                  * of this simualation module.
754                  */
755                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
756                 if (IS_ERR(task)) {
757                         rc = PTR_ERR(task);
758                         goto failed;
759                 }
760                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
761         }
762
763         setup_timer(&rule->dl_timer, delay_timer_cb, (unsigned long)rule);
764
765         spin_lock_init(&rule->dl_lock);
766         INIT_LIST_HEAD(&rule->dl_msg_list);
767         INIT_LIST_HEAD(&rule->dl_sched_link);
768
769         rule->dl_attr = *attr;
770         if (attr->u.delay.la_interval) {
771                 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
772                 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
773                                                      attr->u.delay.la_interval);
774         } else {
775                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
776         }
777
778         rule->dl_msg_send = -1;
779
780         lnet_net_lock(LNET_LOCK_EX);
781         atomic_set(&rule->dl_refcount, 1);
782         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
783         lnet_net_unlock(LNET_LOCK_EX);
784
785         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
786                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
787                attr->u.delay.la_rate);
788
789         mutex_unlock(&delay_dd.dd_mutex);
790         return 0;
791 failed:
792         mutex_unlock(&delay_dd.dd_mutex);
793         CFS_FREE_PTR(rule);
794         return rc;
795 }
796
797 /**
798  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
799  * and \a dst are zero, all rules will be removed, otherwise only matched rules
800  * will be removed.
801  * If \a src is zero, then all rules have \a dst as destination will be remove
802  * If \a dst is zero, then all rules have \a src as source will be removed
803  *
804  * When a delay rule is removed, all delayed messages of this rule will be
805  * processed immediately.
806  */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown) {
		/* shutdown removes all rules regardless of src/dst */
		src = 0;
		dst = 0;
	}

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		/* after this the timer callback can no longer requeue it */
		del_timer_sync(&rule->dl_timer);
		/* flush ALL delayed messages of this rule onto msg_list */
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon thread to exit */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	/* on shutdown the flushed messages are dropped, otherwise resumed */
	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
873
874 /**
875  * List Delay Rule at position of \a pos
876  */
877 int
878 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
879                      struct lnet_fault_stat *stat)
880 {
881         struct lnet_delay_rule *rule;
882         int cpt;
883         int i = 0;
884         int rc = -ENOENT;
885
886         cpt = lnet_net_lock_current();
887         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
888                 if (i++ < pos)
889                         continue;
890
891                 spin_lock(&rule->dl_lock);
892                 *attr = rule->dl_attr;
893                 *stat = rule->dl_stat;
894                 spin_unlock(&rule->dl_lock);
895                 rc = 0;
896                 break;
897         }
898
899         lnet_net_unlock(cpt);
900         return rc;
901 }
902
903 /**
904  * reset counters for all Delay Rules
905  */
906 void
907 lnet_delay_rule_reset(void)
908 {
909         struct lnet_delay_rule *rule;
910         int cpt;
911
912         cpt = lnet_net_lock_current();
913
914         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
915                 struct lnet_fault_attr *attr = &rule->dl_attr;
916
917                 spin_lock(&rule->dl_lock);
918
919                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
920                 if (attr->u.delay.la_rate) {
921                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
922                 } else {
923                         rule->dl_delay_time = cfs_time_shift(cfs_rand() %
924                                                 attr->u.delay.la_interval);
925                         rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
926                 }
927                 spin_unlock(&rule->dl_lock);
928         }
929
930         lnet_net_unlock(cpt);
931 }
932
933 int
934 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
935 {
936         struct lnet_fault_attr *attr;
937         struct lnet_fault_stat *stat;
938
939         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
940
941         switch (opc) {
942         default:
943                 return -EINVAL;
944
945         case LNET_CTL_DROP_ADD:
946                 if (!attr)
947                         return -EINVAL;
948
949                 return lnet_drop_rule_add(attr);
950
951         case LNET_CTL_DROP_DEL:
952                 if (!attr)
953                         return -EINVAL;
954
955                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
956                                                      attr->fa_dst);
957                 return 0;
958
959         case LNET_CTL_DROP_RESET:
960                 lnet_drop_rule_reset();
961                 return 0;
962
963         case LNET_CTL_DROP_LIST:
964                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
965                 if (!attr || !stat)
966                         return -EINVAL;
967
968                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
969
970         case LNET_CTL_DELAY_ADD:
971                 if (!attr)
972                         return -EINVAL;
973
974                 return lnet_delay_rule_add(attr);
975
976         case LNET_CTL_DELAY_DEL:
977                 if (!attr)
978                         return -EINVAL;
979
980                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
981                                                       attr->fa_dst, false);
982                 return 0;
983
984         case LNET_CTL_DELAY_RESET:
985                 lnet_delay_rule_reset();
986                 return 0;
987
988         case LNET_CTL_DELAY_LIST:
989                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
990                 if (!attr || !stat)
991                         return -EINVAL;
992
993                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
994         }
995 }
996
997 int
998 lnet_fault_init(void)
999 {
1000         CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1001         CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1002         CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1003         CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1004
1005         mutex_init(&delay_dd.dd_mutex);
1006         spin_lock_init(&delay_dd.dd_lock);
1007         init_waitqueue_head(&delay_dd.dd_waitq);
1008         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1009         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1010
1011         return 0;
1012 }
1013
void
lnet_fault_fini(void)
{
	/* wildcard (0, 0) removes every drop rule */
	lnet_drop_rule_del(0, 0);
	/* shutdown=true removes every delay rule and drops (rather than
	 * resends) any still-delayed messages; this also stops the delay
	 * daemon once the last rule is gone
	 */
	lnet_delay_rule_del(0, 0, true);

	/* all rule lists must be empty after teardown */
	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}