diff options
author | Pieter Noordhuis <pietern@fb.com> | 2017-09-08 10:42:08 -0700 |
---|---|---|
committer | Facebook Github Bot <facebook-github-bot@users.noreply.github.com> | 2017-09-08 10:57:41 -0700 |
commit | b8eb8ced7dadcb54238909b7bcd32a813593be4b (patch) | |
tree | 12d505ae464417367d073ba026d559dc6134e33d /caffe2 | |
parent | fdbfcfc43112dcea5e12b3826ec71d2ccc5026dc (diff) | |
download | pytorch-b8eb8ced7dadcb54238909b7bcd32a813593be4b.tar.gz pytorch-b8eb8ced7dadcb54238909b7bcd32a813593be4b.tar.bz2 pytorch-b8eb8ced7dadcb54238909b7bcd32a813593be4b.zip |
Add transport/interface arguments to CreateCommonWorld operator
Summary:
These arguments control which Gloo transport (TCP or IB) and which
network interface is used for the common world. If not specified, it
defaults to using TCP and the network interface for the IP that the
machine's hostname resolves to.
The valid values for the transport argument are "tcp" and "ibverbs".
For ibverbs to work, Gloo must have been compiled with ibverbs
support. If Gloo is built as part of Caffe2 (sourced from the
third_party directory), then you can pass -DUSE_IBVERBS=ON to CMake to
enable ibverbs support in Gloo.
Closes https://github.com/caffe2/caffe2/pull/1177
Reviewed By: akyrola
Differential Revision: D5789729
Pulled By: pietern
fbshipit-source-id: 0dea1a115c729e54c5c1f9fdd5fb29c14a834a82
Diffstat (limited to 'caffe2')
-rw-r--r-- | caffe2/contrib/gloo/common.cc | 37 | ||||
-rw-r--r-- | caffe2/contrib/gloo/common.h | 19 | ||||
-rw-r--r-- | caffe2/contrib/gloo/common_world_ops.cc | 19 | ||||
-rw-r--r-- | caffe2/contrib/gloo/common_world_ops.h | 26 | ||||
-rw-r--r-- | caffe2/contrib/gloo/common_world_ops_gpu.cc | 29 | ||||
-rw-r--r-- | caffe2/python/data_parallel_model.py | 6 | ||||
-rw-r--r-- | caffe2/python/examples/resnet50_trainer.py | 11 |
7 files changed, 107 insertions, 40 deletions
diff --git a/caffe2/contrib/gloo/common.cc b/caffe2/contrib/gloo/common.cc index fcd80be9d2..1103817a07 100644 --- a/caffe2/contrib/gloo/common.cc +++ b/caffe2/contrib/gloo/common.cc @@ -1,12 +1,49 @@ #include "caffe2/contrib/gloo/common.h" +#include "caffe2/core/logging.h" #include "caffe2/core/tensor.h" +#include <gloo/config.h> +#include <gloo/transport/tcp/device.h> +#if defined(GLOO_USE_IBVERBS) && GLOO_USE_IBVERBS +#include <gloo/transport/ibverbs/device.h> +#endif + namespace caffe2 { +namespace gloo { void signalFailure(Blob* status_blob, std::exception& /* unused */) { auto* res = status_blob->GetMutable<TensorCPU>(); res->Resize(1); res->template mutable_data<int32_t>()[0] = 1; } + +std::shared_ptr<::gloo::transport::Device> createDevice( + const createDeviceAttr attr) { + if (attr.transport == "tcp") { + ::gloo::transport::tcp::attr tcpAttr; + if (attr.interface.size() > 0) { + tcpAttr.iface = attr.interface; + } + return ::gloo::transport::tcp::CreateDevice(tcpAttr); + } else if (attr.transport == "ibverbs") { +#if defined(GLOO_USE_IBVERBS) && GLOO_USE_IBVERBS + ::gloo::transport::ibverbs::attr ibverbsAttr; + ibverbsAttr.port = 1; + ibverbsAttr.index = 0; + if (attr.interface.size() > 0) { + ibverbsAttr.name = attr.interface; + } + return ::gloo::transport::ibverbs::CreateDevice(ibverbsAttr); +#else + CAFFE_THROW( + "Gloo was not compiled with ibverbs support. ", + "Please recompile with -DUSE_IBVERBS=1."); +#endif + } + + CAFFE_THROW("Invalid transport: ", attr.transport); } + +} // namespace gloo +} // namespace caffe2 diff --git a/caffe2/contrib/gloo/common.h b/caffe2/contrib/gloo/common.h index 3176df9f63..498c0cf8d3 100644 --- a/caffe2/contrib/gloo/common.h +++ b/caffe2/contrib/gloo/common.h @@ -4,7 +4,24 @@ #include "caffe2/core/blob.h" +#include <gloo/transport/device.h> + namespace caffe2 { +namespace gloo { void signalFailure(Blob* status_blob, std::exception& exception); -} + +struct createDeviceAttr { + // "tcp" or "ibverbs" + std::string transport; + + // E.g. "eth0" (tcp), or "mlx5_0" (ibverbs). + // This may be empty to make Gloo figure it out. + std::string interface; +}; + +std::shared_ptr<::gloo::transport::Device> createDevice( + const createDeviceAttr attr); + +} // namespace gloo +} // namespace caffe2 diff --git a/caffe2/contrib/gloo/common_world_ops.cc b/caffe2/contrib/gloo/common_world_ops.cc index 6ea938f15d..9e631ce137 100644 --- a/caffe2/contrib/gloo/common_world_ops.cc +++ b/caffe2/contrib/gloo/common_world_ops.cc @@ -1,24 +1,13 @@ -#include "common_world_ops.h" +#include "caffe2/contrib/gloo/common_world_ops.h" #include <gloo/transport/tcp/device.h> namespace caffe2 { namespace gloo { -template <typename T> -std::shared_ptr<::gloo::transport::Device> -CreateCommonWorld<T>::createDevice() { - // Share single device between all common worlds. This should be - // made configurable, for varying transports, and transport options - // (e.g. tcp socket options, ibverbs device). - // - // All pairs are switched to synchronous mode after having - // connected, so they don't need to synchronize with the device - // thread when they are used from an algorithm. - // - ::gloo::transport::tcp::attr attr; - static auto sharedDevice = ::gloo::transport::tcp::CreateDevice(attr); - return sharedDevice; +template <> +void CreateCommonWorld<CPUContext>::initializeForContext() { + // Nothing to initialize for CPUContext. } namespace { diff --git a/caffe2/contrib/gloo/common_world_ops.h b/caffe2/contrib/gloo/common_world_ops.h index 912b01a784..70a19905c3 100644 --- a/caffe2/contrib/gloo/common_world_ops.h +++ b/caffe2/contrib/gloo/common_world_ops.h @@ -24,6 +24,10 @@ class CreateCommonWorld final : public Operator<Context> { size_(OperatorBase::template GetSingleArgument<int>("size", 0)), rank_(OperatorBase::template GetSingleArgument<int>("rank", 0)), sync_(OperatorBase::template GetSingleArgument<bool>("sync", false)), + transport_(OperatorBase::template GetSingleArgument<std::string>( + "transport", "tcp")), + interface_(OperatorBase::template GetSingleArgument<std::string>( + "interface", "")), status_blob_( OperatorBase::GetSingleArgument<std::string>("status_blob", "")), timeout_ms_(OperatorBase::GetSingleArgument<int>("timeout_ms", -1)), @@ -32,10 +36,10 @@ class CreateCommonWorld final : public Operator<Context> { operator_def.has_name(), "CreateCommonWorld operator requires name"); CAFFE_ENFORCE(rank_ >= 0 && rank_ < size_); name_ = operator_def.name(); - device_ = createDevice(); if (status_blob_ != "") { ws_->CreateBlob(status_blob_); } + initialize(); } virtual ~CreateCommonWorld() {} @@ -87,11 +91,29 @@ class CreateCommonWorld final : public Operator<Context> { } } - std::shared_ptr<::gloo::transport::Device> createDevice(); + void initialize() { + // Share single device between all common worlds. + static std::once_flag once; + static std::shared_ptr<::gloo::transport::Device> device; + std::call_once(once, [&]() { + createDeviceAttr attr; + attr.transport = transport_; + attr.interface = interface_; + device = createDevice(attr); + }); + device_ = device; + + // Context specific initialization. + initializeForContext(); + } + + void initializeForContext(); const int size_; const int rank_; const bool sync_; + const std::string transport_; + const std::string interface_; const std::string status_blob_; const int timeout_ms_; Workspace* ws_; diff --git a/caffe2/contrib/gloo/common_world_ops_gpu.cc b/caffe2/contrib/gloo/common_world_ops_gpu.cc index 072d49a3a3..f8fb701c84 100644 --- a/caffe2/contrib/gloo/common_world_ops_gpu.cc +++ b/caffe2/contrib/gloo/common_world_ops_gpu.cc @@ -1,4 +1,4 @@ -#include "common_world_ops.h" +#include "caffe2/contrib/gloo/common_world_ops.h" #include "caffe2/core/context_gpu.h" @@ -8,29 +8,14 @@ namespace caffe2 { namespace gloo { -template <typename T> -std::shared_ptr<::gloo::transport::Device> -CreateCommonWorld<T>::createDevice() { - // Share single device between all common worlds. This should be - // made configurable, for varying transports, and transport options - // (e.g. tcp socket options, ibverbs device). - // - // All pairs are switched to synchronous mode after having - // connected, so they don't need to synchronize with the device - // thread when they are used from an algorithm. - // +template <> +void CreateCommonWorld<CUDAContext>::initializeForContext() { static std::once_flag once; - static std::shared_ptr<::gloo::transport::Device> device; std::call_once(once, [&]() { - ::gloo::transport::tcp::attr attr; - device = ::gloo::transport::tcp::CreateDevice(attr); - - // This operator is the first time any Gloo code is executed - // for a CUDAContext. Share Caffe2 CUDA mutex with Gloo. - ::gloo::CudaShared::setMutex(&CUDAContext::mutex()); - }); - - return device; + // This is the first time we call Gloo code for a CUDAContext. + // Share Caffe2 CUDA mutex with Gloo. + ::gloo::CudaShared::setMutex(&CUDAContext::mutex()); + }); } namespace { diff --git a/caffe2/python/data_parallel_model.py b/caffe2/python/data_parallel_model.py index 4d16591180..5e127ec22c 100644 --- a/caffe2/python/data_parallel_model.py +++ b/caffe2/python/data_parallel_model.py @@ -1510,6 +1510,11 @@ def _CreateOrCloneCommonWorld( status_blob=status_blob, ) else: + kwargs=dict() + if 'transport' in rendezvous: + kwargs['transport'] = rendezvous['transport'] + if 'interface' in rendezvous: + kwargs['interface'] = rendezvous['interface'] comm_world = net.CreateCommonWorld( [rendezvous['kv_handler']], common_world_blob, @@ -1519,6 +1524,7 @@ def _CreateOrCloneCommonWorld( engine=rendezvous['engine'], status_blob=status_blob, timeout_ms=timeout_ms, + **kwargs ) return comm_world diff --git a/caffe2/python/examples/resnet50_trainer.py b/caffe2/python/examples/resnet50_trainer.py index 746582470a..61c558e92d 100644 --- a/caffe2/python/examples/resnet50_trainer.py +++ b/caffe2/python/examples/resnet50_trainer.py @@ -250,11 +250,18 @@ def Train(args): prefix=args.run_id, ) ) + + # Expect interfaces to be comma separated. + # Use of multiple network interfaces is not yet complete, + # so simply use the first one in the list. + interfaces = args.distributed_interfaces.split(",") rendezvous = dict( kv_handler=store_handler, shard_id=shard_id, num_shards=num_shards, engine="GLOO", + transport=args.distributed_transport, + interface=interfaces[0], exit_nets=None) else: rendezvous = None @@ -490,6 +497,10 @@ def main(): help='Data type used for training') parser.add_argument('--enable-tensor-core', action='store_true', help='Enable Tensor Core math for Conv and FC ops') + parser.add_argument("--distributed_transport", type=str, default="tcp", + help="Transport to use for distributed run [tcp|ibverbs]") + parser.add_argument("--distributed_interfaces", type=str, default="", + help="Network interfaces to use for distributed run") args = parser.parse_args() |