GNU Linux-libre 4.9.337-gnu1
[releases.git] / drivers / staging / lustre / lustre / obdclass / llog.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2015, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/llog.c
33  *
34  * OST<->MDS recovery logging infrastructure.
35  * Invariants in implementation:
36  * - we do not share logs among different OST<->MDS connections, so that
37  *   if an OST or MDS fails it need only look at log(s) relevant to itself
38  *
39  * Author: Andreas Dilger <adilger@clusterfs.com>
40  * Author: Alex Zhuravlev <bzzz@whamcloud.com>
41  * Author: Mikhail Pershin <tappro@whamcloud.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_LOG
45
46 #include "../include/obd_class.h"
47 #include "../include/lustre_log.h"
48 #include "llog_internal.h"
49
50 /*
51  * Allocate a new log or catalog handle
52  * Used inside llog_open().
53  */
54 static struct llog_handle *llog_alloc_handle(void)
55 {
56         struct llog_handle *loghandle;
57
58         loghandle = kzalloc(sizeof(*loghandle), GFP_NOFS);
59         if (!loghandle)
60                 return NULL;
61
62         init_rwsem(&loghandle->lgh_lock);
63         spin_lock_init(&loghandle->lgh_hdr_lock);
64         INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
65         atomic_set(&loghandle->lgh_refcount, 1);
66
67         return loghandle;
68 }
69
70 /*
71  * Free llog handle and header data if exists. Used in llog_close() only
72  */
73 static void llog_free_handle(struct llog_handle *loghandle)
74 {
75         /* failed llog_init_handle */
76         if (!loghandle->lgh_hdr)
77                 goto out;
78
79         if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN)
80                 LASSERT(list_empty(&loghandle->u.phd.phd_entry));
81         else if (loghandle->lgh_hdr->llh_flags & LLOG_F_IS_CAT)
82                 LASSERT(list_empty(&loghandle->u.chd.chd_head));
83         LASSERT(sizeof(*loghandle->lgh_hdr) == LLOG_CHUNK_SIZE);
84         kfree(loghandle->lgh_hdr);
85 out:
86         kfree(loghandle);
87 }
88
89 void llog_handle_get(struct llog_handle *loghandle)
90 {
91         atomic_inc(&loghandle->lgh_refcount);
92 }
93
94 void llog_handle_put(struct llog_handle *loghandle)
95 {
96         LASSERT(atomic_read(&loghandle->lgh_refcount) > 0);
97         if (atomic_dec_and_test(&loghandle->lgh_refcount))
98                 llog_free_handle(loghandle);
99 }
100
101 static int llog_read_header(const struct lu_env *env,
102                             struct llog_handle *handle,
103                             struct obd_uuid *uuid)
104 {
105         struct llog_operations *lop;
106         int rc;
107
108         rc = llog_handle2ops(handle, &lop);
109         if (rc)
110                 return rc;
111
112         if (!lop->lop_read_header)
113                 return -EOPNOTSUPP;
114
115         rc = lop->lop_read_header(env, handle);
116         if (rc == LLOG_EEMPTY) {
117                 struct llog_log_hdr *llh = handle->lgh_hdr;
118
119                 handle->lgh_last_idx = 0; /* header is record with index 0 */
120                 llh->llh_count = 1;      /* for the header record */
121                 llh->llh_hdr.lrh_type = LLOG_HDR_MAGIC;
122                 llh->llh_hdr.lrh_len = LLOG_CHUNK_SIZE;
123                 llh->llh_tail.lrt_len = LLOG_CHUNK_SIZE;
124                 llh->llh_hdr.lrh_index = 0;
125                 llh->llh_tail.lrt_index = 0;
126                 llh->llh_timestamp = ktime_get_real_seconds();
127                 if (uuid)
128                         memcpy(&llh->llh_tgtuuid, uuid,
129                                sizeof(llh->llh_tgtuuid));
130                 llh->llh_bitmap_offset = offsetof(typeof(*llh), llh_bitmap);
131                 ext2_set_bit(0, llh->llh_bitmap);
132                 rc = 0;
133         }
134         return rc;
135 }
136
137 int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
138                      int flags, struct obd_uuid *uuid)
139 {
140         enum llog_flag fmt = flags & LLOG_F_EXT_MASK;
141         struct llog_log_hdr     *llh;
142         int                      rc;
143
144         LASSERT(!handle->lgh_hdr);
145
146         llh = kzalloc(sizeof(*llh), GFP_NOFS);
147         if (!llh)
148                 return -ENOMEM;
149         handle->lgh_hdr = llh;
150         /* first assign flags to use llog_client_ops */
151         llh->llh_flags = flags;
152         rc = llog_read_header(env, handle, uuid);
153         if (rc == 0) {
154                 if (unlikely((llh->llh_flags & LLOG_F_IS_PLAIN &&
155                               flags & LLOG_F_IS_CAT) ||
156                              (llh->llh_flags & LLOG_F_IS_CAT &&
157                               flags & LLOG_F_IS_PLAIN))) {
158                         CERROR("%s: llog type is %s but initializing %s\n",
159                                handle->lgh_ctxt->loc_obd->obd_name,
160                                llh->llh_flags & LLOG_F_IS_CAT ?
161                                "catalog" : "plain",
162                                flags & LLOG_F_IS_CAT ? "catalog" : "plain");
163                         rc = -EINVAL;
164                         goto out;
165                 } else if (llh->llh_flags &
166                            (LLOG_F_IS_PLAIN | LLOG_F_IS_CAT)) {
167                         /*
168                          * it is possible to open llog without specifying llog
169                          * type so it is taken from llh_flags
170                          */
171                         flags = llh->llh_flags;
172                 } else {
173                         /* for some reason the llh_flags has no type set */
174                         CERROR("llog type is not specified!\n");
175                         rc = -EINVAL;
176                         goto out;
177                 }
178                 if (unlikely(uuid &&
179                              !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
180                         CERROR("%s: llog uuid mismatch: %s/%s\n",
181                                handle->lgh_ctxt->loc_obd->obd_name,
182                                (char *)uuid->uuid,
183                                (char *)llh->llh_tgtuuid.uuid);
184                         rc = -EEXIST;
185                         goto out;
186                 }
187         }
188         if (flags & LLOG_F_IS_CAT) {
189                 LASSERT(list_empty(&handle->u.chd.chd_head));
190                 INIT_LIST_HEAD(&handle->u.chd.chd_head);
191                 llh->llh_size = sizeof(struct llog_logid_rec);
192         } else if (!(flags & LLOG_F_IS_PLAIN)) {
193                 CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
194                        handle->lgh_ctxt->loc_obd->obd_name,
195                        flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
196                 rc = -EINVAL;
197         }
198         llh->llh_flags |= fmt;
199 out:
200         if (rc) {
201                 kfree(llh);
202                 handle->lgh_hdr = NULL;
203         }
204         return rc;
205 }
206 EXPORT_SYMBOL(llog_init_handle);
207
/*
 * Walk the records of one llog and run the caller's callback on every
 * record whose bit is set in the header bitmap.
 *
 * @arg is a struct llog_process_info.  The optional lpi_catdata limits the
 * walk: processing starts at the index after lpcd_first_idx and stops at
 * lpcd_last_idx (if non-zero); on return lpcd_last_idx is overwritten with
 * the index of the last record the callback was invoked on.
 *
 * The function always returns 0; the processing status (0, -ENOMEM, an
 * error from llog_next_block(), -EINVAL for a record with a bad length, or
 * the first non-zero callback result) is reported through lpi->lpi_rc.
 */
static int llog_process_thread(void *arg)
{
        struct llog_process_info        *lpi = arg;
        struct llog_handle              *loghandle = lpi->lpi_loghandle;
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
        /* initial read offset: one chunk into the log */
        __u64                            cur_offset = LLOG_CHUNK_SIZE;
        __u64                            last_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;

        LASSERT(llh);

        /* scratch buffer that receives one block of records at a time */
        buf = kzalloc(LLOG_CHUNK_SIZE, GFP_NOFS);
        if (!buf) {
                lpi->lpi_rc = -ENOMEM;
                return 0;
        }

        /* resume right after the record the caller says was handled last */
        if (cd) {
                last_called_index = cd->lpcd_first_idx;
                index = cd->lpcd_first_idx + 1;
        }
        /* without an explicit upper bound, cover the whole bitmap */
        if (cd && cd->lpcd_last_idx)
                last_index = cd->lpcd_last_idx;
        else
                last_index = LLOG_BITMAP_BYTES * 8 - 1;

        /* Record is not in this buffer. */
        if (index > last_index)
                goto out;

        while (rc == 0) {
                struct llog_rec_hdr *rec;

                /* skip records not set in bitmap */
                while (index <= last_index &&
                       !ext2_test_bit(index, llh->llh_bitmap))
                        ++index;

                LASSERT(index <= last_index + 1);
                /* ran past the last index: nothing left to process */
                if (index == last_index + 1)
                        break;
repeat:
                CDEBUG(D_OTHER, "index: %d last_index %d\n",
                       index, last_index);

                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, LLOG_CHUNK_SIZE);
                /* remember where this block starts for lgh_cur_offset */
                last_offset = cur_offset;
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, LLOG_CHUNK_SIZE);
                if (rc)
                        goto out;

                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop.
                 */
                for (rec = (struct llog_rec_hdr *)buf;
                     (char *)rec < buf + LLOG_CHUNK_SIZE;
                     rec = llog_rec_hdr_next(rec)) {
                        CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
                               rec, rec->lrh_type);

                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                                lustre_swab_llog_rec(rec);

                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);

                        /*
                         * The buffer was zeroed before the read, so an
                         * index of 0 here means no record was read into
                         * this position.
                         */
                        if (rec->lrh_index == 0) {
                                /* probably another rec just got added? */
                                rc = 0;
                                if (index <= loghandle->lgh_last_idx)
                                        goto repeat;
                                goto out; /* no more records */
                        }
                        /* a bad length would derail the walk to the next rec */
                        if (rec->lrh_len == 0 ||
                            rec->lrh_len > LLOG_CHUNK_SIZE) {
                                CWARN("invalid length %d in llog record for index %d/%d\n",
                                      rec->lrh_len,
                                      rec->lrh_index, index);
                                rc = -EINVAL;
                                goto out;
                        }

                        /* records before the wanted index are passed over */
                        if (rec->lrh_index < index) {
                                CDEBUG(D_OTHER, "skipping lrh_index %d\n",
                                       rec->lrh_index);
                                continue;
                        }

                        CDEBUG(D_OTHER,
                               "lrh_index: %d lrh_len: %d (%d remains)\n",
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + LLOG_CHUNK_SIZE - (char *)rec));

                        /* publish the current position in the handle */
                        loghandle->lgh_cur_idx = rec->lrh_index;
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    last_offset;

                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, llh->llh_bitmap)) {
                                rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
                                                 lpi->lpi_cbdata);
                                last_called_index = index;
                                if (rc)
                                        goto out;
                        } else {
                                CDEBUG(D_OTHER, "Skipped index %d\n", index);
                        }

                        /* next record, still in buffer? */
                        ++index;
                        if (index > last_index) {
                                rc = 0;
                                goto out;
                        }
                }
        }

out:
        /* tell the caller how far the callback actually got */
        if (cd)
                cd->lpcd_last_idx = last_called_index;

        kfree(buf);
        lpi->lpi_rc = rc;
        return 0;
}
340
341 static int llog_process_thread_daemonize(void *arg)
342 {
343         struct llog_process_info        *lpi = arg;
344         struct lu_env                    env;
345         int                              rc;
346
347         unshare_fs_struct();
348
349         /* client env has no keys, tags is just 0 */
350         rc = lu_env_init(&env, LCT_LOCAL | LCT_MG_THREAD);
351         if (rc)
352                 goto out;
353         lpi->lpi_env = &env;
354
355         rc = llog_process_thread(arg);
356
357         lu_env_fini(&env);
358 out:
359         complete(&lpi->lpi_completion);
360         return rc;
361 }
362
363 int llog_process_or_fork(const struct lu_env *env,
364                          struct llog_handle *loghandle,
365                          llog_cb_t cb, void *data, void *catdata, bool fork)
366 {
367         struct llog_process_info *lpi;
368         int                   rc;
369
370         lpi = kzalloc(sizeof(*lpi), GFP_NOFS);
371         if (!lpi)
372                 return -ENOMEM;
373         lpi->lpi_loghandle = loghandle;
374         lpi->lpi_cb     = cb;
375         lpi->lpi_cbdata    = data;
376         lpi->lpi_catdata   = catdata;
377
378         if (fork) {
379                 struct task_struct *task;
380
381                 /* The new thread can't use parent env,
382                  * init the new one in llog_process_thread_daemonize.
383                  */
384                 lpi->lpi_env = NULL;
385                 init_completion(&lpi->lpi_completion);
386                 task = kthread_run(llog_process_thread_daemonize, lpi,
387                                    "llog_process_thread");
388                 if (IS_ERR(task)) {
389                         rc = PTR_ERR(task);
390                         CERROR("%s: cannot start thread: rc = %d\n",
391                                loghandle->lgh_ctxt->loc_obd->obd_name, rc);
392                         goto out_lpi;
393                 }
394                 wait_for_completion(&lpi->lpi_completion);
395         } else {
396                 lpi->lpi_env = env;
397                 llog_process_thread(lpi);
398         }
399         rc = lpi->lpi_rc;
400 out_lpi:
401         kfree(lpi);
402         return rc;
403 }
404 EXPORT_SYMBOL(llog_process_or_fork);
405
406 int llog_process(const struct lu_env *env, struct llog_handle *loghandle,
407                  llog_cb_t cb, void *data, void *catdata)
408 {
409         return llog_process_or_fork(env, loghandle, cb, data, catdata, true);
410 }
411 EXPORT_SYMBOL(llog_process);
412
413 int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
414               struct llog_handle **lgh, struct llog_logid *logid,
415               char *name, enum llog_open_param open_param)
416 {
417         int      raised;
418         int      rc;
419
420         LASSERT(ctxt);
421         LASSERT(ctxt->loc_logops);
422
423         if (!ctxt->loc_logops->lop_open) {
424                 *lgh = NULL;
425                 return -EOPNOTSUPP;
426         }
427
428         *lgh = llog_alloc_handle();
429         if (!*lgh)
430                 return -ENOMEM;
431         (*lgh)->lgh_ctxt = ctxt;
432         (*lgh)->lgh_logops = ctxt->loc_logops;
433
434         raised = cfs_cap_raised(CFS_CAP_SYS_RESOURCE);
435         if (!raised)
436                 cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
437         rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
438         if (!raised)
439                 cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
440         if (rc) {
441                 llog_free_handle(*lgh);
442                 *lgh = NULL;
443         }
444         return rc;
445 }
446 EXPORT_SYMBOL(llog_open);
447
448 int llog_close(const struct lu_env *env, struct llog_handle *loghandle)
449 {
450         struct llog_operations  *lop;
451         int                      rc;
452
453         rc = llog_handle2ops(loghandle, &lop);
454         if (rc)
455                 goto out;
456         if (!lop->lop_close) {
457                 rc = -EOPNOTSUPP;
458                 goto out;
459         }
460         rc = lop->lop_close(env, loghandle);
461 out:
462         llog_handle_put(loghandle);
463         return rc;
464 }
465 EXPORT_SYMBOL(llog_close);