diff options
Diffstat (limited to 'fs/dlm/lockspace.c')
-rw-r--r-- | fs/dlm/lockspace.c | 212 |
1 files changed, 63 insertions, 149 deletions
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c index 0455dddb0797..475ab4370dda 100644 --- a/fs/dlm/lockspace.c +++ b/fs/dlm/lockspace.c @@ -29,8 +29,6 @@ static int ls_count; static struct mutex ls_lock; static struct list_head lslist; static spinlock_t lslist_lock; -static struct task_struct * scand_task; - static ssize_t dlm_control_store(struct dlm_ls *ls, const char *buf, size_t len) { @@ -247,66 +245,11 @@ void dlm_lockspace_exit(void) kset_unregister(dlm_kset); } -static struct dlm_ls *find_ls_to_scan(void) -{ - struct dlm_ls *ls; - - spin_lock(&lslist_lock); - list_for_each_entry(ls, &lslist, ls_list) { - if (time_after_eq(jiffies, ls->ls_scan_time + - dlm_config.ci_scan_secs * HZ)) { - spin_unlock(&lslist_lock); - return ls; - } - } - spin_unlock(&lslist_lock); - return NULL; -} - -static int dlm_scand(void *data) -{ - struct dlm_ls *ls; - - while (!kthread_should_stop()) { - ls = find_ls_to_scan(); - if (ls) { - if (dlm_lock_recovery_try(ls)) { - ls->ls_scan_time = jiffies; - dlm_scan_rsbs(ls); - dlm_unlock_recovery(ls); - } else { - ls->ls_scan_time += HZ; - } - continue; - } - schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ); - } - return 0; -} - -static int dlm_scand_start(void) -{ - struct task_struct *p; - int error = 0; - - p = kthread_run(dlm_scand, NULL, "dlm_scand"); - if (IS_ERR(p)) - error = PTR_ERR(p); - else - scand_task = p; - return error; -} - -static void dlm_scand_stop(void) -{ - kthread_stop(scand_task); -} - struct dlm_ls *dlm_find_lockspace_global(uint32_t id) { struct dlm_ls *ls; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_global_id == id) { @@ -316,7 +259,7 @@ struct dlm_ls *dlm_find_lockspace_global(uint32_t id) } ls = NULL; out: - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); return ls; } @@ -324,7 +267,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) { struct dlm_ls *ls; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_local_handle == lockspace) { atomic_inc(&ls->ls_count); @@ -333,7 +276,7 @@ struct dlm_ls *dlm_find_lockspace_local(dlm_lockspace_t *lockspace) } ls = NULL; out: - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); return ls; } @@ -341,7 +284,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) { struct dlm_ls *ls; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (ls->ls_device.minor == minor) { atomic_inc(&ls->ls_count); @@ -350,7 +293,7 @@ struct dlm_ls *dlm_find_lockspace_device(int minor) } ls = NULL; out: - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); return ls; } @@ -365,15 +308,15 @@ static void remove_lockspace(struct dlm_ls *ls) retry: wait_event(ls->ls_count_wait, atomic_read(&ls->ls_count) == 0); - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); if (atomic_read(&ls->ls_count) != 0) { - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); goto retry; } WARN_ON(ls->ls_create_count != 0); list_del(&ls->ls_list); - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); } static int threads_start(void) @@ -382,22 +325,9 @@ static int threads_start(void) /* Thread for sending/receiving messages for all lockspace's */ error = dlm_midcomms_start(); - if (error) { + if (error) log_print("cannot start dlm midcomms %d", error); - goto fail; - } - - error = dlm_scand_start(); - if (error) { - log_print("cannot start dlm_scand thread %d", error); - goto midcomms_fail; - } - - return 0; - midcomms_fail: - dlm_midcomms_stop(); - fail: return error; } @@ -407,9 +337,9 @@ static int new_lockspace(const char *name, const char *cluster, int *ops_result, dlm_lockspace_t **lockspace) { struct dlm_ls *ls; - int i, size, error; int do_unreg = 0; int namelen = strlen(name); + int error; if (namelen > DLM_LOCKSPACE_LEN || namelen == 0) return -EINVAL; @@ -448,7 +378,7 @@ static int new_lockspace(const char *name, const char *cluster, error = 0; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { WARN_ON(ls->ls_create_count <= 0); if (ls->ls_namelen != namelen) @@ -464,7 +394,7 @@ static int new_lockspace(const char *name, const char *cluster, error = 1; break; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (error) goto out; @@ -492,32 +422,21 @@ static int new_lockspace(const char *name, const char *cluster, */ ls->ls_exflags = (flags & ~(DLM_LSFL_FS | DLM_LSFL_NEWEXCL)); - size = READ_ONCE(dlm_config.ci_rsbtbl_size); - ls->ls_rsbtbl_size = size; + INIT_LIST_HEAD(&ls->ls_toss); + INIT_LIST_HEAD(&ls->ls_keep); + rwlock_init(&ls->ls_rsbtbl_lock); - ls->ls_rsbtbl = vmalloc(array_size(size, sizeof(struct dlm_rsbtable))); - if (!ls->ls_rsbtbl) + error = rhashtable_init(&ls->ls_rsbtbl, &dlm_rhash_rsb_params); + if (error) goto out_lsfree; - for (i = 0; i < size; i++) { - ls->ls_rsbtbl[i].keep.rb_node = NULL; - ls->ls_rsbtbl[i].toss.rb_node = NULL; - spin_lock_init(&ls->ls_rsbtbl[i].lock); - } - - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) { - ls->ls_remove_names[i] = kzalloc(DLM_RESNAME_MAXLEN+1, - GFP_KERNEL); - if (!ls->ls_remove_names[i]) - goto out_rsbtbl; - } idr_init(&ls->ls_lkbidr); - spin_lock_init(&ls->ls_lkbidr_spin); + rwlock_init(&ls->ls_lkbidr_lock); INIT_LIST_HEAD(&ls->ls_waiters); - mutex_init(&ls->ls_waiters_mutex); + spin_lock_init(&ls->ls_waiters_lock); INIT_LIST_HEAD(&ls->ls_orphans); - mutex_init(&ls->ls_orphans_mutex); + spin_lock_init(&ls->ls_orphans_lock); INIT_LIST_HEAD(&ls->ls_new_rsb); spin_lock_init(&ls->ls_new_rsb_spin); @@ -552,11 +471,9 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_seq = get_random_u64(); ls->ls_recover_args = NULL; init_rwsem(&ls->ls_in_recovery); - init_rwsem(&ls->ls_recv_active); + rwlock_init(&ls->ls_recv_active); INIT_LIST_HEAD(&ls->ls_requestqueue); - atomic_set(&ls->ls_requestqueue_cnt, 0); - init_waitqueue_head(&ls->ls_requestqueue_wait); - mutex_init(&ls->ls_requestqueue_mutex); + rwlock_init(&ls->ls_requestqueue_lock); spin_lock_init(&ls->ls_clear_proc_locks); /* Due backwards compatibility with 3.1 we need to use maximum @@ -565,8 +482,10 @@ static int new_lockspace(const char *name, const char *cluster, * might send less. */ ls->ls_recover_buf = kmalloc(DLM_MAX_SOCKET_BUFSIZE, GFP_NOFS); - if (!ls->ls_recover_buf) + if (!ls->ls_recover_buf) { + error = -ENOMEM; goto out_lkbidr; + } ls->ls_slot = 0; ls->ls_num_slots = 0; @@ -580,13 +499,20 @@ static int new_lockspace(const char *name, const char *cluster, ls->ls_recover_list_count = 0; ls->ls_local_handle = ls; init_waitqueue_head(&ls->ls_wait_general); - INIT_LIST_HEAD(&ls->ls_root_list); - init_rwsem(&ls->ls_root_sem); + INIT_LIST_HEAD(&ls->ls_masters_list); + rwlock_init(&ls->ls_masters_lock); + INIT_LIST_HEAD(&ls->ls_dir_dump_list); + rwlock_init(&ls->ls_dir_dump_lock); + + INIT_LIST_HEAD(&ls->ls_toss_q); + spin_lock_init(&ls->ls_toss_q_lock); + timer_setup(&ls->ls_timer, dlm_rsb_toss_timer, + TIMER_DEFERRABLE); - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); ls->ls_create_count = 1; list_add(&ls->ls_list, &lslist); - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (flags & DLM_LSFL_FS) { error = dlm_callback_start(ls); @@ -655,17 +581,14 @@ static int new_lockspace(const char *name, const char *cluster, out_callback: dlm_callback_stop(ls); out_delist: - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_del(&ls->ls_list); - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); idr_destroy(&ls->ls_recover_idr); kfree(ls->ls_recover_buf); out_lkbidr: idr_destroy(&ls->ls_lkbidr); - out_rsbtbl: - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) - kfree(ls->ls_remove_names[i]); - vfree(ls->ls_rsbtbl); + rhashtable_destroy(&ls->ls_rsbtbl); out_lsfree: if (do_unreg) kobject_put(&ls->ls_kobj); @@ -697,7 +620,6 @@ static int __dlm_new_lockspace(const char *name, const char *cluster, if (error > 0) error = 0; if (!ls_count) { - dlm_scand_stop(); dlm_midcomms_shutdown(); dlm_midcomms_stop(); } @@ -756,7 +678,7 @@ static int lockspace_busy(struct dlm_ls *ls, int force) { int rv; - spin_lock(&ls->ls_lkbidr_spin); + read_lock_bh(&ls->ls_lkbidr_lock); if (force == 0) { rv = idr_for_each(&ls->ls_lkbidr, lkb_idr_is_any, ls); } else if (force == 1) { @@ -764,19 +686,25 @@ static int lockspace_busy(struct dlm_ls *ls, int force) } else { rv = 0; } - spin_unlock(&ls->ls_lkbidr_spin); + read_unlock_bh(&ls->ls_lkbidr_lock); return rv; } +static void rhash_free_rsb(void *ptr, void *arg) +{ + struct dlm_rsb *rsb = ptr; + + dlm_free_rsb(rsb); +} + static int release_lockspace(struct dlm_ls *ls, int force) { struct dlm_rsb *rsb; - struct rb_node *n; - int i, busy, rv; + int busy, rv; busy = lockspace_busy(ls, force); - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); if (ls->ls_create_count == 1) { if (busy) { rv = -EBUSY; @@ -790,7 +718,7 @@ static int release_lockspace(struct dlm_ls *ls, int force) } else { rv = -EINVAL; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (rv) { log_debug(ls, "release_lockspace no remove %d", rv); @@ -807,8 +735,13 @@ static int release_lockspace(struct dlm_ls *ls, int force) dlm_recoverd_stop(ls); + /* clear the LSFL_RUNNING flag to fast up + * time_shutdown_sync(), we don't care anymore + */ + clear_bit(LSFL_RUNNING, &ls->ls_flags); + timer_shutdown_sync(&ls->ls_timer); + if (ls_count == 1) { - dlm_scand_stop(); dlm_clear_members(ls); dlm_midcomms_shutdown(); } @@ -830,27 +763,9 @@ static int release_lockspace(struct dlm_ls *ls, int force) idr_destroy(&ls->ls_lkbidr); /* - * Free all rsb's on rsbtbl[] lists + * Free all rsb's on rsbtbl */ - - for (i = 0; i < ls->ls_rsbtbl_size; i++) { - while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) { - rsb = rb_entry(n, struct dlm_rsb, res_hashnode); - rb_erase(n, &ls->ls_rsbtbl[i].keep); - dlm_free_rsb(rsb); - } - - while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) { - rsb = rb_entry(n, struct dlm_rsb, res_hashnode); - rb_erase(n, &ls->ls_rsbtbl[i].toss); - dlm_free_rsb(rsb); - } - } - - vfree(ls->ls_rsbtbl); - - for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) - kfree(ls->ls_remove_names[i]); + rhashtable_free_and_destroy(&ls->ls_rsbtbl, rhash_free_rsb, NULL); while (!list_empty(&ls->ls_new_rsb)) { rsb = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, @@ -918,20 +833,19 @@ void dlm_stop_lockspaces(void) restart: count = 0; - spin_lock(&lslist_lock); + spin_lock_bh(&lslist_lock); list_for_each_entry(ls, &lslist, ls_list) { if (!test_bit(LSFL_RUNNING, &ls->ls_flags)) { count++; continue; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); log_error(ls, "no userland control daemon, stopping lockspace"); dlm_ls_stop(ls); goto restart; } - spin_unlock(&lslist_lock); + spin_unlock_bh(&lslist_lock); if (count) log_print("dlm user daemon left %d lockspaces", count); } - |