/*
 * Copyright (C) 2016 CNEX Labs
 * Initial release: Javier Gonzalez <javier@cnexlabs.com>
 *                  Matias Bjorling <matias@cnexlabs.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License version
 * 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * pblk-gc.c - pblk's garbage collector
 */

#include "pblk.h"
#include <linux/delay.h>

static void pblk_gc_free_gc_rq(struct pblk_gc_rq *gc_rq)
{
	if (gc_rq->data)
		vfree(gc_rq->data);
	kfree(gc_rq);
}

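/* Drain the GC write list and submit each request's valid data to the write
 * cache. The line reference taken on the read side is dropped once the data
 * has been placed in the cache. Returns 1 if there was nothing to write.
 */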
static int pblk_gc_write(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_gc_rq *gc_rq, *tgc_rq;
	LIST_HEAD(w_list);

	spin_lock(&gc->w_lock);
	if (list_empty(&gc->w_list)) {
		spin_unlock(&gc->w_lock);
		return 1;
	}

	list_cut_position(&w_list, &gc->w_list, gc->w_list.prev);
	gc->w_entries = 0;
	spin_unlock(&gc->w_lock);

	list_for_each_entry_safe(gc_rq, tgc_rq, &w_list, list) {
		pblk_write_gc_to_cache(pblk, gc_rq);
		list_del(&gc_rq->list);
		kref_put(&gc_rq->line->ref, pblk_line_put);
		pblk_gc_free_gc_rq(gc_rq);
	}

	return 0;
}

static void pblk_gc_writer_kick(struct pblk_gc *gc)
{
	wake_up_process(gc->gc_writer_ts);
}

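/* Return a line whose GC attempt failed to the appropriate closed-line GC
 * list so that it can be selected as a victim again later.
 */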
static void pblk_put_line_back(struct pblk *pblk, struct pblk_line *line)
{
	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
	struct list_head *move_list;

	spin_lock(&line->lock);
	WARN_ON(line->state != PBLK_LINESTATE_GC);
	line->state = PBLK_LINESTATE_CLOSED;
	move_list = pblk_line_gc_list(pblk, line);
	spin_unlock(&line->lock);

	if (move_list) {
		spin_lock(&l_mg->gc_lock);
		list_add_tail(&line->list, move_list);
		spin_unlock(&l_mg->gc_lock);
	}
}

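/* GC read work: allocate a buffer for the request's valid sectors, read them
 * from the victim line and queue the request on the GC write list. The write
 * list depth is bounded by PBLK_GC_RQ_QD; if the GC writer falls behind, back
 * off and retry until there is room.
 */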
static void pblk_gc_line_ws(struct work_struct *work)
{
	struct pblk_line_ws *gc_rq_ws = container_of(work,
						struct pblk_line_ws, ws);
	struct pblk *pblk = gc_rq_ws->pblk;
	struct nvm_tgt_dev *dev = pblk->dev;
	struct nvm_geo *geo = &dev->geo;
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line *line = gc_rq_ws->line;
	struct pblk_gc_rq *gc_rq = gc_rq_ws->priv;
	int ret;

	up(&gc->gc_sem);

	gc_rq->data = vmalloc(array_size(gc_rq->nr_secs, geo->csecs));
	if (!gc_rq->data) {
		pblk_err(pblk, "could not GC line:%d (%d/%d)\n",
					line->id, *line->vsc, gc_rq->nr_secs);
		goto out;
	}

	/* Read from GC victim block */
	ret = pblk_submit_read_gc(pblk, gc_rq);
	if (ret) {
		pblk_err(pblk, "failed GC read in line:%d (err:%d)\n",
								line->id, ret);
		goto out;
	}

	if (!gc_rq->secs_to_gc)
		goto out;

retry:
	spin_lock(&gc->w_lock);
	if (gc->w_entries >= PBLK_GC_RQ_QD) {
		spin_unlock(&gc->w_lock);
		pblk_gc_writer_kick(&pblk->gc);
		usleep_range(128, 256);
		goto retry;
	}
	gc->w_entries++;
	list_add_tail(&gc_rq->list, &gc->w_list);
	spin_unlock(&gc->w_lock);

	pblk_gc_writer_kick(&pblk->gc);

	kfree(gc_rq_ws);
	return;

out:
	pblk_gc_free_gc_rq(gc_rq);
	kref_put(&line->ref, pblk_line_put);
	kfree(gc_rq_ws);
}

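/* Read the line's emeta from the device, verify that it is consistent and
 * return a copy of its LBA list. Returns NULL on allocation, read or
 * verification failure.
 */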
static __le64 *get_lba_list_from_emeta(struct pblk *pblk,
				       struct pblk_line *line)
{
	struct line_emeta *emeta_buf;
	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
	struct pblk_line_meta *lm = &pblk->lm;
	unsigned int lba_list_size = lm->emeta_len[2];
	__le64 *lba_list;
	int ret;

	emeta_buf = pblk_malloc(lm->emeta_len[0],
				l_mg->emeta_alloc_type, GFP_KERNEL);
	if (!emeta_buf)
		return NULL;

	ret = pblk_line_read_emeta(pblk, line, emeta_buf);
	if (ret) {
		pblk_err(pblk, "line %d read emeta failed (%d)\n",
				line->id, ret);
		pblk_mfree(emeta_buf, l_mg->emeta_alloc_type);
		return NULL;
	}

	/* If this check fails, it means that emeta is corrupted.
	 * For now, leave the line untouched.
	 * TODO: Implement a recovery routine that scans and moves
	 * all sectors on the line.
	 */

	ret = pblk_recov_check_emeta(pblk, emeta_buf);
	if (ret) {
		pblk_err(pblk, "inconsistent emeta (line %d)\n",
				line->id);
		pblk_mfree(emeta_buf, l_mg->emeta_alloc_type);
		return NULL;
	}

	lba_list = pblk_malloc(lba_list_size,
			       l_mg->emeta_alloc_type, GFP_KERNEL);
	if (lba_list)
		memcpy(lba_list, emeta_to_lbas(pblk, emeta_buf), lba_list_size);

	pblk_mfree(emeta_buf, l_mg->emeta_alloc_type);

	return lba_list;
}

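/* Prepare a line for GC: obtain its LBA list (from the write-error context
 * or from emeta), snapshot the invalid sector bitmap and split the remaining
 * valid sectors into GC read requests of at most max_write_pgs sectors each,
 * queued on the line reader workqueue.
 */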
static void pblk_gc_line_prepare_ws(struct work_struct *work)
{
	struct pblk_line_ws *line_ws = container_of(work, struct pblk_line_ws,
									ws);
	struct pblk *pblk = line_ws->pblk;
	struct pblk_line *line = line_ws->line;
	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
	struct pblk_line_meta *lm = &pblk->lm;
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line_ws *gc_rq_ws;
	struct pblk_gc_rq *gc_rq;
	__le64 *lba_list;
	unsigned long *invalid_bitmap;
	int sec_left, nr_secs, bit;

	invalid_bitmap = kmalloc(lm->sec_bitmap_len, GFP_KERNEL);
	if (!invalid_bitmap)
		goto fail_free_ws;

	if (line->w_err_gc->has_write_err) {
		lba_list = line->w_err_gc->lba_list;
		line->w_err_gc->lba_list = NULL;
	} else {
		lba_list = get_lba_list_from_emeta(pblk, line);
		if (!lba_list) {
			pblk_err(pblk, "could not interpret emeta (line %d)\n",
					line->id);
			goto fail_free_invalid_bitmap;
		}
	}

	spin_lock(&line->lock);
	bitmap_copy(invalid_bitmap, line->invalid_bitmap, lm->sec_per_line);
	sec_left = pblk_line_vsc(line);
	spin_unlock(&line->lock);

	if (sec_left < 0) {
		pblk_err(pblk, "corrupted GC line (%d)\n", line->id);
		goto fail_free_lba_list;
	}

	bit = -1;
next_rq:
	gc_rq = kmalloc(sizeof(struct pblk_gc_rq), GFP_KERNEL);
	if (!gc_rq)
		goto fail_free_lba_list;

	nr_secs = 0;
	do {
		bit = find_next_zero_bit(invalid_bitmap, lm->sec_per_line,
								bit + 1);
		if (bit > line->emeta_ssec)
			break;

		gc_rq->paddr_list[nr_secs] = bit;
		gc_rq->lba_list[nr_secs++] = le64_to_cpu(lba_list[bit]);
	} while (nr_secs < pblk->max_write_pgs);

	if (unlikely(!nr_secs)) {
		kfree(gc_rq);
		goto out;
	}

	gc_rq->nr_secs = nr_secs;
	gc_rq->line = line;

	gc_rq_ws = kmalloc(sizeof(struct pblk_line_ws), GFP_KERNEL);
	if (!gc_rq_ws)
		goto fail_free_gc_rq;

	gc_rq_ws->pblk = pblk;
	gc_rq_ws->line = line;
	gc_rq_ws->priv = gc_rq;

	/* The write GC path can be much slower than the read GC one due to
	 * the budget imposed by the rate-limiter. Balance in case we get
	 * back pressure from the write GC path.
	 */
	while (down_timeout(&gc->gc_sem, msecs_to_jiffies(30000)))
		io_schedule();

	kref_get(&line->ref);

	INIT_WORK(&gc_rq_ws->ws, pblk_gc_line_ws);
	queue_work(gc->gc_line_reader_wq, &gc_rq_ws->ws);

	sec_left -= nr_secs;
	if (sec_left > 0)
		goto next_rq;

out:
	pblk_mfree(lba_list, l_mg->emeta_alloc_type);
	kfree(line_ws);
	kfree(invalid_bitmap);

	kref_put(&line->ref, pblk_line_put);
	atomic_dec(&gc->read_inflight_gc);

	return;

fail_free_gc_rq:
	kfree(gc_rq);
fail_free_lba_list:
	pblk_mfree(lba_list, l_mg->emeta_alloc_type);
fail_free_invalid_bitmap:
	kfree(invalid_bitmap);
fail_free_ws:
	kfree(line_ws);

	pblk_put_line_back(pblk, line);
	kref_put(&line->ref, pblk_line_put);
	atomic_dec(&gc->read_inflight_gc);

	pblk_err(pblk, "failed to GC line %d\n", line->id);
}

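/* Schedule the preparation work that will read the valid data off a GC
 * victim line.
 */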
static int pblk_gc_line(struct pblk *pblk, struct pblk_line *line)
{
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line_ws *line_ws;

	pblk_debug(pblk, "line '%d' being reclaimed for GC\n", line->id);

	line_ws = kmalloc(sizeof(struct pblk_line_ws), GFP_KERNEL);
	if (!line_ws)
		return -ENOMEM;

	line_ws->pblk = pblk;
	line_ws->line = line;

	atomic_inc(&gc->pipeline_gc);
	INIT_WORK(&line_ws->ws, pblk_gc_line_prepare_ws);
	queue_work(gc->gc_reader_wq, &line_ws->ws);

	return 0;
}

static void pblk_gc_reader_kick(struct pblk_gc *gc)
{
	wake_up_process(gc->gc_reader_ts);
}

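/* Wake up the GC writer and reader threads and, while GC is still enabled,
 * the main GC thread together with re-arming the GC timer.
 */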
static void pblk_gc_kick(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;

	pblk_gc_writer_kick(gc);
	pblk_gc_reader_kick(gc);

	/* If we're shutting down GC, let's not start it up again */
	if (gc->gc_enabled) {
		wake_up_process(gc->gc_ts);
		mod_timer(&gc->gc_timer,
			  jiffies + msecs_to_jiffies(GC_TIME_MSECS));
	}
}

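/* Take the next victim line off the reader list and start collecting it.
 * Returns 1 if the list is empty.
 */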
static int pblk_gc_read(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line *line;

	spin_lock(&gc->r_lock);
	if (list_empty(&gc->r_list)) {
		spin_unlock(&gc->r_lock);
		return 1;
	}

	line = list_first_entry(&gc->r_list, struct pblk_line, list);
	list_del(&line->list);
	spin_unlock(&gc->r_lock);

	pblk_gc_kick(pblk);

	if (pblk_gc_line(pblk, line))
		pblk_err(pblk, "failed to GC line %d\n", line->id);

	return 0;
}

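/* Pick the line with the fewest valid sectors in the group as the next GC
 * victim.
 */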
static struct pblk_line *pblk_gc_get_victim_line(struct pblk *pblk,
						 struct list_head *group_list)
{
	struct pblk_line *line, *victim;
	int line_vsc, victim_vsc;

	victim = list_first_entry(group_list, struct pblk_line, list);
	list_for_each_entry(line, group_list, list) {
		line_vsc = le32_to_cpu(*line->vsc);
		victim_vsc = le32_to_cpu(*victim->vsc);
		if (line_vsc < victim_vsc)
			victim = line;
	}

	return victim;
}

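/* GC should run when there are lines with write errors to reclaim, or when
 * GC is active and the number of free blocks has dropped below the
 * rate-limiter's high threshold.
 */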
static bool pblk_gc_should_run(struct pblk_gc *gc, struct pblk_rl *rl)
{
	unsigned int nr_blocks_free, nr_blocks_need;
	unsigned int werr_lines = atomic_read(&rl->werr_lines);

	nr_blocks_need = pblk_rl_high_thrs(rl);
	nr_blocks_free = pblk_rl_nr_free_blks(rl);

	/* This is not critical, no need to take lock here */
	return ((werr_lines > 0) ||
		((gc->gc_active) && (nr_blocks_need > nr_blocks_free)));
}

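/* Lines with no valid sectors need no data movement: take them off the
 * gc_full_list and drop the GC reference so that they are erased and
 * returned to the free list.
 */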
void pblk_gc_free_full_lines(struct pblk *pblk)
{
	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line *line;

	do {
		spin_lock(&l_mg->gc_lock);
		if (list_empty(&l_mg->gc_full_list)) {
			spin_unlock(&l_mg->gc_lock);
			return;
		}

		line = list_first_entry(&l_mg->gc_full_list,
							struct pblk_line, list);

		spin_lock(&line->lock);
		WARN_ON(line->state != PBLK_LINESTATE_CLOSED);
		line->state = PBLK_LINESTATE_GC;
		spin_unlock(&line->lock);

		list_del(&line->list);
		spin_unlock(&l_mg->gc_lock);

		atomic_inc(&gc->pipeline_gc);
		kref_put(&line->ref, pblk_line_put);
	} while (1);
}

/*
 * Lines with no valid sectors will be returned to the free list immediately. If
 * GC is activated - either because the free block count is under the determined
 * threshold, or because it is being forced from user space - only lines with a
 * high count of invalid sectors will be recycled.
 */
static void pblk_gc_run(struct pblk *pblk)
{
	struct pblk_line_mgmt *l_mg = &pblk->l_mg;
	struct pblk_gc *gc = &pblk->gc;
	struct pblk_line *line;
	struct list_head *group_list;
	bool run_gc;
	int read_inflight_gc, gc_group = 0, prev_group = 0;

	pblk_gc_free_full_lines(pblk);

	run_gc = pblk_gc_should_run(&pblk->gc, &pblk->rl);
	if (!run_gc || (atomic_read(&gc->read_inflight_gc) >= PBLK_GC_L_QD))
		return;

next_gc_group:
	group_list = l_mg->gc_lists[gc_group++];

	do {
		spin_lock(&l_mg->gc_lock);
		if (list_empty(group_list)) {
			spin_unlock(&l_mg->gc_lock);
			break;
		}

		line = pblk_gc_get_victim_line(pblk, group_list);

		spin_lock(&line->lock);
		WARN_ON(line->state != PBLK_LINESTATE_CLOSED);
		line->state = PBLK_LINESTATE_GC;
		spin_unlock(&line->lock);

		list_del(&line->list);
		spin_unlock(&l_mg->gc_lock);

		spin_lock(&gc->r_lock);
		list_add_tail(&line->list, &gc->r_list);
		spin_unlock(&gc->r_lock);

		read_inflight_gc = atomic_inc_return(&gc->read_inflight_gc);
		pblk_gc_reader_kick(gc);

		prev_group = 1;

		/* No need to queue up more GC lines than we can handle */
		run_gc = pblk_gc_should_run(&pblk->gc, &pblk->rl);
		if (!run_gc || read_inflight_gc >= PBLK_GC_L_QD)
			break;
	} while (1);

	if (!prev_group && pblk->rl.rb_state > gc_group &&
						gc_group < PBLK_GC_NR_LISTS)
		goto next_gc_group;
}

static void pblk_gc_timer(struct timer_list *t)
{
	struct pblk *pblk = from_timer(pblk, t, gc.gc_timer);

	pblk_gc_kick(pblk);
}

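/* Main GC kthread: evaluates whether GC should run and, if so, selects
 * victim lines and hands them to the GC reader.
 */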
static int pblk_gc_ts(void *data)
{
	struct pblk *pblk = data;

	while (!kthread_should_stop()) {
		pblk_gc_run(pblk);
		set_current_state(TASK_INTERRUPTIBLE);
		io_schedule();
	}

	return 0;
}

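/* GC writer kthread: flushes the GC write list into the write cache. */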
static int pblk_gc_writer_ts(void *data)
{
	struct pblk *pblk = data;

	while (!kthread_should_stop()) {
		if (!pblk_gc_write(pblk))
			continue;
		set_current_state(TASK_INTERRUPTIBLE);
		io_schedule();
	}

	return 0;
}

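/* GC reader kthread: consumes victim lines queued by the main GC thread.
 * On shutdown, wait for all in-flight GC lines to drain before exiting.
 */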
static int pblk_gc_reader_ts(void *data)
{
	struct pblk *pblk = data;
	struct pblk_gc *gc = &pblk->gc;

	while (!kthread_should_stop()) {
		if (!pblk_gc_read(pblk))
			continue;
		set_current_state(TASK_INTERRUPTIBLE);
		io_schedule();
	}

#ifdef CONFIG_NVM_PBLK_DEBUG
	pblk_info(pblk, "flushing gc pipeline, %d lines left\n",
		atomic_read(&gc->pipeline_gc));
#endif

	do {
		if (!atomic_read(&gc->pipeline_gc))
			break;

		schedule();
	} while (1);

	return 0;
}

static void pblk_gc_start(struct pblk *pblk)
{
	pblk->gc.gc_active = 1;
	pblk_debug(pblk, "gc start\n");
}

void pblk_gc_should_start(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;

	if (gc->gc_enabled && !gc->gc_active) {
		pblk_gc_start(pblk);
		pblk_gc_kick(pblk);
	}
}

void pblk_gc_should_stop(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;

	if (gc->gc_active && !gc->gc_forced)
		gc->gc_active = 0;
}

void pblk_gc_should_kick(struct pblk *pblk)
{
	pblk_rl_update_rates(&pblk->rl);
}

void pblk_gc_sysfs_state_show(struct pblk *pblk, int *gc_enabled,
			      int *gc_active)
{
	struct pblk_gc *gc = &pblk->gc;

	spin_lock(&gc->lock);
	*gc_enabled = gc->gc_enabled;
	*gc_active = gc->gc_active;
	spin_unlock(&gc->lock);
}

int pblk_gc_sysfs_force(struct pblk *pblk, int force)
{
	struct pblk_gc *gc = &pblk->gc;

	if (force < 0 || force > 1)
		return -EINVAL;

	spin_lock(&gc->lock);
	gc->gc_forced = force;

	if (force)
		gc->gc_enabled = 1;
	else
		gc->gc_enabled = 0;
	spin_unlock(&gc->lock);

	pblk_gc_should_start(pblk);

	return 0;
}

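/* Set up the GC kthreads, workqueues, timer and the state shared between the
 * GC read and write paths.
 */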
int pblk_gc_init(struct pblk *pblk)
{
	struct pblk_gc *gc = &pblk->gc;
	int ret;

	gc->gc_ts = kthread_create(pblk_gc_ts, pblk, "pblk-gc-ts");
	if (IS_ERR(gc->gc_ts)) {
		pblk_err(pblk, "could not allocate GC main kthread\n");
		return PTR_ERR(gc->gc_ts);
	}

	gc->gc_writer_ts = kthread_create(pblk_gc_writer_ts, pblk,
							"pblk-gc-writer-ts");
	if (IS_ERR(gc->gc_writer_ts)) {
		pblk_err(pblk, "could not allocate GC writer kthread\n");
		ret = PTR_ERR(gc->gc_writer_ts);
		goto fail_free_main_kthread;
	}

	gc->gc_reader_ts = kthread_create(pblk_gc_reader_ts, pblk,
							"pblk-gc-reader-ts");
	if (IS_ERR(gc->gc_reader_ts)) {
		pblk_err(pblk, "could not allocate GC reader kthread\n");
		ret = PTR_ERR(gc->gc_reader_ts);
		goto fail_free_writer_kthread;
	}

	timer_setup(&gc->gc_timer, pblk_gc_timer, 0);
	mod_timer(&gc->gc_timer, jiffies + msecs_to_jiffies(GC_TIME_MSECS));

	gc->gc_active = 0;
	gc->gc_forced = 0;
	gc->gc_enabled = 1;
	gc->w_entries = 0;
	atomic_set(&gc->read_inflight_gc, 0);
	atomic_set(&gc->pipeline_gc, 0);

	/* Workqueue that reads valid sectors from a line and submits them to
	 * the GC writer to be recycled.
	 */
	gc->gc_line_reader_wq = alloc_workqueue("pblk-gc-line-reader-wq",
			WQ_MEM_RECLAIM | WQ_UNBOUND, PBLK_GC_MAX_READERS);
	if (!gc->gc_line_reader_wq) {
		pblk_err(pblk, "could not allocate GC line reader workqueue\n");
		ret = -ENOMEM;
		goto fail_free_reader_kthread;
	}

	/* Workqueue that prepares lines for GC */
	gc->gc_reader_wq = alloc_workqueue("pblk-gc-line_wq",
					WQ_MEM_RECLAIM | WQ_UNBOUND, 1);
	if (!gc->gc_reader_wq) {
		pblk_err(pblk, "could not allocate GC reader workqueue\n");
		ret = -ENOMEM;
		goto fail_free_reader_line_wq;
	}

	spin_lock_init(&gc->lock);
	spin_lock_init(&gc->w_lock);
	spin_lock_init(&gc->r_lock);

	sema_init(&gc->gc_sem, PBLK_GC_RQ_QD);

	INIT_LIST_HEAD(&gc->w_list);
	INIT_LIST_HEAD(&gc->r_list);

	return 0;

fail_free_reader_line_wq:
	destroy_workqueue(gc->gc_line_reader_wq);
fail_free_reader_kthread:
	kthread_stop(gc->gc_reader_ts);
fail_free_writer_kthread:
	kthread_stop(gc->gc_writer_ts);
fail_free_main_kthread:
	kthread_stop(gc->gc_ts);

	return ret;
}

void pblk_gc_exit(struct pblk *pblk, bool graceful)
{
	struct pblk_gc *gc = &pblk->gc;

	gc->gc_enabled = 0;
	del_timer_sync(&gc->gc_timer);
	gc->gc_active = 0;

	if (gc->gc_ts)
		kthread_stop(gc->gc_ts);

	if (gc->gc_reader_ts)
		kthread_stop(gc->gc_reader_ts);

	if (graceful) {
		flush_workqueue(gc->gc_reader_wq);
		flush_workqueue(gc->gc_line_reader_wq);
	}

	destroy_workqueue(gc->gc_reader_wq);
	destroy_workqueue(gc->gc_line_reader_wq);

	if (gc->gc_writer_ts)
		kthread_stop(gc->gc_writer_ts);
}