1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "dir.h"
18 #include "ast.h"
19 #include "recover.h"
20 #include "lowcomms.h"
21 #include "lock.h"
22 #include "requestqueue.h"
23 #include "recoverd.h"
24 
25 
26 /* If the start for which we're re-enabling locking (seq) has been superseded
27    by a newer stop (ls_recover_seq), we need to leave locking disabled.
28 
29    We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees
30    locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
31    enables locking and clears the requestqueue between a and b. */
32 
enable_locking(struct dlm_ls * ls,uint64_t seq)33 static int enable_locking(struct dlm_ls *ls, uint64_t seq)
34 {
35 	int error = -EINTR;
36 
37 	down_write(&ls->ls_recv_active);
38 
39 	spin_lock(&ls->ls_recover_lock);
40 	if (ls->ls_recover_seq == seq) {
41 		set_bit(LSFL_RUNNING, &ls->ls_flags);
42 		/* unblocks processes waiting to enter the dlm */
43 		up_write(&ls->ls_in_recovery);
44 		clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
45 		error = 0;
46 	}
47 	spin_unlock(&ls->ls_recover_lock);
48 
49 	up_write(&ls->ls_recv_active);
50 	return error;
51 }
52 
ls_recover(struct dlm_ls * ls,struct dlm_recover * rv)53 static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
54 {
55 	unsigned long start;
56 	int error, neg = 0;
57 
58 	log_rinfo(ls, "dlm_recover %llu", (unsigned long long)rv->seq);
59 
60 	mutex_lock(&ls->ls_recoverd_active);
61 
62 	dlm_callback_suspend(ls);
63 
64 	dlm_clear_toss(ls);
65 
66 	/*
67 	 * This list of root rsb's will be the basis of most of the recovery
68 	 * routines.
69 	 */
70 
71 	dlm_create_root_list(ls);
72 
73 	/*
74 	 * Add or remove nodes from the lockspace's ls_nodes list.
75 	 */
76 
77 	error = dlm_recover_members(ls, rv, &neg);
78 	if (error) {
79 		log_rinfo(ls, "dlm_recover_members error %d", error);
80 		goto fail;
81 	}
82 
83 	dlm_recover_dir_nodeid(ls);
84 
85 	ls->ls_recover_dir_sent_res = 0;
86 	ls->ls_recover_dir_sent_msg = 0;
87 	ls->ls_recover_locks_in = 0;
88 
89 	dlm_set_recover_status(ls, DLM_RS_NODES);
90 
91 	error = dlm_recover_members_wait(ls);
92 	if (error) {
93 		log_rinfo(ls, "dlm_recover_members_wait error %d", error);
94 		goto fail;
95 	}
96 
97 	start = jiffies;
98 
99 	/*
100 	 * Rebuild our own share of the directory by collecting from all other
101 	 * nodes their master rsb names that hash to us.
102 	 */
103 
104 	error = dlm_recover_directory(ls);
105 	if (error) {
106 		log_rinfo(ls, "dlm_recover_directory error %d", error);
107 		goto fail;
108 	}
109 
110 	dlm_set_recover_status(ls, DLM_RS_DIR);
111 
112 	error = dlm_recover_directory_wait(ls);
113 	if (error) {
114 		log_rinfo(ls, "dlm_recover_directory_wait error %d", error);
115 		goto fail;
116 	}
117 
118 	log_rinfo(ls, "dlm_recover_directory %u out %u messages",
119 		  ls->ls_recover_dir_sent_res, ls->ls_recover_dir_sent_msg);
120 
121 	/*
122 	 * We may have outstanding operations that are waiting for a reply from
123 	 * a failed node.  Mark these to be resent after recovery.  Unlock and
124 	 * cancel ops can just be completed.
125 	 */
126 
127 	dlm_recover_waiters_pre(ls);
128 
129 	error = dlm_recovery_stopped(ls);
130 	if (error)
131 		goto fail;
132 
133 	if (neg || dlm_no_directory(ls)) {
134 		/*
135 		 * Clear lkb's for departed nodes.
136 		 */
137 
138 		dlm_recover_purge(ls);
139 
140 		/*
141 		 * Get new master nodeid's for rsb's that were mastered on
142 		 * departed nodes.
143 		 */
144 
145 		error = dlm_recover_masters(ls);
146 		if (error) {
147 			log_rinfo(ls, "dlm_recover_masters error %d", error);
148 			goto fail;
149 		}
150 
151 		/*
152 		 * Send our locks on remastered rsb's to the new masters.
153 		 */
154 
155 		error = dlm_recover_locks(ls);
156 		if (error) {
157 			log_rinfo(ls, "dlm_recover_locks error %d", error);
158 			goto fail;
159 		}
160 
161 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
162 
163 		error = dlm_recover_locks_wait(ls);
164 		if (error) {
165 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
166 			goto fail;
167 		}
168 
169 		log_rinfo(ls, "dlm_recover_locks %u in",
170 			  ls->ls_recover_locks_in);
171 
172 		/*
173 		 * Finalize state in master rsb's now that all locks can be
174 		 * checked.  This includes conversion resolution and lvb
175 		 * settings.
176 		 */
177 
178 		dlm_recover_rsbs(ls);
179 	} else {
180 		/*
181 		 * Other lockspace members may be going through the "neg" steps
182 		 * while also adding us to the lockspace, in which case they'll
183 		 * be doing the recover_locks (RS_LOCKS) barrier.
184 		 */
185 		dlm_set_recover_status(ls, DLM_RS_LOCKS);
186 
187 		error = dlm_recover_locks_wait(ls);
188 		if (error) {
189 			log_rinfo(ls, "dlm_recover_locks_wait error %d", error);
190 			goto fail;
191 		}
192 	}
193 
194 	dlm_release_root_list(ls);
195 
196 	/*
197 	 * Purge directory-related requests that are saved in requestqueue.
198 	 * All dir requests from before recovery are invalid now due to the dir
199 	 * rebuild and will be resent by the requesting nodes.
200 	 */
201 
202 	dlm_purge_requestqueue(ls);
203 
204 	dlm_set_recover_status(ls, DLM_RS_DONE);
205 
206 	error = dlm_recover_done_wait(ls);
207 	if (error) {
208 		log_rinfo(ls, "dlm_recover_done_wait error %d", error);
209 		goto fail;
210 	}
211 
212 	dlm_clear_members_gone(ls);
213 
214 	dlm_adjust_timeouts(ls);
215 
216 	dlm_callback_resume(ls);
217 
218 	error = enable_locking(ls, rv->seq);
219 	if (error) {
220 		log_rinfo(ls, "enable_locking error %d", error);
221 		goto fail;
222 	}
223 
224 	error = dlm_process_requestqueue(ls);
225 	if (error) {
226 		log_rinfo(ls, "dlm_process_requestqueue error %d", error);
227 		goto fail;
228 	}
229 
230 	error = dlm_recover_waiters_post(ls);
231 	if (error) {
232 		log_rinfo(ls, "dlm_recover_waiters_post error %d", error);
233 		goto fail;
234 	}
235 
236 	dlm_recover_grant(ls);
237 
238 	log_rinfo(ls, "dlm_recover %llu generation %u done: %u ms",
239 		  (unsigned long long)rv->seq, ls->ls_generation,
240 		  jiffies_to_msecs(jiffies - start));
241 	mutex_unlock(&ls->ls_recoverd_active);
242 
243 	dlm_lsop_recover_done(ls);
244 	return 0;
245 
246  fail:
247 	dlm_release_root_list(ls);
248 	log_rinfo(ls, "dlm_recover %llu error %d",
249 		  (unsigned long long)rv->seq, error);
250 	mutex_unlock(&ls->ls_recoverd_active);
251 	return error;
252 }
253 
254 /* The dlm_ls_start() that created the rv we take here may already have been
255    stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
256    flag set. */
257 
do_ls_recovery(struct dlm_ls * ls)258 static void do_ls_recovery(struct dlm_ls *ls)
259 {
260 	struct dlm_recover *rv = NULL;
261 
262 	spin_lock(&ls->ls_recover_lock);
263 	rv = ls->ls_recover_args;
264 	ls->ls_recover_args = NULL;
265 	if (rv && ls->ls_recover_seq == rv->seq)
266 		clear_bit(LSFL_RECOVER_STOP, &ls->ls_flags);
267 	spin_unlock(&ls->ls_recover_lock);
268 
269 	if (rv) {
270 		ls_recover(ls, rv);
271 		kfree(rv->nodes);
272 		kfree(rv);
273 	}
274 }
275 
dlm_recoverd(void * arg)276 static int dlm_recoverd(void *arg)
277 {
278 	struct dlm_ls *ls;
279 
280 	ls = dlm_find_lockspace_local(arg);
281 	if (!ls) {
282 		log_print("dlm_recoverd: no lockspace %p", arg);
283 		return -1;
284 	}
285 
286 	down_write(&ls->ls_in_recovery);
287 	set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
288 	wake_up(&ls->ls_recover_lock_wait);
289 
290 	while (1) {
291 		/*
292 		 * We call kthread_should_stop() after set_current_state().
293 		 * This is because it works correctly if kthread_stop() is
294 		 * called just before set_current_state().
295 		 */
296 		set_current_state(TASK_INTERRUPTIBLE);
297 		if (kthread_should_stop()) {
298 			set_current_state(TASK_RUNNING);
299 			break;
300 		}
301 		if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
302 		    !test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
303 			if (kthread_should_stop())
304 				break;
305 			schedule();
306 		}
307 		set_current_state(TASK_RUNNING);
308 
309 		if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
310 			down_write(&ls->ls_in_recovery);
311 			set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
312 			wake_up(&ls->ls_recover_lock_wait);
313 		}
314 
315 		if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
316 			do_ls_recovery(ls);
317 	}
318 
319 	if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
320 		up_write(&ls->ls_in_recovery);
321 
322 	dlm_put_lockspace(ls);
323 	return 0;
324 }
325 
dlm_recoverd_start(struct dlm_ls * ls)326 int dlm_recoverd_start(struct dlm_ls *ls)
327 {
328 	struct task_struct *p;
329 	int error = 0;
330 
331 	p = kthread_run(dlm_recoverd, ls, "dlm_recoverd");
332 	if (IS_ERR(p))
333 		error = PTR_ERR(p);
334 	else
335                 ls->ls_recoverd_task = p;
336 	return error;
337 }
338 
dlm_recoverd_stop(struct dlm_ls * ls)339 void dlm_recoverd_stop(struct dlm_ls *ls)
340 {
341 	kthread_stop(ls->ls_recoverd_task);
342 }
343 
dlm_recoverd_suspend(struct dlm_ls * ls)344 void dlm_recoverd_suspend(struct dlm_ls *ls)
345 {
346 	wake_up(&ls->ls_wait_general);
347 	mutex_lock(&ls->ls_recoverd_active);
348 }
349 
dlm_recoverd_resume(struct dlm_ls * ls)350 void dlm_recoverd_resume(struct dlm_ls *ls)
351 {
352 	mutex_unlock(&ls->ls_recoverd_active);
353 }
354 
355