author    Adam Paszke <adam.paszke@gmail.com>  2016-11-21 22:24:52 +0100
committer Adam Paszke <adam.paszke@gmail.com>  2017-01-31 01:58:09 +0100
commit    55632d81d22baf572c58c26bc73f86fc37044d67 (patch)
tree      5a283eba590b90d18ab82f8f888e74ab84562d51 /torch/csrc/distributed
parent    0b276d622e25c58c626bee5b2c85078360eb7a78 (diff)
Add Python wrappers for process group mode
Diffstat (limited to 'torch/csrc/distributed')
 torch/csrc/distributed/Module.cpp | 196
 torch/csrc/distributed/THDP.h     |  10
 torch/csrc/distributed/utils.cpp  |   9
 torch/csrc/distributed/utils.h    |   6
 4 files changed, 221 insertions(+), 0 deletions(-)
diff --git a/torch/csrc/distributed/Module.cpp b/torch/csrc/distributed/Module.cpp
new file mode 100644
index 0000000000..af87152c88
--- /dev/null
+++ b/torch/csrc/distributed/Module.cpp
@@ -0,0 +1,196 @@
+#include <Python.h>
+
+#include <unordered_map>
+
+#include "THDP.h"
+
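+// Supported backends: map the Python-facing backend name onto a THD channel type.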
+static std::unordered_map<std::string, THDChannelType> name2channel_type = {
+ {"mpi", THDChannelMPI},
+ {"tcp", THDChannelTCP},
+};
+
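+// Reduce op constants -> THD enum; populated by THDPModule_initExtension.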
+static std::unordered_map<PyObject*, THDReduceOp> obj2reduceop;
+
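+// Coerce a str/bytes argument to a bytes object. Returns a new reference,
+// or NULL if the argument is neither.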
+static THPObjectPtr _ensureBytes(PyObject *obj)
+{
+#if PY_MAJOR_VERSION == 2
+ if (PyString_Check(obj)) {
+#elif PY_MAJOR_VERSION == 3
+ if (PyBytes_Check(obj)) {
+#endif
+ Py_INCREF(obj);
+ return obj;
+ }
+ if (PyUnicode_Check(obj)) {
+ return PyUnicode_AsASCIIString(obj);
+ }
+ return NULL;
+}
+
+PyObject* THDPModule_initProcessGroup(PyObject *_unused, PyObject *_backend)
+{
+ HANDLE_TH_ERRORS
+ THPObjectPtr backend_bytes = _ensureBytes(_backend);
+ THPUtils_assert(backend_bytes, "backend argument has to be a string/bytes "
+ "object, but got %s", THPUtils_typename(_backend));
+ char *backend_name = THPUtils_bytesAsString(backend_bytes.get());
+ THDChannelType channel_type = name2channel_type.at(backend_name);
+ THPUtils_assert(THDProcessGroupInit(channel_type), "failed to initialize "
+ "distributed library (THD)");
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_getRank(PyObject *_unused)
+{
+ HANDLE_TH_ERRORS
+ return PyInt_FromLong(THDGetRank());
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_getNumProcesses(PyObject *_unused)
+{
+ HANDLE_TH_ERRORS
+ return PyInt_FromLong(THDGetNumProcesses());
+ END_HANDLE_TH_ERRORS
+}
+
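+// Wrap a THP tensor in the descriptor type THD expects; throws for
+// unsupported tensor types.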
+static THDTensorDescriptor* _makeDescriptor(PyObject *obj)
+{
+ PyObject *type = (PyObject*)Py_TYPE(obj);
+#define REGISTER_TH_DESCRIPTOR(TYPE) \
+ if (type == THP##TYPE##Class) \
+ return THDTensorDescriptor_newFromTH##TYPE(((THP##TYPE*)obj)->cdata);
+ REGISTER_TH_DESCRIPTOR(DoubleTensor);
+ REGISTER_TH_DESCRIPTOR(FloatTensor);
+ REGISTER_TH_DESCRIPTOR(LongTensor);
+ REGISTER_TH_DESCRIPTOR(IntTensor);
+ REGISTER_TH_DESCRIPTOR(ShortTensor);
+ REGISTER_TH_DESCRIPTOR(CharTensor);
+ REGISTER_TH_DESCRIPTOR(ByteTensor);
+#undef REGISTER_TH_DESCRIPTOR
+  throw std::runtime_error(std::string("don't know how to create a THDTensorDescriptor for "
+ "type ") + std::string(THPUtils_typename(obj)));
+}
+
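+// Look up the THD reduce op registered for this Python constant. Matching is
+// by object identity, so callers must pass the exact objects that were
+// registered in THDPModule_initExtension.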
+static THDReduceOp _getReduceOp(PyObject *obj)
+{
+ auto it = obj2reduceop.find(obj);
+ if (it == obj2reduceop.end()) {
+ throw std::runtime_error("op should be a constant from "
+ "torch.distributed.reduce_op");
+ }
+ return it->second;
+}
+
+PyObject* THDPModule_send(PyObject *_unused, PyObject *args)
+{
+ HANDLE_TH_ERRORS
+ if (PyTuple_GET_SIZE(args) != 2 || !THPModule_isTensor(PyTuple_GET_ITEM(args, 0)) ||
+ !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
+ THPUtils_invalidArguments(args, "send", 1, "(tensor input, int dst_rank)");
+ return NULL;
+ }
+
+ THDPTensorDesc desc = _makeDescriptor(PyTuple_GET_ITEM(args, 0));
+ int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
+ THDSend(desc, dst_rank);
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_recv(PyObject *_unused, PyObject *args)
+{
+ HANDLE_TH_ERRORS
+ if (PyTuple_GET_SIZE(args) != 2 || !THPModule_isTensor(PyTuple_GET_ITEM(args, 0)) ||
+ !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
+ THPUtils_invalidArguments(args, "recv", 1, "(tensor output, int src_rank)");
+ return NULL;
+ }
+
+ THDPTensorDesc desc = _makeDescriptor(PyTuple_GET_ITEM(args, 0));
+ int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
+ THDReceive(desc, src_rank);
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_allReduce(PyObject *_unused, PyObject *args)
+{
+ HANDLE_TH_ERRORS
+ if (PyTuple_GET_SIZE(args) != 2 || !THPModule_isTensor(PyTuple_GET_ITEM(args, 0))) {
+ THPUtils_invalidArguments(args, "all_reduce", 1, "(tensor in_out, reduce_op op)");
+ return NULL;
+ }
+
+ THDReduceOp op = _getReduceOp(PyTuple_GET_ITEM(args, 1));
+ THDPTensorDesc desc = _makeDescriptor(PyTuple_GET_ITEM(args, 0));
+ THDAllReduce(desc, op);
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_reduce(PyObject *_unused, PyObject *args)
+{
+ HANDLE_TH_ERRORS
+ if (PyTuple_GET_SIZE(args) != 3 || !THPModule_isTensor(PyTuple_GET_ITEM(args, 0)) ||
+ !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
+ THPUtils_invalidArguments(args, "reduce", 1,
+ "(tensor reduced, int dst_rank, reduce_op op)");
+ return NULL;
+ }
+
+ THDReduceOp op = _getReduceOp(PyTuple_GET_ITEM(args, 2));
+ THDPTensorDesc desc = _makeDescriptor(PyTuple_GET_ITEM(args, 0));
+ int dst_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
+ THDReduce(desc, op, dst_rank);
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
+PyObject* THDPModule_broadcast(PyObject *_unused, PyObject *args)
+{
+ HANDLE_TH_ERRORS
+ if (PyTuple_GET_SIZE(args) != 2 || !THPModule_isTensor(PyTuple_GET_ITEM(args, 0)) ||
+ !THPUtils_checkLong(PyTuple_GET_ITEM(args, 1))) {
+ THPUtils_invalidArguments(args, "broadcast", 1, "(tensor src_dst, int src_rank)");
+ return NULL;
+ }
+
+ THDPTensorDesc desc = _makeDescriptor(PyTuple_GET_ITEM(args, 0));
+ int src_rank = THPUtils_unpackLong(PyTuple_GET_ITEM(args, 1));
+ THDBroadcast(desc, src_rank);
+ Py_RETURN_NONE;
+ END_HANDLE_TH_ERRORS
+}
+
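+// Registers the SUM/PRODUCT/MIN/MAX constants found on reduce_op_obj so the
+// collectives above can translate them into THD reduce ops.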
+PyObject* THDPModule_initExtension(PyObject *_unused, PyObject *reduce_op_obj) {
+ THPObjectPtr reduce_op;
+#define REGISTER_REDUCE_OP(NAME) \
+ reduce_op = PyObject_GetAttrString(reduce_op_obj, #NAME); \
+ THPUtils_assert(reduce_op, "Missing object for reduce op " #NAME); \
+ obj2reduceop.emplace(reduce_op.get(), THDReduce##NAME);
+ REGISTER_REDUCE_OP(SUM);
+ REGISTER_REDUCE_OP(PRODUCT);
+ REGISTER_REDUCE_OP(MIN);
+ REGISTER_REDUCE_OP(MAX);
+#undef REGISTER_REDUCE_OP
+ Py_RETURN_TRUE;
+}
+
+static struct PyMethodDef _THDPModule_methods[] = {
+ {"_dist_init_extension", (PyCFunction)THDPModule_initExtension, METH_O, NULL},
+ {"_dist_init_process_group", (PyCFunction)THDPModule_initProcessGroup, METH_O, NULL},
+ {"_dist_get_rank", (PyCFunction)THDPModule_getRank, METH_NOARGS, NULL},
+ {"_dist_get_num_processes", (PyCFunction)THDPModule_getNumProcesses, METH_NOARGS, NULL},
+ {"_dist_send", (PyCFunction)THDPModule_send, METH_VARARGS, NULL},
+ {"_dist_recv", (PyCFunction)THDPModule_recv, METH_VARARGS, NULL},
+ {"_dist_all_reduce", (PyCFunction)THDPModule_allReduce, METH_VARARGS, NULL},
+ {"_dist_reduce", (PyCFunction)THDPModule_reduce, METH_VARARGS, NULL},
+ {"_dist_broadcast", (PyCFunction)THDPModule_broadcast, METH_VARARGS, NULL},
+ {NULL}
+};
+
+PyMethodDef* THDPModule_methods() {
+ return _THDPModule_methods;
+}
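The method table above is the entire Python-facing surface of this file. As a
rough illustration of what it enables, here is a minimal usage sketch in
Python. It assumes the methods end up attached to torch._C the way the other
THP module methods are (that wiring is outside this diff), and that one such
Python process runs per rank:

    import torch

    torch._C._dist_init_process_group('tcp')    # or 'mpi'; see name2channel_type
    rank = torch._C._dist_get_rank()
    world_size = torch._C._dist_get_num_processes()

    t = torch.zeros(10)
    if rank == 0:
        t.fill_(1)
        torch._C._dist_send(t, 1)               # (tensor input, int dst_rank)
    elif rank == 1:
        torch._C._dist_recv(t, 0)               # (tensor output, int src_rank)

    torch._C._dist_broadcast(t, 0)              # all ranks end up with rank 0's data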
diff --git a/torch/csrc/distributed/THDP.h b/torch/csrc/distributed/THDP.h
new file mode 100644
index 0000000000..7acc1f10d7
--- /dev/null
+++ b/torch/csrc/distributed/THDP.h
@@ -0,0 +1,10 @@
+#ifndef THDP_H
+#define THDP_H
+
+#include <THD/THD.h>
+
+#include "torch/csrc/THP.h"
+#include "Module.h"
+#include "utils.h"
+
+#endif
diff --git a/torch/csrc/distributed/utils.cpp b/torch/csrc/distributed/utils.cpp
new file mode 100644
index 0000000000..ff06c02f67
--- /dev/null
+++ b/torch/csrc/distributed/utils.cpp
@@ -0,0 +1,9 @@
+#include "utils.h"
+
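+// Specialize the THPPointer deleter so a THDPTensorDesc releases its
+// descriptor automatically when it goes out of scope.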
+template<>
+void THPPointer<THDTensorDescriptor>::free() {
+ if (ptr)
+ THDTensorDescriptor_free(ptr);
+}
+
+template class THPPointer<THDTensorDescriptor>;
diff --git a/torch/csrc/distributed/utils.h b/torch/csrc/distributed/utils.h
new file mode 100644
index 0000000000..5a73fa93dd
--- /dev/null
+++ b/torch/csrc/distributed/utils.h
@@ -0,0 +1,6 @@
+#include <Python.h>
+
+#include "THDP.h"
+
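+// Owning smart pointer for THDTensorDescriptor (deleter defined in utils.cpp).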
+typedef THPPointer<THDTensorDescriptor> THDPTensorDesc;
+
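One subtlety worth noting about the reduce ops: _getReduceOp matches the op
argument by PyObject identity, so callers must pass exactly the constants that
were registered through _dist_init_extension. A sketch of that contract, with
a hypothetical stand-in for the constants container (in the real tree the
constants live in torch.distributed):

    import torch

    class reduce_op:                # hypothetical stand-in container
        SUM = object()
        PRODUCT = object()
        MIN = object()
        MAX = object()

    torch._C._dist_init_extension(reduce_op)

    t = torch.ones(10)
    torch._C._dist_all_reduce(t, reduce_op.SUM)   # (tensor in_out, reduce_op op)
    torch._C._dist_reduce(t, 0, reduce_op.MAX)    # (tensor reduced, int dst_rank, reduce_op op)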