summaryrefslogtreecommitdiff
path: root/fs/dlm/lock.c
diff options
context:
space:
mode:
authorDavid Teigland <teigland@redhat.com>2006-07-12 16:44:04 -0500
committerSteven Whitehouse <swhiteho@redhat.com>2006-07-13 09:25:34 -0400
commit597d0cae0f99f62501e229bed50e8149604015bb (patch)
treeb6cab09ff6fe2246740848164c0a52d5c03136a0 /fs/dlm/lock.c
parent2eb168ca94aba3bcae350ad9b31870955174a218 (diff)
downloadlinux-3.10-597d0cae0f99f62501e229bed50e8149604015bb.tar.gz
linux-3.10-597d0cae0f99f62501e229bed50e8149604015bb.tar.bz2
linux-3.10-597d0cae0f99f62501e229bed50e8149604015bb.zip
[DLM] dlm: user locks
This changes the way the dlm handles user locks. The core dlm is now aware of user locks so they can be dealt with more efficiently. There is no more dlm_device module which previously managed its own duplicate copy of every user lock. Signed-off-by: Patrick Caulfield <pcaulfie@redhat.com> Signed-off-by: David Teigland <teigland@redhat.com> Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
Diffstat (limited to 'fs/dlm/lock.c')
-rw-r--r--fs/dlm/lock.c304
1 files changed, 298 insertions, 6 deletions
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 5f696390410..4e222f873b6 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -55,8 +55,9 @@
R: do_xxxx()
L: receive_xxxx_reply() <- R: send_xxxx_reply()
*/
-
+#include <linux/types.h>
#include "dlm_internal.h"
+#include <linux/dlm_device.h>
#include "memory.h"
#include "lowcomms.h"
#include "requestqueue.h"
@@ -69,6 +70,7 @@
#include "rcom.h"
#include "recover.h"
#include "lvb_table.h"
+#include "user.h"
#include "config.h"
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
@@ -84,6 +86,8 @@ static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
+#define FAKE_USER_AST (void*)0xff00ff00
+
/*
* Lock compatibilty matrix - thanks Steve
* UN = Unlocked state. Not really a state, used as a flag
@@ -152,7 +156,7 @@ static const int __quecvt_compat_matrix[8][8] = {
{0, 0, 0, 0, 0, 0, 0, 0} /* PD */
};
-static void dlm_print_lkb(struct dlm_lkb *lkb)
+void dlm_print_lkb(struct dlm_lkb *lkb)
{
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
" status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
@@ -291,7 +295,7 @@ static int search_rsb_list(struct list_head *head, char *name, int len,
if (len == r->res_length && !memcmp(name, r->res_name, len))
goto found;
}
- return -ENOENT;
+ return -EBADR;
found:
if (r->res_nodeid && (flags & R_MASTER))
@@ -376,7 +380,7 @@ static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
if (!error)
goto out;
- if (error == -ENOENT && !(flags & R_CREATE))
+ if (error == -EBADR && !(flags & R_CREATE))
goto out;
/* the rsb was found but wasn't a master copy */
@@ -920,7 +924,7 @@ static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
return;
- b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
+ b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
if (b == 1) {
int len = receive_extralen(ms);
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
@@ -963,6 +967,8 @@ static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
lkb->lkb_rqmode = DLM_LOCK_IV;
switch (lkb->lkb_status) {
+ case DLM_LKSTS_GRANTED:
+ break;
case DLM_LKSTS_CONVERT:
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
break;
@@ -1727,6 +1733,11 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
return -DLM_EUNLOCK;
}
+/* FIXME: if revert_lock() finds that the lkb is granted, we should
+ skip the queue_cast(ECANCEL). It indicates that the request/convert
+ completed (and queued a normal ast) just before the cancel; we don't
+ want to clobber the sb_result for the normal ast with ECANCEL. */
+
static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
revert_lock(r, lkb);
@@ -2739,7 +2750,7 @@ static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
confirm_master(r, error);
break;
- case -ENOENT:
+ case -EBADR:
case -ENOTBLK:
/* find_rsb failed to find rsb or rsb wasn't master */
r->res_nodeid = -1;
@@ -3545,3 +3556,284 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
return 0;
}
+int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
+ int mode, uint32_t flags, void *name, unsigned int namelen,
+ uint32_t parent_lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ int error;
+
+ lock_recovery(ls);
+
+ error = create_lkb(ls, &lkb);
+ if (error) {
+ kfree(ua);
+ goto out;
+ }
+
+ if (flags & DLM_LKF_VALBLK) {
+ ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ kfree(ua);
+ __put_lkb(ls, lkb);
+ error = -ENOMEM;
+ goto out;
+ }
+ }
+
+ /* After ua is attached to lkb it will be freed by free_lkb().
+ When DLM_IFL_USER is set, the dlm knows that this is a userspace
+ lock and that lkb_astparam is the dlm_user_args structure. */
+
+ error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
+ FAKE_USER_AST, ua, FAKE_USER_AST, &args);
+ lkb->lkb_flags |= DLM_IFL_USER;
+ ua->old_mode = DLM_LOCK_IV;
+
+ if (error) {
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ error = request_lock(ls, lkb, name, namelen, &args);
+
+ switch (error) {
+ case 0:
+ break;
+ case -EINPROGRESS:
+ error = 0;
+ break;
+ case -EAGAIN:
+ error = 0;
+ /* fall through */
+ default:
+ __put_lkb(ls, lkb);
+ goto out;
+ }
+
+ /* add this new lkb to the per-process list of locks */
+ spin_lock(&ua->proc->locks_spin);
+ kref_get(&lkb->lkb_ref);
+ list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
+ spin_unlock(&ua->proc->locks_spin);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ /* user can change the params on its lock when it converts it, or
+ add an lvb that didn't exist before */
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
+ ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+ if (!ua->lksb.sb_lvbptr) {
+ error = -ENOMEM;
+ goto out_put;
+ }
+ }
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+
+ ua->castparam = ua_tmp->castparam;
+ ua->castaddr = ua_tmp->castaddr;
+ ua->bastparam = ua_tmp->bastparam;
+ ua->bastaddr = ua_tmp->bastaddr;
+ ua->old_mode = lkb->lkb_grmode;
+
+ error = set_lock_args(mode, &ua->lksb, flags, 0, 0, FAKE_USER_AST, ua,
+ FAKE_USER_AST, &args);
+ if (error)
+ goto out_put;
+
+ error = convert_lock(ls, lkb, &args);
+
+ if (error == -EINPROGRESS || error == -EAGAIN)
+ error = 0;
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ kfree(ua_tmp);
+ return error;
+}
+
+int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid, char *lvb_in)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (lvb_in && ua->lksb.sb_lvbptr)
+ memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
+ ua->castparam = ua_tmp->castparam;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = unlock_lock(ls, lkb, &args);
+
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ spin_lock(&ua->proc->locks_spin);
+ list_del(&lkb->lkb_ownqueue);
+ spin_unlock(&ua->proc->locks_spin);
+
+ /* this removes the reference for the proc->locks list added by
+ dlm_user_request */
+ unhold_lkb(lkb);
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
+ uint32_t flags, uint32_t lkid)
+{
+ struct dlm_lkb *lkb;
+ struct dlm_args args;
+ struct dlm_user_args *ua;
+ int error;
+
+ lock_recovery(ls);
+
+ error = find_lkb(ls, lkid, &lkb);
+ if (error)
+ goto out;
+
+ ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ ua->castparam = ua_tmp->castparam;
+
+ error = set_unlock_args(flags, ua, &args);
+ if (error)
+ goto out_put;
+
+ error = cancel_lock(ls, lkb, &args);
+
+ if (error == -DLM_ECANCEL)
+ error = 0;
+ if (error)
+ goto out_put;
+
+ /* this lkb was removed from the WAITING queue */
+ if (lkb->lkb_grmode == DLM_LOCK_IV) {
+ spin_lock(&ua->proc->locks_spin);
+ list_del(&lkb->lkb_ownqueue);
+ spin_unlock(&ua->proc->locks_spin);
+ unhold_lkb(lkb);
+ }
+ out_put:
+ dlm_put_lkb(lkb);
+ out:
+ unlock_recovery(ls);
+ return error;
+}
+
+static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+
+ if (ua->lksb.sb_lvbptr)
+ kfree(ua->lksb.sb_lvbptr);
+ kfree(ua);
+ lkb->lkb_astparam = (long)NULL;
+
+ /* TODO: propogate to master if needed */
+ return 0;
+}
+
+/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
+ Regardless of what rsb queue the lock is on, it's removed and freed. */
+
+static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
+{
+ struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
+ struct dlm_args args;
+ int error;
+
+ /* FIXME: we need to handle the case where the lkb is in limbo
+ while the rsb is being looked up, currently we assert in
+ _unlock_lock/is_remote because rsb nodeid is -1. */
+
+ set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
+
+ error = unlock_lock(ls, lkb, &args);
+ if (error == -DLM_EUNLOCK)
+ error = 0;
+ return error;
+}
+
+/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
+ 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
+ which we clear here. */
+
+/* proc CLOSING flag is set so no more device_reads should look at proc->asts
+ list, and no more device_writes should add lkb's to proc->locks list; so we
+ shouldn't need to take asts_spin or locks_spin here. this assumes that
+ device reads/writes/closes are serialized -- FIXME: we may need to serialize
+ them ourself. */
+
+void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
+{
+ struct dlm_lkb *lkb, *safe;
+
+ lock_recovery(ls);
+ mutex_lock(&ls->ls_clear_proc_locks);
+
+ list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
+ if (lkb->lkb_ast_type) {
+ list_del(&lkb->lkb_astqueue);
+ unhold_lkb(lkb);
+ }
+
+ list_del(&lkb->lkb_ownqueue);
+
+ if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
+ lkb->lkb_flags |= DLM_IFL_ORPHAN;
+ orphan_proc_lock(ls, lkb);
+ } else {
+ lkb->lkb_flags |= DLM_IFL_DEAD;
+ unlock_proc_lock(ls, lkb);
+ }
+
+ /* this removes the reference for the proc->locks list
+ added by dlm_user_request, it may result in the lkb
+ being freed */
+
+ dlm_put_lkb(lkb);
+ }
+ mutex_unlock(&ls->ls_clear_proc_locks);
+ unlock_recovery(ls);
+}