summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Shelhamer <shelhamer@imaginarynumber.net>2017-01-17 09:12:01 -0800
committerGitHub <noreply@github.com>2017-01-17 09:12:01 -0800
commit536851c00e44d545649648b372503f52a89c0499 (patch)
treed8f841db4fec3a92bf2cbe9a651b2bdf2d29159f
parent0a91794d85562eaa6064541a58a3588b5afb0e5f (diff)
parent5f28eb1147c1abb6e5e5c7cd282218679b0d531d (diff)
downloadcaffeonacl-536851c00e44d545649648b372503f52a89c0499.tar.gz
caffeonacl-536851c00e44d545649648b372503f52a89c0499.tar.bz2
caffeonacl-536851c00e44d545649648b372503f52a89c0499.zip
Merge pull request #4563 from cypof/nccl
adopt NVIDIA's NCCL for multi-GPU and switch interface to python
-rw-r--r--CMakeLists.txt1
-rw-r--r--Makefile6
-rw-r--r--Makefile.config.example4
-rw-r--r--cmake/Dependencies.cmake15
-rw-r--r--cmake/Modules/FindNCCL.cmake26
-rw-r--r--cmake/Summary.cmake1
-rw-r--r--include/caffe/blob.hpp1
-rw-r--r--include/caffe/common.hpp14
-rw-r--r--include/caffe/data_reader.hpp82
-rw-r--r--include/caffe/internal_thread.hpp4
-rw-r--r--include/caffe/layer.hpp43
-rw-r--r--include/caffe/layers/base_data_layer.hpp6
-rw-r--r--include/caffe/layers/data_layer.hpp7
-rw-r--r--include/caffe/layers/hdf5_data_layer.hpp6
-rw-r--r--include/caffe/layers/python_layer.hpp4
-rw-r--r--include/caffe/net.hpp40
-rw-r--r--include/caffe/parallel.hpp96
-rw-r--r--include/caffe/solver.hpp40
-rw-r--r--include/caffe/syncedmem.hpp14
-rw-r--r--include/caffe/util/math_functions.hpp5
-rw-r--r--include/caffe/util/nccl.hpp37
-rw-r--r--python/caffe/__init__.py4
-rw-r--r--python/caffe/_caffe.cpp113
-rw-r--r--python/caffe/pycaffe.py2
-rw-r--r--python/train.py100
-rw-r--r--src/caffe/blob.cpp18
-rw-r--r--src/caffe/common.cpp5
-rw-r--r--src/caffe/data_reader.cpp119
-rw-r--r--src/caffe/internal_thread.cpp10
-rw-r--r--src/caffe/layer.cpp20
-rw-r--r--src/caffe/layers/base_data_layer.cpp44
-rw-r--r--src/caffe/layers/base_data_layer.cu21
-rw-r--r--src/caffe/layers/data_layer.cpp82
-rw-r--r--src/caffe/layers/hdf5_data_layer.cpp55
-rw-r--r--src/caffe/layers/hdf5_data_layer.cu22
-rw-r--r--src/caffe/layers/image_data_layer.cpp13
-rw-r--r--src/caffe/layers/window_data_layer.cpp8
-rw-r--r--src/caffe/net.cpp47
-rw-r--r--src/caffe/parallel.cpp514
-rw-r--r--src/caffe/proto/caffe.proto9
-rw-r--r--src/caffe/solver.cpp44
-rw-r--r--src/caffe/solvers/adagrad_solver.cpp1
-rw-r--r--src/caffe/solvers/nesterov_solver.cpp1
-rw-r--r--src/caffe/solvers/sgd_solver.cpp4
-rw-r--r--src/caffe/syncedmem.cpp59
-rw-r--r--src/caffe/test/test_data_layer.cpp36
-rw-r--r--src/caffe/test/test_gradient_based_solver.cpp34
-rw-r--r--src/caffe/test/test_hdf5data_layer.cpp30
-rw-r--r--src/caffe/util/blocking_queue.cpp5
-rw-r--r--src/caffe/util/db_lmdb.cpp2
-rw-r--r--src/caffe/util/math_functions.cu20
-rw-r--r--tools/caffe.cpp11
52 files changed, 1022 insertions, 883 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index da7142c9..3af394f7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,6 +28,7 @@ include(cmake/ConfigGen.cmake)
# ---[ Options
caffe_option(CPU_ONLY "Build Caffe without CUDA support" OFF) # TODO: rename to USE_CUDA
caffe_option(USE_CUDNN "Build Caffe with cuDNN library support" ON IF NOT CPU_ONLY)
+caffe_option(USE_NCCL "Build Caffe with NCCL library support" OFF)
caffe_option(BUILD_SHARED_LIBS "Build shared libraries" ON)
caffe_option(BUILD_python "Build Python wrapper" ON)
set(python_version "2" CACHE STRING "Specify which Python version to use")
diff --git a/Makefile b/Makefile
index ccc4d8b9..65d08f7d 100644
--- a/Makefile
+++ b/Makefile
@@ -328,6 +328,12 @@ ifeq ($(USE_CUDNN), 1)
COMMON_FLAGS += -DUSE_CUDNN
endif
+# NCCL acceleration configuration
+ifeq ($(USE_NCCL), 1)
+ LIBRARIES += nccl
+ COMMON_FLAGS += -DUSE_NCCL
+endif
+
# configure IO libraries
ifeq ($(USE_OPENCV), 1)
COMMON_FLAGS += -DUSE_OPENCV
diff --git a/Makefile.config.example b/Makefile.config.example
index 07bed63a..541cf807 100644
--- a/Makefile.config.example
+++ b/Makefile.config.example
@@ -94,6 +94,10 @@ LIBRARY_DIRS := $(PYTHON_LIB) /usr/local/lib /usr/lib
# INCLUDE_DIRS += $(shell brew --prefix)/include
# LIBRARY_DIRS += $(shell brew --prefix)/lib
+# NCCL acceleration switch (uncomment to build with NCCL)
+# https://github.com/NVIDIA/nccl (last tested version: v1.2.3-1+cuda8.0)
+# USE_NCCL := 1
+
# Uncomment to use `pkg-config` to specify OpenCV library paths.
# (Usually not necessary -- OpenCV libraries are normally installed in one of the above $LIBRARY_DIRS.)
# USE_PKG_CONFIG := 1
diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake
index ae9ce8e4..ba28a128 100644
--- a/cmake/Dependencies.cmake
+++ b/cmake/Dependencies.cmake
@@ -67,6 +67,13 @@ if(NOT HAVE_CUDA)
add_definitions(-DCPU_ONLY)
endif()
+if(USE_NCCL)
+ find_package(NCCL REQUIRED)
+ include_directories(SYSTEM ${NCCL_INCLUDE_DIR})
+ list(APPEND Caffe_LINKER_LIBS ${NCCL_LIBRARIES})
+ add_definitions(-DUSE_NCCL)
+endif()
+
# ---[ OpenCV
if(USE_OPENCV)
find_package(OpenCV QUIET COMPONENTS core highgui imgproc imgcodecs)
@@ -119,18 +126,18 @@ if(BUILD_python)
find_package(NumPy 1.7.1)
# Find the matching boost python implementation
set(version ${PYTHONLIBS_VERSION_STRING})
-
+
STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} )
find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}")
set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND})
-
+
while(NOT "${version}" STREQUAL "" AND NOT Boost_PYTHON_FOUND)
STRING( REGEX REPLACE "([0-9.]+).[0-9]+" "\\1" version ${version} )
-
+
STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} )
find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}")
set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND})
-
+
STRING( REGEX MATCHALL "([0-9.]+).[0-9]+" has_more_version ${version} )
if("${has_more_version}" STREQUAL "")
break()
diff --git a/cmake/Modules/FindNCCL.cmake b/cmake/Modules/FindNCCL.cmake
new file mode 100644
index 00000000..c8845934
--- /dev/null
+++ b/cmake/Modules/FindNCCL.cmake
@@ -0,0 +1,26 @@
+set(NCCL_INC_PATHS
+ /usr/include
+ /usr/local/include
+ $ENV{NCCL_DIR}/include
+ )
+
+set(NCCL_LIB_PATHS
+ /lib
+ /lib64
+ /usr/lib
+ /usr/lib64
+ /usr/local/lib
+ /usr/local/lib64
+ $ENV{NCCL_DIR}/lib
+ )
+
+find_path(NCCL_INCLUDE_DIR NAMES nccl.h PATHS ${NCCL_INC_PATHS})
+find_library(NCCL_LIBRARIES NAMES nccl PATHS ${NCCL_LIB_PATHS})
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(NCCL DEFAULT_MSG NCCL_INCLUDE_DIR NCCL_LIBRARIES)
+
+if (NCCL_FOUND)
+ message(STATUS "Found NCCL (include: ${NCCL_INCLUDE_DIR}, library: ${NCCL_LIBRARIES})")
+ mark_as_advanced(NCCL_INCLUDE_DIR NCCL_LIBRARIES)
+endif ()
diff --git a/cmake/Summary.cmake b/cmake/Summary.cmake
index ba025cf8..ed8c2526 100644
--- a/cmake/Summary.cmake
+++ b/cmake/Summary.cmake
@@ -117,6 +117,7 @@ function(caffe_print_configuration_summary)
caffe_status(" USE_OPENCV : ${USE_OPENCV}")
caffe_status(" USE_LEVELDB : ${USE_LEVELDB}")
caffe_status(" USE_LMDB : ${USE_LMDB}")
+ caffe_status(" USE_NCCL : ${USE_NCCL}")
caffe_status(" ALLOW_LMDB_NOLOCK : ${ALLOW_LMDB_NOLOCK}")
caffe_status("")
caffe_status("Dependencies:")
diff --git a/include/caffe/blob.hpp b/include/caffe/blob.hpp
index af360ac2..2f59471c 100644
--- a/include/caffe/blob.hpp
+++ b/include/caffe/blob.hpp
@@ -220,6 +220,7 @@ class Blob {
void set_cpu_data(Dtype* data);
const int* gpu_shape() const;
const Dtype* gpu_data() const;
+ void set_gpu_data(Dtype* data);
const Dtype* cpu_diff() const;
const Dtype* gpu_diff() const;
Dtype* mutable_cpu_data();
diff --git a/include/caffe/common.hpp b/include/caffe/common.hpp
index 3c6a076e..4904d1d8 100644
--- a/include/caffe/common.hpp
+++ b/include/caffe/common.hpp
@@ -158,11 +158,14 @@ class Caffe {
// Search from start_id to the highest possible device ordinal,
// return the ordinal of the first available device.
static int FindDevice(const int start_id = 0);
- // Parallel training info
+ // Parallel training
inline static int solver_count() { return Get().solver_count_; }
inline static void set_solver_count(int val) { Get().solver_count_ = val; }
- inline static bool root_solver() { return Get().root_solver_; }
- inline static void set_root_solver(bool val) { Get().root_solver_ = val; }
+ inline static int solver_rank() { return Get().solver_rank_; }
+ inline static void set_solver_rank(int val) { Get().solver_rank_ = val; }
+ inline static bool multiprocess() { return Get().multiprocess_; }
+ inline static void set_multiprocess(bool val) { Get().multiprocess_ = val; }
+ inline static bool root_solver() { return Get().solver_rank_ == 0; }
protected:
#ifndef CPU_ONLY
@@ -172,8 +175,11 @@ class Caffe {
shared_ptr<RNG> random_generator_;
Brew mode_;
+
+ // Parallel training
int solver_count_;
- bool root_solver_;
+ int solver_rank_;
+ bool multiprocess_;
private:
// The private constructor to avoid duplicate instantiation.
diff --git a/include/caffe/data_reader.hpp b/include/caffe/data_reader.hpp
deleted file mode 100644
index 8ed5542c..00000000
--- a/include/caffe/data_reader.hpp
+++ /dev/null
@@ -1,82 +0,0 @@
-#ifndef CAFFE_DATA_READER_HPP_
-#define CAFFE_DATA_READER_HPP_
-
-#include <map>
-#include <string>
-#include <vector>
-
-#include "caffe/common.hpp"
-#include "caffe/internal_thread.hpp"
-#include "caffe/util/blocking_queue.hpp"
-#include "caffe/util/db.hpp"
-
-namespace caffe {
-
-/**
- * @brief Reads data from a source to queues available to data layers.
- * A single reading thread is created per source, even if multiple solvers
- * are running in parallel, e.g. for multi-GPU training. This makes sure
- * databases are read sequentially, and that each solver accesses a different
- * subset of the database. Data is distributed to solvers in a round-robin
- * way to keep parallel training deterministic.
- */
-class DataReader {
- public:
- explicit DataReader(const LayerParameter& param);
- ~DataReader();
-
- inline BlockingQueue<Datum*>& free() const {
- return queue_pair_->free_;
- }
- inline BlockingQueue<Datum*>& full() const {
- return queue_pair_->full_;
- }
-
- protected:
- // Queue pairs are shared between a body and its readers
- class QueuePair {
- public:
- explicit QueuePair(int size);
- ~QueuePair();
-
- BlockingQueue<Datum*> free_;
- BlockingQueue<Datum*> full_;
-
- DISABLE_COPY_AND_ASSIGN(QueuePair);
- };
-
- // A single body is created per source
- class Body : public InternalThread {
- public:
- explicit Body(const LayerParameter& param);
- virtual ~Body();
-
- protected:
- void InternalThreadEntry();
- void read_one(db::Cursor* cursor, QueuePair* qp);
-
- const LayerParameter param_;
- BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_;
-
- friend class DataReader;
-
- DISABLE_COPY_AND_ASSIGN(Body);
- };
-
- // A source is uniquely identified by its layer name + path, in case
- // the same database is read from two different locations in the net.
- static inline string source_key(const LayerParameter& param) {
- return param.name() + ":" + param.data_param().source();
- }
-
- const shared_ptr<QueuePair> queue_pair_;
- shared_ptr<Body> body_;
-
- static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
-
-DISABLE_COPY_AND_ASSIGN(DataReader);
-};
-
-} // namespace caffe
-
-#endif // CAFFE_DATA_READER_HPP_
diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp
index 6a8c5a02..0ba67665 100644
--- a/include/caffe/internal_thread.hpp
+++ b/include/caffe/internal_thread.hpp
@@ -42,8 +42,8 @@ class InternalThread {
bool must_stop();
private:
- void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
- bool root_solver);
+ void entry(int device, Caffe::Brew mode, int rand_seed,
+ int solver_count, int solver_rank, bool multiprocess);
shared_ptr<boost::thread> thread_;
};
diff --git a/include/caffe/layer.hpp b/include/caffe/layer.hpp
index 10f353f9..30dbfd53 100644
--- a/include/caffe/layer.hpp
+++ b/include/caffe/layer.hpp
@@ -38,7 +38,7 @@ class Layer {
* layer.
*/
explicit Layer(const LayerParameter& param)
- : layer_param_(param), is_shared_(false) {
+ : layer_param_(param) {
// Set phase and copy blobs (if there are any).
phase_ = param.phase();
if (layer_param_.blobs_size() > 0) {
@@ -66,7 +66,6 @@ class Layer {
*/
void SetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
- InitMutex();
CheckBlobCounts(bottom, top);
LayerSetUp(bottom, top);
Reshape(bottom, top);
@@ -93,30 +92,6 @@ class Layer {
const vector<Blob<Dtype>*>& top) {}
/**
- * @brief Whether a layer should be shared by multiple nets during data
- * parallelism. By default, all layers except for data layers should
- * not be shared. data layers should be shared to ensure each worker
- * solver access data sequentially during data parallelism.
- */
- virtual inline bool ShareInParallel() const { return false; }
-
- /** @brief Return whether this layer is actually shared by other nets.
- * If ShareInParallel() is true and using more than one GPU and the
- * net has TRAIN phase, then this function is expected return true.
- */
- inline bool IsShared() const { return is_shared_; }
-
- /** @brief Set whether this layer is actually shared by other nets
- * If ShareInParallel() is true and using more than one GPU and the
- * net has TRAIN phase, then is_shared should be set true.
- */
- inline void SetShared(bool is_shared) {
- CHECK(ShareInParallel() || !is_shared)
- << type() << "Layer does not support sharing.";
- is_shared_ = is_shared;
- }
-
- /**
* @brief Adjust the shapes of top blobs and internal buffers to accommodate
* the shapes of the bottom blobs.
*
@@ -428,19 +403,6 @@ class Layer {
}
private:
- /** Whether this layer is actually shared by other nets*/
- bool is_shared_;
-
- /** The mutex for sequential forward if this layer is shared */
- shared_ptr<boost::mutex> forward_mutex_;
-
- /** Initialize forward_mutex_ */
- void InitMutex();
- /** Lock forward_mutex_ if this layer is shared */
- void Lock();
- /** Unlock forward_mutex_ if this layer is shared */
- void Unlock();
-
DISABLE_COPY_AND_ASSIGN(Layer);
}; // class Layer
@@ -450,8 +412,6 @@ class Layer {
template <typename Dtype>
inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
- // Lock during forward to ensure sequential forward
- Lock();
Dtype loss = 0;
Reshape(bottom, top);
switch (Caffe::mode()) {
@@ -482,7 +442,6 @@ inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom,
default:
LOG(FATAL) << "Unknown caffe mode.";
}
- Unlock();
return loss;
}
diff --git a/include/caffe/layers/base_data_layer.hpp b/include/caffe/layers/base_data_layer.hpp
index 2c49b731..21d3ada5 100644
--- a/include/caffe/layers/base_data_layer.hpp
+++ b/include/caffe/layers/base_data_layer.hpp
@@ -67,16 +67,14 @@ class BasePrefetchingDataLayer :
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
- // Prefetches batches (asynchronously if to GPU memory)
- static const int PREFETCH_COUNT = 3;
-
protected:
virtual void InternalThreadEntry();
virtual void load_batch(Batch<Dtype>* batch) = 0;
- Batch<Dtype> prefetch_[PREFETCH_COUNT];
+ vector<shared_ptr<Batch<Dtype> > > prefetch_;
BlockingQueue<Batch<Dtype>*> prefetch_free_;
BlockingQueue<Batch<Dtype>*> prefetch_full_;
+ Batch<Dtype>* prefetch_current_;
Blob<Dtype> transformed_data_;
};
diff --git a/include/caffe/layers/data_layer.hpp b/include/caffe/layers/data_layer.hpp
index 6c361791..dec58180 100644
--- a/include/caffe/layers/data_layer.hpp
+++ b/include/caffe/layers/data_layer.hpp
@@ -4,7 +4,6 @@
#include <vector>
#include "caffe/blob.hpp"
-#include "caffe/data_reader.hpp"
#include "caffe/data_transformer.hpp"
#include "caffe/internal_thread.hpp"
#include "caffe/layer.hpp"
@@ -29,9 +28,13 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> {
virtual inline int MaxTopBlobs() const { return 2; }
protected:
+ void Next();
+ bool Skip();
virtual void load_batch(Batch<Dtype>* batch);
- DataReader reader_;
+ shared_ptr<db::DB> db_;
+ shared_ptr<db::Cursor> cursor_;
+ uint64_t offset_;
};
} // namespace caffe
diff --git a/include/caffe/layers/hdf5_data_layer.hpp b/include/caffe/layers/hdf5_data_layer.hpp
index b04cf8e1..650a3fb0 100644
--- a/include/caffe/layers/hdf5_data_layer.hpp
+++ b/include/caffe/layers/hdf5_data_layer.hpp
@@ -23,7 +23,7 @@ template <typename Dtype>
class HDF5DataLayer : public Layer<Dtype> {
public:
explicit HDF5DataLayer(const LayerParameter& param)
- : Layer<Dtype>(param) {}
+ : Layer<Dtype>(param), offset_() {}
virtual ~HDF5DataLayer();
virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
@@ -38,6 +38,9 @@ class HDF5DataLayer : public Layer<Dtype> {
virtual inline int MinTopBlobs() const { return 1; }
protected:
+ void Next();
+ bool Skip();
+
virtual void Forward_cpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top);
virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom,
@@ -55,6 +58,7 @@ class HDF5DataLayer : public Layer<Dtype> {
std::vector<shared_ptr<Blob<Dtype> > > hdf_blobs_;
std::vector<unsigned int> data_permutation_;
std::vector<unsigned int> file_permutation_;
+ uint64_t offset_;
};
} // namespace caffe
diff --git a/include/caffe/layers/python_layer.hpp b/include/caffe/layers/python_layer.hpp
index 66dbbdf1..10c4bfd0 100644
--- a/include/caffe/layers/python_layer.hpp
+++ b/include/caffe/layers/python_layer.hpp
@@ -21,8 +21,8 @@ class PythonLayer : public Layer<Dtype> {
// Disallow PythonLayer in MultiGPU training stage, due to GIL issues
// Details: https://github.com/BVLC/caffe/issues/2936
if (this->phase_ == TRAIN && Caffe::solver_count() > 1
- && !ShareInParallel()) {
- LOG(FATAL) << "PythonLayer is not implemented in Multi-GPU training";
+ && !Caffe::multiprocess()) {
+ LOG(FATAL) << "PythonLayer does not support CLI Multi-GPU, use train.py";
}
self_.attr("param_str") = bp::str(
this->layer_param_.python_param().param_str());
diff --git a/include/caffe/net.hpp b/include/caffe/net.hpp
index 493bdf29..d3c9306e 100644
--- a/include/caffe/net.hpp
+++ b/include/caffe/net.hpp
@@ -23,10 +23,9 @@ namespace caffe {
template <typename Dtype>
class Net {
public:
- explicit Net(const NetParameter& param, const Net* root_net = NULL);
+ explicit Net(const NetParameter& param);
explicit Net(const string& param_file, Phase phase,
- const int level = 0, const vector<string>* stages = NULL,
- const Net* root_net = NULL);
+ const int level = 0, const vector<string>* stages = NULL);
virtual ~Net() {}
/// @brief Initialize a network with a NetParameter.
@@ -228,6 +227,31 @@ class Net {
static bool StateMeetsRule(const NetState& state, const NetStateRule& rule,
const string& layer_name);
+ // Invoked at specific points during an iteration
+ class Callback {
+ protected:
+ virtual void run(int layer) = 0;
+
+ template <typename T>
+ friend class Net;
+ };
+ const vector<Callback*>& before_forward() const { return before_forward_; }
+ void add_before_forward(Callback* value) {
+ before_forward_.push_back(value);
+ }
+ const vector<Callback*>& after_forward() const { return after_forward_; }
+ void add_after_forward(Callback* value) {
+ after_forward_.push_back(value);
+ }
+ const vector<Callback*>& before_backward() const { return before_backward_; }
+ void add_before_backward(Callback* value) {
+ before_backward_.push_back(value);
+ }
+ const vector<Callback*>& after_backward() const { return after_backward_; }
+ void add_after_backward(Callback* value) {
+ after_backward_.push_back(value);
+ }
+
protected:
// Helpers for Init.
/// @brief Append a new top blob to the net.
@@ -306,9 +330,13 @@ class Net {
size_t memory_used_;
/// Whether to compute and display debug info for the net.
bool debug_info_;
- /// The root net that actually holds the shared layers in data parallelism
- const Net* const root_net_;
- DISABLE_COPY_AND_ASSIGN(Net);
+ // Callbacks
+ vector<Callback*> before_forward_;
+ vector<Callback*> after_forward_;
+ vector<Callback*> before_backward_;
+ vector<Callback*> after_backward_;
+
+DISABLE_COPY_AND_ASSIGN(Net);
};
diff --git a/include/caffe/parallel.hpp b/include/caffe/parallel.hpp
index 6c496c88..64bb48e6 100644
--- a/include/caffe/parallel.hpp
+++ b/include/caffe/parallel.hpp
@@ -1,8 +1,11 @@
#ifndef CAFFE_PARALLEL_HPP_
#define CAFFE_PARALLEL_HPP_
-#include <boost/date_time/posix_time/posix_time.hpp>
+#ifdef USE_NCCL
+#include <boost/thread.hpp>
+
+#include <string>
#include <vector>
#include "caffe/blob.hpp"
@@ -13,6 +16,7 @@
#include "caffe/solver.hpp"
#include "caffe/syncedmem.hpp"
#include "caffe/util/blocking_queue.hpp"
+#include "caffe/util/nccl.hpp"
namespace caffe {
@@ -51,7 +55,7 @@ class GPUParams : public Params<Dtype> {
GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device);
virtual ~GPUParams();
- void configure(Solver<Dtype>* solver) const;
+ void Configure(Solver<Dtype>* solver) const;
protected:
using Params<Dtype>::size_;
@@ -59,58 +63,55 @@ class GPUParams : public Params<Dtype> {
using Params<Dtype>::diff_;
};
-class DevicePair {
- public:
- DevicePair(int parent, int device)
- : parent_(parent),
- device_(device) {
- }
- inline int parent() {
- return parent_;
- }
- inline int device() {
- return device_;
- }
-
- // Group GPUs in pairs, by proximity depending on machine's topology
- static void compute(const vector<int> devices, vector<DevicePair>* pairs);
-
- protected:
- int parent_;
- int device_;
-};
-
-// Synchronous data parallelism using map-reduce between local GPUs.
template<typename Dtype>
-class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
- public InternalThread {
+class NCCL : public GPUParams<Dtype>,
+ public Solver<Dtype>::Callback,
+ public Net<Dtype>::Callback {
public:
- explicit P2PSync(shared_ptr<Solver<Dtype> > root_solver,
- P2PSync<Dtype>* parent, const SolverParameter& param);
- virtual ~P2PSync();
-
- inline const shared_ptr<Solver<Dtype> >& solver() const {
- return solver_;
- }
-
- void Run(const vector<int>& gpus);
- void Prepare(const vector<int>& gpus,
- vector<shared_ptr<P2PSync<Dtype> > >* syncs);
- inline const int initial_iter() const { return initial_iter_; }
+ /**
+ * Single process version.
+ */
+ explicit NCCL(shared_ptr<Solver<Dtype> > solver);
+ /**
+ * In multi-process settings, first create a NCCL id (new_uid), then
+ * pass it to each process to create connected instances.
+ */
+ NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid);
+ ~NCCL();
+
+ boost::barrier* barrier();
+ void set_barrier(boost::barrier* value);
+
+ /**
+ * In single process settings, create instances without uids and
+ * call this to connect them.
+ */
+ static void InitSingleProcess(vector<NCCL<Dtype>*>* nccls);
+
+ static string new_uid();
+
+ /**
+ * Broadcast weights from rank 0 other solvers.
+ */
+ void Broadcast();
+
+ /**
+ * Single process multi-GPU.
+ */
+ void Run(const vector<int>& gpus, const char* restore);
protected:
- void on_start();
+ void Init();
+ void on_start() {}
+ void run(int layer); // Net callback
void on_gradients_ready();
- void InternalThreadEntry();
+ ncclComm_t comm_;
+ cudaStream_t stream_;
- P2PSync<Dtype>* parent_;
- vector<P2PSync<Dtype>*> children_;
- BlockingQueue<P2PSync<Dtype>*> queue_;
- const int initial_iter_;
- Dtype* parent_grads_;
shared_ptr<Solver<Dtype> > solver_;
-
+ // Should not be necessary, https://github.com/NVIDIA/nccl/issues/37
+ boost::barrier* barrier_;
using Params<Dtype>::size_;
using Params<Dtype>::data_;
using Params<Dtype>::diff_;
@@ -118,4 +119,5 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
} // namespace caffe
-#endif
+#endif // USE_NCCL
+#endif // header
diff --git a/include/caffe/solver.hpp b/include/caffe/solver.hpp
index eafcee32..a28d8cb8 100644
--- a/include/caffe/solver.hpp
+++ b/include/caffe/solver.hpp
@@ -6,6 +6,7 @@
#include "caffe/net.hpp"
#include "caffe/solver_factory.hpp"
+#include "caffe/util/benchmark.hpp"
namespace caffe {
@@ -40,9 +41,8 @@ typedef boost::function<SolverAction::Enum()> ActionCallback;
template <typename Dtype>
class Solver {
public:
- explicit Solver(const SolverParameter& param,
- const Solver* root_solver = NULL);
- explicit Solver(const string& param_file, const Solver* root_solver = NULL);
+ explicit Solver(const SolverParameter& param);
+ explicit Solver(const string& param_file);
void Init(const SolverParameter& param);
void InitTrainNet();
void InitTestNets();
@@ -72,7 +72,7 @@ class Solver {
inline const vector<shared_ptr<Net<Dtype> > >& test_nets() {
return test_nets_;
}
- int iter() { return iter_; }
+ int iter() const { return iter_; }
// Invoked at specific points during an iteration
class Callback {
@@ -118,10 +118,6 @@ class Solver {
vector<Dtype> losses_;
Dtype smoothed_loss_;
- // The root solver that holds root nets (actually containing shared layers)
- // in data parallelism
- const Solver* const root_solver_;
-
// A function that can be set by a client of the Solver to provide indication
// that it wants a snapshot saved and/or to exit early.
ActionCallback action_request_function_;
@@ -129,31 +125,11 @@ class Solver {
// True iff a request to stop early was received.
bool requested_early_exit_;
- DISABLE_COPY_AND_ASSIGN(Solver);
-};
+ // Timing information, handy to tune e.g. nbr of GPUs
+ Timer iteration_timer_;
+ float iterations_last_;
-/**
- * @brief Solver that only computes gradients, used as worker
- * for multi-GPU training.
- */
-template <typename Dtype>
-class WorkerSolver : public Solver<Dtype> {
- public:
- explicit WorkerSolver(const SolverParameter& param,
- const Solver<Dtype>* root_solver = NULL)
- : Solver<Dtype>(param, root_solver) {}
-
- protected:
- void ApplyUpdate() {}
- void SnapshotSolverState(const string& model_filename) {
- LOG(FATAL) << "Should not be called on worker solver.";
- }
- void RestoreSolverStateFromBinaryProto(const string& state_file) {
- LOG(FATAL) << "Should not be called on worker solver.";
- }
- void RestoreSolverStateFromHDF5(const string& state_file) {
- LOG(FATAL) << "Should not be called on worker solver.";
- }
+ DISABLE_COPY_AND_ASSIGN(Solver);
};
} // namespace caffe
diff --git a/include/caffe/syncedmem.hpp b/include/caffe/syncedmem.hpp
index 6474a696..317ce29a 100644
--- a/include/caffe/syncedmem.hpp
+++ b/include/caffe/syncedmem.hpp
@@ -56,14 +56,8 @@ inline void CaffeFreeHost(void* ptr, bool use_cuda) {
*/
class SyncedMemory {
public:
- SyncedMemory()
- : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
- own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false),
- gpu_device_(-1) {}
- explicit SyncedMemory(size_t size)
- : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
- own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false),
- gpu_device_(-1) {}
+ SyncedMemory();
+ explicit SyncedMemory(size_t size);
~SyncedMemory();
const void* cpu_data();
void set_cpu_data(void* data);
@@ -80,6 +74,8 @@ class SyncedMemory {
#endif
private:
+ void check_device();
+
void to_cpu();
void to_gpu();
void* cpu_ptr_;
@@ -89,7 +85,7 @@ class SyncedMemory {
bool own_cpu_data_;
bool cpu_malloc_use_cuda_;
bool own_gpu_data_;
- int gpu_device_;
+ int device_;
DISABLE_COPY_AND_ASSIGN(SyncedMemory);
}; // class SyncedMemory
diff --git a/include/caffe/util/math_functions.hpp b/include/caffe/util/math_functions.hpp
index 6f6d3fee..51068fe2 100644
--- a/include/caffe/util/math_functions.hpp
+++ b/include/caffe/util/math_functions.hpp
@@ -185,6 +185,11 @@ void caffe_gpu_add_scalar(const int N, const Dtype alpha, Dtype *X);
template <typename Dtype>
void caffe_gpu_scal(const int N, const Dtype alpha, Dtype *X);
+#ifndef CPU_ONLY
+template <typename Dtype>
+void caffe_gpu_scal(const int N, const Dtype alpha, Dtype* X, cudaStream_t str);
+#endif
+
template <typename Dtype>
void caffe_gpu_add(const int N, const Dtype* a, const Dtype* b, Dtype* y);
diff --git a/include/caffe/util/nccl.hpp b/include/caffe/util/nccl.hpp
new file mode 100644
index 00000000..e01fb745
--- /dev/null
+++ b/include/caffe/util/nccl.hpp
@@ -0,0 +1,37 @@
+#ifndef CAFFE_UTIL_NCCL_H_
+#define CAFFE_UTIL_NCCL_H_
+#ifdef USE_NCCL
+
+#include <nccl.h>
+
+#include "caffe/common.hpp"
+
+#define NCCL_CHECK(condition) \
+{ \
+ ncclResult_t result = condition; \
+ CHECK_EQ(result, ncclSuccess) << " " \
+ << ncclGetErrorString(result); \
+}
+
+namespace caffe {
+
+namespace nccl {
+
+template <typename Dtype> class dataType;
+
+template<> class dataType<float> {
+ public:
+ static const ncclDataType_t type = ncclFloat;
+};
+template<> class dataType<double> {
+ public:
+ static const ncclDataType_t type = ncclDouble;
+};
+
+} // namespace nccl
+
+} // namespace caffe
+
+#endif // end USE_NCCL
+
+#endif // CAFFE_UTIL_NCCL_H_
diff --git a/python/caffe/__init__.py b/python/caffe/__init__.py
index 35868a40..43a0c49b 100644
--- a/python/caffe/__init__.py
+++ b/python/caffe/__init__.py
@@ -1,5 +1,5 @@
-from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver
-from ._caffe import set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed
+from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer
+from ._caffe import init_log, log, set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed, solver_count, set_solver_count, solver_rank, set_solver_rank, set_multiprocess, Layer, get_solver
from ._caffe import __version__
from .proto.caffe_pb2 import TRAIN, TEST
from .classifier import Classifier
diff --git a/python/caffe/_caffe.cpp b/python/caffe/_caffe.cpp
index bdee75ac..3589e476 100644
--- a/python/caffe/_caffe.cpp
+++ b/python/caffe/_caffe.cpp
@@ -51,6 +51,19 @@ const int NPY_DTYPE = NPY_FLOAT32;
void set_mode_cpu() { Caffe::set_mode(Caffe::CPU); }
void set_mode_gpu() { Caffe::set_mode(Caffe::GPU); }
+void InitLog(int level) {
+ FLAGS_logtostderr = 1;
+ FLAGS_minloglevel = level;
+ ::google::InitGoogleLogging("");
+ ::google::InstallFailureSignalHandler();
+}
+void InitLogInfo() {
+ InitLog(google::INFO);
+}
+void Log(const string& s) {
+ LOG(INFO) << s;
+}
+
void set_random_seed(unsigned int seed) { Caffe::set_random_seed(seed); }
// For convenience, check that input files can be opened, and raise an
@@ -254,12 +267,12 @@ bp::object BlobVec_add_blob(bp::tuple args, bp::dict kwargs) {
}
template<typename Dtype>
-class PythonCallback: public Solver<Dtype>::Callback {
+class SolverCallback: public Solver<Dtype>::Callback {
protected:
bp::object on_start_, on_gradients_ready_;
public:
- PythonCallback(bp::object on_start, bp::object on_gradients_ready)
+ SolverCallback(bp::object on_start, bp::object on_gradients_ready)
: on_start_(on_start), on_gradients_ready_(on_gradients_ready) { }
virtual void on_gradients_ready() {
on_gradients_ready_();
@@ -271,9 +284,61 @@ class PythonCallback: public Solver<Dtype>::Callback {
template<typename Dtype>
void Solver_add_callback(Solver<Dtype> * solver, bp::object on_start,
bp::object on_gradients_ready) {
- solver->add_callback(new PythonCallback<Dtype>(on_start, on_gradients_ready));
+ solver->add_callback(new SolverCallback<Dtype>(on_start, on_gradients_ready));
+}
+
+// Seems boost cannot call the base method directly
+void Solver_add_nccl(SGDSolver<Dtype>* solver
+#ifdef USE_NCCL
+ , NCCL<Dtype>* nccl
+#endif
+) {
+#ifdef USE_NCCL
+ solver->add_callback(nccl);
+#endif
}
+template<typename Dtype>
+class NetCallback: public Net<Dtype>::Callback {
+ public:
+ explicit NetCallback(bp::object run) : run_(run) {}
+
+ protected:
+ virtual void run(int layer) {
+ run_(layer);
+ }
+ bp::object run_;
+};
+void Net_before_forward(Net<Dtype>* net, bp::object run) {
+ net->add_before_forward(new NetCallback<Dtype>(run));
+}
+void Net_after_forward(Net<Dtype>* net, bp::object run) {
+ net->add_after_forward(new NetCallback<Dtype>(run));
+}
+void Net_before_backward(Net<Dtype>* net, bp::object run) {
+ net->add_before_backward(new NetCallback<Dtype>(run));
+}
+void Net_after_backward(Net<Dtype>* net, bp::object run) {
+ net->add_after_backward(new NetCallback<Dtype>(run));
+}
+
+void Net_add_nccl(Net<Dtype>* net
+#ifdef USE_NCCL
+ , NCCL<Dtype>* nccl
+#endif
+) {
+#ifdef USE_NCCL
+ net->add_after_backward(nccl);
+#endif
+}
+#ifndef USE_NCCL
+template<typename Dtype>
+class NCCL {
+ public:
+ NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid) {}
+};
+#endif
+
BOOST_PYTHON_MEMBER_FUNCTION_OVERLOADS(SolveOverloads, Solve, 0, 1);
BOOST_PYTHON_MODULE(_caffe) {
@@ -283,10 +348,18 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::scope().attr("__version__") = AS_STRING(CAFFE_VERSION);
// Caffe utility functions
+ bp::def("init_log", &InitLog);
+ bp::def("init_log", &InitLogInfo);
+ bp::def("log", &Log);
bp::def("set_mode_cpu", &set_mode_cpu);
bp::def("set_mode_gpu", &set_mode_gpu);
bp::def("set_random_seed", &set_random_seed);
bp::def("set_device", &Caffe::SetDevice);
+ bp::def("solver_count", &Caffe::solver_count);
+ bp::def("set_solver_count", &Caffe::set_solver_count);
+ bp::def("solver_rank", &Caffe::solver_rank);
+ bp::def("set_solver_rank", &Caffe::set_solver_rank);
+ bp::def("set_multiprocess", &Caffe::set_multiprocess);
bp::def("layer_type_list", &LayerRegistry<Dtype>::LayerTypeList);
@@ -330,7 +403,12 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::with_custodian_and_ward<1, 2, bp::with_custodian_and_ward<1, 3> >())
.def("save", &Net_Save)
.def("save_hdf5", &Net_SaveHDF5)
- .def("load_hdf5", &Net_LoadHDF5);
+ .def("load_hdf5", &Net_LoadHDF5)
+ .def("before_forward", &Net_before_forward)
+ .def("after_forward", &Net_after_forward)
+ .def("before_backward", &Net_before_backward)
+ .def("after_backward", &Net_after_backward)
+ .def("after_backward", &Net_add_nccl);
BP_REGISTER_SHARED_PTR_TO_PYTHON(Net<Dtype>);
bp::class_<Blob<Dtype>, shared_ptr<Blob<Dtype> >, boost::noncopyable>(
@@ -362,6 +440,10 @@ BOOST_PYTHON_MODULE(_caffe) {
.add_property("type", bp::make_function(&Layer<Dtype>::type));
BP_REGISTER_SHARED_PTR_TO_PYTHON(Layer<Dtype>);
+ bp::class_<SolverParameter>("SolverParameter", bp::no_init)
+ .add_property("max_iter", &SolverParameter::max_iter)
+ .add_property("display", &SolverParameter::display)
+ .add_property("layer_wise_reduce", &SolverParameter::layer_wise_reduce);
bp::class_<LayerParameter>("LayerParameter", bp::no_init);
bp::class_<Solver<Dtype>, shared_ptr<Solver<Dtype> >, boost::noncopyable>(
@@ -371,11 +453,14 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::return_internal_reference<>()))
.add_property("iter", &Solver<Dtype>::iter)
.def("add_callback", &Solver_add_callback<Dtype>)
+ .def("add_callback", &Solver_add_nccl)
.def("solve", static_cast<void (Solver<Dtype>::*)(const char*)>(
&Solver<Dtype>::Solve), SolveOverloads())
.def("step", &Solver<Dtype>::Step)
.def("restore", &Solver<Dtype>::Restore)
- .def("snapshot", &Solver<Dtype>::Snapshot);
+ .def("snapshot", &Solver<Dtype>::Snapshot)
+ .add_property("param", bp::make_function(&Solver<Dtype>::param,
+ bp::return_value_policy<bp::copy_const_reference>()));
BP_REGISTER_SHARED_PTR_TO_PYTHON(Solver<Dtype>);
bp::class_<SGDSolver<Dtype>, bp::bases<Solver<Dtype> >,
@@ -419,6 +504,24 @@ BOOST_PYTHON_MODULE(_caffe) {
bp::class_<vector<bool> >("BoolVec")
.def(bp::vector_indexing_suite<vector<bool> >());
+ bp::class_<NCCL<Dtype>, shared_ptr<NCCL<Dtype> >,
+ boost::noncopyable>("NCCL",
+ bp::init<shared_ptr<Solver<Dtype> >, const string&>())
+#ifdef USE_NCCL
+ .def("new_uid", &NCCL<Dtype>::new_uid).staticmethod("new_uid")
+ .def("bcast", &NCCL<Dtype>::Broadcast)
+#endif
+ /* NOLINT_NEXT_LINE(whitespace/semicolon) */
+ ;
+ BP_REGISTER_SHARED_PTR_TO_PYTHON(NCCL<Dtype>);
+
+ bp::class_<Timer, shared_ptr<Timer>, boost::noncopyable>(
+ "Timer", bp::init<>())
+ .def("start", &Timer::Start)
+ .def("stop", &Timer::Stop)
+ .add_property("ms", &Timer::MilliSeconds);
+ BP_REGISTER_SHARED_PTR_TO_PYTHON(Timer);
+
// boost python expects a void (missing) return value, while import_array
// returns NULL for python3. import_array1() forces a void return value.
import_array1();
diff --git a/python/caffe/pycaffe.py b/python/caffe/pycaffe.py
index 5bae18d9..18803818 100644
--- a/python/caffe/pycaffe.py
+++ b/python/caffe/pycaffe.py
@@ -11,7 +11,7 @@ except:
import numpy as np
from ._caffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, \
- RMSPropSolver, AdaDeltaSolver, AdamSolver
+ RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer
import caffe.io
import six
diff --git a/python/train.py b/python/train.py
new file mode 100644
index 00000000..5897f5dc
--- /dev/null
+++ b/python/train.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+"""
+Trains a model using one or more GPUs.
+"""
+from multiprocessing import Process
+
+import caffe
+
+
+def train(
+ solver, # solver proto definition
+ snapshot, # solver snapshot to restore
+ gpus, # list of device ids
+ timing=False, # show timing info for compute and communications
+):
+ # NCCL uses a uid to identify a session
+ uid = caffe.NCCL.new_uid()
+
+ caffe.init_log()
+ caffe.log('Using devices %s' % str(gpus))
+
+ procs = []
+ for rank in range(len(gpus)):
+ p = Process(target=solve,
+ args=(solver, snapshot, gpus, timing, uid, rank))
+ p.daemon = True
+ p.start()
+ procs.append(p)
+ for p in procs:
+ p.join()
+
+
+def time(solver, nccl):
+ fprop = []
+ bprop = []
+ total = caffe.Timer()
+ allrd = caffe.Timer()
+ for _ in range(len(solver.net.layers)):
+ fprop.append(caffe.Timer())
+ bprop.append(caffe.Timer())
+ display = solver.param.display
+
+ def show_time():
+ if solver.iter % display == 0:
+ s = '\n'
+ for i in range(len(solver.net.layers)):
+ s += 'forw %3d %8s ' % (i, solver.net._layer_names[i])
+ s += ': %.2f\n' % fprop[i].ms
+ for i in range(len(solver.net.layers) - 1, -1, -1):
+ s += 'back %3d %8s ' % (i, solver.net._layer_names[i])
+ s += ': %.2f\n' % bprop[i].ms
+ s += 'solver total: %.2f\n' % total.ms
+ s += 'allreduce: %.2f\n' % allrd.ms
+ caffe.log(s)
+
+ solver.net.before_forward(lambda layer: fprop[layer].start())
+ solver.net.after_forward(lambda layer: fprop[layer].stop())
+ solver.net.before_backward(lambda layer: bprop[layer].start())
+ solver.net.after_backward(lambda layer: bprop[layer].stop())
+ solver.add_callback(lambda: total.start(), lambda: (total.stop(), allrd.start()))
+ solver.add_callback(nccl)
+ solver.add_callback(lambda: '', lambda: (allrd.stop(), show_time()))
+
+
+def solve(proto, snapshot, gpus, timing, uid, rank):
+ caffe.set_mode_gpu()
+ caffe.set_device(gpus[rank])
+ caffe.set_solver_count(len(gpus))
+ caffe.set_solver_rank(rank)
+ caffe.set_multiprocess(True)
+
+ solver = caffe.SGDSolver(proto)
+ if snapshot and len(snapshot) != 0:
+ solver.restore(snapshot)
+
+ nccl = caffe.NCCL(solver, uid)
+ nccl.bcast()
+
+ if timing and rank == 0:
+ time(solver, nccl)
+ else:
+ solver.add_callback(nccl)
+
+ if solver.param.layer_wise_reduce:
+ solver.net.after_backward(nccl)
+ solver.step(solver.param.max_iter)
+
+
+if __name__ == '__main__':
+ import argparse
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--solver", required=True, help="Solver proto definition.")
+ parser.add_argument("--snapshot", help="Solver snapshot to restore.")
+ parser.add_argument("--gpus", type=int, nargs='+', default=[0],
+ help="List of device ids.")
+ parser.add_argument("--timing", action='store_true', help="Show timing info.")
+ args = parser.parse_args()
+
+ train(args.solver, args.snapshot, args.gpus, args.timing)
diff --git a/src/caffe/blob.cpp b/src/caffe/blob.cpp
index 4a34e4c5..603e52f7 100644
--- a/src/caffe/blob.cpp
+++ b/src/caffe/blob.cpp
@@ -89,6 +89,12 @@ const Dtype* Blob<Dtype>::cpu_data() const {
template <typename Dtype>
void Blob<Dtype>::set_cpu_data(Dtype* data) {
CHECK(data);
+ // Make sure CPU and GPU sizes remain equal
+ size_t size = count_ * sizeof(Dtype);
+ if (data_->size() != size) {
+ data_.reset(new SyncedMemory(size));
+ diff_.reset(new SyncedMemory(size));
+ }
data_->set_cpu_data(data);
}
@@ -99,6 +105,18 @@ const Dtype* Blob<Dtype>::gpu_data() const {
}
template <typename Dtype>
+void Blob<Dtype>::set_gpu_data(Dtype* data) {
+ CHECK(data);
+ // Make sure CPU and GPU sizes remain equal
+ size_t size = count_ * sizeof(Dtype);
+ if (data_->size() != size) {
+ data_.reset(new SyncedMemory(size));
+ diff_.reset(new SyncedMemory(size));
+ }
+ data_->set_gpu_data(data);
+}
+
+template <typename Dtype>
const Dtype* Blob<Dtype>::cpu_diff() const {
CHECK(diff_);
return (const Dtype*)diff_->cpu_data();
diff --git a/src/caffe/common.cpp b/src/caffe/common.cpp
index dee68165..4f6f9bcc 100644
--- a/src/caffe/common.cpp
+++ b/src/caffe/common.cpp
@@ -53,7 +53,7 @@ void GlobalInit(int* pargc, char*** pargv) {
Caffe::Caffe()
: random_generator_(), mode_(Caffe::CPU),
- solver_count_(1), root_solver_(true) { }
+ solver_count_(1), solver_rank_(0), multiprocess_(false) { }
Caffe::~Caffe() { }
@@ -106,7 +106,8 @@ void* Caffe::RNG::generator() {
Caffe::Caffe()
: cublas_handle_(NULL), curand_generator_(NULL), random_generator_(),
- mode_(Caffe::CPU), solver_count_(1), root_solver_(true) {
+ mode_(Caffe::CPU),
+ solver_count_(1), solver_rank_(0), multiprocess_(false) {
// Try to create a cublas handler, and report an error if failed (but we will
// keep the program running as one might just want to run CPU code).
if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) {
diff --git a/src/caffe/data_reader.cpp b/src/caffe/data_reader.cpp
deleted file mode 100644
index 9f019bbf..00000000
--- a/src/caffe/data_reader.cpp
+++ /dev/null
@@ -1,119 +0,0 @@
-#include <boost/thread.hpp>
-#include <map>
-#include <string>
-#include <vector>
-
-#include "caffe/common.hpp"
-#include "caffe/data_reader.hpp"
-#include "caffe/layers/data_layer.hpp"
-#include "caffe/proto/caffe.pb.h"
-
-namespace caffe {
-
-using boost::weak_ptr;
-
-map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
-static boost::mutex bodies_mutex_;
-
-DataReader::DataReader(const LayerParameter& param)
- : queue_pair_(new QueuePair( //
- param.data_param().prefetch() * param.data_param().batch_size())) {
- // Get or create a body
- boost::mutex::scoped_lock lock(bodies_mutex_);
- string key = source_key(param);
- weak_ptr<Body>& weak = bodies_[key];
- body_ = weak.lock();
- if (!body_) {
- body_.reset(new Body(param));
- bodies_[key] = weak_ptr<Body>(body_);
- }
- body_->new_queue_pairs_.push(queue_pair_);
-}
-
-DataReader::~DataReader() {
- string key = source_key(body_->param_);
- body_.reset();
- boost::mutex::scoped_lock lock(bodies_mutex_);
- if (bodies_[key].expired()) {
- bodies_.erase(key);
- }
-}
-
-//
-
-DataReader::QueuePair::QueuePair(int size) {
- // Initialize the free queue with requested number of datums
- for (int i = 0; i < size; ++i) {
- free_.push(new Datum());
- }
-}
-
-DataReader::QueuePair::~QueuePair() {
- Datum* datum;
- while (free_.try_pop(&datum)) {
- delete datum;
- }
- while (full_.try_pop(&datum)) {
- delete datum;
- }
-}
-
-//
-
-DataReader::Body::Body(const LayerParameter& param)
- : param_(param),
- new_queue_pairs_() {
- StartInternalThread();
-}
-
-DataReader::Body::~Body() {
- StopInternalThread();
-}
-
-void DataReader::Body::InternalThreadEntry() {
- shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend()));
- db->Open(param_.data_param().source(), db::READ);
- shared_ptr<db::Cursor> cursor(db->NewCursor());
- vector<shared_ptr<QueuePair> > qps;
- try {
- int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
-
- // To ensure deterministic runs, only start running once all solvers
- // are ready. But solvers need to peek on one item during initialization,
- // so read one item, then wait for the next solver.
- for (int i = 0; i < solver_count; ++i) {
- shared_ptr<QueuePair> qp(new_queue_pairs_.pop());
- read_one(cursor.get(), qp.get());
- qps.push_back(qp);
- }
- // Main loop
- while (!must_stop()) {
- for (int i = 0; i < solver_count; ++i) {
- read_one(cursor.get(), qps[i].get());
- }
- // Check no additional readers have been created. This can happen if
- // more than one net is trained at a time per process, whether single
- // or multi solver. It might also happen if two data layers have same
- // name and same source.
- CHECK_EQ(new_queue_pairs_.size(), 0);
- }
- } catch (boost::thread_interrupted&) {
- // Interrupted exception is expected on shutdown
- }
-}
-
-void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
- Datum* datum = qp->free_.pop();
- // TODO deserialize in-place instead of copy?
- datum->ParseFromString(cursor->value());
- qp->full_.push(datum);
-
- // go to the next iter
- cursor->Next();
- if (!cursor->valid()) {
- DLOG(INFO) << "Restarting data prefetching from start.";
- cursor->SeekToFirst();
- }
-}
-
-} // namespace caffe
diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp
index 104884e0..11de4979 100644
--- a/src/caffe/internal_thread.cpp
+++ b/src/caffe/internal_thread.cpp
@@ -28,25 +28,27 @@ void InternalThread::StartInternalThread() {
Caffe::Brew mode = Caffe::mode();
int rand_seed = caffe_rng_rand();
int solver_count = Caffe::solver_count();
- bool root_solver = Caffe::root_solver();
+ int solver_rank = Caffe::solver_rank();
+ bool multiprocess = Caffe::multiprocess();
try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
- rand_seed, solver_count, root_solver));
+ rand_seed, solver_count, solver_rank, multiprocess));
} catch (std::exception& e) {
LOG(FATAL) << "Thread exception: " << e.what();
}
}
void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed,
- int solver_count, bool root_solver) {
+ int solver_count, int solver_rank, bool multiprocess) {
#ifndef CPU_ONLY
CUDA_CHECK(cudaSetDevice(device));
#endif
Caffe::set_mode(mode);
Caffe::set_random_seed(rand_seed);
Caffe::set_solver_count(solver_count);
- Caffe::set_root_solver(root_solver);
+ Caffe::set_solver_rank(solver_rank);
+ Caffe::set_multiprocess(multiprocess);
InternalThreadEntry();
}
diff --git a/src/caffe/layer.cpp b/src/caffe/layer.cpp
index 3b912898..684ae88b 100644
--- a/src/caffe/layer.cpp
+++ b/src/caffe/layer.cpp
@@ -1,27 +1,7 @@
-#include <boost/thread.hpp>
#include "caffe/layer.hpp"
namespace caffe {
-template <typename Dtype>
-void Layer<Dtype>::InitMutex() {
- forward_mutex_.reset(new boost::mutex());
-}
-
-template <typename Dtype>
-void Layer<Dtype>::Lock() {
- if (IsShared()) {
- forward_mutex_->lock();
- }
-}
-
-template <typename Dtype>
-void Layer<Dtype>::Unlock() {
- if (IsShared()) {
- forward_mutex_->unlock();
- }
-}
-
INSTANTIATE_CLASS(Layer);
} // namespace caffe
diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp
index 989319f1..93a798f3 100644
--- a/src/caffe/layers/base_data_layer.cpp
+++ b/src/caffe/layers/base_data_layer.cpp
@@ -36,9 +36,11 @@ template <typename Dtype>
BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer(
const LayerParameter& param)
: BaseDataLayer<Dtype>(param),
- prefetch_free_(), prefetch_full_() {
- for (int i = 0; i < PREFETCH_COUNT; ++i) {
- prefetch_free_.push(&prefetch_[i]);
+ prefetch_(param.data_param().prefetch()),
+ prefetch_free_(), prefetch_full_(), prefetch_current_() {
+ for (int i = 0; i < prefetch_.size(); ++i) {
+ prefetch_[i].reset(new Batch<Dtype>());
+ prefetch_free_.push(prefetch_[i].get());
}
}
@@ -46,22 +48,23 @@ template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
BaseDataLayer<Dtype>::LayerSetUp(bottom, top);
+
// Before starting the prefetch thread, we make cpu_data and gpu_data
// calls so that the prefetch thread does not accidentally make simultaneous
// cudaMalloc calls when the main thread is running. In some GPUs this
// seems to cause failures if we do not so.
- for (int i = 0; i < PREFETCH_COUNT; ++i) {
- prefetch_[i].data_.mutable_cpu_data();
+ for (int i = 0; i < prefetch_.size(); ++i) {
+ prefetch_[i]->data_.mutable_cpu_data();
if (this->output_labels_) {
- prefetch_[i].label_.mutable_cpu_data();
+ prefetch_[i]->label_.mutable_cpu_data();
}
}
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
- for (int i = 0; i < PREFETCH_COUNT; ++i) {
- prefetch_[i].data_.mutable_gpu_data();
+ for (int i = 0; i < prefetch_.size(); ++i) {
+ prefetch_[i]->data_.mutable_gpu_data();
if (this->output_labels_) {
- prefetch_[i].label_.mutable_gpu_data();
+ prefetch_[i]->label_.mutable_gpu_data();
}
}
}
@@ -88,6 +91,9 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
#ifndef CPU_ONLY
if (Caffe::mode() == Caffe::GPU) {
batch->data_.data().get()->async_gpu_push(stream);
+ if (this->output_labels_) {
+ batch->label_.data().get()->async_gpu_push(stream);
+ }
CUDA_CHECK(cudaStreamSynchronize(stream));
}
#endif
@@ -106,22 +112,18 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() {
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_cpu(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
- Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
+ if (prefetch_current_) {
+ prefetch_free_.push(prefetch_current_);
+ }
+ prefetch_current_ = prefetch_full_.pop("Waiting for data");
// Reshape to loaded data.
- top[0]->ReshapeLike(batch->data_);
- // Copy the data
- caffe_copy(batch->data_.count(), batch->data_.cpu_data(),
- top[0]->mutable_cpu_data());
- DLOG(INFO) << "Prefetch copied";
+ top[0]->ReshapeLike(prefetch_current_->data_);
+ top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data());
if (this->output_labels_) {
// Reshape to loaded labels.
- top[1]->ReshapeLike(batch->label_);
- // Copy the labels.
- caffe_copy(batch->label_.count(), batch->label_.cpu_data(),
- top[1]->mutable_cpu_data());
+ top[1]->ReshapeLike(prefetch_current_->label_);
+ top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data());
}
-
- prefetch_free_.push(batch);
}
#ifdef CPU_ONLY
diff --git a/src/caffe/layers/base_data_layer.cu b/src/caffe/layers/base_data_layer.cu
index 4056d36a..64c621a7 100644
--- a/src/caffe/layers/base_data_layer.cu
+++ b/src/caffe/layers/base_data_layer.cu
@@ -7,23 +7,18 @@ namespace caffe {
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::Forward_gpu(
const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) {
- Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty");
+ if (prefetch_current_) {
+ prefetch_free_.push(prefetch_current_);
+ }
+ prefetch_current_ = prefetch_full_.pop("Waiting for data");
// Reshape to loaded data.
- top[0]->ReshapeLike(batch->data_);
- // Copy the data
- caffe_copy(batch->data_.count(), batch->data_.gpu_data(),
- top[0]->mutable_gpu_data());
+ top[0]->ReshapeLike(prefetch_current_->data_);
+ top[0]->set_gpu_data(prefetch_current_->data_.mutable_gpu_data());
if (this->output_labels_) {
// Reshape to loaded labels.
- top[1]->ReshapeLike(batch->label_);
- // Copy the labels.
- caffe_copy(batch->label_.count(), batch->label_.gpu_data(),
- top[1]->mutable_gpu_data());
+ top[1]->ReshapeLike(prefetch_current_->label_);
+ top[1]->set_gpu_data(prefetch_current_->label_.mutable_gpu_data());
}
- // Ensure the copy is synchronous wrt the host, so that the next batch isn't
- // copied in meanwhile.
- CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
- prefetch_free_.push(batch);
}
INSTANTIATE_LAYER_GPU_FORWARD(BasePrefetchingDataLayer);
diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp
index 66e6301f..0f1296bb 100644
--- a/src/caffe/layers/data_layer.cpp
+++ b/src/caffe/layers/data_layer.cpp
@@ -14,7 +14,10 @@ namespace caffe {
template <typename Dtype>
DataLayer<Dtype>::DataLayer(const LayerParameter& param)
: BasePrefetchingDataLayer<Dtype>(param),
- reader_(param) {
+ offset_() {
+ db_.reset(db::GetDB(param.data_param().backend()));
+ db_->Open(param.data_param().source(), db::READ);
+ cursor_.reset(db_->NewCursor());
}
template <typename Dtype>
@@ -27,7 +30,8 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
const int batch_size = this->layer_param_.data_param().batch_size();
// Read a data point, and use it to initialize the top blob.
- Datum& datum = *(reader_.full().peek());
+ Datum datum;
+ datum.ParseFromString(cursor_->value());
// Use data_transformer to infer the expected blob shape from datum.
vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
@@ -35,22 +39,44 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
// Reshape top[0] and prefetch_data according to the batch_size.
top_shape[0] = batch_size;
top[0]->Reshape(top_shape);
- for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
- this->prefetch_[i].data_.Reshape(top_shape);
+ for (int i = 0; i < this->prefetch_.size(); ++i) {
+ this->prefetch_[i]->data_.Reshape(top_shape);
}
- LOG(INFO) << "output data size: " << top[0]->num() << ","
+ LOG_IF(INFO, Caffe::root_solver())
+ << "output data size: " << top[0]->num() << ","
<< top[0]->channels() << "," << top[0]->height() << ","
<< top[0]->width();
// label
if (this->output_labels_) {
vector<int> label_shape(1, batch_size);
top[1]->Reshape(label_shape);
- for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
- this->prefetch_[i].label_.Reshape(label_shape);
+ for (int i = 0; i < this->prefetch_.size(); ++i) {
+ this->prefetch_[i]->label_.Reshape(label_shape);
}
}
}
+template <typename Dtype>
+bool DataLayer<Dtype>::Skip() {
+ int size = Caffe::solver_count();
+ int rank = Caffe::solver_rank();
+ bool keep = (offset_ % size) == rank ||
+ // In test mode, only rank 0 runs, so avoid skipping
+ this->layer_param_.phase() == TEST;
+ return !keep;
+}
+
+template<typename Dtype>
+void DataLayer<Dtype>::Next() {
+ cursor_->Next();
+ if (!cursor_->valid()) {
+ LOG_IF(INFO, Caffe::root_solver())
+ << "Restarting data prefetching from start.";
+ cursor_->SeekToFirst();
+ }
+ offset_++;
+}
+
// This function is called on prefetch thread
template<typename Dtype>
void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
@@ -61,41 +87,41 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) {
CPUTimer timer;
CHECK(batch->data_.count());
CHECK(this->transformed_data_.count());
-
- // Reshape according to the first datum of each batch
- // on single input batches allows for inputs of varying dimension.
const int batch_size = this->layer_param_.data_param().batch_size();
- Datum& datum = *(reader_.full().peek());
- // Use data_transformer to infer the expected blob shape from datum.
- vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
- this->transformed_data_.Reshape(top_shape);
- // Reshape batch according to the batch_size.
- top_shape[0] = batch_size;
- batch->data_.Reshape(top_shape);
-
- Dtype* top_data = batch->data_.mutable_cpu_data();
- Dtype* top_label = NULL; // suppress warnings about uninitialized variables
- if (this->output_labels_) {
- top_label = batch->label_.mutable_cpu_data();
- }
+ Datum datum;
for (int item_id = 0; item_id < batch_size; ++item_id) {
timer.Start();
- // get a datum
- Datum& datum = *(reader_.full().pop("Waiting for data"));
+ while (Skip()) {
+ Next();
+ }
+ datum.ParseFromString(cursor_->value());
read_time += timer.MicroSeconds();
- timer.Start();
+
+ if (item_id == 0) {
+ // Reshape according to the first datum of each batch
+ // on single input batches allows for inputs of varying dimension.
+ // Use data_transformer to infer the expected blob shape from datum.
+ vector<int> top_shape = this->data_transformer_->InferBlobShape(datum);
+ this->transformed_data_.Reshape(top_shape);
+ // Reshape batch according to the batch_size.
+ top_shape[0] = batch_size;
+ batch->data_.Reshape(top_shape);
+ }
+
// Apply data transformations (mirror, scale, crop...)
+ timer.Start();
int offset = batch->data_.offset(item_id);
+ Dtype* top_data = batch->data_.mutable_cpu_data();
this->transformed_data_.set_cpu_data(top_data + offset);
this->data_transformer_->Transform(datum, &(this->transformed_data_));
// Copy label.
if (this->output_labels_) {
+ Dtype* top_label = batch->label_.mutable_cpu_data();
top_label[item_id] = datum.label();
}
trans_time += timer.MicroSeconds();
-
- reader_.free().push(const_cast<Datum*>(&datum));
+ Next();
}
timer.Stop();
batch_timer.Stop();
diff --git a/src/caffe/layers/hdf5_data_layer.cpp b/src/caffe/layers/hdf5_data_layer.cpp
index c957451a..b9a071ce 100644
--- a/src/caffe/layers/hdf5_data_layer.cpp
+++ b/src/caffe/layers/hdf5_data_layer.cpp
@@ -125,27 +125,45 @@ void HDF5DataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom,
}
template <typename Dtype>
+bool HDF5DataLayer<Dtype>::Skip() {
+ int size = Caffe::solver_count();
+ int rank = Caffe::solver_rank();
+ bool keep = (offset_ % size) == rank ||
+ // In test mode, only rank 0 runs, so avoid skipping
+ this->layer_param_.phase() == TEST;
+ return !keep;
+}
+
+template<typename Dtype>
+void HDF5DataLayer<Dtype>::Next() {
+ if (++current_row_ == hdf_blobs_[0]->shape(0)) {
+ if (num_files_ > 1) {
+ ++current_file_;
+ if (current_file_ == num_files_) {
+ current_file_ = 0;
+ if (this->layer_param_.hdf5_data_param().shuffle()) {
+ std::random_shuffle(file_permutation_.begin(),
+ file_permutation_.end());
+ }
+ DLOG(INFO) << "Looping around to first file.";
+ }
+ LoadHDF5FileData(
+ hdf_filenames_[file_permutation_[current_file_]].c_str());
+ }
+ current_row_ = 0;
+ if (this->layer_param_.hdf5_data_param().shuffle())
+ std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+ }
+ offset_++;
+}
+
+template <typename Dtype>
void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
const int batch_size = this->layer_param_.hdf5_data_param().batch_size();
- for (int i = 0; i < batch_size; ++i, ++current_row_) {
- if (current_row_ == hdf_blobs_[0]->shape(0)) {
- if (num_files_ > 1) {
- ++current_file_;
- if (current_file_ == num_files_) {
- current_file_ = 0;
- if (this->layer_param_.hdf5_data_param().shuffle()) {
- std::random_shuffle(file_permutation_.begin(),
- file_permutation_.end());
- }
- DLOG(INFO) << "Looping around to first file.";
- }
- LoadHDF5FileData(
- hdf_filenames_[file_permutation_[current_file_]].c_str());
- }
- current_row_ = 0;
- if (this->layer_param_.hdf5_data_param().shuffle())
- std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+ for (int i = 0; i < batch_size; ++i) {
+ while (Skip()) {
+ Next();
}
for (int j = 0; j < this->layer_param_.top_size(); ++j) {
int data_dim = top[j]->count() / top[j]->shape(0);
@@ -153,6 +171,7 @@ void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom,
&hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_]
* data_dim], &top[j]->mutable_cpu_data()[i * data_dim]);
}
+ Next();
}
}
diff --git a/src/caffe/layers/hdf5_data_layer.cu b/src/caffe/layers/hdf5_data_layer.cu
index 595d2230..33eebd41 100644
--- a/src/caffe/layers/hdf5_data_layer.cu
+++ b/src/caffe/layers/hdf5_data_layer.cu
@@ -17,24 +17,9 @@ template <typename Dtype>
void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
const vector<Blob<Dtype>*>& top) {
const int batch_size = this->layer_param_.hdf5_data_param().batch_size();
- for (int i = 0; i < batch_size; ++i, ++current_row_) {
- if (current_row_ == hdf_blobs_[0]->shape(0)) {
- if (num_files_ > 1) {
- current_file_ += 1;
- if (current_file_ == num_files_) {
- current_file_ = 0;
- if (this->layer_param_.hdf5_data_param().shuffle()) {
- std::random_shuffle(file_permutation_.begin(),
- file_permutation_.end());
- }
- DLOG(INFO) << "Looping around to first file.";
- }
- LoadHDF5FileData(
- hdf_filenames_[file_permutation_[current_file_]].c_str());
- }
- current_row_ = 0;
- if (this->layer_param_.hdf5_data_param().shuffle())
- std::random_shuffle(data_permutation_.begin(), data_permutation_.end());
+ for (int i = 0; i < batch_size; ++i) {
+ while (Skip()) {
+ Next();
}
for (int j = 0; j < this->layer_param_.top_size(); ++j) {
int data_dim = top[j]->count() / top[j]->shape(0);
@@ -42,6 +27,7 @@ void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom,
&hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_]
* data_dim], &top[j]->mutable_gpu_data()[i * data_dim]);
}
+ Next();
}
}
diff --git a/src/caffe/layers/image_data_layer.cpp b/src/caffe/layers/image_data_layer.cpp
index 7ee7dc40..ec0fc5b0 100644
--- a/src/caffe/layers/image_data_layer.cpp
+++ b/src/caffe/layers/image_data_layer.cpp
@@ -54,6 +54,11 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const unsigned int prefetch_rng_seed = caffe_rng_rand();
prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed));
ShuffleImages();
+ } else {
+ if (this->phase_ == TRAIN && Caffe::solver_rank() > 0 &&
+ this->layer_param_.image_data_param().rand_skip() == 0) {
+ LOG(WARNING) << "Shuffling or skipping recommended for multi-GPU";
+ }
}
LOG(INFO) << "A total of " << lines_.size() << " images.";
@@ -77,8 +82,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
const int batch_size = this->layer_param_.image_data_param().batch_size();
CHECK_GT(batch_size, 0) << "Positive batch size required";
top_shape[0] = batch_size;
- for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
- this->prefetch_[i].data_.Reshape(top_shape);
+ for (int i = 0; i < this->prefetch_.size(); ++i) {
+ this->prefetch_[i]->data_.Reshape(top_shape);
}
top[0]->Reshape(top_shape);
@@ -88,8 +93,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
// label
vector<int> label_shape(1, batch_size);
top[1]->Reshape(label_shape);
- for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
- this->prefetch_[i].label_.Reshape(label_shape);
+ for (int i = 0; i < this->prefetch_.size(); ++i) {
+ this->prefetch_[i]->label_.Reshape(label_shape);
}
}
diff --git a/src/caffe/layers/window_data_layer.cpp b/src/caffe/layers/window_data_layer.cpp
index 103dd4b6..1bf3760e 100644
--- a/src/caffe/layers/window_data_layer.cpp
+++ b/src/caffe/layers/window_data_layer.cpp
@@ -173,8 +173,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
CHECK_GT(crop_size, 0);
const int batch_size = this->layer_param_.window_data_param().batch_size();
top[0]->Reshape(batch_size, channels, crop_size, crop_size);
- for (int i = 0; i < this->PREFETCH_COUNT; ++i)
- this->prefetch_[i].data_.Reshape(
+ for (int i = 0; i < this->prefetch_.size(); ++i)
+ this->prefetch_[i]->data_.Reshape(
batch_size, channels, crop_size, crop_size);
LOG(INFO) << "output data size: " << top[0]->num() << ","
@@ -183,8 +183,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom,
// label
vector<int> label_shape(1, batch_size);
top[1]->Reshape(label_shape);
- for (int i = 0; i < this->PREFETCH_COUNT; ++i) {
- this->prefetch_[i].label_.Reshape(label_shape);
+ for (int i = 0; i < this->prefetch_.size(); ++i) {
+ this->prefetch_[i]->label_.Reshape(label_shape);
}
// data mean
diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp
index 644cb7e9..aa9e8f2f 100644
--- a/src/caffe/net.cpp
+++ b/src/caffe/net.cpp
@@ -22,16 +22,13 @@
namespace caffe {
template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
- : root_net_(root_net) {
+Net<Dtype>::Net(const NetParameter& param) {
Init(param);
}
template <typename Dtype>
Net<Dtype>::Net(const string& param_file, Phase phase,
- const int level, const vector<string>* stages,
- const Net* root_net)
- : root_net_(root_net) {
+ const int level, const vector<string>* stages) {
NetParameter param;
ReadNetParamsFromTextFileOrDie(param_file, &param);
// Set phase, stages and level
@@ -47,8 +44,6 @@ Net<Dtype>::Net(const string& param_file, Phase phase,
template <typename Dtype>
void Net<Dtype>::Init(const NetParameter& in_param) {
- CHECK(Caffe::root_solver() || root_net_)
- << "root_net_ needs to be set for all non-root solvers";
// Set phase from the state.
phase_ = in_param.state().phase();
// Filter layers based on their include/exclude rules and
@@ -74,9 +69,6 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
top_id_vecs_.resize(param.layer_size());
bottom_need_backward_.resize(param.layer_size());
for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
- // For non-root solvers, whether this layer is shared from root_net_.
- bool share_from_root = !Caffe::root_solver()
- && root_net_->layers_[layer_id]->ShareInParallel();
// Inherit phase from net if unset.
if (!param.layer(layer_id).has_phase()) {
param.mutable_layer(layer_id)->set_phase(phase_);
@@ -89,13 +81,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
<< "propagate_down param must be specified "
<< "either 0 or bottom_size times ";
}
- if (share_from_root) {
- LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
- layers_.push_back(root_net_->layers_[layer_id]);
- layers_[layer_id]->SetShared(true);
- } else {
- layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
- }
+ layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
layer_names_.push_back(layer_param.name());
LOG_IF(INFO, Caffe::root_solver())
<< "Creating Layer " << layer_param.name();
@@ -134,19 +120,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
}
}
// After this layer is connected, set it up.
- if (share_from_root) {
- // Set up size of top blobs using root_net_
- const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id];
- const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id];
- for (int top_id = 0; top_id < base_top.size(); ++top_id) {
- this_top[top_id]->ReshapeLike(*base_top[top_id]);
- LOG(INFO) << "Created top blob " << top_id << " (shape: "
- << this_top[top_id]->shape_string() << ") for shared layer "
- << layer_param.name();
- }
- } else {
- layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
- }
+ layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]);
LOG_IF(INFO, Caffe::root_solver())
<< "Setting up " << layer_names_[layer_id];
for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) {
@@ -546,10 +520,15 @@ Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
CHECK_LT(end, layers_.size());
Dtype loss = 0;
for (int i = start; i <= end; ++i) {
- // LOG(ERROR) << "Forwarding " << layer_names_[i];
+ for (int c = 0; c < before_forward_.size(); ++c) {
+ before_forward_[c]->run(i);
+ }
Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]);
loss += layer_loss;
if (debug_info_) { ForwardDebugInfo(i); }
+ for (int c = 0; c < after_forward_.size(); ++c) {
+ after_forward_[c]->run(i);
+ }
}
return loss;
}
@@ -591,11 +570,17 @@ void Net<Dtype>::BackwardFromTo(int start, int end) {
CHECK_GE(end, 0);
CHECK_LT(start, layers_.size());
for (int i = start; i >= end; --i) {
+ for (int c = 0; c < before_backward_.size(); ++c) {
+ before_backward_[c]->run(i);
+ }
if (layer_need_backward_[i]) {
layers_[i]->Backward(
top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]);
if (debug_info_) { BackwardDebugInfo(i); }
}
+ for (int c = 0; c < after_backward_.size(); ++c) {
+ after_backward_[c]->run(i);
+ }
}
}
diff --git a/src/caffe/parallel.cpp b/src/caffe/parallel.cpp
index 5bc41c6a..d9433917 100644
--- a/src/caffe/parallel.cpp
+++ b/src/caffe/parallel.cpp
@@ -1,16 +1,15 @@
-#ifndef CPU_ONLY
+#ifdef USE_NCCL
+
#include <cuda_runtime.h>
-#endif
#include <glog/logging.h>
#include <stdio.h>
-
#include <sstream>
#include <string>
#include <vector>
-#include "boost/thread.hpp"
#include "caffe/caffe.hpp"
#include "caffe/parallel.hpp"
+#include "caffe/sgd_solvers.hpp"
namespace caffe {
@@ -68,15 +67,14 @@ static size_t total_size(const vector<Blob<Dtype>*>& params) {
template<typename Dtype>
Params<Dtype>::Params(shared_ptr<Solver<Dtype> > root_solver)
- : size_(total_size<Dtype>(root_solver->net()->learnable_params())),
- data_(),
- diff_() {
+ : size_(total_size<Dtype>(root_solver->net()->learnable_params())),
+ data_(),
+ diff_() {
}
template<typename Dtype>
GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device)
- : Params<Dtype>(root_solver) {
-#ifndef CPU_ONLY
+ : Params<Dtype>(root_solver) {
int initial_device;
CUDA_CHECK(cudaGetDevice(&initial_device));
@@ -86,358 +84,288 @@ GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device)
// Copy blob values
const vector<Blob<Dtype>*>& net =
- root_solver->net()->learnable_params();
+ root_solver->net()->learnable_params();
apply_buffers(net, data_, size_, copy);
CUDA_CHECK(cudaMalloc(&diff_, size_ * sizeof(Dtype)));
caffe_gpu_set(size_, Dtype(0), diff_);
CUDA_CHECK(cudaSetDevice(initial_device));
-#else
- NO_GPU;
-#endif
}
template<typename Dtype>
GPUParams<Dtype>::~GPUParams() {
-#ifndef CPU_ONLY
CUDA_CHECK(cudaFree(data_));
CUDA_CHECK(cudaFree(diff_));
-#endif
}
template<typename Dtype>
-void GPUParams<Dtype>::configure(Solver<Dtype>* solver) const {
+void GPUParams<Dtype>::Configure(Solver<Dtype>* solver) const {
const vector<Blob<Dtype>*>& net =
- solver->net()->learnable_params();
+ solver->net()->learnable_params();
apply_buffers(net, data_, size_, replace_gpu);
apply_buffers(net, diff_, size_, replace_gpu_diff);
}
-void DevicePair::compute(const vector<int> devices, vector<DevicePair>* pairs) {
-#ifndef CPU_ONLY
- vector<int> remaining(devices);
-
- // Depth for reduction tree
- int remaining_depth = static_cast<int>(ceil(log2(remaining.size())));
-
- // Group GPUs by board
- for (int d = 0; d < remaining_depth; ++d) {
- for (int i = 0; i < remaining.size(); ++i) {
- for (int j = i + 1; j < remaining.size(); ++j) {
- cudaDeviceProp a, b;
- CUDA_CHECK(cudaGetDeviceProperties(&a, remaining[i]));
- CUDA_CHECK(cudaGetDeviceProperties(&b, remaining[j]));
- if (a.isMultiGpuBoard && b.isMultiGpuBoard) {
- if (a.multiGpuBoardGroupID == b.multiGpuBoardGroupID) {
- pairs->push_back(DevicePair(remaining[i], remaining[j]));
- DLOG(INFO) << "GPU board: " << remaining[i] << ":" << remaining[j];
- remaining.erase(remaining.begin() + j);
- break;
- }
- }
- }
- }
- }
- ostringstream s;
- for (int i = 0; i < remaining.size(); ++i) {
- s << (i ? ", " : "") << remaining[i];
- }
- DLOG(INFO) << "GPUs paired by boards, remaining: " << s.str();
-
- // Group by P2P accessibility
- remaining_depth = ceil(log2(remaining.size()));
- for (int d = 0; d < remaining_depth; ++d) {
- for (int i = 0; i < remaining.size(); ++i) {
- for (int j = i + 1; j < remaining.size(); ++j) {
- int access;
- CUDA_CHECK(
- cudaDeviceCanAccessPeer(&access, remaining[i], remaining[j]));
- if (access) {
- pairs->push_back(DevicePair(remaining[i], remaining[j]));
- DLOG(INFO) << "P2P pair: " << remaining[i] << ":" << remaining[j];
- remaining.erase(remaining.begin() + j);
- break;
- }
- }
- }
- }
- s.str("");
- for (int i = 0; i < remaining.size(); ++i) {
- s << (i ? ", " : "") << remaining[i];
- }
- DLOG(INFO) << "GPUs paired by P2P access, remaining: " << s.str();
-
- // Group remaining
- remaining_depth = ceil(log2(remaining.size()));
- for (int d = 0; d < remaining_depth; ++d) {
- for (int i = 0; i < remaining.size(); ++i) {
- pairs->push_back(DevicePair(remaining[i], remaining[i + 1]));
- DLOG(INFO) << "Remaining pair: " << remaining[i] << ":"
- << remaining[i + 1];
- remaining.erase(remaining.begin() + i + 1);
- }
- }
+static int getDevice() {
+ int device = 0;
+ CUDA_CHECK(cudaGetDevice(&device));
+ return device;
+}
- // Should only be the parent node remaining
- CHECK_EQ(remaining.size(), 1);
+template<typename Dtype>
+NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver)
+ : GPUParams<Dtype>(solver, getDevice()),
+ comm_(), solver_(solver), barrier_() {
+ this->Configure(solver.get());
+ Init();
+}
- pairs->insert(pairs->begin(), DevicePair(-1, remaining[0]));
+template<typename Dtype>
+NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid)
+ : GPUParams<Dtype>(solver, getDevice()),
+ solver_(solver), barrier_() {
+ this->Configure(solver.get());
+ Caffe::set_multiprocess(true);
+ ncclUniqueId nccl_uid;
+ memcpy(&nccl_uid, &uid[0], NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn)
+ NCCL_CHECK(ncclCommInitRank(&comm_,
+ Caffe::solver_count(),
+ nccl_uid,
+ Caffe::solver_rank()));
+ Init();
+}
- CHECK(pairs->size() == devices.size());
- for (int i = 0; i < pairs->size(); ++i) {
- CHECK((*pairs)[i].parent() != (*pairs)[i].device());
- for (int j = i + 1; j < pairs->size(); ++j) {
- CHECK((*pairs)[i].device() != (*pairs)[j].device());
- }
+template<typename Dtype>
+void NCCL<Dtype>::Init() {
+ if (solver_->param().layer_wise_reduce()) {
+ CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
}
-#else
- NO_GPU;
-#endif
}
-//
-
template<typename Dtype>
-P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver,
- P2PSync<Dtype>* parent, const SolverParameter& param)
- : GPUParams<Dtype>(root_solver, param.device_id()),
- parent_(parent),
- children_(),
- queue_(),
- initial_iter_(root_solver->iter()),
- solver_() {
-#ifndef CPU_ONLY
- int initial_device;
- CUDA_CHECK(cudaGetDevice(&initial_device));
- const int self = param.device_id();
- CUDA_CHECK(cudaSetDevice(self));
-
- if (parent == NULL) {
- solver_ = root_solver;
- } else {
- Caffe::set_root_solver(false);
- solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get()));
- Caffe::set_root_solver(true);
+NCCL<Dtype>::~NCCL() {
+ if (solver_->param().layer_wise_reduce()) {
+ CUDA_CHECK(cudaStreamDestroy(stream_));
}
- this->configure(solver_.get());
- solver_->add_callback(this);
-
- if (parent) {
- // Enable p2p access between devices
- const int peer = parent->solver_->param().device_id();
- int access;
- CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
- if (access) {
- CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0));
- } else {
- LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer;
- }
- // Allocate receiving buffer on parent
- CUDA_CHECK(cudaSetDevice(peer));
- CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype)));
- CUDA_CHECK(cudaSetDevice(self));
+ if (comm_) {
+ ncclCommDestroy(comm_);
}
-
- CUDA_CHECK(cudaSetDevice(initial_device));
-#else
- NO_GPU;
-#endif
}
template<typename Dtype>
-P2PSync<Dtype>::~P2PSync() {
-#ifndef CPU_ONLY
- int initial_device;
- CUDA_CHECK(cudaGetDevice(&initial_device));
- const int self = solver_->param().device_id();
- CUDA_CHECK(cudaSetDevice(self));
-
- if (parent_) {
- CUDA_CHECK(cudaFree(parent_grads_));
- const int peer = parent_->solver_->param().device_id();
- int access;
- CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer));
- if (access) {
- CUDA_CHECK(cudaDeviceDisablePeerAccess(peer));
- }
- }
-
- CUDA_CHECK(cudaSetDevice(initial_device));
-#endif
+boost::barrier* NCCL<Dtype>::barrier() {
+ return barrier_;
+}
+template<typename Dtype>
+void NCCL<Dtype>::set_barrier(boost::barrier* value) {
+ barrier_ = value;
}
template<typename Dtype>
-void P2PSync<Dtype>::InternalThreadEntry() {
- Caffe::SetDevice(solver_->param().device_id());
- CHECK(Caffe::root_solver());
- Caffe::set_root_solver(false);
- // See if there is a defined seed and reset random state if so
- if (solver_->param().random_seed() >= 0) {
- // Fetch random seed and modulate by device ID to make sure
- // everyone doesn't have the same seed. We seem to have some
- // solver instability if we have everyone with the same seed
- Caffe::set_random_seed(
- solver_->param().random_seed() + solver_->param().device_id());
+void NCCL<Dtype>::InitSingleProcess(vector<NCCL<Dtype>*>* nccls) {
+ ncclComm_t* comms = new ncclComm_t[nccls->size()];
+ int* gpu_list = new int[nccls->size()];
+ for (int i = 0; i < nccls->size(); ++i) {
+ gpu_list[i] = (*nccls)[i]->solver_->param().device_id();
+ }
+ NCCL_CHECK(ncclCommInitAll(comms, static_cast<int>(nccls->size()), gpu_list));
+ for (int i = 0; i < nccls->size(); ++i) {
+ (*nccls)[i]->comm_ = comms[i];
}
- solver_->Step(solver_->param().max_iter() - initial_iter_);
}
template<typename Dtype>
-void P2PSync<Dtype>::on_start() {
-#ifndef CPU_ONLY
-#ifdef DEBUG
- int device;
- CUDA_CHECK(cudaGetDevice(&device));
- CHECK(device == solver_->param().device_id());
-#else
-// CHECK(false);
-#endif
+string NCCL<Dtype>::new_uid() {
+ string uid;
+ uid.resize(NCCL_UNIQUE_ID_BYTES);
+ ncclUniqueId nccl_uid;
+ NCCL_CHECK(ncclGetUniqueId(&nccl_uid));
+ memcpy(&uid[0], &nccl_uid, NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn)
+ return uid;
+}
- // Wait for update from parent
- if (parent_) {
- P2PSync<Dtype> *parent = queue_.pop();
- CHECK(parent == parent_);
+template<typename Dtype>
+void NCCL<Dtype>::Broadcast() {
+ if (barrier_) { // NULL in multi process case
+ barrier_->wait();
}
-
- // Update children
- for (int i = children_.size() - 1; i >= 0; i--) {
- Dtype* src = data_;
- Dtype* dst = children_[i]->data_;
-
-#ifdef DEBUG
- cudaPointerAttributes attributes;
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
- CHECK(attributes.device == device);
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
- CHECK(attributes.device == children_[i]->solver_->param().device_id());
-#endif
-
- CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype),
- cudaMemcpyDeviceToDevice, cudaStreamDefault));
- CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
- children_[i]->queue_.push(this);
+ NCCL_CHECK(ncclBcast(data_, static_cast<int>(size_),
+ nccl::dataType<Dtype>::type, 0,
+ comm_, cudaStreamDefault));
+ if (barrier_) {
+ barrier_->wait();
}
-#endif
}
template<typename Dtype>
-void P2PSync<Dtype>::on_gradients_ready() {
-#ifndef CPU_ONLY
+void NCCL<Dtype>::run(int layer) {
+ CHECK(solver_->param().layer_wise_reduce());
+ vector<shared_ptr<Blob<Dtype> > >& blobs =
+ solver_->net()->layers()[layer]->blobs();
#ifdef DEBUG
- int device;
- CUDA_CHECK(cudaGetDevice(&device));
- CHECK(device == solver_->param().device_id());
+ // Assert blobs are contiguous to reduce in one step (e.g. bias often small)
+ for (int i = 1; i < blobs.size(); ++i) {
+ CHECK_EQ(blobs[i - 1]->gpu_diff() + blobs[i - 1]->count(),
+ blobs[i + 0]->gpu_diff());
+ }
#endif
+ if (blobs.size() > 0) {
+ // Make sure default stream is done computing gradients. Could be
+ // replaced by cudaEventRecord+cudaStreamWaitEvent to avoid
+ // blocking the default stream, but it's actually slower.
+ CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
- // Sum children gradients as they appear in the queue
- for (int i = 0; i < children_.size(); ++i) {
- P2PSync<Dtype> *child = queue_.pop();
- Dtype* src = child->parent_grads_;
- Dtype* dst = diff_;
-
-#ifdef DEBUG
- bool ok = false;
- for (int j = 0; j < children_.size(); ++j) {
- if (child == children_[j]) {
- ok = true;
- }
+ // Reduce asynchronously
+ int size = 0;
+ for (int i = 0; i < blobs.size(); ++i) {
+ size += blobs[i]->count();
}
- CHECK(ok);
- cudaPointerAttributes attributes;
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
- CHECK(attributes.device == device);
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
- CHECK(attributes.device == device);
-#endif
-
- caffe_gpu_add(size_, src, dst, dst);
+ if (barrier_) { // NULL in multi process case
+ barrier_->wait();
+ }
+ NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(),
+ blobs[0]->mutable_gpu_diff(),
+ size,
+ nccl::dataType<Dtype>::type,
+ ncclSum, comm_, stream_));
+ caffe_gpu_scal(size, (Dtype) 1.0 / Caffe::solver_count(),
+ blobs[0]->mutable_gpu_diff(), stream_);
}
+}
- // Send gradients to parent
- if (parent_) {
- Dtype* src = diff_;
- Dtype* dst = parent_grads_;
-
-#ifdef DEBUG
- cudaPointerAttributes attributes;
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, src));
- CHECK(attributes.device == device);
- CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst));
- CHECK(attributes.device == parent_->solver_->param().device_id());
-#endif
-
- CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), //
- cudaMemcpyDeviceToDevice, cudaStreamDefault));
- CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault));
- parent_->queue_.push(this);
+template<typename Dtype>
+void NCCL<Dtype>::on_gradients_ready() {
+ if (solver_->param().layer_wise_reduce()) {
+ CHECK_EQ(solver_->net()->params().size(),
+ solver_->net()->learnable_params().size())
+ << "Layer-wise reduce is not supported for nets with shared weights.";
+
+ // Make sure reduction is done before applying gradients
+ CUDA_CHECK(cudaStreamSynchronize(stream_));
} else {
- // Loss functions divide gradients by the batch size, so to compensate
- // for split batch, the root solver divides by number of solvers.
- caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_);
+ if (barrier_) { // NULL in multi process case
+ barrier_->wait();
+ }
+ NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_),
+ nccl::dataType<Dtype>::type, ncclSum, comm_,
+ cudaStreamDefault));
+ caffe_gpu_scal(static_cast<int>(size_),
+ (Dtype) 1.0 / Caffe::solver_count(), diff_);
}
-#endif
}
template<typename Dtype>
-void P2PSync<Dtype>::Prepare(const vector<int>& gpus,
- vector<shared_ptr<P2PSync<Dtype> > >* syncs) {
- // Pair devices for map-reduce synchronization
- vector<DevicePair> pairs;
- DevicePair::compute(gpus, &pairs);
- ostringstream s;
- for (int i = 1; i < pairs.size(); ++i) {
- s << (i == 1 ? "" : ", ") << pairs[i].parent() << ":" << pairs[i].device();
+class Worker : public InternalThread {
+ public:
+ explicit Worker(shared_ptr<Solver<Dtype> > rank0, int device,
+ boost::barrier* barrier, vector<NCCL<Dtype>*>* nccls,
+ const char* restore)
+ : rank0_(rank0), device_(device), barrier_(barrier),
+ nccls_(nccls), restore_(restore) {
}
- LOG(INFO)<< "GPUs pairs " << s.str();
-
- SolverParameter param(solver_->param());
-
- // Build the GPU tree by finding the parent for each solver
- for (int attempts = 0; attempts < pairs.size(); ++attempts) {
- for (int i = 1; i < pairs.size(); ++i) {
- if (!syncs->at(i).get()) {
- P2PSync<Dtype>* parent = NULL;
- for (int j = 0; j < syncs->size(); ++j) {
- P2PSync<Dtype>* sync = j == 0 ? this : syncs->at(j).get();
- if (sync) {
- const SolverParameter& p = sync->solver()->param();
- if (p.device_id() == pairs[i].parent()) {
- parent = sync;
- }
- }
- }
- if (parent) {
- param.set_device_id(pairs[i].device());
- syncs->at(i).reset(new P2PSync<Dtype>(solver_, parent, param));
- parent->children_.push_back((P2PSync<Dtype>*) syncs->at(i).get());
- }
+ virtual ~Worker() {}
+
+ protected:
+ void InternalThreadEntry() {
+ // Create solver and install callbacks
+ SolverParameter param(rank0_->param());
+ param.set_device_id(device_);
+#ifdef DEBUG
+ int device;
+ CUDA_CHECK(cudaGetDevice(&device));
+ CHECK_EQ(device, device_);
+#endif
+ param.set_type(rank0_->type());
+ shared_ptr<Solver<Dtype> > s(SolverRegistry<Dtype>::CreateSolver(param));
+ CHECK_EQ(s->type(), rank0_->type());
+ if (restore_) {
+ // Could not make NCCL broadcast solver state, it seems to crash
+ // if called in a tight loop, regardless of barriers etc. so
+ // restore all solvers from file.
+ s->Restore(restore_);
+ }
+ NCCL<Dtype> nccl(s);
+ nccl.set_barrier(barrier_);
+ s->add_callback(&nccl);
+ if (s->param().layer_wise_reduce()) {
+ s->net()->add_after_backward(&nccl);
+ }
+ (*nccls_)[Caffe::solver_rank()] = &nccl;
+ // Wait for other threads
+ barrier_->wait();
+ // Wait for NCCL init
+ barrier_->wait();
+ // Broadcast rank 0 state
+ nccl.Broadcast();
+ // Solve
+ s->Step(param.max_iter() - s->iter());
+ barrier_->wait();
+#ifdef DEBUG
+ // Check all solvers have same state
+ SGDSolver<Dtype>* sa = static_cast<SGDSolver<Dtype>*>(rank0_.get());
+ SGDSolver<Dtype>* sb = static_cast<SGDSolver<Dtype>*>(s.get());
+ for (int h = 0; h < sa->history().size(); ++h) {
+ CUDA_CHECK(cudaSetDevice(sa->param().device_id()));
+ const Dtype* a = sa->history()[h]->cpu_data();
+ CUDA_CHECK(cudaSetDevice(sb->param().device_id()));
+ const Dtype* b = sb->history()[h]->cpu_data();
+ for (int v = 0; v < sa->history()[h]->count(); ++v) {
+ CHECK_DOUBLE_EQ(a[v], b[v]);
}
}
+#endif
}
-}
-
-template<typename Dtype>
-void P2PSync<Dtype>::Run(const vector<int>& gpus) {
- vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size());
- Prepare(gpus, &syncs);
- LOG(INFO)<< "Starting Optimization";
+ shared_ptr<Solver<Dtype> > rank0_;
+ int device_;
+ boost::barrier* barrier_;
+ vector<NCCL<Dtype>*>* nccls_;
+ const char* restore_;
+};
- for (int i = 1; i < syncs.size(); ++i) {
- syncs[i]->StartInternalThread();
+template<typename Dtype>
+void NCCL<Dtype>::Run(const vector<int>& gpus, const char* restore) {
+ boost::barrier barrier(static_cast<int>(gpus.size()));
+ vector<NCCL<Dtype>*> nccls(gpus.size());
+ // Create workers
+ vector<shared_ptr<Worker<Dtype> > > workers(gpus.size());
+ for (int i = 1; i < gpus.size(); ++i) {
+ CUDA_CHECK(cudaSetDevice(gpus[i]));
+ Caffe::set_solver_rank(i);
+ Worker<Dtype>* w = new Worker<Dtype>(solver_, gpus[i], &barrier,
+ &nccls, restore);
+ w->StartInternalThread();
+ workers[i].reset(w);
}
-
- // Run root solver on current thread
+ CUDA_CHECK(cudaSetDevice(gpus[0]));
+ Caffe::set_solver_rank(0);
+ barrier_ = &barrier;
+ solver_->add_callback(this);
+ if (solver_->param().layer_wise_reduce()) {
+ solver_->net()->add_after_backward(this);
+ }
+ nccls[0] = this;
+ // Wait for workers
+ barrier.wait();
+ // Init NCCL
+ InitSingleProcess(&nccls);
+ barrier.wait();
+ // Run first solver on current thread
+ Broadcast();
solver_->Solve();
-
- for (int i = 1; i < syncs.size(); ++i) {
- syncs[i]->StopInternalThread();
+ barrier.wait(); // Hangs without it when running tests
+ // Wait for shutdown
+ for (int i = 1; i < gpus.size(); ++i) {
+ workers[i]->StopInternalThread();
}
}
INSTANTIATE_CLASS(Params);
INSTANTIATE_CLASS(GPUParams);
-INSTANTIATE_CLASS(P2PSync);
+INSTANTIATE_CLASS(Worker);
+INSTANTIATE_CLASS(NCCL);
} // namespace caffe
+
+#endif // USE_NCCL
diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto
index 430a0dea..1c85f696 100644
--- a/src/caffe/proto/caffe.proto
+++ b/src/caffe/proto/caffe.proto
@@ -98,7 +98,7 @@ message NetParameter {
// NOTE
// Update the next available ID when you add a new SolverParameter field.
//
-// SolverParameter next available ID: 41 (last added: type)
+// SolverParameter next available ID: 42 (last added: layer_wise_reduce)
message SolverParameter {
//////////////////////////////////////////////////////////////////////////////
// Specifying the train and test networks
@@ -239,6 +239,9 @@ message SolverParameter {
}
// DEPRECATED: use type instead of solver_type
optional SolverType solver_type = 30 [default = SGD];
+
+ // Overlap compute and communication for data parallel training
+ optional bool layer_wise_reduce = 41 [default = true];
}
// A message that stores the solver snapshots
@@ -655,8 +658,8 @@ message DataParameter {
optional bool mirror = 6 [default = false];
// Force the encoded image to have 3 color channels
optional bool force_encoded_color = 9 [default = false];
- // Prefetch queue (Number of batches to prefetch to host memory, increase if
- // data access bandwidth varies).
+ // Prefetch queue (Increase if data feeding bandwidth varies, within the
+ // limit of device memory for GPU training)
optional uint32 prefetch = 10 [default = 4];
}
diff --git a/src/caffe/solver.cpp b/src/caffe/solver.cpp
index ece3913e..1c1a9e59 100644
--- a/src/caffe/solver.cpp
+++ b/src/caffe/solver.cpp
@@ -26,16 +26,14 @@ SolverAction::Enum Solver<Dtype>::GetRequestedAction() {
}
template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
- : net_(), callbacks_(), root_solver_(root_solver),
- requested_early_exit_(false) {
+Solver<Dtype>::Solver(const SolverParameter& param)
+ : net_(), callbacks_(), requested_early_exit_(false) {
Init(param);
}
template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
- : net_(), callbacks_(), root_solver_(root_solver),
- requested_early_exit_(false) {
+Solver<Dtype>::Solver(const string& param_file)
+ : net_(), callbacks_(), requested_early_exit_(false) {
SolverParameter param;
ReadSolverParamsFromTextFileOrDie(param_file, &param);
Init(param);
@@ -43,15 +41,13 @@ Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
template <typename Dtype>
void Solver<Dtype>::Init(const SolverParameter& param) {
- CHECK(Caffe::root_solver() || root_solver_)
- << "root_solver_ needs to be set for all non-root solvers";
LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
<< std::endl << param.DebugString();
param_ = param;
CHECK_GE(param_.average_loss(), 1) << "average_loss should be non-negative.";
CheckSnapshotWritePermissions();
- if (Caffe::root_solver() && param_.random_seed() >= 0) {
- Caffe::set_random_seed(param_.random_seed());
+ if (param_.random_seed() >= 0) {
+ Caffe::set_random_seed(param_.random_seed() + Caffe::solver_rank());
}
// Scaffolding code
InitTrainNet();
@@ -101,11 +97,7 @@ void Solver<Dtype>::InitTrainNet() {
net_state.MergeFrom(net_param.state());
net_state.MergeFrom(param_.train_state());
net_param.mutable_state()->CopyFrom(net_state);
- if (Caffe::root_solver()) {
- net_.reset(new Net<Dtype>(net_param));
- } else {
- net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
- }
+ net_.reset(new Net<Dtype>(net_param));
}
template <typename Dtype>
@@ -180,12 +172,7 @@ void Solver<Dtype>::InitTestNets() {
net_params[i].mutable_state()->CopyFrom(net_state);
LOG(INFO)
<< "Creating test net (#" << i << ") specified by " << sources[i];
- if (Caffe::root_solver()) {
- test_nets_[i].reset(new Net<Dtype>(net_params[i]));
- } else {
- test_nets_[i].reset(new Net<Dtype>(net_params[i],
- root_solver_->test_nets_[i].get()));
- }
+ test_nets_[i].reset(new Net<Dtype>(net_params[i]));
test_nets_[i]->set_debug_info(param_.debug_info());
}
}
@@ -197,14 +184,16 @@ void Solver<Dtype>::Step(int iters) {
int average_loss = this->param_.average_loss();
losses_.clear();
smoothed_loss_ = 0;
+ iteration_timer_.Start();
while (iter_ < stop_iter) {
// zero-init the params
net_->ClearParamDiffs();
if (param_.test_interval() && iter_ % param_.test_interval() == 0
- && (iter_ > 0 || param_.test_initialization())
- && Caffe::root_solver()) {
- TestAll();
+ && (iter_ > 0 || param_.test_initialization())) {
+ if (Caffe::root_solver()) {
+ TestAll();
+ }
if (requested_early_exit_) {
// Break out of the while loop because stop was requested while testing.
break;
@@ -225,8 +214,13 @@ void Solver<Dtype>::Step(int iters) {
// average the loss across iterations for smoothed reporting
UpdateSmoothedLoss(loss, start_iter, average_loss);
if (display) {
+ float lapse = iteration_timer_.Seconds();
+ float per_s = (iter_ - iterations_last_) / (lapse ? lapse : 1);
LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << iter_
- << ", loss = " << smoothed_loss_;
+ << " (" << per_s << " iter/s, " << lapse << "s/"
+ << param_.display() << " iters), loss = " << smoothed_loss_;
+ iteration_timer_.Start();
+ iterations_last_ = iter_;
const vector<Blob<Dtype>*>& result = net_->output_blobs();
int score_index = 0;
for (int j = 0; j < result.size(); ++j) {
diff --git a/src/caffe/solvers/adagrad_solver.cpp b/src/caffe/solvers/adagrad_solver.cpp
index e78eadca..d8107e1e 100644
--- a/src/caffe/solvers/adagrad_solver.cpp
+++ b/src/caffe/solvers/adagrad_solver.cpp
@@ -12,7 +12,6 @@ void adagrad_update_gpu(int N, Dtype* g, Dtype* h, Dtype delta,
template <typename Dtype>
void AdaGradSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) {
- CHECK(Caffe::root_solver());
const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params();
const vector<float>& net_params_lr = this->net_->params_lr();
Dtype delta = this->param_.delta();
diff --git a/src/caffe/solvers/nesterov_solver.cpp b/src/caffe/solvers/nesterov_solver.cpp
index 23ab2d43..7c1fac1f 100644
--- a/src/caffe/solvers/nesterov_solver.cpp
+++ b/src/caffe/solvers/nesterov_solver.cpp
@@ -12,7 +12,6 @@ void nesterov_update_gpu(int N, Dtype* g, Dtype* h, Dtype momentum,
template <typename Dtype>
void NesterovSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) {
- CHECK(Caffe::root_solver());
const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params();
const vector<float>& net_params_lr = this->net_->params_lr();
Dtype momentum = this->param_.momentum();
diff --git a/src/caffe/solvers/sgd_solver.cpp b/src/caffe/solvers/sgd_solver.cpp
index f30f316d..ad6abe54 100644
--- a/src/caffe/solvers/sgd_solver.cpp
+++ b/src/caffe/solvers/sgd_solver.cpp
@@ -100,10 +100,10 @@ void SGDSolver<Dtype>::ClipGradients() {
template <typename Dtype>
void SGDSolver<Dtype>::ApplyUpdate() {
- CHECK(Caffe::root_solver());
Dtype rate = GetLearningRate();
if (this->param_.display() && this->iter_ % this->param_.display() == 0) {
- LOG(INFO) << "Iteration " << this->iter_ << ", lr = " << rate;
+ LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << this->iter_
+ << ", lr = " << rate;
}
ClipGradients();
for (int param_id = 0; param_id < this->net_->learnable_params().size();
diff --git a/src/caffe/syncedmem.cpp b/src/caffe/syncedmem.cpp
index 4d356417..88d9b785 100644
--- a/src/caffe/syncedmem.cpp
+++ b/src/caffe/syncedmem.cpp
@@ -3,26 +3,41 @@
#include "caffe/util/math_functions.hpp"
namespace caffe {
+SyncedMemory::SyncedMemory()
+ : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED),
+ own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+ CUDA_CHECK(cudaGetDevice(&device_));
+#endif
+#endif
+}
+
+SyncedMemory::SyncedMemory(size_t size)
+ : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED),
+ own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+ CUDA_CHECK(cudaGetDevice(&device_));
+#endif
+#endif
+}
SyncedMemory::~SyncedMemory() {
+ check_device();
if (cpu_ptr_ && own_cpu_data_) {
CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_);
}
#ifndef CPU_ONLY
if (gpu_ptr_ && own_gpu_data_) {
- int initial_device;
- cudaGetDevice(&initial_device);
- if (gpu_device_ != -1) {
- CUDA_CHECK(cudaSetDevice(gpu_device_));
- }
CUDA_CHECK(cudaFree(gpu_ptr_));
- cudaSetDevice(initial_device);
}
#endif // CPU_ONLY
}
inline void SyncedMemory::to_cpu() {
+ check_device();
switch (head_) {
case UNINITIALIZED:
CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_);
@@ -49,10 +64,10 @@ inline void SyncedMemory::to_cpu() {
}
inline void SyncedMemory::to_gpu() {
+ check_device();
#ifndef CPU_ONLY
switch (head_) {
case UNINITIALIZED:
- CUDA_CHECK(cudaGetDevice(&gpu_device_));
CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
caffe_gpu_memset(size_, 0, gpu_ptr_);
head_ = HEAD_AT_GPU;
@@ -60,7 +75,6 @@ inline void SyncedMemory::to_gpu() {
break;
case HEAD_AT_CPU:
if (gpu_ptr_ == NULL) {
- CUDA_CHECK(cudaGetDevice(&gpu_device_));
CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
own_gpu_data_ = true;
}
@@ -77,11 +91,13 @@ inline void SyncedMemory::to_gpu() {
}
const void* SyncedMemory::cpu_data() {
+ check_device();
to_cpu();
return (const void*)cpu_ptr_;
}
void SyncedMemory::set_cpu_data(void* data) {
+ check_device();
CHECK(data);
if (own_cpu_data_) {
CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_);
@@ -92,6 +108,7 @@ void SyncedMemory::set_cpu_data(void* data) {
}
const void* SyncedMemory::gpu_data() {
+ check_device();
#ifndef CPU_ONLY
to_gpu();
return (const void*)gpu_ptr_;
@@ -102,16 +119,11 @@ const void* SyncedMemory::gpu_data() {
}
void SyncedMemory::set_gpu_data(void* data) {
+ check_device();
#ifndef CPU_ONLY
CHECK(data);
if (own_gpu_data_) {
- int initial_device;
- cudaGetDevice(&initial_device);
- if (gpu_device_ != -1) {
- CUDA_CHECK(cudaSetDevice(gpu_device_));
- }
CUDA_CHECK(cudaFree(gpu_ptr_));
- cudaSetDevice(initial_device);
}
gpu_ptr_ = data;
head_ = HEAD_AT_GPU;
@@ -122,12 +134,14 @@ void SyncedMemory::set_gpu_data(void* data) {
}
void* SyncedMemory::mutable_cpu_data() {
+ check_device();
to_cpu();
head_ = HEAD_AT_CPU;
return cpu_ptr_;
}
void* SyncedMemory::mutable_gpu_data() {
+ check_device();
#ifndef CPU_ONLY
to_gpu();
head_ = HEAD_AT_GPU;
@@ -140,9 +154,9 @@ void* SyncedMemory::mutable_gpu_data() {
#ifndef CPU_ONLY
void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
+ check_device();
CHECK(head_ == HEAD_AT_CPU);
if (gpu_ptr_ == NULL) {
- CUDA_CHECK(cudaGetDevice(&gpu_device_));
CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_));
own_gpu_data_ = true;
}
@@ -153,5 +167,20 @@ void SyncedMemory::async_gpu_push(const cudaStream_t& stream) {
}
#endif
+void SyncedMemory::check_device() {
+#ifndef CPU_ONLY
+#ifdef DEBUG
+ int device;
+ cudaGetDevice(&device);
+ CHECK(device == device_);
+ if (gpu_ptr_ && own_gpu_data_) {
+ cudaPointerAttributes attributes;
+ CUDA_CHECK(cudaPointerGetAttributes(&attributes, gpu_ptr_));
+ CHECK(attributes.device == device_);
+ }
+#endif
+#endif
+}
+
} // namespace caffe
diff --git a/src/caffe/test/test_data_layer.cpp b/src/caffe/test/test_data_layer.cpp
index 3e8d113d..3835af1f 100644
--- a/src/caffe/test/test_data_layer.cpp
+++ b/src/caffe/test/test_data_layer.cpp
@@ -105,6 +105,32 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> {
}
}
+ void TestSkip() {
+ LayerParameter param;
+ param.set_phase(TRAIN);
+ DataParameter* data_param = param.mutable_data_param();
+ int batch_size = 5;
+ data_param->set_batch_size(batch_size);
+ data_param->set_source(filename_->c_str());
+ data_param->set_backend(backend_);
+ Caffe::set_solver_count(8);
+ for (int dev = 0; dev < Caffe::solver_count(); ++dev) {
+ Caffe::set_solver_rank(dev);
+ DataLayer<Dtype> layer(param);
+ layer.SetUp(blob_bottom_vec_, blob_top_vec_);
+ int label = dev;
+ for (int iter = 0; iter < 10; ++iter) {
+ layer.Forward(blob_bottom_vec_, blob_top_vec_);
+ for (int i = 0; i < batch_size; ++i) {
+ EXPECT_EQ(label % batch_size, blob_top_label_->cpu_data()[i]);
+ label += Caffe::solver_count();
+ }
+ }
+ }
+ Caffe::set_solver_count(1);
+ Caffe::set_solver_rank(0);
+ }
+
void TestReshape(DataParameter_DB backend) {
const int num_inputs = 5;
// Save data of varying shapes.
@@ -356,6 +382,11 @@ TYPED_TEST(DataLayerTest, TestReadLevelDB) {
this->TestRead();
}
+TYPED_TEST(DataLayerTest, TestSkipLevelDB) {
+ this->Fill(false, DataParameter_DB_LEVELDB);
+ this->TestSkip();
+}
+
TYPED_TEST(DataLayerTest, TestReshapeLevelDB) {
this->TestReshape(DataParameter_DB_LEVELDB);
}
@@ -396,6 +427,11 @@ TYPED_TEST(DataLayerTest, TestReadLMDB) {
this->TestRead();
}
+TYPED_TEST(DataLayerTest, TestSkipLMDB) {
+ this->Fill(false, DataParameter_DB_LMDB);
+ this->TestSkip();
+}
+
TYPED_TEST(DataLayerTest, TestReshapeLMDB) {
this->TestReshape(DataParameter_DB_LMDB);
}
diff --git a/src/caffe/test/test_gradient_based_solver.cpp b/src/caffe/test/test_gradient_based_solver.cpp
index 975a8f0f..6ad0d8f6 100644
--- a/src/caffe/test/test_gradient_based_solver.cpp
+++ b/src/caffe/test/test_gradient_based_solver.cpp
@@ -36,7 +36,9 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
string snapshot_prefix_;
shared_ptr<SGDSolver<Dtype> > solver_;
- shared_ptr<P2PSync<Dtype> > sync_;
+#ifdef USE_NCCL
+ shared_ptr<NCCL<Dtype> > nccl_;
+#endif
int seed_;
// Dimensions are determined by generate_sample_data.py
// TODO this is brittle and the hdf5 file should be checked instead.
@@ -85,6 +87,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
"lr_policy: 'fixed' "
"iter_size: " << iter_size << " "
"device_id: " << device_id << " "
+ "layer_wise_reduce: " << (!share_) << " "
"net_param { "
" name: 'TestNetwork' "
" layer { "
@@ -183,7 +186,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
}
Caffe::set_random_seed(this->seed_);
this->InitSolverFromProtoString(proto.str());
- if (from_snapshot != NULL) {
+ if (from_snapshot) {
this->solver_->Restore(from_snapshot);
for (int i = 0; i < this->solver_->iter(); ++i) {
this->solver_->net()->Forward();
@@ -202,9 +205,10 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
gpus.push_back(i);
}
Caffe::set_solver_count(gpus.size());
- this->sync_.reset(new P2PSync<Dtype>(
- this->solver_, NULL, this->solver_->param()));
- this->sync_->Run(gpus);
+#ifdef USE_NCCL
+ this->nccl_.reset(new NCCL<Dtype>(this->solver_));
+ this->nccl_->Run(gpus, from_snapshot);
+#endif
Caffe::set_solver_count(1);
}
if (snapshot) {
@@ -457,12 +461,28 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> {
const int kIterSize = 1;
// Test over all numbers of devices.
int available_devices = 1;
-#ifndef CPU_ONLY
+#ifdef USE_NCCL
if (Caffe::mode() == Caffe::GPU) {
CUDA_CHECK(cudaGetDeviceCount(&available_devices));
}
#endif
- for (int devices = 1; devices <= available_devices; ++devices) {
+ // Takes a while to test all sizes for each test so sparse
+ vector<int> sizes;
+ sizes.push_back(1);
+ if (available_devices >= 2) {
+ sizes.push_back(2);
+ }
+ if (available_devices >= 3) {
+ sizes.push_back(3);
+ }
+ if (available_devices >= 8) {
+ sizes.push_back(8);
+ }
+ if (available_devices >= 16) {
+ sizes.push_back(16);
+ }
+ for (int i = 0; i < sizes.size(); ++i) {
+ int devices = sizes[i];
// Configure batch size for single / multi device equivalence.
// Constant data is needed for multi device as for accumulation.
num_ = kNum * devices;
diff --git a/src/caffe/test/test_hdf5data_layer.cpp b/src/caffe/test/test_hdf5data_layer.cpp
index 8884ce95..68e10286 100644
--- a/src/caffe/test/test_hdf5data_layer.cpp
+++ b/src/caffe/test/test_hdf5data_layer.cpp
@@ -133,4 +133,34 @@ TYPED_TEST(HDF5DataLayerTest, TestRead) {
}
}
+TYPED_TEST(HDF5DataLayerTest, TestSkip) {
+ typedef typename TypeParam::Dtype Dtype;
+ LayerParameter param;
+ param.add_top("data");
+ param.add_top("label");
+
+ HDF5DataParameter* hdf5_data_param = param.mutable_hdf5_data_param();
+ int batch_size = 5;
+ hdf5_data_param->set_batch_size(batch_size);
+ hdf5_data_param->set_source(*(this->filename));
+
+ Caffe::set_solver_count(8);
+ for (int dev = 0; dev < Caffe::solver_count(); ++dev) {
+ Caffe::set_solver_rank(dev);
+
+ HDF5DataLayer<Dtype> layer(param);
+ layer.SetUp(this->blob_bottom_vec_, this->blob_top_vec_);
+ int label = dev;
+ for (int iter = 0; iter < 1; ++iter) {
+ layer.Forward(this->blob_bottom_vec_, this->blob_top_vec_);
+ for (int i = 0; i < batch_size; ++i) {
+ EXPECT_EQ(1 + label, this->blob_top_label_->cpu_data()[i]);
+ label = (label + Caffe::solver_count()) % (batch_size * 2);
+ }
+ }
+ }
+ Caffe::set_solver_count(1);
+ Caffe::set_solver_rank(0);
+}
+
} // namespace caffe
diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp
index 058668fe..f69d2104 100644
--- a/src/caffe/util/blocking_queue.cpp
+++ b/src/caffe/util/blocking_queue.cpp
@@ -1,7 +1,6 @@
#include <boost/thread.hpp>
#include <string>
-#include "caffe/data_reader.hpp"
#include "caffe/layers/base_data_layer.hpp"
#include "caffe/parallel.hpp"
#include "caffe/util/blocking_queue.hpp"
@@ -88,9 +87,5 @@ size_t BlockingQueue<T>::size() const {
template class BlockingQueue<Batch<float>*>;
template class BlockingQueue<Batch<double>*>;
-template class BlockingQueue<Datum*>;
-template class BlockingQueue<shared_ptr<DataReader::QueuePair> >;
-template class BlockingQueue<P2PSync<float>*>;
-template class BlockingQueue<P2PSync<double>*>;
} // namespace caffe
diff --git a/src/caffe/util/db_lmdb.cpp b/src/caffe/util/db_lmdb.cpp
index fb1d4956..491a9bd0 100644
--- a/src/caffe/util/db_lmdb.cpp
+++ b/src/caffe/util/db_lmdb.cpp
@@ -32,7 +32,7 @@ void LMDB::Open(const string& source, Mode mode) {
MDB_CHECK(rc);
}
#endif
- LOG(INFO) << "Opened lmdb " << source;
+ LOG_IF(INFO, Caffe::root_solver()) << "Opened lmdb " << source;
}
LMDBCursor* LMDB::NewCursor() {
diff --git a/src/caffe/util/math_functions.cu b/src/caffe/util/math_functions.cu
index 4c587537..6d001026 100644
--- a/src/caffe/util/math_functions.cu
+++ b/src/caffe/util/math_functions.cu
@@ -91,6 +91,26 @@ void caffe_gpu_scal<double>(const int N, const double alpha, double *X) {
}
template <>
+void caffe_gpu_scal<float>(const int N, const float alpha, float* X,
+ cudaStream_t str) {
+ cudaStream_t initial_stream;
+ CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+ CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+ CUBLAS_CHECK(cublasSscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+ CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
+void caffe_gpu_scal<double>(const int N, const double alpha, double* X,
+ cudaStream_t str) {
+ cudaStream_t initial_stream;
+ CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+ CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+ CUBLAS_CHECK(cublasDscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+ CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
void caffe_gpu_axpby<float>(const int N, const float alpha, const float* X,
const float beta, float* Y) {
caffe_gpu_scal<float>(N, beta, Y);
diff --git a/tools/caffe.cpp b/tools/caffe.cpp
index 9bf4214a..3587d8aa 100644
--- a/tools/caffe.cpp
+++ b/tools/caffe.cpp
@@ -195,6 +195,7 @@ int train() {
// If the gpus flag is not provided, allow the mode and device to be set
// in the solver prototxt.
if (FLAGS_gpu.size() == 0
+ && solver_param.has_solver_mode()
&& solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
if (solver_param.has_device_id()) {
FLAGS_gpu = "" +
@@ -244,11 +245,15 @@ int train() {
CopyLayers(solver.get(), FLAGS_weights);
}
+ LOG(INFO) << "Starting Optimization";
if (gpus.size() > 1) {
- caffe::P2PSync<float> sync(solver, NULL, solver->param());
- sync.Run(gpus);
+#ifdef USE_NCCL
+ caffe::NCCL<float> nccl(solver);
+ nccl.Run(gpus, FLAGS_snapshot.size() > 0 ? FLAGS_snapshot.c_str() : NULL);
+#else
+ LOG(FATAL) << "Multi-GPU execution not available - rebuild with USE_NCCL";
+#endif
} else {
- LOG(INFO) << "Starting Optimization";
solver->Solve();
}
LOG(INFO) << "Optimization Done.";