/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 *
 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
 * - percpt data for service to improve smp performance
 * - code cleanup
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

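/*
 * Overall state of the selftest RPC module, advanced by srpc_startup()
 * and rewound by srpc_shutdown(): NONE -> NI_INIT (LNet NI up) ->
 * EQ_INIT (event queue and lazy portals ready) -> RUNNING, and finally
 * STOPPING while srpc_shutdown() tears the stack down in reverse order.
 */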
enum srpc_state {
        SRPC_STATE_NONE,
        SRPC_STATE_NI_INIT,
        SRPC_STATE_EQ_INIT,
        SRPC_STATE_RUNNING,
        SRPC_STATE_STOPPING,
};

static struct smoketest_rpc {
        spinlock_t       rpc_glock;     /* global lock */
        struct srpc_service     *rpc_services[SRPC_SERVICE_MAX_ID + 1];
        lnet_handle_eq_t rpc_lnet_eq;   /* _the_ LNet event queue */
        enum srpc_state  rpc_state;
        srpc_counters_t  rpc_counters;
        __u64            rpc_matchbits; /* matchbits counter */
} srpc_data;

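/*
 * Framework services (ids below SRPC_FRAMEWORK_SERVICE_MAX_ID) listen
 * on their own request portal; all test services share
 * SRPC_REQUEST_PORTAL.
 */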
static inline int
srpc_serv_portal(int svc_id)
{
        return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
               SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
}

/* forward ref's */
int srpc_handle_rpc(struct swi_workitem *wi);

void srpc_get_counters(srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        *cnt = srpc_data.rpc_counters;
        spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters(const srpc_counters_t *cnt)
{
        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters = *cnt;
        spin_unlock(&srpc_data.rpc_glock);
}

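/*
 * Fill slot @i of the bulk descriptor with page @pg, capping the
 * fragment at PAGE_SIZE; returns the number of bytes consumed from
 * the remaining bulk length.
 */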
static int
srpc_add_bulk_page(struct srpc_bulk *bk, struct page *pg, int i, int nob)
{
        nob = min_t(int, nob, PAGE_SIZE);

        LASSERT(nob > 0);
        LASSERT(i >= 0 && i < bk->bk_niov);

        bk->bk_iovs[i].bv_offset = 0;
        bk->bk_iovs[i].bv_page = pg;
        bk->bk_iovs[i].bv_len = nob;
        return nob;
}

void
srpc_free_bulk(struct srpc_bulk *bk)
{
        int i;
        struct page *pg;

        LASSERT(bk);

        for (i = 0; i < bk->bk_niov; i++) {
                pg = bk->bk_iovs[i].bv_page;
                if (!pg)
                        break;

                __free_page(pg);
        }

        LIBCFS_FREE(bk, offsetof(struct srpc_bulk, bk_iovs[bk->bk_niov]));
}

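/*
 * Allocate a bulk descriptor and its pages from the memory of CPT @cpt
 * so bulk buffers stay local to the partition that serves them; on any
 * page allocation failure the partially built descriptor is torn down
 * via srpc_free_bulk(), which stops at the first empty slot.
 */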
struct srpc_bulk *
srpc_alloc_bulk(int cpt, unsigned bulk_npg, unsigned bulk_len, int sink)
{
        struct srpc_bulk *bk;
        int i;

        LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);

        LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
                         offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
        if (!bk) {
                CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
                return NULL;
        }

        memset(bk, 0, offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
        bk->bk_sink = sink;
        bk->bk_len = bulk_len;
        bk->bk_niov = bulk_npg;

        for (i = 0; i < bulk_npg; i++) {
                struct page *pg;
                int nob;

                pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
                                      GFP_KERNEL, 0);
                if (!pg) {
                        CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
                        srpc_free_bulk(bk);
                        return NULL;
                }

                nob = srpc_add_bulk_page(bk, pg, i, bulk_len);
                bulk_len -= nob;
        }

        return bk;
}

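/*
 * Match bits are handed out from a single global counter, seeded from
 * the wall clock in srpc_startup(), so every RDMA posted by this node
 * gets a distinct id.
 */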
static inline __u64
srpc_next_id(void)
{
        __u64 id;

        spin_lock(&srpc_data.rpc_glock);
        id = srpc_data.rpc_matchbits++;
        spin_unlock(&srpc_data.rpc_glock);
        return id;
}

static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
                     struct srpc_service_cd *scd,
                     struct srpc_buffer *buffer)
{
        memset(rpc, 0, sizeof(*rpc));
        swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
                          srpc_serv_is_framework(scd->scd_svc) ?
                          lst_sched_serial : lst_sched_test[scd->scd_cpt]);

        rpc->srpc_ev.ev_fired = 1; /* no event expected now */

        rpc->srpc_scd = scd;
        rpc->srpc_reqstbuf = buffer;
        rpc->srpc_peer = buffer->buf_peer;
        rpc->srpc_self = buffer->buf_self;
        LNetInvalidateHandle(&rpc->srpc_replymdh);
}

static void
srpc_service_fini(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        struct srpc_buffer *buf;
        struct list_head *q;
        int i;

        if (!svc->sv_cpt_data)
                return;

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                while (1) {
                        if (!list_empty(&scd->scd_buf_posted))
                                q = &scd->scd_buf_posted;
                        else if (!list_empty(&scd->scd_buf_blocked))
                                q = &scd->scd_buf_blocked;
                        else
                                break;

                        while (!list_empty(q)) {
                                buf = list_entry(q->next, struct srpc_buffer,
                                                 buf_list);
                                list_del(&buf->buf_list);
                                LIBCFS_FREE(buf, sizeof(*buf));
                        }
                }

                LASSERT(list_empty(&scd->scd_rpc_active));

                while (!list_empty(&scd->scd_rpc_free)) {
                        rpc = list_entry(scd->scd_rpc_free.next,
                                         struct srpc_server_rpc,
                                         srpc_list);
                        list_del(&rpc->srpc_list);
                        LIBCFS_FREE(rpc, sizeof(*rpc));
                }
        }

        cfs_percpt_free(svc->sv_cpt_data);
        svc->sv_cpt_data = NULL;
}

static int
srpc_service_nrpcs(struct srpc_service *svc)
{
        int nrpcs = svc->sv_wi_total / svc->sv_ncpts;

        return srpc_serv_is_framework(svc) ?
               max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

int srpc_add_buffer(struct swi_workitem *wi);

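/*
 * Set up per-CPT data for @svc and pre-allocate its server-side RPC
 * descriptors. A framework service is serialized on one partition, so
 * RPCs are only pre-allocated for CPT 0; test services get
 * srpc_service_nrpcs() descriptors on every partition.
 */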
static int
srpc_service_init(struct srpc_service *svc)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int nrpcs;
        int i;
        int j;

        svc->sv_shuttingdown = 0;

        svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
                                            sizeof(**svc->sv_cpt_data));
        if (!svc->sv_cpt_data)
                return -ENOMEM;

        svc->sv_ncpts = srpc_serv_is_framework(svc) ?
                        1 : cfs_cpt_number(lnet_cpt_table());
        nrpcs = srpc_service_nrpcs(svc);

        cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
                scd->scd_cpt = i;
                scd->scd_svc = svc;
                spin_lock_init(&scd->scd_lock);
                INIT_LIST_HEAD(&scd->scd_rpc_free);
                INIT_LIST_HEAD(&scd->scd_rpc_active);
                INIT_LIST_HEAD(&scd->scd_buf_posted);
                INIT_LIST_HEAD(&scd->scd_buf_blocked);

                scd->scd_ev.ev_data = scd;
                scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

                /*
                 * NB: don't use lst_sched_serial for adding buffer,
                 * see details in srpc_service_add_buffers()
                 */
                swi_init_workitem(&scd->scd_buf_wi, scd,
                                  srpc_add_buffer, lst_sched_test[i]);

                if (i && srpc_serv_is_framework(svc)) {
                        /*
                         * NB: a framework service only needs srpc_service_cd
                         * for one partition, but we allocate for all to make
                         * it easier to implement; it wastes a little memory
                         * but nobody should care about this
                         */
                        continue;
                }

                for (j = 0; j < nrpcs; j++) {
                        LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
                                         i, sizeof(*rpc));
                        if (!rpc) {
                                srpc_service_fini(svc);
                                return -ENOMEM;
                        }
                        list_add(&rpc->srpc_list, &scd->scd_rpc_free);
                }
        }

        return 0;
}

int
srpc_add_service(struct srpc_service *sv)
{
        int id = sv->sv_id;

        LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);

        if (srpc_service_init(sv))
                return -ENOMEM;

        spin_lock(&srpc_data.rpc_glock);

        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        if (srpc_data.rpc_services[id]) {
                spin_unlock(&srpc_data.rpc_glock);
                goto failed;
        }

        srpc_data.rpc_services[id] = sv;
        spin_unlock(&srpc_data.rpc_glock);

        CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
        return 0;

 failed:
        srpc_service_fini(sv);
        return -EBUSY;
}

int
srpc_remove_service(struct srpc_service *sv)
{
        int id = sv->sv_id;

        spin_lock(&srpc_data.rpc_glock);

        if (srpc_data.rpc_services[id] != sv) {
                spin_unlock(&srpc_data.rpc_glock);
                return -ENOENT;
        }

        srpc_data.rpc_services[id] = NULL;
        spin_unlock(&srpc_data.rpc_glock);
        return 0;
}

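/*
 * Post a single-use passive buffer: attach an ME for (@peer, @matchbits)
 * on @portal, then attach an MD over @buf that auto-unlinks after one
 * operation and reports completion to @ev via the global event queue.
 */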
static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
                       int len, int options, lnet_process_id_t peer,
                       lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        int rc;
        lnet_md_t md;
        lnet_handle_me_t meh;

        rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
                          local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
        if (rc) {
                CERROR("LNetMEAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        md.threshold = 1;
        md.user_ptr = ev;
        md.start = buf;
        md.length = len;
        md.options = options;
        md.eq_handle = srpc_data.rpc_lnet_eq;

        rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
        if (rc) {
                CERROR("LNetMDAttach failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);

                rc = LNetMEUnlink(meh);
                LASSERT(!rc);
                return -ENOMEM;
        }

        CDEBUG(D_NET, "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
               libcfs_id2str(peer), portal, matchbits);
        return 0;
}

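/*
 * Bind an MD over @buf and actively PUT it to (or GET it from) the
 * peer's matching passive buffer. NB the MD threshold is 2 for a GET
 * since both the send completion and the REPLY carrying the data are
 * expected; a PUT posted with LNET_NOACK_REQ fires only one event.
 */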
static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
                      int options, lnet_process_id_t peer, lnet_nid_t self,
                      lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        int rc;
        lnet_md_t md;

        md.user_ptr = ev;
        md.start = buf;
        md.length = len;
        md.eq_handle = srpc_data.rpc_lnet_eq;
        md.threshold = options & LNET_MD_OP_GET ? 2 : 1;
        md.options = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

        rc = LNetMDBind(md, LNET_UNLINK, mdh);
        if (rc) {
                CERROR("LNetMDBind failed: %d\n", rc);
                LASSERT(rc == -ENOMEM);
                return -ENOMEM;
        }

        /*
         * this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options.
         * they're only meaningful for MDs attached to an ME (i.e. passive
         * buffers).
         */
        if (options & LNET_MD_OP_PUT) {
                rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
                             portal, matchbits, 0, 0);
        } else {
                LASSERT(options & LNET_MD_OP_GET);

                rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
        }

        if (rc) {
                CERROR("LNet%s(%s, %d, %lld) failed: %d\n",
                       options & LNET_MD_OP_PUT ? "Put" : "Get",
                       libcfs_id2str(peer), portal, matchbits, rc);

                /*
                 * The forthcoming unlink event will complete this operation
                 * with failure, so fall through and return success here.
                 */
                rc = LNetMDUnlink(*mdh);
                LASSERT(!rc);
        } else {
                CDEBUG(D_NET, "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
                       libcfs_id2str(peer), portal, matchbits);
        }
        return 0;
}

static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
                         lnet_handle_md_t *mdh, struct srpc_event *ev)
{
        lnet_process_id_t any = { 0 };

        any.nid = LNET_NID_ANY;
        any.pid = LNET_PID_ANY;

        return srpc_post_passive_rdma(srpc_serv_portal(service),
                                      local, service, buf, len,
                                      LNET_MD_OP_PUT, any, mdh, ev);
}

static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_msg *msg = &buf->buf_msg;
        int rc;

        LNetInvalidateHandle(&buf->buf_mdh);
        list_add(&buf->buf_list, &scd->scd_buf_posted);
        scd->scd_buf_nposted++;
        spin_unlock(&scd->scd_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id,
                                      !srpc_serv_is_framework(sv),
                                      msg, sizeof(*msg), &buf->buf_mdh,
                                      &scd->scd_ev);

        /*
         * At this point, an RPC (new or delayed) may have arrived in
         * msg and its event handler has been called. So we must add
         * buf to scd_buf_posted _before_ dropping scd_lock
         */
        spin_lock(&scd->scd_lock);

        if (!rc) {
                if (!sv->sv_shuttingdown)
                        return 0;

                spin_unlock(&scd->scd_lock);
                /*
                 * srpc_shutdown_service might have tried to unlink me
                 * when my buf_mdh was still invalid
                 */
                LNetMDUnlink(buf->buf_mdh);
                spin_lock(&scd->scd_lock);
                return 0;
        }

        scd->scd_buf_nposted--;
        if (sv->sv_shuttingdown)
                return rc; /* don't allow changes to scd_buf_posted */

        list_del(&buf->buf_list);
        spin_unlock(&scd->scd_lock);

        LIBCFS_FREE(buf, sizeof(*buf));

        spin_lock(&scd->scd_lock);
        return rc;
}

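/*
 * Workitem handler that converts scd_buf_adjust credits into posted
 * receive buffers; on failure it records the error and a timestamp in
 * the service data so posting can be retried later (see
 * srpc_lnet_ev_handler()).
 */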
int
srpc_add_buffer(struct swi_workitem *wi)
{
        struct srpc_service_cd *scd = wi->swi_workitem.wi_data;
        struct srpc_buffer *buf;
        int rc = 0;

        /*
         * called by workitem scheduler threads; these threads have CPT
         * affinity, so buffers will be posted on the CPT-local list of
         * the portal
         */
        spin_lock(&scd->scd_lock);

        while (scd->scd_buf_adjust > 0 &&
               !scd->scd_svc->sv_shuttingdown) {
                scd->scd_buf_adjust--; /* consume it */
                scd->scd_buf_posting++;

                spin_unlock(&scd->scd_lock);

                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (!buf) {
                        CERROR("Failed to add new buf to service: %s\n",
                               scd->scd_svc->sv_name);
                        spin_lock(&scd->scd_lock);
                        rc = -ENOMEM;
                        break;
                }

                spin_lock(&scd->scd_lock);
                if (scd->scd_svc->sv_shuttingdown) {
                        spin_unlock(&scd->scd_lock);
                        LIBCFS_FREE(buf, sizeof(*buf));

                        spin_lock(&scd->scd_lock);
                        rc = -ESHUTDOWN;
                        break;
                }

                rc = srpc_service_post_buffer(scd, buf);
                if (rc)
                        break; /* buf has been freed inside */

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
                scd->scd_buf_total++;
                scd->scd_buf_low = max(2, scd->scd_buf_total / 4);
        }

        if (rc) {
                scd->scd_buf_err_stamp = ktime_get_real_seconds();
                scd->scd_buf_err = rc;

                LASSERT(scd->scd_buf_posting > 0);
                scd->scd_buf_posting--;
        }

        spin_unlock(&scd->scd_lock);
        return 0;
}

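/*
 * Ask every partition of @sv to post @nbuffer receive buffers and
 * block until all of them have either finished posting or failed;
 * returns 0 on success or the first per-partition error.
 */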
int
srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int rc = 0;
        int i;

        LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                scd->scd_buf_err = 0;
                scd->scd_buf_err_stamp = 0;
                scd->scd_buf_posting = 0;
                scd->scd_buf_adjust = nbuffer;
                /* start to post buffers */
                swi_schedule_workitem(&scd->scd_buf_wi);
                spin_unlock(&scd->scd_lock);

                /* framework service only posts buffers for one partition */
                if (srpc_serv_is_framework(sv))
                        break;
        }

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                /*
                 * NB: srpc_service_add_buffers() can be called inside the
                 * thread context of lst_sched_serial, and we normally don't
                 * allow sleeping inside the thread context of a WI scheduler
                 * because it blocks the current scheduler thread from doing
                 * anything else; even worse, it could deadlock when waiting
                 * on the result of another WI of the same scheduler. However,
                 * it's safe here because scd_buf_wi is scheduled by a thread
                 * in a different WI scheduler (lst_sched_test), so there is
                 * no risk of deadlock, though this could block all WIs
                 * pending on lst_sched_serial for a moment, which is not
                 * good but not fatal.
                 */
                lst_wait_until(scd->scd_buf_err ||
                               (!scd->scd_buf_adjust &&
                                !scd->scd_buf_posting),
                               scd->scd_lock, "waiting for adding buffer\n");

                if (scd->scd_buf_err && !rc)
                        rc = scd->scd_buf_err;

                spin_unlock(&scd->scd_lock);
        }

        return rc;
}

void
srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
{
        struct srpc_service_cd *scd;
        int num;
        int i;

        LASSERT(!sv->sv_shuttingdown);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                num = scd->scd_buf_total + scd->scd_buf_posting;
                scd->scd_buf_adjust -= min(nbuffer, num);

                spin_unlock(&scd->scd_lock);
        }
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);
                if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (scd->scd_buf_nposted > 0) {
                        CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
                               scd->scd_buf_nposted);
                        spin_unlock(&scd->scd_lock);
                        return 0;
                }

                if (list_empty(&scd->scd_rpc_active)) {
                        spin_unlock(&scd->scd_lock);
                        continue;
                }

                rpc = list_entry(scd->scd_rpc_active.next,
                                 struct srpc_server_rpc, srpc_list);
                CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
                        rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                        swi_state2str(rpc->srpc_wi.swi_state),
                        rpc->srpc_wi.swi_workitem.wi_scheduled,
                        rpc->srpc_wi.swi_workitem.wi_running,
                        rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
                        rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
                spin_unlock(&scd->scd_lock);
                return 0;
        }

        /* no lock needed from now on */
        srpc_service_fini(sv);
        return 1;
}

/* called with scd->scd_lock held */
static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
        if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
                if (srpc_service_post_buffer(scd, buf)) {
                        CWARN("Failed to post %s buffer\n",
                              scd->scd_svc->sv_name);
                }
                return;
        }

        /* service is shutting down, or we want to recycle some buffers */
        scd->scd_buf_total--;

        if (scd->scd_buf_adjust < 0) {
                scd->scd_buf_adjust++;
                if (scd->scd_buf_adjust < 0 &&
                    !scd->scd_buf_total && !scd->scd_buf_posting) {
                        CDEBUG(D_INFO,
                               "Try to recycle %d buffers but nothing left\n",
                               scd->scd_buf_adjust);
                        scd->scd_buf_adjust = 0;
                }
        }

        spin_unlock(&scd->scd_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&scd->scd_lock);
}

void
srpc_abort_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        int i;

        CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /*
                 * schedule in-flight RPCs to notice the abort. NB: this
                 * races with incoming RPCs; a complete fix would make test
                 * RPCs carry the session ID in their headers
                 */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
                        rpc->srpc_aborted = 1;
                        swi_schedule_workitem(&rpc->srpc_wi);
                }

                spin_unlock(&scd->scd_lock);
        }
}

void
srpc_shutdown_service(struct srpc_service *sv)
{
        struct srpc_service_cd *scd;
        struct srpc_server_rpc *rpc;
        struct srpc_buffer *buf;
        int i;

        CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
               sv->sv_id, sv->sv_name);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_lock(&scd->scd_lock);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
                spin_unlock(&scd->scd_lock);

        cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
                spin_lock(&scd->scd_lock);

                /* schedule in-flight RPCs to notice the shutdown */
                list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
                        swi_schedule_workitem(&rpc->srpc_wi);

                spin_unlock(&scd->scd_lock);

                /*
                 * OK to traverse scd_buf_posted without lock, since no one
                 * touches scd_buf_posted now
                 */
                list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
                        LNetMDUnlink(buf->buf_mdh);
        }
}

static int
srpc_send_request(struct srpc_client_rpc *rpc)
{
        struct srpc_event *ev = &rpc->crpc_reqstev;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rdma(srpc_serv_portal(rpc->crpc_service),
                                   rpc->crpc_service, &rpc->crpc_reqstmsg,
                                   sizeof(struct srpc_msg), LNET_MD_OP_PUT,
                                   rpc->crpc_dest, LNET_NID_ANY,
                                   &rpc->crpc_reqstmdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_reply(struct srpc_client_rpc *rpc)
{
        struct srpc_event *ev = &rpc->crpc_replyev;
        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int rc;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &rpc->crpc_replymsg,
                                    sizeof(struct srpc_msg),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_prepare_bulk(struct srpc_client_rpc *rpc)
{
        struct srpc_bulk *bk = &rpc->crpc_bulk;
        struct srpc_event *ev = &rpc->crpc_bulkev;
        __u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk->bk_niov <= LNET_MAX_IOV);

        if (!bk->bk_niov)
                return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc) {
                LASSERT(rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

static int
srpc_do_bulk(struct srpc_server_rpc *rpc)
{
        struct srpc_event *ev = &rpc->srpc_ev;
        struct srpc_bulk *bk = rpc->srpc_bulk;
        __u64 id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int rc;
        int opt;

        LASSERT(bk);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
        opt |= LNET_MD_KIOV;

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* only called from srpc_handle_rpc */
static void
srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
{
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_buffer *buffer;

        LASSERT(status || rpc->srpc_wi.swi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
                     "Server RPC %p done: service %s, peer %s, status %s:%d\n",
                     rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                     swi_state2str(rpc->srpc_wi.swi_state), status);

        if (status) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done)
                (*rpc->srpc_done) (rpc);
        LASSERT(!rpc->srpc_bulk);

        spin_lock(&scd->scd_lock);

        if (rpc->srpc_reqstbuf) {
                /*
                 * NB: srpc_service_recycle_buffer might drop scd_lock, but
                 * sv won't go away since scd_rpc_active is not empty
                 */
                srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */

        /*
         * No one can schedule me now since:
         * - I'm not on scd_rpc_active.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(rpc->srpc_ev.ev_fired);
        swi_exit_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
                buffer = list_entry(scd->scd_buf_blocked.next,
                                    struct srpc_buffer, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, scd, buffer);
                list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
                swi_schedule_workitem(&rpc->srpc_wi);
        } else {
                list_add(&rpc->srpc_list, &scd->scd_rpc_free);
        }

        spin_unlock(&scd->scd_lock);
}

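/*
 * Server-side RPC state machine, driven by workitem scheduling and by
 * LNet events: SWI_STATE_NEWBORN unpacks and dispatches the request and
 * falls through into SWI_STATE_BULK_STARTED (which waits for the bulk
 * transfer, if any, then submits the reply), and
 * SWI_STATE_REPLY_SUBMITTED completes the RPC once the reply has been
 * sent.
 */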
/* handles an incoming RPC */
int
srpc_handle_rpc(struct swi_workitem *wi)
{
        struct srpc_server_rpc *rpc = wi->swi_workitem.wi_data;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        struct srpc_event *ev = &rpc->srpc_ev;
        int rc = 0;

        LASSERT(wi == &rpc->srpc_wi);

        spin_lock(&scd->scd_lock);

        if (sv->sv_shuttingdown || rpc->srpc_aborted) {
                spin_unlock(&scd->scd_lock);

                if (rpc->srpc_bulk)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&scd->scd_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN: {
                struct srpc_msg *msg;
                struct srpc_generic_reply *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (!msg->msg_magic) {
                        /* moaned already in srpc_lnet_ev_handler */
                        srpc_server_rpc_done(rpc, EBADMSG);
                        return 1;
                }

                srpc_unpack_msg_hdr(msg);
                if (msg->msg_version != SRPC_MSG_VERSION) {
                        CWARN("Version mismatch: %u, %u expected, from %s\n",
                              msg->msg_version, SRPC_MSG_VERSION,
                              libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                        /* drop through and send reply */
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler)(rpc);
                        LASSERT(!reply->status || !rpc->srpc_bulk);
                        if (rc) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk) {
                        rc = srpc_do_bulk(rpc);
                        if (!rc)
                                return 0; /* wait for bulk */

                        LASSERT(ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
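        /* fall through: no bulk, or it failed to start */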
        case SWI_STATE_BULK_STARTED:
                LASSERT(!rpc->srpc_bulk || ev->ev_fired);

                if (rpc->srpc_bulk) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (!rc)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                if (!ev->ev_fired) {
                        CERROR("RPC %p: bulk %p, service %d\n",
                               rpc, rpc->srpc_bulk, sv->sv_id);
                        CERROR("Event: status %d, type %d, lnet %d\n",
                               ev->ev_status, ev->ev_type, ev->ev_lnet);
                        LASSERT(ev->ev_fired);
                }

                wi->swi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

static void
srpc_client_rpc_expired(void *data)
{
        struct srpc_client_rpc *rpc = data;

        CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
              rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
              rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
}

static void
srpc_add_client_rpc_timer(struct srpc_client_rpc *rpc)
{
        struct stt_timer *timer = &rpc->crpc_timer;

        if (!rpc->crpc_timeout)
                return;

        INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data = rpc;
        timer->stt_func = srpc_client_rpc_expired;
        timer->stt_expires = ktime_get_real_seconds() + rpc->crpc_timeout;
        stt_add_timer(timer);
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
static void
srpc_del_client_rpc_timer(struct srpc_client_rpc *rpc)
{
        /* timer not planted or already exploded */
        if (!rpc->crpc_timeout)
                return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer))
                return;

        /* timer has gone off; wait for the handler to finish
         * (srpc_client_rpc_expired clears crpc_timeout) */
        while (rpc->crpc_timeout) {
                spin_unlock(&rpc->crpc_lock);

                schedule();

                spin_lock(&rpc->crpc_lock);
        }
}

static void
srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
{
        struct swi_workitem *wi = &rpc->crpc_wi;

        LASSERT(status || wi->swi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (!rpc->crpc_status)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
                     "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                     rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                     swi_state2str(wi->swi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT(!srpc_event_pending(rpc));
        swi_exit_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done)(rpc);
}

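/*
 * Client-side counterpart of srpc_handle_rpc(): SWI_STATE_NEWBORN posts
 * the passive reply and bulk buffers before sending the request (so the
 * server can never race ahead of us), then the workitem advances through
 * SWI_STATE_REQUEST_SUBMITTED/SENT and SWI_STATE_REPLY_RECEIVED as the
 * matching LNet events fire.
 */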
/* sends an outgoing RPC */
int
srpc_send_rpc(struct swi_workitem *wi)
{
        int rc = 0;
        struct srpc_client_rpc *rpc;
        struct srpc_msg *reply;
        int do_bulk;

        LASSERT(wi);

        rpc = wi->swi_workitem.wi_data;

        LASSERT(rpc);
        LASSERT(wi == &rpc->crpc_wi);

        reply = &rpc->crpc_replymsg;
        do_bulk = rpc->crpc_bulk.bk_niov > 0;

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->swi_state) {
        default:
                LBUG();
        case SWI_STATE_NEWBORN:
                LASSERT(!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /*
                 * CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk.
                 */
                if (!rpc->crpc_reqstev.ev_fired)
                        break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc)
                        break;

                wi->swi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                enum srpc_msg_type type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired)
                        break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc)
                        break;

                srpc_unpack_msg_hdr(reply);
                if (reply->msg_type != type ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN("Bad message from %s: type %u (%d expected), magic %u (%d expected).\n",
                              libcfs_id2str(rpc->crpc_dest),
                              reply->msg_type, type,
                              reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status) {
                        CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
                              reply->msg_body.reply.status,
                              libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->swi_state = SWI_STATE_REPLY_RECEIVED;
        }
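        /* fall through: check for bulk completion */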
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired)
                        break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /*
                 * Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error.
                 */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    !rpc->crpc_status && reply->msg_body.reply.status)
                        rc = 0;

                wi->swi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

struct srpc_client_rpc *
srpc_create_client_rpc(lnet_process_id_t peer, int service,
                       int nbulkiov, int bulklen,
                       void (*rpc_done)(struct srpc_client_rpc *),
                       void (*rpc_fini)(struct srpc_client_rpc *), void *priv)
{
        struct srpc_client_rpc *rpc;

        LIBCFS_ALLOC(rpc, offsetof(struct srpc_client_rpc,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (!rpc)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc(struct srpc_client_rpc *rpc, int why)
{
        LASSERT(why);

        if (rpc->crpc_aborted ||        /* already aborted */
            rpc->crpc_closed)           /* callback imminent */
                return;

        CDEBUG(D_NET, "Aborting RPC: service %d, peer %s, state %s, why %d\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               swi_state2str(rpc->crpc_wi.swi_state), why);

        rpc->crpc_aborted = 1;
        rpc->crpc_status = why;
        swi_schedule_workitem(&rpc->crpc_wi);
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc(struct srpc_client_rpc *rpc)
{
        LASSERT(!rpc->crpc_aborted);
        LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

        CDEBUG(D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
               libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
               rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);
        swi_schedule_workitem(&rpc->crpc_wi);
}

int
srpc_send_reply(struct srpc_server_rpc *rpc)
{
        struct srpc_event *ev = &rpc->srpc_ev;
        struct srpc_msg *msg = &rpc->srpc_replymsg;
        struct srpc_buffer *buffer = rpc->srpc_reqstbuf;
        struct srpc_service_cd *scd = rpc->srpc_scd;
        struct srpc_service *sv = scd->scd_svc;
        __u64 rpyid;
        int rc;

        LASSERT(buffer);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&scd->scd_lock);

        if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
                /*
                 * Repost buffer before replying since the test client
                 * might send me another RPC once it gets the reply
                 */
                if (srpc_service_post_buffer(scd, buffer))
                        CWARN("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&scd->scd_lock);

        ev->ev_fired = 0;
        ev->ev_data = rpc;
        ev->ev_type = SRPC_REPLY_SENT;

        msg->msg_magic = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc)
                ev->ev_fired = 1; /* no more event expected */
        return rc;
}

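/*
 * Single LNet event handler shared by every MD this module posts;
 * rpcev (stored in md.user_ptr at post time) tells whether the event
 * belongs to a client RPC, a server-side request buffer, or a server
 * RPC, and the handler records completion status, updates counters,
 * and reschedules the owning workitem (receipt of a request also picks
 * a free server RPC or queues the buffer as blocked).
 */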
/* in the kernel this is always called with LNET_LOCK() held, and in thread context */
static void
srpc_lnet_ev_handler(lnet_event_t *ev)
{
        struct srpc_service_cd *scd;
        struct srpc_event *rpcev = ev->md.user_ptr;
        struct srpc_client_rpc *crpc;
        struct srpc_server_rpc *srpc;
        struct srpc_buffer *buffer;
        struct srpc_service *sv;
        struct srpc_msg *msg;
        enum srpc_msg_type type;

        LASSERT(!in_interrupt());

        if (ev->status) {
                __u32 errors;

                spin_lock(&srpc_data.rpc_glock);
                if (ev->status != -ECANCELED) /* cancellation is not an error */
                        srpc_data.rpc_counters.errors++;
                errors = srpc_data.rpc_counters.errors;
                spin_unlock(&srpc_data.rpc_glock);

                CNETERR("LNet event status %d type %d, RPC errors %u\n",
                        ev->status, ev->type, errors);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                CERROR("Unknown event: status %d, type %d, lnet %d\n",
                       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                LBUG();
        case SRPC_REQUEST_SENT:
                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
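                /* fall through */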
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                if (rpcev != &crpc->crpc_reqstev &&
                    rpcev != &crpc->crpc_replyev &&
                    rpcev != &crpc->crpc_bulkev) {
                        CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
                               rpcev, crpc, &crpc->crpc_reqstev,
                               &crpc->crpc_replyev, &crpc->crpc_bulkev);
                        CERROR("Bad event: status %d, type %d, lnet %d\n",
                               rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
                        LBUG();
                }

                spin_lock(&crpc->crpc_lock);

                LASSERT(!rpcev->ev_fired);
                rpcev->ev_fired = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                                -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                scd = rpcev->ev_data;
                sv = scd->scd_svc;

                LASSERT(rpcev == &scd->scd_ev);

                spin_lock(&scd->scd_lock);

                LASSERT(ev->unlinked);
                LASSERT(ev->type == LNET_EVENT_PUT ||
                        ev->type == LNET_EVENT_UNLINK);
                LASSERT(ev->type != LNET_EVENT_UNLINK ||
                        sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, struct srpc_buffer, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                LASSERT(scd->scd_buf_nposted > 0);
                scd->scd_buf_nposted--;

                if (sv->sv_shuttingdown) {
                        /*
                         * Leave buffer on scd->scd_buf_posted since
                         * srpc_finish_service needs to traverse it.
                         */
                        spin_unlock(&scd->scd_lock);
                        break;
                }

                if (scd->scd_buf_err_stamp &&
                    scd->scd_buf_err_stamp < ktime_get_real_seconds()) {
                        /* re-enable adding buffer */
                        scd->scd_buf_err_stamp = 0;
                        scd->scd_buf_err = 0;
                }

                if (!scd->scd_buf_err &&        /* adding buffer is enabled */
                    !scd->scd_buf_adjust &&
                    scd->scd_buf_nposted < scd->scd_buf_low) {
                        scd->scd_buf_adjust = max(scd->scd_buf_total / 2,
                                                  SFW_TEST_WI_MIN);
                        swi_schedule_workitem(&scd->scd_buf_wi);
                }

                list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                if (ev->status || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
                               sv->sv_name, libcfs_id2str(ev->initiator),
                               ev->status, ev->mlength,
                               msg->msg_type, msg->msg_magic);

                        /*
                         * NB can't call srpc_service_recycle_buffer here since
                         * it may call LNetM[DE]Attach. The invalid magic tells
                         * srpc_handle_rpc to drop this RPC
                         */
                        msg->msg_magic = 0;
                }

                if (!list_empty(&scd->scd_rpc_free)) {
                        srpc = list_entry(scd->scd_rpc_free.next,
                                          struct srpc_server_rpc,
                                          srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, scd, buffer);
                        list_add_tail(&srpc->srpc_list,
                                      &scd->scd_rpc_active);
                        swi_schedule_workitem(&srpc->srpc_wi);
                } else {
                        list_add_tail(&buffer->buf_list,
                                      &scd->scd_buf_blocked);
                }

                spin_unlock(&scd->scd_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT(ev->type == LNET_EVENT_SEND ||
                        ev->type == LNET_EVENT_REPLY ||
                        ev->type == LNET_EVENT_UNLINK);

                if (!ev->unlinked)
                        break; /* wait for final event */

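        /* fall through */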
        case SRPC_BULK_PUT_SENT:
                if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
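                /* fall through */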
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                scd = srpc->srpc_scd;

                LASSERT(rpcev == &srpc->srpc_ev);

                spin_lock(&scd->scd_lock);

                rpcev->ev_fired = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                swi_schedule_workitem(&srpc->srpc_wi);

                spin_unlock(&scd->scd_lock);
                break;
        }
}

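/*
 * Bring the module up: seed the matchbits counter from the wall clock
 * (after a one second pause so a quick restart can't reuse the previous
 * seed), initialize the LNet NI, allocate the global event queue, mark
 * the request portals lazy, and start the timer thread; any failure
 * unwinds through srpc_shutdown().
 */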
int
srpc_startup(void)
{
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        set_current_state(TASK_UNINTERRUPTIBLE);
        schedule_timeout(cfs_time_seconds(1));
        srpc_data.rpc_matchbits = ((__u64)ktime_get_real_seconds()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        rc = LNetNIInit(LNET_PID_LUSTRE);
        if (rc < 0) {
                CERROR("LNetNIInit() has failed: %d\n", rc);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        LNetInvalidateHandle(&srpc_data.rpc_lnet_eq);
        rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
        if (rc) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT(!rc);
        rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
        LASSERT(!rc);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = stt_startup();

bail:
        if (rc)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

void
srpc_shutdown(void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG();
        case SRPC_STATE_RUNNING:
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        struct srpc_service *sv = srpc_data.rpc_services[i];

                        LASSERTF(!sv, "service not empty: id %d, name %s\n",
                                 i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();

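                /* fall through */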
        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT(!rc);
                rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
                LASSERT(!rc);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT(!rc); /* the EQ should have no user by now */

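                /* fall through */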
        case SRPC_STATE_NI_INIT:
                LNetNIFini();
        }
}