GNU Linux-libre 4.9.309-gnu1
[releases.git] / drivers / staging / lustre / lustre / mdc / mdc_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_MDC
34
35 # include <linux/module.h>
36 # include <linux/kernel.h>
37
38 #include "../include/obd_class.h"
39 #include "mdc_internal.h"
40 #include "../include/lustre_fid.h"
41
42 /* mdc_setattr does its own semaphore handling */
43 static int mdc_reint(struct ptlrpc_request *request,
44                      struct mdc_rpc_lock *rpc_lock,
45                      int level)
46 {
47         int rc;
48
49         request->rq_send_state = level;
50
51         mdc_get_rpc_lock(rpc_lock, NULL);
52         rc = ptlrpc_queue_wait(request);
53         mdc_put_rpc_lock(rpc_lock, NULL);
54         if (rc)
55                 CDEBUG(D_INFO, "error in handling %d\n", rc);
56         else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY))
57                 rc = -EPROTO;
58
59         return rc;
60 }
61
62 /* Find and cancel locally locks matched by inode @bits & @mode in the resource
63  * found by @fid. Found locks are added into @cancel list. Returns the amount of
64  * locks added to @cancels list.
65  */
66 int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
67                             struct list_head *cancels, enum ldlm_mode mode,
68                             __u64 bits)
69 {
70         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
71         ldlm_policy_data_t policy = {};
72         struct ldlm_res_id res_id;
73         struct ldlm_resource *res;
74         int count;
75
76         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
77          * export) but disabled through procfs (flag in NS).
78          *
79          * This distinguishes from a case when ELC is not supported originally,
80          * when we still want to cancel locks in advance and just cancel them
81          * locally, without sending any RPC.
82          */
83         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
84                 return 0;
85
86         fid_build_reg_res_name(fid, &res_id);
87         res = ldlm_resource_get(exp->exp_obd->obd_namespace,
88                                 NULL, &res_id, 0, 0);
89         if (IS_ERR(res))
90                 return 0;
91         LDLM_RESOURCE_ADDREF(res);
92         /* Initialize ibits lock policy. */
93         policy.l_inodebits.bits = bits;
94         count = ldlm_cancel_resource_local(res, cancels, &policy,
95                                            mode, 0, 0, NULL);
96         LDLM_RESOURCE_DELREF(res);
97         ldlm_resource_putref(res);
98         return count;
99 }
100
101 int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
102                 void *ea, size_t ealen, void *ea2, size_t ea2len,
103                 struct ptlrpc_request **request, struct md_open_data **mod)
104 {
105         LIST_HEAD(cancels);
106         struct ptlrpc_request *req;
107         struct mdc_rpc_lock *rpc_lock;
108         struct obd_device *obd = exp->exp_obd;
109         int count = 0, rc;
110         __u64 bits;
111
112         bits = MDS_INODELOCK_UPDATE;
113         if (op_data->op_attr.ia_valid & (ATTR_MODE | ATTR_UID | ATTR_GID))
114                 bits |= MDS_INODELOCK_LOOKUP;
115         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
116             (fid_is_sane(&op_data->op_fid1)))
117                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
118                                                 &cancels, LCK_EX, bits);
119         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
120                                    &RQF_MDS_REINT_SETATTR);
121         if (!req) {
122                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
123                 return -ENOMEM;
124         }
125         if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
126                 req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
127                                      0);
128         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
129         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
130                              ea2len);
131
132         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
133         if (rc) {
134                 ptlrpc_request_free(req);
135                 return rc;
136         }
137
138         rpc_lock = obd->u.cli.cl_rpc_lock;
139
140         if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
141                 CDEBUG(D_INODE, "setting mtime %ld, ctime %ld\n",
142                        LTIME_S(op_data->op_attr.ia_mtime),
143                        LTIME_S(op_data->op_attr.ia_ctime));
144         mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
145
146         ptlrpc_request_set_replen(req);
147         if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
148             req->rq_import->imp_replayable) {
149                 LASSERT(!*mod);
150
151                 *mod = obd_mod_alloc();
152                 if (!*mod) {
153                         DEBUG_REQ(D_ERROR, req, "Can't allocate md_open_data");
154                 } else {
155                         req->rq_replay = 1;
156                         req->rq_cb_data = *mod;
157                         (*mod)->mod_open_req = req;
158                         req->rq_commit_cb = mdc_commit_open;
159                         (*mod)->mod_is_create = true;
160                         /**
161                          * Take an extra reference on \var mod, it protects \var
162                          * mod from being freed on eviction (commit callback is
163                          * called despite rq_replay flag).
164                          * Will be put on mdc_done_writing().
165                          */
166                         obd_mod_get(*mod);
167                 }
168         }
169
170         rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
171
172         /* Save the obtained info in the original RPC for the replay case. */
173         if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
174                 struct mdt_ioepoch *epoch;
175                 struct mdt_body  *body;
176
177                 epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
178                 body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
179                 epoch->handle = body->mbo_handle;
180                 epoch->ioepoch = body->mbo_ioepoch;
181                 req->rq_replay_cb = mdc_replay_open;
182         /** bug 3633, open may be committed and estale answer is not error */
183         } else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
184                 rc = 0;
185         } else if (rc == -ERESTARTSYS) {
186                 rc = 0;
187         }
188         *request = req;
189         if (rc && req->rq_commit_cb) {
190                 /* Put an extra reference on \var mod on error case. */
191                 if (mod && *mod)
192                         obd_mod_put(*mod);
193                 req->rq_commit_cb(req);
194         }
195         return rc;
196 }
197
198 int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
199                const void *data, size_t datalen, umode_t mode,
200                uid_t uid, gid_t gid, cfs_cap_t cap_effective,
201                __u64 rdev, struct ptlrpc_request **request)
202 {
203         struct ptlrpc_request *req;
204         int level, rc;
205         int count, resends = 0;
206         struct obd_import *import = exp->exp_obd->u.cli.cl_import;
207         int generation = import->imp_generation;
208         LIST_HEAD(cancels);
209
210         /* For case if upper layer did not alloc fid, do it now. */
211         if (!fid_is_sane(&op_data->op_fid2)) {
212                 /*
213                  * mdc_fid_alloc() may return errno 1 in case of switch to new
214                  * sequence, handle this.
215                  */
216                 rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
217                 if (rc < 0)
218                         return rc;
219         }
220
221 rebuild:
222         count = 0;
223         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
224             (fid_is_sane(&op_data->op_fid1)))
225                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
226                                                 &cancels, LCK_EX,
227                                                 MDS_INODELOCK_UPDATE);
228
229         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
230                                    &RQF_MDS_REINT_CREATE_ACL);
231         if (!req) {
232                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
233                 return -ENOMEM;
234         }
235         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
236                              op_data->op_namelen + 1);
237         req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
238                              data && datalen ? datalen : 0);
239
240         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
241         if (rc) {
242                 ptlrpc_request_free(req);
243                 return rc;
244         }
245
246         /*
247          * mdc_create_pack() fills msg->bufs[1] with name and msg->bufs[2] with
248          * tgt, for symlinks or lov MD data.
249          */
250         mdc_create_pack(req, op_data, data, datalen, mode, uid,
251                         gid, cap_effective, rdev);
252
253         ptlrpc_request_set_replen(req);
254
255         /* ask ptlrpc not to resend on EINPROGRESS since we have our own retry
256          * logic here
257          */
258         req->rq_no_retry_einprogress = 1;
259
260         if (resends) {
261                 req->rq_generation_set = 1;
262                 req->rq_import_generation = generation;
263                 req->rq_sent = ktime_get_real_seconds() + resends;
264         }
265         level = LUSTRE_IMP_FULL;
266  resend:
267         rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
268
269         /* Resend if we were told to. */
270         if (rc == -ERESTARTSYS) {
271                 level = LUSTRE_IMP_RECOVER;
272                 goto resend;
273         } else if (rc == -EINPROGRESS) {
274                 /* Retry create infinitely until succeed or get other
275                  * error code.
276                  */
277                 ptlrpc_req_finished(req);
278                 resends++;
279
280                 CDEBUG(D_HA, "%s: resend:%d create on "DFID"/"DFID"\n",
281                        exp->exp_obd->obd_name, resends,
282                        PFID(&op_data->op_fid1), PFID(&op_data->op_fid2));
283
284                 if (generation == import->imp_generation) {
285                         goto rebuild;
286                 } else {
287                         CDEBUG(D_HA, "resend cross eviction\n");
288                         return -EIO;
289                 }
290         }
291
292         *request = req;
293         return rc;
294 }
295
296 int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
297                struct ptlrpc_request **request)
298 {
299         LIST_HEAD(cancels);
300         struct obd_device *obd = class_exp2obd(exp);
301         struct ptlrpc_request *req = *request;
302         int count = 0, rc;
303
304         LASSERT(!req);
305
306         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
307             (fid_is_sane(&op_data->op_fid1)))
308                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
309                                                 &cancels, LCK_EX,
310                                                 MDS_INODELOCK_UPDATE);
311         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
312             (fid_is_sane(&op_data->op_fid3)))
313                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
314                                                  &cancels, LCK_EX,
315                                                  MDS_INODELOCK_FULL);
316         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
317                                    &RQF_MDS_REINT_UNLINK);
318         if (!req) {
319                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
320                 return -ENOMEM;
321         }
322         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
323                              op_data->op_namelen + 1);
324
325         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
326         if (rc) {
327                 ptlrpc_request_free(req);
328                 return rc;
329         }
330
331         mdc_unlink_pack(req, op_data);
332
333         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
334                              obd->u.cli.cl_default_mds_easize);
335         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
336                              obd->u.cli.cl_default_mds_cookiesize);
337         ptlrpc_request_set_replen(req);
338
339         *request = req;
340
341         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
342         if (rc == -ERESTARTSYS)
343                 rc = 0;
344         return rc;
345 }
346
347 int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
348              struct ptlrpc_request **request)
349 {
350         LIST_HEAD(cancels);
351         struct obd_device *obd = exp->exp_obd;
352         struct ptlrpc_request *req;
353         int count = 0, rc;
354
355         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
356             (fid_is_sane(&op_data->op_fid2)))
357                 count = mdc_resource_get_unused(exp, &op_data->op_fid2,
358                                                 &cancels, LCK_EX,
359                                                 MDS_INODELOCK_UPDATE);
360         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
361             (fid_is_sane(&op_data->op_fid1)))
362                 count += mdc_resource_get_unused(exp, &op_data->op_fid1,
363                                                  &cancels, LCK_EX,
364                                                  MDS_INODELOCK_UPDATE);
365
366         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_REINT_LINK);
367         if (!req) {
368                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
369                 return -ENOMEM;
370         }
371         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
372                              op_data->op_namelen + 1);
373
374         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
375         if (rc) {
376                 ptlrpc_request_free(req);
377                 return rc;
378         }
379
380         mdc_link_pack(req, op_data);
381         ptlrpc_request_set_replen(req);
382
383         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
384         *request = req;
385         if (rc == -ERESTARTSYS)
386                 rc = 0;
387
388         return rc;
389 }
390
391 int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
392                const char *old, size_t oldlen, const char *new, size_t newlen,
393                struct ptlrpc_request **request)
394 {
395         LIST_HEAD(cancels);
396         struct obd_device *obd = exp->exp_obd;
397         struct ptlrpc_request *req;
398         int count = 0, rc;
399
400         if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
401             (fid_is_sane(&op_data->op_fid1)))
402                 count = mdc_resource_get_unused(exp, &op_data->op_fid1,
403                                                 &cancels, LCK_EX,
404                                                 MDS_INODELOCK_UPDATE);
405         if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
406             (fid_is_sane(&op_data->op_fid2)))
407                 count += mdc_resource_get_unused(exp, &op_data->op_fid2,
408                                                  &cancels, LCK_EX,
409                                                  MDS_INODELOCK_UPDATE);
410         if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
411             (fid_is_sane(&op_data->op_fid3)))
412                 count += mdc_resource_get_unused(exp, &op_data->op_fid3,
413                                                  &cancels, LCK_EX,
414                                                  MDS_INODELOCK_LOOKUP);
415         if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
416             (fid_is_sane(&op_data->op_fid4)))
417                 count += mdc_resource_get_unused(exp, &op_data->op_fid4,
418                                                  &cancels, LCK_EX,
419                                                  MDS_INODELOCK_FULL);
420
421         req = ptlrpc_request_alloc(class_exp2cliimp(exp),
422                                    &RQF_MDS_REINT_RENAME);
423         if (!req) {
424                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
425                 return -ENOMEM;
426         }
427
428         req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
429         req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT,
430                              newlen + 1);
431
432         rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
433         if (rc) {
434                 ptlrpc_request_free(req);
435                 return rc;
436         }
437
438         if (exp_connect_cancelset(exp) && req)
439                 ldlm_cli_cancel_list(&cancels, count, req, 0);
440
441         mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
442
443         req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
444                              obd->u.cli.cl_default_mds_easize);
445         req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
446                              obd->u.cli.cl_default_mds_cookiesize);
447         ptlrpc_request_set_replen(req);
448
449         rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
450         *request = req;
451         if (rc == -ERESTARTSYS)
452                 rc = 0;
453
454         return rc;
455 }