1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmglue.c
5  *
6  * Code which implements an OCFS2 specific interface to our DLM.
7  *
8  * Copyright (C) 2003, 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  */
25 
26 #include <linux/types.h>
27 #include <linux/slab.h>
28 #include <linux/highmem.h>
29 #include <linux/mm.h>
30 #include <linux/kthread.h>
31 #include <linux/pagemap.h>
32 #include <linux/debugfs.h>
33 #include <linux/seq_file.h>
34 #include <linux/time.h>
35 #include <linux/quotaops.h>
36 #include <linux/sched/signal.h>
37 
38 #define MLOG_MASK_PREFIX ML_DLM_GLUE
39 #include <cluster/masklog.h>
40 
41 #include "ocfs2.h"
42 #include "ocfs2_lockingver.h"
43 
44 #include "alloc.h"
45 #include "dcache.h"
46 #include "dlmglue.h"
47 #include "extent_map.h"
48 #include "file.h"
49 #include "heartbeat.h"
50 #include "inode.h"
51 #include "journal.h"
52 #include "stackglue.h"
53 #include "slot_map.h"
54 #include "super.h"
55 #include "uptodate.h"
56 #include "quota.h"
57 #include "refcounttree.h"
58 #include "acl.h"
59 
60 #include "buffer_head_io.h"
61 
62 struct ocfs2_mask_waiter {
63 	struct list_head	mw_item;
64 	int			mw_status;
65 	struct completion	mw_complete;
66 	unsigned long		mw_mask;
67 	unsigned long		mw_goal;
68 #ifdef CONFIG_OCFS2_FS_STATS
69 	ktime_t			mw_lock_start;
70 #endif
71 };
72 
73 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
74 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
75 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres);
76 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres);
77 
78 /*
79  * Return value from ->downconvert_worker functions.
80  *
81  * These control the precise actions of ocfs2_unblock_lock()
82  * and ocfs2_process_blocked_lock()
83  *
84  */
85 enum ocfs2_unblock_action {
86 	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
87 	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
88 				      * ->post_unlock callback */
89 	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
90 				      * ->post_unlock() callback. */
91 };
92 
93 struct ocfs2_unblock_ctl {
94 	int requeue;
95 	enum ocfs2_unblock_action unblock_action;
96 };
97 
98 /* Lockdep class keys */
99 #ifdef CONFIG_DEBUG_LOCK_ALLOC
100 static struct lock_class_key lockdep_keys[OCFS2_NUM_LOCK_TYPES];
101 #endif
102 
103 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
104 					int new_level);
105 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
106 
107 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
108 				     int blocking);
109 
110 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
111 				       int blocking);
112 
113 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
114 				     struct ocfs2_lock_res *lockres);
115 
116 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres);
117 
118 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
119 					    int new_level);
120 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
121 					 int blocking);
122 
123 #define mlog_meta_lvb(__level, __lockres) ocfs2_dump_meta_lvb_info(__level, __PRETTY_FUNCTION__, __LINE__, __lockres)
124 
125 /* This aids in debugging situations where a bad LVB might be involved. */
ocfs2_dump_meta_lvb_info(u64 level,const char * function,unsigned int line,struct ocfs2_lock_res * lockres)126 static void ocfs2_dump_meta_lvb_info(u64 level,
127 				     const char *function,
128 				     unsigned int line,
129 				     struct ocfs2_lock_res *lockres)
130 {
131 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
132 
133 	mlog(level, "LVB information for %s (called from %s:%u):\n",
134 	     lockres->l_name, function, line);
135 	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
136 	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
137 	     be32_to_cpu(lvb->lvb_igeneration));
138 	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
139 	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
140 	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
141 	     be16_to_cpu(lvb->lvb_imode));
142 	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
143 	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
144 	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
145 	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
146 	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
147 	     be32_to_cpu(lvb->lvb_iattr));
148 }
149 
150 
151 /*
152  * OCFS2 Lock Resource Operations
153  *
154  * These fine tune the behavior of the generic dlmglue locking infrastructure.
155  *
156  * The most basic of lock types can point ->l_priv to their respective
157  * struct ocfs2_super and allow the default actions to manage things.
158  *
159  * Right now, each lock type also needs to implement an init function,
160  * and trivial lock/unlock wrappers. ocfs2_simple_drop_lockres()
161  * should be called when the lock is no longer needed (i.e., object
162  * destruction time).
163  */
164 struct ocfs2_lock_res_ops {
165 	/*
166 	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
167 	 * this callback if ->l_priv is not an ocfs2_super pointer
168 	 */
169 	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);
170 
171 	/*
172 	 * Optionally called in the downconvert thread after a
173 	 * successful downconvert. The lockres will not be referenced
174 	 * after this callback is called, so it is safe to free
175 	 * memory, etc.
176 	 *
177 	 * The exact semantics of when this is called are controlled
178 	 * by ->downconvert_worker()
179 	 */
180 	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);
181 
182 	/*
183 	 * Allow a lock type to add checks to determine whether it is
184 	 * safe to downconvert a lock. Return 0 to re-queue the
185 	 * downconvert at a later time, nonzero to continue.
186 	 *
187 	 * For most locks, the default checks that there are no
188 	 * incompatible holders are sufficient.
189 	 *
190 	 * Called with the lockres spinlock held.
191 	 */
192 	int (*check_downconvert)(struct ocfs2_lock_res *, int);
193 
194 	/*
195 	 * Allows a lock type to populate the lock value block. This
196 	 * is called on downconvert, and when we drop a lock.
197 	 *
198 	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
199 	 * in the flags field.
200 	 *
201 	 * Called with the lockres spinlock held.
202 	 */
203 	void (*set_lvb)(struct ocfs2_lock_res *);
204 
205 	/*
206 	 * Called from the downconvert thread when it is determined
207 	 * that a lock will be downconverted. This is called without
208 	 * any locks held so the function can do work that might
209 	 * schedule (syncing out data, etc).
210 	 *
211 	 * This should return any one of the ocfs2_unblock_action
212 	 * values, depending on what it wants the thread to do.
213 	 */
214 	int (*downconvert_worker)(struct ocfs2_lock_res *, int);
215 
216 	/*
217 	 * LOCK_TYPE_* flags which describe the specific requirements
218 	 * of a lock type. Descriptions of each individual flag follow.
219 	 */
220 	int flags;
221 };
222 
223 /*
224  * Some locks want to "refresh" potentially stale data when a
225  * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
226  * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
227  * individual lockres l_flags member from the ast function. It is
228  * expected that the locking wrapper will clear the
229  * OCFS2_LOCK_NEEDS_REFRESH flag when done.
230  */
231 #define LOCK_TYPE_REQUIRES_REFRESH 0x1
232 
233 /*
234  * Indicate that a lock type makes use of the lock value block. The
235  * ->set_lvb lock type callback must be defined.
236  */
237 #define LOCK_TYPE_USES_LVB		0x2
238 
239 static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
240 	.get_osb	= ocfs2_get_inode_osb,
241 	.flags		= 0,
242 };
243 
244 static struct ocfs2_lock_res_ops ocfs2_inode_inode_lops = {
245 	.get_osb	= ocfs2_get_inode_osb,
246 	.check_downconvert = ocfs2_check_meta_downconvert,
247 	.set_lvb	= ocfs2_set_meta_lvb,
248 	.downconvert_worker = ocfs2_data_convert_worker,
249 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
250 };
251 
252 static struct ocfs2_lock_res_ops ocfs2_super_lops = {
253 	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
254 };
255 
256 static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
257 	.flags		= 0,
258 };
259 
260 static struct ocfs2_lock_res_ops ocfs2_nfs_sync_lops = {
261 	.flags		= 0,
262 };
263 
264 static struct ocfs2_lock_res_ops ocfs2_trim_fs_lops = {
265 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
266 };
267 
268 static struct ocfs2_lock_res_ops ocfs2_orphan_scan_lops = {
269 	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
270 };
271 
272 static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
273 	.get_osb	= ocfs2_get_dentry_osb,
274 	.post_unlock	= ocfs2_dentry_post_unlock,
275 	.downconvert_worker = ocfs2_dentry_convert_worker,
276 	.flags		= 0,
277 };
278 
279 static struct ocfs2_lock_res_ops ocfs2_inode_open_lops = {
280 	.get_osb	= ocfs2_get_inode_osb,
281 	.flags		= 0,
282 };
283 
284 static struct ocfs2_lock_res_ops ocfs2_flock_lops = {
285 	.get_osb	= ocfs2_get_file_osb,
286 	.flags		= 0,
287 };
288 
289 static struct ocfs2_lock_res_ops ocfs2_qinfo_lops = {
290 	.set_lvb	= ocfs2_set_qinfo_lvb,
291 	.get_osb	= ocfs2_get_qinfo_osb,
292 	.flags		= LOCK_TYPE_REQUIRES_REFRESH | LOCK_TYPE_USES_LVB,
293 };
294 
295 static struct ocfs2_lock_res_ops ocfs2_refcount_block_lops = {
296 	.check_downconvert = ocfs2_check_refcount_downconvert,
297 	.downconvert_worker = ocfs2_refcount_convert_worker,
298 	.flags		= 0,
299 };
300 
ocfs2_is_inode_lock(struct ocfs2_lock_res * lockres)301 static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
302 {
303 	return lockres->l_type == OCFS2_LOCK_TYPE_META ||
304 		lockres->l_type == OCFS2_LOCK_TYPE_RW ||
305 		lockres->l_type == OCFS2_LOCK_TYPE_OPEN;
306 }
307 
ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb * lksb)308 static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb)
309 {
310 	return container_of(lksb, struct ocfs2_lock_res, l_lksb);
311 }
312 
ocfs2_lock_res_inode(struct ocfs2_lock_res * lockres)313 static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
314 {
315 	BUG_ON(!ocfs2_is_inode_lock(lockres));
316 
317 	return (struct inode *) lockres->l_priv;
318 }
319 
ocfs2_lock_res_dl(struct ocfs2_lock_res * lockres)320 static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
321 {
322 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
323 
324 	return (struct ocfs2_dentry_lock *)lockres->l_priv;
325 }
326 
ocfs2_lock_res_qinfo(struct ocfs2_lock_res * lockres)327 static inline struct ocfs2_mem_dqinfo *ocfs2_lock_res_qinfo(struct ocfs2_lock_res *lockres)
328 {
329 	BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_QINFO);
330 
331 	return (struct ocfs2_mem_dqinfo *)lockres->l_priv;
332 }
333 
334 static inline struct ocfs2_refcount_tree *
ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res * res)335 ocfs2_lock_res_refcount_tree(struct ocfs2_lock_res *res)
336 {
337 	return container_of(res, struct ocfs2_refcount_tree, rf_lockres);
338 }
339 
ocfs2_get_lockres_osb(struct ocfs2_lock_res * lockres)340 static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
341 {
342 	if (lockres->l_ops->get_osb)
343 		return lockres->l_ops->get_osb(lockres);
344 
345 	return (struct ocfs2_super *)lockres->l_priv;
346 }
347 
348 static int ocfs2_lock_create(struct ocfs2_super *osb,
349 			     struct ocfs2_lock_res *lockres,
350 			     int level,
351 			     u32 dlm_flags);
352 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
353 						     int wanted);
354 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
355 				   struct ocfs2_lock_res *lockres,
356 				   int level, unsigned long caller_ip);
ocfs2_cluster_unlock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int level)357 static inline void ocfs2_cluster_unlock(struct ocfs2_super *osb,
358 					struct ocfs2_lock_res *lockres,
359 					int level)
360 {
361 	__ocfs2_cluster_unlock(osb, lockres, level, _RET_IP_);
362 }
363 
364 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
365 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
366 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
367 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
368 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
369 					struct ocfs2_lock_res *lockres);
370 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
371 						int convert);
372 #define ocfs2_log_dlm_error(_func, _err, _lockres) do {					\
373 	if ((_lockres)->l_type != OCFS2_LOCK_TYPE_DENTRY)				\
374 		mlog(ML_ERROR, "DLM error %d while calling %s on resource %s\n",	\
375 		     _err, _func, _lockres->l_name);					\
376 	else										\
377 		mlog(ML_ERROR, "DLM error %d while calling %s on resource %.*s%08x\n",	\
378 		     _err, _func, OCFS2_DENTRY_LOCK_INO_START - 1, (_lockres)->l_name,	\
379 		     (unsigned int)ocfs2_get_dentry_lock_ino(_lockres));		\
380 } while (0)
381 static int ocfs2_downconvert_thread(void *arg);
382 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
383 					struct ocfs2_lock_res *lockres);
384 static int ocfs2_inode_lock_update(struct inode *inode,
385 				  struct buffer_head **bh);
386 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
387 static inline int ocfs2_highest_compat_lock_level(int level);
388 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
389 					      int new_level);
390 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
391 				  struct ocfs2_lock_res *lockres,
392 				  int new_level,
393 				  int lvb,
394 				  unsigned int generation);
395 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
396 				        struct ocfs2_lock_res *lockres);
397 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
398 				struct ocfs2_lock_res *lockres);
399 
400 
ocfs2_build_lock_name(enum ocfs2_lock_type type,u64 blkno,u32 generation,char * name)401 static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
402 				  u64 blkno,
403 				  u32 generation,
404 				  char *name)
405 {
406 	int len;
407 
408 	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);
409 
410 	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
411 		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
412 		       (long long)blkno, generation);
413 
414 	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));
415 
416 	mlog(0, "built lock resource with name: %s\n", name);
417 }
418 
419 static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);
420 
ocfs2_add_lockres_tracking(struct ocfs2_lock_res * res,struct ocfs2_dlm_debug * dlm_debug)421 static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
422 				       struct ocfs2_dlm_debug *dlm_debug)
423 {
424 	mlog(0, "Add tracking for lockres %s\n", res->l_name);
425 
426 	spin_lock(&ocfs2_dlm_tracking_lock);
427 	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
428 	spin_unlock(&ocfs2_dlm_tracking_lock);
429 }
430 
ocfs2_remove_lockres_tracking(struct ocfs2_lock_res * res)431 static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
432 {
433 	spin_lock(&ocfs2_dlm_tracking_lock);
434 	if (!list_empty(&res->l_debug_list))
435 		list_del_init(&res->l_debug_list);
436 	spin_unlock(&ocfs2_dlm_tracking_lock);
437 }
438 
439 #ifdef CONFIG_OCFS2_FS_STATS
ocfs2_init_lock_stats(struct ocfs2_lock_res * res)440 static void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
441 {
442 	res->l_lock_refresh = 0;
443 	memset(&res->l_lock_prmode, 0, sizeof(struct ocfs2_lock_stats));
444 	memset(&res->l_lock_exmode, 0, sizeof(struct ocfs2_lock_stats));
445 }
446 
ocfs2_update_lock_stats(struct ocfs2_lock_res * res,int level,struct ocfs2_mask_waiter * mw,int ret)447 static void ocfs2_update_lock_stats(struct ocfs2_lock_res *res, int level,
448 				    struct ocfs2_mask_waiter *mw, int ret)
449 {
450 	u32 usec;
451 	ktime_t kt;
452 	struct ocfs2_lock_stats *stats;
453 
454 	if (level == LKM_PRMODE)
455 		stats = &res->l_lock_prmode;
456 	else if (level == LKM_EXMODE)
457 		stats = &res->l_lock_exmode;
458 	else
459 		return;
460 
461 	kt = ktime_sub(ktime_get(), mw->mw_lock_start);
462 	usec = ktime_to_us(kt);
463 
464 	stats->ls_gets++;
465 	stats->ls_total += ktime_to_ns(kt);
466 	/* overflow */
467 	if (unlikely(stats->ls_gets == 0)) {
468 		stats->ls_gets++;
469 		stats->ls_total = ktime_to_ns(kt);
470 	}
471 
472 	if (stats->ls_max < usec)
473 		stats->ls_max = usec;
474 
475 	if (ret)
476 		stats->ls_fail++;
477 }
478 
ocfs2_track_lock_refresh(struct ocfs2_lock_res * lockres)479 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
480 {
481 	lockres->l_lock_refresh++;
482 }
483 
ocfs2_init_start_time(struct ocfs2_mask_waiter * mw)484 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
485 {
486 	mw->mw_lock_start = ktime_get();
487 }
488 #else
ocfs2_init_lock_stats(struct ocfs2_lock_res * res)489 static inline void ocfs2_init_lock_stats(struct ocfs2_lock_res *res)
490 {
491 }
ocfs2_update_lock_stats(struct ocfs2_lock_res * res,int level,struct ocfs2_mask_waiter * mw,int ret)492 static inline void ocfs2_update_lock_stats(struct ocfs2_lock_res *res,
493 			   int level, struct ocfs2_mask_waiter *mw, int ret)
494 {
495 }
ocfs2_track_lock_refresh(struct ocfs2_lock_res * lockres)496 static inline void ocfs2_track_lock_refresh(struct ocfs2_lock_res *lockres)
497 {
498 }
ocfs2_init_start_time(struct ocfs2_mask_waiter * mw)499 static inline void ocfs2_init_start_time(struct ocfs2_mask_waiter *mw)
500 {
501 }
502 #endif
503 
ocfs2_lock_res_init_common(struct ocfs2_super * osb,struct ocfs2_lock_res * res,enum ocfs2_lock_type type,struct ocfs2_lock_res_ops * ops,void * priv)504 static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
505 				       struct ocfs2_lock_res *res,
506 				       enum ocfs2_lock_type type,
507 				       struct ocfs2_lock_res_ops *ops,
508 				       void *priv)
509 {
510 	res->l_type          = type;
511 	res->l_ops           = ops;
512 	res->l_priv          = priv;
513 
514 	res->l_level         = DLM_LOCK_IV;
515 	res->l_requested     = DLM_LOCK_IV;
516 	res->l_blocking      = DLM_LOCK_IV;
517 	res->l_action        = OCFS2_AST_INVALID;
518 	res->l_unlock_action = OCFS2_UNLOCK_INVALID;
519 
520 	res->l_flags         = OCFS2_LOCK_INITIALIZED;
521 
522 	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
523 
524 	ocfs2_init_lock_stats(res);
525 #ifdef CONFIG_DEBUG_LOCK_ALLOC
526 	if (type != OCFS2_LOCK_TYPE_OPEN)
527 		lockdep_init_map(&res->l_lockdep_map, ocfs2_lock_type_strings[type],
528 				 &lockdep_keys[type], 0);
529 	else
530 		res->l_lockdep_map.key = NULL;
531 #endif
532 }
533 
ocfs2_lock_res_init_once(struct ocfs2_lock_res * res)534 void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
535 {
536 	/* This also clears out the lock status block */
537 	memset(res, 0, sizeof(struct ocfs2_lock_res));
538 	spin_lock_init(&res->l_lock);
539 	init_waitqueue_head(&res->l_event);
540 	INIT_LIST_HEAD(&res->l_blocked_list);
541 	INIT_LIST_HEAD(&res->l_mask_waiters);
542 	INIT_LIST_HEAD(&res->l_holders);
543 }
544 
ocfs2_inode_lock_res_init(struct ocfs2_lock_res * res,enum ocfs2_lock_type type,unsigned int generation,struct inode * inode)545 void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
546 			       enum ocfs2_lock_type type,
547 			       unsigned int generation,
548 			       struct inode *inode)
549 {
550 	struct ocfs2_lock_res_ops *ops;
551 
552 	switch(type) {
553 		case OCFS2_LOCK_TYPE_RW:
554 			ops = &ocfs2_inode_rw_lops;
555 			break;
556 		case OCFS2_LOCK_TYPE_META:
557 			ops = &ocfs2_inode_inode_lops;
558 			break;
559 		case OCFS2_LOCK_TYPE_OPEN:
560 			ops = &ocfs2_inode_open_lops;
561 			break;
562 		default:
563 			mlog_bug_on_msg(1, "type: %d\n", type);
564 			ops = NULL; /* thanks, gcc */
565 			break;
566 	};
567 
568 	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
569 			      generation, res->l_name);
570 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
571 }
572 
ocfs2_get_inode_osb(struct ocfs2_lock_res * lockres)573 static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
574 {
575 	struct inode *inode = ocfs2_lock_res_inode(lockres);
576 
577 	return OCFS2_SB(inode->i_sb);
578 }
579 
ocfs2_get_qinfo_osb(struct ocfs2_lock_res * lockres)580 static struct ocfs2_super *ocfs2_get_qinfo_osb(struct ocfs2_lock_res *lockres)
581 {
582 	struct ocfs2_mem_dqinfo *info = lockres->l_priv;
583 
584 	return OCFS2_SB(info->dqi_gi.dqi_sb);
585 }
586 
ocfs2_get_file_osb(struct ocfs2_lock_res * lockres)587 static struct ocfs2_super *ocfs2_get_file_osb(struct ocfs2_lock_res *lockres)
588 {
589 	struct ocfs2_file_private *fp = lockres->l_priv;
590 
591 	return OCFS2_SB(fp->fp_file->f_mapping->host->i_sb);
592 }
593 
ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res * lockres)594 static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
595 {
596 	__be64 inode_blkno_be;
597 
598 	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
599 	       sizeof(__be64));
600 
601 	return be64_to_cpu(inode_blkno_be);
602 }
603 
ocfs2_get_dentry_osb(struct ocfs2_lock_res * lockres)604 static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
605 {
606 	struct ocfs2_dentry_lock *dl = lockres->l_priv;
607 
608 	return OCFS2_SB(dl->dl_inode->i_sb);
609 }
610 
ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock * dl,u64 parent,struct inode * inode)611 void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
612 				u64 parent, struct inode *inode)
613 {
614 	int len;
615 	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
616 	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
617 	struct ocfs2_lock_res *lockres = &dl->dl_lockres;
618 
619 	ocfs2_lock_res_init_once(lockres);
620 
621 	/*
622 	 * Unfortunately, the standard lock naming scheme won't work
623 	 * here because we have two 16 byte values to use. Instead,
624 	 * we'll stuff the inode number as a binary value. We still
625 	 * want error prints to show something without garbling the
626 	 * display, so drop a null byte in there before the inode
627 	 * number. A future version of OCFS2 will likely use all
628 	 * binary lock names. The stringified names have been a
629 	 * tremendous aid in debugging, but now that the debugfs
630 	 * interface exists, we can mangle things there if need be.
631 	 *
632 	 * NOTE: We also drop the standard "pad" value (the total lock
633 	 * name size stays the same though - the last part is all
634 	 * zeros due to the memset in ocfs2_lock_res_init_once()
635 	 */
636 	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
637 		       "%c%016llx",
638 		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
639 		       (long long)parent);
640 
641 	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));
642 
643 	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
644 	       sizeof(__be64));
645 
646 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
647 				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
648 				   dl);
649 }
650 
ocfs2_super_lock_res_init(struct ocfs2_lock_res * res,struct ocfs2_super * osb)651 static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
652 				      struct ocfs2_super *osb)
653 {
654 	/* Superblock lockres doesn't come from a slab so we call init
655 	 * once on it manually.  */
656 	ocfs2_lock_res_init_once(res);
657 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
658 			      0, res->l_name);
659 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
660 				   &ocfs2_super_lops, osb);
661 }
662 
ocfs2_rename_lock_res_init(struct ocfs2_lock_res * res,struct ocfs2_super * osb)663 static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
664 				       struct ocfs2_super *osb)
665 {
666 	/* Rename lockres doesn't come from a slab so we call init
667 	 * once on it manually.  */
668 	ocfs2_lock_res_init_once(res);
669 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
670 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
671 				   &ocfs2_rename_lops, osb);
672 }
673 
ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res * res,struct ocfs2_super * osb)674 static void ocfs2_nfs_sync_lock_res_init(struct ocfs2_lock_res *res,
675 					 struct ocfs2_super *osb)
676 {
677 	/* nfs_sync lockres doesn't come from a slab so we call init
678 	 * once on it manually.  */
679 	ocfs2_lock_res_init_once(res);
680 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_NFS_SYNC, 0, 0, res->l_name);
681 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_NFS_SYNC,
682 				   &ocfs2_nfs_sync_lops, osb);
683 }
684 
ocfs2_nfs_sync_lock_init(struct ocfs2_super * osb)685 static void ocfs2_nfs_sync_lock_init(struct ocfs2_super *osb)
686 {
687 	ocfs2_nfs_sync_lock_res_init(&osb->osb_nfs_sync_lockres, osb);
688 	init_rwsem(&osb->nfs_sync_rwlock);
689 }
690 
ocfs2_trim_fs_lock_res_init(struct ocfs2_super * osb)691 void ocfs2_trim_fs_lock_res_init(struct ocfs2_super *osb)
692 {
693 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
694 
695 	ocfs2_lock_res_init_once(lockres);
696 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_TRIM_FS, 0, 0, lockres->l_name);
697 	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_TRIM_FS,
698 				   &ocfs2_trim_fs_lops, osb);
699 }
700 
ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super * osb)701 void ocfs2_trim_fs_lock_res_uninit(struct ocfs2_super *osb)
702 {
703 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
704 
705 	ocfs2_simple_drop_lockres(osb, lockres);
706 	ocfs2_lock_res_free(lockres);
707 }
708 
ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res * res,struct ocfs2_super * osb)709 static void ocfs2_orphan_scan_lock_res_init(struct ocfs2_lock_res *res,
710 					    struct ocfs2_super *osb)
711 {
712 	ocfs2_lock_res_init_once(res);
713 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_ORPHAN_SCAN, 0, 0, res->l_name);
714 	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_ORPHAN_SCAN,
715 				   &ocfs2_orphan_scan_lops, osb);
716 }
717 
ocfs2_file_lock_res_init(struct ocfs2_lock_res * lockres,struct ocfs2_file_private * fp)718 void ocfs2_file_lock_res_init(struct ocfs2_lock_res *lockres,
719 			      struct ocfs2_file_private *fp)
720 {
721 	struct inode *inode = fp->fp_file->f_mapping->host;
722 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
723 
724 	ocfs2_lock_res_init_once(lockres);
725 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_FLOCK, oi->ip_blkno,
726 			      inode->i_generation, lockres->l_name);
727 	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
728 				   OCFS2_LOCK_TYPE_FLOCK, &ocfs2_flock_lops,
729 				   fp);
730 	lockres->l_flags |= OCFS2_LOCK_NOCACHE;
731 }
732 
ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res * lockres,struct ocfs2_mem_dqinfo * info)733 void ocfs2_qinfo_lock_res_init(struct ocfs2_lock_res *lockres,
734 			       struct ocfs2_mem_dqinfo *info)
735 {
736 	ocfs2_lock_res_init_once(lockres);
737 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_QINFO, info->dqi_gi.dqi_type,
738 			      0, lockres->l_name);
739 	ocfs2_lock_res_init_common(OCFS2_SB(info->dqi_gi.dqi_sb), lockres,
740 				   OCFS2_LOCK_TYPE_QINFO, &ocfs2_qinfo_lops,
741 				   info);
742 }
743 
ocfs2_refcount_lock_res_init(struct ocfs2_lock_res * lockres,struct ocfs2_super * osb,u64 ref_blkno,unsigned int generation)744 void ocfs2_refcount_lock_res_init(struct ocfs2_lock_res *lockres,
745 				  struct ocfs2_super *osb, u64 ref_blkno,
746 				  unsigned int generation)
747 {
748 	ocfs2_lock_res_init_once(lockres);
749 	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_REFCOUNT, ref_blkno,
750 			      generation, lockres->l_name);
751 	ocfs2_lock_res_init_common(osb, lockres, OCFS2_LOCK_TYPE_REFCOUNT,
752 				   &ocfs2_refcount_block_lops, osb);
753 }
754 
ocfs2_lock_res_free(struct ocfs2_lock_res * res)755 void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
756 {
757 	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
758 		return;
759 
760 	ocfs2_remove_lockres_tracking(res);
761 
762 	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
763 			"Lockres %s is on the blocked list\n",
764 			res->l_name);
765 	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
766 			"Lockres %s has mask waiters pending\n",
767 			res->l_name);
768 	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
769 			"Lockres %s is locked\n",
770 			res->l_name);
771 	mlog_bug_on_msg(res->l_ro_holders,
772 			"Lockres %s has %u ro holders\n",
773 			res->l_name, res->l_ro_holders);
774 	mlog_bug_on_msg(res->l_ex_holders,
775 			"Lockres %s has %u ex holders\n",
776 			res->l_name, res->l_ex_holders);
777 
778 	/* Need to clear out the lock status block for the dlm */
779 	memset(&res->l_lksb, 0, sizeof(res->l_lksb));
780 
781 	res->l_flags = 0UL;
782 }
783 
784 /*
785  * Keep a list of processes who have interest in a lockres.
786  * Note: this is now only uesed for check recursive cluster locking.
787  */
ocfs2_add_holder(struct ocfs2_lock_res * lockres,struct ocfs2_lock_holder * oh)788 static inline void ocfs2_add_holder(struct ocfs2_lock_res *lockres,
789 				   struct ocfs2_lock_holder *oh)
790 {
791 	INIT_LIST_HEAD(&oh->oh_list);
792 	oh->oh_owner_pid = get_pid(task_pid(current));
793 
794 	spin_lock(&lockres->l_lock);
795 	list_add_tail(&oh->oh_list, &lockres->l_holders);
796 	spin_unlock(&lockres->l_lock);
797 }
798 
799 static struct ocfs2_lock_holder *
ocfs2_pid_holder(struct ocfs2_lock_res * lockres,struct pid * pid)800 ocfs2_pid_holder(struct ocfs2_lock_res *lockres,
801 		struct pid *pid)
802 {
803 	struct ocfs2_lock_holder *oh;
804 
805 	spin_lock(&lockres->l_lock);
806 	list_for_each_entry(oh, &lockres->l_holders, oh_list) {
807 		if (oh->oh_owner_pid == pid) {
808 			spin_unlock(&lockres->l_lock);
809 			return oh;
810 		}
811 	}
812 	spin_unlock(&lockres->l_lock);
813 	return NULL;
814 }
815 
ocfs2_remove_holder(struct ocfs2_lock_res * lockres,struct ocfs2_lock_holder * oh)816 static inline void ocfs2_remove_holder(struct ocfs2_lock_res *lockres,
817 				       struct ocfs2_lock_holder *oh)
818 {
819 	spin_lock(&lockres->l_lock);
820 	list_del(&oh->oh_list);
821 	spin_unlock(&lockres->l_lock);
822 
823 	put_pid(oh->oh_owner_pid);
824 }
825 
826 
ocfs2_inc_holders(struct ocfs2_lock_res * lockres,int level)827 static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
828 				     int level)
829 {
830 	BUG_ON(!lockres);
831 
832 	switch(level) {
833 	case DLM_LOCK_EX:
834 		lockres->l_ex_holders++;
835 		break;
836 	case DLM_LOCK_PR:
837 		lockres->l_ro_holders++;
838 		break;
839 	default:
840 		BUG();
841 	}
842 }
843 
ocfs2_dec_holders(struct ocfs2_lock_res * lockres,int level)844 static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
845 				     int level)
846 {
847 	BUG_ON(!lockres);
848 
849 	switch(level) {
850 	case DLM_LOCK_EX:
851 		BUG_ON(!lockres->l_ex_holders);
852 		lockres->l_ex_holders--;
853 		break;
854 	case DLM_LOCK_PR:
855 		BUG_ON(!lockres->l_ro_holders);
856 		lockres->l_ro_holders--;
857 		break;
858 	default:
859 		BUG();
860 	}
861 }
862 
863 /* WARNING: This function lives in a world where the only three lock
864  * levels are EX, PR, and NL. It *will* have to be adjusted when more
865  * lock types are added. */
ocfs2_highest_compat_lock_level(int level)866 static inline int ocfs2_highest_compat_lock_level(int level)
867 {
868 	int new_level = DLM_LOCK_EX;
869 
870 	if (level == DLM_LOCK_EX)
871 		new_level = DLM_LOCK_NL;
872 	else if (level == DLM_LOCK_PR)
873 		new_level = DLM_LOCK_PR;
874 	return new_level;
875 }
876 
lockres_set_flags(struct ocfs2_lock_res * lockres,unsigned long newflags)877 static void lockres_set_flags(struct ocfs2_lock_res *lockres,
878 			      unsigned long newflags)
879 {
880 	struct ocfs2_mask_waiter *mw, *tmp;
881 
882  	assert_spin_locked(&lockres->l_lock);
883 
884 	lockres->l_flags = newflags;
885 
886 	list_for_each_entry_safe(mw, tmp, &lockres->l_mask_waiters, mw_item) {
887 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
888 			continue;
889 
890 		list_del_init(&mw->mw_item);
891 		mw->mw_status = 0;
892 		complete(&mw->mw_complete);
893 	}
894 }
lockres_or_flags(struct ocfs2_lock_res * lockres,unsigned long or)895 static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
896 {
897 	lockres_set_flags(lockres, lockres->l_flags | or);
898 }
lockres_clear_flags(struct ocfs2_lock_res * lockres,unsigned long clear)899 static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
900 				unsigned long clear)
901 {
902 	lockres_set_flags(lockres, lockres->l_flags & ~clear);
903 }
904 
ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res * lockres)905 static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
906 {
907 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
908 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
909 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
910 	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
911 
912 	lockres->l_level = lockres->l_requested;
913 	if (lockres->l_level <=
914 	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
915 		lockres->l_blocking = DLM_LOCK_NL;
916 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
917 	}
918 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
919 }
920 
ocfs2_generic_handle_convert_action(struct ocfs2_lock_res * lockres)921 static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
922 {
923 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
924 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
925 
926 	/* Convert from RO to EX doesn't really need anything as our
927 	 * information is already up to data. Convert from NL to
928 	 * *anything* however should mark ourselves as needing an
929 	 * update */
930 	if (lockres->l_level == DLM_LOCK_NL &&
931 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
932 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
933 
934 	lockres->l_level = lockres->l_requested;
935 
936 	/*
937 	 * We set the OCFS2_LOCK_UPCONVERT_FINISHING flag before clearing
938 	 * the OCFS2_LOCK_BUSY flag to prevent the dc thread from
939 	 * downconverting the lock before the upconvert has fully completed.
940 	 * Do not prevent the dc thread from downconverting if NONBLOCK lock
941 	 * had already returned.
942 	 */
943 	if (!(lockres->l_flags & OCFS2_LOCK_NONBLOCK_FINISHED))
944 		lockres_or_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
945 	else
946 		lockres_clear_flags(lockres, OCFS2_LOCK_NONBLOCK_FINISHED);
947 
948 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
949 }
950 
ocfs2_generic_handle_attach_action(struct ocfs2_lock_res * lockres)951 static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
952 {
953 	BUG_ON((!(lockres->l_flags & OCFS2_LOCK_BUSY)));
954 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
955 
956 	if (lockres->l_requested > DLM_LOCK_NL &&
957 	    !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
958 	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
959 		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
960 
961 	lockres->l_level = lockres->l_requested;
962 	lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
963 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
964 }
965 
ocfs2_generic_handle_bast(struct ocfs2_lock_res * lockres,int level)966 static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
967 				     int level)
968 {
969 	int needs_downconvert = 0;
970 
971 	assert_spin_locked(&lockres->l_lock);
972 
973 	if (level > lockres->l_blocking) {
974 		/* only schedule a downconvert if we haven't already scheduled
975 		 * one that goes low enough to satisfy the level we're
976 		 * blocking.  this also catches the case where we get
977 		 * duplicate BASTs */
978 		if (ocfs2_highest_compat_lock_level(level) <
979 		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
980 			needs_downconvert = 1;
981 
982 		lockres->l_blocking = level;
983 	}
984 
985 	mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n",
986 	     lockres->l_name, level, lockres->l_level, lockres->l_blocking,
987 	     needs_downconvert);
988 
989 	if (needs_downconvert)
990 		lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
991 	mlog(0, "needs_downconvert = %d\n", needs_downconvert);
992 	return needs_downconvert;
993 }
994 
995 /*
996  * OCFS2_LOCK_PENDING and l_pending_gen.
997  *
998  * Why does OCFS2_LOCK_PENDING exist?  To close a race between setting
999  * OCFS2_LOCK_BUSY and calling ocfs2_dlm_lock().  See ocfs2_unblock_lock()
1000  * for more details on the race.
1001  *
1002  * OCFS2_LOCK_PENDING closes the race quite nicely.  However, it introduces
1003  * a race on itself.  In o2dlm, we can get the ast before ocfs2_dlm_lock()
1004  * returns.  The ast clears OCFS2_LOCK_BUSY, and must therefore clear
1005  * OCFS2_LOCK_PENDING at the same time.  When ocfs2_dlm_lock() returns,
1006  * the caller is going to try to clear PENDING again.  If nothing else is
1007  * happening, __lockres_clear_pending() sees PENDING is unset and does
1008  * nothing.
1009  *
1010  * But what if another path (eg downconvert thread) has just started a
1011  * new locking action?  The other path has re-set PENDING.  Our path
1012  * cannot clear PENDING, because that will re-open the original race
1013  * window.
1014  *
1015  * [Example]
1016  *
1017  * ocfs2_meta_lock()
1018  *  ocfs2_cluster_lock()
1019  *   set BUSY
1020  *   set PENDING
1021  *   drop l_lock
1022  *   ocfs2_dlm_lock()
1023  *    ocfs2_locking_ast()		ocfs2_downconvert_thread()
1024  *     clear PENDING			 ocfs2_unblock_lock()
1025  *					  take_l_lock
1026  *					  !BUSY
1027  *					  ocfs2_prepare_downconvert()
1028  *					   set BUSY
1029  *					   set PENDING
1030  *					  drop l_lock
1031  *   take l_lock
1032  *   clear PENDING
1033  *   drop l_lock
1034  *			<window>
1035  *					  ocfs2_dlm_lock()
1036  *
1037  * So as you can see, we now have a window where l_lock is not held,
1038  * PENDING is not set, and ocfs2_dlm_lock() has not been called.
1039  *
1040  * The core problem is that ocfs2_cluster_lock() has cleared the PENDING
1041  * set by ocfs2_prepare_downconvert().  That wasn't nice.
1042  *
1043  * To solve this we introduce l_pending_gen.  A call to
1044  * lockres_clear_pending() will only do so when it is passed a generation
1045  * number that matches the lockres.  lockres_set_pending() will return the
1046  * current generation number.  When ocfs2_cluster_lock() goes to clear
1047  * PENDING, it passes the generation it got from set_pending().  In our
1048  * example above, the generation numbers will *not* match.  Thus,
1049  * ocfs2_cluster_lock() will not clear the PENDING set by
1050  * ocfs2_prepare_downconvert().
1051  */
1052 
1053 /* Unlocked version for ocfs2_locking_ast() */
__lockres_clear_pending(struct ocfs2_lock_res * lockres,unsigned int generation,struct ocfs2_super * osb)1054 static void __lockres_clear_pending(struct ocfs2_lock_res *lockres,
1055 				    unsigned int generation,
1056 				    struct ocfs2_super *osb)
1057 {
1058 	assert_spin_locked(&lockres->l_lock);
1059 
1060 	/*
1061 	 * The ast and locking functions can race us here.  The winner
1062 	 * will clear pending, the loser will not.
1063 	 */
1064 	if (!(lockres->l_flags & OCFS2_LOCK_PENDING) ||
1065 	    (lockres->l_pending_gen != generation))
1066 		return;
1067 
1068 	lockres_clear_flags(lockres, OCFS2_LOCK_PENDING);
1069 	lockres->l_pending_gen++;
1070 
1071 	/*
1072 	 * The downconvert thread may have skipped us because we
1073 	 * were PENDING.  Wake it up.
1074 	 */
1075 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1076 		ocfs2_wake_downconvert_thread(osb);
1077 }
1078 
1079 /* Locked version for callers of ocfs2_dlm_lock() */
lockres_clear_pending(struct ocfs2_lock_res * lockres,unsigned int generation,struct ocfs2_super * osb)1080 static void lockres_clear_pending(struct ocfs2_lock_res *lockres,
1081 				  unsigned int generation,
1082 				  struct ocfs2_super *osb)
1083 {
1084 	unsigned long flags;
1085 
1086 	spin_lock_irqsave(&lockres->l_lock, flags);
1087 	__lockres_clear_pending(lockres, generation, osb);
1088 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1089 }
1090 
lockres_set_pending(struct ocfs2_lock_res * lockres)1091 static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres)
1092 {
1093 	assert_spin_locked(&lockres->l_lock);
1094 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
1095 
1096 	lockres_or_flags(lockres, OCFS2_LOCK_PENDING);
1097 
1098 	return lockres->l_pending_gen;
1099 }
1100 
ocfs2_blocking_ast(struct ocfs2_dlm_lksb * lksb,int level)1101 static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level)
1102 {
1103 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1104 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1105 	int needs_downconvert;
1106 	unsigned long flags;
1107 
1108 	BUG_ON(level <= DLM_LOCK_NL);
1109 
1110 	mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, "
1111 	     "type %s\n", lockres->l_name, level, lockres->l_level,
1112 	     ocfs2_lock_type_string(lockres->l_type));
1113 
1114 	/*
1115 	 * We can skip the bast for locks which don't enable caching -
1116 	 * they'll be dropped at the earliest possible time anyway.
1117 	 */
1118 	if (lockres->l_flags & OCFS2_LOCK_NOCACHE)
1119 		return;
1120 
1121 	spin_lock_irqsave(&lockres->l_lock, flags);
1122 	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
1123 	if (needs_downconvert)
1124 		ocfs2_schedule_blocked_lock(osb, lockres);
1125 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1126 
1127 	wake_up(&lockres->l_event);
1128 
1129 	ocfs2_wake_downconvert_thread(osb);
1130 }
1131 
ocfs2_locking_ast(struct ocfs2_dlm_lksb * lksb)1132 static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb)
1133 {
1134 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1135 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1136 	unsigned long flags;
1137 	int status;
1138 
1139 	spin_lock_irqsave(&lockres->l_lock, flags);
1140 
1141 	status = ocfs2_dlm_lock_status(&lockres->l_lksb);
1142 
1143 	if (status == -EAGAIN) {
1144 		lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1145 		goto out;
1146 	}
1147 
1148 	if (status) {
1149 		mlog(ML_ERROR, "lockres %s: lksb status value of %d!\n",
1150 		     lockres->l_name, status);
1151 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1152 		return;
1153 	}
1154 
1155 	mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, "
1156 	     "level %d => %d\n", lockres->l_name, lockres->l_action,
1157 	     lockres->l_unlock_action, lockres->l_level, lockres->l_requested);
1158 
1159 	switch(lockres->l_action) {
1160 	case OCFS2_AST_ATTACH:
1161 		ocfs2_generic_handle_attach_action(lockres);
1162 		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
1163 		break;
1164 	case OCFS2_AST_CONVERT:
1165 		ocfs2_generic_handle_convert_action(lockres);
1166 		break;
1167 	case OCFS2_AST_DOWNCONVERT:
1168 		ocfs2_generic_handle_downconvert_action(lockres);
1169 		break;
1170 	default:
1171 		mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, "
1172 		     "flags 0x%lx, unlock: %u\n",
1173 		     lockres->l_name, lockres->l_action, lockres->l_flags,
1174 		     lockres->l_unlock_action);
1175 		BUG();
1176 	}
1177 out:
1178 	/* set it to something invalid so if we get called again we
1179 	 * can catch it. */
1180 	lockres->l_action = OCFS2_AST_INVALID;
1181 
1182 	/* Did we try to cancel this lock?  Clear that state */
1183 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT)
1184 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1185 
1186 	/*
1187 	 * We may have beaten the locking functions here.  We certainly
1188 	 * know that dlm_lock() has been called :-)
1189 	 * Because we can't have two lock calls in flight at once, we
1190 	 * can use lockres->l_pending_gen.
1191 	 */
1192 	__lockres_clear_pending(lockres, lockres->l_pending_gen,  osb);
1193 
1194 	wake_up(&lockres->l_event);
1195 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1196 }
1197 
ocfs2_unlock_ast(struct ocfs2_dlm_lksb * lksb,int error)1198 static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error)
1199 {
1200 	struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb);
1201 	unsigned long flags;
1202 
1203 	mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n",
1204 	     lockres->l_name, lockres->l_unlock_action);
1205 
1206 	spin_lock_irqsave(&lockres->l_lock, flags);
1207 	if (error) {
1208 		mlog(ML_ERROR, "Dlm passes error %d for lock %s, "
1209 		     "unlock_action %d\n", error, lockres->l_name,
1210 		     lockres->l_unlock_action);
1211 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1212 		return;
1213 	}
1214 
1215 	switch(lockres->l_unlock_action) {
1216 	case OCFS2_UNLOCK_CANCEL_CONVERT:
1217 		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
1218 		lockres->l_action = OCFS2_AST_INVALID;
1219 		/* Downconvert thread may have requeued this lock, we
1220 		 * need to wake it. */
1221 		if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
1222 			ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres));
1223 		break;
1224 	case OCFS2_UNLOCK_DROP_LOCK:
1225 		lockres->l_level = DLM_LOCK_IV;
1226 		break;
1227 	default:
1228 		BUG();
1229 	}
1230 
1231 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1232 	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1233 	wake_up(&lockres->l_event);
1234 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1235 }
1236 
1237 /*
1238  * This is the filesystem locking protocol.  It provides the lock handling
1239  * hooks for the underlying DLM.  It has a maximum version number.
1240  * The version number allows interoperability with systems running at
1241  * the same major number and an equal or smaller minor number.
1242  *
1243  * Whenever the filesystem does new things with locks (adds or removes a
1244  * lock, orders them differently, does different things underneath a lock),
1245  * the version must be changed.  The protocol is negotiated when joining
1246  * the dlm domain.  A node may join the domain if its major version is
1247  * identical to all other nodes and its minor version is greater than
1248  * or equal to all other nodes.  When its minor version is greater than
1249  * the other nodes, it will run at the minor version specified by the
1250  * other nodes.
1251  *
1252  * If a locking change is made that will not be compatible with older
1253  * versions, the major number must be increased and the minor version set
1254  * to zero.  If a change merely adds a behavior that can be disabled when
1255  * speaking to older versions, the minor version must be increased.  If a
1256  * change adds a fully backwards compatible change (eg, LVB changes that
1257  * are just ignored by older versions), the version does not need to be
1258  * updated.
1259  */
1260 static struct ocfs2_locking_protocol lproto = {
1261 	.lp_max_version = {
1262 		.pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR,
1263 		.pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR,
1264 	},
1265 	.lp_lock_ast		= ocfs2_locking_ast,
1266 	.lp_blocking_ast	= ocfs2_blocking_ast,
1267 	.lp_unlock_ast		= ocfs2_unlock_ast,
1268 };
1269 
ocfs2_set_locking_protocol(void)1270 void ocfs2_set_locking_protocol(void)
1271 {
1272 	ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version);
1273 }
1274 
ocfs2_recover_from_dlm_error(struct ocfs2_lock_res * lockres,int convert)1275 static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
1276 						int convert)
1277 {
1278 	unsigned long flags;
1279 
1280 	spin_lock_irqsave(&lockres->l_lock, flags);
1281 	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
1282 	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1283 	if (convert)
1284 		lockres->l_action = OCFS2_AST_INVALID;
1285 	else
1286 		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
1287 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1288 
1289 	wake_up(&lockres->l_event);
1290 }
1291 
1292 /* Note: If we detect another process working on the lock (i.e.,
1293  * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
1294  * to do the right thing in that case.
1295  */
ocfs2_lock_create(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int level,u32 dlm_flags)1296 static int ocfs2_lock_create(struct ocfs2_super *osb,
1297 			     struct ocfs2_lock_res *lockres,
1298 			     int level,
1299 			     u32 dlm_flags)
1300 {
1301 	int ret = 0;
1302 	unsigned long flags;
1303 	unsigned int gen;
1304 
1305 	mlog(0, "lock %s, level = %d, flags = %u\n", lockres->l_name, level,
1306 	     dlm_flags);
1307 
1308 	spin_lock_irqsave(&lockres->l_lock, flags);
1309 	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
1310 	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
1311 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1312 		goto bail;
1313 	}
1314 
1315 	lockres->l_action = OCFS2_AST_ATTACH;
1316 	lockres->l_requested = level;
1317 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1318 	gen = lockres_set_pending(lockres);
1319 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1320 
1321 	ret = ocfs2_dlm_lock(osb->cconn,
1322 			     level,
1323 			     &lockres->l_lksb,
1324 			     dlm_flags,
1325 			     lockres->l_name,
1326 			     OCFS2_LOCK_ID_MAX_LEN - 1);
1327 	lockres_clear_pending(lockres, gen, osb);
1328 	if (ret) {
1329 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
1330 		ocfs2_recover_from_dlm_error(lockres, 1);
1331 	}
1332 
1333 	mlog(0, "lock %s, return from ocfs2_dlm_lock\n", lockres->l_name);
1334 
1335 bail:
1336 	return ret;
1337 }
1338 
ocfs2_check_wait_flag(struct ocfs2_lock_res * lockres,int flag)1339 static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
1340 					int flag)
1341 {
1342 	unsigned long flags;
1343 	int ret;
1344 
1345 	spin_lock_irqsave(&lockres->l_lock, flags);
1346 	ret = lockres->l_flags & flag;
1347 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1348 
1349 	return ret;
1350 }
1351 
ocfs2_wait_on_busy_lock(struct ocfs2_lock_res * lockres)1352 static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)
1353 
1354 {
1355 	wait_event(lockres->l_event,
1356 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
1357 }
1358 
ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res * lockres)1359 static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)
1360 
1361 {
1362 	wait_event(lockres->l_event,
1363 		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
1364 }
1365 
1366 /* predict what lock level we'll be dropping down to on behalf
1367  * of another node, and return true if the currently wanted
1368  * level will be compatible with it. */
ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res * lockres,int wanted)1369 static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
1370 						     int wanted)
1371 {
1372 	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
1373 
1374 	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
1375 }
1376 
ocfs2_init_mask_waiter(struct ocfs2_mask_waiter * mw)1377 static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
1378 {
1379 	INIT_LIST_HEAD(&mw->mw_item);
1380 	init_completion(&mw->mw_complete);
1381 	ocfs2_init_start_time(mw);
1382 }
1383 
ocfs2_wait_for_mask(struct ocfs2_mask_waiter * mw)1384 static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
1385 {
1386 	wait_for_completion(&mw->mw_complete);
1387 	/* Re-arm the completion in case we want to wait on it again */
1388 	reinit_completion(&mw->mw_complete);
1389 	return mw->mw_status;
1390 }
1391 
lockres_add_mask_waiter(struct ocfs2_lock_res * lockres,struct ocfs2_mask_waiter * mw,unsigned long mask,unsigned long goal)1392 static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
1393 				    struct ocfs2_mask_waiter *mw,
1394 				    unsigned long mask,
1395 				    unsigned long goal)
1396 {
1397 	BUG_ON(!list_empty(&mw->mw_item));
1398 
1399 	assert_spin_locked(&lockres->l_lock);
1400 
1401 	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
1402 	mw->mw_mask = mask;
1403 	mw->mw_goal = goal;
1404 }
1405 
1406 /* returns 0 if the mw that was removed was already satisfied, -EBUSY
1407  * if the mask still hadn't reached its goal */
__lockres_remove_mask_waiter(struct ocfs2_lock_res * lockres,struct ocfs2_mask_waiter * mw)1408 static int __lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1409 				      struct ocfs2_mask_waiter *mw)
1410 {
1411 	int ret = 0;
1412 
1413 	assert_spin_locked(&lockres->l_lock);
1414 	if (!list_empty(&mw->mw_item)) {
1415 		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
1416 			ret = -EBUSY;
1417 
1418 		list_del_init(&mw->mw_item);
1419 		init_completion(&mw->mw_complete);
1420 	}
1421 
1422 	return ret;
1423 }
1424 
lockres_remove_mask_waiter(struct ocfs2_lock_res * lockres,struct ocfs2_mask_waiter * mw)1425 static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
1426 				      struct ocfs2_mask_waiter *mw)
1427 {
1428 	unsigned long flags;
1429 	int ret = 0;
1430 
1431 	spin_lock_irqsave(&lockres->l_lock, flags);
1432 	ret = __lockres_remove_mask_waiter(lockres, mw);
1433 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1434 
1435 	return ret;
1436 
1437 }
1438 
ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter * mw,struct ocfs2_lock_res * lockres)1439 static int ocfs2_wait_for_mask_interruptible(struct ocfs2_mask_waiter *mw,
1440 					     struct ocfs2_lock_res *lockres)
1441 {
1442 	int ret;
1443 
1444 	ret = wait_for_completion_interruptible(&mw->mw_complete);
1445 	if (ret)
1446 		lockres_remove_mask_waiter(lockres, mw);
1447 	else
1448 		ret = mw->mw_status;
1449 	/* Re-arm the completion in case we want to wait on it again */
1450 	reinit_completion(&mw->mw_complete);
1451 	return ret;
1452 }
1453 
__ocfs2_cluster_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int level,u32 lkm_flags,int arg_flags,int l_subclass,unsigned long caller_ip)1454 static int __ocfs2_cluster_lock(struct ocfs2_super *osb,
1455 				struct ocfs2_lock_res *lockres,
1456 				int level,
1457 				u32 lkm_flags,
1458 				int arg_flags,
1459 				int l_subclass,
1460 				unsigned long caller_ip)
1461 {
1462 	struct ocfs2_mask_waiter mw;
1463 	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
1464 	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
1465 	unsigned long flags;
1466 	unsigned int gen;
1467 	int noqueue_attempted = 0;
1468 	int dlm_locked = 0;
1469 	int kick_dc = 0;
1470 
1471 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED)) {
1472 		mlog_errno(-EINVAL);
1473 		return -EINVAL;
1474 	}
1475 
1476 	ocfs2_init_mask_waiter(&mw);
1477 
1478 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
1479 		lkm_flags |= DLM_LKF_VALBLK;
1480 
1481 again:
1482 	wait = 0;
1483 
1484 	spin_lock_irqsave(&lockres->l_lock, flags);
1485 
1486 	if (catch_signals && signal_pending(current)) {
1487 		ret = -ERESTARTSYS;
1488 		goto unlock;
1489 	}
1490 
1491 	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
1492 			"Cluster lock called on freeing lockres %s! flags "
1493 			"0x%lx\n", lockres->l_name, lockres->l_flags);
1494 
1495 	/* We only compare against the currently granted level
1496 	 * here. If the lock is blocked waiting on a downconvert,
1497 	 * we'll get caught below. */
1498 	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
1499 	    level > lockres->l_level) {
1500 		/* is someone sitting in dlm_lock? If so, wait on
1501 		 * them. */
1502 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1503 		wait = 1;
1504 		goto unlock;
1505 	}
1506 
1507 	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING) {
1508 		/*
1509 		 * We've upconverted. If the lock now has a level we can
1510 		 * work with, we take it. If, however, the lock is not at the
1511 		 * required level, we go thru the full cycle. One way this could
1512 		 * happen is if a process requesting an upconvert to PR is
1513 		 * closely followed by another requesting upconvert to an EX.
1514 		 * If the process requesting EX lands here, we want it to
1515 		 * continue attempting to upconvert and let the process
1516 		 * requesting PR take the lock.
1517 		 * If multiple processes request upconvert to PR, the first one
1518 		 * here will take the lock. The others will have to go thru the
1519 		 * OCFS2_LOCK_BLOCKED check to ensure that there is no pending
1520 		 * downconvert request.
1521 		 */
1522 		if (level <= lockres->l_level)
1523 			goto update_holders;
1524 	}
1525 
1526 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
1527 	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
1528 		/* is the lock is currently blocked on behalf of
1529 		 * another node */
1530 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
1531 		wait = 1;
1532 		goto unlock;
1533 	}
1534 
1535 	if (level > lockres->l_level) {
1536 		if (noqueue_attempted > 0) {
1537 			ret = -EAGAIN;
1538 			goto unlock;
1539 		}
1540 		if (lkm_flags & DLM_LKF_NOQUEUE)
1541 			noqueue_attempted = 1;
1542 
1543 		if (lockres->l_action != OCFS2_AST_INVALID)
1544 			mlog(ML_ERROR, "lockres %s has action %u pending\n",
1545 			     lockres->l_name, lockres->l_action);
1546 
1547 		if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1548 			lockres->l_action = OCFS2_AST_ATTACH;
1549 			lkm_flags &= ~DLM_LKF_CONVERT;
1550 		} else {
1551 			lockres->l_action = OCFS2_AST_CONVERT;
1552 			lkm_flags |= DLM_LKF_CONVERT;
1553 		}
1554 
1555 		lockres->l_requested = level;
1556 		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
1557 		gen = lockres_set_pending(lockres);
1558 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1559 
1560 		BUG_ON(level == DLM_LOCK_IV);
1561 		BUG_ON(level == DLM_LOCK_NL);
1562 
1563 		mlog(ML_BASTS, "lockres %s, convert from %d to %d\n",
1564 		     lockres->l_name, lockres->l_level, level);
1565 
1566 		/* call dlm_lock to upgrade lock now */
1567 		ret = ocfs2_dlm_lock(osb->cconn,
1568 				     level,
1569 				     &lockres->l_lksb,
1570 				     lkm_flags,
1571 				     lockres->l_name,
1572 				     OCFS2_LOCK_ID_MAX_LEN - 1);
1573 		lockres_clear_pending(lockres, gen, osb);
1574 		if (ret) {
1575 			if (!(lkm_flags & DLM_LKF_NOQUEUE) ||
1576 			    (ret != -EAGAIN)) {
1577 				ocfs2_log_dlm_error("ocfs2_dlm_lock",
1578 						    ret, lockres);
1579 			}
1580 			ocfs2_recover_from_dlm_error(lockres, 1);
1581 			goto out;
1582 		}
1583 		dlm_locked = 1;
1584 
1585 		mlog(0, "lock %s, successful return from ocfs2_dlm_lock\n",
1586 		     lockres->l_name);
1587 
1588 		/* At this point we've gone inside the dlm and need to
1589 		 * complete our work regardless. */
1590 		catch_signals = 0;
1591 
1592 		/* wait for busy to clear and carry on */
1593 		goto again;
1594 	}
1595 
1596 update_holders:
1597 	/* Ok, if we get here then we're good to go. */
1598 	ocfs2_inc_holders(lockres, level);
1599 
1600 	ret = 0;
1601 unlock:
1602 	lockres_clear_flags(lockres, OCFS2_LOCK_UPCONVERT_FINISHING);
1603 
1604 	/* ocfs2_unblock_lock reques on seeing OCFS2_LOCK_UPCONVERT_FINISHING */
1605 	kick_dc = (lockres->l_flags & OCFS2_LOCK_BLOCKED);
1606 
1607 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1608 	if (kick_dc)
1609 		ocfs2_wake_downconvert_thread(osb);
1610 out:
1611 	/*
1612 	 * This is helping work around a lock inversion between the page lock
1613 	 * and dlm locks.  One path holds the page lock while calling aops
1614 	 * which block acquiring dlm locks.  The voting thread holds dlm
1615 	 * locks while acquiring page locks while down converting data locks.
1616 	 * This block is helping an aop path notice the inversion and back
1617 	 * off to unlock its page lock before trying the dlm lock again.
1618 	 */
1619 	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
1620 	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
1621 		wait = 0;
1622 		spin_lock_irqsave(&lockres->l_lock, flags);
1623 		if (__lockres_remove_mask_waiter(lockres, &mw)) {
1624 			if (dlm_locked)
1625 				lockres_or_flags(lockres,
1626 					OCFS2_LOCK_NONBLOCK_FINISHED);
1627 			spin_unlock_irqrestore(&lockres->l_lock, flags);
1628 			ret = -EAGAIN;
1629 		} else {
1630 			spin_unlock_irqrestore(&lockres->l_lock, flags);
1631 			goto again;
1632 		}
1633 	}
1634 	if (wait) {
1635 		ret = ocfs2_wait_for_mask(&mw);
1636 		if (ret == 0)
1637 			goto again;
1638 		mlog_errno(ret);
1639 	}
1640 	ocfs2_update_lock_stats(lockres, level, &mw, ret);
1641 
1642 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1643 	if (!ret && lockres->l_lockdep_map.key != NULL) {
1644 		if (level == DLM_LOCK_PR)
1645 			rwsem_acquire_read(&lockres->l_lockdep_map, l_subclass,
1646 				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1647 				caller_ip);
1648 		else
1649 			rwsem_acquire(&lockres->l_lockdep_map, l_subclass,
1650 				!!(arg_flags & OCFS2_META_LOCK_NOQUEUE),
1651 				caller_ip);
1652 	}
1653 #endif
1654 	return ret;
1655 }
1656 
ocfs2_cluster_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int level,u32 lkm_flags,int arg_flags)1657 static inline int ocfs2_cluster_lock(struct ocfs2_super *osb,
1658 				     struct ocfs2_lock_res *lockres,
1659 				     int level,
1660 				     u32 lkm_flags,
1661 				     int arg_flags)
1662 {
1663 	return __ocfs2_cluster_lock(osb, lockres, level, lkm_flags, arg_flags,
1664 				    0, _RET_IP_);
1665 }
1666 
1667 
__ocfs2_cluster_unlock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int level,unsigned long caller_ip)1668 static void __ocfs2_cluster_unlock(struct ocfs2_super *osb,
1669 				   struct ocfs2_lock_res *lockres,
1670 				   int level,
1671 				   unsigned long caller_ip)
1672 {
1673 	unsigned long flags;
1674 
1675 	spin_lock_irqsave(&lockres->l_lock, flags);
1676 	ocfs2_dec_holders(lockres, level);
1677 	ocfs2_downconvert_on_unlock(osb, lockres);
1678 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1679 #ifdef CONFIG_DEBUG_LOCK_ALLOC
1680 	if (lockres->l_lockdep_map.key != NULL)
1681 		rwsem_release(&lockres->l_lockdep_map, 1, caller_ip);
1682 #endif
1683 }
1684 
ocfs2_create_new_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int ex,int local)1685 static int ocfs2_create_new_lock(struct ocfs2_super *osb,
1686 				 struct ocfs2_lock_res *lockres,
1687 				 int ex,
1688 				 int local)
1689 {
1690 	int level =  ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1691 	unsigned long flags;
1692 	u32 lkm_flags = local ? DLM_LKF_LOCAL : 0;
1693 
1694 	spin_lock_irqsave(&lockres->l_lock, flags);
1695 	BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1696 	lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1697 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1698 
1699 	return ocfs2_lock_create(osb, lockres, level, lkm_flags);
1700 }
1701 
1702 /* Grants us an EX lock on the data and metadata resources, skipping
1703  * the normal cluster directory lookup. Use this ONLY on newly created
1704  * inodes which other nodes can't possibly see, and which haven't been
1705  * hashed in the inode hash yet. This can give us a good performance
1706  * increase as it'll skip the network broadcast normally associated
1707  * with creating a new lock resource. */
ocfs2_create_new_inode_locks(struct inode * inode)1708 int ocfs2_create_new_inode_locks(struct inode *inode)
1709 {
1710 	int ret;
1711 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1712 
1713 	BUG_ON(!ocfs2_inode_is_new(inode));
1714 
1715 	mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
1716 
1717 	/* NOTE: That we don't increment any of the holder counts, nor
1718 	 * do we add anything to a journal handle. Since this is
1719 	 * supposed to be a new inode which the cluster doesn't know
1720 	 * about yet, there is no need to.  As far as the LVB handling
1721 	 * is concerned, this is basically like acquiring an EX lock
1722 	 * on a resource which has an invalid one -- we'll set it
1723 	 * valid when we release the EX. */
1724 
1725 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
1726 	if (ret) {
1727 		mlog_errno(ret);
1728 		goto bail;
1729 	}
1730 
1731 	/*
1732 	 * We don't want to use DLM_LKF_LOCAL on a meta data lock as they
1733 	 * don't use a generation in their lock names.
1734 	 */
1735 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_inode_lockres, 1, 0);
1736 	if (ret) {
1737 		mlog_errno(ret);
1738 		goto bail;
1739 	}
1740 
1741 	ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_open_lockres, 0, 0);
1742 	if (ret)
1743 		mlog_errno(ret);
1744 
1745 bail:
1746 	return ret;
1747 }
1748 
ocfs2_rw_lock(struct inode * inode,int write)1749 int ocfs2_rw_lock(struct inode *inode, int write)
1750 {
1751 	int status, level;
1752 	struct ocfs2_lock_res *lockres;
1753 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1754 
1755 	mlog(0, "inode %llu take %s RW lock\n",
1756 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1757 	     write ? "EXMODE" : "PRMODE");
1758 
1759 	if (ocfs2_mount_local(osb))
1760 		return 0;
1761 
1762 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1763 
1764 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1765 
1766 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
1767 	if (status < 0)
1768 		mlog_errno(status);
1769 
1770 	return status;
1771 }
1772 
ocfs2_try_rw_lock(struct inode * inode,int write)1773 int ocfs2_try_rw_lock(struct inode *inode, int write)
1774 {
1775 	int status, level;
1776 	struct ocfs2_lock_res *lockres;
1777 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1778 
1779 	mlog(0, "inode %llu try to take %s RW lock\n",
1780 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1781 	     write ? "EXMODE" : "PRMODE");
1782 
1783 	if (ocfs2_mount_local(osb))
1784 		return 0;
1785 
1786 	lockres = &OCFS2_I(inode)->ip_rw_lockres;
1787 
1788 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1789 
1790 	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1791 	return status;
1792 }
1793 
ocfs2_rw_unlock(struct inode * inode,int write)1794 void ocfs2_rw_unlock(struct inode *inode, int write)
1795 {
1796 	int level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1797 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1798 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1799 
1800 	mlog(0, "inode %llu drop %s RW lock\n",
1801 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1802 	     write ? "EXMODE" : "PRMODE");
1803 
1804 	if (!ocfs2_mount_local(osb))
1805 		ocfs2_cluster_unlock(osb, lockres, level);
1806 }
1807 
1808 /*
1809  * ocfs2_open_lock always get PR mode lock.
1810  */
ocfs2_open_lock(struct inode * inode)1811 int ocfs2_open_lock(struct inode *inode)
1812 {
1813 	int status = 0;
1814 	struct ocfs2_lock_res *lockres;
1815 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1816 
1817 	mlog(0, "inode %llu take PRMODE open lock\n",
1818 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1819 
1820 	if (ocfs2_is_hard_readonly(osb) || ocfs2_mount_local(osb))
1821 		goto out;
1822 
1823 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1824 
1825 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_PR, 0, 0);
1826 	if (status < 0)
1827 		mlog_errno(status);
1828 
1829 out:
1830 	return status;
1831 }
1832 
ocfs2_try_open_lock(struct inode * inode,int write)1833 int ocfs2_try_open_lock(struct inode *inode, int write)
1834 {
1835 	int status = 0, level;
1836 	struct ocfs2_lock_res *lockres;
1837 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1838 
1839 	mlog(0, "inode %llu try to take %s open lock\n",
1840 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
1841 	     write ? "EXMODE" : "PRMODE");
1842 
1843 	if (ocfs2_is_hard_readonly(osb)) {
1844 		if (write)
1845 			status = -EROFS;
1846 		goto out;
1847 	}
1848 
1849 	if (ocfs2_mount_local(osb))
1850 		goto out;
1851 
1852 	lockres = &OCFS2_I(inode)->ip_open_lockres;
1853 
1854 	level = write ? DLM_LOCK_EX : DLM_LOCK_PR;
1855 
1856 	/*
1857 	 * The file system may already holding a PRMODE/EXMODE open lock.
1858 	 * Since we pass DLM_LKF_NOQUEUE, the request won't block waiting on
1859 	 * other nodes and the -EAGAIN will indicate to the caller that
1860 	 * this inode is still in use.
1861 	 */
1862 	status = ocfs2_cluster_lock(osb, lockres, level, DLM_LKF_NOQUEUE, 0);
1863 
1864 out:
1865 	return status;
1866 }
1867 
1868 /*
1869  * ocfs2_open_unlock unlock PR and EX mode open locks.
1870  */
ocfs2_open_unlock(struct inode * inode)1871 void ocfs2_open_unlock(struct inode *inode)
1872 {
1873 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_open_lockres;
1874 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
1875 
1876 	mlog(0, "inode %llu drop open lock\n",
1877 	     (unsigned long long)OCFS2_I(inode)->ip_blkno);
1878 
1879 	if (ocfs2_mount_local(osb))
1880 		goto out;
1881 
1882 	if(lockres->l_ro_holders)
1883 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_PR);
1884 	if(lockres->l_ex_holders)
1885 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
1886 
1887 out:
1888 	return;
1889 }
1890 
ocfs2_flock_handle_signal(struct ocfs2_lock_res * lockres,int level)1891 static int ocfs2_flock_handle_signal(struct ocfs2_lock_res *lockres,
1892 				     int level)
1893 {
1894 	int ret;
1895 	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
1896 	unsigned long flags;
1897 	struct ocfs2_mask_waiter mw;
1898 
1899 	ocfs2_init_mask_waiter(&mw);
1900 
1901 retry_cancel:
1902 	spin_lock_irqsave(&lockres->l_lock, flags);
1903 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
1904 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
1905 		if (ret) {
1906 			spin_unlock_irqrestore(&lockres->l_lock, flags);
1907 			ret = ocfs2_cancel_convert(osb, lockres);
1908 			if (ret < 0) {
1909 				mlog_errno(ret);
1910 				goto out;
1911 			}
1912 			goto retry_cancel;
1913 		}
1914 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1915 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1916 
1917 		ocfs2_wait_for_mask(&mw);
1918 		goto retry_cancel;
1919 	}
1920 
1921 	ret = -ERESTARTSYS;
1922 	/*
1923 	 * We may still have gotten the lock, in which case there's no
1924 	 * point to restarting the syscall.
1925 	 */
1926 	if (lockres->l_level == level)
1927 		ret = 0;
1928 
1929 	mlog(0, "Cancel returning %d. flags: 0x%lx, level: %d, act: %d\n", ret,
1930 	     lockres->l_flags, lockres->l_level, lockres->l_action);
1931 
1932 	spin_unlock_irqrestore(&lockres->l_lock, flags);
1933 
1934 out:
1935 	return ret;
1936 }
1937 
1938 /*
1939  * ocfs2_file_lock() and ocfs2_file_unlock() map to a single pair of
1940  * flock() calls. The locking approach this requires is sufficiently
1941  * different from all other cluster lock types that we implement a
1942  * separate path to the "low-level" dlm calls. In particular:
1943  *
1944  * - No optimization of lock levels is done - we take at exactly
1945  *   what's been requested.
1946  *
1947  * - No lock caching is employed. We immediately downconvert to
1948  *   no-lock at unlock time. This also means flock locks never go on
1949  *   the blocking list).
1950  *
1951  * - Since userspace can trivially deadlock itself with flock, we make
1952  *   sure to allow cancellation of a misbehaving applications flock()
1953  *   request.
1954  *
1955  * - Access to any flock lockres doesn't require concurrency, so we
1956  *   can simplify the code by requiring the caller to guarantee
1957  *   serialization of dlmglue flock calls.
1958  */
ocfs2_file_lock(struct file * file,int ex,int trylock)1959 int ocfs2_file_lock(struct file *file, int ex, int trylock)
1960 {
1961 	int ret, level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
1962 	unsigned int lkm_flags = trylock ? DLM_LKF_NOQUEUE : 0;
1963 	unsigned long flags;
1964 	struct ocfs2_file_private *fp = file->private_data;
1965 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
1966 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
1967 	struct ocfs2_mask_waiter mw;
1968 
1969 	ocfs2_init_mask_waiter(&mw);
1970 
1971 	if ((lockres->l_flags & OCFS2_LOCK_BUSY) ||
1972 	    (lockres->l_level > DLM_LOCK_NL)) {
1973 		mlog(ML_ERROR,
1974 		     "File lock \"%s\" has busy or locked state: flags: 0x%lx, "
1975 		     "level: %u\n", lockres->l_name, lockres->l_flags,
1976 		     lockres->l_level);
1977 		return -EINVAL;
1978 	}
1979 
1980 	spin_lock_irqsave(&lockres->l_lock, flags);
1981 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
1982 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
1983 		spin_unlock_irqrestore(&lockres->l_lock, flags);
1984 
1985 		/*
1986 		 * Get the lock at NLMODE to start - that way we
1987 		 * can cancel the upconvert request if need be.
1988 		 */
1989 		ret = ocfs2_lock_create(osb, lockres, DLM_LOCK_NL, 0);
1990 		if (ret < 0) {
1991 			mlog_errno(ret);
1992 			goto out;
1993 		}
1994 
1995 		ret = ocfs2_wait_for_mask(&mw);
1996 		if (ret) {
1997 			mlog_errno(ret);
1998 			goto out;
1999 		}
2000 		spin_lock_irqsave(&lockres->l_lock, flags);
2001 	}
2002 
2003 	lockres->l_action = OCFS2_AST_CONVERT;
2004 	lkm_flags |= DLM_LKF_CONVERT;
2005 	lockres->l_requested = level;
2006 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2007 
2008 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2009 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2010 
2011 	ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags,
2012 			     lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1);
2013 	if (ret) {
2014 		if (!trylock || (ret != -EAGAIN)) {
2015 			ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
2016 			ret = -EINVAL;
2017 		}
2018 
2019 		ocfs2_recover_from_dlm_error(lockres, 1);
2020 		lockres_remove_mask_waiter(lockres, &mw);
2021 		goto out;
2022 	}
2023 
2024 	ret = ocfs2_wait_for_mask_interruptible(&mw, lockres);
2025 	if (ret == -ERESTARTSYS) {
2026 		/*
2027 		 * Userspace can cause deadlock itself with
2028 		 * flock(). Current behavior locally is to allow the
2029 		 * deadlock, but abort the system call if a signal is
2030 		 * received. We follow this example, otherwise a
2031 		 * poorly written program could sit in kernel until
2032 		 * reboot.
2033 		 *
2034 		 * Handling this is a bit more complicated for Ocfs2
2035 		 * though. We can't exit this function with an
2036 		 * outstanding lock request, so a cancel convert is
2037 		 * required. We intentionally overwrite 'ret' - if the
2038 		 * cancel fails and the lock was granted, it's easier
2039 		 * to just bubble success back up to the user.
2040 		 */
2041 		ret = ocfs2_flock_handle_signal(lockres, level);
2042 	} else if (!ret && (level > lockres->l_level)) {
2043 		/* Trylock failed asynchronously */
2044 		BUG_ON(!trylock);
2045 		ret = -EAGAIN;
2046 	}
2047 
2048 out:
2049 
2050 	mlog(0, "Lock: \"%s\" ex: %d, trylock: %d, returns: %d\n",
2051 	     lockres->l_name, ex, trylock, ret);
2052 	return ret;
2053 }
2054 
ocfs2_file_unlock(struct file * file)2055 void ocfs2_file_unlock(struct file *file)
2056 {
2057 	int ret;
2058 	unsigned int gen;
2059 	unsigned long flags;
2060 	struct ocfs2_file_private *fp = file->private_data;
2061 	struct ocfs2_lock_res *lockres = &fp->fp_flock;
2062 	struct ocfs2_super *osb = OCFS2_SB(file->f_mapping->host->i_sb);
2063 	struct ocfs2_mask_waiter mw;
2064 
2065 	ocfs2_init_mask_waiter(&mw);
2066 
2067 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED))
2068 		return;
2069 
2070 	if (lockres->l_level == DLM_LOCK_NL)
2071 		return;
2072 
2073 	mlog(0, "Unlock: \"%s\" flags: 0x%lx, level: %d, act: %d\n",
2074 	     lockres->l_name, lockres->l_flags, lockres->l_level,
2075 	     lockres->l_action);
2076 
2077 	spin_lock_irqsave(&lockres->l_lock, flags);
2078 	/*
2079 	 * Fake a blocking ast for the downconvert code.
2080 	 */
2081 	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);
2082 	lockres->l_blocking = DLM_LOCK_EX;
2083 
2084 	gen = ocfs2_prepare_downconvert(lockres, DLM_LOCK_NL);
2085 	lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
2086 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2087 
2088 	ret = ocfs2_downconvert_lock(osb, lockres, DLM_LOCK_NL, 0, gen);
2089 	if (ret) {
2090 		mlog_errno(ret);
2091 		return;
2092 	}
2093 
2094 	ret = ocfs2_wait_for_mask(&mw);
2095 	if (ret)
2096 		mlog_errno(ret);
2097 }
2098 
ocfs2_downconvert_on_unlock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)2099 static void ocfs2_downconvert_on_unlock(struct ocfs2_super *osb,
2100 					struct ocfs2_lock_res *lockres)
2101 {
2102 	int kick = 0;
2103 
2104 	/* If we know that another node is waiting on our lock, kick
2105 	 * the downconvert thread * pre-emptively when we reach a release
2106 	 * condition. */
2107 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
2108 		switch(lockres->l_blocking) {
2109 		case DLM_LOCK_EX:
2110 			if (!lockres->l_ex_holders && !lockres->l_ro_holders)
2111 				kick = 1;
2112 			break;
2113 		case DLM_LOCK_PR:
2114 			if (!lockres->l_ex_holders)
2115 				kick = 1;
2116 			break;
2117 		default:
2118 			BUG();
2119 		}
2120 	}
2121 
2122 	if (kick)
2123 		ocfs2_wake_downconvert_thread(osb);
2124 }
2125 
2126 #define OCFS2_SEC_BITS   34
2127 #define OCFS2_SEC_SHIFT  (64 - 34)
2128 #define OCFS2_NSEC_MASK  ((1ULL << OCFS2_SEC_SHIFT) - 1)
2129 
2130 /* LVB only has room for 64 bits of time here so we pack it for
2131  * now. */
ocfs2_pack_timespec(struct timespec * spec)2132 static u64 ocfs2_pack_timespec(struct timespec *spec)
2133 {
2134 	u64 res;
2135 	u64 sec = spec->tv_sec;
2136 	u32 nsec = spec->tv_nsec;
2137 
2138 	res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
2139 
2140 	return res;
2141 }
2142 
2143 /* Call this with the lockres locked. I am reasonably sure we don't
2144  * need ip_lock in this function as anyone who would be changing those
2145  * values is supposed to be blocked in ocfs2_inode_lock right now. */
__ocfs2_stuff_meta_lvb(struct inode * inode)2146 static void __ocfs2_stuff_meta_lvb(struct inode *inode)
2147 {
2148 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2149 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2150 	struct ocfs2_meta_lvb *lvb;
2151 	struct timespec ts;
2152 
2153 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2154 
2155 	/*
2156 	 * Invalidate the LVB of a deleted inode - this way other
2157 	 * nodes are forced to go to disk and discover the new inode
2158 	 * status.
2159 	 */
2160 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2161 		lvb->lvb_version = 0;
2162 		goto out;
2163 	}
2164 
2165 	lvb->lvb_version   = OCFS2_LVB_VERSION;
2166 	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
2167 	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
2168 	lvb->lvb_iuid      = cpu_to_be32(i_uid_read(inode));
2169 	lvb->lvb_igid      = cpu_to_be32(i_gid_read(inode));
2170 	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
2171 	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
2172 	ts = timespec64_to_timespec(inode->i_atime);
2173 	lvb->lvb_iatime_packed  =
2174 		cpu_to_be64(ocfs2_pack_timespec(&ts));
2175 	ts = timespec64_to_timespec(inode->i_ctime);
2176 	lvb->lvb_ictime_packed =
2177 		cpu_to_be64(ocfs2_pack_timespec(&ts));
2178 	ts = timespec64_to_timespec(inode->i_mtime);
2179 	lvb->lvb_imtime_packed =
2180 		cpu_to_be64(ocfs2_pack_timespec(&ts));
2181 	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
2182 	lvb->lvb_idynfeatures = cpu_to_be16(oi->ip_dyn_features);
2183 	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);
2184 
2185 out:
2186 	mlog_meta_lvb(0, lockres);
2187 }
2188 
ocfs2_unpack_timespec(struct timespec * spec,u64 packed_time)2189 static void ocfs2_unpack_timespec(struct timespec *spec,
2190 				  u64 packed_time)
2191 {
2192 	spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
2193 	spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
2194 }
2195 
ocfs2_refresh_inode_from_lvb(struct inode * inode)2196 static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
2197 {
2198 	struct timespec ts;
2199 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2200 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2201 	struct ocfs2_meta_lvb *lvb;
2202 
2203 	mlog_meta_lvb(0, lockres);
2204 
2205 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2206 
2207 	/* We're safe here without the lockres lock... */
2208 	spin_lock(&oi->ip_lock);
2209 	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
2210 	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));
2211 
2212 	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
2213 	oi->ip_dyn_features = be16_to_cpu(lvb->lvb_idynfeatures);
2214 	ocfs2_set_inode_flags(inode);
2215 
2216 	/* fast-symlinks are a special case */
2217 	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
2218 		inode->i_blocks = 0;
2219 	else
2220 		inode->i_blocks = ocfs2_inode_sector_count(inode);
2221 
2222 	i_uid_write(inode, be32_to_cpu(lvb->lvb_iuid));
2223 	i_gid_write(inode, be32_to_cpu(lvb->lvb_igid));
2224 	inode->i_mode    = be16_to_cpu(lvb->lvb_imode);
2225 	set_nlink(inode, be16_to_cpu(lvb->lvb_inlink));
2226 	ocfs2_unpack_timespec(&ts,
2227 			      be64_to_cpu(lvb->lvb_iatime_packed));
2228 	inode->i_atime = timespec_to_timespec64(ts);
2229 	ocfs2_unpack_timespec(&ts,
2230 			      be64_to_cpu(lvb->lvb_imtime_packed));
2231 	inode->i_mtime = timespec_to_timespec64(ts);
2232 	ocfs2_unpack_timespec(&ts,
2233 			      be64_to_cpu(lvb->lvb_ictime_packed));
2234 	inode->i_ctime = timespec_to_timespec64(ts);
2235 	spin_unlock(&oi->ip_lock);
2236 }
2237 
ocfs2_meta_lvb_is_trustable(struct inode * inode,struct ocfs2_lock_res * lockres)2238 static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
2239 					      struct ocfs2_lock_res *lockres)
2240 {
2241 	struct ocfs2_meta_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2242 
2243 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)
2244 	    && lvb->lvb_version == OCFS2_LVB_VERSION
2245 	    && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
2246 		return 1;
2247 	return 0;
2248 }
2249 
2250 /* Determine whether a lock resource needs to be refreshed, and
2251  * arbitrate who gets to refresh it.
2252  *
2253  *   0 means no refresh needed.
2254  *
2255  *   > 0 means you need to refresh this and you MUST call
2256  *   ocfs2_complete_lock_res_refresh afterwards. */
ocfs2_should_refresh_lock_res(struct ocfs2_lock_res * lockres)2257 static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
2258 {
2259 	unsigned long flags;
2260 	int status = 0;
2261 
2262 refresh_check:
2263 	spin_lock_irqsave(&lockres->l_lock, flags);
2264 	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
2265 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2266 		goto bail;
2267 	}
2268 
2269 	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
2270 		spin_unlock_irqrestore(&lockres->l_lock, flags);
2271 
2272 		ocfs2_wait_on_refreshing_lock(lockres);
2273 		goto refresh_check;
2274 	}
2275 
2276 	/* Ok, I'll be the one to refresh this lock. */
2277 	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
2278 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2279 
2280 	status = 1;
2281 bail:
2282 	mlog(0, "status %d\n", status);
2283 	return status;
2284 }
2285 
2286 /* If status is non zero, I'll mark it as not being in refresh
2287  * anymroe, but i won't clear the needs refresh flag. */
ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res * lockres,int status)2288 static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
2289 						   int status)
2290 {
2291 	unsigned long flags;
2292 
2293 	spin_lock_irqsave(&lockres->l_lock, flags);
2294 	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
2295 	if (!status)
2296 		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
2297 	spin_unlock_irqrestore(&lockres->l_lock, flags);
2298 
2299 	wake_up(&lockres->l_event);
2300 }
2301 
2302 /* may or may not return a bh if it went to disk. */
ocfs2_inode_lock_update(struct inode * inode,struct buffer_head ** bh)2303 static int ocfs2_inode_lock_update(struct inode *inode,
2304 				  struct buffer_head **bh)
2305 {
2306 	int status = 0;
2307 	struct ocfs2_inode_info *oi = OCFS2_I(inode);
2308 	struct ocfs2_lock_res *lockres = &oi->ip_inode_lockres;
2309 	struct ocfs2_dinode *fe;
2310 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2311 
2312 	if (ocfs2_mount_local(osb))
2313 		goto bail;
2314 
2315 	spin_lock(&oi->ip_lock);
2316 	if (oi->ip_flags & OCFS2_INODE_DELETED) {
2317 		mlog(0, "Orphaned inode %llu was deleted while we "
2318 		     "were waiting on a lock. ip_flags = 0x%x\n",
2319 		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
2320 		spin_unlock(&oi->ip_lock);
2321 		status = -ENOENT;
2322 		goto bail;
2323 	}
2324 	spin_unlock(&oi->ip_lock);
2325 
2326 	if (!ocfs2_should_refresh_lock_res(lockres))
2327 		goto bail;
2328 
2329 	/* This will discard any caching information we might have had
2330 	 * for the inode metadata. */
2331 	ocfs2_metadata_cache_purge(INODE_CACHE(inode));
2332 
2333 	ocfs2_extent_map_trunc(inode, 0);
2334 
2335 	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
2336 		mlog(0, "Trusting LVB on inode %llu\n",
2337 		     (unsigned long long)oi->ip_blkno);
2338 		ocfs2_refresh_inode_from_lvb(inode);
2339 	} else {
2340 		/* Boo, we have to go to disk. */
2341 		/* read bh, cast, ocfs2_refresh_inode */
2342 		status = ocfs2_read_inode_block(inode, bh);
2343 		if (status < 0) {
2344 			mlog_errno(status);
2345 			goto bail_refresh;
2346 		}
2347 		fe = (struct ocfs2_dinode *) (*bh)->b_data;
2348 
2349 		/* This is a good chance to make sure we're not
2350 		 * locking an invalid object.  ocfs2_read_inode_block()
2351 		 * already checked that the inode block is sane.
2352 		 *
2353 		 * We bug on a stale inode here because we checked
2354 		 * above whether it was wiped from disk. The wiping
2355 		 * node provides a guarantee that we receive that
2356 		 * message and can mark the inode before dropping any
2357 		 * locks associated with it. */
2358 		mlog_bug_on_msg(inode->i_generation !=
2359 				le32_to_cpu(fe->i_generation),
2360 				"Invalid dinode %llu disk generation: %u "
2361 				"inode->i_generation: %u\n",
2362 				(unsigned long long)oi->ip_blkno,
2363 				le32_to_cpu(fe->i_generation),
2364 				inode->i_generation);
2365 		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
2366 				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
2367 				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
2368 				(unsigned long long)oi->ip_blkno,
2369 				(unsigned long long)le64_to_cpu(fe->i_dtime),
2370 				le32_to_cpu(fe->i_flags));
2371 
2372 		ocfs2_refresh_inode(inode, fe);
2373 		ocfs2_track_lock_refresh(lockres);
2374 	}
2375 
2376 	status = 0;
2377 bail_refresh:
2378 	ocfs2_complete_lock_res_refresh(lockres, status);
2379 bail:
2380 	return status;
2381 }
2382 
ocfs2_assign_bh(struct inode * inode,struct buffer_head ** ret_bh,struct buffer_head * passed_bh)2383 static int ocfs2_assign_bh(struct inode *inode,
2384 			   struct buffer_head **ret_bh,
2385 			   struct buffer_head *passed_bh)
2386 {
2387 	int status;
2388 
2389 	if (passed_bh) {
2390 		/* Ok, the update went to disk for us, use the
2391 		 * returned bh. */
2392 		*ret_bh = passed_bh;
2393 		get_bh(*ret_bh);
2394 
2395 		return 0;
2396 	}
2397 
2398 	status = ocfs2_read_inode_block(inode, ret_bh);
2399 	if (status < 0)
2400 		mlog_errno(status);
2401 
2402 	return status;
2403 }
2404 
2405 /*
2406  * returns < 0 error if the callback will never be called, otherwise
2407  * the result of the lock will be communicated via the callback.
2408  */
ocfs2_inode_lock_full_nested(struct inode * inode,struct buffer_head ** ret_bh,int ex,int arg_flags,int subclass)2409 int ocfs2_inode_lock_full_nested(struct inode *inode,
2410 				 struct buffer_head **ret_bh,
2411 				 int ex,
2412 				 int arg_flags,
2413 				 int subclass)
2414 {
2415 	int status, level, acquired;
2416 	u32 dlm_flags;
2417 	struct ocfs2_lock_res *lockres = NULL;
2418 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2419 	struct buffer_head *local_bh = NULL;
2420 
2421 	mlog(0, "inode %llu, take %s META lock\n",
2422 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2423 	     ex ? "EXMODE" : "PRMODE");
2424 
2425 	status = 0;
2426 	acquired = 0;
2427 	/* We'll allow faking a readonly metadata lock for
2428 	 * rodevices. */
2429 	if (ocfs2_is_hard_readonly(osb)) {
2430 		if (ex)
2431 			status = -EROFS;
2432 		goto getbh;
2433 	}
2434 
2435 	if ((arg_flags & OCFS2_META_LOCK_GETBH) ||
2436 	    ocfs2_mount_local(osb))
2437 		goto update;
2438 
2439 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2440 		ocfs2_wait_for_recovery(osb);
2441 
2442 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2443 	level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2444 	dlm_flags = 0;
2445 	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
2446 		dlm_flags |= DLM_LKF_NOQUEUE;
2447 
2448 	status = __ocfs2_cluster_lock(osb, lockres, level, dlm_flags,
2449 				      arg_flags, subclass, _RET_IP_);
2450 	if (status < 0) {
2451 		if (status != -EAGAIN)
2452 			mlog_errno(status);
2453 		goto bail;
2454 	}
2455 
2456 	/* Notify the error cleanup path to drop the cluster lock. */
2457 	acquired = 1;
2458 
2459 	/* We wait twice because a node may have died while we were in
2460 	 * the lower dlm layers. The second time though, we've
2461 	 * committed to owning this lock so we don't allow signals to
2462 	 * abort the operation. */
2463 	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
2464 		ocfs2_wait_for_recovery(osb);
2465 
2466 update:
2467 	/*
2468 	 * We only see this flag if we're being called from
2469 	 * ocfs2_read_locked_inode(). It means we're locking an inode
2470 	 * which hasn't been populated yet, so clear the refresh flag
2471 	 * and let the caller handle it.
2472 	 */
2473 	if (inode->i_state & I_NEW) {
2474 		status = 0;
2475 		if (lockres)
2476 			ocfs2_complete_lock_res_refresh(lockres, 0);
2477 		goto bail;
2478 	}
2479 
2480 	/* This is fun. The caller may want a bh back, or it may
2481 	 * not. ocfs2_inode_lock_update definitely wants one in, but
2482 	 * may or may not read one, depending on what's in the
2483 	 * LVB. The result of all of this is that we've *only* gone to
2484 	 * disk if we have to, so the complexity is worthwhile. */
2485 	status = ocfs2_inode_lock_update(inode, &local_bh);
2486 	if (status < 0) {
2487 		if (status != -ENOENT)
2488 			mlog_errno(status);
2489 		goto bail;
2490 	}
2491 getbh:
2492 	if (ret_bh) {
2493 		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
2494 		if (status < 0) {
2495 			mlog_errno(status);
2496 			goto bail;
2497 		}
2498 	}
2499 
2500 bail:
2501 	if (status < 0) {
2502 		if (ret_bh && (*ret_bh)) {
2503 			brelse(*ret_bh);
2504 			*ret_bh = NULL;
2505 		}
2506 		if (acquired)
2507 			ocfs2_inode_unlock(inode, ex);
2508 	}
2509 
2510 	if (local_bh)
2511 		brelse(local_bh);
2512 
2513 	return status;
2514 }
2515 
2516 /*
2517  * This is working around a lock inversion between tasks acquiring DLM
2518  * locks while holding a page lock and the downconvert thread which
2519  * blocks dlm lock acquiry while acquiring page locks.
2520  *
2521  * ** These _with_page variantes are only intended to be called from aop
2522  * methods that hold page locks and return a very specific *positive* error
2523  * code that aop methods pass up to the VFS -- test for errors with != 0. **
2524  *
2525  * The DLM is called such that it returns -EAGAIN if it would have
2526  * blocked waiting for the downconvert thread.  In that case we unlock
2527  * our page so the downconvert thread can make progress.  Once we've
2528  * done this we have to return AOP_TRUNCATED_PAGE so the aop method
2529  * that called us can bubble that back up into the VFS who will then
2530  * immediately retry the aop call.
2531  */
ocfs2_inode_lock_with_page(struct inode * inode,struct buffer_head ** ret_bh,int ex,struct page * page)2532 int ocfs2_inode_lock_with_page(struct inode *inode,
2533 			      struct buffer_head **ret_bh,
2534 			      int ex,
2535 			      struct page *page)
2536 {
2537 	int ret;
2538 
2539 	ret = ocfs2_inode_lock_full(inode, ret_bh, ex, OCFS2_LOCK_NONBLOCK);
2540 	if (ret == -EAGAIN) {
2541 		unlock_page(page);
2542 		/*
2543 		 * If we can't get inode lock immediately, we should not return
2544 		 * directly here, since this will lead to a softlockup problem.
2545 		 * The method is to get a blocking lock and immediately unlock
2546 		 * before returning, this can avoid CPU resource waste due to
2547 		 * lots of retries, and benefits fairness in getting lock.
2548 		 */
2549 		if (ocfs2_inode_lock(inode, ret_bh, ex) == 0)
2550 			ocfs2_inode_unlock(inode, ex);
2551 		ret = AOP_TRUNCATED_PAGE;
2552 	}
2553 
2554 	return ret;
2555 }
2556 
ocfs2_inode_lock_atime(struct inode * inode,struct vfsmount * vfsmnt,int * level,int wait)2557 int ocfs2_inode_lock_atime(struct inode *inode,
2558 			  struct vfsmount *vfsmnt,
2559 			  int *level, int wait)
2560 {
2561 	int ret;
2562 
2563 	if (wait)
2564 		ret = ocfs2_inode_lock(inode, NULL, 0);
2565 	else
2566 		ret = ocfs2_try_inode_lock(inode, NULL, 0);
2567 
2568 	if (ret < 0) {
2569 		if (ret != -EAGAIN)
2570 			mlog_errno(ret);
2571 		return ret;
2572 	}
2573 
2574 	/*
2575 	 * If we should update atime, we will get EX lock,
2576 	 * otherwise we just get PR lock.
2577 	 */
2578 	if (ocfs2_should_update_atime(inode, vfsmnt)) {
2579 		struct buffer_head *bh = NULL;
2580 
2581 		ocfs2_inode_unlock(inode, 0);
2582 		if (wait)
2583 			ret = ocfs2_inode_lock(inode, &bh, 1);
2584 		else
2585 			ret = ocfs2_try_inode_lock(inode, &bh, 1);
2586 
2587 		if (ret < 0) {
2588 			if (ret != -EAGAIN)
2589 				mlog_errno(ret);
2590 			return ret;
2591 		}
2592 		*level = 1;
2593 		if (ocfs2_should_update_atime(inode, vfsmnt))
2594 			ocfs2_update_inode_atime(inode, bh);
2595 		if (bh)
2596 			brelse(bh);
2597 	} else
2598 		*level = 0;
2599 
2600 	return ret;
2601 }
2602 
ocfs2_inode_unlock(struct inode * inode,int ex)2603 void ocfs2_inode_unlock(struct inode *inode,
2604 		       int ex)
2605 {
2606 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2607 	struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_inode_lockres;
2608 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
2609 
2610 	mlog(0, "inode %llu drop %s META lock\n",
2611 	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
2612 	     ex ? "EXMODE" : "PRMODE");
2613 
2614 	if (!ocfs2_is_hard_readonly(osb) &&
2615 	    !ocfs2_mount_local(osb))
2616 		ocfs2_cluster_unlock(osb, lockres, level);
2617 }
2618 
2619 /*
2620  * This _tracker variantes are introduced to deal with the recursive cluster
2621  * locking issue. The idea is to keep track of a lock holder on the stack of
2622  * the current process. If there's a lock holder on the stack, we know the
2623  * task context is already protected by cluster locking. Currently, they're
2624  * used in some VFS entry routines.
2625  *
2626  * return < 0 on error, return == 0 if there's no lock holder on the stack
2627  * before this call, return == 1 if this call would be a recursive locking.
2628  * return == -1 if this lock attempt will cause an upgrade which is forbidden.
2629  *
2630  * When taking lock levels into account,we face some different situations.
2631  *
2632  * 1. no lock is held
2633  *    In this case, just lock the inode as requested and return 0
2634  *
2635  * 2. We are holding a lock
2636  *    For this situation, things diverges into several cases
2637  *
2638  *    wanted     holding	     what to do
2639  *    ex		ex	    see 2.1 below
2640  *    ex		pr	    see 2.2 below
2641  *    pr		ex	    see 2.1 below
2642  *    pr		pr	    see 2.1 below
2643  *
2644  *    2.1 lock level that is been held is compatible
2645  *    with the wanted level, so no lock action will be tacken.
2646  *
2647  *    2.2 Otherwise, an upgrade is needed, but it is forbidden.
2648  *
2649  * Reason why upgrade within a process is forbidden is that
2650  * lock upgrade may cause dead lock. The following illustrates
2651  * how it happens.
2652  *
2653  *         thread on node1                             thread on node2
2654  * ocfs2_inode_lock_tracker(ex=0)
2655  *
2656  *                                <======   ocfs2_inode_lock_tracker(ex=1)
2657  *
2658  * ocfs2_inode_lock_tracker(ex=1)
2659  */
ocfs2_inode_lock_tracker(struct inode * inode,struct buffer_head ** ret_bh,int ex,struct ocfs2_lock_holder * oh)2660 int ocfs2_inode_lock_tracker(struct inode *inode,
2661 			     struct buffer_head **ret_bh,
2662 			     int ex,
2663 			     struct ocfs2_lock_holder *oh)
2664 {
2665 	int status = 0;
2666 	struct ocfs2_lock_res *lockres;
2667 	struct ocfs2_lock_holder *tmp_oh;
2668 	struct pid *pid = task_pid(current);
2669 
2670 
2671 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2672 	tmp_oh = ocfs2_pid_holder(lockres, pid);
2673 
2674 	if (!tmp_oh) {
2675 		/*
2676 		 * This corresponds to the case 1.
2677 		 * We haven't got any lock before.
2678 		 */
2679 		status = ocfs2_inode_lock_full(inode, ret_bh, ex, 0);
2680 		if (status < 0) {
2681 			if (status != -ENOENT)
2682 				mlog_errno(status);
2683 			return status;
2684 		}
2685 
2686 		oh->oh_ex = ex;
2687 		ocfs2_add_holder(lockres, oh);
2688 		return 0;
2689 	}
2690 
2691 	if (unlikely(ex && !tmp_oh->oh_ex)) {
2692 		/*
2693 		 * case 2.2 upgrade may cause dead lock, forbid it.
2694 		 */
2695 		mlog(ML_ERROR, "Recursive locking is not permitted to "
2696 		     "upgrade to EX level from PR level.\n");
2697 		dump_stack();
2698 		return -EINVAL;
2699 	}
2700 
2701 	/*
2702 	 *  case 2.1 OCFS2_META_LOCK_GETBH flag make ocfs2_inode_lock_full.
2703 	 *  ignore the lock level and just update it.
2704 	 */
2705 	if (ret_bh) {
2706 		status = ocfs2_inode_lock_full(inode, ret_bh, ex,
2707 					       OCFS2_META_LOCK_GETBH);
2708 		if (status < 0) {
2709 			if (status != -ENOENT)
2710 				mlog_errno(status);
2711 			return status;
2712 		}
2713 	}
2714 	return tmp_oh ? 1 : 0;
2715 }
2716 
ocfs2_inode_unlock_tracker(struct inode * inode,int ex,struct ocfs2_lock_holder * oh,int had_lock)2717 void ocfs2_inode_unlock_tracker(struct inode *inode,
2718 				int ex,
2719 				struct ocfs2_lock_holder *oh,
2720 				int had_lock)
2721 {
2722 	struct ocfs2_lock_res *lockres;
2723 
2724 	lockres = &OCFS2_I(inode)->ip_inode_lockres;
2725 	/* had_lock means that the currect process already takes the cluster
2726 	 * lock previously.
2727 	 * If had_lock is 1, we have nothing to do here.
2728 	 * If had_lock is 0, we will release the lock.
2729 	 */
2730 	if (!had_lock) {
2731 		ocfs2_inode_unlock(inode, oh->oh_ex);
2732 		ocfs2_remove_holder(lockres, oh);
2733 	}
2734 }
2735 
ocfs2_orphan_scan_lock(struct ocfs2_super * osb,u32 * seqno)2736 int ocfs2_orphan_scan_lock(struct ocfs2_super *osb, u32 *seqno)
2737 {
2738 	struct ocfs2_lock_res *lockres;
2739 	struct ocfs2_orphan_scan_lvb *lvb;
2740 	int status = 0;
2741 
2742 	if (ocfs2_is_hard_readonly(osb))
2743 		return -EROFS;
2744 
2745 	if (ocfs2_mount_local(osb))
2746 		return 0;
2747 
2748 	lockres = &osb->osb_orphan_scan.os_lockres;
2749 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2750 	if (status < 0)
2751 		return status;
2752 
2753 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2754 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2755 	    lvb->lvb_version == OCFS2_ORPHAN_LVB_VERSION)
2756 		*seqno = be32_to_cpu(lvb->lvb_os_seqno);
2757 	else
2758 		*seqno = osb->osb_orphan_scan.os_seqno + 1;
2759 
2760 	return status;
2761 }
2762 
ocfs2_orphan_scan_unlock(struct ocfs2_super * osb,u32 seqno)2763 void ocfs2_orphan_scan_unlock(struct ocfs2_super *osb, u32 seqno)
2764 {
2765 	struct ocfs2_lock_res *lockres;
2766 	struct ocfs2_orphan_scan_lvb *lvb;
2767 
2768 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb)) {
2769 		lockres = &osb->osb_orphan_scan.os_lockres;
2770 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2771 		lvb->lvb_version = OCFS2_ORPHAN_LVB_VERSION;
2772 		lvb->lvb_os_seqno = cpu_to_be32(seqno);
2773 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2774 	}
2775 }
2776 
ocfs2_super_lock(struct ocfs2_super * osb,int ex)2777 int ocfs2_super_lock(struct ocfs2_super *osb,
2778 		     int ex)
2779 {
2780 	int status = 0;
2781 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2782 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2783 
2784 	if (ocfs2_is_hard_readonly(osb))
2785 		return -EROFS;
2786 
2787 	if (ocfs2_mount_local(osb))
2788 		goto bail;
2789 
2790 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
2791 	if (status < 0) {
2792 		mlog_errno(status);
2793 		goto bail;
2794 	}
2795 
2796 	/* The super block lock path is really in the best position to
2797 	 * know when resources covered by the lock need to be
2798 	 * refreshed, so we do it here. Of course, making sense of
2799 	 * everything is up to the caller :) */
2800 	status = ocfs2_should_refresh_lock_res(lockres);
2801 	if (status) {
2802 		status = ocfs2_refresh_slot_info(osb);
2803 
2804 		ocfs2_complete_lock_res_refresh(lockres, status);
2805 
2806 		if (status < 0) {
2807 			ocfs2_cluster_unlock(osb, lockres, level);
2808 			mlog_errno(status);
2809 		}
2810 		ocfs2_track_lock_refresh(lockres);
2811 	}
2812 bail:
2813 	return status;
2814 }
2815 
ocfs2_super_unlock(struct ocfs2_super * osb,int ex)2816 void ocfs2_super_unlock(struct ocfs2_super *osb,
2817 			int ex)
2818 {
2819 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2820 	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
2821 
2822 	if (!ocfs2_mount_local(osb))
2823 		ocfs2_cluster_unlock(osb, lockres, level);
2824 }
2825 
ocfs2_rename_lock(struct ocfs2_super * osb)2826 int ocfs2_rename_lock(struct ocfs2_super *osb)
2827 {
2828 	int status;
2829 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2830 
2831 	if (ocfs2_is_hard_readonly(osb))
2832 		return -EROFS;
2833 
2834 	if (ocfs2_mount_local(osb))
2835 		return 0;
2836 
2837 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX, 0, 0);
2838 	if (status < 0)
2839 		mlog_errno(status);
2840 
2841 	return status;
2842 }
2843 
ocfs2_rename_unlock(struct ocfs2_super * osb)2844 void ocfs2_rename_unlock(struct ocfs2_super *osb)
2845 {
2846 	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
2847 
2848 	if (!ocfs2_mount_local(osb))
2849 		ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2850 }
2851 
ocfs2_nfs_sync_lock(struct ocfs2_super * osb,int ex)2852 int ocfs2_nfs_sync_lock(struct ocfs2_super *osb, int ex)
2853 {
2854 	int status;
2855 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2856 
2857 	if (ocfs2_is_hard_readonly(osb))
2858 		return -EROFS;
2859 
2860 	if (ex)
2861 		down_write(&osb->nfs_sync_rwlock);
2862 	else
2863 		down_read(&osb->nfs_sync_rwlock);
2864 
2865 	if (ocfs2_mount_local(osb))
2866 		return 0;
2867 
2868 	status = ocfs2_cluster_lock(osb, lockres, ex ? LKM_EXMODE : LKM_PRMODE,
2869 				    0, 0);
2870 	if (status < 0) {
2871 		mlog(ML_ERROR, "lock on nfs sync lock failed %d\n", status);
2872 
2873 		if (ex)
2874 			up_write(&osb->nfs_sync_rwlock);
2875 		else
2876 			up_read(&osb->nfs_sync_rwlock);
2877 	}
2878 
2879 	return status;
2880 }
2881 
ocfs2_nfs_sync_unlock(struct ocfs2_super * osb,int ex)2882 void ocfs2_nfs_sync_unlock(struct ocfs2_super *osb, int ex)
2883 {
2884 	struct ocfs2_lock_res *lockres = &osb->osb_nfs_sync_lockres;
2885 
2886 	if (!ocfs2_mount_local(osb))
2887 		ocfs2_cluster_unlock(osb, lockres,
2888 				     ex ? LKM_EXMODE : LKM_PRMODE);
2889 	if (ex)
2890 		up_write(&osb->nfs_sync_rwlock);
2891 	else
2892 		up_read(&osb->nfs_sync_rwlock);
2893 }
2894 
ocfs2_trim_fs_lock(struct ocfs2_super * osb,struct ocfs2_trim_fs_info * info,int trylock)2895 int ocfs2_trim_fs_lock(struct ocfs2_super *osb,
2896 		       struct ocfs2_trim_fs_info *info, int trylock)
2897 {
2898 	int status;
2899 	struct ocfs2_trim_fs_lvb *lvb;
2900 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2901 
2902 	if (info)
2903 		info->tf_valid = 0;
2904 
2905 	if (ocfs2_is_hard_readonly(osb))
2906 		return -EROFS;
2907 
2908 	if (ocfs2_mount_local(osb))
2909 		return 0;
2910 
2911 	status = ocfs2_cluster_lock(osb, lockres, DLM_LOCK_EX,
2912 				    trylock ? DLM_LKF_NOQUEUE : 0, 0);
2913 	if (status < 0) {
2914 		if (status != -EAGAIN)
2915 			mlog_errno(status);
2916 		return status;
2917 	}
2918 
2919 	if (info) {
2920 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2921 		if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
2922 		    lvb->lvb_version == OCFS2_TRIMFS_LVB_VERSION) {
2923 			info->tf_valid = 1;
2924 			info->tf_success = lvb->lvb_success;
2925 			info->tf_nodenum = be32_to_cpu(lvb->lvb_nodenum);
2926 			info->tf_start = be64_to_cpu(lvb->lvb_start);
2927 			info->tf_len = be64_to_cpu(lvb->lvb_len);
2928 			info->tf_minlen = be64_to_cpu(lvb->lvb_minlen);
2929 			info->tf_trimlen = be64_to_cpu(lvb->lvb_trimlen);
2930 		}
2931 	}
2932 
2933 	return status;
2934 }
2935 
ocfs2_trim_fs_unlock(struct ocfs2_super * osb,struct ocfs2_trim_fs_info * info)2936 void ocfs2_trim_fs_unlock(struct ocfs2_super *osb,
2937 			  struct ocfs2_trim_fs_info *info)
2938 {
2939 	struct ocfs2_trim_fs_lvb *lvb;
2940 	struct ocfs2_lock_res *lockres = &osb->osb_trim_fs_lockres;
2941 
2942 	if (ocfs2_mount_local(osb))
2943 		return;
2944 
2945 	if (info) {
2946 		lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
2947 		lvb->lvb_version = OCFS2_TRIMFS_LVB_VERSION;
2948 		lvb->lvb_success = info->tf_success;
2949 		lvb->lvb_nodenum = cpu_to_be32(info->tf_nodenum);
2950 		lvb->lvb_start = cpu_to_be64(info->tf_start);
2951 		lvb->lvb_len = cpu_to_be64(info->tf_len);
2952 		lvb->lvb_minlen = cpu_to_be64(info->tf_minlen);
2953 		lvb->lvb_trimlen = cpu_to_be64(info->tf_trimlen);
2954 	}
2955 
2956 	ocfs2_cluster_unlock(osb, lockres, DLM_LOCK_EX);
2957 }
2958 
ocfs2_dentry_lock(struct dentry * dentry,int ex)2959 int ocfs2_dentry_lock(struct dentry *dentry, int ex)
2960 {
2961 	int ret;
2962 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2963 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2964 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2965 
2966 	BUG_ON(!dl);
2967 
2968 	if (ocfs2_is_hard_readonly(osb)) {
2969 		if (ex)
2970 			return -EROFS;
2971 		return 0;
2972 	}
2973 
2974 	if (ocfs2_mount_local(osb))
2975 		return 0;
2976 
2977 	ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
2978 	if (ret < 0)
2979 		mlog_errno(ret);
2980 
2981 	return ret;
2982 }
2983 
ocfs2_dentry_unlock(struct dentry * dentry,int ex)2984 void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
2985 {
2986 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
2987 	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
2988 	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
2989 
2990 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
2991 		ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
2992 }
2993 
2994 /* Reference counting of the dlm debug structure. We want this because
2995  * open references on the debug inodes can live on after a mount, so
2996  * we can't rely on the ocfs2_super to always exist. */
ocfs2_dlm_debug_free(struct kref * kref)2997 static void ocfs2_dlm_debug_free(struct kref *kref)
2998 {
2999 	struct ocfs2_dlm_debug *dlm_debug;
3000 
3001 	dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
3002 
3003 	kfree(dlm_debug);
3004 }
3005 
ocfs2_put_dlm_debug(struct ocfs2_dlm_debug * dlm_debug)3006 void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
3007 {
3008 	if (dlm_debug)
3009 		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
3010 }
3011 
ocfs2_get_dlm_debug(struct ocfs2_dlm_debug * debug)3012 static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
3013 {
3014 	kref_get(&debug->d_refcnt);
3015 }
3016 
ocfs2_new_dlm_debug(void)3017 struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
3018 {
3019 	struct ocfs2_dlm_debug *dlm_debug;
3020 
3021 	dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
3022 	if (!dlm_debug) {
3023 		mlog_errno(-ENOMEM);
3024 		goto out;
3025 	}
3026 
3027 	kref_init(&dlm_debug->d_refcnt);
3028 	INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
3029 	dlm_debug->d_locking_state = NULL;
3030 out:
3031 	return dlm_debug;
3032 }
3033 
3034 /* Access to this is arbitrated for us via seq_file->sem. */
3035 struct ocfs2_dlm_seq_priv {
3036 	struct ocfs2_dlm_debug *p_dlm_debug;
3037 	struct ocfs2_lock_res p_iter_res;
3038 	struct ocfs2_lock_res p_tmp_res;
3039 };
3040 
ocfs2_dlm_next_res(struct ocfs2_lock_res * start,struct ocfs2_dlm_seq_priv * priv)3041 static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
3042 						 struct ocfs2_dlm_seq_priv *priv)
3043 {
3044 	struct ocfs2_lock_res *iter, *ret = NULL;
3045 	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;
3046 
3047 	assert_spin_locked(&ocfs2_dlm_tracking_lock);
3048 
3049 	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
3050 		/* discover the head of the list */
3051 		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
3052 			mlog(0, "End of list found, %p\n", ret);
3053 			break;
3054 		}
3055 
3056 		/* We track our "dummy" iteration lockres' by a NULL
3057 		 * l_ops field. */
3058 		if (iter->l_ops != NULL) {
3059 			ret = iter;
3060 			break;
3061 		}
3062 	}
3063 
3064 	return ret;
3065 }
3066 
ocfs2_dlm_seq_start(struct seq_file * m,loff_t * pos)3067 static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
3068 {
3069 	struct ocfs2_dlm_seq_priv *priv = m->private;
3070 	struct ocfs2_lock_res *iter;
3071 
3072 	spin_lock(&ocfs2_dlm_tracking_lock);
3073 	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
3074 	if (iter) {
3075 		/* Since lockres' have the lifetime of their container
3076 		 * (which can be inodes, ocfs2_supers, etc) we want to
3077 		 * copy this out to a temporary lockres while still
3078 		 * under the spinlock. Obviously after this we can't
3079 		 * trust any pointers on the copy returned, but that's
3080 		 * ok as the information we want isn't typically held
3081 		 * in them. */
3082 		priv->p_tmp_res = *iter;
3083 		iter = &priv->p_tmp_res;
3084 	}
3085 	spin_unlock(&ocfs2_dlm_tracking_lock);
3086 
3087 	return iter;
3088 }
3089 
ocfs2_dlm_seq_stop(struct seq_file * m,void * v)3090 static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
3091 {
3092 }
3093 
ocfs2_dlm_seq_next(struct seq_file * m,void * v,loff_t * pos)3094 static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
3095 {
3096 	struct ocfs2_dlm_seq_priv *priv = m->private;
3097 	struct ocfs2_lock_res *iter = v;
3098 	struct ocfs2_lock_res *dummy = &priv->p_iter_res;
3099 
3100 	spin_lock(&ocfs2_dlm_tracking_lock);
3101 	iter = ocfs2_dlm_next_res(iter, priv);
3102 	list_del_init(&dummy->l_debug_list);
3103 	if (iter) {
3104 		list_add(&dummy->l_debug_list, &iter->l_debug_list);
3105 		priv->p_tmp_res = *iter;
3106 		iter = &priv->p_tmp_res;
3107 	}
3108 	spin_unlock(&ocfs2_dlm_tracking_lock);
3109 
3110 	return iter;
3111 }
3112 
3113 /*
3114  * Version is used by debugfs.ocfs2 to determine the format being used
3115  *
3116  * New in version 2
3117  *	- Lock stats printed
3118  * New in version 3
3119  *	- Max time in lock stats is in usecs (instead of nsecs)
3120  */
3121 #define OCFS2_DLM_DEBUG_STR_VERSION 3
ocfs2_dlm_seq_show(struct seq_file * m,void * v)3122 static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
3123 {
3124 	int i;
3125 	char *lvb;
3126 	struct ocfs2_lock_res *lockres = v;
3127 
3128 	if (!lockres)
3129 		return -EINVAL;
3130 
3131 	seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
3132 
3133 	if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
3134 		seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
3135 			   lockres->l_name,
3136 			   (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
3137 	else
3138 		seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
3139 
3140 	seq_printf(m, "%d\t"
3141 		   "0x%lx\t"
3142 		   "0x%x\t"
3143 		   "0x%x\t"
3144 		   "%u\t"
3145 		   "%u\t"
3146 		   "%d\t"
3147 		   "%d\t",
3148 		   lockres->l_level,
3149 		   lockres->l_flags,
3150 		   lockres->l_action,
3151 		   lockres->l_unlock_action,
3152 		   lockres->l_ro_holders,
3153 		   lockres->l_ex_holders,
3154 		   lockres->l_requested,
3155 		   lockres->l_blocking);
3156 
3157 	/* Dump the raw LVB */
3158 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
3159 	for(i = 0; i < DLM_LVB_LEN; i++)
3160 		seq_printf(m, "0x%x\t", lvb[i]);
3161 
3162 #ifdef CONFIG_OCFS2_FS_STATS
3163 # define lock_num_prmode(_l)		((_l)->l_lock_prmode.ls_gets)
3164 # define lock_num_exmode(_l)		((_l)->l_lock_exmode.ls_gets)
3165 # define lock_num_prmode_failed(_l)	((_l)->l_lock_prmode.ls_fail)
3166 # define lock_num_exmode_failed(_l)	((_l)->l_lock_exmode.ls_fail)
3167 # define lock_total_prmode(_l)		((_l)->l_lock_prmode.ls_total)
3168 # define lock_total_exmode(_l)		((_l)->l_lock_exmode.ls_total)
3169 # define lock_max_prmode(_l)		((_l)->l_lock_prmode.ls_max)
3170 # define lock_max_exmode(_l)		((_l)->l_lock_exmode.ls_max)
3171 # define lock_refresh(_l)		((_l)->l_lock_refresh)
3172 #else
3173 # define lock_num_prmode(_l)		(0)
3174 # define lock_num_exmode(_l)		(0)
3175 # define lock_num_prmode_failed(_l)	(0)
3176 # define lock_num_exmode_failed(_l)	(0)
3177 # define lock_total_prmode(_l)		(0ULL)
3178 # define lock_total_exmode(_l)		(0ULL)
3179 # define lock_max_prmode(_l)		(0)
3180 # define lock_max_exmode(_l)		(0)
3181 # define lock_refresh(_l)		(0)
3182 #endif
3183 	/* The following seq_print was added in version 2 of this output */
3184 	seq_printf(m, "%u\t"
3185 		   "%u\t"
3186 		   "%u\t"
3187 		   "%u\t"
3188 		   "%llu\t"
3189 		   "%llu\t"
3190 		   "%u\t"
3191 		   "%u\t"
3192 		   "%u\t",
3193 		   lock_num_prmode(lockres),
3194 		   lock_num_exmode(lockres),
3195 		   lock_num_prmode_failed(lockres),
3196 		   lock_num_exmode_failed(lockres),
3197 		   lock_total_prmode(lockres),
3198 		   lock_total_exmode(lockres),
3199 		   lock_max_prmode(lockres),
3200 		   lock_max_exmode(lockres),
3201 		   lock_refresh(lockres));
3202 
3203 	/* End the line */
3204 	seq_printf(m, "\n");
3205 	return 0;
3206 }
3207 
3208 static const struct seq_operations ocfs2_dlm_seq_ops = {
3209 	.start =	ocfs2_dlm_seq_start,
3210 	.stop =		ocfs2_dlm_seq_stop,
3211 	.next =		ocfs2_dlm_seq_next,
3212 	.show =		ocfs2_dlm_seq_show,
3213 };
3214 
ocfs2_dlm_debug_release(struct inode * inode,struct file * file)3215 static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
3216 {
3217 	struct seq_file *seq = file->private_data;
3218 	struct ocfs2_dlm_seq_priv *priv = seq->private;
3219 	struct ocfs2_lock_res *res = &priv->p_iter_res;
3220 
3221 	ocfs2_remove_lockres_tracking(res);
3222 	ocfs2_put_dlm_debug(priv->p_dlm_debug);
3223 	return seq_release_private(inode, file);
3224 }
3225 
ocfs2_dlm_debug_open(struct inode * inode,struct file * file)3226 static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
3227 {
3228 	struct ocfs2_dlm_seq_priv *priv;
3229 	struct ocfs2_super *osb;
3230 
3231 	priv = __seq_open_private(file, &ocfs2_dlm_seq_ops, sizeof(*priv));
3232 	if (!priv) {
3233 		mlog_errno(-ENOMEM);
3234 		return -ENOMEM;
3235 	}
3236 
3237 	osb = inode->i_private;
3238 	ocfs2_get_dlm_debug(osb->osb_dlm_debug);
3239 	priv->p_dlm_debug = osb->osb_dlm_debug;
3240 	INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
3241 
3242 	ocfs2_add_lockres_tracking(&priv->p_iter_res,
3243 				   priv->p_dlm_debug);
3244 
3245 	return 0;
3246 }
3247 
3248 static const struct file_operations ocfs2_dlm_debug_fops = {
3249 	.open =		ocfs2_dlm_debug_open,
3250 	.release =	ocfs2_dlm_debug_release,
3251 	.read =		seq_read,
3252 	.llseek =	seq_lseek,
3253 };
3254 
ocfs2_dlm_init_debug(struct ocfs2_super * osb)3255 static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
3256 {
3257 	int ret = 0;
3258 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3259 
3260 	dlm_debug->d_locking_state = debugfs_create_file("locking_state",
3261 							 S_IFREG|S_IRUSR,
3262 							 osb->osb_debug_root,
3263 							 osb,
3264 							 &ocfs2_dlm_debug_fops);
3265 	if (!dlm_debug->d_locking_state) {
3266 		ret = -EINVAL;
3267 		mlog(ML_ERROR,
3268 		     "Unable to create locking state debugfs file.\n");
3269 		goto out;
3270 	}
3271 
3272 	ocfs2_get_dlm_debug(dlm_debug);
3273 out:
3274 	return ret;
3275 }
3276 
ocfs2_dlm_shutdown_debug(struct ocfs2_super * osb)3277 static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
3278 {
3279 	struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
3280 
3281 	if (dlm_debug) {
3282 		debugfs_remove(dlm_debug->d_locking_state);
3283 		ocfs2_put_dlm_debug(dlm_debug);
3284 	}
3285 }
3286 
ocfs2_dlm_init(struct ocfs2_super * osb)3287 int ocfs2_dlm_init(struct ocfs2_super *osb)
3288 {
3289 	int status = 0;
3290 	struct ocfs2_cluster_connection *conn = NULL;
3291 
3292 	if (ocfs2_mount_local(osb)) {
3293 		osb->node_num = 0;
3294 		goto local;
3295 	}
3296 
3297 	status = ocfs2_dlm_init_debug(osb);
3298 	if (status < 0) {
3299 		mlog_errno(status);
3300 		goto bail;
3301 	}
3302 
3303 	/* launch downconvert thread */
3304 	osb->dc_task = kthread_run(ocfs2_downconvert_thread, osb, "ocfs2dc-%s",
3305 			osb->uuid_str);
3306 	if (IS_ERR(osb->dc_task)) {
3307 		status = PTR_ERR(osb->dc_task);
3308 		osb->dc_task = NULL;
3309 		mlog_errno(status);
3310 		goto bail;
3311 	}
3312 
3313 	/* for now, uuid == domain */
3314 	status = ocfs2_cluster_connect(osb->osb_cluster_stack,
3315 				       osb->osb_cluster_name,
3316 				       strlen(osb->osb_cluster_name),
3317 				       osb->uuid_str,
3318 				       strlen(osb->uuid_str),
3319 				       &lproto, ocfs2_do_node_down, osb,
3320 				       &conn);
3321 	if (status) {
3322 		mlog_errno(status);
3323 		goto bail;
3324 	}
3325 
3326 	status = ocfs2_cluster_this_node(conn, &osb->node_num);
3327 	if (status < 0) {
3328 		mlog_errno(status);
3329 		mlog(ML_ERROR,
3330 		     "could not find this host's node number\n");
3331 		ocfs2_cluster_disconnect(conn, 0);
3332 		goto bail;
3333 	}
3334 
3335 local:
3336 	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
3337 	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);
3338 	ocfs2_nfs_sync_lock_init(osb);
3339 	ocfs2_orphan_scan_lock_res_init(&osb->osb_orphan_scan.os_lockres, osb);
3340 
3341 	osb->cconn = conn;
3342 bail:
3343 	if (status < 0) {
3344 		ocfs2_dlm_shutdown_debug(osb);
3345 		if (osb->dc_task)
3346 			kthread_stop(osb->dc_task);
3347 	}
3348 
3349 	return status;
3350 }
3351 
ocfs2_dlm_shutdown(struct ocfs2_super * osb,int hangup_pending)3352 void ocfs2_dlm_shutdown(struct ocfs2_super *osb,
3353 			int hangup_pending)
3354 {
3355 	ocfs2_drop_osb_locks(osb);
3356 
3357 	/*
3358 	 * Now that we have dropped all locks and ocfs2_dismount_volume()
3359 	 * has disabled recovery, the DLM won't be talking to us.  It's
3360 	 * safe to tear things down before disconnecting the cluster.
3361 	 */
3362 
3363 	if (osb->dc_task) {
3364 		kthread_stop(osb->dc_task);
3365 		osb->dc_task = NULL;
3366 	}
3367 
3368 	ocfs2_lock_res_free(&osb->osb_super_lockres);
3369 	ocfs2_lock_res_free(&osb->osb_rename_lockres);
3370 	ocfs2_lock_res_free(&osb->osb_nfs_sync_lockres);
3371 	ocfs2_lock_res_free(&osb->osb_orphan_scan.os_lockres);
3372 
3373 	ocfs2_cluster_disconnect(osb->cconn, hangup_pending);
3374 	osb->cconn = NULL;
3375 
3376 	ocfs2_dlm_shutdown_debug(osb);
3377 }
3378 
ocfs2_drop_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3379 static int ocfs2_drop_lock(struct ocfs2_super *osb,
3380 			   struct ocfs2_lock_res *lockres)
3381 {
3382 	int ret;
3383 	unsigned long flags;
3384 	u32 lkm_flags = 0;
3385 
3386 	/* We didn't get anywhere near actually using this lockres. */
3387 	if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
3388 		goto out;
3389 
3390 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3391 		lkm_flags |= DLM_LKF_VALBLK;
3392 
3393 	spin_lock_irqsave(&lockres->l_lock, flags);
3394 
3395 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
3396 			"lockres %s, flags 0x%lx\n",
3397 			lockres->l_name, lockres->l_flags);
3398 
3399 	while (lockres->l_flags & OCFS2_LOCK_BUSY) {
3400 		mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
3401 		     "%u, unlock_action = %u\n",
3402 		     lockres->l_name, lockres->l_flags, lockres->l_action,
3403 		     lockres->l_unlock_action);
3404 
3405 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3406 
3407 		/* XXX: Today we just wait on any busy
3408 		 * locks... Perhaps we need to cancel converts in the
3409 		 * future? */
3410 		ocfs2_wait_on_busy_lock(lockres);
3411 
3412 		spin_lock_irqsave(&lockres->l_lock, flags);
3413 	}
3414 
3415 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3416 		if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
3417 		    lockres->l_level == DLM_LOCK_EX &&
3418 		    !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3419 			lockres->l_ops->set_lvb(lockres);
3420 	}
3421 
3422 	if (lockres->l_flags & OCFS2_LOCK_BUSY)
3423 		mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
3424 		     lockres->l_name);
3425 	if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
3426 		mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
3427 
3428 	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
3429 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3430 		goto out;
3431 	}
3432 
3433 	lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
3434 
3435 	/* make sure we never get here while waiting for an ast to
3436 	 * fire. */
3437 	BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
3438 
3439 	/* is this necessary? */
3440 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3441 	lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
3442 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3443 
3444 	mlog(0, "lock %s\n", lockres->l_name);
3445 
3446 	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags);
3447 	if (ret) {
3448 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3449 		mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
3450 		ocfs2_dlm_dump_lksb(&lockres->l_lksb);
3451 		BUG();
3452 	}
3453 	mlog(0, "lock %s, successful return from ocfs2_dlm_unlock\n",
3454 	     lockres->l_name);
3455 
3456 	ocfs2_wait_on_busy_lock(lockres);
3457 out:
3458 	return 0;
3459 }
3460 
3461 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
3462 				       struct ocfs2_lock_res *lockres);
3463 
3464 /* Mark the lockres as being dropped. It will no longer be
3465  * queued if blocking, but we still may have to wait on it
3466  * being dequeued from the downconvert thread before we can consider
3467  * it safe to drop.
3468  *
3469  * You can *not* attempt to call cluster_lock on this lockres anymore. */
ocfs2_mark_lockres_freeing(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3470 void ocfs2_mark_lockres_freeing(struct ocfs2_super *osb,
3471 				struct ocfs2_lock_res *lockres)
3472 {
3473 	int status;
3474 	struct ocfs2_mask_waiter mw;
3475 	unsigned long flags, flags2;
3476 
3477 	ocfs2_init_mask_waiter(&mw);
3478 
3479 	spin_lock_irqsave(&lockres->l_lock, flags);
3480 	lockres->l_flags |= OCFS2_LOCK_FREEING;
3481 	if (lockres->l_flags & OCFS2_LOCK_QUEUED && current == osb->dc_task) {
3482 		/*
3483 		 * We know the downconvert is queued but not in progress
3484 		 * because we are the downconvert thread and processing
3485 		 * different lock. So we can just remove the lock from the
3486 		 * queue. This is not only an optimization but also a way
3487 		 * to avoid the following deadlock:
3488 		 *   ocfs2_dentry_post_unlock()
3489 		 *     ocfs2_dentry_lock_put()
3490 		 *       ocfs2_drop_dentry_lock()
3491 		 *         iput()
3492 		 *           ocfs2_evict_inode()
3493 		 *             ocfs2_clear_inode()
3494 		 *               ocfs2_mark_lockres_freeing()
3495 		 *                 ... blocks waiting for OCFS2_LOCK_QUEUED
3496 		 *                 since we are the downconvert thread which
3497 		 *                 should clear the flag.
3498 		 */
3499 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3500 		spin_lock_irqsave(&osb->dc_task_lock, flags2);
3501 		list_del_init(&lockres->l_blocked_list);
3502 		osb->blocked_lock_count--;
3503 		spin_unlock_irqrestore(&osb->dc_task_lock, flags2);
3504 		/*
3505 		 * Warn if we recurse into another post_unlock call.  Strictly
3506 		 * speaking it isn't a problem but we need to be careful if
3507 		 * that happens (stack overflow, deadlocks, ...) so warn if
3508 		 * ocfs2 grows a path for which this can happen.
3509 		 */
3510 		WARN_ON_ONCE(lockres->l_ops->post_unlock);
3511 		/* Since the lock is freeing we don't do much in the fn below */
3512 		ocfs2_process_blocked_lock(osb, lockres);
3513 		return;
3514 	}
3515 	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
3516 		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
3517 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3518 
3519 		mlog(0, "Waiting on lockres %s\n", lockres->l_name);
3520 
3521 		status = ocfs2_wait_for_mask(&mw);
3522 		if (status)
3523 			mlog_errno(status);
3524 
3525 		spin_lock_irqsave(&lockres->l_lock, flags);
3526 	}
3527 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3528 }
3529 
ocfs2_simple_drop_lockres(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3530 void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
3531 			       struct ocfs2_lock_res *lockres)
3532 {
3533 	int ret;
3534 
3535 	ocfs2_mark_lockres_freeing(osb, lockres);
3536 	ret = ocfs2_drop_lock(osb, lockres);
3537 	if (ret)
3538 		mlog_errno(ret);
3539 }
3540 
ocfs2_drop_osb_locks(struct ocfs2_super * osb)3541 static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
3542 {
3543 	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
3544 	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
3545 	ocfs2_simple_drop_lockres(osb, &osb->osb_nfs_sync_lockres);
3546 	ocfs2_simple_drop_lockres(osb, &osb->osb_orphan_scan.os_lockres);
3547 }
3548 
ocfs2_drop_inode_locks(struct inode * inode)3549 int ocfs2_drop_inode_locks(struct inode *inode)
3550 {
3551 	int status, err;
3552 
3553 	/* No need to call ocfs2_mark_lockres_freeing here -
3554 	 * ocfs2_clear_inode has done it for us. */
3555 
3556 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3557 			      &OCFS2_I(inode)->ip_open_lockres);
3558 	if (err < 0)
3559 		mlog_errno(err);
3560 
3561 	status = err;
3562 
3563 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3564 			      &OCFS2_I(inode)->ip_inode_lockres);
3565 	if (err < 0)
3566 		mlog_errno(err);
3567 	if (err < 0 && !status)
3568 		status = err;
3569 
3570 	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
3571 			      &OCFS2_I(inode)->ip_rw_lockres);
3572 	if (err < 0)
3573 		mlog_errno(err);
3574 	if (err < 0 && !status)
3575 		status = err;
3576 
3577 	return status;
3578 }
3579 
ocfs2_prepare_downconvert(struct ocfs2_lock_res * lockres,int new_level)3580 static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
3581 					      int new_level)
3582 {
3583 	assert_spin_locked(&lockres->l_lock);
3584 
3585 	BUG_ON(lockres->l_blocking <= DLM_LOCK_NL);
3586 
3587 	if (lockres->l_level <= new_level) {
3588 		mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, "
3589 		     "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, "
3590 		     "block %d, pgen %d\n", lockres->l_name, lockres->l_level,
3591 		     new_level, list_empty(&lockres->l_blocked_list),
3592 		     list_empty(&lockres->l_mask_waiters), lockres->l_type,
3593 		     lockres->l_flags, lockres->l_ro_holders,
3594 		     lockres->l_ex_holders, lockres->l_action,
3595 		     lockres->l_unlock_action, lockres->l_requested,
3596 		     lockres->l_blocking, lockres->l_pending_gen);
3597 		BUG();
3598 	}
3599 
3600 	mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n",
3601 	     lockres->l_name, lockres->l_level, new_level, lockres->l_blocking);
3602 
3603 	lockres->l_action = OCFS2_AST_DOWNCONVERT;
3604 	lockres->l_requested = new_level;
3605 	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
3606 	return lockres_set_pending(lockres);
3607 }
3608 
ocfs2_downconvert_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,int new_level,int lvb,unsigned int generation)3609 static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
3610 				  struct ocfs2_lock_res *lockres,
3611 				  int new_level,
3612 				  int lvb,
3613 				  unsigned int generation)
3614 {
3615 	int ret;
3616 	u32 dlm_flags = DLM_LKF_CONVERT;
3617 
3618 	mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name,
3619 	     lockres->l_level, new_level);
3620 
3621 	/*
3622 	 * On DLM_LKF_VALBLK, fsdlm behaves differently with o2cb. It always
3623 	 * expects DLM_LKF_VALBLK being set if the LKB has LVB, so that
3624 	 * we can recover correctly from node failure. Otherwise, we may get
3625 	 * invalid LVB in LKB, but without DLM_SBF_VALNOTVALID being set.
3626 	 */
3627 	if (ocfs2_userspace_stack(osb) &&
3628 	    lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
3629 		lvb = 1;
3630 
3631 	if (lvb)
3632 		dlm_flags |= DLM_LKF_VALBLK;
3633 
3634 	ret = ocfs2_dlm_lock(osb->cconn,
3635 			     new_level,
3636 			     &lockres->l_lksb,
3637 			     dlm_flags,
3638 			     lockres->l_name,
3639 			     OCFS2_LOCK_ID_MAX_LEN - 1);
3640 	lockres_clear_pending(lockres, generation, osb);
3641 	if (ret) {
3642 		ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres);
3643 		ocfs2_recover_from_dlm_error(lockres, 1);
3644 		goto bail;
3645 	}
3646 
3647 	ret = 0;
3648 bail:
3649 	return ret;
3650 }
3651 
3652 /* returns 1 when the caller should unlock and call ocfs2_dlm_unlock */
ocfs2_prepare_cancel_convert(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3653 static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
3654 				        struct ocfs2_lock_res *lockres)
3655 {
3656 	assert_spin_locked(&lockres->l_lock);
3657 
3658 	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
3659 		/* If we're already trying to cancel a lock conversion
3660 		 * then just drop the spinlock and allow the caller to
3661 		 * requeue this lock. */
3662 		mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name);
3663 		return 0;
3664 	}
3665 
3666 	/* were we in a convert when we got the bast fire? */
3667 	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
3668 	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
3669 	/* set things up for the unlockast to know to just
3670 	 * clear out the ast_action and unset busy, etc. */
3671 	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;
3672 
3673 	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
3674 			"lock %s, invalid flags: 0x%lx\n",
3675 			lockres->l_name, lockres->l_flags);
3676 
3677 	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3678 
3679 	return 1;
3680 }
3681 
ocfs2_cancel_convert(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3682 static int ocfs2_cancel_convert(struct ocfs2_super *osb,
3683 				struct ocfs2_lock_res *lockres)
3684 {
3685 	int ret;
3686 
3687 	ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb,
3688 			       DLM_LKF_CANCEL);
3689 	if (ret) {
3690 		ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres);
3691 		ocfs2_recover_from_dlm_error(lockres, 0);
3692 	}
3693 
3694 	mlog(ML_BASTS, "lockres %s\n", lockres->l_name);
3695 
3696 	return ret;
3697 }
3698 
ocfs2_unblock_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres,struct ocfs2_unblock_ctl * ctl)3699 static int ocfs2_unblock_lock(struct ocfs2_super *osb,
3700 			      struct ocfs2_lock_res *lockres,
3701 			      struct ocfs2_unblock_ctl *ctl)
3702 {
3703 	unsigned long flags;
3704 	int blocking;
3705 	int new_level;
3706 	int level;
3707 	int ret = 0;
3708 	int set_lvb = 0;
3709 	unsigned int gen;
3710 
3711 	spin_lock_irqsave(&lockres->l_lock, flags);
3712 
3713 recheck:
3714 	/*
3715 	 * Is it still blocking? If not, we have no more work to do.
3716 	 */
3717 	if (!(lockres->l_flags & OCFS2_LOCK_BLOCKED)) {
3718 		BUG_ON(lockres->l_blocking != DLM_LOCK_NL);
3719 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3720 		ret = 0;
3721 		goto leave;
3722 	}
3723 
3724 	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
3725 		/* XXX
3726 		 * This is a *big* race.  The OCFS2_LOCK_PENDING flag
3727 		 * exists entirely for one reason - another thread has set
3728 		 * OCFS2_LOCK_BUSY, but has *NOT* yet called dlm_lock().
3729 		 *
3730 		 * If we do ocfs2_cancel_convert() before the other thread
3731 		 * calls dlm_lock(), our cancel will do nothing.  We will
3732 		 * get no ast, and we will have no way of knowing the
3733 		 * cancel failed.  Meanwhile, the other thread will call
3734 		 * into dlm_lock() and wait...forever.
3735 		 *
3736 		 * Why forever?  Because another node has asked for the
3737 		 * lock first; that's why we're here in unblock_lock().
3738 		 *
3739 		 * The solution is OCFS2_LOCK_PENDING.  When PENDING is
3740 		 * set, we just requeue the unblock.  Only when the other
3741 		 * thread has called dlm_lock() and cleared PENDING will
3742 		 * we then cancel their request.
3743 		 *
3744 		 * All callers of dlm_lock() must set OCFS2_DLM_PENDING
3745 		 * at the same time they set OCFS2_DLM_BUSY.  They must
3746 		 * clear OCFS2_DLM_PENDING after dlm_lock() returns.
3747 		 */
3748 		if (lockres->l_flags & OCFS2_LOCK_PENDING) {
3749 			mlog(ML_BASTS, "lockres %s, ReQ: Pending\n",
3750 			     lockres->l_name);
3751 			goto leave_requeue;
3752 		}
3753 
3754 		ctl->requeue = 1;
3755 		ret = ocfs2_prepare_cancel_convert(osb, lockres);
3756 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3757 		if (ret) {
3758 			ret = ocfs2_cancel_convert(osb, lockres);
3759 			if (ret < 0)
3760 				mlog_errno(ret);
3761 		}
3762 		goto leave;
3763 	}
3764 
3765 	/*
3766 	 * This prevents livelocks. OCFS2_LOCK_UPCONVERT_FINISHING flag is
3767 	 * set when the ast is received for an upconvert just before the
3768 	 * OCFS2_LOCK_BUSY flag is cleared. Now if the fs received a bast
3769 	 * on the heels of the ast, we want to delay the downconvert just
3770 	 * enough to allow the up requestor to do its task. Because this
3771 	 * lock is in the blocked queue, the lock will be downconverted
3772 	 * as soon as the requestor is done with the lock.
3773 	 */
3774 	if (lockres->l_flags & OCFS2_LOCK_UPCONVERT_FINISHING)
3775 		goto leave_requeue;
3776 
3777 	/*
3778 	 * How can we block and yet be at NL?  We were trying to upconvert
3779 	 * from NL and got canceled.  The code comes back here, and now
3780 	 * we notice and clear BLOCKING.
3781 	 */
3782 	if (lockres->l_level == DLM_LOCK_NL) {
3783 		BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders);
3784 		mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name);
3785 		lockres->l_blocking = DLM_LOCK_NL;
3786 		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
3787 		spin_unlock_irqrestore(&lockres->l_lock, flags);
3788 		goto leave;
3789 	}
3790 
3791 	/* if we're blocking an exclusive and we have *any* holders,
3792 	 * then requeue. */
3793 	if ((lockres->l_blocking == DLM_LOCK_EX)
3794 	    && (lockres->l_ex_holders || lockres->l_ro_holders)) {
3795 		mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n",
3796 		     lockres->l_name, lockres->l_ex_holders,
3797 		     lockres->l_ro_holders);
3798 		goto leave_requeue;
3799 	}
3800 
3801 	/* If it's a PR we're blocking, then only
3802 	 * requeue if we've got any EX holders */
3803 	if (lockres->l_blocking == DLM_LOCK_PR &&
3804 	    lockres->l_ex_holders) {
3805 		mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n",
3806 		     lockres->l_name, lockres->l_ex_holders);
3807 		goto leave_requeue;
3808 	}
3809 
3810 	/*
3811 	 * Can we get a lock in this state if the holder counts are
3812 	 * zero? The meta data unblock code used to check this.
3813 	 */
3814 	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
3815 	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) {
3816 		mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n",
3817 		     lockres->l_name);
3818 		goto leave_requeue;
3819 	}
3820 
3821 	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);
3822 
3823 	if (lockres->l_ops->check_downconvert
3824 	    && !lockres->l_ops->check_downconvert(lockres, new_level)) {
3825 		mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n",
3826 		     lockres->l_name);
3827 		goto leave_requeue;
3828 	}
3829 
3830 	/* If we get here, then we know that there are no more
3831 	 * incompatible holders (and anyone asking for an incompatible
3832 	 * lock is blocked). We can now downconvert the lock */
3833 	if (!lockres->l_ops->downconvert_worker)
3834 		goto downconvert;
3835 
3836 	/* Some lockres types want to do a bit of work before
3837 	 * downconverting a lock. Allow that here. The worker function
3838 	 * may sleep, so we save off a copy of what we're blocking as
3839 	 * it may change while we're not holding the spin lock. */
3840 	blocking = lockres->l_blocking;
3841 	level = lockres->l_level;
3842 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3843 
3844 	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);
3845 
3846 	if (ctl->unblock_action == UNBLOCK_STOP_POST) {
3847 		mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n",
3848 		     lockres->l_name);
3849 		goto leave;
3850 	}
3851 
3852 	spin_lock_irqsave(&lockres->l_lock, flags);
3853 	if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) {
3854 		/* If this changed underneath us, then we can't drop
3855 		 * it just yet. */
3856 		mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, "
3857 		     "Recheck\n", lockres->l_name, blocking,
3858 		     lockres->l_blocking, level, lockres->l_level);
3859 		goto recheck;
3860 	}
3861 
3862 downconvert:
3863 	ctl->requeue = 0;
3864 
3865 	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
3866 		if (lockres->l_level == DLM_LOCK_EX)
3867 			set_lvb = 1;
3868 
3869 		/*
3870 		 * We only set the lvb if the lock has been fully
3871 		 * refreshed - otherwise we risk setting stale
3872 		 * data. Otherwise, there's no need to actually clear
3873 		 * out the lvb here as it's value is still valid.
3874 		 */
3875 		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
3876 			lockres->l_ops->set_lvb(lockres);
3877 	}
3878 
3879 	gen = ocfs2_prepare_downconvert(lockres, new_level);
3880 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3881 	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb,
3882 				     gen);
3883 
3884 leave:
3885 	if (ret)
3886 		mlog_errno(ret);
3887 	return ret;
3888 
3889 leave_requeue:
3890 	spin_unlock_irqrestore(&lockres->l_lock, flags);
3891 	ctl->requeue = 1;
3892 
3893 	return 0;
3894 }
3895 
ocfs2_data_convert_worker(struct ocfs2_lock_res * lockres,int blocking)3896 static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
3897 				     int blocking)
3898 {
3899 	struct inode *inode;
3900 	struct address_space *mapping;
3901 	struct ocfs2_inode_info *oi;
3902 
3903        	inode = ocfs2_lock_res_inode(lockres);
3904 	mapping = inode->i_mapping;
3905 
3906 	if (S_ISDIR(inode->i_mode)) {
3907 		oi = OCFS2_I(inode);
3908 		oi->ip_dir_lock_gen++;
3909 		mlog(0, "generation: %u\n", oi->ip_dir_lock_gen);
3910 		goto out_forget;
3911 	}
3912 
3913 	if (!S_ISREG(inode->i_mode))
3914 		goto out;
3915 
3916 	/*
3917 	 * We need this before the filemap_fdatawrite() so that it can
3918 	 * transfer the dirty bit from the PTE to the
3919 	 * page. Unfortunately this means that even for EX->PR
3920 	 * downconverts, we'll lose our mappings and have to build
3921 	 * them up again.
3922 	 */
3923 	unmap_mapping_range(mapping, 0, 0, 0);
3924 
3925 	if (filemap_fdatawrite(mapping)) {
3926 		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
3927 		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
3928 	}
3929 	sync_mapping_buffers(mapping);
3930 	if (blocking == DLM_LOCK_EX) {
3931 		truncate_inode_pages(mapping, 0);
3932 	} else {
3933 		/* We only need to wait on the I/O if we're not also
3934 		 * truncating pages because truncate_inode_pages waits
3935 		 * for us above. We don't truncate pages if we're
3936 		 * blocking anything < EXMODE because we want to keep
3937 		 * them around in that case. */
3938 		filemap_fdatawait(mapping);
3939 	}
3940 
3941 out_forget:
3942 	forget_all_cached_acls(inode);
3943 
3944 out:
3945 	return UNBLOCK_CONTINUE;
3946 }
3947 
ocfs2_ci_checkpointed(struct ocfs2_caching_info * ci,struct ocfs2_lock_res * lockres,int new_level)3948 static int ocfs2_ci_checkpointed(struct ocfs2_caching_info *ci,
3949 				 struct ocfs2_lock_res *lockres,
3950 				 int new_level)
3951 {
3952 	int checkpointed = ocfs2_ci_fully_checkpointed(ci);
3953 
3954 	BUG_ON(new_level != DLM_LOCK_NL && new_level != DLM_LOCK_PR);
3955 	BUG_ON(lockres->l_level != DLM_LOCK_EX && !checkpointed);
3956 
3957 	if (checkpointed)
3958 		return 1;
3959 
3960 	ocfs2_start_checkpoint(OCFS2_SB(ocfs2_metadata_cache_get_super(ci)));
3961 	return 0;
3962 }
3963 
ocfs2_check_meta_downconvert(struct ocfs2_lock_res * lockres,int new_level)3964 static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
3965 					int new_level)
3966 {
3967 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3968 
3969 	return ocfs2_ci_checkpointed(INODE_CACHE(inode), lockres, new_level);
3970 }
3971 
ocfs2_set_meta_lvb(struct ocfs2_lock_res * lockres)3972 static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
3973 {
3974 	struct inode *inode = ocfs2_lock_res_inode(lockres);
3975 
3976 	__ocfs2_stuff_meta_lvb(inode);
3977 }
3978 
3979 /*
3980  * Does the final reference drop on our dentry lock. Right now this
3981  * happens in the downconvert thread, but we could choose to simplify the
3982  * dlmglue API and push these off to the ocfs2_wq in the future.
3983  */
ocfs2_dentry_post_unlock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)3984 static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
3985 				     struct ocfs2_lock_res *lockres)
3986 {
3987 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
3988 	ocfs2_dentry_lock_put(osb, dl);
3989 }
3990 
3991 /*
3992  * d_delete() matching dentries before the lock downconvert.
3993  *
3994  * At this point, any process waiting to destroy the
3995  * dentry_lock due to last ref count is stopped by the
3996  * OCFS2_LOCK_QUEUED flag.
3997  *
3998  * We have two potential problems
3999  *
4000  * 1) If we do the last reference drop on our dentry_lock (via dput)
4001  *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
4002  *    the downconvert to finish. Instead we take an elevated
4003  *    reference and push the drop until after we've completed our
4004  *    unblock processing.
4005  *
4006  * 2) There might be another process with a final reference,
4007  *    waiting on us to finish processing. If this is the case, we
4008  *    detect it and exit out - there's no more dentries anyway.
4009  */
ocfs2_dentry_convert_worker(struct ocfs2_lock_res * lockres,int blocking)4010 static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
4011 				       int blocking)
4012 {
4013 	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
4014 	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
4015 	struct dentry *dentry;
4016 	unsigned long flags;
4017 	int extra_ref = 0;
4018 
4019 	/*
4020 	 * This node is blocking another node from getting a read
4021 	 * lock. This happens when we've renamed within a
4022 	 * directory. We've forced the other nodes to d_delete(), but
4023 	 * we never actually dropped our lock because it's still
4024 	 * valid. The downconvert code will retain a PR for this node,
4025 	 * so there's no further work to do.
4026 	 */
4027 	if (blocking == DLM_LOCK_PR)
4028 		return UNBLOCK_CONTINUE;
4029 
4030 	/*
4031 	 * Mark this inode as potentially orphaned. The code in
4032 	 * ocfs2_delete_inode() will figure out whether it actually
4033 	 * needs to be freed or not.
4034 	 */
4035 	spin_lock(&oi->ip_lock);
4036 	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
4037 	spin_unlock(&oi->ip_lock);
4038 
4039 	/*
4040 	 * Yuck. We need to make sure however that the check of
4041 	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
4042 	 * respect to a reference decrement or the setting of that
4043 	 * flag.
4044 	 */
4045 	spin_lock_irqsave(&lockres->l_lock, flags);
4046 	spin_lock(&dentry_attach_lock);
4047 	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
4048 	    && dl->dl_count) {
4049 		dl->dl_count++;
4050 		extra_ref = 1;
4051 	}
4052 	spin_unlock(&dentry_attach_lock);
4053 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4054 
4055 	mlog(0, "extra_ref = %d\n", extra_ref);
4056 
4057 	/*
4058 	 * We have a process waiting on us in ocfs2_dentry_iput(),
4059 	 * which means we can't have any more outstanding
4060 	 * aliases. There's no need to do any more work.
4061 	 */
4062 	if (!extra_ref)
4063 		return UNBLOCK_CONTINUE;
4064 
4065 	spin_lock(&dentry_attach_lock);
4066 	while (1) {
4067 		dentry = ocfs2_find_local_alias(dl->dl_inode,
4068 						dl->dl_parent_blkno, 1);
4069 		if (!dentry)
4070 			break;
4071 		spin_unlock(&dentry_attach_lock);
4072 
4073 		if (S_ISDIR(dl->dl_inode->i_mode))
4074 			shrink_dcache_parent(dentry);
4075 
4076 		mlog(0, "d_delete(%pd);\n", dentry);
4077 
4078 		/*
4079 		 * The following dcache calls may do an
4080 		 * iput(). Normally we don't want that from the
4081 		 * downconverting thread, but in this case it's ok
4082 		 * because the requesting node already has an
4083 		 * exclusive lock on the inode, so it can't be queued
4084 		 * for a downconvert.
4085 		 */
4086 		d_delete(dentry);
4087 		dput(dentry);
4088 
4089 		spin_lock(&dentry_attach_lock);
4090 	}
4091 	spin_unlock(&dentry_attach_lock);
4092 
4093 	/*
4094 	 * If we are the last holder of this dentry lock, there is no
4095 	 * reason to downconvert so skip straight to the unlock.
4096 	 */
4097 	if (dl->dl_count == 1)
4098 		return UNBLOCK_STOP_POST;
4099 
4100 	return UNBLOCK_CONTINUE_POST;
4101 }
4102 
ocfs2_check_refcount_downconvert(struct ocfs2_lock_res * lockres,int new_level)4103 static int ocfs2_check_refcount_downconvert(struct ocfs2_lock_res *lockres,
4104 					    int new_level)
4105 {
4106 	struct ocfs2_refcount_tree *tree =
4107 				ocfs2_lock_res_refcount_tree(lockres);
4108 
4109 	return ocfs2_ci_checkpointed(&tree->rf_ci, lockres, new_level);
4110 }
4111 
ocfs2_refcount_convert_worker(struct ocfs2_lock_res * lockres,int blocking)4112 static int ocfs2_refcount_convert_worker(struct ocfs2_lock_res *lockres,
4113 					 int blocking)
4114 {
4115 	struct ocfs2_refcount_tree *tree =
4116 				ocfs2_lock_res_refcount_tree(lockres);
4117 
4118 	ocfs2_metadata_cache_purge(&tree->rf_ci);
4119 
4120 	return UNBLOCK_CONTINUE;
4121 }
4122 
ocfs2_set_qinfo_lvb(struct ocfs2_lock_res * lockres)4123 static void ocfs2_set_qinfo_lvb(struct ocfs2_lock_res *lockres)
4124 {
4125 	struct ocfs2_qinfo_lvb *lvb;
4126 	struct ocfs2_mem_dqinfo *oinfo = ocfs2_lock_res_qinfo(lockres);
4127 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4128 					    oinfo->dqi_gi.dqi_type);
4129 
4130 	lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4131 	lvb->lvb_version = OCFS2_QINFO_LVB_VERSION;
4132 	lvb->lvb_bgrace = cpu_to_be32(info->dqi_bgrace);
4133 	lvb->lvb_igrace = cpu_to_be32(info->dqi_igrace);
4134 	lvb->lvb_syncms = cpu_to_be32(oinfo->dqi_syncms);
4135 	lvb->lvb_blocks = cpu_to_be32(oinfo->dqi_gi.dqi_blocks);
4136 	lvb->lvb_free_blk = cpu_to_be32(oinfo->dqi_gi.dqi_free_blk);
4137 	lvb->lvb_free_entry = cpu_to_be32(oinfo->dqi_gi.dqi_free_entry);
4138 }
4139 
ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo * oinfo,int ex)4140 void ocfs2_qinfo_unlock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4141 {
4142 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4143 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4144 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4145 
4146 	if (!ocfs2_is_hard_readonly(osb) && !ocfs2_mount_local(osb))
4147 		ocfs2_cluster_unlock(osb, lockres, level);
4148 }
4149 
ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo * oinfo)4150 static int ocfs2_refresh_qinfo(struct ocfs2_mem_dqinfo *oinfo)
4151 {
4152 	struct mem_dqinfo *info = sb_dqinfo(oinfo->dqi_gi.dqi_sb,
4153 					    oinfo->dqi_gi.dqi_type);
4154 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4155 	struct ocfs2_qinfo_lvb *lvb = ocfs2_dlm_lvb(&lockres->l_lksb);
4156 	struct buffer_head *bh = NULL;
4157 	struct ocfs2_global_disk_dqinfo *gdinfo;
4158 	int status = 0;
4159 
4160 	if (ocfs2_dlm_lvb_valid(&lockres->l_lksb) &&
4161 	    lvb->lvb_version == OCFS2_QINFO_LVB_VERSION) {
4162 		info->dqi_bgrace = be32_to_cpu(lvb->lvb_bgrace);
4163 		info->dqi_igrace = be32_to_cpu(lvb->lvb_igrace);
4164 		oinfo->dqi_syncms = be32_to_cpu(lvb->lvb_syncms);
4165 		oinfo->dqi_gi.dqi_blocks = be32_to_cpu(lvb->lvb_blocks);
4166 		oinfo->dqi_gi.dqi_free_blk = be32_to_cpu(lvb->lvb_free_blk);
4167 		oinfo->dqi_gi.dqi_free_entry =
4168 					be32_to_cpu(lvb->lvb_free_entry);
4169 	} else {
4170 		status = ocfs2_read_quota_phys_block(oinfo->dqi_gqinode,
4171 						     oinfo->dqi_giblk, &bh);
4172 		if (status) {
4173 			mlog_errno(status);
4174 			goto bail;
4175 		}
4176 		gdinfo = (struct ocfs2_global_disk_dqinfo *)
4177 					(bh->b_data + OCFS2_GLOBAL_INFO_OFF);
4178 		info->dqi_bgrace = le32_to_cpu(gdinfo->dqi_bgrace);
4179 		info->dqi_igrace = le32_to_cpu(gdinfo->dqi_igrace);
4180 		oinfo->dqi_syncms = le32_to_cpu(gdinfo->dqi_syncms);
4181 		oinfo->dqi_gi.dqi_blocks = le32_to_cpu(gdinfo->dqi_blocks);
4182 		oinfo->dqi_gi.dqi_free_blk = le32_to_cpu(gdinfo->dqi_free_blk);
4183 		oinfo->dqi_gi.dqi_free_entry =
4184 					le32_to_cpu(gdinfo->dqi_free_entry);
4185 		brelse(bh);
4186 		ocfs2_track_lock_refresh(lockres);
4187 	}
4188 
4189 bail:
4190 	return status;
4191 }
4192 
4193 /* Lock quota info, this function expects at least shared lock on the quota file
4194  * so that we can safely refresh quota info from disk. */
ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo * oinfo,int ex)4195 int ocfs2_qinfo_lock(struct ocfs2_mem_dqinfo *oinfo, int ex)
4196 {
4197 	struct ocfs2_lock_res *lockres = &oinfo->dqi_gqlock;
4198 	struct ocfs2_super *osb = OCFS2_SB(oinfo->dqi_gi.dqi_sb);
4199 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4200 	int status = 0;
4201 
4202 	/* On RO devices, locking really isn't needed... */
4203 	if (ocfs2_is_hard_readonly(osb)) {
4204 		if (ex)
4205 			status = -EROFS;
4206 		goto bail;
4207 	}
4208 	if (ocfs2_mount_local(osb))
4209 		goto bail;
4210 
4211 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4212 	if (status < 0) {
4213 		mlog_errno(status);
4214 		goto bail;
4215 	}
4216 	if (!ocfs2_should_refresh_lock_res(lockres))
4217 		goto bail;
4218 	/* OK, we have the lock but we need to refresh the quota info */
4219 	status = ocfs2_refresh_qinfo(oinfo);
4220 	if (status)
4221 		ocfs2_qinfo_unlock(oinfo, ex);
4222 	ocfs2_complete_lock_res_refresh(lockres, status);
4223 bail:
4224 	return status;
4225 }
4226 
ocfs2_refcount_lock(struct ocfs2_refcount_tree * ref_tree,int ex)4227 int ocfs2_refcount_lock(struct ocfs2_refcount_tree *ref_tree, int ex)
4228 {
4229 	int status;
4230 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4231 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4232 	struct ocfs2_super *osb = lockres->l_priv;
4233 
4234 
4235 	if (ocfs2_is_hard_readonly(osb))
4236 		return -EROFS;
4237 
4238 	if (ocfs2_mount_local(osb))
4239 		return 0;
4240 
4241 	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
4242 	if (status < 0)
4243 		mlog_errno(status);
4244 
4245 	return status;
4246 }
4247 
ocfs2_refcount_unlock(struct ocfs2_refcount_tree * ref_tree,int ex)4248 void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex)
4249 {
4250 	int level = ex ? DLM_LOCK_EX : DLM_LOCK_PR;
4251 	struct ocfs2_lock_res *lockres = &ref_tree->rf_lockres;
4252 	struct ocfs2_super *osb = lockres->l_priv;
4253 
4254 	if (!ocfs2_mount_local(osb))
4255 		ocfs2_cluster_unlock(osb, lockres, level);
4256 }
4257 
ocfs2_process_blocked_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)4258 static void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
4259 				       struct ocfs2_lock_res *lockres)
4260 {
4261 	int status;
4262 	struct ocfs2_unblock_ctl ctl = {0, 0,};
4263 	unsigned long flags;
4264 
4265 	/* Our reference to the lockres in this function can be
4266 	 * considered valid until we remove the OCFS2_LOCK_QUEUED
4267 	 * flag. */
4268 
4269 	BUG_ON(!lockres);
4270 	BUG_ON(!lockres->l_ops);
4271 
4272 	mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name);
4273 
4274 	/* Detect whether a lock has been marked as going away while
4275 	 * the downconvert thread was processing other things. A lock can
4276 	 * still be marked with OCFS2_LOCK_FREEING after this check,
4277 	 * but short circuiting here will still save us some
4278 	 * performance. */
4279 	spin_lock_irqsave(&lockres->l_lock, flags);
4280 	if (lockres->l_flags & OCFS2_LOCK_FREEING)
4281 		goto unqueue;
4282 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4283 
4284 	status = ocfs2_unblock_lock(osb, lockres, &ctl);
4285 	if (status < 0)
4286 		mlog_errno(status);
4287 
4288 	spin_lock_irqsave(&lockres->l_lock, flags);
4289 unqueue:
4290 	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
4291 		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
4292 	} else
4293 		ocfs2_schedule_blocked_lock(osb, lockres);
4294 
4295 	mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name,
4296 	     ctl.requeue ? "yes" : "no");
4297 	spin_unlock_irqrestore(&lockres->l_lock, flags);
4298 
4299 	if (ctl.unblock_action != UNBLOCK_CONTINUE
4300 	    && lockres->l_ops->post_unlock)
4301 		lockres->l_ops->post_unlock(osb, lockres);
4302 }
4303 
ocfs2_schedule_blocked_lock(struct ocfs2_super * osb,struct ocfs2_lock_res * lockres)4304 static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
4305 					struct ocfs2_lock_res *lockres)
4306 {
4307 	unsigned long flags;
4308 
4309 	assert_spin_locked(&lockres->l_lock);
4310 
4311 	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
4312 		/* Do not schedule a lock for downconvert when it's on
4313 		 * the way to destruction - any nodes wanting access
4314 		 * to the resource will get it soon. */
4315 		mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n",
4316 		     lockres->l_name, lockres->l_flags);
4317 		return;
4318 	}
4319 
4320 	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);
4321 
4322 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4323 	if (list_empty(&lockres->l_blocked_list)) {
4324 		list_add_tail(&lockres->l_blocked_list,
4325 			      &osb->blocked_lock_list);
4326 		osb->blocked_lock_count++;
4327 	}
4328 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4329 }
4330 
ocfs2_downconvert_thread_do_work(struct ocfs2_super * osb)4331 static void ocfs2_downconvert_thread_do_work(struct ocfs2_super *osb)
4332 {
4333 	unsigned long processed;
4334 	unsigned long flags;
4335 	struct ocfs2_lock_res *lockres;
4336 
4337 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4338 	/* grab this early so we know to try again if a state change and
4339 	 * wake happens part-way through our work  */
4340 	osb->dc_work_sequence = osb->dc_wake_sequence;
4341 
4342 	processed = osb->blocked_lock_count;
4343 	/*
4344 	 * blocked lock processing in this loop might call iput which can
4345 	 * remove items off osb->blocked_lock_list. Downconvert up to
4346 	 * 'processed' number of locks, but stop short if we had some
4347 	 * removed in ocfs2_mark_lockres_freeing when downconverting.
4348 	 */
4349 	while (processed && !list_empty(&osb->blocked_lock_list)) {
4350 		lockres = list_entry(osb->blocked_lock_list.next,
4351 				     struct ocfs2_lock_res, l_blocked_list);
4352 		list_del_init(&lockres->l_blocked_list);
4353 		osb->blocked_lock_count--;
4354 		spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4355 
4356 		BUG_ON(!processed);
4357 		processed--;
4358 
4359 		ocfs2_process_blocked_lock(osb, lockres);
4360 
4361 		spin_lock_irqsave(&osb->dc_task_lock, flags);
4362 	}
4363 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4364 }
4365 
ocfs2_downconvert_thread_lists_empty(struct ocfs2_super * osb)4366 static int ocfs2_downconvert_thread_lists_empty(struct ocfs2_super *osb)
4367 {
4368 	int empty = 0;
4369 	unsigned long flags;
4370 
4371 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4372 	if (list_empty(&osb->blocked_lock_list))
4373 		empty = 1;
4374 
4375 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4376 	return empty;
4377 }
4378 
ocfs2_downconvert_thread_should_wake(struct ocfs2_super * osb)4379 static int ocfs2_downconvert_thread_should_wake(struct ocfs2_super *osb)
4380 {
4381 	int should_wake = 0;
4382 	unsigned long flags;
4383 
4384 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4385 	if (osb->dc_work_sequence != osb->dc_wake_sequence)
4386 		should_wake = 1;
4387 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4388 
4389 	return should_wake;
4390 }
4391 
ocfs2_downconvert_thread(void * arg)4392 static int ocfs2_downconvert_thread(void *arg)
4393 {
4394 	int status = 0;
4395 	struct ocfs2_super *osb = arg;
4396 
4397 	/* only quit once we've been asked to stop and there is no more
4398 	 * work available */
4399 	while (!(kthread_should_stop() &&
4400 		ocfs2_downconvert_thread_lists_empty(osb))) {
4401 
4402 		wait_event_interruptible(osb->dc_event,
4403 					 ocfs2_downconvert_thread_should_wake(osb) ||
4404 					 kthread_should_stop());
4405 
4406 		mlog(0, "downconvert_thread: awoken\n");
4407 
4408 		ocfs2_downconvert_thread_do_work(osb);
4409 	}
4410 
4411 	osb->dc_task = NULL;
4412 	return status;
4413 }
4414 
ocfs2_wake_downconvert_thread(struct ocfs2_super * osb)4415 void ocfs2_wake_downconvert_thread(struct ocfs2_super *osb)
4416 {
4417 	unsigned long flags;
4418 
4419 	spin_lock_irqsave(&osb->dc_task_lock, flags);
4420 	/* make sure the voting thread gets a swipe at whatever changes
4421 	 * the caller may have made to the voting state */
4422 	osb->dc_wake_sequence++;
4423 	spin_unlock_irqrestore(&osb->dc_task_lock, flags);
4424 	wake_up(&osb->dc_event);
4425 }
4426