summaryrefslogtreecommitdiff
path: root/net/rds/rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/rds/rdma.c')
-rw-r--r--net/rds/rdma.c339
1 files changed, 226 insertions, 113 deletions
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 75fd13bb631..1a41debca1c 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -35,7 +35,7 @@
#include <linux/rbtree.h>
#include <linux/dma-mapping.h> /* for DMA_*_DEVICE */
-#include "rdma.h"
+#include "rds.h"
/*
* XXX
@@ -130,14 +130,22 @@ void rds_rdma_drop_keys(struct rds_sock *rs)
{
struct rds_mr *mr;
struct rb_node *node;
+ unsigned long flags;
/* Release any MRs associated with this socket */
+ spin_lock_irqsave(&rs->rs_rdma_lock, flags);
while ((node = rb_first(&rs->rs_rdma_keys))) {
mr = container_of(node, struct rds_mr, r_rb_node);
if (mr->r_trans == rs->rs_transport)
mr->r_invalidate = 0;
+ rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
+ RB_CLEAR_NODE(&mr->r_rb_node);
+ spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
+ rds_destroy_mr(mr);
rds_mr_put(mr);
+ spin_lock_irqsave(&rs->rs_rdma_lock, flags);
}
+ spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
if (rs->rs_transport && rs->rs_transport->flush_mrs)
rs->rs_transport->flush_mrs();
@@ -181,7 +189,7 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
goto out;
}
- if (rs->rs_transport->get_mr == NULL) {
+ if (!rs->rs_transport->get_mr) {
ret = -EOPNOTSUPP;
goto out;
}
@@ -197,13 +205,13 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
/* XXX clamp nr_pages to limit the size of this alloc? */
pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
- if (pages == NULL) {
+ if (!pages) {
ret = -ENOMEM;
goto out;
}
mr = kzalloc(sizeof(struct rds_mr), GFP_KERNEL);
- if (mr == NULL) {
+ if (!mr) {
ret = -ENOMEM;
goto out;
}
@@ -230,13 +238,13 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
* r/o or r/w. We need to assume r/w, or we'll do a lot of RDMA to
* the zero page.
*/
- ret = rds_pin_pages(args->vec.addr & PAGE_MASK, nr_pages, pages, 1);
+ ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
if (ret < 0)
goto out;
nents = ret;
sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
- if (sg == NULL) {
+ if (!sg) {
ret = -ENOMEM;
goto out;
}
@@ -406,68 +414,127 @@ void rds_rdma_unuse(struct rds_sock *rs, u32 r_key, int force)
spin_lock_irqsave(&rs->rs_rdma_lock, flags);
mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
- if (mr && (mr->r_use_once || force)) {
+ if (!mr) {
+ printk(KERN_ERR "rds: trying to unuse MR with unknown r_key %u!\n", r_key);
+ spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
+ return;
+ }
+
+ if (mr->r_use_once || force) {
rb_erase(&mr->r_rb_node, &rs->rs_rdma_keys);
RB_CLEAR_NODE(&mr->r_rb_node);
zot_me = 1;
- } else if (mr)
- atomic_inc(&mr->r_refcount);
+ }
spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);
/* May have to issue a dma_sync on this memory region.
* Note we could avoid this if the operation was a RDMA READ,
* but at this point we can't tell. */
- if (mr != NULL) {
- if (mr->r_trans->sync_mr)
- mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);
-
- /* If the MR was marked as invalidate, this will
- * trigger an async flush. */
- if (zot_me)
- rds_destroy_mr(mr);
- rds_mr_put(mr);
- }
+ if (mr->r_trans->sync_mr)
+ mr->r_trans->sync_mr(mr->r_trans_private, DMA_FROM_DEVICE);
+
+ /* If the MR was marked as invalidate, this will
+ * trigger an async flush. */
+ if (zot_me)
+ rds_destroy_mr(mr);
+ rds_mr_put(mr);
}
-void rds_rdma_free_op(struct rds_rdma_op *ro)
+void rds_rdma_free_op(struct rm_rdma_op *ro)
{
unsigned int i;
- for (i = 0; i < ro->r_nents; i++) {
- struct page *page = sg_page(&ro->r_sg[i]);
+ for (i = 0; i < ro->op_nents; i++) {
+ struct page *page = sg_page(&ro->op_sg[i]);
/* Mark page dirty if it was possibly modified, which
* is the case for a RDMA_READ which copies from remote
* to local memory */
- if (!ro->r_write) {
- BUG_ON(in_interrupt());
+ if (!ro->op_write) {
+ BUG_ON(irqs_disabled());
set_page_dirty(page);
}
put_page(page);
}
- kfree(ro->r_notifier);
- kfree(ro);
+ kfree(ro->op_notifier);
+ ro->op_notifier = NULL;
+ ro->op_active = 0;
+}
+
+void rds_atomic_free_op(struct rm_atomic_op *ao)
+{
+ struct page *page = sg_page(ao->op_sg);
+
+ /* Mark page dirty if it was possibly modified, which
+ * is the case for a RDMA_READ which copies from remote
+ * to local memory */
+ set_page_dirty(page);
+ put_page(page);
+
+ kfree(ao->op_notifier);
+ ao->op_notifier = NULL;
+ ao->op_active = 0;
}
+
/*
- * args is a pointer to an in-kernel copy in the sendmsg cmsg.
+ * Count the number of pages needed to describe an incoming iovec.
*/
-static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
- struct rds_rdma_args *args)
+static int rds_rdma_pages(struct rds_rdma_args *args)
{
struct rds_iovec vec;
- struct rds_rdma_op *op = NULL;
+ struct rds_iovec __user *local_vec;
+ unsigned int tot_pages = 0;
unsigned int nr_pages;
- unsigned int max_pages;
+ unsigned int i;
+
+ local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
+
+ /* figure out the number of pages in the vector */
+ for (i = 0; i < args->nr_local; i++) {
+ if (copy_from_user(&vec, &local_vec[i],
+ sizeof(struct rds_iovec)))
+ return -EFAULT;
+
+ nr_pages = rds_pages_in_vec(&vec);
+ if (nr_pages == 0)
+ return -EINVAL;
+
+ tot_pages += nr_pages;
+ }
+
+ return tot_pages;
+}
+
+int rds_rdma_extra_size(struct rds_rdma_args *args)
+{
+ return rds_rdma_pages(args) * sizeof(struct scatterlist);
+}
+
+/*
+ * The application asks for a RDMA transfer.
+ * Extract all arguments and set up the rdma_op
+ */
+int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
+ struct cmsghdr *cmsg)
+{
+ struct rds_rdma_args *args;
+ struct rds_iovec vec;
+ struct rm_rdma_op *op = &rm->rdma;
+ int nr_pages;
unsigned int nr_bytes;
struct page **pages = NULL;
struct rds_iovec __user *local_vec;
- struct scatterlist *sg;
unsigned int nr;
unsigned int i, j;
- int ret;
+ int ret = 0;
+ if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args))
+ || rm->rdma.op_active)
+ return -EINVAL;
+
+ args = CMSG_DATA(cmsg);
if (rs->rs_bound_addr == 0) {
ret = -ENOTCONN; /* XXX not a great errno */
@@ -479,61 +546,38 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
goto out;
}
- nr_pages = 0;
- max_pages = 0;
-
- local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
-
- /* figure out the number of pages in the vector */
- for (i = 0; i < args->nr_local; i++) {
- if (copy_from_user(&vec, &local_vec[i],
- sizeof(struct rds_iovec))) {
- ret = -EFAULT;
- goto out;
- }
-
- nr = rds_pages_in_vec(&vec);
- if (nr == 0) {
- ret = -EINVAL;
- goto out;
- }
-
- max_pages = max(nr, max_pages);
- nr_pages += nr;
- }
-
- pages = kcalloc(max_pages, sizeof(struct page *), GFP_KERNEL);
- if (pages == NULL) {
- ret = -ENOMEM;
+ nr_pages = rds_rdma_pages(args);
+ if (nr_pages < 0)
goto out;
- }
- op = kzalloc(offsetof(struct rds_rdma_op, r_sg[nr_pages]), GFP_KERNEL);
- if (op == NULL) {
+ pages = kcalloc(nr_pages, sizeof(struct page *), GFP_KERNEL);
+ if (!pages) {
ret = -ENOMEM;
goto out;
}
- op->r_write = !!(args->flags & RDS_RDMA_READWRITE);
- op->r_fence = !!(args->flags & RDS_RDMA_FENCE);
- op->r_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
- op->r_recverr = rs->rs_recverr;
+ op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
+ op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
+ op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+ op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
+ op->op_active = 1;
+ op->op_recverr = rs->rs_recverr;
WARN_ON(!nr_pages);
- sg_init_table(op->r_sg, nr_pages);
+ op->op_sg = rds_message_alloc_sgs(rm, nr_pages);
- if (op->r_notify || op->r_recverr) {
+ if (op->op_notify || op->op_recverr) {
/* We allocate an uninitialized notifier here, because
* we don't want to do that in the completion handler. We
* would have to use GFP_ATOMIC there, and don't want to deal
* with failed allocations.
*/
- op->r_notifier = kmalloc(sizeof(struct rds_notifier), GFP_KERNEL);
- if (!op->r_notifier) {
+ op->op_notifier = kmalloc(sizeof(struct rds_notifier), GFP_KERNEL);
+ if (!op->op_notifier) {
ret = -ENOMEM;
goto out;
}
- op->r_notifier->n_user_token = args->user_token;
- op->r_notifier->n_status = RDS_RDMA_SUCCESS;
+ op->op_notifier->n_user_token = args->user_token;
+ op->op_notifier->n_status = RDS_RDMA_SUCCESS;
}
/* The cookie contains the R_Key of the remote memory region, and
@@ -543,15 +587,17 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
* destination address (which is really an offset into the MR)
* FIXME: We may want to move this into ib_rdma.c
*/
- op->r_key = rds_rdma_cookie_key(args->cookie);
- op->r_remote_addr = args->remote_vec.addr + rds_rdma_cookie_offset(args->cookie);
+ op->op_rkey = rds_rdma_cookie_key(args->cookie);
+ op->op_remote_addr = args->remote_vec.addr + rds_rdma_cookie_offset(args->cookie);
nr_bytes = 0;
rdsdebug("RDS: rdma prepare nr_local %llu rva %llx rkey %x\n",
(unsigned long long)args->nr_local,
(unsigned long long)args->remote_vec.addr,
- op->r_key);
+ op->op_rkey);
+
+ local_vec = (struct rds_iovec __user *)(unsigned long) args->local_vec_addr;
for (i = 0; i < args->nr_local; i++) {
if (copy_from_user(&vec, &local_vec[i],
@@ -569,15 +615,10 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
rs->rs_user_addr = vec.addr;
rs->rs_user_bytes = vec.bytes;
- /* did the user change the vec under us? */
- if (nr > max_pages || op->r_nents + nr > nr_pages) {
- ret = -EINVAL;
- goto out;
- }
/* If it's a WRITE operation, we want to pin the pages for reading.
* If it's a READ operation, we need to pin the pages for writing.
*/
- ret = rds_pin_pages(vec.addr & PAGE_MASK, nr, pages, !op->r_write);
+ ret = rds_pin_pages(vec.addr, nr, pages, !op->op_write);
if (ret < 0)
goto out;
@@ -588,8 +629,9 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
for (j = 0; j < nr; j++) {
unsigned int offset = vec.addr & ~PAGE_MASK;
+ struct scatterlist *sg;
- sg = &op->r_sg[op->r_nents + j];
+ sg = &op->op_sg[op->op_nents + j];
sg_set_page(sg, pages[j],
min_t(unsigned int, vec.bytes, PAGE_SIZE - offset),
offset);
@@ -601,10 +643,9 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
vec.bytes -= sg->length;
}
- op->r_nents += nr;
+ op->op_nents += nr;
}
-
if (nr_bytes > args->remote_vec.bytes) {
rdsdebug("RDS nr_bytes %u remote_bytes %u do not match\n",
nr_bytes,
@@ -612,38 +653,17 @@ static struct rds_rdma_op *rds_rdma_prepare(struct rds_sock *rs,
ret = -EINVAL;
goto out;
}
- op->r_bytes = nr_bytes;
+ op->op_bytes = nr_bytes;
ret = 0;
out:
kfree(pages);
- if (ret) {
- if (op)
- rds_rdma_free_op(op);
- op = ERR_PTR(ret);
- }
- return op;
-}
-
-/*
- * The application asks for a RDMA transfer.
- * Extract all arguments and set up the rdma_op
- */
-int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
- struct cmsghdr *cmsg)
-{
- struct rds_rdma_op *op;
-
- if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args)) ||
- rm->m_rdma_op != NULL)
- return -EINVAL;
+ if (ret)
+ rds_rdma_free_op(op);
- op = rds_rdma_prepare(rs, CMSG_DATA(cmsg));
- if (IS_ERR(op))
- return PTR_ERR(op);
rds_stats_inc(s_send_rdma);
- rm->m_rdma_op = op;
- return 0;
+
+ return ret;
}
/*
@@ -673,7 +693,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
spin_lock_irqsave(&rs->rs_rdma_lock, flags);
mr = rds_mr_tree_walk(&rs->rs_rdma_keys, r_key, NULL);
- if (mr == NULL)
+ if (!mr)
err = -EINVAL; /* invalid r_key */
else
atomic_inc(&mr->r_refcount);
@@ -681,7 +701,7 @@ int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
if (mr) {
mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
- rm->m_rdma_mr = mr;
+ rm->rdma.op_rdma_mr = mr;
}
return err;
}
@@ -699,5 +719,98 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
rm->m_rdma_cookie != 0)
return -EINVAL;
- return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->m_rdma_mr);
+ return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.op_rdma_mr);
+}
+
+/*
+ * Fill in rds_message for an atomic request.
+ */
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+ struct cmsghdr *cmsg)
+{
+ struct page *page = NULL;
+ struct rds_atomic_args *args;
+ int ret = 0;
+
+ if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args))
+ || rm->atomic.op_active)
+ return -EINVAL;
+
+ args = CMSG_DATA(cmsg);
+
+ /* Nonmasked & masked cmsg ops converted to masked hw ops */
+ switch (cmsg->cmsg_type) {
+ case RDS_CMSG_ATOMIC_FADD:
+ rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
+ rm->atomic.op_m_fadd.add = args->fadd.add;
+ rm->atomic.op_m_fadd.nocarry_mask = 0;
+ break;
+ case RDS_CMSG_MASKED_ATOMIC_FADD:
+ rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
+ rm->atomic.op_m_fadd.add = args->m_fadd.add;
+ rm->atomic.op_m_fadd.nocarry_mask = args->m_fadd.nocarry_mask;
+ break;
+ case RDS_CMSG_ATOMIC_CSWP:
+ rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
+ rm->atomic.op_m_cswp.compare = args->cswp.compare;
+ rm->atomic.op_m_cswp.swap = args->cswp.swap;
+ rm->atomic.op_m_cswp.compare_mask = ~0;
+ rm->atomic.op_m_cswp.swap_mask = ~0;
+ break;
+ case RDS_CMSG_MASKED_ATOMIC_CSWP:
+ rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
+ rm->atomic.op_m_cswp.compare = args->m_cswp.compare;
+ rm->atomic.op_m_cswp.swap = args->m_cswp.swap;
+ rm->atomic.op_m_cswp.compare_mask = args->m_cswp.compare_mask;
+ rm->atomic.op_m_cswp.swap_mask = args->m_cswp.swap_mask;
+ break;
+ default:
+ BUG(); /* should never happen */
+ }
+
+ rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+ rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
+ rm->atomic.op_active = 1;
+ rm->atomic.op_recverr = rs->rs_recverr;
+ rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
+
+ /* verify 8 byte-aligned */
+ if (args->local_addr & 0x7) {
+ ret = -EFAULT;
+ goto err;
+ }
+
+ ret = rds_pin_pages(args->local_addr, 1, &page, 1);
+ if (ret != 1)
+ goto err;
+ ret = 0;
+
+ sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));
+
+ if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+ /* We allocate an uninitialized notifier here, because
+ * we don't want to do that in the completion handler. We
+ * would have to use GFP_ATOMIC there, and don't want to deal
+ * with failed allocations.
+ */
+ rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
+ if (!rm->atomic.op_notifier) {
+ ret = -ENOMEM;
+ goto err;
+ }
+
+ rm->atomic.op_notifier->n_user_token = args->user_token;
+ rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
+ }
+
+ rm->atomic.op_rkey = rds_rdma_cookie_key(args->cookie);
+ rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);
+
+ return ret;
+err:
+ if (page)
+ put_page(page);
+ kfree(rm->atomic.op_notifier);
+
+ return ret;
}