author    Pieter Noordhuis <pietern@fb.com>  2017-09-08 10:42:08 -0700
committer Facebook Github Bot <facebook-github-bot@users.noreply.github.com>  2017-09-08 10:57:41 -0700
commit    b8eb8ced7dadcb54238909b7bcd32a813593be4b (patch)
tree      12d505ae464417367d073ba026d559dc6134e33d /caffe2
parent    fdbfcfc43112dcea5e12b3826ec71d2ccc5026dc (diff)
Add transport/interface arguments to CreateCommonWorld operator
Summary:
These arguments control which Gloo transport (TCP or IB) and which network interface are used for the common world. If they are not specified, the operator defaults to TCP and the network interface of the IP address that the machine's hostname resolves to. The valid values for the transport argument are "tcp" and "ibverbs". For ibverbs to work, Gloo must have been compiled with ibverbs support. If Gloo is built as part of Caffe2 (sourced from the third_party directory), you can pass -DUSE_IBVERBS=ON to CMake to enable ibverbs support in Gloo.

Closes https://github.com/caffe2/caffe2/pull/1177

Reviewed By: akyrola

Differential Revision: D5789729

Pulled By: pietern

fbshipit-source-id: 0dea1a115c729e54c5c1f9fdd5fb29c14a834a82
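For illustration only, a Python-side call that exercises the new arguments might look like the sketch below. The net and blob names, size, and rank are hypothetical; only the transport and interface arguments are introduced by this commit, and the calling convention otherwise mirrors data_parallel_model.py in this diff.

    # Hypothetical usage sketch; blob names, size, and rank are illustrative.
    from caffe2.python import core

    net = core.Net("comm_init")
    comm_world = net.CreateCommonWorld(
        ["store_handler"],       # key/value store handler blob used for rendezvous
        "comm_world",
        name="train_cw",
        size=2,                  # total number of participating processes
        rank=0,                  # rank of this process
        engine="GLOO",
        transport="ibverbs",     # "tcp" (default) or "ibverbs"
        interface="mlx5_0",      # empty string lets Gloo pick the interface
    )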
Diffstat (limited to 'caffe2')
-rw-r--r--  caffe2/contrib/gloo/common.cc                 37
-rw-r--r--  caffe2/contrib/gloo/common.h                  19
-rw-r--r--  caffe2/contrib/gloo/common_world_ops.cc       19
-rw-r--r--  caffe2/contrib/gloo/common_world_ops.h        26
-rw-r--r--  caffe2/contrib/gloo/common_world_ops_gpu.cc   29
-rw-r--r--  caffe2/python/data_parallel_model.py           6
-rw-r--r--  caffe2/python/examples/resnet50_trainer.py    11
7 files changed, 107 insertions(+), 40 deletions(-)
diff --git a/caffe2/contrib/gloo/common.cc b/caffe2/contrib/gloo/common.cc
index fcd80be9d2..1103817a07 100644
--- a/caffe2/contrib/gloo/common.cc
+++ b/caffe2/contrib/gloo/common.cc
@@ -1,12 +1,49 @@
#include "caffe2/contrib/gloo/common.h"
+#include "caffe2/core/logging.h"
#include "caffe2/core/tensor.h"
+#include <gloo/config.h>
+#include <gloo/transport/tcp/device.h>
+#if defined(GLOO_USE_IBVERBS) && GLOO_USE_IBVERBS
+#include <gloo/transport/ibverbs/device.h>
+#endif
+
namespace caffe2 {
+namespace gloo {
void signalFailure(Blob* status_blob, std::exception& /* unused */) {
auto* res = status_blob->GetMutable<TensorCPU>();
res->Resize(1);
res->template mutable_data<int32_t>()[0] = 1;
}
+
+std::shared_ptr<::gloo::transport::Device> createDevice(
+ const createDeviceAttr attr) {
+ if (attr.transport == "tcp") {
+ ::gloo::transport::tcp::attr tcpAttr;
+ if (attr.interface.size() > 0) {
+ tcpAttr.iface = attr.interface;
+ }
+ return ::gloo::transport::tcp::CreateDevice(tcpAttr);
+ } else if (attr.transport == "ibverbs") {
+#if defined(GLOO_USE_IBVERBS) && GLOO_USE_IBVERBS
+ ::gloo::transport::ibverbs::attr ibverbsAttr;
+ ibverbsAttr.port = 1;
+ ibverbsAttr.index = 0;
+ if (attr.interface.size() > 0) {
+ ibverbsAttr.name = attr.interface;
+ }
+ return ::gloo::transport::ibverbs::CreateDevice(ibverbsAttr);
+#else
+ CAFFE_THROW(
+ "Gloo was not compiled with ibverbs support. ",
+ "Please recompile with -DUSE_IBVERBS=1.");
+#endif
+ }
+
+ CAFFE_THROW("Invalid transport: ", attr.transport);
}
+
+} // namespace gloo
+} // namespace caffe2
diff --git a/caffe2/contrib/gloo/common.h b/caffe2/contrib/gloo/common.h
index 3176df9f63..498c0cf8d3 100644
--- a/caffe2/contrib/gloo/common.h
+++ b/caffe2/contrib/gloo/common.h
@@ -4,7 +4,24 @@
#include "caffe2/core/blob.h"
+#include <gloo/transport/device.h>
+
namespace caffe2 {
+namespace gloo {
void signalFailure(Blob* status_blob, std::exception& exception);
-}
+
+struct createDeviceAttr {
+ // "tcp" or "ibverbs"
+ std::string transport;
+
+ // E.g. "eth0" (tcp), or "mlx5_0" (ibverbs).
+ // This may be empty to make Gloo figure it out.
+ std::string interface;
+};
+
+std::shared_ptr<::gloo::transport::Device> createDevice(
+ const createDeviceAttr attr);
+
+} // namespace gloo
+} // namespace caffe2
diff --git a/caffe2/contrib/gloo/common_world_ops.cc b/caffe2/contrib/gloo/common_world_ops.cc
index 6ea938f15d..9e631ce137 100644
--- a/caffe2/contrib/gloo/common_world_ops.cc
+++ b/caffe2/contrib/gloo/common_world_ops.cc
@@ -1,24 +1,13 @@
-#include "common_world_ops.h"
+#include "caffe2/contrib/gloo/common_world_ops.h"
#include <gloo/transport/tcp/device.h>
namespace caffe2 {
namespace gloo {
-template <typename T>
-std::shared_ptr<::gloo::transport::Device>
-CreateCommonWorld<T>::createDevice() {
- // Share single device between all common worlds. This should be
- // made configurable, for varying transports, and transport options
- // (e.g. tcp socket options, ibverbs device).
- //
- // All pairs are switched to synchronous mode after having
- // connected, so they don't need to synchronize with the device
- // thread when they are used from an algorithm.
- //
- ::gloo::transport::tcp::attr attr;
- static auto sharedDevice = ::gloo::transport::tcp::CreateDevice(attr);
- return sharedDevice;
+template <>
+void CreateCommonWorld<CPUContext>::initializeForContext() {
+ // Nothing to initialize for CPUContext.
}
namespace {
diff --git a/caffe2/contrib/gloo/common_world_ops.h b/caffe2/contrib/gloo/common_world_ops.h
index 912b01a784..70a19905c3 100644
--- a/caffe2/contrib/gloo/common_world_ops.h
+++ b/caffe2/contrib/gloo/common_world_ops.h
@@ -24,6 +24,10 @@ class CreateCommonWorld final : public Operator<Context> {
size_(OperatorBase::template GetSingleArgument<int>("size", 0)),
rank_(OperatorBase::template GetSingleArgument<int>("rank", 0)),
sync_(OperatorBase::template GetSingleArgument<bool>("sync", false)),
+ transport_(OperatorBase::template GetSingleArgument<std::string>(
+ "transport", "tcp")),
+ interface_(OperatorBase::template GetSingleArgument<std::string>(
+ "interface", "")),
status_blob_(
OperatorBase::GetSingleArgument<std::string>("status_blob", "")),
timeout_ms_(OperatorBase::GetSingleArgument<int>("timeout_ms", -1)),
@@ -32,10 +36,10 @@ class CreateCommonWorld final : public Operator<Context> {
operator_def.has_name(), "CreateCommonWorld operator requires name");
CAFFE_ENFORCE(rank_ >= 0 && rank_ < size_);
name_ = operator_def.name();
- device_ = createDevice();
if (status_blob_ != "") {
ws_->CreateBlob(status_blob_);
}
+ initialize();
}
virtual ~CreateCommonWorld() {}
@@ -87,11 +91,29 @@ class CreateCommonWorld final : public Operator<Context> {
}
}
- std::shared_ptr<::gloo::transport::Device> createDevice();
+ void initialize() {
+ // Share single device between all common worlds.
+ static std::once_flag once;
+ static std::shared_ptr<::gloo::transport::Device> device;
+ std::call_once(once, [&]() {
+ createDeviceAttr attr;
+ attr.transport = transport_;
+ attr.interface = interface_;
+ device = createDevice(attr);
+ });
+ device_ = device;
+
+ // Context specific initialization.
+ initializeForContext();
+ }
+
+ void initializeForContext();
const int size_;
const int rank_;
const bool sync_;
+ const std::string transport_;
+ const std::string interface_;
const std::string status_blob_;
const int timeout_ms_;
Workspace* ws_;
diff --git a/caffe2/contrib/gloo/common_world_ops_gpu.cc b/caffe2/contrib/gloo/common_world_ops_gpu.cc
index 072d49a3a3..f8fb701c84 100644
--- a/caffe2/contrib/gloo/common_world_ops_gpu.cc
+++ b/caffe2/contrib/gloo/common_world_ops_gpu.cc
@@ -1,4 +1,4 @@
-#include "common_world_ops.h"
+#include "caffe2/contrib/gloo/common_world_ops.h"
#include "caffe2/core/context_gpu.h"
@@ -8,29 +8,14 @@
namespace caffe2 {
namespace gloo {
-template <typename T>
-std::shared_ptr<::gloo::transport::Device>
-CreateCommonWorld<T>::createDevice() {
- // Share single device between all common worlds. This should be
- // made configurable, for varying transports, and transport options
- // (e.g. tcp socket options, ibverbs device).
- //
- // All pairs are switched to synchronous mode after having
- // connected, so they don't need to synchronize with the device
- // thread when they are used from an algorithm.
- //
+template <>
+void CreateCommonWorld<CUDAContext>::initializeForContext() {
static std::once_flag once;
- static std::shared_ptr<::gloo::transport::Device> device;
std::call_once(once, [&]() {
- ::gloo::transport::tcp::attr attr;
- device = ::gloo::transport::tcp::CreateDevice(attr);
-
- // This operator is the first time any Gloo code is executed
- // for a CUDAContext. Share Caffe2 CUDA mutex with Gloo.
- ::gloo::CudaShared::setMutex(&CUDAContext::mutex());
- });
-
- return device;
+ // This is the first time we call Gloo code for a CUDAContext.
+ // Share Caffe2 CUDA mutex with Gloo.
+ ::gloo::CudaShared::setMutex(&CUDAContext::mutex());
+ });
}
namespace {
diff --git a/caffe2/python/data_parallel_model.py b/caffe2/python/data_parallel_model.py
index 4d16591180..5e127ec22c 100644
--- a/caffe2/python/data_parallel_model.py
+++ b/caffe2/python/data_parallel_model.py
@@ -1510,6 +1510,11 @@ def _CreateOrCloneCommonWorld(
status_blob=status_blob,
)
else:
+ kwargs=dict()
+ if 'transport' in rendezvous:
+ kwargs['transport'] = rendezvous['transport']
+ if 'interface' in rendezvous:
+ kwargs['interface'] = rendezvous['interface']
comm_world = net.CreateCommonWorld(
[rendezvous['kv_handler']],
common_world_blob,
@@ -1519,6 +1524,7 @@ def _CreateOrCloneCommonWorld(
engine=rendezvous['engine'],
status_blob=status_blob,
timeout_ms=timeout_ms,
+ **kwargs
)
return comm_world
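Both keys are optional, so existing callers of data_parallel_model are unaffected. A caller-side sketch of a rendezvous dict that opts into the new behavior is shown below; the store handler and shard variables are assumed to exist, as in the trainer change that follows.

    # Hypothetical rendezvous dict; only "transport" and "interface" are new keys.
    rendezvous = dict(
        kv_handler=store_handler,   # store handler blob created beforehand
        shard_id=shard_id,
        num_shards=num_shards,
        engine="GLOO",
        transport="ibverbs",        # forwarded to CreateCommonWorld only if present
        interface="mlx5_0",         # forwarded to CreateCommonWorld only if present
        exit_nets=None,
    )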
diff --git a/caffe2/python/examples/resnet50_trainer.py b/caffe2/python/examples/resnet50_trainer.py
index 746582470a..61c558e92d 100644
--- a/caffe2/python/examples/resnet50_trainer.py
+++ b/caffe2/python/examples/resnet50_trainer.py
@@ -250,11 +250,18 @@ def Train(args):
prefix=args.run_id,
)
)
+
+ # Expect interfaces to be comma separated.
+ # Use of multiple network interfaces is not yet complete,
+ # so simply use the first one in the list.
+ interfaces = args.distributed_interfaces.split(",")
rendezvous = dict(
kv_handler=store_handler,
shard_id=shard_id,
num_shards=num_shards,
engine="GLOO",
+ transport=args.distributed_transport,
+ interface=interfaces[0],
exit_nets=None)
else:
rendezvous = None
@@ -490,6 +497,10 @@ def main():
help='Data type used for training')
parser.add_argument('--enable-tensor-core', action='store_true',
help='Enable Tensor Core math for Conv and FC ops')
+ parser.add_argument("--distributed_transport", type=str, default="tcp",
+ help="Transport to use for distributed run [tcp|ibverbs]")
+ parser.add_argument("--distributed_interfaces", type=str, default="",
+ help="Network interfaces to use for distributed run")
args = parser.parse_args()