Lines matching refs:dlm in the OCFS2 DLM recovery code (fs/ocfs2/dlm/dlmrecovery.c); each entry shows the source line number, the matching line, and the enclosing function reported by the cross-reference tool.

55 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node);
58 static int dlm_do_recovery(struct dlm_ctxt *dlm);
60 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
61 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node);
62 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node);
63 static int dlm_request_all_locks(struct dlm_ctxt *dlm,
65 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm);
72 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
77 static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
80 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
81 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
83 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node);
84 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
86 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
94 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
117 static inline void dlm_set_reco_dead_node(struct dlm_ctxt *dlm, in dlm_set_reco_dead_node() argument
120 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_dead_node()
121 if (dlm->reco.dead_node != dead_node) in dlm_set_reco_dead_node()
123 dlm->name, dlm->reco.dead_node, dead_node); in dlm_set_reco_dead_node()
124 dlm->reco.dead_node = dead_node; in dlm_set_reco_dead_node()
127 static inline void dlm_set_reco_master(struct dlm_ctxt *dlm, in dlm_set_reco_master() argument
130 assert_spin_locked(&dlm->spinlock); in dlm_set_reco_master()
132 dlm->name, dlm->reco.new_master, master); in dlm_set_reco_master()
133 dlm->reco.new_master = master; in dlm_set_reco_master()
136 static inline void __dlm_reset_recovery(struct dlm_ctxt *dlm) in __dlm_reset_recovery() argument
138 assert_spin_locked(&dlm->spinlock); in __dlm_reset_recovery()
139 clear_bit(dlm->reco.dead_node, dlm->recovery_map); in __dlm_reset_recovery()
140 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
141 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in __dlm_reset_recovery()
144 static inline void dlm_reset_recovery(struct dlm_ctxt *dlm) in dlm_reset_recovery() argument
146 spin_lock(&dlm->spinlock); in dlm_reset_recovery()
147 __dlm_reset_recovery(dlm); in dlm_reset_recovery()
148 spin_unlock(&dlm->spinlock); in dlm_reset_recovery()
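
The helpers listed above (source lines 117-148) only touch the recovery bookkeeping under dlm->spinlock: __dlm_reset_recovery() clears the dead node's bit in recovery_map and pushes reco.dead_node and reco.new_master back to O2NM_INVALID_NODE_NUM. Below is a minimal userspace sketch of that pattern, not the kernel code: a pthread mutex stands in for the spinlock, a byte array for the bitmap, and every name is illustrative.

    /* Sketch only: models the "reset recovery state under a lock" pattern
     * of __dlm_reset_recovery()/dlm_reset_recovery().  A pthread mutex
     * stands in for dlm->spinlock; names are illustrative. */
    #include <pthread.h>
    #include <stdio.h>

    #define MAX_NODES    255
    #define INVALID_NODE 255          /* stand-in for O2NM_INVALID_NODE_NUM */

    struct reco_state {
        pthread_mutex_t lock;         /* stands in for dlm->spinlock */
        unsigned dead_node;
        unsigned new_master;
        unsigned char recovery_map[MAX_NODES];
    };

    static void reset_recovery(struct reco_state *r)
    {
        pthread_mutex_lock(&r->lock);
        if (r->dead_node < MAX_NODES)
            r->recovery_map[r->dead_node] = 0;  /* clear_bit(dead_node, map) */
        r->dead_node  = INVALID_NODE;           /* dlm_set_reco_dead_node()  */
        r->new_master = INVALID_NODE;           /* dlm_set_reco_master()     */
        pthread_mutex_unlock(&r->lock);
    }

    int main(void)
    {
        struct reco_state r = { .lock = PTHREAD_MUTEX_INITIALIZER,
                                .dead_node = 3, .new_master = 7 };
        r.recovery_map[3] = 1;
        reset_recovery(&r);
        printf("dead=%u master=%u map[3]=%d\n",
               r.dead_node, r.new_master, r.recovery_map[3]);
        return 0;
    }
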
154 struct dlm_ctxt *dlm = in dlm_dispatch_work() local
161 spin_lock(&dlm->work_lock); in dlm_dispatch_work()
162 list_splice_init(&dlm->work_list, &tmp_list); in dlm_dispatch_work()
163 spin_unlock(&dlm->work_lock); in dlm_dispatch_work()
168 mlog(0, "%s: work thread has %d work items\n", dlm->name, tot); in dlm_dispatch_work()
176 BUG_ON(item->dlm != dlm); in dlm_dispatch_work()
182 dlm_put(dlm); in dlm_dispatch_work()
191 void dlm_kick_recovery_thread(struct dlm_ctxt *dlm) in dlm_kick_recovery_thread() argument
199 wake_up(&dlm->dlm_reco_thread_wq); in dlm_kick_recovery_thread()
203 int dlm_launch_recovery_thread(struct dlm_ctxt *dlm) in dlm_launch_recovery_thread() argument
207 dlm->dlm_reco_thread_task = kthread_run(dlm_recovery_thread, dlm, in dlm_launch_recovery_thread()
208 "dlm_reco-%s", dlm->name); in dlm_launch_recovery_thread()
209 if (IS_ERR(dlm->dlm_reco_thread_task)) { in dlm_launch_recovery_thread()
210 mlog_errno(PTR_ERR(dlm->dlm_reco_thread_task)); in dlm_launch_recovery_thread()
211 dlm->dlm_reco_thread_task = NULL; in dlm_launch_recovery_thread()
218 void dlm_complete_recovery_thread(struct dlm_ctxt *dlm) in dlm_complete_recovery_thread() argument
220 if (dlm->dlm_reco_thread_task) { in dlm_complete_recovery_thread()
222 kthread_stop(dlm->dlm_reco_thread_task); in dlm_complete_recovery_thread()
223 dlm->dlm_reco_thread_task = NULL; in dlm_complete_recovery_thread()
252 static void dlm_print_reco_node_status(struct dlm_ctxt *dlm) in dlm_print_reco_node_status() argument
258 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_print_reco_node_status()
259 dlm->reco.state & DLM_RECO_STATE_ACTIVE ? "ACTIVE" : "inactive", in dlm_print_reco_node_status()
260 dlm->reco.dead_node, dlm->reco.new_master); in dlm_print_reco_node_status()
262 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_print_reco_node_status()
291 dlm->name, ndata->node_num, st); in dlm_print_reco_node_status()
293 list_for_each_entry(res, &dlm->reco.resources, recovering) { in dlm_print_reco_node_status()
295 dlm->name, res->lockname.len, res->lockname.name); in dlm_print_reco_node_status()
304 struct dlm_ctxt *dlm = data; in dlm_recovery_thread() local
307 mlog(0, "dlm thread running for %s...\n", dlm->name); in dlm_recovery_thread()
310 if (dlm_domain_fully_joined(dlm)) { in dlm_recovery_thread()
311 status = dlm_do_recovery(dlm); in dlm_recovery_thread()
320 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_recovery_thread()
330 static int dlm_reco_master_ready(struct dlm_ctxt *dlm) in dlm_reco_master_ready() argument
333 spin_lock(&dlm->spinlock); in dlm_reco_master_ready()
334 ready = (dlm->reco.new_master != O2NM_INVALID_NODE_NUM); in dlm_reco_master_ready()
335 spin_unlock(&dlm->spinlock); in dlm_reco_master_ready()
341 int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_dead() argument
344 spin_lock(&dlm->spinlock); in dlm_is_node_dead()
345 dead = !test_bit(node, dlm->domain_map); in dlm_is_node_dead()
346 spin_unlock(&dlm->spinlock); in dlm_is_node_dead()
352 static int dlm_is_node_recovered(struct dlm_ctxt *dlm, u8 node) in dlm_is_node_recovered() argument
355 spin_lock(&dlm->spinlock); in dlm_is_node_recovered()
356 recovered = !test_bit(node, dlm->recovery_map); in dlm_is_node_recovered()
357 spin_unlock(&dlm->spinlock); in dlm_is_node_recovered()
362 void dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_death() argument
364 if (dlm_is_node_dead(dlm, node)) in dlm_wait_for_node_death()
368 "domain %s\n", node, dlm->name); in dlm_wait_for_node_death()
371 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
372 dlm_is_node_dead(dlm, node), in dlm_wait_for_node_death()
375 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_death()
376 dlm_is_node_dead(dlm, node)); in dlm_wait_for_node_death()
379 void dlm_wait_for_node_recovery(struct dlm_ctxt *dlm, u8 node, int timeout) in dlm_wait_for_node_recovery() argument
381 if (dlm_is_node_recovered(dlm, node)) in dlm_wait_for_node_recovery()
385 "domain %s\n", node, dlm->name); in dlm_wait_for_node_recovery()
388 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
389 dlm_is_node_recovered(dlm, node), in dlm_wait_for_node_recovery()
392 wait_event(dlm->dlm_reco_thread_wq, in dlm_wait_for_node_recovery()
393 dlm_is_node_recovered(dlm, node)); in dlm_wait_for_node_recovery()
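
dlm_wait_for_node_death() and dlm_wait_for_node_recovery() (source lines 362-393) are thin wrappers around wait_event_timeout() with a locked bitmap test as the predicate. The sketch below models that shape in userspace under stated assumptions: a pthread mutex/condvar replace dlm->spinlock and the dlm_reco_thread_wq wait queue, and the struct and function names are made up for illustration.

    /* Sketch only: userspace analogue of the wait_event_timeout() pattern
     * used by dlm_wait_for_node_death(); not the kernel implementation. */
    #include <errno.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    #define MAX_NODES 255

    struct node_map {
        pthread_mutex_t lock;
        pthread_cond_t  changed;        /* signalled when a node's state flips */
        bool            alive[MAX_NODES];
    };

    /* Wait until 'node' is marked dead, or until timeout_ms expires.
     * timeout_ms == 0 means wait forever, mirroring the kernel helper. */
    static void wait_for_node_death(struct node_map *m, unsigned node,
                                    unsigned timeout_ms)
    {
        struct timespec deadline;

        clock_gettime(CLOCK_REALTIME, &deadline);
        deadline.tv_sec  += timeout_ms / 1000;
        deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
        if (deadline.tv_nsec >= 1000000000L) {
            deadline.tv_sec++;
            deadline.tv_nsec -= 1000000000L;
        }

        pthread_mutex_lock(&m->lock);
        while (m->alive[node]) {        /* predicate: dlm_is_node_dead() */
            if (!timeout_ms)
                pthread_cond_wait(&m->changed, &m->lock);
            else if (pthread_cond_timedwait(&m->changed, &m->lock,
                                            &deadline) == ETIMEDOUT)
                break;                  /* timed out, give up waiting */
        }
        pthread_mutex_unlock(&m->lock);
    }

    int main(void)
    {
        struct node_map m = { .lock = PTHREAD_MUTEX_INITIALIZER,
                              .changed = PTHREAD_COND_INITIALIZER };
        /* node 7 is already dead (alive[] is zero), so this returns at once */
        wait_for_node_death(&m, 7, 5000);
        puts("node 7 observed dead");
        return 0;
    }
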
402 static int dlm_in_recovery(struct dlm_ctxt *dlm) in dlm_in_recovery() argument
405 spin_lock(&dlm->spinlock); in dlm_in_recovery()
406 in_recovery = !!(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_in_recovery()
407 spin_unlock(&dlm->spinlock); in dlm_in_recovery()
412 void dlm_wait_for_recovery(struct dlm_ctxt *dlm) in dlm_wait_for_recovery() argument
414 if (dlm_in_recovery(dlm)) { in dlm_wait_for_recovery()
417 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_wait_for_recovery()
418 dlm->reco.state, dlm->reco.new_master, in dlm_wait_for_recovery()
419 dlm->reco.dead_node); in dlm_wait_for_recovery()
421 wait_event(dlm->reco.event, !dlm_in_recovery(dlm)); in dlm_wait_for_recovery()
424 static void dlm_begin_recovery(struct dlm_ctxt *dlm) in dlm_begin_recovery() argument
426 assert_spin_locked(&dlm->spinlock); in dlm_begin_recovery()
427 BUG_ON(dlm->reco.state & DLM_RECO_STATE_ACTIVE); in dlm_begin_recovery()
429 dlm->name, dlm->reco.dead_node); in dlm_begin_recovery()
430 dlm->reco.state |= DLM_RECO_STATE_ACTIVE; in dlm_begin_recovery()
433 static void dlm_end_recovery(struct dlm_ctxt *dlm) in dlm_end_recovery() argument
435 spin_lock(&dlm->spinlock); in dlm_end_recovery()
436 BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); in dlm_end_recovery()
437 dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; in dlm_end_recovery()
438 spin_unlock(&dlm->spinlock); in dlm_end_recovery()
439 printk(KERN_NOTICE "o2dlm: End recovery on domain %s\n", dlm->name); in dlm_end_recovery()
440 wake_up(&dlm->reco.event); in dlm_end_recovery()
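
dlm_begin_recovery(), dlm_end_recovery() and dlm_wait_for_recovery() (source lines 402-440) form a simple gate around the DLM_RECO_STATE_ACTIVE flag: begin asserts the flag was clear and sets it, end clears it and wakes dlm->reco.event, and waiters block until the flag drops. A compact userspace model of that gate, with a mutex/condvar standing in for the spinlock and wait queue (all names invented for the sketch):

    /* Sketch only: the DLM_RECO_STATE_ACTIVE gate, not the kernel code. */
    #include <assert.h>
    #include <pthread.h>
    #include <stdbool.h>

    struct reco_gate {
        pthread_mutex_t lock;
        pthread_cond_t  event;   /* stands in for dlm->reco.event */
        bool            active;  /* stands in for DLM_RECO_STATE_ACTIVE */
    };

    static void begin_recovery(struct reco_gate *g)
    {
        /* caller already holds the lock, as dlm_begin_recovery() requires
         * dlm->spinlock to be held */
        assert(!g->active);      /* mirrors the kernel's BUG_ON() */
        g->active = true;
    }

    static void end_recovery(struct reco_gate *g)
    {
        pthread_mutex_lock(&g->lock);
        assert(g->active);
        g->active = false;
        pthread_cond_broadcast(&g->event);  /* like wake_up(&dlm->reco.event) */
        pthread_mutex_unlock(&g->lock);
    }

    static void wait_for_recovery(struct reco_gate *g)
    {
        pthread_mutex_lock(&g->lock);
        while (g->active)                   /* wait_event(..., !in_recovery) */
            pthread_cond_wait(&g->event, &g->lock);
        pthread_mutex_unlock(&g->lock);
    }

    int main(void)
    {
        struct reco_gate g = { .lock = PTHREAD_MUTEX_INITIALIZER,
                               .event = PTHREAD_COND_INITIALIZER };
        pthread_mutex_lock(&g.lock);
        begin_recovery(&g);              /* recovery pass starts */
        pthread_mutex_unlock(&g.lock);
        end_recovery(&g);                /* pass finished, waiters wake */
        wait_for_recovery(&g);           /* returns immediately now */
        return 0;
    }
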
443 static void dlm_print_recovery_master(struct dlm_ctxt *dlm) in dlm_print_recovery_master() argument
446 "dead node %u in domain %s\n", dlm->reco.new_master, in dlm_print_recovery_master()
447 (dlm->node_num == dlm->reco.new_master ? "me" : "he"), in dlm_print_recovery_master()
448 dlm->reco.dead_node, dlm->name); in dlm_print_recovery_master()
451 static int dlm_do_recovery(struct dlm_ctxt *dlm) in dlm_do_recovery() argument
456 spin_lock(&dlm->spinlock); in dlm_do_recovery()
458 if (dlm->migrate_done) { in dlm_do_recovery()
460 "lock resources\n", dlm->name); in dlm_do_recovery()
461 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
466 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && in dlm_do_recovery()
467 test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_do_recovery()
469 dlm->reco.new_master, dlm->reco.dead_node); in dlm_do_recovery()
471 dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
475 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
478 bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES, 0); in dlm_do_recovery()
480 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
482 dlm_set_reco_dead_node(dlm, bit); in dlm_do_recovery()
483 } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { in dlm_do_recovery()
486 dlm->reco.dead_node); in dlm_do_recovery()
487 dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); in dlm_do_recovery()
490 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
492 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
497 dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), in dlm_do_recovery()
498 dlm->reco.dead_node); in dlm_do_recovery()
502 dlm_begin_recovery(dlm); in dlm_do_recovery()
504 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
506 if (dlm->reco.new_master == dlm->node_num) in dlm_do_recovery()
509 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_do_recovery()
514 ret = dlm_pick_recovery_master(dlm); in dlm_do_recovery()
522 dlm_print_recovery_master(dlm); in dlm_do_recovery()
527 dlm_end_recovery(dlm); in dlm_do_recovery()
533 dlm_print_recovery_master(dlm); in dlm_do_recovery()
535 status = dlm_remaster_locks(dlm, dlm->reco.dead_node); in dlm_do_recovery()
539 "retrying.\n", dlm->name, status, dlm->reco.dead_node); in dlm_do_recovery()
546 dlm->name, dlm->reco.dead_node, dlm->node_num); in dlm_do_recovery()
547 spin_lock(&dlm->spinlock); in dlm_do_recovery()
548 __dlm_reset_recovery(dlm); in dlm_do_recovery()
549 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_do_recovery()
550 spin_unlock(&dlm->spinlock); in dlm_do_recovery()
552 dlm_end_recovery(dlm); in dlm_do_recovery()
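
dlm_do_recovery() (source lines 451-552) is the recovery thread's main step: pick a dead node out of recovery_map, make sure a recovery master exists, remaster the dead node's locks if this node is the master, then reset the recovery state. The following is a deliberately simplified, single-threaded restatement of that control flow, not the kernel function: the helpers are stubs with canned return values, and the locking, retry and migrate_done paths are omitted.

    /* Sketch only: simplified control flow of dlm_do_recovery(). */
    #include <stdio.h>

    #define INVALID_NODE 255           /* stand-in for O2NM_INVALID_NODE_NUM */
    #define MAX_NODES    255

    static unsigned char recovery_map[MAX_NODES]; /* 1 = node needs recovery */
    static unsigned dead_node  = INVALID_NODE;
    static unsigned new_master = INVALID_NODE;
    static unsigned this_node  = 2;

    static unsigned pick_dead_node(void)          /* like find_next_bit() */
    {
        for (unsigned i = 0; i < MAX_NODES; i++)
            if (recovery_map[i])
                return i;
        return INVALID_NODE;
    }

    /* stubs for dlm_pick_recovery_master() and dlm_remaster_locks() */
    static int pick_recovery_master(void) { new_master = this_node; return 0; }
    static int remaster_locks(unsigned dead) { (void)dead; return 0; }

    static int do_recovery(void)
    {
        /* 1. choose a dead node out of the recovery map if none is pending */
        if (dead_node == INVALID_NODE)
            dead_node = pick_dead_node();
        if (dead_node == INVALID_NODE)
            return 0;                             /* nothing to recover */

        /* 2. elect a recovery master if none has been announced yet */
        if (new_master == INVALID_NODE && pick_recovery_master())
            return -1;                            /* retry later */

        /* 3. only the recovery master remasters the dead node's locks */
        if (new_master == this_node && remaster_locks(dead_node))
            return -1;                            /* failed, keep state, retry */

        /* 4. success: clear the node from the map and reset recovery state
         * (non-masters do this when handling the finalize messages) */
        recovery_map[dead_node] = 0;
        dead_node = new_master = INVALID_NODE;
        return 0;
    }

    int main(void)
    {
        recovery_map[5] = 1;                      /* pretend node 5 died */
        printf("do_recovery() -> %d\n", do_recovery());
        return 0;
    }
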
558 static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node) in dlm_remaster_locks() argument
569 status = dlm_init_recovery_area(dlm, dead_node); in dlm_remaster_locks()
572 "retrying\n", dlm->name); in dlm_remaster_locks()
579 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
583 mlog(0, "%s: Requesting lock info from node %u\n", dlm->name, in dlm_remaster_locks()
586 if (ndata->node_num == dlm->node_num) { in dlm_remaster_locks()
592 status = dlm_request_all_locks(dlm, ndata->node_num, in dlm_remaster_locks()
602 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
603 dlm_is_node_dead(dlm, in dlm_remaster_locks()
608 dlm_is_node_dead(dlm, ndata->node_num) ? in dlm_remaster_locks()
615 dlm->name, ndata->node_num, in dlm_remaster_locks()
656 mlog(0, "%s: Done requesting all lock info\n", dlm->name); in dlm_remaster_locks()
666 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_remaster_locks()
686 dlm->name, ndata->node_num, in dlm_remaster_locks()
693 dlm->name, ndata->node_num); in dlm_remaster_locks()
697 dlm->name, ndata->node_num); in dlm_remaster_locks()
712 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
713 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_remaster_locks()
714 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
720 ret = dlm_send_finalize_reco_message(dlm); in dlm_remaster_locks()
724 spin_lock(&dlm->spinlock); in dlm_remaster_locks()
725 dlm_finish_local_lockres_recovery(dlm, dead_node, in dlm_remaster_locks()
726 dlm->node_num); in dlm_remaster_locks()
727 spin_unlock(&dlm->spinlock); in dlm_remaster_locks()
731 "dead=%u, this=%u, new=%u\n", dlm->name, in dlm_remaster_locks()
732 jiffies, dlm->reco.dead_node, in dlm_remaster_locks()
733 dlm->node_num, dlm->reco.new_master); in dlm_remaster_locks()
737 dlm_kick_thread(dlm, NULL); in dlm_remaster_locks()
742 wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, in dlm_remaster_locks()
749 dlm_destroy_recovery_area(dlm); in dlm_remaster_locks()
754 static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node) in dlm_init_recovery_area() argument
759 spin_lock(&dlm->spinlock); in dlm_init_recovery_area()
760 memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); in dlm_init_recovery_area()
763 spin_unlock(&dlm->spinlock); in dlm_init_recovery_area()
766 num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); in dlm_init_recovery_area()
774 dlm_destroy_recovery_area(dlm); in dlm_init_recovery_area()
780 list_add_tail(&ndata->list, &dlm->reco.node_data); in dlm_init_recovery_area()
788 static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm) in dlm_destroy_recovery_area() argument
794 list_splice_init(&dlm->reco.node_data, &tmplist); in dlm_destroy_recovery_area()
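
dlm_init_recovery_area() (source lines 754-780) snapshots the domain map, drops the dead node, and allocates one tracking entry per remaining node onto dlm->reco.node_data; dlm_destroy_recovery_area() splices the list off and frees it. A userspace sketch of that build/teardown follows, with malloc/free in place of the kernel allocators, a byte map in place of the node bitmap, and a plain singly linked list (all names assumed for the sketch).

    /* Sketch only: the per-node tracking list built for a recovery pass. */
    #include <stdlib.h>
    #include <string.h>

    #define MAX_NODES 255

    struct reco_node_data {
        unsigned node_num;
        struct reco_node_data *next;
    };

    static struct reco_node_data *node_data;   /* like dlm->reco.node_data */

    static void destroy_recovery_area(void)
    {
        while (node_data) {
            struct reco_node_data *nd = node_data;
            node_data = nd->next;
            free(nd);
        }
    }

    static int init_recovery_area(const unsigned char *domain_map,
                                  unsigned dead_node)
    {
        unsigned char map[MAX_NODES];

        /* snapshot the domain map and drop the dead node, as the kernel
         * does while holding dlm->spinlock */
        memcpy(map, domain_map, sizeof(map));
        map[dead_node] = 0;

        for (unsigned num = 0; num < MAX_NODES; num++) {
            struct reco_node_data *nd;

            if (!map[num])
                continue;
            nd = calloc(1, sizeof(*nd));
            if (!nd) {
                destroy_recovery_area();       /* undo partial setup */
                return -1;
            }
            nd->node_num = num;
            nd->next = node_data;              /* prepend; kernel appends */
            node_data = nd;
        }
        return 0;
    }

    int main(void)
    {
        unsigned char domain_map[MAX_NODES] = { [1] = 1, [2] = 1, [5] = 1 };

        if (init_recovery_area(domain_map, 2) == 0)
            destroy_recovery_area();
        return 0;
    }
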
803 static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, in dlm_request_all_locks() argument
817 lr.node_idx = dlm->node_num; in dlm_request_all_locks()
821 ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, in dlm_request_all_locks()
827 "to recover dead node %u\n", dlm->name, ret, in dlm_request_all_locks()
840 struct dlm_ctxt *dlm = data; in dlm_request_all_locks_handler() local
845 if (!dlm_grab(dlm)) in dlm_request_all_locks_handler()
848 if (lr->dead_node != dlm->reco.dead_node) { in dlm_request_all_locks_handler()
850 "dead_node is %u\n", dlm->name, lr->node_idx, in dlm_request_all_locks_handler()
851 lr->dead_node, dlm->reco.dead_node); in dlm_request_all_locks_handler()
852 dlm_print_reco_node_status(dlm); in dlm_request_all_locks_handler()
854 dlm_put(dlm); in dlm_request_all_locks_handler()
857 BUG_ON(lr->dead_node != dlm->reco.dead_node); in dlm_request_all_locks_handler()
861 dlm_put(dlm); in dlm_request_all_locks_handler()
869 dlm_put(dlm); in dlm_request_all_locks_handler()
874 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_request_all_locks_handler()
875 dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); in dlm_request_all_locks_handler()
878 spin_lock(&dlm->work_lock); in dlm_request_all_locks_handler()
879 list_add_tail(&item->list, &dlm->work_list); in dlm_request_all_locks_handler()
880 spin_unlock(&dlm->work_lock); in dlm_request_all_locks_handler()
881 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_request_all_locks_handler()
883 dlm_put(dlm); in dlm_request_all_locks_handler()
891 struct dlm_ctxt *dlm; in dlm_request_all_locks_worker() local
897 dlm = item->dlm; in dlm_request_all_locks_worker()
903 dlm->name, dead_node, reco_master); in dlm_request_all_locks_worker()
905 if (dead_node != dlm->reco.dead_node || in dlm_request_all_locks_worker()
906 reco_master != dlm->reco.new_master) { in dlm_request_all_locks_worker()
909 if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { in dlm_request_all_locks_worker()
912 " current=(dead=%u,mas=%u)\n", dlm->name, in dlm_request_all_locks_worker()
914 dlm->reco.dead_node, dlm->reco.new_master); in dlm_request_all_locks_worker()
918 dlm->name, dlm->reco.dead_node, in dlm_request_all_locks_worker()
919 dlm->reco.new_master, dead_node, reco_master); in dlm_request_all_locks_worker()
930 dlm_move_reco_locks_to_list(dlm, &resources, dead_node); in dlm_request_all_locks_worker()
937 ret = dlm_send_one_lockres(dlm, res, mres, reco_master, in dlm_request_all_locks_worker()
941 "recovery state for dead node %u, ret=%d\n", dlm->name, in dlm_request_all_locks_worker()
949 spin_lock(&dlm->spinlock); in dlm_request_all_locks_worker()
950 list_splice_init(&resources, &dlm->reco.resources); in dlm_request_all_locks_worker()
951 spin_unlock(&dlm->spinlock); in dlm_request_all_locks_worker()
954 ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); in dlm_request_all_locks_worker()
958 dlm->name, reco_master, dead_node, ret); in dlm_request_all_locks_worker()
966 static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to) in dlm_send_all_done_msg() argument
972 done_msg.node_idx = dlm->node_num; in dlm_send_all_done_msg()
978 ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, in dlm_send_all_done_msg()
982 "to recover dead node %u\n", dlm->name, ret, send_to, in dlm_send_all_done_msg()
996 struct dlm_ctxt *dlm = data; in dlm_reco_data_done_handler() local
1001 if (!dlm_grab(dlm)) in dlm_reco_data_done_handler()
1006 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
1008 mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node), in dlm_reco_data_done_handler()
1011 dlm->reco.dead_node, done->node_idx, dlm->node_num); in dlm_reco_data_done_handler()
1014 list_for_each_entry(ndata, &dlm->reco.node_data, list) { in dlm_reco_data_done_handler()
1047 dlm_kick_recovery_thread(dlm); in dlm_reco_data_done_handler()
1052 dlm_put(dlm); in dlm_reco_data_done_handler()
1058 static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, in dlm_move_reco_locks_to_list() argument
1065 spin_lock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1066 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_move_reco_locks_to_list()
1077 dead_node, dlm->name); in dlm_move_reco_locks_to_list()
1101 spin_unlock(&dlm->spinlock); in dlm_move_reco_locks_to_list()
1119 static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm, in dlm_send_mig_lockres_msg() argument
1145 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_mig_lockres_msg()
1150 ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres, in dlm_send_mig_lockres_msg()
1156 "node %u (%s)\n", dlm->name, mres->lockname_len, in dlm_send_mig_lockres_msg()
1257 static void dlm_add_dummy_lock(struct dlm_ctxt *dlm, in dlm_add_dummy_lock() argument
1267 dummy.ml.node = dlm->node_num; in dlm_add_dummy_lock()
1271 static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm, in dlm_is_dummy_lock() argument
1286 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_send_one_lockres() argument
1324 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, in dlm_send_one_lockres()
1333 dlm->name, res->lockname.len, res->lockname.name, in dlm_send_one_lockres()
1336 dlm_add_dummy_lock(dlm, mres); in dlm_send_one_lockres()
1339 ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks); in dlm_send_one_lockres()
1346 dlm->name, ret); in dlm_send_one_lockres()
1350 "lockres %.*s\n", dlm->name, send_to, in dlm_send_one_lockres()
1374 struct dlm_ctxt *dlm = data; in dlm_mig_lockres_handler() local
1385 if (!dlm_grab(dlm)) in dlm_mig_lockres_handler()
1388 if (!dlm_joined(dlm)) { in dlm_mig_lockres_handler()
1391 dlm->name, mres->lockname_len, in dlm_mig_lockres_handler()
1393 dlm_put(dlm); in dlm_mig_lockres_handler()
1421 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1422 res = __dlm_lookup_lockres_full(dlm, mres->lockname, mres->lockname_len, in dlm_mig_lockres_handler()
1431 " ref!\n", dlm->name, in dlm_mig_lockres_handler()
1435 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1456 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1463 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1465 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1468 res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len); in dlm_mig_lockres_handler()
1482 spin_lock(&dlm->spinlock); in dlm_mig_lockres_handler()
1483 __dlm_insert_lockres(dlm, res); in dlm_mig_lockres_handler()
1484 spin_unlock(&dlm->spinlock); in dlm_mig_lockres_handler()
1517 dlm_lockres_grab_inflight_ref(dlm, res); in dlm_mig_lockres_handler()
1527 dlm_change_lockres_owner(dlm, res, dlm->node_num); in dlm_mig_lockres_handler()
1532 dlm_grab(dlm); /* get an extra ref for the work item */ in dlm_mig_lockres_handler()
1534 dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf); in dlm_mig_lockres_handler()
1538 spin_lock(&dlm->work_lock); in dlm_mig_lockres_handler()
1539 list_add_tail(&item->list, &dlm->work_list); in dlm_mig_lockres_handler()
1540 spin_unlock(&dlm->work_lock); in dlm_mig_lockres_handler()
1541 queue_work(dlm->dlm_worker, &dlm->dispatched_work); in dlm_mig_lockres_handler()
1548 dlm_put(dlm); in dlm_mig_lockres_handler()
1561 struct dlm_ctxt *dlm; in dlm_mig_lockres_worker() local
1568 dlm = item->dlm; in dlm_mig_lockres_worker()
1579 ret = dlm_lockres_master_requery(dlm, res, &real_master); in dlm_mig_lockres_worker()
1591 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_mig_lockres_worker()
1602 ret = dlm_process_recovery_data(dlm, res, mres); in dlm_mig_lockres_worker()
1610 ret = dlm_finish_migration(dlm, res, mres->master); in dlm_mig_lockres_worker()
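
dlm_mig_lockres_handler() (source lines 1374-1548) looks the lock resource up under dlm->spinlock, creates and inserts it if it does not exist yet, marks it RECOVERING, and queues a work item so dlm_mig_lockres_worker() can process the migrated lock data outside the handler. The sketch below models only that lookup-or-create-and-defer step; the hash table, refcounting and message parsing are replaced by toy structures with invented names.

    /* Sketch only: lookup-or-create a lockres, then defer to a worker. */
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    struct lockres {
        char            name[64];
        bool            recovering;    /* DLM_LOCK_RES_RECOVERING stand-in */
        struct lockres *next;
    };

    struct work_item {                 /* like struct dlm_work_item */
        struct lockres   *res;
        struct work_item *next;
    };

    static struct lockres   *resources;  /* stand-in for the lockres hash */
    static struct work_item *work_list;  /* processed later by a worker */

    static struct lockres *lookup_lockres(const char *name)
    {
        for (struct lockres *res = resources; res; res = res->next)
            if (strcmp(res->name, name) == 0)
                return res;
        return NULL;
    }

    /* Handle an incoming migrated lockres: find or create the local copy,
     * mark it recovering, and queue the heavy processing for a worker. */
    static int mig_lockres_handler(const char *name)
    {
        struct lockres *res = lookup_lockres(name);
        struct work_item *item;

        if (!res) {                    /* like dlm_new_lockres() + insert */
            res = calloc(1, sizeof(*res));
            if (!res)
                return -1;
            snprintf(res->name, sizeof(res->name), "%s", name);
            res->next = resources;
            resources = res;
        }
        res->recovering = true;        /* keep normal users away until done */

        item = calloc(1, sizeof(*item));
        if (!item)
            return -1;
        item->res = res;
        item->next = work_list;        /* kernel appends to dlm->work_list */
        work_list = item;
        return 0;
    }

    int main(void)
    {
        return mig_lockres_handler("example-lockres");
    }
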
1627 static int dlm_lockres_master_requery(struct dlm_ctxt *dlm, in dlm_lockres_master_requery() argument
1660 spin_lock(&dlm->spinlock); in dlm_lockres_master_requery()
1661 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_lockres_master_requery()
1662 spin_unlock(&dlm->spinlock); in dlm_lockres_master_requery()
1666 if (nodenum == dlm->node_num) in dlm_lockres_master_requery()
1668 ret = dlm_do_master_requery(dlm, res, nodenum, real_master); in dlm_lockres_master_requery()
1685 int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, in dlm_do_master_requery() argument
1693 req.node_idx = dlm->node_num; in dlm_do_master_requery()
1698 ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key, in dlm_do_master_requery()
1703 dlm->key, nodenum); in dlm_do_master_requery()
1726 struct dlm_ctxt *dlm = data; in dlm_master_requery_handler() local
1734 if (!dlm_grab(dlm)) { in dlm_master_requery_handler()
1742 spin_lock(&dlm->spinlock); in dlm_master_requery_handler()
1743 res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash); in dlm_master_requery_handler()
1747 if (master == dlm->node_num) { in dlm_master_requery_handler()
1748 int ret = dlm_dispatch_assert_master(dlm, res, in dlm_master_requery_handler()
1754 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1755 dlm_put(dlm); in dlm_master_requery_handler()
1760 __dlm_lockres_grab_inflight_worker(dlm, res); in dlm_master_requery_handler()
1769 spin_unlock(&dlm->spinlock); in dlm_master_requery_handler()
1772 dlm_put(dlm); in dlm_master_requery_handler()
1813 static int dlm_process_recovery_data(struct dlm_ctxt *dlm, in dlm_process_recovery_data() argument
1832 if (dlm_is_dummy_lock(dlm, ml, &from)) { in dlm_process_recovery_data()
1836 dlm->name, mres->lockname_len, mres->lockname, in dlm_process_recovery_data()
1839 dlm_lockres_set_refmap_bit(dlm, res, from); in dlm_process_recovery_data()
1853 if (ml->node == dlm->node_num) { in dlm_process_recovery_data()
1981 "lvb! type=%d\n", dlm->name, in dlm_process_recovery_data()
2021 "exists on this lockres!\n", dlm->name, in dlm_process_recovery_data()
2050 "setting refmap bit\n", dlm->name, in dlm_process_recovery_data()
2052 dlm_lockres_set_refmap_bit(dlm, res, ml->node); in dlm_process_recovery_data()
2061 dlm_lockres_drop_inflight_ref(dlm, res); in dlm_process_recovery_data()
2070 void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm, in dlm_move_lockres_to_recovery_list() argument
2077 assert_spin_locked(&dlm->spinlock); in dlm_move_lockres_to_recovery_list()
2083 dlm->name, res->lockname.len, res->lockname.name); in dlm_move_lockres_to_recovery_list()
2089 list_add_tail(&res->recovering, &dlm->reco.resources); in dlm_move_lockres_to_recovery_list()
2154 static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm, in dlm_finish_local_lockres_recovery() argument
2161 assert_spin_locked(&dlm->spinlock); in dlm_finish_local_lockres_recovery()
2163 list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) { in dlm_finish_local_lockres_recovery()
2166 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2172 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2175 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
2187 bucket = dlm_lockres_hash(dlm, i); in dlm_finish_local_lockres_recovery()
2200 res->owner != dlm->node_num) in dlm_finish_local_lockres_recovery()
2211 dlm->name, res->lockname.len, res->lockname.name, in dlm_finish_local_lockres_recovery()
2214 dlm_change_lockres_owner(dlm, res, new_master); in dlm_finish_local_lockres_recovery()
2217 __dlm_dirty_lockres(dlm, res); in dlm_finish_local_lockres_recovery()
2235 static void dlm_revalidate_lvb(struct dlm_ctxt *dlm, in dlm_revalidate_lvb() argument
2244 assert_spin_locked(&dlm->spinlock); in dlm_revalidate_lvb()
2247 if (res->owner == dlm->node_num) in dlm_revalidate_lvb()
2254 search_node = dlm->node_num; in dlm_revalidate_lvb()
2278 static void dlm_free_dead_locks(struct dlm_ctxt *dlm, in dlm_free_dead_locks() argument
2288 assert_spin_locked(&dlm->spinlock); in dlm_free_dead_locks()
2325 "dropping ref from lockres\n", dlm->name, in dlm_free_dead_locks()
2329 "but ref was not set\n", dlm->name, in dlm_free_dead_locks()
2334 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2337 "no locks and had not purged before dying\n", dlm->name, in dlm_free_dead_locks()
2339 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_free_dead_locks()
2343 __dlm_dirty_lockres(dlm, res); in dlm_free_dead_locks()
2346 static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node) in dlm_do_local_recovery_cleanup() argument
2356 dlm_clean_master_list(dlm, dead_node); in dlm_do_local_recovery_cleanup()
2373 bucket = dlm_lockres_hash(dlm, i); in dlm_do_local_recovery_cleanup()
2385 dead_node, dlm->name); in dlm_do_local_recovery_cleanup()
2399 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2404 } else if (res->owner == dlm->node_num) in dlm_do_local_recovery_cleanup()
2405 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2411 dlm_revalidate_lvb(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2418 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2421 __dlm_do_purge_lockres(dlm, res); in dlm_do_local_recovery_cleanup()
2427 dlm_move_lockres_to_recovery_list(dlm, res); in dlm_do_local_recovery_cleanup()
2428 } else if (res->owner == dlm->node_num) { in dlm_do_local_recovery_cleanup()
2429 dlm_free_dead_locks(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
2430 __dlm_lockres_calc_usage(dlm, res); in dlm_do_local_recovery_cleanup()
2435 dlm->name, res->lockname.len, in dlm_do_local_recovery_cleanup()
2437 dlm_lockres_clear_refmap_bit(dlm, res, dead_node); in dlm_do_local_recovery_cleanup()
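
For every lock resource, dlm_do_local_recovery_cleanup() (source lines 2346-2437) makes a three-way choice: resources owned by the dead node go onto the recovery list for remastering, locally mastered resources have the dead node's locks freed, and resources with an unknown owner get the dead node's refmap bit cleared. The sketch below shows just that decision; the hash walk, lvb revalidation, purge handling and the $RECOVERY special case are omitted, and the helper bodies are placeholders, not the kernel functions.

    /* Sketch only: the per-lockres decision on node death. */
    #include <stdio.h>

    #define OWNER_UNKNOWN 255         /* stand-in for DLM_LOCK_RES_OWNER_UNKNOWN */

    struct lockres {
        const char *name;
        unsigned    owner;            /* node that masters this resource */
    };

    static unsigned this_node = 2;

    static void move_to_recovery_list(struct lockres *res)
    {
        /* placeholder for dlm_move_lockres_to_recovery_list() */
        printf("%s: owner died, queueing for remastering\n", res->name);
    }

    static void free_dead_locks(struct lockres *res, unsigned dead_node)
    {
        /* placeholder for dlm_free_dead_locks() */
        printf("%s: dropping locks held by node %u\n", res->name, dead_node);
    }

    static void clear_refmap_bit(struct lockres *res, unsigned dead_node)
    {
        /* placeholder for dlm_lockres_clear_refmap_bit() */
        printf("%s: clearing refmap bit for node %u\n", res->name, dead_node);
    }

    static void local_recovery_cleanup(struct lockres *res, unsigned dead_node)
    {
        if (res->owner == dead_node)
            move_to_recovery_list(res);        /* needs a new master */
        else if (res->owner == this_node)
            free_dead_locks(res, dead_node);   /* we master it: drop its locks */
        else if (res->owner == OWNER_UNKNOWN)
            clear_refmap_bit(res, dead_node);  /* forget the dead node's ref */
        /* resources mastered by other live nodes need no local work here */
    }

    int main(void)
    {
        struct lockres a = { "resA", 3 }, b = { "resB", 2 }, c = { "resC", OWNER_UNKNOWN };
        local_recovery_cleanup(&a, 3);
        local_recovery_cleanup(&b, 3);
        local_recovery_cleanup(&c, 3);
        return 0;
    }
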
2446 static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx) in __dlm_hb_node_down() argument
2448 assert_spin_locked(&dlm->spinlock); in __dlm_hb_node_down()
2450 if (dlm->reco.new_master == idx) { in __dlm_hb_node_down()
2452 dlm->name, idx); in __dlm_hb_node_down()
2453 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in __dlm_hb_node_down()
2458 "finalize1 state, clearing\n", dlm->name, idx); in __dlm_hb_node_down()
2459 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in __dlm_hb_node_down()
2460 __dlm_reset_recovery(dlm); in __dlm_hb_node_down()
2465 if (dlm->joining_node == idx) { in __dlm_hb_node_down()
2467 __dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN); in __dlm_hb_node_down()
2471 if (!test_bit(idx, dlm->live_nodes_map)) { in __dlm_hb_node_down()
2474 dlm->name, idx); in __dlm_hb_node_down()
2479 if (!test_bit(idx, dlm->domain_map)) { in __dlm_hb_node_down()
2486 clear_bit(idx, dlm->live_nodes_map); in __dlm_hb_node_down()
2489 if (!test_bit(idx, dlm->recovery_map)) in __dlm_hb_node_down()
2490 dlm_do_local_recovery_cleanup(dlm, idx); in __dlm_hb_node_down()
2493 dlm_hb_event_notify_attached(dlm, idx, 0); in __dlm_hb_node_down()
2496 clear_bit(idx, dlm->domain_map); in __dlm_hb_node_down()
2497 clear_bit(idx, dlm->exit_domain_map); in __dlm_hb_node_down()
2500 wake_up(&dlm->migration_wq); in __dlm_hb_node_down()
2502 set_bit(idx, dlm->recovery_map); in __dlm_hb_node_down()
2507 struct dlm_ctxt *dlm = data; in dlm_hb_node_down_cb() local
2509 if (!dlm_grab(dlm)) in dlm_hb_node_down_cb()
2516 if (test_bit(idx, dlm->domain_map)) in dlm_hb_node_down_cb()
2517 dlm_fire_domain_eviction_callbacks(dlm, idx); in dlm_hb_node_down_cb()
2519 spin_lock(&dlm->spinlock); in dlm_hb_node_down_cb()
2520 __dlm_hb_node_down(dlm, idx); in dlm_hb_node_down_cb()
2521 spin_unlock(&dlm->spinlock); in dlm_hb_node_down_cb()
2523 dlm_put(dlm); in dlm_hb_node_down_cb()
2528 struct dlm_ctxt *dlm = data; in dlm_hb_node_up_cb() local
2530 if (!dlm_grab(dlm)) in dlm_hb_node_up_cb()
2533 spin_lock(&dlm->spinlock); in dlm_hb_node_up_cb()
2534 set_bit(idx, dlm->live_nodes_map); in dlm_hb_node_up_cb()
2537 spin_unlock(&dlm->spinlock); in dlm_hb_node_up_cb()
2539 dlm_put(dlm); in dlm_hb_node_up_cb()
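
__dlm_hb_node_down() (source lines 2446-2502) turns a heartbeat death into recovery work: clear the node from live_nodes_map and domain_map, run the local cleanup once (the recovery_map bit acts as the latch), set the recovery_map bit and kick the recovery thread. A reduced userspace model of that bookkeeping, with byte maps instead of bitmaps and the recovery-master and eviction-callback special cases left out:

    /* Sketch only: node-down bookkeeping, not the kernel callback. */
    #include <stdio.h>

    #define MAX_NODES 255

    static unsigned char live_nodes_map[MAX_NODES]; /* heartbeat says "up"   */
    static unsigned char domain_map[MAX_NODES];     /* joined to this domain */
    static unsigned char recovery_map[MAX_NODES];   /* needs recovery        */

    static void local_recovery_cleanup(unsigned idx)
    {
        /* placeholder for dlm_do_local_recovery_cleanup() */
        printf("cleaning up local state for node %u\n", idx);
    }

    static void kick_recovery_thread(void)
    {
        /* placeholder for dlm_kick_recovery_thread() */
        printf("waking the recovery thread\n");
    }

    static void hb_node_down(unsigned idx)
    {
        if (!domain_map[idx])
            return;              /* node never joined this domain */

        live_nodes_map[idx] = 0;

        /* only clean up once per death; the recovery_map bit is the latch */
        if (!recovery_map[idx])
            local_recovery_cleanup(idx);

        domain_map[idx] = 0;
        recovery_map[idx] = 1;   /* the recovery thread will pick this up */
        kick_recovery_thread();
    }

    int main(void)
    {
        live_nodes_map[4] = domain_map[4] = 1;
        hb_node_down(4);
        return 0;
    }
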
2544 struct dlm_ctxt *dlm = astdata; in dlm_reco_ast() local
2546 dlm->node_num, dlm->name); in dlm_reco_ast()
2550 struct dlm_ctxt *dlm = astdata; in dlm_reco_bast() local
2552 dlm->node_num, dlm->name); in dlm_reco_bast()
2571 static int dlm_pick_recovery_master(struct dlm_ctxt *dlm) in dlm_pick_recovery_master() argument
2578 dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2582 ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY, in dlm_pick_recovery_master()
2584 dlm_reco_ast, dlm, dlm_reco_bast); in dlm_pick_recovery_master()
2587 dlm->name, ret, lksb.status); in dlm_pick_recovery_master()
2591 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2595 if (dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2597 "do the recovery\n", dlm->name, in dlm_pick_recovery_master()
2598 dlm->reco.new_master); in dlm_pick_recovery_master()
2604 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2605 if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2608 "node got recovered already\n", dlm->name); in dlm_pick_recovery_master()
2609 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_pick_recovery_master()
2612 dlm->name, dlm->reco.new_master); in dlm_pick_recovery_master()
2616 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2623 "begin_reco now\n", dlm->name, in dlm_pick_recovery_master()
2624 dlm->reco.dead_node, dlm->node_num); in dlm_pick_recovery_master()
2625 status = dlm_send_begin_reco_message(dlm, in dlm_pick_recovery_master()
2626 dlm->reco.dead_node); in dlm_pick_recovery_master()
2631 spin_lock(&dlm->spinlock); in dlm_pick_recovery_master()
2632 dlm_set_reco_master(dlm, dlm->node_num); in dlm_pick_recovery_master()
2633 spin_unlock(&dlm->spinlock); in dlm_pick_recovery_master()
2638 ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2641 ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm); in dlm_pick_recovery_master()
2654 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2658 wait_event_timeout(dlm->dlm_reco_thread_wq, in dlm_pick_recovery_master()
2659 dlm_reco_master_ready(dlm), in dlm_pick_recovery_master()
2661 if (!dlm_reco_master_ready(dlm)) { in dlm_pick_recovery_master()
2663 dlm->name); in dlm_pick_recovery_master()
2668 dlm->name, dlm->reco.new_master, dlm->reco.dead_node); in dlm_pick_recovery_master()
2672 dlm->name, dlm->node_num); in dlm_pick_recovery_master()
2679 "lksb.status=%s\n", dlm->name, dlm_errname(ret), in dlm_pick_recovery_master()
2681 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_pick_recovery_master()
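
dlm_pick_recovery_master() (source lines 2571-2681) elects the recovery master by trying to take the $RECOVERY lock with LKM_NOQUEUE|LKM_RECOVERY: the node that gets it broadcasts begin_reco and records itself as reco.new_master, while everyone else waits for that announcement. The sketch below captures only the election shape; pthread_mutex_trylock() is a stand-in for the non-blocking DLM lock request, and the broadcast/wait machinery is reduced to a single variable.

    /* Sketch only: trylock-style election, not the kernel implementation. */
    #include <pthread.h>
    #include <stdio.h>

    #define INVALID_NODE 255

    static pthread_mutex_t recovery_lock = PTHREAD_MUTEX_INITIALIZER;
    static unsigned new_master = INVALID_NODE;  /* announced recovery master */

    static int pick_recovery_master(unsigned this_node)
    {
        if (pthread_mutex_trylock(&recovery_lock) == 0) {
            /* we won: announce ourselves (the kernel broadcasts begin_reco
             * messages here) and release the lock again */
            new_master = this_node;
            printf("node %u is the recovery master\n", this_node);
            pthread_mutex_unlock(&recovery_lock);
            return 0;
        }

        /* somebody else already holds the lock; the kernel waits on
         * dlm_reco_thread_wq until that node announces the new master */
        printf("node %u defers; waiting for a master announcement\n", this_node);
        return 1;
    }

    int main(void)
    {
        return pick_recovery_master(2);
    }
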
2695 static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node) in dlm_send_begin_reco_message() argument
2703 mlog(0, "%s: dead node is %u\n", dlm->name, dead_node); in dlm_send_begin_reco_message()
2705 spin_lock(&dlm->spinlock); in dlm_send_begin_reco_message()
2706 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_begin_reco_message()
2707 spin_unlock(&dlm->spinlock); in dlm_send_begin_reco_message()
2712 br.node_idx = dlm->node_num; in dlm_send_begin_reco_message()
2722 if (nodenum == dlm->node_num) { in dlm_send_begin_reco_message()
2730 ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key, in dlm_send_begin_reco_message()
2739 "begin reco msg (%d)\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2751 "to complete, backoff for a bit\n", dlm->name, in dlm_send_begin_reco_message()
2763 "returned %d\n", dlm->name, nodenum, ret); in dlm_send_begin_reco_message()
2764 res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME, in dlm_send_begin_reco_message()
2785 struct dlm_ctxt *dlm = data; in dlm_begin_reco_handler() local
2789 if (!dlm_grab(dlm)) in dlm_begin_reco_handler()
2792 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2793 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_begin_reco_handler()
2796 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2797 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2798 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2799 dlm_put(dlm); in dlm_begin_reco_handler()
2802 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2805 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2806 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2808 dlm_fire_domain_eviction_callbacks(dlm, br->dead_node); in dlm_begin_reco_handler()
2810 spin_lock(&dlm->spinlock); in dlm_begin_reco_handler()
2811 if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2812 if (test_bit(dlm->reco.new_master, dlm->recovery_map)) { in dlm_begin_reco_handler()
2814 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2818 "to %u\n", dlm->name, dlm->reco.new_master, in dlm_begin_reco_handler()
2823 if (dlm->reco.dead_node != O2NM_INVALID_NODE_NUM) { in dlm_begin_reco_handler()
2825 "node %u changing it to %u\n", dlm->name, in dlm_begin_reco_handler()
2826 dlm->reco.dead_node, br->node_idx, br->dead_node); in dlm_begin_reco_handler()
2828 dlm_set_reco_master(dlm, br->node_idx); in dlm_begin_reco_handler()
2829 dlm_set_reco_dead_node(dlm, br->dead_node); in dlm_begin_reco_handler()
2830 if (!test_bit(br->dead_node, dlm->recovery_map)) { in dlm_begin_reco_handler()
2834 if (!test_bit(br->dead_node, dlm->domain_map) || in dlm_begin_reco_handler()
2835 !test_bit(br->dead_node, dlm->live_nodes_map)) in dlm_begin_reco_handler()
2841 set_bit(br->dead_node, dlm->domain_map); in dlm_begin_reco_handler()
2842 set_bit(br->dead_node, dlm->live_nodes_map); in dlm_begin_reco_handler()
2843 __dlm_hb_node_down(dlm, br->dead_node); in dlm_begin_reco_handler()
2845 spin_unlock(&dlm->spinlock); in dlm_begin_reco_handler()
2847 dlm_kick_recovery_thread(dlm); in dlm_begin_reco_handler()
2850 dlm->name, br->node_idx, br->dead_node, in dlm_begin_reco_handler()
2851 dlm->reco.dead_node, dlm->reco.new_master); in dlm_begin_reco_handler()
2853 dlm_put(dlm); in dlm_begin_reco_handler()
2858 static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm) in dlm_send_finalize_reco_message() argument
2868 "stage %d\n", dlm->name, dlm->reco.dead_node, stage); in dlm_send_finalize_reco_message()
2870 spin_lock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2871 dlm_node_iter_init(dlm->domain_map, &iter); in dlm_send_finalize_reco_message()
2872 spin_unlock(&dlm->spinlock); in dlm_send_finalize_reco_message()
2876 fr.node_idx = dlm->node_num; in dlm_send_finalize_reco_message()
2877 fr.dead_node = dlm->reco.dead_node; in dlm_send_finalize_reco_message()
2882 if (nodenum == dlm->node_num) in dlm_send_finalize_reco_message()
2884 ret = o2net_send_message(DLM_FINALIZE_RECO_MSG, dlm->key, in dlm_send_finalize_reco_message()
2891 dlm->key, nodenum); in dlm_send_finalize_reco_message()
2917 struct dlm_ctxt *dlm = data; in dlm_finalize_reco_handler() local
2922 if (!dlm_grab(dlm)) in dlm_finalize_reco_handler()
2929 "node %u (%u:%u)\n", dlm->name, fr->node_idx, stage, in dlm_finalize_reco_handler()
2930 fr->dead_node, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2932 spin_lock(&dlm->spinlock); in dlm_finalize_reco_handler()
2934 if (dlm->reco.new_master != fr->node_idx) { in dlm_finalize_reco_handler()
2937 fr->node_idx, dlm->reco.new_master, fr->dead_node); in dlm_finalize_reco_handler()
2940 if (dlm->reco.dead_node != fr->dead_node) { in dlm_finalize_reco_handler()
2943 fr->node_idx, fr->dead_node, dlm->reco.dead_node); in dlm_finalize_reco_handler()
2949 dlm_finish_local_lockres_recovery(dlm, fr->dead_node, fr->node_idx); in dlm_finalize_reco_handler()
2950 if (dlm->reco.state & DLM_RECO_STATE_FINALIZE) { in dlm_finalize_reco_handler()
2954 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2955 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2958 dlm->reco.state |= DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2959 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2962 if (!(dlm->reco.state & DLM_RECO_STATE_FINALIZE)) { in dlm_finalize_reco_handler()
2966 dlm->name, fr->node_idx, fr->dead_node); in dlm_finalize_reco_handler()
2967 dlm_print_reco_node_status(dlm); in dlm_finalize_reco_handler()
2970 dlm->reco.state &= ~DLM_RECO_STATE_FINALIZE; in dlm_finalize_reco_handler()
2971 __dlm_reset_recovery(dlm); in dlm_finalize_reco_handler()
2972 spin_unlock(&dlm->spinlock); in dlm_finalize_reco_handler()
2973 dlm_kick_recovery_thread(dlm); in dlm_finalize_reco_handler()
2978 dlm->name, fr->node_idx, dlm->reco.dead_node, dlm->reco.new_master); in dlm_finalize_reco_handler()
2980 dlm_put(dlm); in dlm_finalize_reco_handler()
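
The finalize exchange sent by dlm_send_finalize_reco_message() and consumed by dlm_finalize_reco_handler() (source lines 2858-2980) runs in two stages: stage 1 latches DLM_RECO_STATE_FINALIZE, stage 2 clears it, resets the recovery state and wakes the recovery thread, with dlm_finish_local_lockres_recovery() handing the dead node's resources to the new master along the way. A minimal model of that handshake, with the flag as the only state and the transport, master/dead-node checks and lockres work left out:

    /* Sketch only: the two-stage finalize handshake, not the kernel handler. */
    #include <stdbool.h>
    #include <stdio.h>

    static bool finalize_pending;    /* stands in for DLM_RECO_STATE_FINALIZE */

    static int handle_finalize(int stage)
    {
        switch (stage) {
        case 1:
            if (finalize_pending) {
                /* the kernel treats this mismatch as a fatal inconsistency */
                fprintf(stderr, "finalize1 received while already pending\n");
                return -1;
            }
            /* stage 1: new lockres owners are adopted here; remember that
             * a second message must still arrive */
            finalize_pending = true;
            return 0;
        case 2:
            if (!finalize_pending) {
                fprintf(stderr, "finalize2 received without finalize1\n");
                return -1;
            }
            /* stage 2: recovery is really over; drop the flag, reset the
             * dead-node/new-master bookkeeping, wake the recovery thread */
            finalize_pending = false;
            return 0;
        default:
            return -1;
        }
    }

    int main(void)
    {
        int r1 = handle_finalize(1);
        int r2 = handle_finalize(2);
        printf("stage1 -> %d, stage2 -> %d\n", r1, r2);
        return 0;
    }
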