1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
5 **  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
6 **
7 **  This copyrighted material is made available to anyone wishing to use,
8 **  modify, copy, or redistribute it subject to the terms and conditions
9 **  of the GNU General Public License v.2.
10 **
11 *******************************************************************************
12 ******************************************************************************/
13 
14 #include "dlm_internal.h"
15 #include "lockspace.h"
16 #include "member.h"
17 #include "lowcomms.h"
18 #include "rcom.h"
19 #include "config.h"
20 #include "memory.h"
21 #include "recover.h"
22 #include "util.h"
23 #include "lock.h"
24 #include "dir.h"
25 
26 /*
27  * We use the upper 16 bits of the hash value to select the directory node.
28  * Low bits are used for distribution of rsb's among hash buckets on each node.
29  *
30  * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
31  * num_nodes to the hash value.  This value in the desired range is used as an
32  * offset into the sorted list of nodeid's to give the particular nodeid.
33  */
34 
dlm_hash2nodeid(struct dlm_ls * ls,uint32_t hash)35 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
36 {
37 	uint32_t node;
38 
39 	if (ls->ls_num_nodes == 1)
40 		return dlm_our_nodeid();
41 	else {
42 		node = (hash >> 16) % ls->ls_total_weight;
43 		return ls->ls_node_array[node];
44 	}
45 }
46 
dlm_dir_nodeid(struct dlm_rsb * r)47 int dlm_dir_nodeid(struct dlm_rsb *r)
48 {
49 	return r->res_dir_nodeid;
50 }
51 
dlm_recover_dir_nodeid(struct dlm_ls * ls)52 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
53 {
54 	struct dlm_rsb *r;
55 
56 	down_read(&ls->ls_root_sem);
57 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
58 		r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
59 	}
60 	up_read(&ls->ls_root_sem);
61 }
62 
dlm_recover_directory(struct dlm_ls * ls)63 int dlm_recover_directory(struct dlm_ls *ls)
64 {
65 	struct dlm_member *memb;
66 	char *b, *last_name = NULL;
67 	int error = -ENOMEM, last_len, nodeid, result;
68 	uint16_t namelen;
69 	unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
70 
71 	log_rinfo(ls, "dlm_recover_directory");
72 
73 	if (dlm_no_directory(ls))
74 		goto out_status;
75 
76 	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
77 	if (!last_name)
78 		goto out;
79 
80 	list_for_each_entry(memb, &ls->ls_nodes, list) {
81 		if (memb->nodeid == dlm_our_nodeid())
82 			continue;
83 
84 		memset(last_name, 0, DLM_RESNAME_MAXLEN);
85 		last_len = 0;
86 
87 		for (;;) {
88 			int left;
89 			error = dlm_recovery_stopped(ls);
90 			if (error)
91 				goto out_free;
92 
93 			error = dlm_rcom_names(ls, memb->nodeid,
94 					       last_name, last_len);
95 			if (error)
96 				goto out_free;
97 
98 			cond_resched();
99 
100 			/*
101 			 * pick namelen/name pairs out of received buffer
102 			 */
103 
104 			b = ls->ls_recover_buf->rc_buf;
105 			left = ls->ls_recover_buf->rc_header.h_length;
106 			left -= sizeof(struct dlm_rcom);
107 
108 			for (;;) {
109 				__be16 v;
110 
111 				error = -EINVAL;
112 				if (left < sizeof(__be16))
113 					goto out_free;
114 
115 				memcpy(&v, b, sizeof(__be16));
116 				namelen = be16_to_cpu(v);
117 				b += sizeof(__be16);
118 				left -= sizeof(__be16);
119 
120 				/* namelen of 0xFFFFF marks end of names for
121 				   this node; namelen of 0 marks end of the
122 				   buffer */
123 
124 				if (namelen == 0xFFFF)
125 					goto done;
126 				if (!namelen)
127 					break;
128 
129 				if (namelen > left)
130 					goto out_free;
131 
132 				if (namelen > DLM_RESNAME_MAXLEN)
133 					goto out_free;
134 
135 				error = dlm_master_lookup(ls, memb->nodeid,
136 							  b, namelen,
137 							  DLM_LU_RECOVER_DIR,
138 							  &nodeid, &result);
139 				if (error) {
140 					log_error(ls, "recover_dir lookup %d",
141 						  error);
142 					goto out_free;
143 				}
144 
145 				/* The name was found in rsbtbl, but the
146 				 * master nodeid is different from
147 				 * memb->nodeid which says it is the master.
148 				 * This should not happen. */
149 
150 				if (result == DLM_LU_MATCH &&
151 				    nodeid != memb->nodeid) {
152 					count_bad++;
153 					log_error(ls, "recover_dir lookup %d "
154 						  "nodeid %d memb %d bad %u",
155 						  result, nodeid, memb->nodeid,
156 						  count_bad);
157 					print_hex_dump_bytes("dlm_recover_dir ",
158 							     DUMP_PREFIX_NONE,
159 							     b, namelen);
160 				}
161 
162 				/* The name was found in rsbtbl, and the
163 				 * master nodeid matches memb->nodeid. */
164 
165 				if (result == DLM_LU_MATCH &&
166 				    nodeid == memb->nodeid) {
167 					count_match++;
168 				}
169 
170 				/* The name was not found in rsbtbl and was
171 				 * added with memb->nodeid as the master. */
172 
173 				if (result == DLM_LU_ADD) {
174 					count_add++;
175 				}
176 
177 				last_len = namelen;
178 				memcpy(last_name, b, namelen);
179 				b += namelen;
180 				left -= namelen;
181 				count++;
182 			}
183 		}
184 	 done:
185 		;
186 	}
187 
188  out_status:
189 	error = 0;
190 	dlm_set_recover_status(ls, DLM_RS_DIR);
191 
192 	log_rinfo(ls, "dlm_recover_directory %u in %u new",
193 		  count, count_add);
194  out_free:
195 	kfree(last_name);
196  out:
197 	return error;
198 }
199 
find_rsb_root(struct dlm_ls * ls,char * name,int len)200 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
201 {
202 	struct dlm_rsb *r;
203 	uint32_t hash, bucket;
204 	int rv;
205 
206 	hash = jhash(name, len, 0);
207 	bucket = hash & (ls->ls_rsbtbl_size - 1);
208 
209 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
210 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 	if (rv)
212 		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 					 name, len, &r);
214 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
215 
216 	if (!rv)
217 		return r;
218 
219 	down_read(&ls->ls_root_sem);
220 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
221 		if (len == r->res_length && !memcmp(name, r->res_name, len)) {
222 			up_read(&ls->ls_root_sem);
223 			log_debug(ls, "find_rsb_root revert to root_list %s",
224 				  r->res_name);
225 			return r;
226 		}
227 	}
228 	up_read(&ls->ls_root_sem);
229 	return NULL;
230 }
231 
232 /* Find the rsb where we left off (or start again), then send rsb names
233    for rsb's we're master of and whose directory node matches the requesting
234    node.  inbuf is the rsb name last sent, inlen is the name's length */
235 
dlm_copy_master_names(struct dlm_ls * ls,char * inbuf,int inlen,char * outbuf,int outlen,int nodeid)236 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
237  			   char *outbuf, int outlen, int nodeid)
238 {
239 	struct list_head *list;
240 	struct dlm_rsb *r;
241 	int offset = 0, dir_nodeid;
242 	__be16 be_namelen;
243 
244 	down_read(&ls->ls_root_sem);
245 
246 	if (inlen > 1) {
247 		r = find_rsb_root(ls, inbuf, inlen);
248 		if (!r) {
249 			inbuf[inlen - 1] = '\0';
250 			log_error(ls, "copy_master_names from %d start %d %s",
251 				  nodeid, inlen, inbuf);
252 			goto out;
253 		}
254 		list = r->res_root_list.next;
255 	} else {
256 		list = ls->ls_root_list.next;
257 	}
258 
259 	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
260 		r = list_entry(list, struct dlm_rsb, res_root_list);
261 		if (r->res_nodeid)
262 			continue;
263 
264 		dir_nodeid = dlm_dir_nodeid(r);
265 		if (dir_nodeid != nodeid)
266 			continue;
267 
268 		/*
269 		 * The block ends when we can't fit the following in the
270 		 * remaining buffer space:
271 		 * namelen (uint16_t) +
272 		 * name (r->res_length) +
273 		 * end-of-block record 0x0000 (uint16_t)
274 		 */
275 
276 		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
277 			/* Write end-of-block record */
278 			be_namelen = cpu_to_be16(0);
279 			memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
280 			offset += sizeof(__be16);
281 			ls->ls_recover_dir_sent_msg++;
282 			goto out;
283 		}
284 
285 		be_namelen = cpu_to_be16(r->res_length);
286 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
287 		offset += sizeof(__be16);
288 		memcpy(outbuf + offset, r->res_name, r->res_length);
289 		offset += r->res_length;
290 		ls->ls_recover_dir_sent_res++;
291 	}
292 
293 	/*
294 	 * If we've reached the end of the list (and there's room) write a
295 	 * terminating record.
296 	 */
297 
298 	if ((list == &ls->ls_root_list) &&
299 	    (offset + sizeof(uint16_t) <= outlen)) {
300 		be_namelen = cpu_to_be16(0xFFFF);
301 		memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
302 		offset += sizeof(__be16);
303 		ls->ls_recover_dir_sent_msg++;
304 	}
305  out:
306 	up_read(&ls->ls_root_sem);
307 }
308 
309