diff options
31 files changed, 2014 insertions, 1592 deletions
diff --git a/fs/ocfs2/Makefile b/fs/ocfs2/Makefile index 600d2d2ade1..791c0886c06 100644 --- a/fs/ocfs2/Makefile +++ b/fs/ocfs2/Makefile @@ -46,6 +46,7 @@ ocfs2_stackglue-objs := stackglue.o ocfs2_stack_o2cb-objs := stack_o2cb.o ocfs2_stack_user-objs := stack_user.o +obj-$(CONFIG_OCFS2_FS) += dlmfs/ # cluster/ is always needed when OCFS2_FS for masklog support obj-$(CONFIG_OCFS2_FS) += cluster/ obj-$(CONFIG_OCFS2_FS_O2CB) += dlm/ diff --git a/fs/ocfs2/alloc.c b/fs/ocfs2/alloc.c index d17bdc718f7..2bbe1ecc08c 100644 --- a/fs/ocfs2/alloc.c +++ b/fs/ocfs2/alloc.c @@ -1050,7 +1050,8 @@ static int ocfs2_create_new_meta_bhs(handle_t *handle, strcpy(eb->h_signature, OCFS2_EXTENT_BLOCK_SIGNATURE); eb->h_blkno = cpu_to_le64(first_blkno); eb->h_fs_generation = cpu_to_le32(osb->fs_generation); - eb->h_suballoc_slot = cpu_to_le16(osb->slot_num); + eb->h_suballoc_slot = + cpu_to_le16(meta_ac->ac_alloc_slot); eb->h_suballoc_bit = cpu_to_le16(suballoc_bit_start); eb->h_list.l_count = cpu_to_le16(ocfs2_extent_recs_per_eb(osb->sb)); @@ -6037,7 +6038,7 @@ static void ocfs2_truncate_log_worker(struct work_struct *work) if (status < 0) mlog_errno(status); else - ocfs2_init_inode_steal_slot(osb); + ocfs2_init_steal_slots(osb); mlog_exit(status); } diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c index 7e9df11260f..4c2a6d282c4 100644 --- a/fs/ocfs2/aops.c +++ b/fs/ocfs2/aops.c @@ -577,8 +577,9 @@ static int ocfs2_direct_IO_get_blocks(struct inode *inode, sector_t iblock, goto bail; } - /* We should already CoW the refcounted extent. */ - BUG_ON(ext_flags & OCFS2_EXT_REFCOUNTED); + /* We should already CoW the refcounted extent in case of create. */ + BUG_ON(create && (ext_flags & OCFS2_EXT_REFCOUNTED)); + /* * get_more_blocks() expects us to describe a hole by clearing * the mapped bit on bh_result(). diff --git a/fs/ocfs2/cluster/masklog.c b/fs/ocfs2/cluster/masklog.c index 1cd2934de61..b39da877b12 100644 --- a/fs/ocfs2/cluster/masklog.c +++ b/fs/ocfs2/cluster/masklog.c @@ -112,6 +112,7 @@ static struct mlog_attribute mlog_attrs[MLOG_MAX_BITS] = { define_mask(XATTR), define_mask(QUOTA), define_mask(REFCOUNT), + define_mask(BASTS), define_mask(ERROR), define_mask(NOTICE), define_mask(KTHREAD), diff --git a/fs/ocfs2/cluster/masklog.h b/fs/ocfs2/cluster/masklog.h index 9b4d11726cf..3dfddbec32f 100644 --- a/fs/ocfs2/cluster/masklog.h +++ b/fs/ocfs2/cluster/masklog.h @@ -114,6 +114,7 @@ #define ML_XATTR 0x0000000020000000ULL /* ocfs2 extended attributes */ #define ML_QUOTA 0x0000000040000000ULL /* ocfs2 quota operations */ #define ML_REFCOUNT 0x0000000080000000ULL /* refcount tree operations */ +#define ML_BASTS 0x0000001000000000ULL /* dlmglue asts and basts */ /* bits that are infrequently given and frequently matched in the high word */ #define ML_ERROR 0x0000000100000000ULL /* sent to KERN_ERR */ #define ML_NOTICE 0x0000000200000000ULL /* setn to KERN_NOTICE */ @@ -194,9 +195,9 @@ extern struct mlog_bits mlog_and_bits, mlog_not_bits; * previous token if args expands to nothing. */ #define __mlog_printk(level, fmt, args...) \ - printk(level "(%u,%lu):%s:%d " fmt, task_pid_nr(current), \ - __mlog_cpu_guess, __PRETTY_FUNCTION__, __LINE__ , \ - ##args) + printk(level "(%s,%u,%lu):%s:%d " fmt, current->comm, \ + task_pid_nr(current), __mlog_cpu_guess, \ + __PRETTY_FUNCTION__, __LINE__ , ##args) #define mlog(mask, fmt, args...) do { \ u64 __m = MLOG_MASK_PREFIX | (mask); \ diff --git a/fs/ocfs2/dir.c b/fs/ocfs2/dir.c index 28c3ec23879..765d66c7098 100644 --- a/fs/ocfs2/dir.c +++ b/fs/ocfs2/dir.c @@ -2439,7 +2439,7 @@ static int ocfs2_dx_dir_attach_index(struct ocfs2_super *osb, dx_root = (struct ocfs2_dx_root_block *)dx_root_bh->b_data; memset(dx_root, 0, osb->sb->s_blocksize); strcpy(dx_root->dr_signature, OCFS2_DX_ROOT_SIGNATURE); - dx_root->dr_suballoc_slot = cpu_to_le16(osb->slot_num); + dx_root->dr_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot); dx_root->dr_suballoc_bit = cpu_to_le16(dr_suballoc_bit); dx_root->dr_fs_generation = cpu_to_le32(osb->fs_generation); dx_root->dr_blkno = cpu_to_le64(dr_blkno); diff --git a/fs/ocfs2/dlm/Makefile b/fs/ocfs2/dlm/Makefile index 19036137570..dcebf0d920f 100644 --- a/fs/ocfs2/dlm/Makefile +++ b/fs/ocfs2/dlm/Makefile @@ -1,8 +1,7 @@ EXTRA_CFLAGS += -Ifs/ocfs2 -obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlmfs.o +obj-$(CONFIG_OCFS2_FS_O2CB) += ocfs2_dlm.o ocfs2_dlm-objs := dlmdomain.o dlmdebug.o dlmthread.o dlmrecovery.o \ dlmmaster.o dlmast.o dlmconvert.o dlmlock.o dlmunlock.o dlmver.o -ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c index 344bcf90cbf..b4f99de2caf 100644 --- a/fs/ocfs2/dlm/dlmrecovery.c +++ b/fs/ocfs2/dlm/dlmrecovery.c @@ -310,7 +310,7 @@ static int dlm_recovery_thread(void *data) mlog(0, "dlm thread running for %s...\n", dlm->name); while (!kthread_should_stop()) { - if (dlm_joined(dlm)) { + if (dlm_domain_fully_joined(dlm)) { status = dlm_do_recovery(dlm); if (status == -EAGAIN) { /* do not sleep, recheck immediately. */ diff --git a/fs/ocfs2/dlmfs/Makefile b/fs/ocfs2/dlmfs/Makefile new file mode 100644 index 00000000000..df69b4856d0 --- /dev/null +++ b/fs/ocfs2/dlmfs/Makefile @@ -0,0 +1,5 @@ +EXTRA_CFLAGS += -Ifs/ocfs2 + +obj-$(CONFIG_OCFS2_FS) += ocfs2_dlmfs.o + +ocfs2_dlmfs-objs := userdlm.o dlmfs.o dlmfsver.o diff --git a/fs/ocfs2/dlm/dlmfs.c b/fs/ocfs2/dlmfs/dlmfs.c index 02bf17808bd..1b0de157a08 100644 --- a/fs/ocfs2/dlm/dlmfs.c +++ b/fs/ocfs2/dlmfs/dlmfs.c @@ -43,24 +43,17 @@ #include <linux/init.h> #include <linux/string.h> #include <linux/backing-dev.h> +#include <linux/poll.h> #include <asm/uaccess.h> - -#include "cluster/nodemanager.h" -#include "cluster/heartbeat.h" -#include "cluster/tcp.h" - -#include "dlmapi.h" - +#include "stackglue.h" #include "userdlm.h" - #include "dlmfsver.h" #define MLOG_MASK_PREFIX ML_DLMFS #include "cluster/masklog.h" -#include "ocfs2_lockingver.h" static const struct super_operations dlmfs_ops; static const struct file_operations dlmfs_file_operations; @@ -71,15 +64,46 @@ static struct kmem_cache *dlmfs_inode_cache; struct workqueue_struct *user_dlm_worker; + + /* - * This is the userdlmfs locking protocol version. + * These are the ABI capabilities of dlmfs. + * + * Over time, dlmfs has added some features that were not part of the + * initial ABI. Unfortunately, some of these features are not detectable + * via standard usage. For example, Linux's default poll always returns + * POLLIN, so there is no way for a caller of poll(2) to know when dlmfs + * added poll support. Instead, we provide this list of new capabilities. + * + * Capabilities is a read-only attribute. We do it as a module parameter + * so we can discover it whether dlmfs is built in, loaded, or even not + * loaded. * - * See fs/ocfs2/dlmglue.c for more details on locking versions. + * The ABI features are local to this machine's dlmfs mount. This is + * distinct from the locking protocol, which is concerned with inter-node + * interaction. + * + * Capabilities: + * - bast : POLLIN against the file descriptor of a held lock + * signifies a bast fired on the lock. */ -static const struct dlm_protocol_version user_locking_protocol = { - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, -}; +#define DLMFS_CAPABILITIES "bast stackglue" +extern int param_set_dlmfs_capabilities(const char *val, + struct kernel_param *kp) +{ + printk(KERN_ERR "%s: readonly parameter\n", kp->name); + return -EINVAL; +} +static int param_get_dlmfs_capabilities(char *buffer, + struct kernel_param *kp) +{ + return strlcpy(buffer, DLMFS_CAPABILITIES, + strlen(DLMFS_CAPABILITIES) + 1); +} +module_param_call(capabilities, param_set_dlmfs_capabilities, + param_get_dlmfs_capabilities, NULL, 0444); +MODULE_PARM_DESC(capabilities, DLMFS_CAPABILITIES); + /* * decodes a set of open flags into a valid lock level and a set of flags. @@ -179,13 +203,46 @@ static int dlmfs_file_release(struct inode *inode, return 0; } +/* + * We do ->setattr() just to override size changes. Our size is the size + * of the LVB and nothing else. + */ +static int dlmfs_file_setattr(struct dentry *dentry, struct iattr *attr) +{ + int error; + struct inode *inode = dentry->d_inode; + + attr->ia_valid &= ~ATTR_SIZE; + error = inode_change_ok(inode, attr); + if (!error) + error = inode_setattr(inode, attr); + + return error; +} + +static unsigned int dlmfs_file_poll(struct file *file, poll_table *wait) +{ + int event = 0; + struct inode *inode = file->f_path.dentry->d_inode; + struct dlmfs_inode_private *ip = DLMFS_I(inode); + + poll_wait(file, &ip->ip_lockres.l_event, wait); + + spin_lock(&ip->ip_lockres.l_lock); + if (ip->ip_lockres.l_flags & USER_LOCK_BLOCKED) + event = POLLIN | POLLRDNORM; + spin_unlock(&ip->ip_lockres.l_lock); + + return event; +} + static ssize_t dlmfs_file_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos) { int bytes_left; - ssize_t readlen; + ssize_t readlen, got; char *lvb_buf; struct inode *inode = filp->f_path.dentry->d_inode; @@ -211,9 +268,13 @@ static ssize_t dlmfs_file_read(struct file *filp, if (!lvb_buf) return -ENOMEM; - user_dlm_read_lvb(inode, lvb_buf, readlen); - bytes_left = __copy_to_user(buf, lvb_buf, readlen); - readlen -= bytes_left; + got = user_dlm_read_lvb(inode, lvb_buf, readlen); + if (got) { + BUG_ON(got != readlen); + bytes_left = __copy_to_user(buf, lvb_buf, readlen); + readlen -= bytes_left; + } else + readlen = 0; kfree(lvb_buf); @@ -272,7 +333,7 @@ static void dlmfs_init_once(void *foo) struct dlmfs_inode_private *ip = (struct dlmfs_inode_private *) foo; - ip->ip_dlm = NULL; + ip->ip_conn = NULL; ip->ip_parent = NULL; inode_init_once(&ip->ip_vfs_inode); @@ -314,14 +375,14 @@ static void dlmfs_clear_inode(struct inode *inode) goto clear_fields; } - mlog(0, "we're a directory, ip->ip_dlm = 0x%p\n", ip->ip_dlm); + mlog(0, "we're a directory, ip->ip_conn = 0x%p\n", ip->ip_conn); /* we must be a directory. If required, lets unregister the * dlm context now. */ - if (ip->ip_dlm) - user_dlm_unregister_context(ip->ip_dlm); + if (ip->ip_conn) + user_dlm_unregister(ip->ip_conn); clear_fields: ip->ip_parent = NULL; - ip->ip_dlm = NULL; + ip->ip_conn = NULL; } static struct backing_dev_info dlmfs_backing_dev_info = { @@ -371,7 +432,7 @@ static struct inode *dlmfs_get_inode(struct inode *parent, inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; ip = DLMFS_I(inode); - ip->ip_dlm = DLMFS_I(parent)->ip_dlm; + ip->ip_conn = DLMFS_I(parent)->ip_conn; switch (mode & S_IFMT) { default: @@ -425,13 +486,12 @@ static int dlmfs_mkdir(struct inode * dir, struct inode *inode = NULL; struct qstr *domain = &dentry->d_name; struct dlmfs_inode_private *ip; - struct dlm_ctxt *dlm; - struct dlm_protocol_version proto = user_locking_protocol; + struct ocfs2_cluster_connection *conn; mlog(0, "mkdir %.*s\n", domain->len, domain->name); /* verify that we have a proper domain */ - if (domain->len >= O2NM_MAX_NAME_LEN) { + if (domain->len >= GROUP_NAME_MAX) { status = -EINVAL; mlog(ML_ERROR, "invalid domain name for directory.\n"); goto bail; @@ -446,14 +506,14 @@ static int dlmfs_mkdir(struct inode * dir, ip = DLMFS_I(inode); - dlm = user_dlm_register_context(domain, &proto); - if (IS_ERR(dlm)) { - status = PTR_ERR(dlm); + conn = user_dlm_register(domain); + if (IS_ERR(conn)) { + status = PTR_ERR(conn); mlog(ML_ERROR, "Error %d could not register domain \"%.*s\"\n", status, domain->len, domain->name); goto bail; } - ip->ip_dlm = dlm; + ip->ip_conn = conn; inc_nlink(dir); d_instantiate(dentry, inode); @@ -549,6 +609,7 @@ static int dlmfs_fill_super(struct super_block * sb, static const struct file_operations dlmfs_file_operations = { .open = dlmfs_file_open, .release = dlmfs_file_release, + .poll = dlmfs_file_poll, .read = dlmfs_file_read, .write = dlmfs_file_write, }; @@ -576,6 +637,7 @@ static const struct super_operations dlmfs_ops = { static const struct inode_operations dlmfs_file_inode_operations = { .getattr = simple_getattr, + .setattr = dlmfs_file_setattr, }; static int dlmfs_get_sb(struct file_system_type *fs_type, @@ -620,6 +682,7 @@ static int __init init_dlmfs_fs(void) } cleanup_worker = 1; + user_dlm_set_locking_protocol(); status = register_filesystem(&dlmfs_fs_type); bail: if (status) { diff --git a/fs/ocfs2/dlm/dlmfsver.c b/fs/ocfs2/dlmfs/dlmfsver.c index a733b3321f8..a733b3321f8 100644 --- a/fs/ocfs2/dlm/dlmfsver.c +++ b/fs/ocfs2/dlmfs/dlmfsver.c diff --git a/fs/ocfs2/dlm/dlmfsver.h b/fs/ocfs2/dlmfs/dlmfsver.h index f35eadbed25..f35eadbed25 100644 --- a/fs/ocfs2/dlm/dlmfsver.h +++ b/fs/ocfs2/dlmfs/dlmfsver.h diff --git a/fs/ocfs2/dlm/userdlm.c b/fs/ocfs2/dlmfs/userdlm.c index 4cb1d3dae25..0499e3fb7bd 100644 --- a/fs/ocfs2/dlm/userdlm.c +++ b/fs/ocfs2/dlmfs/userdlm.c @@ -34,18 +34,19 @@ #include <linux/types.h> #include <linux/crc32.h> - -#include "cluster/nodemanager.h" -#include "cluster/heartbeat.h" -#include "cluster/tcp.h" - -#include "dlmapi.h" - +#include "ocfs2_lockingver.h" +#include "stackglue.h" #include "userdlm.h" #define MLOG_MASK_PREFIX ML_DLMFS #include "cluster/masklog.h" + +static inline struct user_lock_res *user_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) +{ + return container_of(lksb, struct user_lock_res, l_lksb); +} + static inline int user_check_wait_flag(struct user_lock_res *lockres, int flag) { @@ -73,15 +74,15 @@ static inline void user_wait_on_blocked_lock(struct user_lock_res *lockres) } /* I heart container_of... */ -static inline struct dlm_ctxt * -dlm_ctxt_from_user_lockres(struct user_lock_res *lockres) +static inline struct ocfs2_cluster_connection * +cluster_connection_from_user_lockres(struct user_lock_res *lockres) { struct dlmfs_inode_private *ip; ip = container_of(lockres, struct dlmfs_inode_private, ip_lockres); - return ip->ip_dlm; + return ip->ip_conn; } static struct inode * @@ -103,9 +104,9 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres) } #define user_log_dlm_error(_func, _stat, _lockres) do { \ - mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on " \ - "resource %.*s: %s\n", dlm_errname(_stat), _func, \ - _lockres->l_namelen, _lockres->l_name, dlm_errmsg(_stat)); \ + mlog(ML_ERROR, "Dlm error %d while calling %s on " \ + "resource %.*s\n", _stat, _func, \ + _lockres->l_namelen, _lockres->l_name); \ } while (0) /* WARNING: This function lives in a world where the only three lock @@ -113,34 +114,35 @@ static inline void user_recover_from_dlm_error(struct user_lock_res *lockres) * lock types are added. */ static inline int user_highest_compat_lock_level(int level) { - int new_level = LKM_EXMODE; + int new_level = DLM_LOCK_EX; - if (level == LKM_EXMODE) - new_level = LKM_NLMODE; - else if (level == LKM_PRMODE) - new_level = LKM_PRMODE; + if (level == DLM_LOCK_EX) + new_level = DLM_LOCK_NL; + else if (level == DLM_LOCK_PR) + new_level = DLM_LOCK_PR; return new_level; } -static void user_ast(void *opaque) +static void user_ast(struct ocfs2_dlm_lksb *lksb) { - struct user_lock_res *lockres = opaque; - struct dlm_lockstatus *lksb; + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); + int status; - mlog(0, "AST fired for lockres %.*s\n", lockres->l_namelen, - lockres->l_name); + mlog(ML_BASTS, "AST fired for lockres %.*s, level %d => %d\n", + lockres->l_namelen, lockres->l_name, lockres->l_level, + lockres->l_requested); spin_lock(&lockres->l_lock); - lksb = &(lockres->l_lksb); - if (lksb->status != DLM_NORMAL) { + status = ocfs2_dlm_lock_status(&lockres->l_lksb); + if (status) { mlog(ML_ERROR, "lksb status value of %u on lockres %.*s\n", - lksb->status, lockres->l_namelen, lockres->l_name); + status, lockres->l_namelen, lockres->l_name); spin_unlock(&lockres->l_lock); return; } - mlog_bug_on_msg(lockres->l_requested == LKM_IVMODE, + mlog_bug_on_msg(lockres->l_requested == DLM_LOCK_IV, "Lockres %.*s, requested ivmode. flags 0x%x\n", lockres->l_namelen, lockres->l_name, lockres->l_flags); @@ -148,13 +150,13 @@ static void user_ast(void *opaque) if (lockres->l_requested < lockres->l_level) { if (lockres->l_requested <= user_highest_compat_lock_level(lockres->l_blocking)) { - lockres->l_blocking = LKM_NLMODE; + lockres->l_blocking = DLM_LOCK_NL; lockres->l_flags &= ~USER_LOCK_BLOCKED; } } lockres->l_level = lockres->l_requested; - lockres->l_requested = LKM_IVMODE; + lockres->l_requested = DLM_LOCK_IV; lockres->l_flags |= USER_LOCK_ATTACHED; lockres->l_flags &= ~USER_LOCK_BUSY; @@ -193,11 +195,11 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres) return; switch (lockres->l_blocking) { - case LKM_EXMODE: + case DLM_LOCK_EX: if (!lockres->l_ex_holders && !lockres->l_ro_holders) queue = 1; break; - case LKM_PRMODE: + case DLM_LOCK_PR: if (!lockres->l_ex_holders) queue = 1; break; @@ -209,12 +211,12 @@ static void __user_dlm_cond_queue_lockres(struct user_lock_res *lockres) __user_dlm_queue_lockres(lockres); } -static void user_bast(void *opaque, int level) +static void user_bast(struct ocfs2_dlm_lksb *lksb, int level) { - struct user_lock_res *lockres = opaque; + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); - mlog(0, "Blocking AST fired for lockres %.*s. Blocking level %d\n", - lockres->l_namelen, lockres->l_name, level); + mlog(ML_BASTS, "BAST fired for lockres %.*s, blocking %d, level %d\n", + lockres->l_namelen, lockres->l_name, level, lockres->l_level); spin_lock(&lockres->l_lock); lockres->l_flags |= USER_LOCK_BLOCKED; @@ -227,15 +229,15 @@ static void user_bast(void *opaque, int level) wake_up(&lockres->l_event); } -static void user_unlock_ast(void *opaque, enum dlm_status status) +static void user_unlock_ast(struct ocfs2_dlm_lksb *lksb, int status) { - struct user_lock_res *lockres = opaque; + struct user_lock_res *lockres = user_lksb_to_lock_res(lksb); - mlog(0, "UNLOCK AST called on lock %.*s\n", lockres->l_namelen, - lockres->l_name); + mlog(ML_BASTS, "UNLOCK AST fired for lockres %.*s, flags 0x%x\n", + lockres->l_namelen, lockres->l_name, lockres->l_flags); - if (status != DLM_NORMAL && status != DLM_CANCELGRANT) - mlog(ML_ERROR, "Dlm returns status %d\n", status); + if (status) + mlog(ML_ERROR, "dlm returns status %d\n", status); spin_lock(&lockres->l_lock); /* The teardown flag gets set early during the unlock process, @@ -243,7 +245,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status) * for a concurrent cancel. */ if (lockres->l_flags & USER_LOCK_IN_TEARDOWN && !(lockres->l_flags & USER_LOCK_IN_CANCEL)) { - lockres->l_level = LKM_IVMODE; + lockres->l_level = DLM_LOCK_IV; } else if (status == DLM_CANCELGRANT) { /* We tried to cancel a convert request, but it was * already granted. Don't clear the busy flag - the @@ -254,7 +256,7 @@ static void user_unlock_ast(void *opaque, enum dlm_status status) } else { BUG_ON(!(lockres->l_flags & USER_LOCK_IN_CANCEL)); /* Cancel succeeded, we want to re-queue */ - lockres->l_requested = LKM_IVMODE; /* cancel an + lockres->l_requested = DLM_LOCK_IV; /* cancel an * upconvert * request. */ lockres->l_flags &= ~USER_LOCK_IN_CANCEL; @@ -271,6 +273,21 @@ out_noclear: wake_up(&lockres->l_event); } +/* + * This is the userdlmfs locking protocol version. + * + * See fs/ocfs2/dlmglue.c for more details on locking versions. + */ +static struct ocfs2_locking_protocol user_dlm_lproto = { + .lp_max_version = { + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, + }, + .lp_lock_ast = user_ast, + .lp_blocking_ast = user_bast, + .lp_unlock_ast = user_unlock_ast, +}; + static inline void user_dlm_drop_inode_ref(struct user_lock_res *lockres) { struct inode *inode; @@ -283,10 +300,10 @@ static void user_dlm_unblock_lock(struct work_struct *work) int new_level, status; struct user_lock_res *lockres = container_of(work, struct user_lock_res, l_work); - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); + struct ocfs2_cluster_connection *conn = + cluster_connection_from_user_lockres(lockres); - mlog(0, "processing lockres %.*s\n", lockres->l_namelen, - lockres->l_name); + mlog(0, "lockres %.*s\n", lockres->l_namelen, lockres->l_name); spin_lock(&lockres->l_lock); @@ -304,17 +321,23 @@ static void user_dlm_unblock_lock(struct work_struct *work) * flag, and finally we might get another bast which re-queues * us before our ast for the downconvert is called. */ if (!(lockres->l_flags & USER_LOCK_BLOCKED)) { + mlog(ML_BASTS, "lockres %.*s USER_LOCK_BLOCKED\n", + lockres->l_namelen, lockres->l_name); spin_unlock(&lockres->l_lock); goto drop_ref; } if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { + mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_TEARDOWN\n", + lockres->l_namelen, lockres->l_name); spin_unlock(&lockres->l_lock); goto drop_ref; } if (lockres->l_flags & USER_LOCK_BUSY) { if (lockres->l_flags & USER_LOCK_IN_CANCEL) { + mlog(ML_BASTS, "lockres %.*s USER_LOCK_IN_CANCEL\n", + lockres->l_namelen, lockres->l_name); spin_unlock(&lockres->l_lock); goto drop_ref; } @@ -322,32 +345,31 @@ static void user_dlm_unblock_lock(struct work_struct *work) lockres->l_flags |= USER_LOCK_IN_CANCEL; spin_unlock(&lockres->l_lock); - status = dlmunlock(dlm, - &lockres->l_lksb, - LKM_CANCEL, - user_unlock_ast, - lockres); - if (status != DLM_NORMAL) - user_log_dlm_error("dlmunlock", status, lockres); + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, + DLM_LKF_CANCEL); + if (status) + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres); goto drop_ref; } /* If there are still incompat holders, we can exit safely * without worrying about re-queueing this lock as that will * happen on the last call to user_cluster_unlock. */ - if ((lockres->l_blocking == LKM_EXMODE) + if ((lockres->l_blocking == DLM_LOCK_EX) && (lockres->l_ex_holders || lockres->l_ro_holders)) { spin_unlock(&lockres->l_lock); - mlog(0, "can't downconvert for ex: ro = %u, ex = %u\n", - lockres->l_ro_holders, lockres->l_ex_holders); + mlog(ML_BASTS, "lockres %.*s, EX/PR Holders %u,%u\n", + lockres->l_namelen, lockres->l_name, + lockres->l_ex_holders, lockres->l_ro_holders); goto drop_ref; } - if ((lockres->l_blocking == LKM_PRMODE) + if ((lockres->l_blocking == DLM_LOCK_PR) && lockres->l_ex_holders) { spin_unlock(&lockres->l_lock); - mlog(0, "can't downconvert for pr: ex = %u\n", - lockres->l_ex_holders); + mlog(ML_BASTS, "lockres %.*s, EX Holders %u\n", + lockres->l_namelen, lockres->l_name, + lockres->l_ex_holders); goto drop_ref; } @@ -355,22 +377,17 @@ static void user_dlm_unblock_lock(struct work_struct *work) new_level = user_highest_compat_lock_level(lockres->l_blocking); lockres->l_requested = new_level; lockres->l_flags |= USER_LOCK_BUSY; - mlog(0, "Downconvert lock from %d to %d\n", - lockres->l_level, new_level); + mlog(ML_BASTS, "lockres %.*s, downconvert %d => %d\n", + lockres->l_namelen, lockres->l_name, lockres->l_level, new_level); spin_unlock(&lockres->l_lock); /* need lock downconvert request now... */ - status = dlmlock(dlm, - new_level, - &lockres->l_lksb, - LKM_CONVERT|LKM_VALBLK, - lockres->l_name, - lockres->l_namelen, - user_ast, - lockres, - user_bast); - if (status != DLM_NORMAL) { - user_log_dlm_error("dlmlock", status, lockres); + status = ocfs2_dlm_lock(conn, new_level, &lockres->l_lksb, + DLM_LKF_CONVERT|DLM_LKF_VALBLK, + lockres->l_name, + lockres->l_namelen); + if (status) { + user_log_dlm_error("ocfs2_dlm_lock", status, lockres); user_recover_from_dlm_error(lockres); } @@ -382,10 +399,10 @@ static inline void user_dlm_inc_holders(struct user_lock_res *lockres, int level) { switch(level) { - case LKM_EXMODE: + case DLM_LOCK_EX: lockres->l_ex_holders++; break; - case LKM_PRMODE: + case DLM_LOCK_PR: lockres->l_ro_holders++; break; default: @@ -410,20 +427,19 @@ int user_dlm_cluster_lock(struct user_lock_res *lockres, int lkm_flags) { int status, local_flags; - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); + struct ocfs2_cluster_connection *conn = + cluster_connection_from_user_lockres(lockres); - if (level != LKM_EXMODE && - level != LKM_PRMODE) { + if (level != DLM_LOCK_EX && + level != DLM_LOCK_PR) { mlog(ML_ERROR, "lockres %.*s: invalid request!\n", lockres->l_namelen, lockres->l_name); status = -EINVAL; goto bail; } - mlog(0, "lockres %.*s: asking for %s lock, passed flags = 0x%x\n", - lockres->l_namelen, lockres->l_name, - (level == LKM_EXMODE) ? "LKM_EXMODE" : "LKM_PRMODE", - lkm_flags); + mlog(ML_BASTS, "lockres %.*s, level %d, flags = 0x%x\n", + lockres->l_namelen, lockres->l_name, level, lkm_flags); again: if (signal_pending(current)) { @@ -457,35 +473,26 @@ again: } if (level > lockres->l_level) { - local_flags = lkm_flags | LKM_VALBLK; - if (lockres->l_level != LKM_IVMODE) - local_flags |= LKM_CONVERT; + local_flags = lkm_flags | DLM_LKF_VALBLK; + if (lockres->l_level != DLM_LOCK_IV) + local_flags |= DLM_LKF_CONVERT; lockres->l_requested = level; lockres->l_flags |= USER_LOCK_BUSY; spin_unlock(&lockres->l_lock); - BUG_ON(level == LKM_IVMODE); - BUG_ON(level == LKM_NLMODE); + BUG_ON(level == DLM_LOCK_IV); + BUG_ON(level == DLM_LOCK_NL); /* call dlm_lock to upgrade lock now */ - status = dlmlock(dlm, - level, - &lockres->l_lksb, - local_flags, - lockres->l_name, - lockres->l_namelen, - user_ast, - lockres, - user_bast); - if (status != DLM_NORMAL) { - if ((lkm_flags & LKM_NOQUEUE) && - (status == DLM_NOTQUEUED)) - status = -EAGAIN; - else { - user_log_dlm_error("dlmlock", status, lockres); - status = -EINVAL; - } + status = ocfs2_dlm_lock(conn, level, &lockres->l_lksb, + local_flags, lockres->l_name, + lockres->l_namelen); + if (status) { + if ((lkm_flags & DLM_LKF_NOQUEUE) && + (status != -EAGAIN)) + user_log_dlm_error("ocfs2_dlm_lock", + status, lockres); user_recover_from_dlm_error(lockres); goto bail; } @@ -506,11 +513,11 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres, int level) { switch(level) { - case LKM_EXMODE: + case DLM_LOCK_EX: BUG_ON(!lockres->l_ex_holders); lockres->l_ex_holders--; break; - case LKM_PRMODE: + case DLM_LOCK_PR: BUG_ON(!lockres->l_ro_holders); lockres->l_ro_holders--; break; @@ -522,8 +529,8 @@ static inline void user_dlm_dec_holders(struct user_lock_res *lockres, void user_dlm_cluster_unlock(struct user_lock_res *lockres, int level) { - if (level != LKM_EXMODE && - level != LKM_PRMODE) { + if (level != DLM_LOCK_EX && + level != DLM_LOCK_PR) { mlog(ML_ERROR, "lockres %.*s: invalid request!\n", lockres->l_namelen, lockres->l_name); return; @@ -540,33 +547,40 @@ void user_dlm_write_lvb(struct inode *inode, unsigned int len) { struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; - char *lvb = lockres->l_lksb.lvb; + char *lvb; BUG_ON(len > DLM_LVB_LEN); spin_lock(&lockres->l_lock); - BUG_ON(lockres->l_level < LKM_EXMODE); + BUG_ON(lockres->l_level < DLM_LOCK_EX); + lvb = ocfs2_dlm_lvb(&lockres->l_lksb); memcpy(lvb, val, len); spin_unlock(&lockres->l_lock); } -void user_dlm_read_lvb(struct inode *inode, - char *val, - unsigned int len) +ssize_t user_dlm_read_lvb(struct inode *inode, + char *val, + unsigned int len) { struct user_lock_res *lockres = &DLMFS_I(inode)->ip_lockres; - char *lvb = lockres->l_lksb.lvb; + char *lvb; + ssize_t ret = len; BUG_ON(len > DLM_LVB_LEN); spin_lock(&lockres->l_lock); - BUG_ON(lockres->l_level < LKM_PRMODE); - memcpy(val, lvb, len); + BUG_ON(lockres->l_level < DLM_LOCK_PR); + if (ocfs2_dlm_lvb_valid(&lockres->l_lksb)) { + lvb = ocfs2_dlm_lvb(&lockres->l_lksb); + memcpy(val, lvb, len); + } else + ret = 0; spin_unlock(&lockres->l_lock); + return ret; } void user_dlm_lock_res_init(struct user_lock_res *lockres, @@ -576,9 +590,9 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres, spin_lock_init(&lockres->l_lock); init_waitqueue_head(&lockres->l_event); - lockres->l_level = LKM_IVMODE; - lockres->l_requested = LKM_IVMODE; - lockres->l_blocking = LKM_IVMODE; + lockres->l_level = DLM_LOCK_IV; + lockres->l_requested = DLM_LOCK_IV; + lockres->l_blocking = DLM_LOCK_IV; /* should have been checked before getting here. */ BUG_ON(dentry->d_name.len >= USER_DLM_LOCK_ID_MAX_LEN); @@ -592,9 +606,10 @@ void user_dlm_lock_res_init(struct user_lock_res *lockres, int user_dlm_destroy_lock(struct user_lock_res *lockres) { int status = -EBUSY; - struct dlm_ctxt *dlm = dlm_ctxt_from_user_lockres(lockres); + struct ocfs2_cluster_connection *conn = + cluster_connection_from_user_lockres(lockres); - mlog(0, "asked to destroy %.*s\n", lockres->l_namelen, lockres->l_name); + mlog(ML_BASTS, "lockres %.*s\n", lockres->l_namelen, lockres->l_name); spin_lock(&lockres->l_lock); if (lockres->l_flags & USER_LOCK_IN_TEARDOWN) { @@ -627,14 +642,9 @@ int user_dlm_destroy_lock(struct user_lock_res *lockres) lockres->l_flags |= USER_LOCK_BUSY; spin_unlock(&lockres->l_lock); - status = dlmunlock(dlm, - &lockres->l_lksb, - LKM_VALBLK, - user_unlock_ast, - lockres); - if (status != DLM_NORMAL) { - user_log_dlm_error("dlmunlock", status, lockres); - status = -EINVAL; + status = ocfs2_dlm_unlock(conn, &lockres->l_lksb, DLM_LKF_VALBLK); + if (status) { + user_log_dlm_error("ocfs2_dlm_unlock", status, lockres); goto bail; } @@ -645,32 +655,34 @@ bail: return status; } -struct dlm_ctxt *user_dlm_register_context(struct qstr *name, - struct dlm_protocol_version *proto) +static void user_dlm_recovery_handler_noop(int node_num, + void *recovery_data) { - struct dlm_ctxt *dlm; - u32 dlm_key; - char *domain; - - domain = kmalloc(name->len + 1, GFP_NOFS); - if (!domain) { - mlog_errno(-ENOMEM); - return ERR_PTR(-ENOMEM); - } + /* We ignore recovery events */ + return; +} - dlm_key = crc32_le(0, name->name, name->len); +void user_dlm_set_locking_protocol(void) +{ + ocfs2_stack_glue_set_max_proto_version(&user_dlm_lproto.lp_max_version); +} - snprintf(domain, name->len + 1, "%.*s", name->len, name->name); +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name) +{ + int rc; + struct ocfs2_cluster_connection *conn; - dlm = dlm_register_domain(domain, dlm_key, proto); - if (IS_ERR(dlm)) - mlog_errno(PTR_ERR(dlm)); + rc = ocfs2_cluster_connect_agnostic(name->name, name->len, + &user_dlm_lproto, + user_dlm_recovery_handler_noop, + NULL, &conn); + if (rc) + mlog_errno(rc); - kfree(domain); - return dlm; + return rc ? ERR_PTR(rc) : conn; } -void user_dlm_unregister_context(struct dlm_ctxt *dlm) +void user_dlm_unregister(struct ocfs2_cluster_connection *conn) { - dlm_unregister_domain(dlm); + ocfs2_cluster_disconnect(conn, 0); } diff --git a/fs/ocfs2/dlm/userdlm.h b/fs/ocfs2/dlmfs/userdlm.h index 0c3cc03c61f..3b42d79531d 100644 --- a/fs/ocfs2/dlm/userdlm.h +++ b/fs/ocfs2/dlmfs/userdlm.h @@ -57,7 +57,7 @@ struct user_lock_res { int l_level; unsigned int l_ro_holders; unsigned int l_ex_holders; - struct dlm_lockstatus l_lksb; + struct ocfs2_dlm_lksb l_lksb; int l_requested; int l_blocking; @@ -80,15 +80,15 @@ void user_dlm_cluster_unlock(struct user_lock_res *lockres, void user_dlm_write_lvb(struct inode *inode, const char *val, unsigned int len); -void user_dlm_read_lvb(struct inode *inode, - char *val, - unsigned int len); -struct dlm_ctxt *user_dlm_register_context(struct qstr *name, - struct dlm_protocol_version *proto); -void user_dlm_unregister_context(struct dlm_ctxt *dlm); +ssize_t user_dlm_read_lvb(struct inode *inode, + char *val, + unsigned int len); +struct ocfs2_cluster_connection *user_dlm_register(struct qstr *name); +void user_dlm_unregister(struct ocfs2_cluster_connection *conn); +void user_dlm_set_locking_protocol(void); struct dlmfs_inode_private { - struct dlm_ctxt *ip_dlm; + struct ocfs2_cluster_connection *ip_conn; struct user_lock_res ip_lockres; /* unused for directories. */ struct inode *ip_parent; diff --git a/fs/ocfs2/dlmglue.c b/fs/ocfs2/dlmglue.c index e044019cb3b..8298608d416 100644 --- a/fs/ocfs2/dlmglue.c +++ b/fs/ocfs2/dlmglue.c @@ -297,6 +297,11 @@ static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres) lockres->l_type == OCFS2_LOCK_TYPE_OPEN; } +static inline struct ocfs2_lock_res *ocfs2_lksb_to_lock_res(struct ocfs2_dlm_lksb *lksb) +{ + return container_of(lksb, struct ocfs2_lock_res, l_lksb); +} + static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres) { BUG_ON(!ocfs2_is_inode_lock(lockres)); @@ -927,6 +932,10 @@ static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, lockres->l_blocking = level; } + mlog(ML_BASTS, "lockres %s, block %d, level %d, l_block %d, dwn %d\n", + lockres->l_name, level, lockres->l_level, lockres->l_blocking, + needs_downconvert); + if (needs_downconvert) lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED); @@ -1040,18 +1049,17 @@ static unsigned int lockres_set_pending(struct ocfs2_lock_res *lockres) return lockres->l_pending_gen; } - -static void ocfs2_blocking_ast(void *opaque, int level) +static void ocfs2_blocking_ast(struct ocfs2_dlm_lksb *lksb, int level) { - struct ocfs2_lock_res *lockres = opaque; + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); int needs_downconvert; unsigned long flags; BUG_ON(level <= DLM_LOCK_NL); - mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n", - lockres->l_name, level, lockres->l_level, + mlog(ML_BASTS, "BAST fired for lockres %s, blocking %d, level %d, " + "type %s\n", lockres->l_name, level, lockres->l_level, ocfs2_lock_type_string(lockres->l_type)); /* @@ -1072,9 +1080,9 @@ static void ocfs2_blocking_ast(void *opaque, int level) ocfs2_wake_downconvert_thread(osb); } -static void ocfs2_locking_ast(void *opaque) +static void ocfs2_locking_ast(struct ocfs2_dlm_lksb *lksb) { - struct ocfs2_lock_res *lockres = opaque; + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres); unsigned long flags; int status; @@ -1095,6 +1103,10 @@ static void ocfs2_locking_ast(void *opaque) return; } + mlog(ML_BASTS, "AST fired for lockres %s, action %d, unlock %d, " + "level %d => %d\n", lockres->l_name, lockres->l_action, + lockres->l_unlock_action, lockres->l_level, lockres->l_requested); + switch(lockres->l_action) { case OCFS2_AST_ATTACH: ocfs2_generic_handle_attach_action(lockres); @@ -1107,8 +1119,8 @@ static void ocfs2_locking_ast(void *opaque) ocfs2_generic_handle_downconvert_action(lockres); break; default: - mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u " - "lockres flags = 0x%lx, unlock action: %u\n", + mlog(ML_ERROR, "lockres %s: AST fired with invalid action: %u, " + "flags 0x%lx, unlock: %u\n", lockres->l_name, lockres->l_action, lockres->l_flags, lockres->l_unlock_action); BUG(); @@ -1134,6 +1146,88 @@ out: spin_unlock_irqrestore(&lockres->l_lock, flags); } +static void ocfs2_unlock_ast(struct ocfs2_dlm_lksb *lksb, int error) +{ + struct ocfs2_lock_res *lockres = ocfs2_lksb_to_lock_res(lksb); + unsigned long flags; + + mlog_entry_void(); + + mlog(ML_BASTS, "UNLOCK AST fired for lockres %s, action = %d\n", + lockres->l_name, lockres->l_unlock_action); + + spin_lock_irqsave(&lockres->l_lock, flags); + if (error) { + mlog(ML_ERROR, "Dlm passes error %d for lock %s, " + "unlock_action %d\n", error, lockres->l_name, + lockres->l_unlock_action); + spin_unlock_irqrestore(&lockres->l_lock, flags); + mlog_exit_void(); + return; + } + + switch(lockres->l_unlock_action) { + case OCFS2_UNLOCK_CANCEL_CONVERT: + mlog(0, "Cancel convert success for %s\n", lockres->l_name); + lockres->l_action = OCFS2_AST_INVALID; + /* Downconvert thread may have requeued this lock, we + * need to wake it. */ + if (lockres->l_flags & OCFS2_LOCK_BLOCKED) + ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); + break; + case OCFS2_UNLOCK_DROP_LOCK: + lockres->l_level = DLM_LOCK_IV; + break; + default: + BUG(); + } + + lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); + lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; + wake_up(&lockres->l_event); + spin_unlock_irqrestore(&lockres->l_lock, flags); + + mlog_exit_void(); +} + +/* + * This is the filesystem locking protocol. It provides the lock handling + * hooks for the underlying DLM. It has a maximum version number. + * The version number allows interoperability with systems running at + * the same major number and an equal or smaller minor number. + * + * Whenever the filesystem does new things with locks (adds or removes a + * lock, orders them differently, does different things underneath a lock), + * the version must be changed. The protocol is negotiated when joining + * the dlm domain. A node may join the domain if its major version is + * identical to all other nodes and its minor version is greater than + * or equal to all other nodes. When its minor version is greater than + * the other nodes, it will run at the minor version specified by the + * other nodes. + * + * If a locking change is made that will not be compatible with older + * versions, the major number must be increased and the minor version set + * to zero. If a change merely adds a behavior that can be disabled when + * speaking to older versions, the minor version must be increased. If a + * change adds a fully backwards compatible change (eg, LVB changes that + * are just ignored by older versions), the version does not need to be + * updated. + */ +static struct ocfs2_locking_protocol lproto = { + .lp_max_version = { + .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, + .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, + }, + .lp_lock_ast = ocfs2_locking_ast, + .lp_blocking_ast = ocfs2_blocking_ast, + .lp_unlock_ast = ocfs2_unlock_ast, +}; + +void ocfs2_set_locking_protocol(void) +{ + ocfs2_stack_glue_set_max_proto_version(&lproto.lp_max_version); +} + static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert) { @@ -1189,8 +1283,7 @@ static int ocfs2_lock_create(struct ocfs2_super *osb, &lockres->l_lksb, dlm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, gen, osb); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -1412,7 +1505,7 @@ again: BUG_ON(level == DLM_LOCK_IV); BUG_ON(level == DLM_LOCK_NL); - mlog(0, "lock %s, convert from %d to level = %d\n", + mlog(ML_BASTS, "lockres %s, convert from %d to %d\n", lockres->l_name, lockres->l_level, level); /* call dlm_lock to upgrade lock now */ @@ -1421,8 +1514,7 @@ again: &lockres->l_lksb, lkm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, gen, osb); if (ret) { if (!(lkm_flags & DLM_LKF_NOQUEUE) || @@ -1859,8 +1951,7 @@ int ocfs2_file_lock(struct file *file, int ex, int trylock) spin_unlock_irqrestore(&lockres->l_lock, flags); ret = ocfs2_dlm_lock(osb->cconn, level, &lockres->l_lksb, lkm_flags, - lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + lockres->l_name, OCFS2_LOCK_ID_MAX_LEN - 1); if (ret) { if (!trylock || (ret != -EAGAIN)) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -2989,7 +3080,7 @@ int ocfs2_dlm_init(struct ocfs2_super *osb) status = ocfs2_cluster_connect(osb->osb_cluster_stack, osb->uuid_str, strlen(osb->uuid_str), - ocfs2_do_node_down, osb, + &lproto, ocfs2_do_node_down, osb, &conn); if (status) { mlog_errno(status); @@ -3056,50 +3147,6 @@ void ocfs2_dlm_shutdown(struct ocfs2_super *osb, mlog_exit_void(); } -static void ocfs2_unlock_ast(void *opaque, int error) -{ - struct ocfs2_lock_res *lockres = opaque; - unsigned long flags; - - mlog_entry_void(); - - mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name, - lockres->l_unlock_action); - - spin_lock_irqsave(&lockres->l_lock, flags); - if (error) { - mlog(ML_ERROR, "Dlm passes error %d for lock %s, " - "unlock_action %d\n", error, lockres->l_name, - lockres->l_unlock_action); - spin_unlock_irqrestore(&lockres->l_lock, flags); - mlog_exit_void(); - return; - } - - switch(lockres->l_unlock_action) { - case OCFS2_UNLOCK_CANCEL_CONVERT: - mlog(0, "Cancel convert success for %s\n", lockres->l_name); - lockres->l_action = OCFS2_AST_INVALID; - /* Downconvert thread may have requeued this lock, we - * need to wake it. */ - if (lockres->l_flags & OCFS2_LOCK_BLOCKED) - ocfs2_wake_downconvert_thread(ocfs2_get_lockres_osb(lockres)); - break; - case OCFS2_UNLOCK_DROP_LOCK: - lockres->l_level = DLM_LOCK_IV; - break; - default: - BUG(); - } - - lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); - lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; - wake_up(&lockres->l_event); - spin_unlock_irqrestore(&lockres->l_lock, flags); - - mlog_exit_void(); -} - static int ocfs2_drop_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { @@ -3167,8 +3214,7 @@ static int ocfs2_drop_lock(struct ocfs2_super *osb, mlog(0, "lock %s\n", lockres->l_name); - ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags, - lockres); + ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, lkm_flags); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags); @@ -3276,13 +3322,20 @@ static unsigned int ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres, BUG_ON(lockres->l_blocking <= DLM_LOCK_NL); if (lockres->l_level <= new_level) { - mlog(ML_ERROR, "lockres->l_level (%d) <= new_level (%d)\n", - lockres->l_level, new_level); + mlog(ML_ERROR, "lockres %s, lvl %d <= %d, blcklst %d, mask %d, " + "type %d, flags 0x%lx, hold %d %d, act %d %d, req %d, " + "block %d, pgen %d\n", lockres->l_name, lockres->l_level, + new_level, list_empty(&lockres->l_blocked_list), + list_empty(&lockres->l_mask_waiters), lockres->l_type, + lockres->l_flags, lockres->l_ro_holders, + lockres->l_ex_holders, lockres->l_action, + lockres->l_unlock_action, lockres->l_requested, + lockres->l_blocking, lockres->l_pending_gen); BUG(); } - mlog(0, "lock %s, new_level = %d, l_blocking = %d\n", - lockres->l_name, new_level, lockres->l_blocking); + mlog(ML_BASTS, "lockres %s, level %d => %d, blocking %d\n", + lockres->l_name, lockres->l_level, new_level, lockres->l_blocking); lockres->l_action = OCFS2_AST_DOWNCONVERT; lockres->l_requested = new_level; @@ -3301,6 +3354,9 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb, mlog_entry_void(); + mlog(ML_BASTS, "lockres %s, level %d => %d\n", lockres->l_name, + lockres->l_level, new_level); + if (lvb) dlm_flags |= DLM_LKF_VALBLK; @@ -3309,8 +3365,7 @@ static int ocfs2_downconvert_lock(struct ocfs2_super *osb, &lockres->l_lksb, dlm_flags, lockres->l_name, - OCFS2_LOCK_ID_MAX_LEN - 1, - lockres); + OCFS2_LOCK_ID_MAX_LEN - 1); lockres_clear_pending(lockres, generation, osb); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_lock", ret, lockres); @@ -3331,14 +3386,12 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, assert_spin_locked(&lockres->l_lock); mlog_entry_void(); - mlog(0, "lock %s\n", lockres->l_name); if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) { /* If we're already trying to cancel a lock conversion * then just drop the spinlock and allow the caller to * requeue this lock. */ - - mlog(0, "Lockres %s, skip convert\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s, skip convert\n", lockres->l_name); return 0; } @@ -3353,6 +3406,8 @@ static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb, "lock %s, invalid flags: 0x%lx\n", lockres->l_name, lockres->l_flags); + mlog(ML_BASTS, "lockres %s\n", lockres->l_name); + return 1; } @@ -3362,16 +3417,15 @@ static int ocfs2_cancel_convert(struct ocfs2_super *osb, int ret; mlog_entry_void(); - mlog(0, "lock %s\n", lockres->l_name); ret = ocfs2_dlm_unlock(osb->cconn, &lockres->l_lksb, - DLM_LKF_CANCEL, lockres); + DLM_LKF_CANCEL); if (ret) { ocfs2_log_dlm_error("ocfs2_dlm_unlock", ret, lockres); ocfs2_recover_from_dlm_error(lockres, 0); } - mlog(0, "lock %s return from ocfs2_dlm_unlock\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s\n", lockres->l_name); mlog_exit(ret); return ret; @@ -3428,8 +3482,11 @@ recheck: * at the same time they set OCFS2_DLM_BUSY. They must * clear OCFS2_DLM_PENDING after dlm_lock() returns. */ - if (lockres->l_flags & OCFS2_LOCK_PENDING) + if (lockres->l_flags & OCFS2_LOCK_PENDING) { + mlog(ML_BASTS, "lockres %s, ReQ: Pending\n", + lockres->l_name); goto leave_requeue; + } ctl->requeue = 1; ret = ocfs2_prepare_cancel_convert(osb, lockres); @@ -3461,6 +3518,7 @@ recheck: */ if (lockres->l_level == DLM_LOCK_NL) { BUG_ON(lockres->l_ex_holders || lockres->l_ro_holders); + mlog(ML_BASTS, "lockres %s, Aborting dc\n", lockres->l_name); lockres->l_blocking = DLM_LOCK_NL; lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED); spin_unlock_irqrestore(&lockres->l_lock, flags); @@ -3470,28 +3528,41 @@ recheck: /* if we're blocking an exclusive and we have *any* holders, * then requeue. */ if ((lockres->l_blocking == DLM_LOCK_EX) - && (lockres->l_ex_holders || lockres->l_ro_holders)) + && (lockres->l_ex_holders || lockres->l_ro_holders)) { + mlog(ML_BASTS, "lockres %s, ReQ: EX/PR Holders %u,%u\n", + lockres->l_name, lockres->l_ex_holders, + lockres->l_ro_holders); goto leave_requeue; + } /* If it's a PR we're blocking, then only * requeue if we've got any EX holders */ if (lockres->l_blocking == DLM_LOCK_PR && - lockres->l_ex_holders) + lockres->l_ex_holders) { + mlog(ML_BASTS, "lockres %s, ReQ: EX Holders %u\n", + lockres->l_name, lockres->l_ex_holders); goto leave_requeue; + } /* * Can we get a lock in this state if the holder counts are * zero? The meta data unblock code used to check this. */ if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH) - && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) + && (lockres->l_flags & OCFS2_LOCK_REFRESHING)) { + mlog(ML_BASTS, "lockres %s, ReQ: Lock Refreshing\n", + lockres->l_name); goto leave_requeue; + } new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking); if (lockres->l_ops->check_downconvert - && !lockres->l_ops->check_downconvert(lockres, new_level)) + && !lockres->l_ops->check_downconvert(lockres, new_level)) { + mlog(ML_BASTS, "lockres %s, ReQ: Checkpointing\n", + lockres->l_name); goto leave_requeue; + } /* If we get here, then we know that there are no more * incompatible holders (and anyone asking for an incompatible @@ -3509,13 +3580,19 @@ recheck: ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking); - if (ctl->unblock_action == UNBLOCK_STOP_POST) + if (ctl->unblock_action == UNBLOCK_STOP_POST) { + mlog(ML_BASTS, "lockres %s, UNBLOCK_STOP_POST\n", + lockres->l_name); goto leave; + } spin_lock_irqsave(&lockres->l_lock, flags); if ((blocking != lockres->l_blocking) || (level != lockres->l_level)) { /* If this changed underneath us, then we can't drop * it just yet. */ + mlog(ML_BASTS, "lockres %s, block=%d:%d, level=%d:%d, " + "Recheck\n", lockres->l_name, blocking, + lockres->l_blocking, level, lockres->l_level); goto recheck; } @@ -3910,45 +3987,6 @@ void ocfs2_refcount_unlock(struct ocfs2_refcount_tree *ref_tree, int ex) ocfs2_cluster_unlock(osb, lockres, level); } -/* - * This is the filesystem locking protocol. It provides the lock handling - * hooks for the underlying DLM. It has a maximum version number. - * The version number allows interoperability with systems running at - * the same major number and an equal or smaller minor number. - * - * Whenever the filesystem does new things with locks (adds or removes a - * lock, orders them differently, does different things underneath a lock), - * the version must be changed. The protocol is negotiated when joining - * the dlm domain. A node may join the domain if its major version is - * identical to all other nodes and its minor version is greater than - * or equal to all other nodes. When its minor version is greater than - * the other nodes, it will run at the minor version specified by the - * other nodes. - * - * If a locking change is made that will not be compatible with older - * versions, the major number must be increased and the minor version set - * to zero. If a change merely adds a behavior that can be disabled when - * speaking to older versions, the minor version must be increased. If a - * change adds a fully backwards compatible change (eg, LVB changes that - * are just ignored by older versions), the version does not need to be - * updated. - */ -static struct ocfs2_locking_protocol lproto = { - .lp_max_version = { - .pv_major = OCFS2_LOCKING_PROTOCOL_MAJOR, - .pv_minor = OCFS2_LOCKING_PROTOCOL_MINOR, - }, - .lp_lock_ast = ocfs2_locking_ast, - .lp_blocking_ast = ocfs2_blocking_ast, - .lp_unlock_ast = ocfs2_unlock_ast, -}; - -void ocfs2_set_locking_protocol(void) -{ - ocfs2_stack_glue_set_locking_protocol(&lproto); -} - - static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres) { @@ -3965,7 +4003,7 @@ static void ocfs2_process_blocked_lock(struct ocfs2_super *osb, BUG_ON(!lockres); BUG_ON(!lockres->l_ops); - mlog(0, "lockres %s blocked.\n", lockres->l_name); + mlog(ML_BASTS, "lockres %s blocked\n", lockres->l_name); /* Detect whether a lock has been marked as going away while * the downconvert thread was processing other things. A lock can @@ -3988,7 +4026,7 @@ unqueue: } else ocfs2_schedule_blocked_lock(osb, lockres); - mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name, + mlog(ML_BASTS, "lockres %s, requeue = %s.\n", lockres->l_name, ctl.requeue ? "yes" : "no"); spin_unlock_irqrestore(&lockres->l_lock, flags); @@ -4010,7 +4048,7 @@ static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb, /* Do not schedule a lock for downconvert when it's on * the way to destruction - any nodes wanting access * to the resource will get it soon. */ - mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n", + mlog(ML_BASTS, "lockres %s won't be scheduled: flags 0x%lx\n", lockres->l_name, lockres->l_flags); return; } diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c index 558ce031242..5b52547d629 100644 --- a/fs/ocfs2/file.c +++ b/fs/ocfs2/file.c @@ -993,10 +993,9 @@ int ocfs2_setattr(struct dentry *dentry, struct iattr *attr) } if (size_change && attr->ia_size != i_size_read(inode)) { - if (attr->ia_size > sb->s_maxbytes) { - status = -EFBIG; + status = inode_newsize_ok(inode, attr->ia_size); + if (status) goto bail_unlock; - } if (i_size_read(inode) > attr->ia_size) { if (ocfs2_should_order_data(inode)) { @@ -1836,6 +1835,8 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry, &meta_level); if (has_refcount) *has_refcount = 1; + if (direct_io) + *direct_io = 0; } if (ret < 0) { @@ -1859,10 +1860,6 @@ static int ocfs2_prepare_inode_for_write(struct dentry *dentry, break; } - if (has_refcount && *has_refcount == 1) { - *direct_io = 0; - break; - } /* * Allowing concurrent direct writes means * i_size changes wouldn't be synchronized, so @@ -2043,7 +2040,7 @@ out_dio: * async dio is going to do it in the future or an end_io after an * error has already done it. */ - if (ret == -EIOCBQUEUED || !ocfs2_iocb_is_rw_locked(iocb)) { + if ((ret == -EIOCBQUEUED) || (!ocfs2_iocb_is_rw_locked(iocb))) { rw_level = -1; have_alloc_sem = 0; } diff --git a/fs/ocfs2/ioctl.h b/fs/ocfs2/ioctl.h index cf9a5ee30fe..0cd5323bd3f 100644 --- a/fs/ocfs2/ioctl.h +++ b/fs/ocfs2/ioctl.h @@ -7,10 +7,10 @@ * */ -#ifndef OCFS2_IOCTL_H -#define OCFS2_IOCTL_H +#ifndef OCFS2_IOCTL_PROTO_H +#define OCFS2_IOCTL_PROTO_H long ocfs2_ioctl(struct file *filp, unsigned int cmd, unsigned long arg); long ocfs2_compat_ioctl(struct file *file, unsigned cmd, unsigned long arg); -#endif /* OCFS2_IOCTL_H */ +#endif /* OCFS2_IOCTL_PROTO_H */ diff --git a/fs/ocfs2/localalloc.c b/fs/ocfs2/localalloc.c index ac10f83edb9..ca992d91f51 100644 --- a/fs/ocfs2/localalloc.c +++ b/fs/ocfs2/localalloc.c @@ -476,7 +476,7 @@ out_mutex: out: if (!status) - ocfs2_init_inode_steal_slot(osb); + ocfs2_init_steal_slots(osb); mlog_exit(status); return status; } diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h index 740f448041e..1238b491db9 100644 --- a/fs/ocfs2/ocfs2.h +++ b/fs/ocfs2/ocfs2.h @@ -42,6 +42,7 @@ #include "ocfs2_fs.h" #include "ocfs2_lockid.h" +#include "ocfs2_ioctl.h" /* For struct ocfs2_blockcheck_stats */ #include "blockcheck.h" @@ -159,7 +160,7 @@ struct ocfs2_lock_res { int l_level; unsigned int l_ro_holders; unsigned int l_ex_holders; - union ocfs2_dlm_lksb l_lksb; + struct ocfs2_dlm_lksb l_lksb; /* used from AST/BAST funcs. */ enum ocfs2_ast_action l_action; @@ -305,7 +306,9 @@ struct ocfs2_super u32 s_next_generation; unsigned long osb_flags; s16 s_inode_steal_slot; + s16 s_meta_steal_slot; atomic_t s_num_inodes_stolen; + atomic_t s_num_meta_stolen; unsigned long s_mount_opt; unsigned int s_atime_quantum; @@ -760,33 +763,6 @@ static inline unsigned int ocfs2_megabytes_to_clusters(struct super_block *sb, return megs << (20 - OCFS2_SB(sb)->s_clustersize_bits); } -static inline void ocfs2_init_inode_steal_slot(struct ocfs2_super *osb) -{ - spin_lock(&osb->osb_lock); - osb->s_inode_steal_slot = OCFS2_INVALID_SLOT; - spin_unlock(&osb->osb_lock); - atomic_set(&osb->s_num_inodes_stolen, 0); -} - -static inline void ocfs2_set_inode_steal_slot(struct ocfs2_super *osb, - s16 slot) -{ - spin_lock(&osb->osb_lock); - osb->s_inode_steal_slot = slot; - spin_unlock(&osb->osb_lock); -} - -static inline s16 ocfs2_get_inode_steal_slot(struct ocfs2_super *osb) -{ - s16 slot; - - spin_lock(&osb->osb_lock); - slot = osb->s_inode_steal_slot; - spin_unlock(&osb->osb_lock); - - return slot; -} - #define ocfs2_set_bit ext2_set_bit #define ocfs2_clear_bit ext2_clear_bit #define ocfs2_test_bit ext2_test_bit diff --git a/fs/ocfs2/ocfs2_fs.h b/fs/ocfs2/ocfs2_fs.h index 7638a38c32b..bb37218a797 100644 --- a/fs/ocfs2/ocfs2_fs.h +++ b/fs/ocfs2/ocfs2_fs.h @@ -254,63 +254,6 @@ * refcount tree */ /* - * ioctl commands - */ -#define OCFS2_IOC_GETFLAGS _IOR('f', 1, long) -#define OCFS2_IOC_SETFLAGS _IOW('f', 2, long) -#define OCFS2_IOC32_GETFLAGS _IOR('f', 1, int) -#define OCFS2_IOC32_SETFLAGS _IOW('f', 2, int) - -/* - * Space reservation / allocation / free ioctls and argument structure - * are designed to be compatible with XFS. - * - * ALLOCSP* and FREESP* are not and will never be supported, but are - * included here for completeness. - */ -struct ocfs2_space_resv { - __s16 l_type; - __s16 l_whence; - __s64 l_start; - __s64 l_len; /* len == 0 means until end of file */ - __s32 l_sysid; - __u32 l_pid; - __s32 l_pad[4]; /* reserve area */ -}; - -#define OCFS2_IOC_ALLOCSP _IOW ('X', 10, struct ocfs2_space_resv) -#define OCFS2_IOC_FREESP _IOW ('X', 11, struct ocfs2_space_resv) -#define OCFS2_IOC_RESVSP _IOW ('X', 40, struct ocfs2_space_resv) -#define OCFS2_IOC_UNRESVSP _IOW ('X', 41, struct ocfs2_space_resv) -#define OCFS2_IOC_ALLOCSP64 _IOW ('X', 36, struct ocfs2_space_resv) -#define OCFS2_IOC_FREESP64 _IOW ('X', 37, struct ocfs2_space_resv) -#define OCFS2_IOC_RESVSP64 _IOW ('X', 42, struct ocfs2_space_resv) -#define OCFS2_IOC_UNRESVSP64 _IOW ('X', 43, struct ocfs2_space_resv) - -/* Used to pass group descriptor data when online resize is done */ -struct ocfs2_new_group_input { - __u64 group; /* Group descriptor's blkno. */ - __u32 clusters; /* Total number of clusters in this group */ - __u32 frees; /* Total free clusters in this group */ - __u16 chain; /* Chain for this group */ - __u16 reserved1; - __u32 reserved2; -}; - -#define OCFS2_IOC_GROUP_EXTEND _IOW('o', 1, int) -#define OCFS2_IOC_GROUP_ADD _IOW('o', 2,struct ocfs2_new_group_input) -#define OCFS2_IOC_GROUP_ADD64 _IOW('o', 3,struct ocfs2_new_group_input) - -/* Used to pass 2 file names to reflink. */ -struct reflink_arguments { - __u64 old_path; - __u64 new_path; - __u64 preserve; -}; -#define OCFS2_IOC_REFLINK _IOW('o', 4, struct reflink_arguments) - - -/* * Journal Flags (ocfs2_dinode.id1.journal1.i_flags) */ #define OCFS2_JOURNAL_DIRTY_FL (0x00000001) /* Journal needs recovery */ diff --git a/fs/ocfs2/ocfs2_ioctl.h b/fs/ocfs2/ocfs2_ioctl.h new file mode 100644 index 00000000000..2d3420af1a8 --- /dev/null +++ b/fs/ocfs2/ocfs2_ioctl.h @@ -0,0 +1,79 @@ +/* -*- mode: c; c-basic-offset: 8; -*- + * vim: noexpandtab sw=8 ts=8 sts=0: + * + * ocfs2_ioctl.h + * + * Defines OCFS2 ioctls. + * + * Copyright (C) 2010 Oracle. All rights reserved. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public + * License, version 2, as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + */ + +#ifndef OCFS2_IOCTL_H +#define OCFS2_IOCTL_H + +/* + * ioctl commands + */ +#define OCFS2_IOC_GETFLAGS _IOR('f', 1, long) +#define OCFS2_IOC_SETFLAGS _IOW('f', 2, long) +#define OCFS2_IOC32_GETFLAGS _IOR('f', 1, int) +#define OCFS2_IOC32_SETFLAGS _IOW('f', 2, int) + +/* + * Space reservation / allocation / free ioctls and argument structure + * are designed to be compatible with XFS. + * + * ALLOCSP* and FREESP* are not and will never be supported, but are + * included here for completeness. + */ +struct ocfs2_space_resv { + __s16 l_type; + __s16 l_whence; + __s64 l_start; + __s64 l_len; /* len == 0 means until end of file */ + __s32 l_sysid; + __u32 l_pid; + __s32 l_pad[4]; /* reserve area */ +}; + +#define OCFS2_IOC_ALLOCSP _IOW ('X', 10, struct ocfs2_space_resv) +#define OCFS2_IOC_FREESP _IOW ('X', 11, struct ocfs2_space_resv) +#define OCFS2_IOC_RESVSP _IOW ('X', 40, struct ocfs2_space_resv) +#define OCFS2_IOC_UNRESVSP _IOW ('X', 41, struct ocfs2_space_resv) +#define OCFS2_IOC_ALLOCSP64 _IOW ('X', 36, struct ocfs2_space_resv) +#define OCFS2_IOC_FREESP64 _IOW ('X', 37, struct ocfs2_space_resv) +#define OCFS2_IOC_RESVSP64 _IOW ('X', 42, struct ocfs2_space_resv) +#define OCFS2_IOC_UNRESVSP64 _IOW ('X', 43, struct ocfs2_space_resv) + +/* Used to pass group descriptor data when online resize is done */ +struct ocfs2_new_group_input { + __u64 group; /* Group descriptor's blkno. */ + __u32 clusters; /* Total number of clusters in this group */ + __u32 frees; /* Total free clusters in this group */ + __u16 chain; /* Chain for this group */ + __u16 reserved1; + __u32 reserved2; +}; + +#define OCFS2_IOC_GROUP_EXTEND _IOW('o', 1, int) +#define OCFS2_IOC_GROUP_ADD _IOW('o', 2,struct ocfs2_new_group_input) +#define OCFS2_IOC_GROUP_ADD64 _IOW('o', 3,struct ocfs2_new_group_input) + +/* Used to pass 2 file names to reflink. */ +struct reflink_arguments { + __u64 old_path; + __u64 new_path; + __u64 preserve; +}; +#define OCFS2_IOC_REFLINK _IOW('o', 4, struct reflink_arguments) + +#endif /* OCFS2_IOCTL_H */ diff --git a/fs/ocfs2/ocfs2_lockingver.h b/fs/ocfs2/ocfs2_lockingver.h index 82d5eeac0ff..2e45c8d2ea7 100644 --- a/fs/ocfs2/ocfs2_lockingver.h +++ b/fs/ocfs2/ocfs2_lockingver.h @@ -23,6 +23,8 @@ /* * The protocol version for ocfs2 cluster locking. See dlmglue.c for * more details. + * + * 1.0 - Initial locking version from ocfs2 1.4. */ #define OCFS2_LOCKING_PROTOCOL_MAJOR 1 #define OCFS2_LOCKING_PROTOCOL_MINOR 0 diff --git a/fs/ocfs2/refcounttree.c b/fs/ocfs2/refcounttree.c index 8ae65c9c020..fb6aa7acf54 100644 --- a/fs/ocfs2/refcounttree.c +++ b/fs/ocfs2/refcounttree.c @@ -626,7 +626,7 @@ static int ocfs2_create_refcount_tree(struct inode *inode, rb = (struct ocfs2_refcount_block *)new_bh->b_data; memset(rb, 0, inode->i_sb->s_blocksize); strcpy((void *)rb, OCFS2_REFCOUNT_BLOCK_SIGNATURE); - rb->rf_suballoc_slot = cpu_to_le16(osb->slot_num); + rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot); rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start); rb->rf_fs_generation = cpu_to_le32(osb->fs_generation); rb->rf_blkno = cpu_to_le64(first_blkno); @@ -1330,7 +1330,7 @@ static int ocfs2_expand_inline_ref_root(handle_t *handle, memcpy(new_bh->b_data, ref_root_bh->b_data, sb->s_blocksize); new_rb = (struct ocfs2_refcount_block *)new_bh->b_data; - new_rb->rf_suballoc_slot = cpu_to_le16(OCFS2_SB(sb)->slot_num); + new_rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot); new_rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start); new_rb->rf_blkno = cpu_to_le64(blkno); new_rb->rf_cpos = cpu_to_le32(0); @@ -1576,7 +1576,7 @@ static int ocfs2_new_leaf_refcount_block(handle_t *handle, new_rb = (struct ocfs2_refcount_block *)new_bh->b_data; memset(new_rb, 0, sb->s_blocksize); strcpy((void *)new_rb, OCFS2_REFCOUNT_BLOCK_SIGNATURE); - new_rb->rf_suballoc_slot = cpu_to_le16(OCFS2_SB(sb)->slot_num); + new_rb->rf_suballoc_slot = cpu_to_le16(meta_ac->ac_alloc_slot); new_rb->rf_suballoc_bit = cpu_to_le16(suballoc_bit_start); new_rb->rf_fs_generation = cpu_to_le32(OCFS2_SB(sb)->fs_generation); new_rb->rf_blkno = cpu_to_le64(blkno); diff --git a/fs/ocfs2/stack_o2cb.c b/fs/ocfs2/stack_o2cb.c index 3038c92af49..7020e1253ff 100644 --- a/fs/ocfs2/stack_o2cb.c +++ b/fs/ocfs2/stack_o2cb.c @@ -161,24 +161,23 @@ static int dlm_status_to_errno(enum dlm_status status) static void o2dlm_lock_ast_wrapper(void *astarg) { - BUG_ON(o2cb_stack.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; - o2cb_stack.sp_proto->lp_lock_ast(astarg); + lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); } static void o2dlm_blocking_ast_wrapper(void *astarg, int level) { - BUG_ON(o2cb_stack.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; - o2cb_stack.sp_proto->lp_blocking_ast(astarg, level); + lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); } static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) { + struct ocfs2_dlm_lksb *lksb = astarg; int error = dlm_status_to_errno(status); - BUG_ON(o2cb_stack.sp_proto == NULL); - /* * In o2dlm, you can get both the lock_ast() for the lock being * granted and the unlock_ast() for the CANCEL failing. A @@ -193,16 +192,15 @@ static void o2dlm_unlock_ast_wrapper(void *astarg, enum dlm_status status) if (status == DLM_CANCELGRANT) return; - o2cb_stack.sp_proto->lp_unlock_ast(astarg, error); + lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, error); } static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - void *astarg) + unsigned int namelen) { enum dlm_status status; int o2dlm_mode = mode_to_o2dlm(mode); @@ -211,28 +209,27 @@ static int o2cb_dlm_lock(struct ocfs2_cluster_connection *conn, status = dlmlock(conn->cc_lockspace, o2dlm_mode, &lksb->lksb_o2dlm, o2dlm_flags, name, namelen, - o2dlm_lock_ast_wrapper, astarg, + o2dlm_lock_ast_wrapper, lksb, o2dlm_blocking_ast_wrapper); ret = dlm_status_to_errno(status); return ret; } static int o2cb_dlm_unlock(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - void *astarg) + struct ocfs2_dlm_lksb *lksb, + u32 flags) { enum dlm_status status; int o2dlm_flags = flags_to_o2dlm(flags); int ret; status = dlmunlock(conn->cc_lockspace, &lksb->lksb_o2dlm, - o2dlm_flags, o2dlm_unlock_ast_wrapper, astarg); + o2dlm_flags, o2dlm_unlock_ast_wrapper, lksb); ret = dlm_status_to_errno(status); return ret; } -static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb) +static int o2cb_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) { return dlm_status_to_errno(lksb->lksb_o2dlm.status); } @@ -242,17 +239,17 @@ static int o2cb_dlm_lock_status(union ocfs2_dlm_lksb *lksb) * contents, it will zero out the LVB. Thus the caller can always trust * the contents. */ -static int o2cb_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb) +static int o2cb_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) { return 1; } -static void *o2cb_dlm_lvb(union ocfs2_dlm_lksb *lksb) +static void *o2cb_dlm_lvb(struct ocfs2_dlm_lksb *lksb) { return (void *)(lksb->lksb_o2dlm.lvb); } -static void o2cb_dump_lksb(union ocfs2_dlm_lksb *lksb) +static void o2cb_dump_lksb(struct ocfs2_dlm_lksb *lksb) { dlm_print_one_lock(lksb->lksb_o2dlm.lockid); } @@ -280,7 +277,7 @@ static int o2cb_cluster_connect(struct ocfs2_cluster_connection *conn) struct dlm_protocol_version fs_version; BUG_ON(conn == NULL); - BUG_ON(o2cb_stack.sp_proto == NULL); + BUG_ON(conn->cc_proto == NULL); /* for now we only have one cluster/node, make sure we see it * in the heartbeat universe */ diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c index da78a2a334f..5ae8812b286 100644 --- a/fs/ocfs2/stack_user.c +++ b/fs/ocfs2/stack_user.c @@ -25,7 +25,6 @@ #include <linux/reboot.h> #include <asm/uaccess.h> -#include "ocfs2.h" /* For struct ocfs2_lock_res */ #include "stackglue.h" #include <linux/dlm_plock.h> @@ -63,8 +62,8 @@ * negotiated by the client. The client negotiates based on the maximum * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major * number from the "SETV" message must match - * ocfs2_user_plugin.sp_proto->lp_max_version.pv_major, and the minor number - * must be less than or equal to ...->lp_max_version.pv_minor. + * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number + * must be less than or equal to ...sp_max_version.pv_minor. * * Once this information has been set, mounts will be allowed. From this * point on, the "DOWN" message can be sent for node down notification. @@ -401,7 +400,7 @@ static int ocfs2_control_do_setversion_msg(struct file *file, char *ptr = NULL; struct ocfs2_control_private *p = file->private_data; struct ocfs2_protocol_version *max = - &ocfs2_user_plugin.sp_proto->lp_max_version; + &ocfs2_user_plugin.sp_max_proto; if (ocfs2_control_get_handshake_state(file) != OCFS2_CONTROL_HANDSHAKE_PROTOCOL) @@ -664,18 +663,10 @@ static void ocfs2_control_exit(void) -rc); } -static struct dlm_lksb *fsdlm_astarg_to_lksb(void *astarg) -{ - struct ocfs2_lock_res *res = astarg; - return &res->l_lksb.lksb_fsdlm; -} - static void fsdlm_lock_ast_wrapper(void *astarg) { - struct dlm_lksb *lksb = fsdlm_astarg_to_lksb(astarg); - int status = lksb->sb_status; - - BUG_ON(ocfs2_user_plugin.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; + int status = lksb->lksb_fsdlm.sb_status; /* * For now we're punting on the issue of other non-standard errors @@ -688,25 +679,24 @@ static void fsdlm_lock_ast_wrapper(void *astarg) */ if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) - ocfs2_user_plugin.sp_proto->lp_unlock_ast(astarg, 0); + lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); else - ocfs2_user_plugin.sp_proto->lp_lock_ast(astarg); + lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); } static void fsdlm_blocking_ast_wrapper(void *astarg, int level) { - BUG_ON(ocfs2_user_plugin.sp_proto == NULL); + struct ocfs2_dlm_lksb *lksb = astarg; - ocfs2_user_plugin.sp_proto->lp_blocking_ast(astarg, level); + lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); } static int user_dlm_lock(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - void *astarg) + unsigned int namelen) { int ret; @@ -716,36 +706,35 @@ static int user_dlm_lock(struct ocfs2_cluster_connection *conn, ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, flags|DLM_LKF_NODLCKWT, name, namelen, 0, - fsdlm_lock_ast_wrapper, astarg, + fsdlm_lock_ast_wrapper, lksb, fsdlm_blocking_ast_wrapper); return ret; } static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - void *astarg) + struct ocfs2_dlm_lksb *lksb, + u32 flags) { int ret; ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, - flags, &lksb->lksb_fsdlm, astarg); + flags, &lksb->lksb_fsdlm, lksb); return ret; } -static int user_dlm_lock_status(union ocfs2_dlm_lksb *lksb) +static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) { return lksb->lksb_fsdlm.sb_status; } -static int user_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb) +static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) { int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; return !invalid; } -static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb) +static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) { if (!lksb->lksb_fsdlm.sb_lvbptr) lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + @@ -753,7 +742,7 @@ static void *user_dlm_lvb(union ocfs2_dlm_lksb *lksb) return (void *)(lksb->lksb_fsdlm.sb_lvbptr); } -static void user_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb) +static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) { } diff --git a/fs/ocfs2/stackglue.c b/fs/ocfs2/stackglue.c index f3df0baa9a4..39abf89697e 100644 --- a/fs/ocfs2/stackglue.c +++ b/fs/ocfs2/stackglue.c @@ -36,7 +36,7 @@ #define OCFS2_STACK_PLUGIN_USER "user" #define OCFS2_MAX_HB_CTL_PATH 256 -static struct ocfs2_locking_protocol *lproto; +static struct ocfs2_protocol_version locking_max_version; static DEFINE_SPINLOCK(ocfs2_stack_lock); static LIST_HEAD(ocfs2_stack_list); static char cluster_stack_name[OCFS2_STACK_LABEL_LEN + 1]; @@ -176,7 +176,7 @@ int ocfs2_stack_glue_register(struct ocfs2_stack_plugin *plugin) spin_lock(&ocfs2_stack_lock); if (!ocfs2_stack_lookup(plugin->sp_name)) { plugin->sp_count = 0; - plugin->sp_proto = lproto; + plugin->sp_max_proto = locking_max_version; list_add(&plugin->sp_list, &ocfs2_stack_list); printk(KERN_INFO "ocfs2: Registered cluster interface %s\n", plugin->sp_name); @@ -213,77 +213,76 @@ void ocfs2_stack_glue_unregister(struct ocfs2_stack_plugin *plugin) } EXPORT_SYMBOL_GPL(ocfs2_stack_glue_unregister); -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto) +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto) { struct ocfs2_stack_plugin *p; - BUG_ON(proto == NULL); - spin_lock(&ocfs2_stack_lock); - BUG_ON(active_stack != NULL); + if (memcmp(max_proto, &locking_max_version, + sizeof(struct ocfs2_protocol_version))) { + BUG_ON(locking_max_version.pv_major != 0); - lproto = proto; - list_for_each_entry(p, &ocfs2_stack_list, sp_list) { - p->sp_proto = lproto; + locking_max_version = *max_proto; + list_for_each_entry(p, &ocfs2_stack_list, sp_list) { + p->sp_max_proto = locking_max_version; + } } - spin_unlock(&ocfs2_stack_lock); } -EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_locking_protocol); +EXPORT_SYMBOL_GPL(ocfs2_stack_glue_set_max_proto_version); /* - * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take - * "struct ocfs2_lock_res *astarg" instead of "void *astarg" because the - * underlying stack plugins need to pilfer the lksb off of the lock_res. - * If some other structure needs to be passed as an astarg, the plugins - * will need to be given a different avenue to the lksb. + * The ocfs2_dlm_lock() and ocfs2_dlm_unlock() functions take no argument + * for the ast and bast functions. They will pass the lksb to the ast + * and bast. The caller can wrap the lksb with their own structure to + * get more information. */ int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - struct ocfs2_lock_res *astarg) + unsigned int namelen) { - BUG_ON(lproto == NULL); - + if (!lksb->lksb_conn) + lksb->lksb_conn = conn; + else + BUG_ON(lksb->lksb_conn != conn); return active_stack->sp_ops->dlm_lock(conn, mode, lksb, flags, - name, namelen, astarg); + name, namelen); } EXPORT_SYMBOL_GPL(ocfs2_dlm_lock); int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - struct ocfs2_lock_res *astarg) + struct ocfs2_dlm_lksb *lksb, + u32 flags) { - BUG_ON(lproto == NULL); + BUG_ON(lksb->lksb_conn == NULL); - return active_stack->sp_ops->dlm_unlock(conn, lksb, flags, astarg); + return active_stack->sp_ops->dlm_unlock(conn, lksb, flags); } EXPORT_SYMBOL_GPL(ocfs2_dlm_unlock); -int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb) +int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) { return active_stack->sp_ops->lock_status(lksb); } EXPORT_SYMBOL_GPL(ocfs2_dlm_lock_status); -int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb) +int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) { return active_stack->sp_ops->lvb_valid(lksb); } EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb_valid); -void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb) +void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb) { return active_stack->sp_ops->lock_lvb(lksb); } EXPORT_SYMBOL_GPL(ocfs2_dlm_lvb); -void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb) +void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) { active_stack->sp_ops->dump_lksb(lksb); } @@ -312,6 +311,7 @@ EXPORT_SYMBOL_GPL(ocfs2_plock); int ocfs2_cluster_connect(const char *stack_name, const char *group, int grouplen, + struct ocfs2_locking_protocol *lproto, void (*recovery_handler)(int node_num, void *recovery_data), void *recovery_data, @@ -329,6 +329,12 @@ int ocfs2_cluster_connect(const char *stack_name, goto out; } + if (memcmp(&lproto->lp_max_version, &locking_max_version, + sizeof(struct ocfs2_protocol_version))) { + rc = -EINVAL; + goto out; + } + new_conn = kzalloc(sizeof(struct ocfs2_cluster_connection), GFP_KERNEL); if (!new_conn) { @@ -341,6 +347,7 @@ int ocfs2_cluster_connect(const char *stack_name, new_conn->cc_recovery_handler = recovery_handler; new_conn->cc_recovery_data = recovery_data; + new_conn->cc_proto = lproto; /* Start the new connection at our maximum compatibility level */ new_conn->cc_version = lproto->lp_max_version; @@ -366,6 +373,24 @@ out: } EXPORT_SYMBOL_GPL(ocfs2_cluster_connect); +/* The caller will ensure all nodes have the same cluster stack */ +int ocfs2_cluster_connect_agnostic(const char *group, + int grouplen, + struct ocfs2_locking_protocol *lproto, + void (*recovery_handler)(int node_num, + void *recovery_data), + void *recovery_data, + struct ocfs2_cluster_connection **conn) +{ + char *stack_name = NULL; + + if (cluster_stack_name[0]) + stack_name = cluster_stack_name; + return ocfs2_cluster_connect(stack_name, group, grouplen, lproto, + recovery_handler, recovery_data, conn); +} +EXPORT_SYMBOL_GPL(ocfs2_cluster_connect_agnostic); + /* If hangup_pending is 0, the stack driver will be dropped */ int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn, int hangup_pending) @@ -453,10 +478,10 @@ static ssize_t ocfs2_max_locking_protocol_show(struct kobject *kobj, ssize_t ret = 0; spin_lock(&ocfs2_stack_lock); - if (lproto) + if (locking_max_version.pv_major) ret = snprintf(buf, PAGE_SIZE, "%u.%u\n", - lproto->lp_max_version.pv_major, - lproto->lp_max_version.pv_minor); + locking_max_version.pv_major, + locking_max_version.pv_minor); spin_unlock(&ocfs2_stack_lock); return ret; @@ -685,7 +710,10 @@ static int __init ocfs2_stack_glue_init(void) static void __exit ocfs2_stack_glue_exit(void) { - lproto = NULL; + memset(&locking_max_version, 0, + sizeof(struct ocfs2_protocol_version)); + locking_max_version.pv_major = 0; + locking_max_version.pv_minor = 0; ocfs2_sysfs_exit(); if (ocfs2_table_header) unregister_sysctl_table(ocfs2_table_header); diff --git a/fs/ocfs2/stackglue.h b/fs/ocfs2/stackglue.h index 03a44d60eac..8ce7398ae1d 100644 --- a/fs/ocfs2/stackglue.h +++ b/fs/ocfs2/stackglue.h @@ -56,17 +56,6 @@ struct ocfs2_protocol_version { }; /* - * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf. - */ -struct ocfs2_locking_protocol { - struct ocfs2_protocol_version lp_max_version; - void (*lp_lock_ast)(void *astarg); - void (*lp_blocking_ast)(void *astarg, int level); - void (*lp_unlock_ast)(void *astarg, int error); -}; - - -/* * The dlm_lockstatus struct includes lvb space, but the dlm_lksb struct only * has a pointer to separately allocated lvb space. This struct exists only to * include in the lksb union to make space for a combined dlm_lksb and lvb. @@ -81,12 +70,27 @@ struct fsdlm_lksb_plus_lvb { * size of the union is known. Lock status structures are embedded in * ocfs2 inodes. */ -union ocfs2_dlm_lksb { - struct dlm_lockstatus lksb_o2dlm; - struct dlm_lksb lksb_fsdlm; - struct fsdlm_lksb_plus_lvb padding; +struct ocfs2_cluster_connection; +struct ocfs2_dlm_lksb { + union { + struct dlm_lockstatus lksb_o2dlm; + struct dlm_lksb lksb_fsdlm; + struct fsdlm_lksb_plus_lvb padding; + }; + struct ocfs2_cluster_connection *lksb_conn; +}; + +/* + * The ocfs2_locking_protocol defines the handlers called on ocfs2's behalf. + */ +struct ocfs2_locking_protocol { + struct ocfs2_protocol_version lp_max_version; + void (*lp_lock_ast)(struct ocfs2_dlm_lksb *lksb); + void (*lp_blocking_ast)(struct ocfs2_dlm_lksb *lksb, int level); + void (*lp_unlock_ast)(struct ocfs2_dlm_lksb *lksb, int error); }; + /* * A cluster connection. Mostly opaque to ocfs2, the connection holds * state for the underlying stack. ocfs2 does use cc_version to determine @@ -96,6 +100,7 @@ struct ocfs2_cluster_connection { char cc_name[GROUP_NAME_MAX]; int cc_namelen; struct ocfs2_protocol_version cc_version; + struct ocfs2_locking_protocol *cc_proto; void (*cc_recovery_handler)(int node_num, void *recovery_data); void *cc_recovery_data; void *cc_lockspace; @@ -155,27 +160,29 @@ struct ocfs2_stack_operations { * * ast and bast functions are not part of the call because the * stack will likely want to wrap ast and bast calls before passing - * them to stack->sp_proto. + * them to stack->sp_proto. There is no astarg. The lksb will + * be passed back to the ast and bast functions. The caller can + * use this to find their object. */ int (*dlm_lock)(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - void *astarg); + unsigned int namelen); /* * Call the underlying dlm unlock function. The ->dlm_unlock() * function should convert the flags as appropriate. * * The unlock ast is not passed, as the stack will want to wrap - * it before calling stack->sp_proto->lp_unlock_ast(). + * it before calling stack->sp_proto->lp_unlock_ast(). There is + * no astarg. The lksb will be passed back to the unlock ast + * function. The caller can use this to find their object. */ int (*dlm_unlock)(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - void *astarg); + struct ocfs2_dlm_lksb *lksb, + u32 flags); /* * Return the status of the current lock status block. The fs @@ -183,17 +190,17 @@ struct ocfs2_stack_operations { * callback pulls out the stack-specific lksb, converts the status * to a proper errno, and returns it. */ - int (*lock_status)(union ocfs2_dlm_lksb *lksb); + int (*lock_status)(struct ocfs2_dlm_lksb *lksb); /* * Return non-zero if the LVB is valid. */ - int (*lvb_valid)(union ocfs2_dlm_lksb *lksb); + int (*lvb_valid)(struct ocfs2_dlm_lksb *lksb); /* * Pull the lvb pointer off of the stack-specific lksb. */ - void *(*lock_lvb)(union ocfs2_dlm_lksb *lksb); + void *(*lock_lvb)(struct ocfs2_dlm_lksb *lksb); /* * Cluster-aware posix locks @@ -210,7 +217,7 @@ struct ocfs2_stack_operations { * This is an optoinal debugging hook. If provided, the * stack can dump debugging information about this lock. */ - void (*dump_lksb)(union ocfs2_dlm_lksb *lksb); + void (*dump_lksb)(struct ocfs2_dlm_lksb *lksb); }; /* @@ -226,7 +233,7 @@ struct ocfs2_stack_plugin { /* These are managed by the stackglue code. */ struct list_head sp_list; unsigned int sp_count; - struct ocfs2_locking_protocol *sp_proto; + struct ocfs2_protocol_version sp_max_proto; }; @@ -234,10 +241,22 @@ struct ocfs2_stack_plugin { int ocfs2_cluster_connect(const char *stack_name, const char *group, int grouplen, + struct ocfs2_locking_protocol *lproto, void (*recovery_handler)(int node_num, void *recovery_data), void *recovery_data, struct ocfs2_cluster_connection **conn); +/* + * Used by callers that don't store their stack name. They must ensure + * all nodes have the same stack. + */ +int ocfs2_cluster_connect_agnostic(const char *group, + int grouplen, + struct ocfs2_locking_protocol *lproto, + void (*recovery_handler)(int node_num, + void *recovery_data), + void *recovery_data, + struct ocfs2_cluster_connection **conn); int ocfs2_cluster_disconnect(struct ocfs2_cluster_connection *conn, int hangup_pending); void ocfs2_cluster_hangup(const char *group, int grouplen); @@ -246,26 +265,24 @@ int ocfs2_cluster_this_node(unsigned int *node); struct ocfs2_lock_res; int ocfs2_dlm_lock(struct ocfs2_cluster_connection *conn, int mode, - union ocfs2_dlm_lksb *lksb, + struct ocfs2_dlm_lksb *lksb, u32 flags, void *name, - unsigned int namelen, - struct ocfs2_lock_res *astarg); + unsigned int namelen); int ocfs2_dlm_unlock(struct ocfs2_cluster_connection *conn, - union ocfs2_dlm_lksb *lksb, - u32 flags, - struct ocfs2_lock_res *astarg); + struct ocfs2_dlm_lksb *lksb, + u32 flags); -int ocfs2_dlm_lock_status(union ocfs2_dlm_lksb *lksb); -int ocfs2_dlm_lvb_valid(union ocfs2_dlm_lksb *lksb); -void *ocfs2_dlm_lvb(union ocfs2_dlm_lksb *lksb); -void ocfs2_dlm_dump_lksb(union ocfs2_dlm_lksb *lksb); +int ocfs2_dlm_lock_status(struct ocfs2_dlm_lksb *lksb); +int ocfs2_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb); +void *ocfs2_dlm_lvb(struct ocfs2_dlm_lksb *lksb); +void ocfs2_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb); int ocfs2_stack_supports_plocks(void); int ocfs2_plock(struct ocfs2_cluster_connection *conn, u64 ino, struct file *file, int cmd, struct file_lock *fl); -void ocfs2_stack_glue_set_locking_protocol(struct ocfs2_locking_protocol *proto); +void ocfs2_stack_glue_set_max_proto_version(struct ocfs2_protocol_version *max_proto); /* Used by stack plugins */ diff --git a/fs/ocfs2/suballoc.c b/fs/ocfs2/suballoc.c index c30b644d957..c3c60bc3e07 100644 --- a/fs/ocfs2/suballoc.c +++ b/fs/ocfs2/suballoc.c @@ -51,7 +51,7 @@ #define ALLOC_NEW_GROUP 0x1 #define ALLOC_GROUPS_FROM_GLOBAL 0x2 -#define OCFS2_MAX_INODES_TO_STEAL 1024 +#define OCFS2_MAX_TO_STEAL 1024 static inline void ocfs2_debug_bg(struct ocfs2_group_desc *bg); static inline void ocfs2_debug_suballoc_inode(struct ocfs2_dinode *fe); @@ -637,12 +637,113 @@ bail: return status; } +static void ocfs2_init_inode_steal_slot(struct ocfs2_super *osb) +{ + spin_lock(&osb->osb_lock); + osb->s_inode_steal_slot = OCFS2_INVALID_SLOT; + spin_unlock(&osb->osb_lock); + atomic_set(&osb->s_num_inodes_stolen, 0); +} + +static void ocfs2_init_meta_steal_slot(struct ocfs2_super *osb) +{ + spin_lock(&osb->osb_lock); + osb->s_meta_steal_slot = OCFS2_INVALID_SLOT; + spin_unlock(&osb->osb_lock); + atomic_set(&osb->s_num_meta_stolen, 0); +} + +void ocfs2_init_steal_slots(struct ocfs2_super *osb) +{ + ocfs2_init_inode_steal_slot(osb); + ocfs2_init_meta_steal_slot(osb); +} + +static void __ocfs2_set_steal_slot(struct ocfs2_super *osb, int slot, int type) +{ + spin_lock(&osb->osb_lock); + if (type == INODE_ALLOC_SYSTEM_INODE) + osb->s_inode_steal_slot = slot; + else if (type == EXTENT_ALLOC_SYSTEM_INODE) + osb->s_meta_steal_slot = slot; + spin_unlock(&osb->osb_lock); +} + +static int __ocfs2_get_steal_slot(struct ocfs2_super *osb, int type) +{ + int slot = OCFS2_INVALID_SLOT; + + spin_lock(&osb->osb_lock); + if (type == INODE_ALLOC_SYSTEM_INODE) + slot = osb->s_inode_steal_slot; + else if (type == EXTENT_ALLOC_SYSTEM_INODE) + slot = osb->s_meta_steal_slot; + spin_unlock(&osb->osb_lock); + + return slot; +} + +static int ocfs2_get_inode_steal_slot(struct ocfs2_super *osb) +{ + return __ocfs2_get_steal_slot(osb, INODE_ALLOC_SYSTEM_INODE); +} + +static int ocfs2_get_meta_steal_slot(struct ocfs2_super *osb) +{ + return __ocfs2_get_steal_slot(osb, EXTENT_ALLOC_SYSTEM_INODE); +} + +static int ocfs2_steal_resource(struct ocfs2_super *osb, + struct ocfs2_alloc_context *ac, + int type) +{ + int i, status = -ENOSPC; + int slot = __ocfs2_get_steal_slot(osb, type); + + /* Start to steal resource from the first slot after ours. */ + if (slot == OCFS2_INVALID_SLOT) + slot = osb->slot_num + 1; + + for (i = 0; i < osb->max_slots; i++, slot++) { + if (slot == osb->max_slots) + slot = 0; + + if (slot == osb->slot_num) + continue; + + status = ocfs2_reserve_suballoc_bits(osb, ac, + type, + (u32)slot, NULL, + NOT_ALLOC_NEW_GROUP); + if (status >= 0) { + __ocfs2_set_steal_slot(osb, slot, type); + break; + } + + ocfs2_free_ac_resource(ac); + } + + return status; +} + +static int ocfs2_steal_inode(struct ocfs2_super *osb, + struct ocfs2_alloc_context *ac) +{ + return ocfs2_steal_resource(osb, ac, INODE_ALLOC_SYSTEM_INODE); +} + +static int ocfs2_steal_meta(struct ocfs2_super *osb, + struct ocfs2_alloc_context *ac) +{ + return ocfs2_steal_resource(osb, ac, EXTENT_ALLOC_SYSTEM_INODE); +} + int ocfs2_reserve_new_metadata_blocks(struct ocfs2_super *osb, int blocks, struct ocfs2_alloc_context **ac) { int status; - u32 slot; + int slot = ocfs2_get_meta_steal_slot(osb); *ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL); if (!(*ac)) { @@ -653,12 +754,34 @@ int ocfs2_reserve_new_metadata_blocks(struct ocfs2_super *osb, (*ac)->ac_bits_wanted = blocks; (*ac)->ac_which = OCFS2_AC_USE_META; - slot = osb->slot_num; (*ac)->ac_group_search = ocfs2_block_group_search; + if (slot != OCFS2_INVALID_SLOT && + atomic_read(&osb->s_num_meta_stolen) < OCFS2_MAX_TO_STEAL) + goto extent_steal; + + atomic_set(&osb->s_num_meta_stolen, 0); status = ocfs2_reserve_suballoc_bits(osb, (*ac), EXTENT_ALLOC_SYSTEM_INODE, - slot, NULL, ALLOC_NEW_GROUP); + (u32)osb->slot_num, NULL, + ALLOC_NEW_GROUP); + + + if (status >= 0) { + status = 0; + if (slot != OCFS2_INVALID_SLOT) + ocfs2_init_meta_steal_slot(osb); + goto bail; + } else if (status < 0 && status != -ENOSPC) { + mlog_errno(status); + goto bail; + } + + ocfs2_free_ac_resource(*ac); + +extent_steal: + status = ocfs2_steal_meta(osb, *ac); + atomic_inc(&osb->s_num_meta_stolen); if (status < 0) { if (status != -ENOSPC) mlog_errno(status); @@ -685,43 +808,11 @@ int ocfs2_reserve_new_metadata(struct ocfs2_super *osb, ac); } -static int ocfs2_steal_inode_from_other_nodes(struct ocfs2_super *osb, - struct ocfs2_alloc_context *ac) -{ - int i, status = -ENOSPC; - s16 slot = ocfs2_get_inode_steal_slot(osb); - - /* Start to steal inodes from the first slot after ours. */ - if (slot == OCFS2_INVALID_SLOT) - slot = osb->slot_num + 1; - - for (i = 0; i < osb->max_slots; i++, slot++) { - if (slot == osb->max_slots) - slot = 0; - - if (slot == osb->slot_num) - continue; - - status = ocfs2_reserve_suballoc_bits(osb, ac, - INODE_ALLOC_SYSTEM_INODE, - slot, NULL, - NOT_ALLOC_NEW_GROUP); - if (status >= 0) { - ocfs2_set_inode_steal_slot(osb, slot); - break; - } - - ocfs2_free_ac_resource(ac); - } - - return status; -} - int ocfs2_reserve_new_inode(struct ocfs2_super *osb, struct ocfs2_alloc_context **ac) { int status; - s16 slot = ocfs2_get_inode_steal_slot(osb); + int slot = ocfs2_get_inode_steal_slot(osb); u64 alloc_group; *ac = kzalloc(sizeof(struct ocfs2_alloc_context), GFP_KERNEL); @@ -754,14 +845,14 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb, * need to check our slots to see whether there is some space for us. */ if (slot != OCFS2_INVALID_SLOT && - atomic_read(&osb->s_num_inodes_stolen) < OCFS2_MAX_INODES_TO_STEAL) + atomic_read(&osb->s_num_inodes_stolen) < OCFS2_MAX_TO_STEAL) goto inode_steal; atomic_set(&osb->s_num_inodes_stolen, 0); alloc_group = osb->osb_inode_alloc_group; status = ocfs2_reserve_suballoc_bits(osb, *ac, INODE_ALLOC_SYSTEM_INODE, - osb->slot_num, + (u32)osb->slot_num, &alloc_group, ALLOC_NEW_GROUP | ALLOC_GROUPS_FROM_GLOBAL); @@ -789,7 +880,7 @@ int ocfs2_reserve_new_inode(struct ocfs2_super *osb, ocfs2_free_ac_resource(*ac); inode_steal: - status = ocfs2_steal_inode_from_other_nodes(osb, *ac); + status = ocfs2_steal_inode(osb, *ac); atomic_inc(&osb->s_num_inodes_stolen); if (status < 0) { if (status != -ENOSPC) diff --git a/fs/ocfs2/suballoc.h b/fs/ocfs2/suballoc.h index 8c9a78a4316..fa60723c43e 100644 --- a/fs/ocfs2/suballoc.h +++ b/fs/ocfs2/suballoc.h @@ -56,6 +56,7 @@ struct ocfs2_alloc_context { is the same as ~0 - unlimited */ }; +void ocfs2_init_steal_slots(struct ocfs2_super *osb); void ocfs2_free_alloc_context(struct ocfs2_alloc_context *ac); static inline int ocfs2_alloc_context_bits_left(struct ocfs2_alloc_context *ac) { diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c index 755cd49a5ef..dee03197a49 100644 --- a/fs/ocfs2/super.c +++ b/fs/ocfs2/super.c @@ -69,6 +69,7 @@ #include "xattr.h" #include "quota.h" #include "refcounttree.h" +#include "suballoc.h" #include "buffer_head_io.h" @@ -301,9 +302,12 @@ static int ocfs2_osb_dump(struct ocfs2_super *osb, char *buf, int len) spin_lock(&osb->osb_lock); out += snprintf(buf + out, len - out, - "%10s => Slot: %d NumStolen: %d\n", "Steal", + "%10s => InodeSlot: %d StolenInodes: %d, " + "MetaSlot: %d StolenMeta: %d\n", "Steal", osb->s_inode_steal_slot, - atomic_read(&osb->s_num_inodes_stolen)); + atomic_read(&osb->s_num_inodes_stolen), + osb->s_meta_steal_slot, + atomic_read(&osb->s_num_meta_stolen)); spin_unlock(&osb->osb_lock); out += snprintf(buf + out, len - out, "OrphanScan => "); @@ -1997,7 +2001,7 @@ static int ocfs2_initialize_super(struct super_block *sb, osb->blocked_lock_count = 0; spin_lock_init(&osb->osb_lock); spin_lock_init(&osb->osb_xattr_lock); - ocfs2_init_inode_steal_slot(osb); + ocfs2_init_steal_slots(osb); atomic_set(&osb->alloc_stats.moves, 0); atomic_set(&osb->alloc_stats.local_data, 0); diff --git a/fs/ocfs2/xattr.c b/fs/ocfs2/xattr.c index 8fc6fb071c6..d1b0d386f6d 100644 --- a/fs/ocfs2/xattr.c +++ b/fs/ocfs2/xattr.c @@ -116,10 +116,11 @@ static struct xattr_handler *ocfs2_xattr_handler_map[OCFS2_XATTR_MAX] = { }; struct ocfs2_xattr_info { - int name_index; - const char *name; - const void *value; - size_t value_len; + int xi_name_index; + const char *xi_name; + int xi_name_len; + const void *xi_value; + size_t xi_value_len; }; struct ocfs2_xattr_search { @@ -137,6 +138,115 @@ struct ocfs2_xattr_search { int not_found; }; +/* Operations on struct ocfs2_xa_entry */ +struct ocfs2_xa_loc; +struct ocfs2_xa_loc_operations { + /* + * Journal functions + */ + int (*xlo_journal_access)(handle_t *handle, struct ocfs2_xa_loc *loc, + int type); + void (*xlo_journal_dirty)(handle_t *handle, struct ocfs2_xa_loc *loc); + + /* + * Return a pointer to the appropriate buffer in loc->xl_storage + * at the given offset from loc->xl_header. + */ + void *(*xlo_offset_pointer)(struct ocfs2_xa_loc *loc, int offset); + + /* Can we reuse the existing entry for the new value? */ + int (*xlo_can_reuse)(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi); + + /* How much space is needed for the new value? */ + int (*xlo_check_space)(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi); + + /* + * Return the offset of the first name+value pair. This is + * the start of our downward-filling free space. + */ + int (*xlo_get_free_start)(struct ocfs2_xa_loc *loc); + + /* + * Remove the name+value at this location. Do whatever is + * appropriate with the remaining name+value pairs. + */ + void (*xlo_wipe_namevalue)(struct ocfs2_xa_loc *loc); + + /* Fill xl_entry with a new entry */ + void (*xlo_add_entry)(struct ocfs2_xa_loc *loc, u32 name_hash); + + /* Add name+value storage to an entry */ + void (*xlo_add_namevalue)(struct ocfs2_xa_loc *loc, int size); + + /* + * Initialize the value buf's access and bh fields for this entry. + * ocfs2_xa_fill_value_buf() will handle the xv pointer. + */ + void (*xlo_fill_value_buf)(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_value_buf *vb); +}; + +/* + * Describes an xattr entry location. This is a memory structure + * tracking the on-disk structure. + */ +struct ocfs2_xa_loc { + /* This xattr belongs to this inode */ + struct inode *xl_inode; + + /* The ocfs2_xattr_header inside the on-disk storage. Not NULL. */ + struct ocfs2_xattr_header *xl_header; + + /* Bytes from xl_header to the end of the storage */ + int xl_size; + + /* + * The ocfs2_xattr_entry this location describes. If this is + * NULL, this location describes the on-disk structure where it + * would have been. + */ + struct ocfs2_xattr_entry *xl_entry; + + /* + * Internal housekeeping + */ + + /* Buffer(s) containing this entry */ + void *xl_storage; + + /* Operations on the storage backing this location */ + const struct ocfs2_xa_loc_operations *xl_ops; +}; + +/* + * Convenience functions to calculate how much space is needed for a + * given name+value pair + */ +static int namevalue_size(int name_len, uint64_t value_len) +{ + if (value_len > OCFS2_XATTR_INLINE_SIZE) + return OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_ROOT_SIZE; + else + return OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_SIZE(value_len); +} + +static int namevalue_size_xi(struct ocfs2_xattr_info *xi) +{ + return namevalue_size(xi->xi_name_len, xi->xi_value_len); +} + +static int namevalue_size_xe(struct ocfs2_xattr_entry *xe) +{ + u64 value_len = le64_to_cpu(xe->xe_value_size); + + BUG_ON((value_len > OCFS2_XATTR_INLINE_SIZE) && + ocfs2_xattr_is_local(xe)); + return namevalue_size(xe->xe_name_len, value_len); +} + + static int ocfs2_xattr_bucket_get_name_value(struct super_block *sb, struct ocfs2_xattr_header *xh, int index, @@ -212,14 +322,6 @@ static inline u16 ocfs2_blocks_per_xattr_bucket(struct super_block *sb) return OCFS2_XATTR_BUCKET_SIZE / (1 << sb->s_blocksize_bits); } -static inline u16 ocfs2_xattr_max_xe_in_bucket(struct super_block *sb) -{ - u16 len = sb->s_blocksize - - offsetof(struct ocfs2_xattr_header, xh_entries); - - return len / sizeof(struct ocfs2_xattr_entry); -} - #define bucket_blkno(_b) ((_b)->bu_bhs[0]->b_blocknr) #define bucket_block(_b, _n) ((_b)->bu_bhs[(_n)]->b_data) #define bucket_xh(_b) ((struct ocfs2_xattr_header *)bucket_block((_b), 0)) @@ -463,35 +565,22 @@ static u32 ocfs2_xattr_name_hash(struct inode *inode, return hash; } -/* - * ocfs2_xattr_hash_entry() - * - * Compute the hash of an extended attribute. - */ -static void ocfs2_xattr_hash_entry(struct inode *inode, - struct ocfs2_xattr_header *header, - struct ocfs2_xattr_entry *entry) +static int ocfs2_xattr_entry_real_size(int name_len, size_t value_len) { - u32 hash = 0; - char *name = (char *)header + le16_to_cpu(entry->xe_name_offset); - - hash = ocfs2_xattr_name_hash(inode, name, entry->xe_name_len); - entry->xe_name_hash = cpu_to_le32(hash); - - return; + return namevalue_size(name_len, value_len) + + sizeof(struct ocfs2_xattr_entry); } -static int ocfs2_xattr_entry_real_size(int name_len, size_t value_len) +static int ocfs2_xi_entry_usage(struct ocfs2_xattr_info *xi) { - int size = 0; - - if (value_len <= OCFS2_XATTR_INLINE_SIZE) - size = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_SIZE(value_len); - else - size = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_ROOT_SIZE; - size += sizeof(struct ocfs2_xattr_entry); + return namevalue_size_xi(xi) + + sizeof(struct ocfs2_xattr_entry); +} - return size; +static int ocfs2_xe_entry_usage(struct ocfs2_xattr_entry *xe) +{ + return namevalue_size_xe(xe) + + sizeof(struct ocfs2_xattr_entry); } int ocfs2_calc_security_init(struct inode *dir, @@ -1308,452 +1397,897 @@ out: return ret; } -static int ocfs2_xattr_cleanup(struct inode *inode, - handle_t *handle, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_value_buf *vb, - size_t offs) +static int ocfs2_xa_check_space_helper(int needed_space, int free_start, + int num_entries) { - int ret = 0; - size_t name_len = strlen(xi->name); - void *val = xs->base + offs; - size_t size = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_ROOT_SIZE; + int free_space; - ret = vb->vb_access(handle, INODE_CACHE(inode), vb->vb_bh, - OCFS2_JOURNAL_ACCESS_WRITE); - if (ret) { - mlog_errno(ret); - goto out; - } - /* Decrease xattr count */ - le16_add_cpu(&xs->header->xh_count, -1); - /* Remove the xattr entry and tree root which has already be set*/ - memset((void *)xs->here, 0, sizeof(struct ocfs2_xattr_entry)); - memset(val, 0, size); + if (!needed_space) + return 0; - ret = ocfs2_journal_dirty(handle, vb->vb_bh); - if (ret < 0) - mlog_errno(ret); -out: - return ret; + free_space = free_start - + sizeof(struct ocfs2_xattr_header) - + (num_entries * sizeof(struct ocfs2_xattr_entry)) - + OCFS2_XATTR_HEADER_GAP; + if (free_space < 0) + return -EIO; + if (free_space < needed_space) + return -ENOSPC; + + return 0; } -static int ocfs2_xattr_update_entry(struct inode *inode, - handle_t *handle, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_value_buf *vb, - size_t offs) +static int ocfs2_xa_journal_access(handle_t *handle, struct ocfs2_xa_loc *loc, + int type) { - int ret; + return loc->xl_ops->xlo_journal_access(handle, loc, type); +} - ret = vb->vb_access(handle, INODE_CACHE(inode), vb->vb_bh, - OCFS2_JOURNAL_ACCESS_WRITE); - if (ret) { - mlog_errno(ret); - goto out; - } +static void ocfs2_xa_journal_dirty(handle_t *handle, struct ocfs2_xa_loc *loc) +{ + loc->xl_ops->xlo_journal_dirty(handle, loc); +} - xs->here->xe_name_offset = cpu_to_le16(offs); - xs->here->xe_value_size = cpu_to_le64(xi->value_len); - if (xi->value_len <= OCFS2_XATTR_INLINE_SIZE) - ocfs2_xattr_set_local(xs->here, 1); - else - ocfs2_xattr_set_local(xs->here, 0); - ocfs2_xattr_hash_entry(inode, xs->header, xs->here); +/* Give a pointer into the storage for the given offset */ +static void *ocfs2_xa_offset_pointer(struct ocfs2_xa_loc *loc, int offset) +{ + BUG_ON(offset >= loc->xl_size); + return loc->xl_ops->xlo_offset_pointer(loc, offset); +} - ret = ocfs2_journal_dirty(handle, vb->vb_bh); - if (ret < 0) - mlog_errno(ret); -out: - return ret; +/* + * Wipe the name+value pair and allow the storage to reclaim it. This + * must be followed by either removal of the entry or a call to + * ocfs2_xa_add_namevalue(). + */ +static void ocfs2_xa_wipe_namevalue(struct ocfs2_xa_loc *loc) +{ + loc->xl_ops->xlo_wipe_namevalue(loc); } /* - * ocfs2_xattr_set_value_outside() - * - * Set large size value in B tree. + * Find lowest offset to a name+value pair. This is the start of our + * downward-growing free space. */ -static int ocfs2_xattr_set_value_outside(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_set_ctxt *ctxt, - struct ocfs2_xattr_value_buf *vb, - size_t offs) +static int ocfs2_xa_get_free_start(struct ocfs2_xa_loc *loc) { - size_t name_len = strlen(xi->name); - void *val = xs->base + offs; - struct ocfs2_xattr_value_root *xv = NULL; - size_t size = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_ROOT_SIZE; - int ret = 0; + return loc->xl_ops->xlo_get_free_start(loc); +} - memset(val, 0, size); - memcpy(val, xi->name, name_len); - xv = (struct ocfs2_xattr_value_root *) - (val + OCFS2_XATTR_SIZE(name_len)); - xv->xr_clusters = 0; - xv->xr_last_eb_blk = 0; - xv->xr_list.l_tree_depth = 0; - xv->xr_list.l_count = cpu_to_le16(1); - xv->xr_list.l_next_free_rec = 0; - vb->vb_xv = xv; - - ret = ocfs2_xattr_value_truncate(inode, vb, xi->value_len, ctxt); - if (ret < 0) { - mlog_errno(ret); - return ret; +/* Can we reuse loc->xl_entry for xi? */ +static int ocfs2_xa_can_reuse_entry(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + return loc->xl_ops->xlo_can_reuse(loc, xi); +} + +/* How much free space is needed to set the new value */ +static int ocfs2_xa_check_space(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + return loc->xl_ops->xlo_check_space(loc, xi); +} + +static void ocfs2_xa_add_entry(struct ocfs2_xa_loc *loc, u32 name_hash) +{ + loc->xl_ops->xlo_add_entry(loc, name_hash); + loc->xl_entry->xe_name_hash = cpu_to_le32(name_hash); + /* + * We can't leave the new entry's xe_name_offset at zero or + * add_namevalue() will go nuts. We set it to the size of our + * storage so that it can never be less than any other entry. + */ + loc->xl_entry->xe_name_offset = cpu_to_le16(loc->xl_size); +} + +static void ocfs2_xa_add_namevalue(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + int size = namevalue_size_xi(xi); + int nameval_offset; + char *nameval_buf; + + loc->xl_ops->xlo_add_namevalue(loc, size); + loc->xl_entry->xe_value_size = cpu_to_le64(xi->xi_value_len); + loc->xl_entry->xe_name_len = xi->xi_name_len; + ocfs2_xattr_set_type(loc->xl_entry, xi->xi_name_index); + ocfs2_xattr_set_local(loc->xl_entry, + xi->xi_value_len <= OCFS2_XATTR_INLINE_SIZE); + + nameval_offset = le16_to_cpu(loc->xl_entry->xe_name_offset); + nameval_buf = ocfs2_xa_offset_pointer(loc, nameval_offset); + memset(nameval_buf, 0, size); + memcpy(nameval_buf, xi->xi_name, xi->xi_name_len); +} + +static void ocfs2_xa_fill_value_buf(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_value_buf *vb) +{ + int nameval_offset = le16_to_cpu(loc->xl_entry->xe_name_offset); + int name_size = OCFS2_XATTR_SIZE(loc->xl_entry->xe_name_len); + + /* Value bufs are for value trees */ + BUG_ON(ocfs2_xattr_is_local(loc->xl_entry)); + BUG_ON(namevalue_size_xe(loc->xl_entry) != + (name_size + OCFS2_XATTR_ROOT_SIZE)); + + loc->xl_ops->xlo_fill_value_buf(loc, vb); + vb->vb_xv = + (struct ocfs2_xattr_value_root *)ocfs2_xa_offset_pointer(loc, + nameval_offset + + name_size); +} + +static int ocfs2_xa_block_journal_access(handle_t *handle, + struct ocfs2_xa_loc *loc, int type) +{ + struct buffer_head *bh = loc->xl_storage; + ocfs2_journal_access_func access; + + if (loc->xl_size == (bh->b_size - + offsetof(struct ocfs2_xattr_block, + xb_attrs.xb_header))) + access = ocfs2_journal_access_xb; + else + access = ocfs2_journal_access_di; + return access(handle, INODE_CACHE(loc->xl_inode), bh, type); +} + +static void ocfs2_xa_block_journal_dirty(handle_t *handle, + struct ocfs2_xa_loc *loc) +{ + struct buffer_head *bh = loc->xl_storage; + + ocfs2_journal_dirty(handle, bh); +} + +static void *ocfs2_xa_block_offset_pointer(struct ocfs2_xa_loc *loc, + int offset) +{ + return (char *)loc->xl_header + offset; +} + +static int ocfs2_xa_block_can_reuse(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + /* + * Block storage is strict. If the sizes aren't exact, we will + * remove the old one and reinsert the new. + */ + return namevalue_size_xe(loc->xl_entry) == + namevalue_size_xi(xi); +} + +static int ocfs2_xa_block_get_free_start(struct ocfs2_xa_loc *loc) +{ + struct ocfs2_xattr_header *xh = loc->xl_header; + int i, count = le16_to_cpu(xh->xh_count); + int offset, free_start = loc->xl_size; + + for (i = 0; i < count; i++) { + offset = le16_to_cpu(xh->xh_entries[i].xe_name_offset); + if (offset < free_start) + free_start = offset; } - ret = ocfs2_xattr_update_entry(inode, ctxt->handle, xi, xs, vb, offs); - if (ret < 0) { - mlog_errno(ret); - return ret; + + return free_start; +} + +static int ocfs2_xa_block_check_space(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + int count = le16_to_cpu(loc->xl_header->xh_count); + int free_start = ocfs2_xa_get_free_start(loc); + int needed_space = ocfs2_xi_entry_usage(xi); + + /* + * Block storage will reclaim the original entry before inserting + * the new value, so we only need the difference. If the new + * entry is smaller than the old one, we don't need anything. + */ + if (loc->xl_entry) { + /* Don't need space if we're reusing! */ + if (ocfs2_xa_can_reuse_entry(loc, xi)) + needed_space = 0; + else + needed_space -= ocfs2_xe_entry_usage(loc->xl_entry); } - ret = __ocfs2_xattr_set_value_outside(inode, ctxt->handle, vb, - xi->value, xi->value_len); - if (ret < 0) - mlog_errno(ret); + if (needed_space < 0) + needed_space = 0; + return ocfs2_xa_check_space_helper(needed_space, free_start, count); +} - return ret; +/* + * Block storage for xattrs keeps the name+value pairs compacted. When + * we remove one, we have to shift any that preceded it towards the end. + */ +static void ocfs2_xa_block_wipe_namevalue(struct ocfs2_xa_loc *loc) +{ + int i, offset; + int namevalue_offset, first_namevalue_offset, namevalue_size; + struct ocfs2_xattr_entry *entry = loc->xl_entry; + struct ocfs2_xattr_header *xh = loc->xl_header; + int count = le16_to_cpu(xh->xh_count); + + namevalue_offset = le16_to_cpu(entry->xe_name_offset); + namevalue_size = namevalue_size_xe(entry); + first_namevalue_offset = ocfs2_xa_get_free_start(loc); + + /* Shift the name+value pairs */ + memmove((char *)xh + first_namevalue_offset + namevalue_size, + (char *)xh + first_namevalue_offset, + namevalue_offset - first_namevalue_offset); + memset((char *)xh + first_namevalue_offset, 0, namevalue_size); + + /* Now tell xh->xh_entries about it */ + for (i = 0; i < count; i++) { + offset = le16_to_cpu(xh->xh_entries[i].xe_name_offset); + if (offset < namevalue_offset) + le16_add_cpu(&xh->xh_entries[i].xe_name_offset, + namevalue_size); + } + + /* + * Note that we don't update xh_free_start or xh_name_value_len + * because they're not used in block-stored xattrs. + */ +} + +static void ocfs2_xa_block_add_entry(struct ocfs2_xa_loc *loc, u32 name_hash) +{ + int count = le16_to_cpu(loc->xl_header->xh_count); + loc->xl_entry = &(loc->xl_header->xh_entries[count]); + le16_add_cpu(&loc->xl_header->xh_count, 1); + memset(loc->xl_entry, 0, sizeof(struct ocfs2_xattr_entry)); +} + +static void ocfs2_xa_block_add_namevalue(struct ocfs2_xa_loc *loc, int size) +{ + int free_start = ocfs2_xa_get_free_start(loc); + + loc->xl_entry->xe_name_offset = cpu_to_le16(free_start - size); +} + +static void ocfs2_xa_block_fill_value_buf(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_value_buf *vb) +{ + struct buffer_head *bh = loc->xl_storage; + + if (loc->xl_size == (bh->b_size - + offsetof(struct ocfs2_xattr_block, + xb_attrs.xb_header))) + vb->vb_access = ocfs2_journal_access_xb; + else + vb->vb_access = ocfs2_journal_access_di; + vb->vb_bh = bh; } /* - * ocfs2_xattr_set_entry_local() - * - * Set, replace or remove extended attribute in local. + * Operations for xattrs stored in blocks. This includes inline inode + * storage and unindexed ocfs2_xattr_blocks. */ -static void ocfs2_xattr_set_entry_local(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_entry *last, - size_t min_offs) +static const struct ocfs2_xa_loc_operations ocfs2_xa_block_loc_ops = { + .xlo_journal_access = ocfs2_xa_block_journal_access, + .xlo_journal_dirty = ocfs2_xa_block_journal_dirty, + .xlo_offset_pointer = ocfs2_xa_block_offset_pointer, + .xlo_check_space = ocfs2_xa_block_check_space, + .xlo_can_reuse = ocfs2_xa_block_can_reuse, + .xlo_get_free_start = ocfs2_xa_block_get_free_start, + .xlo_wipe_namevalue = ocfs2_xa_block_wipe_namevalue, + .xlo_add_entry = ocfs2_xa_block_add_entry, + .xlo_add_namevalue = ocfs2_xa_block_add_namevalue, + .xlo_fill_value_buf = ocfs2_xa_block_fill_value_buf, +}; + +static int ocfs2_xa_bucket_journal_access(handle_t *handle, + struct ocfs2_xa_loc *loc, int type) { - size_t name_len = strlen(xi->name); - int i; + struct ocfs2_xattr_bucket *bucket = loc->xl_storage; - if (xi->value && xs->not_found) { - /* Insert the new xattr entry. */ - le16_add_cpu(&xs->header->xh_count, 1); - ocfs2_xattr_set_type(last, xi->name_index); - ocfs2_xattr_set_local(last, 1); - last->xe_name_len = name_len; - } else { - void *first_val; - void *val; - size_t offs, size; - - first_val = xs->base + min_offs; - offs = le16_to_cpu(xs->here->xe_name_offset); - val = xs->base + offs; - - if (le64_to_cpu(xs->here->xe_value_size) > - OCFS2_XATTR_INLINE_SIZE) - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_ROOT_SIZE; + return ocfs2_xattr_bucket_journal_access(handle, bucket, type); +} + +static void ocfs2_xa_bucket_journal_dirty(handle_t *handle, + struct ocfs2_xa_loc *loc) +{ + struct ocfs2_xattr_bucket *bucket = loc->xl_storage; + + ocfs2_xattr_bucket_journal_dirty(handle, bucket); +} + +static void *ocfs2_xa_bucket_offset_pointer(struct ocfs2_xa_loc *loc, + int offset) +{ + struct ocfs2_xattr_bucket *bucket = loc->xl_storage; + int block, block_offset; + + /* The header is at the front of the bucket */ + block = offset >> loc->xl_inode->i_sb->s_blocksize_bits; + block_offset = offset % loc->xl_inode->i_sb->s_blocksize; + + return bucket_block(bucket, block) + block_offset; +} + +static int ocfs2_xa_bucket_can_reuse(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + return namevalue_size_xe(loc->xl_entry) >= + namevalue_size_xi(xi); +} + +static int ocfs2_xa_bucket_get_free_start(struct ocfs2_xa_loc *loc) +{ + struct ocfs2_xattr_bucket *bucket = loc->xl_storage; + return le16_to_cpu(bucket_xh(bucket)->xh_free_start); +} + +static int ocfs2_bucket_align_free_start(struct super_block *sb, + int free_start, int size) +{ + /* + * We need to make sure that the name+value pair fits within + * one block. + */ + if (((free_start - size) >> sb->s_blocksize_bits) != + ((free_start - 1) >> sb->s_blocksize_bits)) + free_start -= free_start % sb->s_blocksize; + + return free_start; +} + +static int ocfs2_xa_bucket_check_space(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi) +{ + int rc; + int count = le16_to_cpu(loc->xl_header->xh_count); + int free_start = ocfs2_xa_get_free_start(loc); + int needed_space = ocfs2_xi_entry_usage(xi); + int size = namevalue_size_xi(xi); + struct super_block *sb = loc->xl_inode->i_sb; + + /* + * Bucket storage does not reclaim name+value pairs it cannot + * reuse. They live as holes until the bucket fills, and then + * the bucket is defragmented. However, the bucket can reclaim + * the ocfs2_xattr_entry. + */ + if (loc->xl_entry) { + /* Don't need space if we're reusing! */ + if (ocfs2_xa_can_reuse_entry(loc, xi)) + needed_space = 0; else - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(le64_to_cpu(xs->here->xe_value_size)); - - if (xi->value && size == OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(xi->value_len)) { - /* The old and the new value have the - same size. Just replace the value. */ - ocfs2_xattr_set_local(xs->here, 1); - xs->here->xe_value_size = cpu_to_le64(xi->value_len); - /* Clear value bytes. */ - memset(val + OCFS2_XATTR_SIZE(name_len), - 0, - OCFS2_XATTR_SIZE(xi->value_len)); - memcpy(val + OCFS2_XATTR_SIZE(name_len), - xi->value, - xi->value_len); - return; - } - /* Remove the old name+value. */ - memmove(first_val + size, first_val, val - first_val); - memset(first_val, 0, size); - xs->here->xe_name_hash = 0; - xs->here->xe_name_offset = 0; - ocfs2_xattr_set_local(xs->here, 1); - xs->here->xe_value_size = 0; - - min_offs += size; - - /* Adjust all value offsets. */ - last = xs->header->xh_entries; - for (i = 0 ; i < le16_to_cpu(xs->header->xh_count); i++) { - size_t o = le16_to_cpu(last->xe_name_offset); - - if (o < offs) - last->xe_name_offset = cpu_to_le16(o + size); - last += 1; - } + needed_space -= sizeof(struct ocfs2_xattr_entry); + } + BUG_ON(needed_space < 0); - if (!xi->value) { - /* Remove the old entry. */ - last -= 1; - memmove(xs->here, xs->here + 1, - (void *)last - (void *)xs->here); - memset(last, 0, sizeof(struct ocfs2_xattr_entry)); - le16_add_cpu(&xs->header->xh_count, -1); - } + if (free_start < size) { + if (needed_space) + return -ENOSPC; + } else { + /* + * First we check if it would fit in the first place. + * Below, we align the free start to a block. This may + * slide us below the minimum gap. By checking unaligned + * first, we avoid that error. + */ + rc = ocfs2_xa_check_space_helper(needed_space, free_start, + count); + if (rc) + return rc; + free_start = ocfs2_bucket_align_free_start(sb, free_start, + size); } - if (xi->value) { - /* Insert the new name+value. */ - size_t size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(xi->value_len); - void *val = xs->base + min_offs - size; + return ocfs2_xa_check_space_helper(needed_space, free_start, count); +} + +static void ocfs2_xa_bucket_wipe_namevalue(struct ocfs2_xa_loc *loc) +{ + le16_add_cpu(&loc->xl_header->xh_name_value_len, + -namevalue_size_xe(loc->xl_entry)); +} - xs->here->xe_name_offset = cpu_to_le16(min_offs - size); - memset(val, 0, size); - memcpy(val, xi->name, name_len); - memcpy(val + OCFS2_XATTR_SIZE(name_len), - xi->value, - xi->value_len); - xs->here->xe_value_size = cpu_to_le64(xi->value_len); - ocfs2_xattr_set_local(xs->here, 1); - ocfs2_xattr_hash_entry(inode, xs->header, xs->here); +static void ocfs2_xa_bucket_add_entry(struct ocfs2_xa_loc *loc, u32 name_hash) +{ + struct ocfs2_xattr_header *xh = loc->xl_header; + int count = le16_to_cpu(xh->xh_count); + int low = 0, high = count - 1, tmp; + struct ocfs2_xattr_entry *tmp_xe; + + /* + * We keep buckets sorted by name_hash, so we need to find + * our insert place. + */ + while (low <= high && count) { + tmp = (low + high) / 2; + tmp_xe = &xh->xh_entries[tmp]; + + if (name_hash > le32_to_cpu(tmp_xe->xe_name_hash)) + low = tmp + 1; + else if (name_hash < le32_to_cpu(tmp_xe->xe_name_hash)) + high = tmp - 1; + else { + low = tmp; + break; + } } - return; + if (low != count) + memmove(&xh->xh_entries[low + 1], + &xh->xh_entries[low], + ((count - low) * sizeof(struct ocfs2_xattr_entry))); + + le16_add_cpu(&xh->xh_count, 1); + loc->xl_entry = &xh->xh_entries[low]; + memset(loc->xl_entry, 0, sizeof(struct ocfs2_xattr_entry)); +} + +static void ocfs2_xa_bucket_add_namevalue(struct ocfs2_xa_loc *loc, int size) +{ + int free_start = ocfs2_xa_get_free_start(loc); + struct ocfs2_xattr_header *xh = loc->xl_header; + struct super_block *sb = loc->xl_inode->i_sb; + int nameval_offset; + + free_start = ocfs2_bucket_align_free_start(sb, free_start, size); + nameval_offset = free_start - size; + loc->xl_entry->xe_name_offset = cpu_to_le16(nameval_offset); + xh->xh_free_start = cpu_to_le16(nameval_offset); + le16_add_cpu(&xh->xh_name_value_len, size); + +} + +static void ocfs2_xa_bucket_fill_value_buf(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_value_buf *vb) +{ + struct ocfs2_xattr_bucket *bucket = loc->xl_storage; + struct super_block *sb = loc->xl_inode->i_sb; + int nameval_offset = le16_to_cpu(loc->xl_entry->xe_name_offset); + int size = namevalue_size_xe(loc->xl_entry); + int block_offset = nameval_offset >> sb->s_blocksize_bits; + + /* Values are not allowed to straddle block boundaries */ + BUG_ON(block_offset != + ((nameval_offset + size - 1) >> sb->s_blocksize_bits)); + /* We expect the bucket to be filled in */ + BUG_ON(!bucket->bu_bhs[block_offset]); + + vb->vb_access = ocfs2_journal_access; + vb->vb_bh = bucket->bu_bhs[block_offset]; +} + +/* Operations for xattrs stored in buckets. */ +static const struct ocfs2_xa_loc_operations ocfs2_xa_bucket_loc_ops = { + .xlo_journal_access = ocfs2_xa_bucket_journal_access, + .xlo_journal_dirty = ocfs2_xa_bucket_journal_dirty, + .xlo_offset_pointer = ocfs2_xa_bucket_offset_pointer, + .xlo_check_space = ocfs2_xa_bucket_check_space, + .xlo_can_reuse = ocfs2_xa_bucket_can_reuse, + .xlo_get_free_start = ocfs2_xa_bucket_get_free_start, + .xlo_wipe_namevalue = ocfs2_xa_bucket_wipe_namevalue, + .xlo_add_entry = ocfs2_xa_bucket_add_entry, + .xlo_add_namevalue = ocfs2_xa_bucket_add_namevalue, + .xlo_fill_value_buf = ocfs2_xa_bucket_fill_value_buf, +}; + +static unsigned int ocfs2_xa_value_clusters(struct ocfs2_xa_loc *loc) +{ + struct ocfs2_xattr_value_buf vb; + + if (ocfs2_xattr_is_local(loc->xl_entry)) + return 0; + + ocfs2_xa_fill_value_buf(loc, &vb); + return le32_to_cpu(vb.vb_xv->xr_clusters); +} + +static int ocfs2_xa_value_truncate(struct ocfs2_xa_loc *loc, u64 bytes, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int trunc_rc, access_rc; + struct ocfs2_xattr_value_buf vb; + + ocfs2_xa_fill_value_buf(loc, &vb); + trunc_rc = ocfs2_xattr_value_truncate(loc->xl_inode, &vb, bytes, + ctxt); + + /* + * The caller of ocfs2_xa_value_truncate() has already called + * ocfs2_xa_journal_access on the loc. However, The truncate code + * calls ocfs2_extend_trans(). This may commit the previous + * transaction and open a new one. If this is a bucket, truncate + * could leave only vb->vb_bh set up for journaling. Meanwhile, + * the caller is expecting to dirty the entire bucket. So we must + * reset the journal work. We do this even if truncate has failed, + * as it could have failed after committing the extend. + */ + access_rc = ocfs2_xa_journal_access(ctxt->handle, loc, + OCFS2_JOURNAL_ACCESS_WRITE); + + /* Errors in truncate take precedence */ + return trunc_rc ? trunc_rc : access_rc; +} + +static void ocfs2_xa_remove_entry(struct ocfs2_xa_loc *loc) +{ + int index, count; + struct ocfs2_xattr_header *xh = loc->xl_header; + struct ocfs2_xattr_entry *entry = loc->xl_entry; + + ocfs2_xa_wipe_namevalue(loc); + loc->xl_entry = NULL; + + le16_add_cpu(&xh->xh_count, -1); + count = le16_to_cpu(xh->xh_count); + + /* + * Only zero out the entry if there are more remaining. This is + * important for an empty bucket, as it keeps track of the + * bucket's hash value. It doesn't hurt empty block storage. + */ + if (count) { + index = ((char *)entry - (char *)&xh->xh_entries) / + sizeof(struct ocfs2_xattr_entry); + memmove(&xh->xh_entries[index], &xh->xh_entries[index + 1], + (count - index) * sizeof(struct ocfs2_xattr_entry)); + memset(&xh->xh_entries[count], 0, + sizeof(struct ocfs2_xattr_entry)); + } } /* - * ocfs2_xattr_set_entry() + * If we have a problem adjusting the size of an external value during + * ocfs2_xa_prepare_entry() or ocfs2_xa_remove(), we may have an xattr + * in an intermediate state. For example, the value may be partially + * truncated. + * + * If the value tree hasn't changed, the extend/truncate went nowhere. + * We have nothing to do. The caller can treat it as a straight error. * - * Set extended attribute entry into inode or block. + * If the value tree got partially truncated, we now have a corrupted + * extended attribute. We're going to wipe its entry and leak the + * clusters. Better to leak some storage than leave a corrupt entry. * - * If extended attribute value size > OCFS2_XATTR_INLINE_SIZE, - * We first insert tree root(ocfs2_xattr_value_root) with set_entry_local(), - * then set value in B tree with set_value_outside(). + * If the value tree grew, it obviously didn't grow enough for the + * new entry. We're not going to try and reclaim those clusters either. + * If there was already an external value there (orig_clusters != 0), + * the new clusters are attached safely and we can just leave the old + * value in place. If there was no external value there, we remove + * the entry. + * + * This way, the xattr block we store in the journal will be consistent. + * If the size change broke because of the journal, no changes will hit + * disk anyway. */ -static int ocfs2_xattr_set_entry(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_set_ctxt *ctxt, - int flag) -{ - struct ocfs2_xattr_entry *last; - struct ocfs2_inode_info *oi = OCFS2_I(inode); - struct ocfs2_dinode *di = (struct ocfs2_dinode *)xs->inode_bh->b_data; - size_t min_offs = xs->end - xs->base, name_len = strlen(xi->name); - size_t size_l = 0; - handle_t *handle = ctxt->handle; - int free, i, ret; - struct ocfs2_xattr_info xi_l = { - .name_index = xi->name_index, - .name = xi->name, - .value = xi->value, - .value_len = xi->value_len, - }; - struct ocfs2_xattr_value_buf vb = { - .vb_bh = xs->xattr_bh, - .vb_access = ocfs2_journal_access_di, - }; +static void ocfs2_xa_cleanup_value_truncate(struct ocfs2_xa_loc *loc, + const char *what, + unsigned int orig_clusters) +{ + unsigned int new_clusters = ocfs2_xa_value_clusters(loc); + char *nameval_buf = ocfs2_xa_offset_pointer(loc, + le16_to_cpu(loc->xl_entry->xe_name_offset)); + + if (new_clusters < orig_clusters) { + mlog(ML_ERROR, + "Partial truncate while %s xattr %.*s. Leaking " + "%u clusters and removing the entry\n", + what, loc->xl_entry->xe_name_len, nameval_buf, + orig_clusters - new_clusters); + ocfs2_xa_remove_entry(loc); + } else if (!orig_clusters) { + mlog(ML_ERROR, + "Unable to allocate an external value for xattr " + "%.*s safely. Leaking %u clusters and removing the " + "entry\n", + loc->xl_entry->xe_name_len, nameval_buf, + new_clusters - orig_clusters); + ocfs2_xa_remove_entry(loc); + } else if (new_clusters > orig_clusters) + mlog(ML_ERROR, + "Unable to grow xattr %.*s safely. %u new clusters " + "have been added, but the value will not be " + "modified\n", + loc->xl_entry->xe_name_len, nameval_buf, + new_clusters - orig_clusters); +} + +static int ocfs2_xa_remove(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int rc = 0; + unsigned int orig_clusters; + + if (!ocfs2_xattr_is_local(loc->xl_entry)) { + orig_clusters = ocfs2_xa_value_clusters(loc); + rc = ocfs2_xa_value_truncate(loc, 0, ctxt); + if (rc) { + mlog_errno(rc); + /* + * Since this is remove, we can return 0 if + * ocfs2_xa_cleanup_value_truncate() is going to + * wipe the entry anyway. So we check the + * cluster count as well. + */ + if (orig_clusters != ocfs2_xa_value_clusters(loc)) + rc = 0; + ocfs2_xa_cleanup_value_truncate(loc, "removing", + orig_clusters); + if (rc) + goto out; + } + } - if (!(flag & OCFS2_INLINE_XATTR_FL)) { - BUG_ON(xs->xattr_bh == xs->inode_bh); - vb.vb_access = ocfs2_journal_access_xb; - } else - BUG_ON(xs->xattr_bh != xs->inode_bh); + ocfs2_xa_remove_entry(loc); - /* Compute min_offs, last and free space. */ - last = xs->header->xh_entries; +out: + return rc; +} - for (i = 0 ; i < le16_to_cpu(xs->header->xh_count); i++) { - size_t offs = le16_to_cpu(last->xe_name_offset); - if (offs < min_offs) - min_offs = offs; - last += 1; - } +static void ocfs2_xa_install_value_root(struct ocfs2_xa_loc *loc) +{ + int name_size = OCFS2_XATTR_SIZE(loc->xl_entry->xe_name_len); + char *nameval_buf; - free = min_offs - ((void *)last - xs->base) - OCFS2_XATTR_HEADER_GAP; - if (free < 0) - return -EIO; + nameval_buf = ocfs2_xa_offset_pointer(loc, + le16_to_cpu(loc->xl_entry->xe_name_offset)); + memcpy(nameval_buf + name_size, &def_xv, OCFS2_XATTR_ROOT_SIZE); +} - if (!xs->not_found) { - size_t size = 0; - if (ocfs2_xattr_is_local(xs->here)) - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(le64_to_cpu(xs->here->xe_value_size)); - else - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_ROOT_SIZE; - free += (size + sizeof(struct ocfs2_xattr_entry)); - } - /* Check free space in inode or block */ - if (xi->value && xi->value_len > OCFS2_XATTR_INLINE_SIZE) { - if (free < sizeof(struct ocfs2_xattr_entry) + - OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_ROOT_SIZE) { - ret = -ENOSPC; - goto out; +/* + * Take an existing entry and make it ready for the new value. This + * won't allocate space, but it may free space. It should be ready for + * ocfs2_xa_prepare_entry() to finish the work. + */ +static int ocfs2_xa_reuse_entry(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int rc = 0; + int name_size = OCFS2_XATTR_SIZE(xi->xi_name_len); + unsigned int orig_clusters; + char *nameval_buf; + int xe_local = ocfs2_xattr_is_local(loc->xl_entry); + int xi_local = xi->xi_value_len <= OCFS2_XATTR_INLINE_SIZE; + + BUG_ON(OCFS2_XATTR_SIZE(loc->xl_entry->xe_name_len) != + name_size); + + nameval_buf = ocfs2_xa_offset_pointer(loc, + le16_to_cpu(loc->xl_entry->xe_name_offset)); + if (xe_local) { + memset(nameval_buf + name_size, 0, + namevalue_size_xe(loc->xl_entry) - name_size); + if (!xi_local) + ocfs2_xa_install_value_root(loc); + } else { + orig_clusters = ocfs2_xa_value_clusters(loc); + if (xi_local) { + rc = ocfs2_xa_value_truncate(loc, 0, ctxt); + if (rc < 0) + mlog_errno(rc); + else + memset(nameval_buf + name_size, 0, + namevalue_size_xe(loc->xl_entry) - + name_size); + } else if (le64_to_cpu(loc->xl_entry->xe_value_size) > + xi->xi_value_len) { + rc = ocfs2_xa_value_truncate(loc, xi->xi_value_len, + ctxt); + if (rc < 0) + mlog_errno(rc); } - size_l = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_ROOT_SIZE; - xi_l.value = (void *)&def_xv; - xi_l.value_len = OCFS2_XATTR_ROOT_SIZE; - } else if (xi->value) { - if (free < sizeof(struct ocfs2_xattr_entry) + - OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(xi->value_len)) { - ret = -ENOSPC; + + if (rc) { + ocfs2_xa_cleanup_value_truncate(loc, "reusing", + orig_clusters); goto out; } } - if (!xs->not_found) { - /* For existing extended attribute */ - size_t size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(le64_to_cpu(xs->here->xe_value_size)); - size_t offs = le16_to_cpu(xs->here->xe_name_offset); - void *val = xs->base + offs; + loc->xl_entry->xe_value_size = cpu_to_le64(xi->xi_value_len); + ocfs2_xattr_set_local(loc->xl_entry, xi_local); - if (ocfs2_xattr_is_local(xs->here) && size == size_l) { - /* Replace existing local xattr with tree root */ - ret = ocfs2_xattr_set_value_outside(inode, xi, xs, - ctxt, &vb, offs); - if (ret < 0) - mlog_errno(ret); - goto out; - } else if (!ocfs2_xattr_is_local(xs->here)) { - /* For existing xattr which has value outside */ - vb.vb_xv = (struct ocfs2_xattr_value_root *) - (val + OCFS2_XATTR_SIZE(name_len)); +out: + return rc; +} - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) { - /* - * If new value need set outside also, - * first truncate old value to new value, - * then set new value with set_value_outside(). - */ - ret = ocfs2_xattr_value_truncate(inode, - &vb, - xi->value_len, - ctxt); - if (ret < 0) { - mlog_errno(ret); - goto out; - } +/* + * Prepares loc->xl_entry to receive the new xattr. This includes + * properly setting up the name+value pair region. If loc->xl_entry + * already exists, it will take care of modifying it appropriately. + * + * Note that this modifies the data. You did journal_access already, + * right? + */ +static int ocfs2_xa_prepare_entry(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi, + u32 name_hash, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int rc = 0; + unsigned int orig_clusters; + __le64 orig_value_size = 0; - ret = ocfs2_xattr_update_entry(inode, - handle, - xi, - xs, - &vb, - offs); - if (ret < 0) { - mlog_errno(ret); - goto out; - } + rc = ocfs2_xa_check_space(loc, xi); + if (rc) + goto out; - ret = __ocfs2_xattr_set_value_outside(inode, - handle, - &vb, - xi->value, - xi->value_len); - if (ret < 0) - mlog_errno(ret); + if (loc->xl_entry) { + if (ocfs2_xa_can_reuse_entry(loc, xi)) { + orig_value_size = loc->xl_entry->xe_value_size; + rc = ocfs2_xa_reuse_entry(loc, xi, ctxt); + if (rc) + goto out; + goto alloc_value; + } + + if (!ocfs2_xattr_is_local(loc->xl_entry)) { + orig_clusters = ocfs2_xa_value_clusters(loc); + rc = ocfs2_xa_value_truncate(loc, 0, ctxt); + if (rc) { + mlog_errno(rc); + ocfs2_xa_cleanup_value_truncate(loc, + "overwriting", + orig_clusters); goto out; - } else { - /* - * If new value need set in local, - * just trucate old value to zero. - */ - ret = ocfs2_xattr_value_truncate(inode, - &vb, - 0, - ctxt); - if (ret < 0) - mlog_errno(ret); } } + ocfs2_xa_wipe_namevalue(loc); + } else + ocfs2_xa_add_entry(loc, name_hash); + + /* + * If we get here, we have a blank entry. Fill it. We grow our + * name+value pair back from the end. + */ + ocfs2_xa_add_namevalue(loc, xi); + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) + ocfs2_xa_install_value_root(loc); + +alloc_value: + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) { + orig_clusters = ocfs2_xa_value_clusters(loc); + rc = ocfs2_xa_value_truncate(loc, xi->xi_value_len, ctxt); + if (rc < 0) { + /* + * If we tried to grow an existing external value, + * ocfs2_xa_cleanuP-value_truncate() is going to + * let it stand. We have to restore its original + * value size. + */ + loc->xl_entry->xe_value_size = orig_value_size; + ocfs2_xa_cleanup_value_truncate(loc, "growing", + orig_clusters); + mlog_errno(rc); + } } - ret = ocfs2_journal_access_di(handle, INODE_CACHE(inode), xs->inode_bh, +out: + return rc; +} + +/* + * Store the value portion of the name+value pair. This will skip + * values that are stored externally. Their tree roots were set up + * by ocfs2_xa_prepare_entry(). + */ +static int ocfs2_xa_store_value(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int rc = 0; + int nameval_offset = le16_to_cpu(loc->xl_entry->xe_name_offset); + int name_size = OCFS2_XATTR_SIZE(xi->xi_name_len); + char *nameval_buf; + struct ocfs2_xattr_value_buf vb; + + nameval_buf = ocfs2_xa_offset_pointer(loc, nameval_offset); + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) { + ocfs2_xa_fill_value_buf(loc, &vb); + rc = __ocfs2_xattr_set_value_outside(loc->xl_inode, + ctxt->handle, &vb, + xi->xi_value, + xi->xi_value_len); + } else + memcpy(nameval_buf + name_size, xi->xi_value, xi->xi_value_len); + + return rc; +} + +static int ocfs2_xa_set(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_info *xi, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int ret; + u32 name_hash = ocfs2_xattr_name_hash(loc->xl_inode, xi->xi_name, + xi->xi_name_len); + + ret = ocfs2_xa_journal_access(ctxt->handle, loc, OCFS2_JOURNAL_ACCESS_WRITE); if (ret) { mlog_errno(ret); goto out; } - if (!(flag & OCFS2_INLINE_XATTR_FL)) { - ret = vb.vb_access(handle, INODE_CACHE(inode), vb.vb_bh, - OCFS2_JOURNAL_ACCESS_WRITE); - if (ret) { - mlog_errno(ret); - goto out; - } - } - /* - * Set value in local, include set tree root in local. - * This is the first step for value size >INLINE_SIZE. + * From here on out, everything is going to modify the buffer a + * little. Errors are going to leave the xattr header in a + * sane state. Thus, even with errors we dirty the sucker. */ - ocfs2_xattr_set_entry_local(inode, &xi_l, xs, last, min_offs); - if (!(flag & OCFS2_INLINE_XATTR_FL)) { - ret = ocfs2_journal_dirty(handle, xs->xattr_bh); - if (ret < 0) { - mlog_errno(ret); - goto out; - } + /* Don't worry, we are never called with !xi_value and !xl_entry */ + if (!xi->xi_value) { + ret = ocfs2_xa_remove(loc, ctxt); + goto out_dirty; } - if (!(oi->ip_dyn_features & OCFS2_INLINE_XATTR_FL) && - (flag & OCFS2_INLINE_XATTR_FL)) { - struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); - unsigned int xattrsize = osb->s_xattr_inline_size; - - /* - * Adjust extent record count or inline data size - * to reserve space for extended attribute. - */ - if (oi->ip_dyn_features & OCFS2_INLINE_DATA_FL) { - struct ocfs2_inline_data *idata = &di->id2.i_data; - le16_add_cpu(&idata->id_count, -xattrsize); - } else if (!(ocfs2_inode_is_fast_symlink(inode))) { - struct ocfs2_extent_list *el = &di->id2.i_list; - le16_add_cpu(&el->l_count, -(xattrsize / - sizeof(struct ocfs2_extent_rec))); - } - di->i_xattr_inline_size = cpu_to_le16(xattrsize); + ret = ocfs2_xa_prepare_entry(loc, xi, name_hash, ctxt); + if (ret) { + if (ret != -ENOSPC) + mlog_errno(ret); + goto out_dirty; } - /* Update xattr flag */ - spin_lock(&oi->ip_lock); - oi->ip_dyn_features |= flag; - di->i_dyn_features = cpu_to_le16(oi->ip_dyn_features); - spin_unlock(&oi->ip_lock); - ret = ocfs2_journal_dirty(handle, xs->inode_bh); - if (ret < 0) + ret = ocfs2_xa_store_value(loc, xi, ctxt); + if (ret) mlog_errno(ret); - if (!ret && xi->value_len > OCFS2_XATTR_INLINE_SIZE) { - /* - * Set value outside in B tree. - * This is the second step for value size > INLINE_SIZE. - */ - size_t offs = le16_to_cpu(xs->here->xe_name_offset); - ret = ocfs2_xattr_set_value_outside(inode, xi, xs, ctxt, - &vb, offs); - if (ret < 0) { - int ret2; +out_dirty: + ocfs2_xa_journal_dirty(ctxt->handle, loc); - mlog_errno(ret); - /* - * If set value outside failed, we have to clean - * the junk tree root we have already set in local. - */ - ret2 = ocfs2_xattr_cleanup(inode, ctxt->handle, - xi, xs, &vb, offs); - if (ret2 < 0) - mlog_errno(ret2); - } - } out: return ret; } +static void ocfs2_init_dinode_xa_loc(struct ocfs2_xa_loc *loc, + struct inode *inode, + struct buffer_head *bh, + struct ocfs2_xattr_entry *entry) +{ + struct ocfs2_dinode *di = (struct ocfs2_dinode *)bh->b_data; + + BUG_ON(!(OCFS2_I(inode)->ip_dyn_features & OCFS2_INLINE_XATTR_FL)); + + loc->xl_inode = inode; + loc->xl_ops = &ocfs2_xa_block_loc_ops; + loc->xl_storage = bh; + loc->xl_entry = entry; + loc->xl_size = le16_to_cpu(di->i_xattr_inline_size); + loc->xl_header = + (struct ocfs2_xattr_header *)(bh->b_data + bh->b_size - + loc->xl_size); +} + +static void ocfs2_init_xattr_block_xa_loc(struct ocfs2_xa_loc *loc, + struct inode *inode, + struct buffer_head *bh, + struct ocfs2_xattr_entry *entry) +{ + struct ocfs2_xattr_block *xb = + (struct ocfs2_xattr_block *)bh->b_data; + + BUG_ON(le16_to_cpu(xb->xb_flags) & OCFS2_XATTR_INDEXED); + + loc->xl_inode = inode; + loc->xl_ops = &ocfs2_xa_block_loc_ops; + loc->xl_storage = bh; + loc->xl_header = &(xb->xb_attrs.xb_header); + loc->xl_entry = entry; + loc->xl_size = bh->b_size - offsetof(struct ocfs2_xattr_block, + xb_attrs.xb_header); +} + +static void ocfs2_init_xattr_bucket_xa_loc(struct ocfs2_xa_loc *loc, + struct ocfs2_xattr_bucket *bucket, + struct ocfs2_xattr_entry *entry) +{ + loc->xl_inode = bucket->bu_inode; + loc->xl_ops = &ocfs2_xa_bucket_loc_ops; + loc->xl_storage = bucket; + loc->xl_header = bucket_xh(bucket); + loc->xl_entry = entry; + loc->xl_size = OCFS2_XATTR_BUCKET_SIZE; +} + /* * In xattr remove, if it is stored outside and refcounted, we may have * the chance to split the refcount tree. So need the allocators. @@ -2149,6 +2683,55 @@ static int ocfs2_xattr_ibody_find(struct inode *inode, return 0; } +static int ocfs2_xattr_ibody_init(struct inode *inode, + struct buffer_head *di_bh, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int ret; + struct ocfs2_inode_info *oi = OCFS2_I(inode); + struct ocfs2_dinode *di = (struct ocfs2_dinode *)di_bh->b_data; + struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); + unsigned int xattrsize = osb->s_xattr_inline_size; + + if (!ocfs2_xattr_has_space_inline(inode, di)) { + ret = -ENOSPC; + goto out; + } + + ret = ocfs2_journal_access_di(ctxt->handle, INODE_CACHE(inode), di_bh, + OCFS2_JOURNAL_ACCESS_WRITE); + if (ret) { + mlog_errno(ret); + goto out; + } + + /* + * Adjust extent record count or inline data size + * to reserve space for extended attribute. + */ + if (oi->ip_dyn_features & OCFS2_INLINE_DATA_FL) { + struct ocfs2_inline_data *idata = &di->id2.i_data; + le16_add_cpu(&idata->id_count, -xattrsize); + } else if (!(ocfs2_inode_is_fast_symlink(inode))) { + struct ocfs2_extent_list *el = &di->id2.i_list; + le16_add_cpu(&el->l_count, -(xattrsize / + sizeof(struct ocfs2_extent_rec))); + } + di->i_xattr_inline_size = cpu_to_le16(xattrsize); + + spin_lock(&oi->ip_lock); + oi->ip_dyn_features |= OCFS2_INLINE_XATTR_FL|OCFS2_HAS_XATTR_FL; + di->i_dyn_features = cpu_to_le16(oi->ip_dyn_features); + spin_unlock(&oi->ip_lock); + + ret = ocfs2_journal_dirty(ctxt->handle, di_bh); + if (ret < 0) + mlog_errno(ret); + +out: + return ret; +} + /* * ocfs2_xattr_ibody_set() * @@ -2160,9 +2743,10 @@ static int ocfs2_xattr_ibody_set(struct inode *inode, struct ocfs2_xattr_search *xs, struct ocfs2_xattr_set_ctxt *ctxt) { + int ret; struct ocfs2_inode_info *oi = OCFS2_I(inode); struct ocfs2_dinode *di = (struct ocfs2_dinode *)xs->inode_bh->b_data; - int ret; + struct ocfs2_xa_loc loc; if (inode->i_sb->s_blocksize == OCFS2_MIN_BLOCKSIZE) return -ENOSPC; @@ -2175,8 +2759,25 @@ static int ocfs2_xattr_ibody_set(struct inode *inode, } } - ret = ocfs2_xattr_set_entry(inode, xi, xs, ctxt, - (OCFS2_INLINE_XATTR_FL | OCFS2_HAS_XATTR_FL)); + if (!(oi->ip_dyn_features & OCFS2_INLINE_XATTR_FL)) { + ret = ocfs2_xattr_ibody_init(inode, xs->inode_bh, ctxt); + if (ret) { + if (ret != -ENOSPC) + mlog_errno(ret); + goto out; + } + } + + ocfs2_init_dinode_xa_loc(&loc, inode, xs->inode_bh, + xs->not_found ? NULL : xs->here); + ret = ocfs2_xa_set(&loc, xi, ctxt); + if (ret) { + if (ret != -ENOSPC) + mlog_errno(ret); + goto out; + } + xs->here = loc.xl_entry; + out: up_write(&oi->ip_alloc_sem); @@ -2236,12 +2837,11 @@ cleanup: return ret; } -static int ocfs2_create_xattr_block(handle_t *handle, - struct inode *inode, +static int ocfs2_create_xattr_block(struct inode *inode, struct buffer_head *inode_bh, - struct ocfs2_alloc_context *meta_ac, - struct buffer_head **ret_bh, - int indexed) + struct ocfs2_xattr_set_ctxt *ctxt, + int indexed, + struct buffer_head **ret_bh) { int ret; u16 suballoc_bit_start; @@ -2252,14 +2852,14 @@ static int ocfs2_create_xattr_block(handle_t *handle, struct buffer_head *new_bh = NULL; struct ocfs2_xattr_block *xblk; - ret = ocfs2_journal_access_di(handle, INODE_CACHE(inode), inode_bh, - OCFS2_JOURNAL_ACCESS_CREATE); + ret = ocfs2_journal_access_di(ctxt->handle, INODE_CACHE(inode), + inode_bh, OCFS2_JOURNAL_ACCESS_CREATE); if (ret < 0) { mlog_errno(ret); goto end; } - ret = ocfs2_claim_metadata(osb, handle, meta_ac, 1, + ret = ocfs2_claim_metadata(osb, ctxt->handle, ctxt->meta_ac, 1, &suballoc_bit_start, &num_got, &first_blkno); if (ret < 0) { @@ -2270,7 +2870,7 @@ static int ocfs2_create_xattr_block(handle_t *handle, new_bh = sb_getblk(inode->i_sb, first_blkno); ocfs2_set_new_buffer_uptodate(INODE_CACHE(inode), new_bh); - ret = ocfs2_journal_access_xb(handle, INODE_CACHE(inode), + ret = ocfs2_journal_access_xb(ctxt->handle, INODE_CACHE(inode), new_bh, OCFS2_JOURNAL_ACCESS_CREATE); if (ret < 0) { @@ -2282,11 +2882,10 @@ static int ocfs2_create_xattr_block(handle_t *handle, xblk = (struct ocfs2_xattr_block *)new_bh->b_data; memset(xblk, 0, inode->i_sb->s_blocksize); strcpy((void *)xblk, OCFS2_XATTR_BLOCK_SIGNATURE); - xblk->xb_suballoc_slot = cpu_to_le16(osb->slot_num); + xblk->xb_suballoc_slot = cpu_to_le16(ctxt->meta_ac->ac_alloc_slot); xblk->xb_suballoc_bit = cpu_to_le16(suballoc_bit_start); xblk->xb_fs_generation = cpu_to_le32(osb->fs_generation); xblk->xb_blkno = cpu_to_le64(first_blkno); - if (indexed) { struct ocfs2_xattr_tree_root *xr = &xblk->xb_attrs.xb_root; xr->xt_clusters = cpu_to_le32(1); @@ -2297,14 +2896,17 @@ static int ocfs2_create_xattr_block(handle_t *handle, xr->xt_list.l_next_free_rec = cpu_to_le16(1); xblk->xb_flags = cpu_to_le16(OCFS2_XATTR_INDEXED); } + ocfs2_journal_dirty(ctxt->handle, new_bh); - ret = ocfs2_journal_dirty(handle, new_bh); - if (ret < 0) { - mlog_errno(ret); - goto end; - } + /* Add it to the inode */ di->i_xattr_loc = cpu_to_le64(first_blkno); - ocfs2_journal_dirty(handle, inode_bh); + + spin_lock(&OCFS2_I(inode)->ip_lock); + OCFS2_I(inode)->ip_dyn_features |= OCFS2_HAS_XATTR_FL; + di->i_dyn_features = cpu_to_le16(OCFS2_I(inode)->ip_dyn_features); + spin_unlock(&OCFS2_I(inode)->ip_lock); + + ocfs2_journal_dirty(ctxt->handle, inode_bh); *ret_bh = new_bh; new_bh = NULL; @@ -2326,13 +2928,13 @@ static int ocfs2_xattr_block_set(struct inode *inode, struct ocfs2_xattr_set_ctxt *ctxt) { struct buffer_head *new_bh = NULL; - handle_t *handle = ctxt->handle; struct ocfs2_xattr_block *xblk = NULL; int ret; + struct ocfs2_xa_loc loc; if (!xs->xattr_bh) { - ret = ocfs2_create_xattr_block(handle, inode, xs->inode_bh, - ctxt->meta_ac, &new_bh, 0); + ret = ocfs2_create_xattr_block(inode, xs->inode_bh, ctxt, + 0, &new_bh); if (ret) { mlog_errno(ret); goto end; @@ -2348,21 +2950,25 @@ static int ocfs2_xattr_block_set(struct inode *inode, xblk = (struct ocfs2_xattr_block *)xs->xattr_bh->b_data; if (!(le16_to_cpu(xblk->xb_flags) & OCFS2_XATTR_INDEXED)) { - /* Set extended attribute into external block */ - ret = ocfs2_xattr_set_entry(inode, xi, xs, ctxt, - OCFS2_HAS_XATTR_FL); - if (!ret || ret != -ENOSPC) - goto end; + ocfs2_init_xattr_block_xa_loc(&loc, inode, xs->xattr_bh, + xs->not_found ? NULL : xs->here); - ret = ocfs2_xattr_create_index_block(inode, xs, ctxt); - if (ret) + ret = ocfs2_xa_set(&loc, xi, ctxt); + if (!ret) + xs->here = loc.xl_entry; + else if (ret != -ENOSPC) goto end; + else { + ret = ocfs2_xattr_create_index_block(inode, xs, ctxt); + if (ret) + goto end; + } } - ret = ocfs2_xattr_set_entry_index_block(inode, xi, xs, ctxt); + if (le16_to_cpu(xblk->xb_flags) & OCFS2_XATTR_INDEXED) + ret = ocfs2_xattr_set_entry_index_block(inode, xi, xs, ctxt); end: - return ret; } @@ -2371,7 +2977,6 @@ static int ocfs2_xattr_can_be_in_inode(struct inode *inode, struct ocfs2_xattr_info *xi, struct ocfs2_xattr_search *xs) { - u64 value_size; struct ocfs2_xattr_entry *last; int free, i; size_t min_offs = xs->end - xs->base; @@ -2394,13 +2999,7 @@ static int ocfs2_xattr_can_be_in_inode(struct inode *inode, BUG_ON(!xs->not_found); - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) - value_size = OCFS2_XATTR_ROOT_SIZE; - else - value_size = OCFS2_XATTR_SIZE(xi->value_len); - - if (free >= sizeof(struct ocfs2_xattr_entry) + - OCFS2_XATTR_SIZE(strlen(xi->name)) + value_size) + if (free >= (sizeof(struct ocfs2_xattr_entry) + namevalue_size_xi(xi))) return 1; return 0; @@ -2424,7 +3023,7 @@ static int ocfs2_calc_xattr_set_need(struct inode *inode, char *base = NULL; int name_offset, name_len = 0; u32 new_clusters = ocfs2_clusters_for_bytes(inode->i_sb, - xi->value_len); + xi->xi_value_len); u64 value_size; /* @@ -2432,14 +3031,14 @@ static int ocfs2_calc_xattr_set_need(struct inode *inode, * No matter whether we replace an old one or add a new one, * we need this for writing. */ - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) credits += new_clusters * ocfs2_clusters_to_blocks(inode->i_sb, 1); if (xis->not_found && xbs->not_found) { credits += ocfs2_blocks_per_xattr_bucket(inode->i_sb); - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) { + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) { clusters_add += new_clusters; credits += ocfs2_calc_extend_credits(inode->i_sb, &def_xv.xv.xr_list, @@ -2484,7 +3083,7 @@ static int ocfs2_calc_xattr_set_need(struct inode *inode, * The credits for removing the value tree will be extended * by ocfs2_remove_extent itself. */ - if (!xi->value) { + if (!xi->xi_value) { if (!ocfs2_xattr_is_local(xe)) credits += ocfs2_remove_extent_credits(inode->i_sb); @@ -2514,7 +3113,7 @@ static int ocfs2_calc_xattr_set_need(struct inode *inode, } } - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) { + if (xi->xi_value_len > OCFS2_XATTR_INLINE_SIZE) { /* the new values will be stored outside. */ u32 old_clusters = 0; @@ -2547,9 +3146,10 @@ static int ocfs2_calc_xattr_set_need(struct inode *inode, * value, we don't need any allocation, otherwise we have * to guess metadata allocation. */ - if ((ocfs2_xattr_is_local(xe) && value_size >= xi->value_len) || + if ((ocfs2_xattr_is_local(xe) && + (value_size >= xi->xi_value_len)) || (!ocfs2_xattr_is_local(xe) && - OCFS2_XATTR_ROOT_SIZE >= xi->value_len)) + OCFS2_XATTR_ROOT_SIZE >= xi->xi_value_len)) goto out; } @@ -2639,7 +3239,7 @@ static int ocfs2_init_xattr_set_ctxt(struct inode *inode, meta_add += extra_meta; mlog(0, "Set xattr %s, reserve meta blocks = %d, clusters = %d, " - "credits = %d\n", xi->name, meta_add, clusters_add, *credits); + "credits = %d\n", xi->xi_name, meta_add, clusters_add, *credits); if (meta_add) { ret = ocfs2_reserve_new_metadata_blocks(osb, meta_add, @@ -2679,7 +3279,7 @@ static int __ocfs2_xattr_set_handle(struct inode *inode, { int ret = 0, credits, old_found; - if (!xi->value) { + if (!xi->xi_value) { /* Remove existing extended attribute */ if (!xis->not_found) ret = ocfs2_xattr_ibody_set(inode, xi, xis, ctxt); @@ -2693,8 +3293,8 @@ static int __ocfs2_xattr_set_handle(struct inode *inode, * If succeed and that extended attribute existing in * external block, then we will remove it. */ - xi->value = NULL; - xi->value_len = 0; + xi->xi_value = NULL; + xi->xi_value_len = 0; old_found = xis->not_found; xis->not_found = -ENODATA; @@ -2722,8 +3322,8 @@ static int __ocfs2_xattr_set_handle(struct inode *inode, } else if (ret == -ENOSPC) { if (di->i_xattr_loc && !xbs->xattr_bh) { ret = ocfs2_xattr_block_find(inode, - xi->name_index, - xi->name, xbs); + xi->xi_name_index, + xi->xi_name, xbs); if (ret) goto out; @@ -2762,8 +3362,8 @@ static int __ocfs2_xattr_set_handle(struct inode *inode, * If succeed and that extended attribute * existing in inode, we will remove it. */ - xi->value = NULL; - xi->value_len = 0; + xi->xi_value = NULL; + xi->xi_value_len = 0; xbs->not_found = -ENODATA; ret = ocfs2_calc_xattr_set_need(inode, di, @@ -2829,10 +3429,11 @@ int ocfs2_xattr_set_handle(handle_t *handle, int ret; struct ocfs2_xattr_info xi = { - .name_index = name_index, - .name = name, - .value = value, - .value_len = value_len, + .xi_name_index = name_index, + .xi_name = name, + .xi_name_len = strlen(name), + .xi_value = value, + .xi_value_len = value_len, }; struct ocfs2_xattr_search xis = { @@ -2912,10 +3513,11 @@ int ocfs2_xattr_set(struct inode *inode, struct ocfs2_refcount_tree *ref_tree = NULL; struct ocfs2_xattr_info xi = { - .name_index = name_index, - .name = name, - .value = value, - .value_len = value_len, + .xi_name_index = name_index, + .xi_name = name, + .xi_name_len = strlen(name), + .xi_value = value, + .xi_value_len = value_len, }; struct ocfs2_xattr_search xis = { @@ -3759,7 +4361,7 @@ static int ocfs2_defrag_xattr_bucket(struct inode *inode, struct ocfs2_xattr_bucket *bucket) { int ret, i; - size_t end, offset, len, value_len; + size_t end, offset, len; struct ocfs2_xattr_header *xh; char *entries, *buf, *bucket_buf = NULL; u64 blkno = bucket_blkno(bucket); @@ -3813,12 +4415,7 @@ static int ocfs2_defrag_xattr_bucket(struct inode *inode, end = OCFS2_XATTR_BUCKET_SIZE; for (i = 0; i < le16_to_cpu(xh->xh_count); i++, xe++) { offset = le16_to_cpu(xe->xe_name_offset); - if (ocfs2_xattr_is_local(xe)) - value_len = OCFS2_XATTR_SIZE( - le64_to_cpu(xe->xe_value_size)); - else - value_len = OCFS2_XATTR_ROOT_SIZE; - len = OCFS2_XATTR_SIZE(xe->xe_name_len) + value_len; + len = namevalue_size_xe(xe); /* * We must make sure that the name/value pair @@ -4007,7 +4604,7 @@ static int ocfs2_divide_xattr_bucket(struct inode *inode, int new_bucket_head) { int ret, i; - int count, start, len, name_value_len = 0, xe_len, name_offset = 0; + int count, start, len, name_value_len = 0, name_offset = 0; struct ocfs2_xattr_bucket *s_bucket = NULL, *t_bucket = NULL; struct ocfs2_xattr_header *xh; struct ocfs2_xattr_entry *xe; @@ -4098,13 +4695,7 @@ static int ocfs2_divide_xattr_bucket(struct inode *inode, name_value_len = 0; for (i = 0; i < start; i++) { xe = &xh->xh_entries[i]; - xe_len = OCFS2_XATTR_SIZE(xe->xe_name_len); - if (ocfs2_xattr_is_local(xe)) - xe_len += - OCFS2_XATTR_SIZE(le64_to_cpu(xe->xe_value_size)); - else - xe_len += OCFS2_XATTR_ROOT_SIZE; - name_value_len += xe_len; + name_value_len += namevalue_size_xe(xe); if (le16_to_cpu(xe->xe_name_offset) < name_offset) name_offset = le16_to_cpu(xe->xe_name_offset); } @@ -4134,12 +4725,6 @@ static int ocfs2_divide_xattr_bucket(struct inode *inode, xh->xh_free_start = cpu_to_le16(OCFS2_XATTR_BUCKET_SIZE); for (i = 0; i < le16_to_cpu(xh->xh_count); i++) { xe = &xh->xh_entries[i]; - xe_len = OCFS2_XATTR_SIZE(xe->xe_name_len); - if (ocfs2_xattr_is_local(xe)) - xe_len += - OCFS2_XATTR_SIZE(le64_to_cpu(xe->xe_value_size)); - else - xe_len += OCFS2_XATTR_ROOT_SIZE; if (le16_to_cpu(xe->xe_name_offset) < le16_to_cpu(xh->xh_free_start)) xh->xh_free_start = xe->xe_name_offset; @@ -4751,195 +5336,6 @@ static inline char *ocfs2_xattr_bucket_get_val(struct inode *inode, } /* - * Handle the normal xattr set, including replace, delete and new. - * - * Note: "local" indicates the real data's locality. So we can't - * just its bucket locality by its length. - */ -static void ocfs2_xattr_set_entry_normal(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - u32 name_hash, - int local) -{ - struct ocfs2_xattr_entry *last, *xe; - int name_len = strlen(xi->name); - struct ocfs2_xattr_header *xh = xs->header; - u16 count = le16_to_cpu(xh->xh_count), start; - size_t blocksize = inode->i_sb->s_blocksize; - char *val; - size_t offs, size, new_size; - - last = &xh->xh_entries[count]; - if (!xs->not_found) { - xe = xs->here; - offs = le16_to_cpu(xe->xe_name_offset); - if (ocfs2_xattr_is_local(xe)) - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(le64_to_cpu(xe->xe_value_size)); - else - size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(OCFS2_XATTR_ROOT_SIZE); - - /* - * If the new value will be stored outside, xi->value has been - * initalized as an empty ocfs2_xattr_value_root, and the same - * goes with xi->value_len, so we can set new_size safely here. - * See ocfs2_xattr_set_in_bucket. - */ - new_size = OCFS2_XATTR_SIZE(name_len) + - OCFS2_XATTR_SIZE(xi->value_len); - - le16_add_cpu(&xh->xh_name_value_len, -size); - if (xi->value) { - if (new_size > size) - goto set_new_name_value; - - /* Now replace the old value with new one. */ - if (local) - xe->xe_value_size = cpu_to_le64(xi->value_len); - else - xe->xe_value_size = 0; - - val = ocfs2_xattr_bucket_get_val(inode, - xs->bucket, offs); - memset(val + OCFS2_XATTR_SIZE(name_len), 0, - size - OCFS2_XATTR_SIZE(name_len)); - if (OCFS2_XATTR_SIZE(xi->value_len) > 0) - memcpy(val + OCFS2_XATTR_SIZE(name_len), - xi->value, xi->value_len); - - le16_add_cpu(&xh->xh_name_value_len, new_size); - ocfs2_xattr_set_local(xe, local); - return; - } else { - /* - * Remove the old entry if there is more than one. - * We don't remove the last entry so that we can - * use it to indicate the hash value of the empty - * bucket. - */ - last -= 1; - le16_add_cpu(&xh->xh_count, -1); - if (xh->xh_count) { - memmove(xe, xe + 1, - (void *)last - (void *)xe); - memset(last, 0, - sizeof(struct ocfs2_xattr_entry)); - } else - xh->xh_free_start = - cpu_to_le16(OCFS2_XATTR_BUCKET_SIZE); - - return; - } - } else { - /* find a new entry for insert. */ - int low = 0, high = count - 1, tmp; - struct ocfs2_xattr_entry *tmp_xe; - - while (low <= high && count) { - tmp = (low + high) / 2; - tmp_xe = &xh->xh_entries[tmp]; - - if (name_hash > le32_to_cpu(tmp_xe->xe_name_hash)) - low = tmp + 1; - else if (name_hash < - le32_to_cpu(tmp_xe->xe_name_hash)) - high = tmp - 1; - else { - low = tmp; - break; - } - } - - xe = &xh->xh_entries[low]; - if (low != count) - memmove(xe + 1, xe, (void *)last - (void *)xe); - - le16_add_cpu(&xh->xh_count, 1); - memset(xe, 0, sizeof(struct ocfs2_xattr_entry)); - xe->xe_name_hash = cpu_to_le32(name_hash); - xe->xe_name_len = name_len; - ocfs2_xattr_set_type(xe, xi->name_index); - } - -set_new_name_value: - /* Insert the new name+value. */ - size = OCFS2_XATTR_SIZE(name_len) + OCFS2_XATTR_SIZE(xi->value_len); - - /* - * We must make sure that the name/value pair - * exists in the same block. - */ - offs = le16_to_cpu(xh->xh_free_start); - start = offs - size; - - if (start >> inode->i_sb->s_blocksize_bits != - (offs - 1) >> inode->i_sb->s_blocksize_bits) { - offs = offs - offs % blocksize; - xh->xh_free_start = cpu_to_le16(offs); - } - - val = ocfs2_xattr_bucket_get_val(inode, xs->bucket, offs - size); - xe->xe_name_offset = cpu_to_le16(offs - size); - - memset(val, 0, size); - memcpy(val, xi->name, name_len); - memcpy(val + OCFS2_XATTR_SIZE(name_len), xi->value, xi->value_len); - - xe->xe_value_size = cpu_to_le64(xi->value_len); - ocfs2_xattr_set_local(xe, local); - xs->here = xe; - le16_add_cpu(&xh->xh_free_start, -size); - le16_add_cpu(&xh->xh_name_value_len, size); - - return; -} - -/* - * Set the xattr entry in the specified bucket. - * The bucket is indicated by xs->bucket and it should have the enough - * space for the xattr insertion. - */ -static int ocfs2_xattr_set_entry_in_bucket(struct inode *inode, - handle_t *handle, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - u32 name_hash, - int local) -{ - int ret; - u64 blkno; - - mlog(0, "Set xattr entry len = %lu index = %d in bucket %llu\n", - (unsigned long)xi->value_len, xi->name_index, - (unsigned long long)bucket_blkno(xs->bucket)); - - if (!xs->bucket->bu_bhs[1]) { - blkno = bucket_blkno(xs->bucket); - ocfs2_xattr_bucket_relse(xs->bucket); - ret = ocfs2_read_xattr_bucket(xs->bucket, blkno); - if (ret) { - mlog_errno(ret); - goto out; - } - } - - ret = ocfs2_xattr_bucket_journal_access(handle, xs->bucket, - OCFS2_JOURNAL_ACCESS_WRITE); - if (ret < 0) { - mlog_errno(ret); - goto out; - } - - ocfs2_xattr_set_entry_normal(inode, xi, xs, name_hash, local); - ocfs2_xattr_bucket_journal_dirty(handle, xs->bucket); - -out: - return ret; -} - -/* * Truncate the specified xe_off entry in xattr bucket. * bucket is indicated by header_bh and len is the new length. * Both the ocfs2_xattr_value_root and the entry will be updated here. @@ -5009,66 +5405,6 @@ out: return ret; } -static int ocfs2_xattr_bucket_value_truncate_xs(struct inode *inode, - struct ocfs2_xattr_search *xs, - int len, - struct ocfs2_xattr_set_ctxt *ctxt) -{ - int ret, offset; - struct ocfs2_xattr_entry *xe = xs->here; - struct ocfs2_xattr_header *xh = (struct ocfs2_xattr_header *)xs->base; - - BUG_ON(!xs->bucket->bu_bhs[0] || !xe || ocfs2_xattr_is_local(xe)); - - offset = xe - xh->xh_entries; - ret = ocfs2_xattr_bucket_value_truncate(inode, xs->bucket, - offset, len, ctxt); - if (ret) - mlog_errno(ret); - - return ret; -} - -static int ocfs2_xattr_bucket_set_value_outside(struct inode *inode, - handle_t *handle, - struct ocfs2_xattr_search *xs, - char *val, - int value_len) -{ - int ret, offset, block_off; - struct ocfs2_xattr_value_root *xv; - struct ocfs2_xattr_entry *xe = xs->here; - struct ocfs2_xattr_header *xh = bucket_xh(xs->bucket); - void *base; - struct ocfs2_xattr_value_buf vb = { - .vb_access = ocfs2_journal_access, - }; - - BUG_ON(!xs->base || !xe || ocfs2_xattr_is_local(xe)); - - ret = ocfs2_xattr_bucket_get_name_value(inode->i_sb, xh, - xe - xh->xh_entries, - &block_off, - &offset); - if (ret) { - mlog_errno(ret); - goto out; - } - - base = bucket_block(xs->bucket, block_off); - xv = (struct ocfs2_xattr_value_root *)(base + offset + - OCFS2_XATTR_SIZE(xe->xe_name_len)); - - vb.vb_xv = xv; - vb.vb_bh = xs->bucket->bu_bhs[block_off]; - ret = __ocfs2_xattr_set_value_outside(inode, handle, - &vb, val, value_len); - if (ret) - mlog_errno(ret); -out: - return ret; -} - static int ocfs2_rm_xattr_cluster(struct inode *inode, struct buffer_head *root_bh, u64 blkno, @@ -5167,128 +5503,6 @@ out: return ret; } -static void ocfs2_xattr_bucket_remove_xs(struct inode *inode, - handle_t *handle, - struct ocfs2_xattr_search *xs) -{ - struct ocfs2_xattr_header *xh = bucket_xh(xs->bucket); - struct ocfs2_xattr_entry *last = &xh->xh_entries[ - le16_to_cpu(xh->xh_count) - 1]; - int ret = 0; - - ret = ocfs2_xattr_bucket_journal_access(handle, xs->bucket, - OCFS2_JOURNAL_ACCESS_WRITE); - if (ret) { - mlog_errno(ret); - return; - } - - /* Remove the old entry. */ - memmove(xs->here, xs->here + 1, - (void *)last - (void *)xs->here); - memset(last, 0, sizeof(struct ocfs2_xattr_entry)); - le16_add_cpu(&xh->xh_count, -1); - - ocfs2_xattr_bucket_journal_dirty(handle, xs->bucket); -} - -/* - * Set the xattr name/value in the bucket specified in xs. - * - * As the new value in xi may be stored in the bucket or in an outside cluster, - * we divide the whole process into 3 steps: - * 1. insert name/value in the bucket(ocfs2_xattr_set_entry_in_bucket) - * 2. truncate of the outside cluster(ocfs2_xattr_bucket_value_truncate_xs) - * 3. Set the value to the outside cluster(ocfs2_xattr_bucket_set_value_outside) - * 4. If the clusters for the new outside value can't be allocated, we need - * to free the xattr we allocated in set. - */ -static int ocfs2_xattr_set_in_bucket(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_set_ctxt *ctxt) -{ - int ret, local = 1; - size_t value_len; - char *val = (char *)xi->value; - struct ocfs2_xattr_entry *xe = xs->here; - u32 name_hash = ocfs2_xattr_name_hash(inode, xi->name, - strlen(xi->name)); - - if (!xs->not_found && !ocfs2_xattr_is_local(xe)) { - /* - * We need to truncate the xattr storage first. - * - * If both the old and new value are stored to - * outside block, we only need to truncate - * the storage and then set the value outside. - * - * If the new value should be stored within block, - * we should free all the outside block first and - * the modification to the xattr block will be done - * by following steps. - */ - if (xi->value_len > OCFS2_XATTR_INLINE_SIZE) - value_len = xi->value_len; - else - value_len = 0; - - ret = ocfs2_xattr_bucket_value_truncate_xs(inode, xs, - value_len, - ctxt); - if (ret) - goto out; - - if (value_len) - goto set_value_outside; - } - - value_len = xi->value_len; - /* So we have to handle the inside block change now. */ - if (value_len > OCFS2_XATTR_INLINE_SIZE) { - /* - * If the new value will be stored outside of block, - * initalize a new empty value root and insert it first. - */ - local = 0; - xi->value = &def_xv; - xi->value_len = OCFS2_XATTR_ROOT_SIZE; - } - - ret = ocfs2_xattr_set_entry_in_bucket(inode, ctxt->handle, xi, xs, - name_hash, local); - if (ret) { - mlog_errno(ret); - goto out; - } - - if (value_len <= OCFS2_XATTR_INLINE_SIZE) - goto out; - - /* allocate the space now for the outside block storage. */ - ret = ocfs2_xattr_bucket_value_truncate_xs(inode, xs, - value_len, ctxt); - if (ret) { - mlog_errno(ret); - - if (xs->not_found) { - /* - * We can't allocate enough clusters for outside - * storage and we have allocated xattr already, - * so need to remove it. - */ - ocfs2_xattr_bucket_remove_xs(inode, ctxt->handle, xs); - } - goto out; - } - -set_value_outside: - ret = ocfs2_xattr_bucket_set_value_outside(inode, ctxt->handle, - xs, val, value_len); -out: - return ret; -} - /* * check whether the xattr bucket is filled up with the same hash value. * If we want to insert the xattr with the same hash, return -ENOSPC. @@ -5317,156 +5531,116 @@ static int ocfs2_check_xattr_bucket_collision(struct inode *inode, return 0; } -static int ocfs2_xattr_set_entry_index_block(struct inode *inode, - struct ocfs2_xattr_info *xi, - struct ocfs2_xattr_search *xs, - struct ocfs2_xattr_set_ctxt *ctxt) +/* + * Try to set the entry in the current bucket. If we fail, the caller + * will handle getting us another bucket. + */ +static int ocfs2_xattr_set_entry_bucket(struct inode *inode, + struct ocfs2_xattr_info *xi, + struct ocfs2_xattr_search *xs, + struct ocfs2_xattr_set_ctxt *ctxt) { - struct ocfs2_xattr_header *xh; - struct ocfs2_xattr_entry *xe; - u16 count, header_size, xh_free_start; - int free, max_free, need, old; - size_t value_size = 0, name_len = strlen(xi->name); - size_t blocksize = inode->i_sb->s_blocksize; - int ret, allocation = 0; - - mlog_entry("Set xattr %s in xattr index block\n", xi->name); - -try_again: - xh = xs->header; - count = le16_to_cpu(xh->xh_count); - xh_free_start = le16_to_cpu(xh->xh_free_start); - header_size = sizeof(struct ocfs2_xattr_header) + - count * sizeof(struct ocfs2_xattr_entry); - max_free = OCFS2_XATTR_BUCKET_SIZE - header_size - - le16_to_cpu(xh->xh_name_value_len) - OCFS2_XATTR_HEADER_GAP; - - mlog_bug_on_msg(header_size > blocksize, "bucket %llu has header size " - "of %u which exceed block size\n", - (unsigned long long)bucket_blkno(xs->bucket), - header_size); + int ret; + struct ocfs2_xa_loc loc; - if (xi->value && xi->value_len > OCFS2_XATTR_INLINE_SIZE) - value_size = OCFS2_XATTR_ROOT_SIZE; - else if (xi->value) - value_size = OCFS2_XATTR_SIZE(xi->value_len); + mlog_entry("Set xattr %s in xattr bucket\n", xi->xi_name); - if (xs->not_found) - need = sizeof(struct ocfs2_xattr_entry) + - OCFS2_XATTR_SIZE(name_len) + value_size; - else { - need = value_size + OCFS2_XATTR_SIZE(name_len); + ocfs2_init_xattr_bucket_xa_loc(&loc, xs->bucket, + xs->not_found ? NULL : xs->here); + ret = ocfs2_xa_set(&loc, xi, ctxt); + if (!ret) { + xs->here = loc.xl_entry; + goto out; + } + if (ret != -ENOSPC) { + mlog_errno(ret); + goto out; + } - /* - * We only replace the old value if the new length is smaller - * than the old one. Otherwise we will allocate new space in the - * bucket to store it. - */ - xe = xs->here; - if (ocfs2_xattr_is_local(xe)) - old = OCFS2_XATTR_SIZE(le64_to_cpu(xe->xe_value_size)); - else - old = OCFS2_XATTR_SIZE(OCFS2_XATTR_ROOT_SIZE); + /* Ok, we need space. Let's try defragmenting the bucket. */ + ret = ocfs2_defrag_xattr_bucket(inode, ctxt->handle, + xs->bucket); + if (ret) { + mlog_errno(ret); + goto out; + } - if (old >= value_size) - need = 0; + ret = ocfs2_xa_set(&loc, xi, ctxt); + if (!ret) { + xs->here = loc.xl_entry; + goto out; } + if (ret != -ENOSPC) + mlog_errno(ret); - free = xh_free_start - header_size - OCFS2_XATTR_HEADER_GAP; - /* - * We need to make sure the new name/value pair - * can exist in the same block. - */ - if (xh_free_start % blocksize < need) - free -= xh_free_start % blocksize; - - mlog(0, "xs->not_found = %d, in xattr bucket %llu: free = %d, " - "need = %d, max_free = %d, xh_free_start = %u, xh_name_value_len =" - " %u\n", xs->not_found, - (unsigned long long)bucket_blkno(xs->bucket), - free, need, max_free, le16_to_cpu(xh->xh_free_start), - le16_to_cpu(xh->xh_name_value_len)); - - if (free < need || - (xs->not_found && - count == ocfs2_xattr_max_xe_in_bucket(inode->i_sb))) { - if (need <= max_free && - count < ocfs2_xattr_max_xe_in_bucket(inode->i_sb)) { - /* - * We can create the space by defragment. Since only the - * name/value will be moved, the xe shouldn't be changed - * in xs. - */ - ret = ocfs2_defrag_xattr_bucket(inode, ctxt->handle, - xs->bucket); - if (ret) { - mlog_errno(ret); - goto out; - } - xh_free_start = le16_to_cpu(xh->xh_free_start); - free = xh_free_start - header_size - - OCFS2_XATTR_HEADER_GAP; - if (xh_free_start % blocksize < need) - free -= xh_free_start % blocksize; +out: + mlog_exit(ret); + return ret; +} - if (free >= need) - goto xattr_set; +static int ocfs2_xattr_set_entry_index_block(struct inode *inode, + struct ocfs2_xattr_info *xi, + struct ocfs2_xattr_search *xs, + struct ocfs2_xattr_set_ctxt *ctxt) +{ + int ret; - mlog(0, "Can't get enough space for xattr insert by " - "defragment. Need %u bytes, but we have %d, so " - "allocate new bucket for it.\n", need, free); - } + mlog_entry("Set xattr %s in xattr index block\n", xi->xi_name); - /* - * We have to add new buckets or clusters and one - * allocation should leave us enough space for insert. - */ - BUG_ON(allocation); + ret = ocfs2_xattr_set_entry_bucket(inode, xi, xs, ctxt); + if (!ret) + goto out; + if (ret != -ENOSPC) { + mlog_errno(ret); + goto out; + } - /* - * We do not allow for overlapping ranges between buckets. And - * the maximum number of collisions we will allow for then is - * one bucket's worth, so check it here whether we need to - * add a new bucket for the insert. - */ - ret = ocfs2_check_xattr_bucket_collision(inode, - xs->bucket, - xi->name); - if (ret) { - mlog_errno(ret); - goto out; - } + /* Ack, need more space. Let's try to get another bucket! */ - ret = ocfs2_add_new_xattr_bucket(inode, - xs->xattr_bh, + /* + * We do not allow for overlapping ranges between buckets. And + * the maximum number of collisions we will allow for then is + * one bucket's worth, so check it here whether we need to + * add a new bucket for the insert. + */ + ret = ocfs2_check_xattr_bucket_collision(inode, xs->bucket, - ctxt); - if (ret) { - mlog_errno(ret); - goto out; - } + xi->xi_name); + if (ret) { + mlog_errno(ret); + goto out; + } - /* - * ocfs2_add_new_xattr_bucket() will have updated - * xs->bucket if it moved, but it will not have updated - * any of the other search fields. Thus, we drop it and - * re-search. Everything should be cached, so it'll be - * quick. - */ - ocfs2_xattr_bucket_relse(xs->bucket); - ret = ocfs2_xattr_index_block_find(inode, xs->xattr_bh, - xi->name_index, - xi->name, xs); - if (ret && ret != -ENODATA) - goto out; - xs->not_found = ret; - allocation = 1; - goto try_again; + ret = ocfs2_add_new_xattr_bucket(inode, + xs->xattr_bh, + xs->bucket, + ctxt); + if (ret) { + mlog_errno(ret); + goto out; } -xattr_set: - ret = ocfs2_xattr_set_in_bucket(inode, xi, xs, ctxt); + /* + * ocfs2_add_new_xattr_bucket() will have updated + * xs->bucket if it moved, but it will not have updated + * any of the other search fields. Thus, we drop it and + * re-search. Everything should be cached, so it'll be + * quick. + */ + ocfs2_xattr_bucket_relse(xs->bucket); + ret = ocfs2_xattr_index_block_find(inode, xs->xattr_bh, + xi->xi_name_index, + xi->xi_name, xs); + if (ret && ret != -ENODATA) + goto out; + xs->not_found = ret; + + /* Ok, we have a new bucket, let's try again */ + ret = ocfs2_xattr_set_entry_bucket(inode, xi, xs, ctxt); + if (ret && (ret != -ENOSPC)) + mlog_errno(ret); + out: mlog_exit(ret); return ret; @@ -5678,7 +5852,7 @@ static int ocfs2_prepare_refcount_xattr(struct inode *inode, * refcount tree, and make the original extent become 3. So we will need * 2 * cluster more extent recs at most. */ - if (!xi->value || xi->value_len <= OCFS2_XATTR_INLINE_SIZE) { + if (!xi->xi_value || xi->xi_value_len <= OCFS2_XATTR_INLINE_SIZE) { ret = ocfs2_refcounted_xattr_delete_need(inode, &(*ref_tree)->rf_ci, @@ -6354,9 +6528,11 @@ static int ocfs2_create_empty_xattr_block(struct inode *inode, int indexed) { int ret; - handle_t *handle; struct ocfs2_alloc_context *meta_ac; struct ocfs2_super *osb = OCFS2_SB(inode->i_sb); + struct ocfs2_xattr_set_ctxt ctxt = { + .meta_ac = meta_ac, + }; ret = ocfs2_reserve_new_metadata_blocks(osb, 1, &meta_ac); if (ret < 0) { @@ -6364,21 +6540,21 @@ static int ocfs2_create_empty_xattr_block(struct inode *inode, return ret; } - handle = ocfs2_start_trans(osb, OCFS2_XATTR_BLOCK_CREATE_CREDITS); - if (IS_ERR(handle)) { - ret = PTR_ERR(handle); + ctxt.handle = ocfs2_start_trans(osb, OCFS2_XATTR_BLOCK_CREATE_CREDITS); + if (IS_ERR(ctxt.handle)) { + ret = PTR_ERR(ctxt.handle); mlog_errno(ret); goto out; } mlog(0, "create new xattr block for inode %llu, index = %d\n", (unsigned long long)fe_bh->b_blocknr, indexed); - ret = ocfs2_create_xattr_block(handle, inode, fe_bh, - meta_ac, ret_bh, indexed); + ret = ocfs2_create_xattr_block(inode, fe_bh, &ctxt, indexed, + ret_bh); if (ret) mlog_errno(ret); - ocfs2_commit_trans(osb, handle); + ocfs2_commit_trans(osb, ctxt.handle); out: ocfs2_free_alloc_context(meta_ac); return ret; |