/*
 * Copyright (C) 2005, 2006
 * Avishay Traeger (avishay@gmail.com)
 * Copyright (C) 2008, 2009
 * Boaz Harrosh <ooo@electrozaur.com>
 *
 * This file is part of exofs.
 *
 * exofs is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation.  Since it is based on ext2, and the only
 * valid version of GPL for the Linux kernel is version 2, the only valid
 * version of GPL for exofs is version 2.
 *
 * exofs is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with exofs; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */

#include <linux/slab.h>
#include <linux/module.h>
#include <asm/div64.h>
#include <linux/lcm.h>

#include "ore_raid.h"

MODULE_AUTHOR("Boaz Harrosh <ooo@electrozaur.com>");
MODULE_DESCRIPTION("Objects Raid Engine ore.ko");
MODULE_LICENSE("GPL");
/* ore_verify_layout does a few things:
 * 1. Given a minimal set of needed parameters, fixes up the rest of the
 *    members to be operational for the ore. The needed parameters are those
 *    that are defined by the pnfs-objects layout standard.
 * 2. Checks whether the current ore code actually supports these parameters,
 *    for example stripe_unit must be a multiple of the system PAGE_SIZE,
 *    etc.
 * 3. Caches some heavily used calculations that will be needed by users.
 */

enum { BIO_MAX_PAGES_KMALLOC =
		(PAGE_SIZE - sizeof(struct bio)) / sizeof(struct bio_vec),};

int ore_verify_layout(unsigned total_comps, struct ore_layout *layout)
{
	u64 stripe_length;

	switch (layout->raid_algorithm) {
	case PNFS_OSD_RAID_0:
		layout->parity = 0;
		break;
	case PNFS_OSD_RAID_5:
		layout->parity = 1;
		break;
	case PNFS_OSD_RAID_PQ:
		layout->parity = 2;
		break;
	case PNFS_OSD_RAID_4:
	default:
		ORE_ERR("Only RAID_0/5/6 for now received-enum=%d\n",
			layout->raid_algorithm);
		return -EINVAL;
	}
	if (0 != (layout->stripe_unit & ~PAGE_MASK)) {
		ORE_ERR("Stripe Unit(0x%llx)"
			  " must be a multiple of PAGE_SIZE(0x%lx)\n",
			  _LLU(layout->stripe_unit), PAGE_SIZE);
		return -EINVAL;
	}
	if (layout->group_width) {
		if (!layout->group_depth) {
			ORE_ERR("group_depth == 0 && group_width != 0\n");
			return -EINVAL;
		}
		if (total_comps < (layout->group_width * layout->mirrors_p1)) {
			ORE_ERR("Data Map wrong, "
				"numdevs=%d < group_width=%d * mirrors=%d\n",
				total_comps, layout->group_width,
				layout->mirrors_p1);
			return -EINVAL;
		}
		layout->group_count = total_comps / layout->mirrors_p1 /
						layout->group_width;
	} else {
		if (layout->group_depth) {
			printk(KERN_NOTICE "Warning: group_depth ignored "
				"group_width == 0 && group_depth == %lld\n",
				_LLU(layout->group_depth));
		}
		layout->group_width = total_comps / layout->mirrors_p1;
		layout->group_depth = -1;
		layout->group_count = 1;
	}

	stripe_length = (u64)layout->group_width * layout->stripe_unit;
	if (stripe_length >= (1ULL << 32)) {
		ORE_ERR("Stripe_length(0x%llx) >= 32bit is not supported\n",
			_LLU(stripe_length));
		return -EINVAL;
	}

	layout->max_io_length =
		(BIO_MAX_PAGES_KMALLOC * PAGE_SIZE - layout->stripe_unit) *
					(layout->group_width - layout->parity);
	if (layout->parity) {
		unsigned stripe_length =
				(layout->group_width - layout->parity) *
				layout->stripe_unit;

		layout->max_io_length /= stripe_length;
		layout->max_io_length *= stripe_length;
	}
	ORE_DBGMSG("max_io_length=0x%lx\n", layout->max_io_length);

	return 0;
}
EXPORT_SYMBOL(ore_verify_layout);
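
/* A minimal usage sketch (hypothetical caller, not part of this file; the
 * layout values and device count are made up). Only the parameters mandated
 * by the pnfs-objects layout are filled in; ore_verify_layout() derives
 * group_count, max_io_length, etc.:
 *
 *	struct ore_layout layout = {
 *		.stripe_unit    = PAGE_SIZE,	(must be a PAGE_SIZE multiple)
 *		.group_width    = 4,		(devices striped across)
 *		.group_depth    = 16,		(stripes per group)
 *		.mirrors_p1     = 1,		(mirrors + 1; 1 == no mirrors)
 *		.raid_algorithm = PNFS_OSD_RAID_5,
 *	};
 *	int err = ore_verify_layout(8, &layout);	(2 groups of 4 devs)
 *	if (err)
 *		return err;
 */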

static u8 *_ios_cred(struct ore_io_state *ios, unsigned index)
{
	return ios->oc->comps[index & ios->oc->single_comp].cred;
}

static struct osd_obj_id *_ios_obj(struct ore_io_state *ios, unsigned index)
{
	return &ios->oc->comps[index & ios->oc->single_comp].obj;
}

static struct osd_dev *_ios_od(struct ore_io_state *ios, unsigned index)
{
	ORE_DBGMSG2("oc->first_dev=%d oc->numdevs=%d i=%d oc->ods=%p\n",
		    ios->oc->first_dev, ios->oc->numdevs, index,
		    ios->oc->ods);

	return ore_comp_dev(ios->oc, index);
}

int  _ore_get_io_state(struct ore_layout *layout,
			struct ore_components *oc, unsigned numdevs,
			unsigned sgs_per_dev, unsigned num_par_pages,
			struct ore_io_state **pios)
{
	struct ore_io_state *ios;
	size_t size_ios, size_extra, size_total;
	void *ios_extra;

	/*
	 * The desired layout looks like this, with the extra_allocation
	 * items pointed at from fields within ios or per_dev:

	struct __alloc_all_io_state {
		struct ore_io_state ios;
		struct ore_per_dev_state per_dev[numdevs];
		union {
			struct osd_sg_entry sglist[sgs_per_dev * numdevs];
			struct page *pages[num_par_pages];
		} extra_allocation;
	} whole_allocation;

	*/

	/* This should never happen, so abort early if it ever does. */
	if (sgs_per_dev && num_par_pages) {
		ORE_DBGMSG("Tried to use both pages and sglist\n");
		*pios = NULL;
		return -EINVAL;
	}

	if (numdevs > (INT_MAX - sizeof(*ios)) /
		       sizeof(struct ore_per_dev_state))
		return -ENOMEM;
	size_ios = sizeof(*ios) + sizeof(struct ore_per_dev_state) * numdevs;

	if (sgs_per_dev * numdevs > INT_MAX / sizeof(struct osd_sg_entry))
		return -ENOMEM;
	if (num_par_pages > INT_MAX / sizeof(struct page *))
		return -ENOMEM;
	size_extra = max(sizeof(struct osd_sg_entry) * (sgs_per_dev * numdevs),
			 sizeof(struct page *) * num_par_pages);

	size_total = size_ios + size_extra;

	if (likely(size_total <= PAGE_SIZE)) {
		ios = kzalloc(size_total, GFP_KERNEL);
		if (unlikely(!ios)) {
			ORE_DBGMSG("Failed kzalloc bytes=%zd\n", size_total);
			*pios = NULL;
			return -ENOMEM;
		}
		ios_extra = (char *)ios + size_ios;
	} else {
		ios = kzalloc(size_ios, GFP_KERNEL);
		if (unlikely(!ios)) {
			ORE_DBGMSG("Failed alloc first part bytes=%zd\n",
				   size_ios);
			*pios = NULL;
			return -ENOMEM;
		}
		ios_extra = kzalloc(size_extra, GFP_KERNEL);
		if (unlikely(!ios_extra)) {
			ORE_DBGMSG("Failed alloc second part bytes=%zd\n",
				   size_extra);
			kfree(ios);
			*pios = NULL;
			return -ENOMEM;
		}

		/* In this case per_dev[0].sglist holds the pointer to
		 * be freed.
		 */
		ios->extra_part_alloc = true;
	}

	if (num_par_pages) {
		ios->parity_pages = ios_extra;
		ios->max_par_pages = num_par_pages;
	}
	if (sgs_per_dev) {
		struct osd_sg_entry *sglist = ios_extra;
		unsigned d;

		for (d = 0; d < numdevs; ++d) {
			ios->per_dev[d].sglist = sglist;
			sglist += sgs_per_dev;
		}
		ios->sgs_per_dev = sgs_per_dev;
	}

	ios->layout = layout;
	ios->oc = oc;
	*pios = ios;
	return 0;
}

/* Allocate an io_state for only a single group of devices
 *
 * If a user needs to call ore_read/write() this version must be used because
 * it allocates extra stuff for striping and raid.
 * The ore might decide to IO less than @length bytes due to alignments
 * and constraints as follows:
 * - The IO cannot cross a group boundary.
 * - In raid5/6 the end of the IO must align with the end of a stripe, e.g.
 *   (@offset + @length) % stripe_size == 0, or the complete range is within
 *   a single stripe.
 * - Memory conditions may only permit a shorter IO. (A user can pass
 *   @length=~0 and check the returned ios->length for the max_io_size.)
 *
 * The caller must check the returned ios->length (and/or ios->nr_pages) and
 * re-issue the pages that fall outside of ios->length.
 */
int  ore_get_rw_state(struct ore_layout *layout, struct ore_components *oc,
		      bool is_reading, u64 offset, u64 length,
		      struct ore_io_state **pios)
{
	struct ore_io_state *ios;
	unsigned numdevs = layout->group_width * layout->mirrors_p1;
	unsigned sgs_per_dev = 0, max_par_pages = 0;
	int ret;

	if (layout->parity && length) {
		unsigned data_devs = layout->group_width - layout->parity;
		unsigned stripe_size = layout->stripe_unit * data_devs;
		unsigned pages_in_unit = layout->stripe_unit / PAGE_SIZE;
		u32 remainder;
		u64 num_stripes;
		u64 num_raid_units;

		num_stripes = div_u64_rem(length, stripe_size, &remainder);
		if (remainder)
			++num_stripes;

		num_raid_units = num_stripes * layout->parity;

		if (is_reading) {
			/* For reads add per_dev sglist array */
			/* TODO: Raid 6 needs twice as many. Actually:
			*         num_stripes / LCMdP(W,P);
			*         if (W%P != 0) num_stripes *= parity;
			*/

			/* first/last seg is split */
			num_raid_units += layout->group_width;
			sgs_per_dev = div_u64(num_raid_units, data_devs) + 2;
		} else {
			/* For writes add a parity pages array. */
			max_par_pages = num_raid_units * pages_in_unit *
						sizeof(struct page *);
		}
	}

	ret = _ore_get_io_state(layout, oc, numdevs, sgs_per_dev, max_par_pages,
				pios);
	if (unlikely(ret))
		return ret;

	ios = *pios;
	ios->reading = is_reading;
	ios->offset = offset;

	if (length) {
		ore_calc_stripe_info(layout, offset, length, &ios->si);
		ios->length = ios->si.length;
		ios->nr_pages = ((ios->offset & (PAGE_SIZE - 1)) +
				 ios->length + PAGE_SIZE - 1) / PAGE_SIZE;
		if (layout->parity)
			_ore_post_alloc_raid_stuff(ios);
	}

	return 0;
}
EXPORT_SYMBOL(ore_get_rw_state);
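
/* A sketch of the short-IO contract described above (hypothetical caller;
 * the page array and bookkeeping names are made up). Because the ore may
 * trim ios->length to an aligned boundary, the caller loops until all
 * bytes have been issued:
 *
 *	u64 done = 0;
 *	while (done < total_length) {
 *		struct ore_io_state *ios;
 *
 *		err = ore_get_rw_state(layout, oc, true, offset + done,
 *				       total_length - done, &ios);
 *		if (err)
 *			return err;
 *		ios->pages = pages + (done >> PAGE_SHIFT);
 *		err = ore_read(ios);	(sync, since ios->done is NULL)
 *		done += ios->length;	(may be less than was asked for)
 *		ore_put_io_state(ios);
 *	}
 */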

/* Allocate an io_state for all the devices in the comps array
 *
 * This version of io_state allocation is used mostly by create/remove
 * and truncate, where we currently need all the devices. The only wasteful
 * bit is the read/write_attributes with no IO. Those sites should
 * be converted to use ore_get_rw_state() with length=0.
 */
int  ore_get_io_state(struct ore_layout *layout, struct ore_components *oc,
		      struct ore_io_state **pios)
{
	return _ore_get_io_state(layout, oc, oc->numdevs, 0, 0, pios);
}
EXPORT_SYMBOL(ore_get_io_state);
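
/* Typical use of the all-devices io_state (hypothetical caller; mirrors
 * how a filesystem would create or remove an object's components):
 *
 *	struct ore_io_state *ios;
 *
 *	err = ore_get_io_state(layout, oc, &ios);
 *	if (err)
 *		return err;
 *	err = ore_create(ios);		(or ore_remove(ios))
 *	ore_put_io_state(ios);
 */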

void ore_put_io_state(struct ore_io_state *ios)
{
	if (ios) {
		unsigned i;

		for (i = 0; i < ios->numdevs; i++) {
			struct ore_per_dev_state *per_dev = &ios->per_dev[i];

			if (per_dev->or)
				osd_end_request(per_dev->or);
			if (per_dev->bio)
				bio_put(per_dev->bio);
		}

		_ore_free_raid_stuff(ios);
		kfree(ios);
	}
}
EXPORT_SYMBOL(ore_put_io_state);

static void _sync_done(struct ore_io_state *ios, void *p)
{
	struct completion *waiting = p;

	complete(waiting);
}

static void _last_io(struct kref *kref)
{
	struct ore_io_state *ios = container_of(
					kref, struct ore_io_state, kref);

	ios->done(ios, ios->private);
}

static void _done_io(struct osd_request *or, void *p)
{
	struct ore_io_state *ios = p;

	kref_put(&ios->kref, _last_io);
}

int ore_io_execute(struct ore_io_state *ios)
{
	DECLARE_COMPLETION_ONSTACK(wait);
	bool sync = (ios->done == NULL);
	int i, ret;

	if (sync) {
		ios->done = _sync_done;
		ios->private = &wait;
	}

	for (i = 0; i < ios->numdevs; i++) {
		struct osd_request *or = ios->per_dev[i].or;
		if (unlikely(!or))
			continue;

		ret = osd_finalize_request(or, 0, _ios_cred(ios, i), NULL);
		if (unlikely(ret)) {
			ORE_DBGMSG("Failed to osd_finalize_request() => %d\n",
				     ret);
			return ret;
		}
	}

	kref_init(&ios->kref);

	for (i = 0; i < ios->numdevs; i++) {
		struct osd_request *or = ios->per_dev[i].or;
		if (unlikely(!or))
			continue;

		kref_get(&ios->kref);
		osd_execute_request_async(or, _done_io, ios);
	}

	kref_put(&ios->kref, _last_io);
	ret = 0;

	if (sync) {
		wait_for_completion(&wait);
		ret = ore_check_io(ios, NULL);
	}
	return ret;
}
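
/* Completion model, as the code above implies: when ios->done is NULL the
 * IO is synchronous (_sync_done completes an on-stack completion and the
 * result comes from ore_check_io()); otherwise ios->done fires when the
 * kref drops to zero, i.e. after the last per-device request completes.
 * A sketch of async use (my_done and my_ctx are hypothetical):
 *
 *	static void my_done(struct ore_io_state *ios, void *private)
 *	{
 *		struct my_ctx *ctx = private;
 *
 *		ctx->err = ore_check_io(ios, NULL);
 *		ore_put_io_state(ios);
 *		complete(&ctx->done);
 *	}
 *
 *	ios->done = my_done;
 *	ios->private = ctx;
 *	err = ore_io_execute(ios);	(returns without waiting)
 */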

static void _clear_bio(struct bio *bio)
{
	struct bio_vec *bv;
	unsigned i;

	bio_for_each_segment_all(bv, bio, i) {
		unsigned this_count = bv->bv_len;

		if (likely(PAGE_SIZE == this_count))
			clear_highpage(bv->bv_page);
		else
			zero_user(bv->bv_page, bv->bv_offset, this_count);
	}
}

int ore_check_io(struct ore_io_state *ios, ore_on_dev_error on_dev_error)
{
	enum osd_err_priority accumulated_osd_err = 0;
	int accumulated_lin_err = 0;
	int i;

	for (i = 0; i < ios->numdevs; i++) {
		struct osd_sense_info osi;
		struct ore_per_dev_state *per_dev = &ios->per_dev[i];
		struct osd_request *or = per_dev->or;
		int ret;

		if (unlikely(!or))
			continue;

		ret = osd_req_decode_sense(or, &osi);
		if (likely(!ret))
			continue;

		if ((OSD_ERR_PRI_CLEAR_PAGES == osi.osd_err_pri) &&
		    per_dev->bio) {
			/* start read offset passed end of file.
			 * Note: if we do not have a bio it means
			 * read-attributes. In that case we should return an
			 * error to the caller.
			 */
			_clear_bio(per_dev->bio);
			ORE_DBGMSG("start read offset passed end of file "
				"offset=0x%llx, length=0x%llx\n",
				_LLU(per_dev->offset),
				_LLU(per_dev->length));

			continue; /* we recovered */
		}

		if (on_dev_error) {
			u64 residual = ios->reading ?
					or->in.residual : or->out.residual;
			u64 offset = (ios->offset + ios->length) - residual;
			unsigned dev = per_dev->dev - ios->oc->first_dev;
			struct ore_dev *od = ios->oc->ods[dev];

			on_dev_error(ios, od, dev, osi.osd_err_pri,
				     offset, residual);
		}
		if (osi.osd_err_pri >= accumulated_osd_err) {
			accumulated_osd_err = osi.osd_err_pri;
			accumulated_lin_err = ret;
		}
	}

	return accumulated_lin_err;
}
EXPORT_SYMBOL(ore_check_io);
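
/* A sketch of an on_dev_error handler (hypothetical; a user might mark the
 * failing component bad or trigger recovery). The signature follows the
 * call site above:
 *
 *	static void my_on_dev_error(struct ore_io_state *ios,
 *				    struct ore_dev *od, unsigned dev_index,
 *				    enum osd_err_priority oep,
 *				    u64 dev_offset, u64 dev_len)
 *	{
 *		pr_warn("dev=%u pri=%d offset=0x%llx len=0x%llx\n",
 *			dev_index, oep, _LLU(dev_offset), _LLU(dev_len));
 *	}
 *
 *	ret = ore_check_io(ios, my_on_dev_error);
 */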

/*
 * L - logical offset into the file
 *
 * D - number of Data devices
 *	D = group_width - parity
 *
 * U - The number of bytes in a stripe within a group
 *	U = stripe_unit * D
 *
 * T - The number of bytes striped within a group of component objects
 *     (before advancing to the next group)
 *	T = U * group_depth
 *
 * S - The number of bytes striped across all component objects
 *     before the pattern repeats
 *	S = T * group_count
 *
 * M - The "major" (i.e., across all components) cycle number
 *	M = L / S
 *
 * G - Counts the groups from the beginning of the major cycle
 *	G = (L - (M * S)) / T	[or (L % S) / T]
 *
 * H - The byte offset within the group
 *	H = (L - (M * S)) % T	[or (L % S) % T]
 *
 * N - The "minor" (i.e., across the group) stripe number
 *	N = H / U
 *
 * C - The component index corresponding to L
 *
 *	C = (H - (N * U)) / stripe_unit + G * D
 *	[or (L % U) / stripe_unit + G * D]
 *
 * O - The component offset corresponding to L
 *	O = L % stripe_unit + N * stripe_unit + M * group_depth * stripe_unit
 *
 * LCMdP - Parity cycle: Lowest Common Multiple of group_width, parity,
 *         divided by parity
 *	LCMdP = lcm(group_width, parity) / parity
 *
 * R - The parity Rotation stripe
 *     (Note: the parity cycle always starts at a group's boundary)
 *	R = N % LCMdP
 *
 * I - the first parity device index
 *	I = (group_width + group_width - R*parity - parity) % group_width
 *
 * Craid - The component index, Rotated
 *	Craid = (group_width + C - R*parity) % group_width
 *	(We add group_width to avoid negative numbers in the modulo math)
 */
void ore_calc_stripe_info(struct ore_layout *layout, u64 file_offset,
			  u64 length, struct ore_striping_info *si)
{
	u32	stripe_unit = layout->stripe_unit;
	u32	group_width = layout->group_width;
	u64	group_depth = layout->group_depth;
	u32	parity      = layout->parity;

	u32	D = group_width - parity;
	u32	U = D * stripe_unit;
	u64	T = U * group_depth;
	u64	S = T * layout->group_count;
	u64	M = div64_u64(file_offset, S);

	/*
	G = (L - (M * S)) / T
	H = (L - (M * S)) % T
	*/
	u64	LmodS = file_offset - M * S;
	u32	G = div64_u64(LmodS, T);
	u64	H = LmodS - G * T;

	u32	N = div_u64(H, U);
	u32	Nlast;

	/* "H - (N * U)" is just "H % U" so it's bound to u32 */
	u32	C = (u32)(H - (N * U)) / stripe_unit + G * group_width;
	u32	first_dev = C - C % group_width;

	div_u64_rem(file_offset, stripe_unit, &si->unit_off);

	si->obj_offset = si->unit_off + (N * stripe_unit) +
				  (M * group_depth * stripe_unit);
	si->cur_comp = C - first_dev;
	si->cur_pg = si->unit_off / PAGE_SIZE;

	if (parity) {
		u32 LCMdP = lcm(group_width, parity) / parity;
		/* R     = N % LCMdP; */
		u32 RxP   = (N % LCMdP) * parity;

		si->par_dev = (group_width + group_width - parity - RxP) %
			      group_width + first_dev;
		si->dev = (group_width + group_width + C - RxP) %
			  group_width + first_dev;
		si->bytes_in_stripe = U;
		si->first_stripe_start = M * S + G * T + N * U;
	} else {
		/* Make the math correct; see _prepare_one_group() */
		si->par_dev = group_width;
		si->dev = C;
	}

	si->dev *= layout->mirrors_p1;
	si->par_dev *= layout->mirrors_p1;
	si->offset = file_offset;
	si->length = T - H;
	if (si->length > length)
		si->length = length;

	Nlast = div_u64(H + si->length + U - 1, U);
	si->maxdevUnits = Nlast - N;

	si->M = M;
}
EXPORT_SYMBOL(ore_calc_stripe_info);
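
/* A worked example of the math above (made-up layout, not from the code):
 * stripe_unit = 64K, group_width = 4, parity = 1, group_depth = 16,
 * group_count = 2, mirrors_p1 = 1, and L = file_offset = 10M gives:
 *
 *	D = 4 - 1 = 3		U = 3 * 64K = 192K
 *	T = U * 16 = 3M		S = T * 2   = 6M
 *	M = 10M / 6M = 1	LmodS       = 4M
 *	G = 4M / 3M  = 1	H           = 1M
 *	N = 1M / 192K = 5	(5 * 192K = 960K, 64K left over)
 *	C = 64K / 64K + 1 * 4 = 5	first_dev = 4
 *	unit_off = 0		obj_offset = 0 + 5*64K + 16*64K = 21*64K
 *
 * With parity: LCMdP = lcm(4, 1) / 1 = 4, R = 5 % 4 = 1, RxP = 1, so
 * par_dev = (4 + 4 - 1 - 1) % 4 + 4 = 6 and dev = (4 + 4 + 5 - 1) % 4 + 4
 * = 4, i.e. the unit lands on device 4 and stripe 5's parity on device 6.
 */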

int _ore_add_stripe_unit(struct ore_io_state *ios, unsigned *cur_pg,
			 unsigned pgbase, struct page **pages,
			 struct ore_per_dev_state *per_dev, int cur_len)
{
	unsigned pg = *cur_pg;
	struct request_queue *q =
			osd_request_queue(_ios_od(ios, per_dev->dev));
	unsigned len = cur_len;
	int ret;

	if (per_dev->bio == NULL) {
		unsigned bio_size;

		if (!ios->reading) {
			bio_size = ios->si.maxdevUnits;
		} else {
			bio_size = (ios->si.maxdevUnits + 1) *
			     (ios->layout->group_width - ios->layout->parity) /
			     ios->layout->group_width;
		}
		bio_size *= (ios->layout->stripe_unit / PAGE_SIZE);

		per_dev->bio = bio_kmalloc(GFP_KERNEL, bio_size);
		if (unlikely(!per_dev->bio)) {
			ORE_DBGMSG("Failed to allocate BIO size=%u\n",
				     bio_size);
			ret = -ENOMEM;
			goto out;
		}
	}

	while (cur_len > 0) {
		unsigned pglen = min_t(unsigned, PAGE_SIZE - pgbase, cur_len);
		unsigned added_len;

		cur_len -= pglen;

		added_len = bio_add_pc_page(q, per_dev->bio, pages[pg],
					    pglen, pgbase);
		if (unlikely(pglen != added_len)) {
			/* If bi_vcnt == bi_max then this is a SW BUG */
			ORE_DBGMSG("Failed bio_add_pc_page bi_vcnt=0x%x "
				   "bi_max=0x%x BIO_MAX=0x%x cur_len=0x%x\n",
				   per_dev->bio->bi_vcnt,
				   per_dev->bio->bi_max_vecs,
				   BIO_MAX_PAGES_KMALLOC, cur_len);
			ret = -ENOMEM;
			goto out;
		}
		_add_stripe_page(ios->sp2d, &ios->si, pages[pg]);

		pgbase = 0;
		++pg;
	}
	BUG_ON(cur_len);

	per_dev->length += len;
	*cur_pg = pg;
	ret = 0;
out:	/* We fail the complete unit on an error, e.g. we don't advance
	 * per_dev->length and cur_pg. This means that we might have a bigger
	 * bio than the CDB requested length (per_dev->length). That's fine,
	 * only the opposite is fatal.
	 */
	return ret;
}

static int _add_parity_units(struct ore_io_state *ios,
			     struct ore_striping_info *si,
			     unsigned dev, unsigned first_dev,
			     unsigned mirrors_p1, unsigned devs_in_group,
			     unsigned cur_len)
{
	unsigned do_parity;
	int ret = 0;

	for (do_parity = ios->layout->parity; do_parity; --do_parity) {
		struct ore_per_dev_state *per_dev;

		per_dev = &ios->per_dev[dev - first_dev];
		if (!per_dev->length && !per_dev->offset) {
			/* Only/always the parity unit of the first
			 * stripe will be empty. So this is a chance to
			 * initialize the per_dev info.
			 */
			per_dev->dev = dev;
			per_dev->offset = si->obj_offset - si->unit_off;
		}

		ret = _ore_add_parity_unit(ios, si, per_dev, cur_len,
					   do_parity == 1);
		if (unlikely(ret))
			break;

		if (do_parity != 1) {
			dev = ((dev + mirrors_p1) % devs_in_group) + first_dev;
			si->cur_comp = (si->cur_comp + 1) %
						       ios->layout->group_width;
		}
	}

	return ret;
}

static int _prepare_for_striping(struct ore_io_state *ios)
{
	struct ore_striping_info *si = &ios->si;
	unsigned stripe_unit = ios->layout->stripe_unit;
	unsigned mirrors_p1 = ios->layout->mirrors_p1;
	unsigned group_width = ios->layout->group_width;
	unsigned devs_in_group = group_width * mirrors_p1;
	unsigned dev = si->dev;
	unsigned first_dev = dev - (dev % devs_in_group);
	unsigned cur_pg = ios->pages_consumed;
	u64 length = ios->length;
	int ret = 0;

	if (!ios->pages) {
		ios->numdevs = ios->layout->mirrors_p1;
		return 0;
	}

	BUG_ON(length > si->length);

	while (length) {
		struct ore_per_dev_state *per_dev =
						&ios->per_dev[dev - first_dev];
		unsigned cur_len, page_off = 0;

		if (!per_dev->length && !per_dev->offset) {
			/* First time; initialize the per_dev info. */
			per_dev->dev = dev;
			if (dev == si->dev) {
				WARN_ON(dev == si->par_dev);
				per_dev->offset = si->obj_offset;
				cur_len = stripe_unit - si->unit_off;
				page_off = si->unit_off & ~PAGE_MASK;
				BUG_ON(page_off && (page_off != ios->pgbase));
			} else {
				per_dev->offset = si->obj_offset - si->unit_off;
				cur_len = stripe_unit;
			}
		} else {
			cur_len = stripe_unit;
		}
		if (cur_len >= length)
			cur_len = length;

		ret = _ore_add_stripe_unit(ios, &cur_pg, page_off, ios->pages,
					   per_dev, cur_len);
		if (unlikely(ret))
			goto out;

		length -= cur_len;

		dev = ((dev + mirrors_p1) % devs_in_group) + first_dev;
		si->cur_comp = (si->cur_comp + 1) % group_width;
		if (unlikely((dev == si->par_dev) || (!length && ios->sp2d))) {
			if (!length && ios->sp2d) {
				/* If we are writing and this is the very last
				 * stripe, then operate on the parity dev.
				 */
				dev = si->par_dev;
				/* If last stripe operate on parity comp */
				si->cur_comp = group_width - ios->layout->parity;
			}

			/* In writes cur_len just means whether it's the
			 * last one. See _ore_add_parity_unit.
			 */
			ret = _add_parity_units(ios, si, dev, first_dev,
						mirrors_p1, devs_in_group,
						ios->sp2d ? length : cur_len);
			if (unlikely(ret))
				goto out;

			/* Rotate next par_dev backwards with wrapping */
			si->par_dev = (devs_in_group + si->par_dev -
				       ios->layout->parity * mirrors_p1) %
				      devs_in_group + first_dev;
			/* Next stripe, start fresh */
			si->cur_comp = 0;
			si->cur_pg = 0;
			si->obj_offset += cur_len;
			si->unit_off = 0;
		}
	}
out:
	ios->numdevs = devs_in_group;
	ios->pages_consumed = cur_pg;
	return ret;
}

int ore_create(struct ore_io_state *ios)
{
	int i, ret;

	for (i = 0; i < ios->oc->numdevs; i++) {
		struct osd_request *or;

		or = osd_start_request(_ios_od(ios, i));
		if (unlikely(!or)) {
			ORE_ERR("%s: osd_start_request failed\n", __func__);
			ret = -ENOMEM;
			goto out;
		}
		ios->per_dev[i].or = or;
		ios->numdevs++;

		osd_req_create_object(or, _ios_obj(ios, i));
	}
	ret = ore_io_execute(ios);

out:
	return ret;
}
EXPORT_SYMBOL(ore_create);

int ore_remove(struct ore_io_state *ios)
{
	int i, ret;

	for (i = 0; i < ios->oc->numdevs; i++) {
		struct osd_request *or;

		or = osd_start_request(_ios_od(ios, i));
		if (unlikely(!or)) {
			ORE_ERR("%s: osd_start_request failed\n", __func__);
			ret = -ENOMEM;
			goto out;
		}
		ios->per_dev[i].or = or;
		ios->numdevs++;

		osd_req_remove_object(or, _ios_obj(ios, i));
	}
	ret = ore_io_execute(ios);

out:
	return ret;
}
EXPORT_SYMBOL(ore_remove);

static int _write_mirror(struct ore_io_state *ios, int cur_comp)
{
	struct ore_per_dev_state *master_dev = &ios->per_dev[cur_comp];
	unsigned dev = ios->per_dev[cur_comp].dev;
	unsigned last_comp = cur_comp + ios->layout->mirrors_p1;
	int ret = 0;

	if (ios->pages && !master_dev->length)
		return 0; /* Just an empty slot */

	for (; cur_comp < last_comp; ++cur_comp, ++dev) {
		struct ore_per_dev_state *per_dev = &ios->per_dev[cur_comp];
		struct osd_request *or;

		or = osd_start_request(_ios_od(ios, dev));
		if (unlikely(!or)) {
			ORE_ERR("%s: osd_start_request failed\n", __func__);
			ret = -ENOMEM;
			goto out;
		}
		per_dev->or = or;

		if (ios->pages) {
			struct bio *bio;

			if (per_dev != master_dev) {
				bio = bio_clone_fast(master_dev->bio,
						     GFP_KERNEL, NULL);
				if (unlikely(!bio)) {
					ORE_DBGMSG(
					      "Failed to allocate BIO size=%u\n",
					      master_dev->bio->bi_max_vecs);
					ret = -ENOMEM;
					goto out;
				}

				bio->bi_disk = NULL;
				bio->bi_next = NULL;
				per_dev->offset = master_dev->offset;
				per_dev->length = master_dev->length;
				per_dev->bio = bio;
				per_dev->dev = dev;
			} else {
				bio = master_dev->bio;
				/* FIXME: bio_set_dir() */
				bio_set_op_attrs(bio, REQ_OP_WRITE, 0);
			}

			osd_req_write(or, _ios_obj(ios, cur_comp),
				      per_dev->offset, bio, per_dev->length);
			ORE_DBGMSG("write(0x%llx) offset=0x%llx "
				      "length=0x%llx dev=%d\n",
				     _LLU(_ios_obj(ios, cur_comp)->id),
				     _LLU(per_dev->offset),
				     _LLU(per_dev->length), dev);
		} else if (ios->kern_buff) {
			per_dev->offset = ios->si.obj_offset;
			per_dev->dev = ios->si.dev + dev;

			/* no cross-device IO without a page array */
			BUG_ON((ios->layout->group_width > 1) &&
			       (ios->si.unit_off + ios->length >
				ios->layout->stripe_unit));

			ret = osd_req_write_kern(or, _ios_obj(ios, cur_comp),
						 per_dev->offset,
						 ios->kern_buff, ios->length);
			if (unlikely(ret))
				goto out;
			ORE_DBGMSG2("write_kern(0x%llx) offset=0x%llx "
				      "length=0x%llx dev=%d\n",
				     _LLU(_ios_obj(ios, cur_comp)->id),
				     _LLU(per_dev->offset),
				     _LLU(ios->length), per_dev->dev);
		} else {
			osd_req_set_attributes(or, _ios_obj(ios, cur_comp));
			ORE_DBGMSG2("obj(0x%llx) set_attributes=%d dev=%d\n",
				     _LLU(_ios_obj(ios, cur_comp)->id),
				     ios->out_attr_len, dev);
		}

		if (ios->out_attr)
			osd_req_add_set_attr_list(or, ios->out_attr,
						  ios->out_attr_len);

		if (ios->in_attr)
			osd_req_add_get_attr_list(or, ios->in_attr,
						  ios->in_attr_len);
	}

out:
	return ret;
}

int ore_write(struct ore_io_state *ios)
{
	int i;
	int ret;

	if (unlikely(ios->sp2d && !ios->r4w)) {
		/* A library is attempting a RAID-write without providing
		 * a pages lock interface.
		 */
		WARN_ON_ONCE(1);
		return -ENOTSUPP;
	}

	ret = _prepare_for_striping(ios);
	if (unlikely(ret))
		return ret;

	for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
		ret = _write_mirror(ios, i);
		if (unlikely(ret))
			return ret;
	}

	ret = ore_io_execute(ios);
	return ret;
}
EXPORT_SYMBOL(ore_write);

int _ore_read_mirror(struct ore_io_state *ios, unsigned cur_comp)
{
	struct osd_request *or;
	struct ore_per_dev_state *per_dev = &ios->per_dev[cur_comp];
	struct osd_obj_id *obj = _ios_obj(ios, cur_comp);
	unsigned first_dev = (unsigned)obj->id;

	if (ios->pages && !per_dev->length)
		return 0; /* Just an empty slot */

	first_dev = per_dev->dev + first_dev % ios->layout->mirrors_p1;
	or = osd_start_request(_ios_od(ios, first_dev));
	if (unlikely(!or)) {
		ORE_ERR("%s: osd_start_request failed\n", __func__);
		return -ENOMEM;
	}
	per_dev->or = or;

	if (ios->pages) {
		if (per_dev->cur_sg) {
			/* finalize the last sg_entry */
			_ore_add_sg_seg(per_dev, 0, false);
			if (unlikely(!per_dev->cur_sg))
				return 0; /* Skip parity-only device */

			osd_req_read_sg(or, obj, per_dev->bio,
					per_dev->sglist, per_dev->cur_sg);
		} else {
			/* The no-raid case */
			osd_req_read(or, obj, per_dev->offset,
				     per_dev->bio, per_dev->length);
		}

		ORE_DBGMSG("read(0x%llx) offset=0x%llx length=0x%llx"
			     " dev=%d sg_len=%d\n", _LLU(obj->id),
			     _LLU(per_dev->offset), _LLU(per_dev->length),
			     first_dev, per_dev->cur_sg);
	} else {
		BUG_ON(ios->kern_buff);

		osd_req_get_attributes(or, obj);
		ORE_DBGMSG2("obj(0x%llx) get_attributes=%d dev=%d\n",
			      _LLU(obj->id),
			      ios->in_attr_len, first_dev);
	}
	if (ios->out_attr)
		osd_req_add_set_attr_list(or, ios->out_attr, ios->out_attr_len);

	if (ios->in_attr)
		osd_req_add_get_attr_list(or, ios->in_attr, ios->in_attr_len);

	return 0;
}

int ore_read(struct ore_io_state *ios)
{
	int i;
	int ret;

	ret = _prepare_for_striping(ios);
	if (unlikely(ret))
		return ret;

	for (i = 0; i < ios->numdevs; i += ios->layout->mirrors_p1) {
		ret = _ore_read_mirror(ios, i);
		if (unlikely(ret))
			return ret;
	}

	ret = ore_io_execute(ios);
	return ret;
}
EXPORT_SYMBOL(ore_read);

int extract_attr_from_ios(struct ore_io_state *ios, struct osd_attr *attr)
{
	struct osd_attr cur_attr = {.attr_page = 0}; /* start with zeros */
	void *iter = NULL;
	int nelem;

	do {
		nelem = 1;
		osd_req_decode_get_attr_list(ios->per_dev[0].or,
					     &cur_attr, &nelem, &iter);
		if ((cur_attr.attr_page == attr->attr_page) &&
		    (cur_attr.attr_id == attr->attr_id)) {
			attr->len = cur_attr.len;
			attr->val_ptr = cur_attr.val_ptr;
			return 0;
		}
	} while (iter);

	return -EIO;
}
EXPORT_SYMBOL(extract_attr_from_ios);
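
/* A sketch of fetching an object's logical length via the attribute path
 * (hypothetical caller; error handling elided). ios->in_attr is filled in
 * before the IO executes, then the decoded list is searched:
 *
 *	struct osd_attr attr = g_attr_logical_length;
 *	u64 obj_len;
 *
 *	ios->in_attr = &attr;
 *	ios->in_attr_len = 1;
 *	err = ore_io_execute(ios);	(a read with no pages: get-attributes)
 *	if (!err && !extract_attr_from_ios(ios, &attr))
 *		obj_len = get_unaligned_be64(attr.val_ptr);
 */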

static int _truncate_mirrors(struct ore_io_state *ios, unsigned cur_comp,
			     struct osd_attr *attr)
{
	int last_comp = cur_comp + ios->layout->mirrors_p1;

	for (; cur_comp < last_comp; ++cur_comp) {
		struct ore_per_dev_state *per_dev = &ios->per_dev[cur_comp];
		struct osd_request *or;

		or = osd_start_request(_ios_od(ios, cur_comp));
		if (unlikely(!or)) {
			ORE_ERR("%s: osd_start_request failed\n", __func__);
			return -ENOMEM;
		}
		per_dev->or = or;

		osd_req_set_attributes(or, _ios_obj(ios, cur_comp));
		osd_req_add_set_attr_list(or, attr, 1);
	}

	return 0;
}

struct _trunc_info {
	struct ore_striping_info si;
	u64 prev_group_obj_off;
	u64 next_group_obj_off;

	unsigned first_group_dev;
	unsigned next_group_dev;
};

static void _calc_trunc_info(struct ore_layout *layout, u64 file_offset,
			     struct _trunc_info *ti)
{
	unsigned stripe_unit = layout->stripe_unit;

	ore_calc_stripe_info(layout, file_offset, 0, &ti->si);

	ti->prev_group_obj_off = ti->si.M * stripe_unit;
	ti->next_group_obj_off = ti->si.M ? (ti->si.M - 1) * stripe_unit : 0;

	ti->first_group_dev = ti->si.dev - (ti->si.dev % layout->group_width);
	ti->next_group_dev = ti->first_group_dev + layout->group_width;
}

int ore_truncate(struct ore_layout *layout, struct ore_components *oc,
		   u64 size)
{
	struct ore_io_state *ios;
	struct exofs_trunc_attr {
		struct osd_attr attr;
		__be64 newsize;
	} *size_attrs;
	struct _trunc_info ti;
	int i, ret;

	ret = ore_get_io_state(layout, oc, &ios);
	if (unlikely(ret))
		return ret;

	_calc_trunc_info(ios->layout, size, &ti);

	size_attrs = kcalloc(ios->oc->numdevs, sizeof(*size_attrs),
			     GFP_KERNEL);
	if (unlikely(!size_attrs)) {
		ret = -ENOMEM;
		goto out;
	}

	ios->numdevs = ios->oc->numdevs;

	for (i = 0; i < ios->numdevs; ++i) {
		struct exofs_trunc_attr *size_attr = &size_attrs[i];
		u64 obj_size;

		if (i < ti.first_group_dev)
			obj_size = ti.prev_group_obj_off;
		else if (i >= ti.next_group_dev)
			obj_size = ti.next_group_obj_off;
		else if (i < ti.si.dev) /* dev within this group */
			obj_size = ti.si.obj_offset +
				      ios->layout->stripe_unit - ti.si.unit_off;
		else if (i == ti.si.dev)
			obj_size = ti.si.obj_offset;
		else /* i > ti.si.dev */
			obj_size = ti.si.obj_offset - ti.si.unit_off;

		size_attr->newsize = cpu_to_be64(obj_size);
		size_attr->attr = g_attr_logical_length;
		size_attr->attr.val_ptr = &size_attr->newsize;

		ORE_DBGMSG2("trunc(0x%llx) obj_offset=0x%llx dev=%d\n",
			     _LLU(oc->comps->obj.id), _LLU(obj_size), i);
		ret = _truncate_mirrors(ios, i * ios->layout->mirrors_p1,
					&size_attr->attr);
		if (unlikely(ret))
			goto out;
	}
	ret = ore_io_execute(ios);

out:
	kfree(size_attrs);
	ore_put_io_state(ios);
	return ret;
}
EXPORT_SYMBOL(ore_truncate);

const struct osd_attr g_attr_logical_length = ATTR_DEF(
	OSD_APAGE_OBJECT_INFORMATION, OSD_ATTR_OI_LOGICAL_LENGTH, 8);
EXPORT_SYMBOL(g_attr_logical_length);