author | Evan Shelhamer <shelhamer@imaginarynumber.net> | 2017-01-17 09:12:01 -0800
committer | GitHub <noreply@github.com> | 2017-01-17 09:12:01 -0800
commit | 536851c00e44d545649648b372503f52a89c0499 (patch)
tree | d8f841db4fec3a92bf2cbe9a651b2bdf2d29159f
parent | 0a91794d85562eaa6064541a58a3588b5afb0e5f (diff)
parent | 5f28eb1147c1abb6e5e5c7cd282218679b0d531d (diff)
Merge pull request #4563 from cypof/nccl
adopt NVIDIA's NCCL for multi-GPU training and switch the multi-GPU interface to Python
52 files changed, 1022 insertions, 883 deletions
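For context, the headline change in this merge is that multi-GPU training now runs as one process per GPU, synchronized with NCCL and driven from Python; the bundled launcher is invoked as `python python/train.py --solver <solver.prototxt> --gpus 0 1`. The sketch below distills what each worker process does using the bindings added here. It is a minimal sketch, not the canonical entry point: it assumes a build with `USE_NCCL` enabled, two GPUs, and an illustrative solver path (`solver.prototxt`); all ranks must receive the same `uid` string, generated once in the launcher as `train.py` does, or they will never join the same NCCL session.

```python
from multiprocessing import Process

import caffe


def solve_rank(solver_proto, uid, rank, gpus):
    """One worker process per GPU; mirrors solve() in python/train.py."""
    caffe.set_mode_gpu()
    caffe.set_device(gpus[rank])
    caffe.set_solver_count(len(gpus))    # how many solvers participate
    caffe.set_solver_rank(rank)          # rank 0 is the root solver
    caffe.set_multiprocess(True)

    solver = caffe.SGDSolver(solver_proto)

    # Every rank joins the NCCL session named by the shared uid, then
    # rank 0's initial weights are broadcast so all solvers start equal.
    nccl = caffe.NCCL(solver, uid)
    nccl.bcast()

    solver.add_callback(nccl)            # all-reduce gradients each iteration
    if solver.param.layer_wise_reduce:
        solver.net.after_backward(nccl)  # overlap reduction with backward
    solver.step(solver.param.max_iter)


if __name__ == '__main__':
    gpus = [0, 1]                        # illustrative device ids
    uid = caffe.NCCL.new_uid()           # generate once, share with all ranks
    caffe.init_log()
    procs = [Process(target=solve_rank,
                     args=('solver.prototxt', uid, rank, gpus))
             for rank in range(len(gpus))]
    for p in procs:
        p.daemon = True
        p.start()
    for p in procs:
        p.join()
```

Note the design choice visible in the diff: when `layer_wise_reduce` is set, the NCCL object is also registered as a `Net` callback (`after_backward`), so all-reduce of each layer's gradients overlaps with the remainder of the backward pass instead of waiting for the iteration boundary.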
diff --git a/CMakeLists.txt b/CMakeLists.txt index da7142c9..3af394f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ include(cmake/ConfigGen.cmake) # ---[ Options caffe_option(CPU_ONLY "Build Caffe without CUDA support" OFF) # TODO: rename to USE_CUDA caffe_option(USE_CUDNN "Build Caffe with cuDNN library support" ON IF NOT CPU_ONLY) +caffe_option(USE_NCCL "Build Caffe with NCCL library support" OFF) caffe_option(BUILD_SHARED_LIBS "Build shared libraries" ON) caffe_option(BUILD_python "Build Python wrapper" ON) set(python_version "2" CACHE STRING "Specify which Python version to use") @@ -328,6 +328,12 @@ ifeq ($(USE_CUDNN), 1) COMMON_FLAGS += -DUSE_CUDNN endif +# NCCL acceleration configuration +ifeq ($(USE_NCCL), 1) + LIBRARIES += nccl + COMMON_FLAGS += -DUSE_NCCL +endif + # configure IO libraries ifeq ($(USE_OPENCV), 1) COMMON_FLAGS += -DUSE_OPENCV diff --git a/Makefile.config.example b/Makefile.config.example index 07bed63a..541cf807 100644 --- a/Makefile.config.example +++ b/Makefile.config.example @@ -94,6 +94,10 @@ LIBRARY_DIRS := $(PYTHON_LIB) /usr/local/lib /usr/lib # INCLUDE_DIRS += $(shell brew --prefix)/include # LIBRARY_DIRS += $(shell brew --prefix)/lib +# NCCL acceleration switch (uncomment to build with NCCL) +# https://github.com/NVIDIA/nccl (last tested version: v1.2.3-1+cuda8.0) +# USE_NCCL := 1 + # Uncomment to use `pkg-config` to specify OpenCV library paths. # (Usually not necessary -- OpenCV libraries are normally installed in one of the above $LIBRARY_DIRS.) # USE_PKG_CONFIG := 1 diff --git a/cmake/Dependencies.cmake b/cmake/Dependencies.cmake index ae9ce8e4..ba28a128 100644 --- a/cmake/Dependencies.cmake +++ b/cmake/Dependencies.cmake @@ -67,6 +67,13 @@ if(NOT HAVE_CUDA) add_definitions(-DCPU_ONLY) endif() +if(USE_NCCL) + find_package(NCCL REQUIRED) + include_directories(SYSTEM ${NCCL_INCLUDE_DIR}) + list(APPEND Caffe_LINKER_LIBS ${NCCL_LIBRARIES}) + add_definitions(-DUSE_NCCL) +endif() + # ---[ OpenCV if(USE_OPENCV) find_package(OpenCV QUIET COMPONENTS core highgui imgproc imgcodecs) @@ -119,18 +126,18 @@ if(BUILD_python) find_package(NumPy 1.7.1) # Find the matching boost python implementation set(version ${PYTHONLIBS_VERSION_STRING}) - + STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} ) find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}") set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND}) - + while(NOT "${version}" STREQUAL "" AND NOT Boost_PYTHON_FOUND) STRING( REGEX REPLACE "([0-9.]+).[0-9]+" "\\1" version ${version} ) - + STRING( REGEX REPLACE "[^0-9]" "" boost_py_version ${version} ) find_package(Boost 1.46 COMPONENTS "python-py${boost_py_version}") set(Boost_PYTHON_FOUND ${Boost_PYTHON-PY${boost_py_version}_FOUND}) - + STRING( REGEX MATCHALL "([0-9.]+).[0-9]+" has_more_version ${version} ) if("${has_more_version}" STREQUAL "") break() diff --git a/cmake/Modules/FindNCCL.cmake b/cmake/Modules/FindNCCL.cmake new file mode 100644 index 00000000..c8845934 --- /dev/null +++ b/cmake/Modules/FindNCCL.cmake @@ -0,0 +1,26 @@ +set(NCCL_INC_PATHS + /usr/include + /usr/local/include + $ENV{NCCL_DIR}/include + ) + +set(NCCL_LIB_PATHS + /lib + /lib64 + /usr/lib + /usr/lib64 + /usr/local/lib + /usr/local/lib64 + $ENV{NCCL_DIR}/lib + ) + +find_path(NCCL_INCLUDE_DIR NAMES nccl.h PATHS ${NCCL_INC_PATHS}) +find_library(NCCL_LIBRARIES NAMES nccl PATHS ${NCCL_LIB_PATHS}) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(NCCL DEFAULT_MSG NCCL_INCLUDE_DIR NCCL_LIBRARIES) + +if (NCCL_FOUND) 
+ message(STATUS "Found NCCL (include: ${NCCL_INCLUDE_DIR}, library: ${NCCL_LIBRARIES})") + mark_as_advanced(NCCL_INCLUDE_DIR NCCL_LIBRARIES) +endif () diff --git a/cmake/Summary.cmake b/cmake/Summary.cmake index ba025cf8..ed8c2526 100644 --- a/cmake/Summary.cmake +++ b/cmake/Summary.cmake @@ -117,6 +117,7 @@ function(caffe_print_configuration_summary) caffe_status(" USE_OPENCV : ${USE_OPENCV}") caffe_status(" USE_LEVELDB : ${USE_LEVELDB}") caffe_status(" USE_LMDB : ${USE_LMDB}") + caffe_status(" USE_NCCL : ${USE_NCCL}") caffe_status(" ALLOW_LMDB_NOLOCK : ${ALLOW_LMDB_NOLOCK}") caffe_status("") caffe_status("Dependencies:") diff --git a/include/caffe/blob.hpp b/include/caffe/blob.hpp index af360ac2..2f59471c 100644 --- a/include/caffe/blob.hpp +++ b/include/caffe/blob.hpp @@ -220,6 +220,7 @@ class Blob { void set_cpu_data(Dtype* data); const int* gpu_shape() const; const Dtype* gpu_data() const; + void set_gpu_data(Dtype* data); const Dtype* cpu_diff() const; const Dtype* gpu_diff() const; Dtype* mutable_cpu_data(); diff --git a/include/caffe/common.hpp b/include/caffe/common.hpp index 3c6a076e..4904d1d8 100644 --- a/include/caffe/common.hpp +++ b/include/caffe/common.hpp @@ -158,11 +158,14 @@ class Caffe { // Search from start_id to the highest possible device ordinal, // return the ordinal of the first available device. static int FindDevice(const int start_id = 0); - // Parallel training info + // Parallel training inline static int solver_count() { return Get().solver_count_; } inline static void set_solver_count(int val) { Get().solver_count_ = val; } - inline static bool root_solver() { return Get().root_solver_; } - inline static void set_root_solver(bool val) { Get().root_solver_ = val; } + inline static int solver_rank() { return Get().solver_rank_; } + inline static void set_solver_rank(int val) { Get().solver_rank_ = val; } + inline static bool multiprocess() { return Get().multiprocess_; } + inline static void set_multiprocess(bool val) { Get().multiprocess_ = val; } + inline static bool root_solver() { return Get().solver_rank_ == 0; } protected: #ifndef CPU_ONLY @@ -172,8 +175,11 @@ class Caffe { shared_ptr<RNG> random_generator_; Brew mode_; + + // Parallel training int solver_count_; - bool root_solver_; + int solver_rank_; + bool multiprocess_; private: // The private constructor to avoid duplicate instantiation. diff --git a/include/caffe/data_reader.hpp b/include/caffe/data_reader.hpp deleted file mode 100644 index 8ed5542c..00000000 --- a/include/caffe/data_reader.hpp +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef CAFFE_DATA_READER_HPP_ -#define CAFFE_DATA_READER_HPP_ - -#include <map> -#include <string> -#include <vector> - -#include "caffe/common.hpp" -#include "caffe/internal_thread.hpp" -#include "caffe/util/blocking_queue.hpp" -#include "caffe/util/db.hpp" - -namespace caffe { - -/** - * @brief Reads data from a source to queues available to data layers. - * A single reading thread is created per source, even if multiple solvers - * are running in parallel, e.g. for multi-GPU training. This makes sure - * databases are read sequentially, and that each solver accesses a different - * subset of the database. Data is distributed to solvers in a round-robin - * way to keep parallel training deterministic. 
- */ -class DataReader { - public: - explicit DataReader(const LayerParameter& param); - ~DataReader(); - - inline BlockingQueue<Datum*>& free() const { - return queue_pair_->free_; - } - inline BlockingQueue<Datum*>& full() const { - return queue_pair_->full_; - } - - protected: - // Queue pairs are shared between a body and its readers - class QueuePair { - public: - explicit QueuePair(int size); - ~QueuePair(); - - BlockingQueue<Datum*> free_; - BlockingQueue<Datum*> full_; - - DISABLE_COPY_AND_ASSIGN(QueuePair); - }; - - // A single body is created per source - class Body : public InternalThread { - public: - explicit Body(const LayerParameter& param); - virtual ~Body(); - - protected: - void InternalThreadEntry(); - void read_one(db::Cursor* cursor, QueuePair* qp); - - const LayerParameter param_; - BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_; - - friend class DataReader; - - DISABLE_COPY_AND_ASSIGN(Body); - }; - - // A source is uniquely identified by its layer name + path, in case - // the same database is read from two different locations in the net. - static inline string source_key(const LayerParameter& param) { - return param.name() + ":" + param.data_param().source(); - } - - const shared_ptr<QueuePair> queue_pair_; - shared_ptr<Body> body_; - - static map<const string, boost::weak_ptr<DataReader::Body> > bodies_; - -DISABLE_COPY_AND_ASSIGN(DataReader); -}; - -} // namespace caffe - -#endif // CAFFE_DATA_READER_HPP_ diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp index 6a8c5a02..0ba67665 100644 --- a/include/caffe/internal_thread.hpp +++ b/include/caffe/internal_thread.hpp @@ -42,8 +42,8 @@ class InternalThread { bool must_stop(); private: - void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count, - bool root_solver); + void entry(int device, Caffe::Brew mode, int rand_seed, + int solver_count, int solver_rank, bool multiprocess); shared_ptr<boost::thread> thread_; }; diff --git a/include/caffe/layer.hpp b/include/caffe/layer.hpp index 10f353f9..30dbfd53 100644 --- a/include/caffe/layer.hpp +++ b/include/caffe/layer.hpp @@ -38,7 +38,7 @@ class Layer { * layer. */ explicit Layer(const LayerParameter& param) - : layer_param_(param), is_shared_(false) { + : layer_param_(param) { // Set phase and copy blobs (if there are any). phase_ = param.phase(); if (layer_param_.blobs_size() > 0) { @@ -66,7 +66,6 @@ class Layer { */ void SetUp(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { - InitMutex(); CheckBlobCounts(bottom, top); LayerSetUp(bottom, top); Reshape(bottom, top); @@ -93,30 +92,6 @@ class Layer { const vector<Blob<Dtype>*>& top) {} /** - * @brief Whether a layer should be shared by multiple nets during data - * parallelism. By default, all layers except for data layers should - * not be shared. data layers should be shared to ensure each worker - * solver access data sequentially during data parallelism. - */ - virtual inline bool ShareInParallel() const { return false; } - - /** @brief Return whether this layer is actually shared by other nets. - * If ShareInParallel() is true and using more than one GPU and the - * net has TRAIN phase, then this function is expected return true. - */ - inline bool IsShared() const { return is_shared_; } - - /** @brief Set whether this layer is actually shared by other nets - * If ShareInParallel() is true and using more than one GPU and the - * net has TRAIN phase, then is_shared should be set true. 
- */ - inline void SetShared(bool is_shared) { - CHECK(ShareInParallel() || !is_shared) - << type() << "Layer does not support sharing."; - is_shared_ = is_shared; - } - - /** * @brief Adjust the shapes of top blobs and internal buffers to accommodate * the shapes of the bottom blobs. * @@ -428,19 +403,6 @@ class Layer { } private: - /** Whether this layer is actually shared by other nets*/ - bool is_shared_; - - /** The mutex for sequential forward if this layer is shared */ - shared_ptr<boost::mutex> forward_mutex_; - - /** Initialize forward_mutex_ */ - void InitMutex(); - /** Lock forward_mutex_ if this layer is shared */ - void Lock(); - /** Unlock forward_mutex_ if this layer is shared */ - void Unlock(); - DISABLE_COPY_AND_ASSIGN(Layer); }; // class Layer @@ -450,8 +412,6 @@ class Layer { template <typename Dtype> inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { - // Lock during forward to ensure sequential forward - Lock(); Dtype loss = 0; Reshape(bottom, top); switch (Caffe::mode()) { @@ -482,7 +442,6 @@ inline Dtype Layer<Dtype>::Forward(const vector<Blob<Dtype>*>& bottom, default: LOG(FATAL) << "Unknown caffe mode."; } - Unlock(); return loss; } diff --git a/include/caffe/layers/base_data_layer.hpp b/include/caffe/layers/base_data_layer.hpp index 2c49b731..21d3ada5 100644 --- a/include/caffe/layers/base_data_layer.hpp +++ b/include/caffe/layers/base_data_layer.hpp @@ -67,16 +67,14 @@ class BasePrefetchingDataLayer : virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top); - // Prefetches batches (asynchronously if to GPU memory) - static const int PREFETCH_COUNT = 3; - protected: virtual void InternalThreadEntry(); virtual void load_batch(Batch<Dtype>* batch) = 0; - Batch<Dtype> prefetch_[PREFETCH_COUNT]; + vector<shared_ptr<Batch<Dtype> > > prefetch_; BlockingQueue<Batch<Dtype>*> prefetch_free_; BlockingQueue<Batch<Dtype>*> prefetch_full_; + Batch<Dtype>* prefetch_current_; Blob<Dtype> transformed_data_; }; diff --git a/include/caffe/layers/data_layer.hpp b/include/caffe/layers/data_layer.hpp index 6c361791..dec58180 100644 --- a/include/caffe/layers/data_layer.hpp +++ b/include/caffe/layers/data_layer.hpp @@ -4,7 +4,6 @@ #include <vector> #include "caffe/blob.hpp" -#include "caffe/data_reader.hpp" #include "caffe/data_transformer.hpp" #include "caffe/internal_thread.hpp" #include "caffe/layer.hpp" @@ -29,9 +28,13 @@ class DataLayer : public BasePrefetchingDataLayer<Dtype> { virtual inline int MaxTopBlobs() const { return 2; } protected: + void Next(); + bool Skip(); virtual void load_batch(Batch<Dtype>* batch); - DataReader reader_; + shared_ptr<db::DB> db_; + shared_ptr<db::Cursor> cursor_; + uint64_t offset_; }; } // namespace caffe diff --git a/include/caffe/layers/hdf5_data_layer.hpp b/include/caffe/layers/hdf5_data_layer.hpp index b04cf8e1..650a3fb0 100644 --- a/include/caffe/layers/hdf5_data_layer.hpp +++ b/include/caffe/layers/hdf5_data_layer.hpp @@ -23,7 +23,7 @@ template <typename Dtype> class HDF5DataLayer : public Layer<Dtype> { public: explicit HDF5DataLayer(const LayerParameter& param) - : Layer<Dtype>(param) {} + : Layer<Dtype>(param), offset_() {} virtual ~HDF5DataLayer(); virtual void LayerSetUp(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top); @@ -38,6 +38,9 @@ class HDF5DataLayer : public Layer<Dtype> { virtual inline int MinTopBlobs() const { return 1; } protected: + void Next(); + bool Skip(); + virtual void Forward_cpu(const 
vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top); virtual void Forward_gpu(const vector<Blob<Dtype>*>& bottom, @@ -55,6 +58,7 @@ class HDF5DataLayer : public Layer<Dtype> { std::vector<shared_ptr<Blob<Dtype> > > hdf_blobs_; std::vector<unsigned int> data_permutation_; std::vector<unsigned int> file_permutation_; + uint64_t offset_; }; } // namespace caffe diff --git a/include/caffe/layers/python_layer.hpp b/include/caffe/layers/python_layer.hpp index 66dbbdf1..10c4bfd0 100644 --- a/include/caffe/layers/python_layer.hpp +++ b/include/caffe/layers/python_layer.hpp @@ -21,8 +21,8 @@ class PythonLayer : public Layer<Dtype> { // Disallow PythonLayer in MultiGPU training stage, due to GIL issues // Details: https://github.com/BVLC/caffe/issues/2936 if (this->phase_ == TRAIN && Caffe::solver_count() > 1 - && !ShareInParallel()) { - LOG(FATAL) << "PythonLayer is not implemented in Multi-GPU training"; + && !Caffe::multiprocess()) { + LOG(FATAL) << "PythonLayer does not support CLI Multi-GPU, use train.py"; } self_.attr("param_str") = bp::str( this->layer_param_.python_param().param_str()); diff --git a/include/caffe/net.hpp b/include/caffe/net.hpp index 493bdf29..d3c9306e 100644 --- a/include/caffe/net.hpp +++ b/include/caffe/net.hpp @@ -23,10 +23,9 @@ namespace caffe { template <typename Dtype> class Net { public: - explicit Net(const NetParameter& param, const Net* root_net = NULL); + explicit Net(const NetParameter& param); explicit Net(const string& param_file, Phase phase, - const int level = 0, const vector<string>* stages = NULL, - const Net* root_net = NULL); + const int level = 0, const vector<string>* stages = NULL); virtual ~Net() {} /// @brief Initialize a network with a NetParameter. @@ -228,6 +227,31 @@ class Net { static bool StateMeetsRule(const NetState& state, const NetStateRule& rule, const string& layer_name); + // Invoked at specific points during an iteration + class Callback { + protected: + virtual void run(int layer) = 0; + + template <typename T> + friend class Net; + }; + const vector<Callback*>& before_forward() const { return before_forward_; } + void add_before_forward(Callback* value) { + before_forward_.push_back(value); + } + const vector<Callback*>& after_forward() const { return after_forward_; } + void add_after_forward(Callback* value) { + after_forward_.push_back(value); + } + const vector<Callback*>& before_backward() const { return before_backward_; } + void add_before_backward(Callback* value) { + before_backward_.push_back(value); + } + const vector<Callback*>& after_backward() const { return after_backward_; } + void add_after_backward(Callback* value) { + after_backward_.push_back(value); + } + protected: // Helpers for Init. /// @brief Append a new top blob to the net. @@ -306,9 +330,13 @@ class Net { size_t memory_used_; /// Whether to compute and display debug info for the net. 
bool debug_info_; - /// The root net that actually holds the shared layers in data parallelism - const Net* const root_net_; - DISABLE_COPY_AND_ASSIGN(Net); + // Callbacks + vector<Callback*> before_forward_; + vector<Callback*> after_forward_; + vector<Callback*> before_backward_; + vector<Callback*> after_backward_; + +DISABLE_COPY_AND_ASSIGN(Net); }; diff --git a/include/caffe/parallel.hpp b/include/caffe/parallel.hpp index 6c496c88..64bb48e6 100644 --- a/include/caffe/parallel.hpp +++ b/include/caffe/parallel.hpp @@ -1,8 +1,11 @@ #ifndef CAFFE_PARALLEL_HPP_ #define CAFFE_PARALLEL_HPP_ -#include <boost/date_time/posix_time/posix_time.hpp> +#ifdef USE_NCCL +#include <boost/thread.hpp> + +#include <string> #include <vector> #include "caffe/blob.hpp" @@ -13,6 +16,7 @@ #include "caffe/solver.hpp" #include "caffe/syncedmem.hpp" #include "caffe/util/blocking_queue.hpp" +#include "caffe/util/nccl.hpp" namespace caffe { @@ -51,7 +55,7 @@ class GPUParams : public Params<Dtype> { GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device); virtual ~GPUParams(); - void configure(Solver<Dtype>* solver) const; + void Configure(Solver<Dtype>* solver) const; protected: using Params<Dtype>::size_; @@ -59,58 +63,55 @@ class GPUParams : public Params<Dtype> { using Params<Dtype>::diff_; }; -class DevicePair { - public: - DevicePair(int parent, int device) - : parent_(parent), - device_(device) { - } - inline int parent() { - return parent_; - } - inline int device() { - return device_; - } - - // Group GPUs in pairs, by proximity depending on machine's topology - static void compute(const vector<int> devices, vector<DevicePair>* pairs); - - protected: - int parent_; - int device_; -}; - -// Synchronous data parallelism using map-reduce between local GPUs. template<typename Dtype> -class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback, - public InternalThread { +class NCCL : public GPUParams<Dtype>, + public Solver<Dtype>::Callback, + public Net<Dtype>::Callback { public: - explicit P2PSync(shared_ptr<Solver<Dtype> > root_solver, - P2PSync<Dtype>* parent, const SolverParameter& param); - virtual ~P2PSync(); - - inline const shared_ptr<Solver<Dtype> >& solver() const { - return solver_; - } - - void Run(const vector<int>& gpus); - void Prepare(const vector<int>& gpus, - vector<shared_ptr<P2PSync<Dtype> > >* syncs); - inline const int initial_iter() const { return initial_iter_; } + /** + * Single process version. + */ + explicit NCCL(shared_ptr<Solver<Dtype> > solver); + /** + * In multi-process settings, first create a NCCL id (new_uid), then + * pass it to each process to create connected instances. + */ + NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid); + ~NCCL(); + + boost::barrier* barrier(); + void set_barrier(boost::barrier* value); + + /** + * In single process settings, create instances without uids and + * call this to connect them. + */ + static void InitSingleProcess(vector<NCCL<Dtype>*>* nccls); + + static string new_uid(); + + /** + * Broadcast weights from rank 0 other solvers. + */ + void Broadcast(); + + /** + * Single process multi-GPU. 
+ */ + void Run(const vector<int>& gpus, const char* restore); protected: - void on_start(); + void Init(); + void on_start() {} + void run(int layer); // Net callback void on_gradients_ready(); - void InternalThreadEntry(); + ncclComm_t comm_; + cudaStream_t stream_; - P2PSync<Dtype>* parent_; - vector<P2PSync<Dtype>*> children_; - BlockingQueue<P2PSync<Dtype>*> queue_; - const int initial_iter_; - Dtype* parent_grads_; shared_ptr<Solver<Dtype> > solver_; - + // Should not be necessary, https://github.com/NVIDIA/nccl/issues/37 + boost::barrier* barrier_; using Params<Dtype>::size_; using Params<Dtype>::data_; using Params<Dtype>::diff_; @@ -118,4 +119,5 @@ class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback, } // namespace caffe -#endif +#endif // USE_NCCL +#endif // header diff --git a/include/caffe/solver.hpp b/include/caffe/solver.hpp index eafcee32..a28d8cb8 100644 --- a/include/caffe/solver.hpp +++ b/include/caffe/solver.hpp @@ -6,6 +6,7 @@ #include "caffe/net.hpp" #include "caffe/solver_factory.hpp" +#include "caffe/util/benchmark.hpp" namespace caffe { @@ -40,9 +41,8 @@ typedef boost::function<SolverAction::Enum()> ActionCallback; template <typename Dtype> class Solver { public: - explicit Solver(const SolverParameter& param, - const Solver* root_solver = NULL); - explicit Solver(const string& param_file, const Solver* root_solver = NULL); + explicit Solver(const SolverParameter& param); + explicit Solver(const string& param_file); void Init(const SolverParameter& param); void InitTrainNet(); void InitTestNets(); @@ -72,7 +72,7 @@ class Solver { inline const vector<shared_ptr<Net<Dtype> > >& test_nets() { return test_nets_; } - int iter() { return iter_; } + int iter() const { return iter_; } // Invoked at specific points during an iteration class Callback { @@ -118,10 +118,6 @@ class Solver { vector<Dtype> losses_; Dtype smoothed_loss_; - // The root solver that holds root nets (actually containing shared layers) - // in data parallelism - const Solver* const root_solver_; - // A function that can be set by a client of the Solver to provide indication // that it wants a snapshot saved and/or to exit early. ActionCallback action_request_function_; @@ -129,31 +125,11 @@ class Solver { // True iff a request to stop early was received. bool requested_early_exit_; - DISABLE_COPY_AND_ASSIGN(Solver); -}; + // Timing information, handy to tune e.g. nbr of GPUs + Timer iteration_timer_; + float iterations_last_; -/** - * @brief Solver that only computes gradients, used as worker - * for multi-GPU training. 
- */ -template <typename Dtype> -class WorkerSolver : public Solver<Dtype> { - public: - explicit WorkerSolver(const SolverParameter& param, - const Solver<Dtype>* root_solver = NULL) - : Solver<Dtype>(param, root_solver) {} - - protected: - void ApplyUpdate() {} - void SnapshotSolverState(const string& model_filename) { - LOG(FATAL) << "Should not be called on worker solver."; - } - void RestoreSolverStateFromBinaryProto(const string& state_file) { - LOG(FATAL) << "Should not be called on worker solver."; - } - void RestoreSolverStateFromHDF5(const string& state_file) { - LOG(FATAL) << "Should not be called on worker solver."; - } + DISABLE_COPY_AND_ASSIGN(Solver); }; } // namespace caffe diff --git a/include/caffe/syncedmem.hpp b/include/caffe/syncedmem.hpp index 6474a696..317ce29a 100644 --- a/include/caffe/syncedmem.hpp +++ b/include/caffe/syncedmem.hpp @@ -56,14 +56,8 @@ inline void CaffeFreeHost(void* ptr, bool use_cuda) { */ class SyncedMemory { public: - SyncedMemory() - : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED), - own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false), - gpu_device_(-1) {} - explicit SyncedMemory(size_t size) - : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED), - own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false), - gpu_device_(-1) {} + SyncedMemory(); + explicit SyncedMemory(size_t size); ~SyncedMemory(); const void* cpu_data(); void set_cpu_data(void* data); @@ -80,6 +74,8 @@ class SyncedMemory { #endif private: + void check_device(); + void to_cpu(); void to_gpu(); void* cpu_ptr_; @@ -89,7 +85,7 @@ class SyncedMemory { bool own_cpu_data_; bool cpu_malloc_use_cuda_; bool own_gpu_data_; - int gpu_device_; + int device_; DISABLE_COPY_AND_ASSIGN(SyncedMemory); }; // class SyncedMemory diff --git a/include/caffe/util/math_functions.hpp b/include/caffe/util/math_functions.hpp index 6f6d3fee..51068fe2 100644 --- a/include/caffe/util/math_functions.hpp +++ b/include/caffe/util/math_functions.hpp @@ -185,6 +185,11 @@ void caffe_gpu_add_scalar(const int N, const Dtype alpha, Dtype *X); template <typename Dtype> void caffe_gpu_scal(const int N, const Dtype alpha, Dtype *X); +#ifndef CPU_ONLY +template <typename Dtype> +void caffe_gpu_scal(const int N, const Dtype alpha, Dtype* X, cudaStream_t str); +#endif + template <typename Dtype> void caffe_gpu_add(const int N, const Dtype* a, const Dtype* b, Dtype* y); diff --git a/include/caffe/util/nccl.hpp b/include/caffe/util/nccl.hpp new file mode 100644 index 00000000..e01fb745 --- /dev/null +++ b/include/caffe/util/nccl.hpp @@ -0,0 +1,37 @@ +#ifndef CAFFE_UTIL_NCCL_H_ +#define CAFFE_UTIL_NCCL_H_ +#ifdef USE_NCCL + +#include <nccl.h> + +#include "caffe/common.hpp" + +#define NCCL_CHECK(condition) \ +{ \ + ncclResult_t result = condition; \ + CHECK_EQ(result, ncclSuccess) << " " \ + << ncclGetErrorString(result); \ +} + +namespace caffe { + +namespace nccl { + +template <typename Dtype> class dataType; + +template<> class dataType<float> { + public: + static const ncclDataType_t type = ncclFloat; +}; +template<> class dataType<double> { + public: + static const ncclDataType_t type = ncclDouble; +}; + +} // namespace nccl + +} // namespace caffe + +#endif // end USE_NCCL + +#endif // CAFFE_UTIL_NCCL_H_ diff --git a/python/caffe/__init__.py b/python/caffe/__init__.py index 35868a40..43a0c49b 100644 --- a/python/caffe/__init__.py +++ b/python/caffe/__init__.py @@ -1,5 +1,5 @@ -from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, 
RMSPropSolver, AdaDeltaSolver, AdamSolver -from ._caffe import set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed +from .pycaffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer +from ._caffe import init_log, log, set_mode_cpu, set_mode_gpu, set_device, Layer, get_solver, layer_type_list, set_random_seed, solver_count, set_solver_count, solver_rank, set_solver_rank, set_multiprocess, Layer, get_solver from ._caffe import __version__ from .proto.caffe_pb2 import TRAIN, TEST from .classifier import Classifier diff --git a/python/caffe/_caffe.cpp b/python/caffe/_caffe.cpp index bdee75ac..3589e476 100644 --- a/python/caffe/_caffe.cpp +++ b/python/caffe/_caffe.cpp @@ -51,6 +51,19 @@ const int NPY_DTYPE = NPY_FLOAT32; void set_mode_cpu() { Caffe::set_mode(Caffe::CPU); } void set_mode_gpu() { Caffe::set_mode(Caffe::GPU); } +void InitLog(int level) { + FLAGS_logtostderr = 1; + FLAGS_minloglevel = level; + ::google::InitGoogleLogging(""); + ::google::InstallFailureSignalHandler(); +} +void InitLogInfo() { + InitLog(google::INFO); +} +void Log(const string& s) { + LOG(INFO) << s; +} + void set_random_seed(unsigned int seed) { Caffe::set_random_seed(seed); } // For convenience, check that input files can be opened, and raise an @@ -254,12 +267,12 @@ bp::object BlobVec_add_blob(bp::tuple args, bp::dict kwargs) { } template<typename Dtype> -class PythonCallback: public Solver<Dtype>::Callback { +class SolverCallback: public Solver<Dtype>::Callback { protected: bp::object on_start_, on_gradients_ready_; public: - PythonCallback(bp::object on_start, bp::object on_gradients_ready) + SolverCallback(bp::object on_start, bp::object on_gradients_ready) : on_start_(on_start), on_gradients_ready_(on_gradients_ready) { } virtual void on_gradients_ready() { on_gradients_ready_(); @@ -271,9 +284,61 @@ class PythonCallback: public Solver<Dtype>::Callback { template<typename Dtype> void Solver_add_callback(Solver<Dtype> * solver, bp::object on_start, bp::object on_gradients_ready) { - solver->add_callback(new PythonCallback<Dtype>(on_start, on_gradients_ready)); + solver->add_callback(new SolverCallback<Dtype>(on_start, on_gradients_ready)); +} + +// Seems boost cannot call the base method directly +void Solver_add_nccl(SGDSolver<Dtype>* solver +#ifdef USE_NCCL + , NCCL<Dtype>* nccl +#endif +) { +#ifdef USE_NCCL + solver->add_callback(nccl); +#endif } +template<typename Dtype> +class NetCallback: public Net<Dtype>::Callback { + public: + explicit NetCallback(bp::object run) : run_(run) {} + + protected: + virtual void run(int layer) { + run_(layer); + } + bp::object run_; +}; +void Net_before_forward(Net<Dtype>* net, bp::object run) { + net->add_before_forward(new NetCallback<Dtype>(run)); +} +void Net_after_forward(Net<Dtype>* net, bp::object run) { + net->add_after_forward(new NetCallback<Dtype>(run)); +} +void Net_before_backward(Net<Dtype>* net, bp::object run) { + net->add_before_backward(new NetCallback<Dtype>(run)); +} +void Net_after_backward(Net<Dtype>* net, bp::object run) { + net->add_after_backward(new NetCallback<Dtype>(run)); +} + +void Net_add_nccl(Net<Dtype>* net +#ifdef USE_NCCL + , NCCL<Dtype>* nccl +#endif +) { +#ifdef USE_NCCL + net->add_after_backward(nccl); +#endif +} +#ifndef USE_NCCL +template<typename Dtype> +class NCCL { + public: + NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid) {} +}; +#endif + BOOST_PYTHON_MEMBER_FUNCTION_OVERLOADS(SolveOverloads, Solve, 0, 1); 
BOOST_PYTHON_MODULE(_caffe) { @@ -283,10 +348,18 @@ BOOST_PYTHON_MODULE(_caffe) { bp::scope().attr("__version__") = AS_STRING(CAFFE_VERSION); // Caffe utility functions + bp::def("init_log", &InitLog); + bp::def("init_log", &InitLogInfo); + bp::def("log", &Log); bp::def("set_mode_cpu", &set_mode_cpu); bp::def("set_mode_gpu", &set_mode_gpu); bp::def("set_random_seed", &set_random_seed); bp::def("set_device", &Caffe::SetDevice); + bp::def("solver_count", &Caffe::solver_count); + bp::def("set_solver_count", &Caffe::set_solver_count); + bp::def("solver_rank", &Caffe::solver_rank); + bp::def("set_solver_rank", &Caffe::set_solver_rank); + bp::def("set_multiprocess", &Caffe::set_multiprocess); bp::def("layer_type_list", &LayerRegistry<Dtype>::LayerTypeList); @@ -330,7 +403,12 @@ BOOST_PYTHON_MODULE(_caffe) { bp::with_custodian_and_ward<1, 2, bp::with_custodian_and_ward<1, 3> >()) .def("save", &Net_Save) .def("save_hdf5", &Net_SaveHDF5) - .def("load_hdf5", &Net_LoadHDF5); + .def("load_hdf5", &Net_LoadHDF5) + .def("before_forward", &Net_before_forward) + .def("after_forward", &Net_after_forward) + .def("before_backward", &Net_before_backward) + .def("after_backward", &Net_after_backward) + .def("after_backward", &Net_add_nccl); BP_REGISTER_SHARED_PTR_TO_PYTHON(Net<Dtype>); bp::class_<Blob<Dtype>, shared_ptr<Blob<Dtype> >, boost::noncopyable>( @@ -362,6 +440,10 @@ BOOST_PYTHON_MODULE(_caffe) { .add_property("type", bp::make_function(&Layer<Dtype>::type)); BP_REGISTER_SHARED_PTR_TO_PYTHON(Layer<Dtype>); + bp::class_<SolverParameter>("SolverParameter", bp::no_init) + .add_property("max_iter", &SolverParameter::max_iter) + .add_property("display", &SolverParameter::display) + .add_property("layer_wise_reduce", &SolverParameter::layer_wise_reduce); bp::class_<LayerParameter>("LayerParameter", bp::no_init); bp::class_<Solver<Dtype>, shared_ptr<Solver<Dtype> >, boost::noncopyable>( @@ -371,11 +453,14 @@ BOOST_PYTHON_MODULE(_caffe) { bp::return_internal_reference<>())) .add_property("iter", &Solver<Dtype>::iter) .def("add_callback", &Solver_add_callback<Dtype>) + .def("add_callback", &Solver_add_nccl) .def("solve", static_cast<void (Solver<Dtype>::*)(const char*)>( &Solver<Dtype>::Solve), SolveOverloads()) .def("step", &Solver<Dtype>::Step) .def("restore", &Solver<Dtype>::Restore) - .def("snapshot", &Solver<Dtype>::Snapshot); + .def("snapshot", &Solver<Dtype>::Snapshot) + .add_property("param", bp::make_function(&Solver<Dtype>::param, + bp::return_value_policy<bp::copy_const_reference>())); BP_REGISTER_SHARED_PTR_TO_PYTHON(Solver<Dtype>); bp::class_<SGDSolver<Dtype>, bp::bases<Solver<Dtype> >, @@ -419,6 +504,24 @@ BOOST_PYTHON_MODULE(_caffe) { bp::class_<vector<bool> >("BoolVec") .def(bp::vector_indexing_suite<vector<bool> >()); + bp::class_<NCCL<Dtype>, shared_ptr<NCCL<Dtype> >, + boost::noncopyable>("NCCL", + bp::init<shared_ptr<Solver<Dtype> >, const string&>()) +#ifdef USE_NCCL + .def("new_uid", &NCCL<Dtype>::new_uid).staticmethod("new_uid") + .def("bcast", &NCCL<Dtype>::Broadcast) +#endif + /* NOLINT_NEXT_LINE(whitespace/semicolon) */ + ; + BP_REGISTER_SHARED_PTR_TO_PYTHON(NCCL<Dtype>); + + bp::class_<Timer, shared_ptr<Timer>, boost::noncopyable>( + "Timer", bp::init<>()) + .def("start", &Timer::Start) + .def("stop", &Timer::Stop) + .add_property("ms", &Timer::MilliSeconds); + BP_REGISTER_SHARED_PTR_TO_PYTHON(Timer); + // boost python expects a void (missing) return value, while import_array // returns NULL for python3. import_array1() forces a void return value. 
import_array1(); diff --git a/python/caffe/pycaffe.py b/python/caffe/pycaffe.py index 5bae18d9..18803818 100644 --- a/python/caffe/pycaffe.py +++ b/python/caffe/pycaffe.py @@ -11,7 +11,7 @@ except: import numpy as np from ._caffe import Net, SGDSolver, NesterovSolver, AdaGradSolver, \ - RMSPropSolver, AdaDeltaSolver, AdamSolver + RMSPropSolver, AdaDeltaSolver, AdamSolver, NCCL, Timer import caffe.io import six diff --git a/python/train.py b/python/train.py new file mode 100644 index 00000000..5897f5dc --- /dev/null +++ b/python/train.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +""" +Trains a model using one or more GPUs. +""" +from multiprocessing import Process + +import caffe + + +def train( + solver, # solver proto definition + snapshot, # solver snapshot to restore + gpus, # list of device ids + timing=False, # show timing info for compute and communications +): + # NCCL uses a uid to identify a session + uid = caffe.NCCL.new_uid() + + caffe.init_log() + caffe.log('Using devices %s' % str(gpus)) + + procs = [] + for rank in range(len(gpus)): + p = Process(target=solve, + args=(solver, snapshot, gpus, timing, uid, rank)) + p.daemon = True + p.start() + procs.append(p) + for p in procs: + p.join() + + +def time(solver, nccl): + fprop = [] + bprop = [] + total = caffe.Timer() + allrd = caffe.Timer() + for _ in range(len(solver.net.layers)): + fprop.append(caffe.Timer()) + bprop.append(caffe.Timer()) + display = solver.param.display + + def show_time(): + if solver.iter % display == 0: + s = '\n' + for i in range(len(solver.net.layers)): + s += 'forw %3d %8s ' % (i, solver.net._layer_names[i]) + s += ': %.2f\n' % fprop[i].ms + for i in range(len(solver.net.layers) - 1, -1, -1): + s += 'back %3d %8s ' % (i, solver.net._layer_names[i]) + s += ': %.2f\n' % bprop[i].ms + s += 'solver total: %.2f\n' % total.ms + s += 'allreduce: %.2f\n' % allrd.ms + caffe.log(s) + + solver.net.before_forward(lambda layer: fprop[layer].start()) + solver.net.after_forward(lambda layer: fprop[layer].stop()) + solver.net.before_backward(lambda layer: bprop[layer].start()) + solver.net.after_backward(lambda layer: bprop[layer].stop()) + solver.add_callback(lambda: total.start(), lambda: (total.stop(), allrd.start())) + solver.add_callback(nccl) + solver.add_callback(lambda: '', lambda: (allrd.stop(), show_time())) + + +def solve(proto, snapshot, gpus, timing, uid, rank): + caffe.set_mode_gpu() + caffe.set_device(gpus[rank]) + caffe.set_solver_count(len(gpus)) + caffe.set_solver_rank(rank) + caffe.set_multiprocess(True) + + solver = caffe.SGDSolver(proto) + if snapshot and len(snapshot) != 0: + solver.restore(snapshot) + + nccl = caffe.NCCL(solver, uid) + nccl.bcast() + + if timing and rank == 0: + time(solver, nccl) + else: + solver.add_callback(nccl) + + if solver.param.layer_wise_reduce: + solver.net.after_backward(nccl) + solver.step(solver.param.max_iter) + + +if __name__ == '__main__': + import argparse + parser = argparse.ArgumentParser() + + parser.add_argument("--solver", required=True, help="Solver proto definition.") + parser.add_argument("--snapshot", help="Solver snapshot to restore.") + parser.add_argument("--gpus", type=int, nargs='+', default=[0], + help="List of device ids.") + parser.add_argument("--timing", action='store_true', help="Show timing info.") + args = parser.parse_args() + + train(args.solver, args.snapshot, args.gpus, args.timing) diff --git a/src/caffe/blob.cpp b/src/caffe/blob.cpp index 4a34e4c5..603e52f7 100644 --- a/src/caffe/blob.cpp +++ b/src/caffe/blob.cpp @@ -89,6 +89,12 @@ 
const Dtype* Blob<Dtype>::cpu_data() const { template <typename Dtype> void Blob<Dtype>::set_cpu_data(Dtype* data) { CHECK(data); + // Make sure CPU and GPU sizes remain equal + size_t size = count_ * sizeof(Dtype); + if (data_->size() != size) { + data_.reset(new SyncedMemory(size)); + diff_.reset(new SyncedMemory(size)); + } data_->set_cpu_data(data); } @@ -99,6 +105,18 @@ const Dtype* Blob<Dtype>::gpu_data() const { } template <typename Dtype> +void Blob<Dtype>::set_gpu_data(Dtype* data) { + CHECK(data); + // Make sure CPU and GPU sizes remain equal + size_t size = count_ * sizeof(Dtype); + if (data_->size() != size) { + data_.reset(new SyncedMemory(size)); + diff_.reset(new SyncedMemory(size)); + } + data_->set_gpu_data(data); +} + +template <typename Dtype> const Dtype* Blob<Dtype>::cpu_diff() const { CHECK(diff_); return (const Dtype*)diff_->cpu_data(); diff --git a/src/caffe/common.cpp b/src/caffe/common.cpp index dee68165..4f6f9bcc 100644 --- a/src/caffe/common.cpp +++ b/src/caffe/common.cpp @@ -53,7 +53,7 @@ void GlobalInit(int* pargc, char*** pargv) { Caffe::Caffe() : random_generator_(), mode_(Caffe::CPU), - solver_count_(1), root_solver_(true) { } + solver_count_(1), solver_rank_(0), multiprocess_(false) { } Caffe::~Caffe() { } @@ -106,7 +106,8 @@ void* Caffe::RNG::generator() { Caffe::Caffe() : cublas_handle_(NULL), curand_generator_(NULL), random_generator_(), - mode_(Caffe::CPU), solver_count_(1), root_solver_(true) { + mode_(Caffe::CPU), + solver_count_(1), solver_rank_(0), multiprocess_(false) { // Try to create a cublas handler, and report an error if failed (but we will // keep the program running as one might just want to run CPU code). if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) { diff --git a/src/caffe/data_reader.cpp b/src/caffe/data_reader.cpp deleted file mode 100644 index 9f019bbf..00000000 --- a/src/caffe/data_reader.cpp +++ /dev/null @@ -1,119 +0,0 @@ -#include <boost/thread.hpp> -#include <map> -#include <string> -#include <vector> - -#include "caffe/common.hpp" -#include "caffe/data_reader.hpp" -#include "caffe/layers/data_layer.hpp" -#include "caffe/proto/caffe.pb.h" - -namespace caffe { - -using boost::weak_ptr; - -map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_; -static boost::mutex bodies_mutex_; - -DataReader::DataReader(const LayerParameter& param) - : queue_pair_(new QueuePair( // - param.data_param().prefetch() * param.data_param().batch_size())) { - // Get or create a body - boost::mutex::scoped_lock lock(bodies_mutex_); - string key = source_key(param); - weak_ptr<Body>& weak = bodies_[key]; - body_ = weak.lock(); - if (!body_) { - body_.reset(new Body(param)); - bodies_[key] = weak_ptr<Body>(body_); - } - body_->new_queue_pairs_.push(queue_pair_); -} - -DataReader::~DataReader() { - string key = source_key(body_->param_); - body_.reset(); - boost::mutex::scoped_lock lock(bodies_mutex_); - if (bodies_[key].expired()) { - bodies_.erase(key); - } -} - -// - -DataReader::QueuePair::QueuePair(int size) { - // Initialize the free queue with requested number of datums - for (int i = 0; i < size; ++i) { - free_.push(new Datum()); - } -} - -DataReader::QueuePair::~QueuePair() { - Datum* datum; - while (free_.try_pop(&datum)) { - delete datum; - } - while (full_.try_pop(&datum)) { - delete datum; - } -} - -// - -DataReader::Body::Body(const LayerParameter& param) - : param_(param), - new_queue_pairs_() { - StartInternalThread(); -} - -DataReader::Body::~Body() { - StopInternalThread(); -} - -void 
DataReader::Body::InternalThreadEntry() { - shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend())); - db->Open(param_.data_param().source(), db::READ); - shared_ptr<db::Cursor> cursor(db->NewCursor()); - vector<shared_ptr<QueuePair> > qps; - try { - int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1; - - // To ensure deterministic runs, only start running once all solvers - // are ready. But solvers need to peek on one item during initialization, - // so read one item, then wait for the next solver. - for (int i = 0; i < solver_count; ++i) { - shared_ptr<QueuePair> qp(new_queue_pairs_.pop()); - read_one(cursor.get(), qp.get()); - qps.push_back(qp); - } - // Main loop - while (!must_stop()) { - for (int i = 0; i < solver_count; ++i) { - read_one(cursor.get(), qps[i].get()); - } - // Check no additional readers have been created. This can happen if - // more than one net is trained at a time per process, whether single - // or multi solver. It might also happen if two data layers have same - // name and same source. - CHECK_EQ(new_queue_pairs_.size(), 0); - } - } catch (boost::thread_interrupted&) { - // Interrupted exception is expected on shutdown - } -} - -void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) { - Datum* datum = qp->free_.pop(); - // TODO deserialize in-place instead of copy? - datum->ParseFromString(cursor->value()); - qp->full_.push(datum); - - // go to the next iter - cursor->Next(); - if (!cursor->valid()) { - DLOG(INFO) << "Restarting data prefetching from start."; - cursor->SeekToFirst(); - } -} - -} // namespace caffe diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp index 104884e0..11de4979 100644 --- a/src/caffe/internal_thread.cpp +++ b/src/caffe/internal_thread.cpp @@ -28,25 +28,27 @@ void InternalThread::StartInternalThread() { Caffe::Brew mode = Caffe::mode(); int rand_seed = caffe_rng_rand(); int solver_count = Caffe::solver_count(); - bool root_solver = Caffe::root_solver(); + int solver_rank = Caffe::solver_rank(); + bool multiprocess = Caffe::multiprocess(); try { thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode, - rand_seed, solver_count, root_solver)); + rand_seed, solver_count, solver_rank, multiprocess)); } catch (std::exception& e) { LOG(FATAL) << "Thread exception: " << e.what(); } } void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed, - int solver_count, bool root_solver) { + int solver_count, int solver_rank, bool multiprocess) { #ifndef CPU_ONLY CUDA_CHECK(cudaSetDevice(device)); #endif Caffe::set_mode(mode); Caffe::set_random_seed(rand_seed); Caffe::set_solver_count(solver_count); - Caffe::set_root_solver(root_solver); + Caffe::set_solver_rank(solver_rank); + Caffe::set_multiprocess(multiprocess); InternalThreadEntry(); } diff --git a/src/caffe/layer.cpp b/src/caffe/layer.cpp index 3b912898..684ae88b 100644 --- a/src/caffe/layer.cpp +++ b/src/caffe/layer.cpp @@ -1,27 +1,7 @@ -#include <boost/thread.hpp> #include "caffe/layer.hpp" namespace caffe { -template <typename Dtype> -void Layer<Dtype>::InitMutex() { - forward_mutex_.reset(new boost::mutex()); -} - -template <typename Dtype> -void Layer<Dtype>::Lock() { - if (IsShared()) { - forward_mutex_->lock(); - } -} - -template <typename Dtype> -void Layer<Dtype>::Unlock() { - if (IsShared()) { - forward_mutex_->unlock(); - } -} - INSTANTIATE_CLASS(Layer); } // namespace caffe diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp index 989319f1..93a798f3 
100644 --- a/src/caffe/layers/base_data_layer.cpp +++ b/src/caffe/layers/base_data_layer.cpp @@ -36,9 +36,11 @@ template <typename Dtype> BasePrefetchingDataLayer<Dtype>::BasePrefetchingDataLayer( const LayerParameter& param) : BaseDataLayer<Dtype>(param), - prefetch_free_(), prefetch_full_() { - for (int i = 0; i < PREFETCH_COUNT; ++i) { - prefetch_free_.push(&prefetch_[i]); + prefetch_(param.data_param().prefetch()), + prefetch_free_(), prefetch_full_(), prefetch_current_() { + for (int i = 0; i < prefetch_.size(); ++i) { + prefetch_[i].reset(new Batch<Dtype>()); + prefetch_free_.push(prefetch_[i].get()); } } @@ -46,22 +48,23 @@ template <typename Dtype> void BasePrefetchingDataLayer<Dtype>::LayerSetUp( const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { BaseDataLayer<Dtype>::LayerSetUp(bottom, top); + // Before starting the prefetch thread, we make cpu_data and gpu_data // calls so that the prefetch thread does not accidentally make simultaneous // cudaMalloc calls when the main thread is running. In some GPUs this // seems to cause failures if we do not so. - for (int i = 0; i < PREFETCH_COUNT; ++i) { - prefetch_[i].data_.mutable_cpu_data(); + for (int i = 0; i < prefetch_.size(); ++i) { + prefetch_[i]->data_.mutable_cpu_data(); if (this->output_labels_) { - prefetch_[i].label_.mutable_cpu_data(); + prefetch_[i]->label_.mutable_cpu_data(); } } #ifndef CPU_ONLY if (Caffe::mode() == Caffe::GPU) { - for (int i = 0; i < PREFETCH_COUNT; ++i) { - prefetch_[i].data_.mutable_gpu_data(); + for (int i = 0; i < prefetch_.size(); ++i) { + prefetch_[i]->data_.mutable_gpu_data(); if (this->output_labels_) { - prefetch_[i].label_.mutable_gpu_data(); + prefetch_[i]->label_.mutable_gpu_data(); } } } @@ -88,6 +91,9 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() { #ifndef CPU_ONLY if (Caffe::mode() == Caffe::GPU) { batch->data_.data().get()->async_gpu_push(stream); + if (this->output_labels_) { + batch->label_.data().get()->async_gpu_push(stream); + } CUDA_CHECK(cudaStreamSynchronize(stream)); } #endif @@ -106,22 +112,18 @@ void BasePrefetchingDataLayer<Dtype>::InternalThreadEntry() { template <typename Dtype> void BasePrefetchingDataLayer<Dtype>::Forward_cpu( const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { - Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty"); + if (prefetch_current_) { + prefetch_free_.push(prefetch_current_); + } + prefetch_current_ = prefetch_full_.pop("Waiting for data"); // Reshape to loaded data. - top[0]->ReshapeLike(batch->data_); - // Copy the data - caffe_copy(batch->data_.count(), batch->data_.cpu_data(), - top[0]->mutable_cpu_data()); - DLOG(INFO) << "Prefetch copied"; + top[0]->ReshapeLike(prefetch_current_->data_); + top[0]->set_cpu_data(prefetch_current_->data_.mutable_cpu_data()); if (this->output_labels_) { // Reshape to loaded labels. - top[1]->ReshapeLike(batch->label_); - // Copy the labels. 
- caffe_copy(batch->label_.count(), batch->label_.cpu_data(), - top[1]->mutable_cpu_data()); + top[1]->ReshapeLike(prefetch_current_->label_); + top[1]->set_cpu_data(prefetch_current_->label_.mutable_cpu_data()); } - - prefetch_free_.push(batch); } #ifdef CPU_ONLY diff --git a/src/caffe/layers/base_data_layer.cu b/src/caffe/layers/base_data_layer.cu index 4056d36a..64c621a7 100644 --- a/src/caffe/layers/base_data_layer.cu +++ b/src/caffe/layers/base_data_layer.cu @@ -7,23 +7,18 @@ namespace caffe { template <typename Dtype> void BasePrefetchingDataLayer<Dtype>::Forward_gpu( const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { - Batch<Dtype>* batch = prefetch_full_.pop("Data layer prefetch queue empty"); + if (prefetch_current_) { + prefetch_free_.push(prefetch_current_); + } + prefetch_current_ = prefetch_full_.pop("Waiting for data"); // Reshape to loaded data. - top[0]->ReshapeLike(batch->data_); - // Copy the data - caffe_copy(batch->data_.count(), batch->data_.gpu_data(), - top[0]->mutable_gpu_data()); + top[0]->ReshapeLike(prefetch_current_->data_); + top[0]->set_gpu_data(prefetch_current_->data_.mutable_gpu_data()); if (this->output_labels_) { // Reshape to loaded labels. - top[1]->ReshapeLike(batch->label_); - // Copy the labels. - caffe_copy(batch->label_.count(), batch->label_.gpu_data(), - top[1]->mutable_gpu_data()); + top[1]->ReshapeLike(prefetch_current_->label_); + top[1]->set_gpu_data(prefetch_current_->label_.mutable_gpu_data()); } - // Ensure the copy is synchronous wrt the host, so that the next batch isn't - // copied in meanwhile. - CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); - prefetch_free_.push(batch); } INSTANTIATE_LAYER_GPU_FORWARD(BasePrefetchingDataLayer); diff --git a/src/caffe/layers/data_layer.cpp b/src/caffe/layers/data_layer.cpp index 66e6301f..0f1296bb 100644 --- a/src/caffe/layers/data_layer.cpp +++ b/src/caffe/layers/data_layer.cpp @@ -14,7 +14,10 @@ namespace caffe { template <typename Dtype> DataLayer<Dtype>::DataLayer(const LayerParameter& param) : BasePrefetchingDataLayer<Dtype>(param), - reader_(param) { + offset_() { + db_.reset(db::GetDB(param.data_param().backend())); + db_->Open(param.data_param().source(), db::READ); + cursor_.reset(db_->NewCursor()); } template <typename Dtype> @@ -27,7 +30,8 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { const int batch_size = this->layer_param_.data_param().batch_size(); // Read a data point, and use it to initialize the top blob. - Datum& datum = *(reader_.full().peek()); + Datum datum; + datum.ParseFromString(cursor_->value()); // Use data_transformer to infer the expected blob shape from datum. vector<int> top_shape = this->data_transformer_->InferBlobShape(datum); @@ -35,22 +39,44 @@ void DataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, // Reshape top[0] and prefetch_data according to the batch_size. 
top_shape[0] = batch_size; top[0]->Reshape(top_shape); - for (int i = 0; i < this->PREFETCH_COUNT; ++i) { - this->prefetch_[i].data_.Reshape(top_shape); + for (int i = 0; i < this->prefetch_.size(); ++i) { + this->prefetch_[i]->data_.Reshape(top_shape); } - LOG(INFO) << "output data size: " << top[0]->num() << "," + LOG_IF(INFO, Caffe::root_solver()) + << "output data size: " << top[0]->num() << "," << top[0]->channels() << "," << top[0]->height() << "," << top[0]->width(); // label if (this->output_labels_) { vector<int> label_shape(1, batch_size); top[1]->Reshape(label_shape); - for (int i = 0; i < this->PREFETCH_COUNT; ++i) { - this->prefetch_[i].label_.Reshape(label_shape); + for (int i = 0; i < this->prefetch_.size(); ++i) { + this->prefetch_[i]->label_.Reshape(label_shape); } } } +template <typename Dtype> +bool DataLayer<Dtype>::Skip() { + int size = Caffe::solver_count(); + int rank = Caffe::solver_rank(); + bool keep = (offset_ % size) == rank || + // In test mode, only rank 0 runs, so avoid skipping + this->layer_param_.phase() == TEST; + return !keep; +} + +template<typename Dtype> +void DataLayer<Dtype>::Next() { + cursor_->Next(); + if (!cursor_->valid()) { + LOG_IF(INFO, Caffe::root_solver()) + << "Restarting data prefetching from start."; + cursor_->SeekToFirst(); + } + offset_++; +} + // This function is called on prefetch thread template<typename Dtype> void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) { @@ -61,41 +87,41 @@ void DataLayer<Dtype>::load_batch(Batch<Dtype>* batch) { CPUTimer timer; CHECK(batch->data_.count()); CHECK(this->transformed_data_.count()); - - // Reshape according to the first datum of each batch - // on single input batches allows for inputs of varying dimension. const int batch_size = this->layer_param_.data_param().batch_size(); - Datum& datum = *(reader_.full().peek()); - // Use data_transformer to infer the expected blob shape from datum. - vector<int> top_shape = this->data_transformer_->InferBlobShape(datum); - this->transformed_data_.Reshape(top_shape); - // Reshape batch according to the batch_size. - top_shape[0] = batch_size; - batch->data_.Reshape(top_shape); - - Dtype* top_data = batch->data_.mutable_cpu_data(); - Dtype* top_label = NULL; // suppress warnings about uninitialized variables - if (this->output_labels_) { - top_label = batch->label_.mutable_cpu_data(); - } + Datum datum; for (int item_id = 0; item_id < batch_size; ++item_id) { timer.Start(); - // get a datum - Datum& datum = *(reader_.full().pop("Waiting for data")); + while (Skip()) { + Next(); + } + datum.ParseFromString(cursor_->value()); read_time += timer.MicroSeconds(); - timer.Start(); + + if (item_id == 0) { + // Reshape according to the first datum of each batch + // on single input batches allows for inputs of varying dimension. + // Use data_transformer to infer the expected blob shape from datum. + vector<int> top_shape = this->data_transformer_->InferBlobShape(datum); + this->transformed_data_.Reshape(top_shape); + // Reshape batch according to the batch_size. + top_shape[0] = batch_size; + batch->data_.Reshape(top_shape); + } + // Apply data transformations (mirror, scale, crop...) + timer.Start(); int offset = batch->data_.offset(item_id); + Dtype* top_data = batch->data_.mutable_cpu_data(); this->transformed_data_.set_cpu_data(top_data + offset); this->data_transformer_->Transform(datum, &(this->transformed_data_)); // Copy label. 
if (this->output_labels_) { + Dtype* top_label = batch->label_.mutable_cpu_data(); top_label[item_id] = datum.label(); } trans_time += timer.MicroSeconds(); - - reader_.free().push(const_cast<Datum*>(&datum)); + Next(); } timer.Stop(); batch_timer.Stop(); diff --git a/src/caffe/layers/hdf5_data_layer.cpp b/src/caffe/layers/hdf5_data_layer.cpp index c957451a..b9a071ce 100644 --- a/src/caffe/layers/hdf5_data_layer.cpp +++ b/src/caffe/layers/hdf5_data_layer.cpp @@ -125,27 +125,45 @@ void HDF5DataLayer<Dtype>::LayerSetUp(const vector<Blob<Dtype>*>& bottom, } template <typename Dtype> +bool HDF5DataLayer<Dtype>::Skip() { + int size = Caffe::solver_count(); + int rank = Caffe::solver_rank(); + bool keep = (offset_ % size) == rank || + // In test mode, only rank 0 runs, so avoid skipping + this->layer_param_.phase() == TEST; + return !keep; +} + +template<typename Dtype> +void HDF5DataLayer<Dtype>::Next() { + if (++current_row_ == hdf_blobs_[0]->shape(0)) { + if (num_files_ > 1) { + ++current_file_; + if (current_file_ == num_files_) { + current_file_ = 0; + if (this->layer_param_.hdf5_data_param().shuffle()) { + std::random_shuffle(file_permutation_.begin(), + file_permutation_.end()); + } + DLOG(INFO) << "Looping around to first file."; + } + LoadHDF5FileData( + hdf_filenames_[file_permutation_[current_file_]].c_str()); + } + current_row_ = 0; + if (this->layer_param_.hdf5_data_param().shuffle()) + std::random_shuffle(data_permutation_.begin(), data_permutation_.end()); + } + offset_++; +} + +template <typename Dtype> void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { const int batch_size = this->layer_param_.hdf5_data_param().batch_size(); - for (int i = 0; i < batch_size; ++i, ++current_row_) { - if (current_row_ == hdf_blobs_[0]->shape(0)) { - if (num_files_ > 1) { - ++current_file_; - if (current_file_ == num_files_) { - current_file_ = 0; - if (this->layer_param_.hdf5_data_param().shuffle()) { - std::random_shuffle(file_permutation_.begin(), - file_permutation_.end()); - } - DLOG(INFO) << "Looping around to first file."; - } - LoadHDF5FileData( - hdf_filenames_[file_permutation_[current_file_]].c_str()); - } - current_row_ = 0; - if (this->layer_param_.hdf5_data_param().shuffle()) - std::random_shuffle(data_permutation_.begin(), data_permutation_.end()); + for (int i = 0; i < batch_size; ++i) { + while (Skip()) { + Next(); } for (int j = 0; j < this->layer_param_.top_size(); ++j) { int data_dim = top[j]->count() / top[j]->shape(0); @@ -153,6 +171,7 @@ void HDF5DataLayer<Dtype>::Forward_cpu(const vector<Blob<Dtype>*>& bottom, &hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_] * data_dim], &top[j]->mutable_cpu_data()[i * data_dim]); } + Next(); } } diff --git a/src/caffe/layers/hdf5_data_layer.cu b/src/caffe/layers/hdf5_data_layer.cu index 595d2230..33eebd41 100644 --- a/src/caffe/layers/hdf5_data_layer.cu +++ b/src/caffe/layers/hdf5_data_layer.cu @@ -17,24 +17,9 @@ template <typename Dtype> void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom, const vector<Blob<Dtype>*>& top) { const int batch_size = this->layer_param_.hdf5_data_param().batch_size(); - for (int i = 0; i < batch_size; ++i, ++current_row_) { - if (current_row_ == hdf_blobs_[0]->shape(0)) { - if (num_files_ > 1) { - current_file_ += 1; - if (current_file_ == num_files_) { - current_file_ = 0; - if (this->layer_param_.hdf5_data_param().shuffle()) { - std::random_shuffle(file_permutation_.begin(), - file_permutation_.end()); - } - DLOG(INFO) 
<< "Looping around to first file."; - } - LoadHDF5FileData( - hdf_filenames_[file_permutation_[current_file_]].c_str()); - } - current_row_ = 0; - if (this->layer_param_.hdf5_data_param().shuffle()) - std::random_shuffle(data_permutation_.begin(), data_permutation_.end()); + for (int i = 0; i < batch_size; ++i) { + while (Skip()) { + Next(); } for (int j = 0; j < this->layer_param_.top_size(); ++j) { int data_dim = top[j]->count() / top[j]->shape(0); @@ -42,6 +27,7 @@ void HDF5DataLayer<Dtype>::Forward_gpu(const vector<Blob<Dtype>*>& bottom, &hdf_blobs_[j]->cpu_data()[data_permutation_[current_row_] * data_dim], &top[j]->mutable_gpu_data()[i * data_dim]); } + Next(); } } diff --git a/src/caffe/layers/image_data_layer.cpp b/src/caffe/layers/image_data_layer.cpp index 7ee7dc40..ec0fc5b0 100644 --- a/src/caffe/layers/image_data_layer.cpp +++ b/src/caffe/layers/image_data_layer.cpp @@ -54,6 +54,11 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, const unsigned int prefetch_rng_seed = caffe_rng_rand(); prefetch_rng_.reset(new Caffe::RNG(prefetch_rng_seed)); ShuffleImages(); + } else { + if (this->phase_ == TRAIN && Caffe::solver_rank() > 0 && + this->layer_param_.image_data_param().rand_skip() == 0) { + LOG(WARNING) << "Shuffling or skipping recommended for multi-GPU"; + } } LOG(INFO) << "A total of " << lines_.size() << " images."; @@ -77,8 +82,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, const int batch_size = this->layer_param_.image_data_param().batch_size(); CHECK_GT(batch_size, 0) << "Positive batch size required"; top_shape[0] = batch_size; - for (int i = 0; i < this->PREFETCH_COUNT; ++i) { - this->prefetch_[i].data_.Reshape(top_shape); + for (int i = 0; i < this->prefetch_.size(); ++i) { + this->prefetch_[i]->data_.Reshape(top_shape); } top[0]->Reshape(top_shape); @@ -88,8 +93,8 @@ void ImageDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, // label vector<int> label_shape(1, batch_size); top[1]->Reshape(label_shape); - for (int i = 0; i < this->PREFETCH_COUNT; ++i) { - this->prefetch_[i].label_.Reshape(label_shape); + for (int i = 0; i < this->prefetch_.size(); ++i) { + this->prefetch_[i]->label_.Reshape(label_shape); } } diff --git a/src/caffe/layers/window_data_layer.cpp b/src/caffe/layers/window_data_layer.cpp index 103dd4b6..1bf3760e 100644 --- a/src/caffe/layers/window_data_layer.cpp +++ b/src/caffe/layers/window_data_layer.cpp @@ -173,8 +173,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, CHECK_GT(crop_size, 0); const int batch_size = this->layer_param_.window_data_param().batch_size(); top[0]->Reshape(batch_size, channels, crop_size, crop_size); - for (int i = 0; i < this->PREFETCH_COUNT; ++i) - this->prefetch_[i].data_.Reshape( + for (int i = 0; i < this->prefetch_.size(); ++i) + this->prefetch_[i]->data_.Reshape( batch_size, channels, crop_size, crop_size); LOG(INFO) << "output data size: " << top[0]->num() << "," @@ -183,8 +183,8 @@ void WindowDataLayer<Dtype>::DataLayerSetUp(const vector<Blob<Dtype>*>& bottom, // label vector<int> label_shape(1, batch_size); top[1]->Reshape(label_shape); - for (int i = 0; i < this->PREFETCH_COUNT; ++i) { - this->prefetch_[i].label_.Reshape(label_shape); + for (int i = 0; i < this->prefetch_.size(); ++i) { + this->prefetch_[i]->label_.Reshape(label_shape); } // data mean diff --git a/src/caffe/net.cpp b/src/caffe/net.cpp index 644cb7e9..aa9e8f2f 100644 --- a/src/caffe/net.cpp +++ b/src/caffe/net.cpp @@ -22,16 +22,13 
@@ namespace caffe

 template <typename Dtype>
-Net<Dtype>::Net(const NetParameter& param, const Net* root_net)
-    : root_net_(root_net) {
+Net<Dtype>::Net(const NetParameter& param) {
   Init(param);
 }

 template <typename Dtype>
 Net<Dtype>::Net(const string& param_file, Phase phase,
-    const int level, const vector<string>* stages,
-    const Net* root_net)
-    : root_net_(root_net) {
+    const int level, const vector<string>* stages) {
   NetParameter param;
   ReadNetParamsFromTextFileOrDie(param_file, &param);
   // Set phase, stages and level
@@ -47,8 +44,6 @@ Net<Dtype>::Net(const string& param_file, Phase phase,

 template <typename Dtype>
 void Net<Dtype>::Init(const NetParameter& in_param) {
-  CHECK(Caffe::root_solver() || root_net_)
-      << "root_net_ needs to be set for all non-root solvers";
   // Set phase from the state.
   phase_ = in_param.state().phase();
   // Filter layers based on their include/exclude rules and
@@ -74,9 +69,6 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
   top_id_vecs_.resize(param.layer_size());
   bottom_need_backward_.resize(param.layer_size());
   for (int layer_id = 0; layer_id < param.layer_size(); ++layer_id) {
-    // For non-root solvers, whether this layer is shared from root_net_.
-    bool share_from_root = !Caffe::root_solver()
-        && root_net_->layers_[layer_id]->ShareInParallel();
     // Inherit phase from net if unset.
     if (!param.layer(layer_id).has_phase()) {
       param.mutable_layer(layer_id)->set_phase(phase_);
@@ -89,13 +81,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
         << "propagate_down param must be specified "
         << "either 0 or bottom_size times ";
     }
-    if (share_from_root) {
-      LOG(INFO) << "Sharing layer " << layer_param.name() << " from root net";
-      layers_.push_back(root_net_->layers_[layer_id]);
-      layers_[layer_id]->SetShared(true);
-    } else {
-      layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
-    }
+    layers_.push_back(LayerRegistry<Dtype>::CreateLayer(layer_param));
     layer_names_.push_back(layer_param.name());
     LOG_IF(INFO, Caffe::root_solver())
         << "Creating Layer " << layer_param.name();
@@ -134,19 +120,7 @@ void Net<Dtype>::Init(const NetParameter& in_param) {
       }
     }
     // After this layer is connected, set it up.
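// [Editorial sketch, not part of the patch] The removal just below drops the
// old root-net layer sharing: with NCCL, every solver rank builds a complete,
// private Net, and weights only become identical through an explicit NCCL
// broadcast from rank 0 (see parallel.cpp further down). A minimal model of
// the per-rank setup, assuming the usual Caffe headers and a hypothetical
// train_val.prototxt:
//
//   caffe::Caffe::set_solver_count(num_gpus);   // total ranks
//   caffe::Caffe::set_solver_rank(rank);        // this thread's rank
//   caffe::Net<float> net("train_val.prototxt", caffe::TRAIN);
//   // ...parameters are then overwritten by NCCL<float>::Broadcast().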
- if (share_from_root) { - // Set up size of top blobs using root_net_ - const vector<Blob<Dtype>*>& base_top = root_net_->top_vecs_[layer_id]; - const vector<Blob<Dtype>*>& this_top = this->top_vecs_[layer_id]; - for (int top_id = 0; top_id < base_top.size(); ++top_id) { - this_top[top_id]->ReshapeLike(*base_top[top_id]); - LOG(INFO) << "Created top blob " << top_id << " (shape: " - << this_top[top_id]->shape_string() << ") for shared layer " - << layer_param.name(); - } - } else { - layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]); - } + layers_[layer_id]->SetUp(bottom_vecs_[layer_id], top_vecs_[layer_id]); LOG_IF(INFO, Caffe::root_solver()) << "Setting up " << layer_names_[layer_id]; for (int top_id = 0; top_id < top_vecs_[layer_id].size(); ++top_id) { @@ -546,10 +520,15 @@ Dtype Net<Dtype>::ForwardFromTo(int start, int end) { CHECK_LT(end, layers_.size()); Dtype loss = 0; for (int i = start; i <= end; ++i) { - // LOG(ERROR) << "Forwarding " << layer_names_[i]; + for (int c = 0; c < before_forward_.size(); ++c) { + before_forward_[c]->run(i); + } Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]); loss += layer_loss; if (debug_info_) { ForwardDebugInfo(i); } + for (int c = 0; c < after_forward_.size(); ++c) { + after_forward_[c]->run(i); + } } return loss; } @@ -591,11 +570,17 @@ void Net<Dtype>::BackwardFromTo(int start, int end) { CHECK_GE(end, 0); CHECK_LT(start, layers_.size()); for (int i = start; i >= end; --i) { + for (int c = 0; c < before_backward_.size(); ++c) { + before_backward_[c]->run(i); + } if (layer_need_backward_[i]) { layers_[i]->Backward( top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]); if (debug_info_) { BackwardDebugInfo(i); } } + for (int c = 0; c < after_backward_.size(); ++c) { + after_backward_[c]->run(i); + } } } diff --git a/src/caffe/parallel.cpp b/src/caffe/parallel.cpp index 5bc41c6a..d9433917 100644 --- a/src/caffe/parallel.cpp +++ b/src/caffe/parallel.cpp @@ -1,16 +1,15 @@ -#ifndef CPU_ONLY +#ifdef USE_NCCL + #include <cuda_runtime.h> -#endif #include <glog/logging.h> #include <stdio.h> - #include <sstream> #include <string> #include <vector> -#include "boost/thread.hpp" #include "caffe/caffe.hpp" #include "caffe/parallel.hpp" +#include "caffe/sgd_solvers.hpp" namespace caffe { @@ -68,15 +67,14 @@ static size_t total_size(const vector<Blob<Dtype>*>& params) { template<typename Dtype> Params<Dtype>::Params(shared_ptr<Solver<Dtype> > root_solver) - : size_(total_size<Dtype>(root_solver->net()->learnable_params())), - data_(), - diff_() { + : size_(total_size<Dtype>(root_solver->net()->learnable_params())), + data_(), + diff_() { } template<typename Dtype> GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device) - : Params<Dtype>(root_solver) { -#ifndef CPU_ONLY + : Params<Dtype>(root_solver) { int initial_device; CUDA_CHECK(cudaGetDevice(&initial_device)); @@ -86,358 +84,288 @@ GPUParams<Dtype>::GPUParams(shared_ptr<Solver<Dtype> > root_solver, int device) // Copy blob values const vector<Blob<Dtype>*>& net = - root_solver->net()->learnable_params(); + root_solver->net()->learnable_params(); apply_buffers(net, data_, size_, copy); CUDA_CHECK(cudaMalloc(&diff_, size_ * sizeof(Dtype))); caffe_gpu_set(size_, Dtype(0), diff_); CUDA_CHECK(cudaSetDevice(initial_device)); -#else - NO_GPU; -#endif } template<typename Dtype> GPUParams<Dtype>::~GPUParams() { -#ifndef CPU_ONLY CUDA_CHECK(cudaFree(data_)); CUDA_CHECK(cudaFree(diff_)); -#endif } template<typename Dtype> -void 
GPUParams<Dtype>::configure(Solver<Dtype>* solver) const { +void GPUParams<Dtype>::Configure(Solver<Dtype>* solver) const { const vector<Blob<Dtype>*>& net = - solver->net()->learnable_params(); + solver->net()->learnable_params(); apply_buffers(net, data_, size_, replace_gpu); apply_buffers(net, diff_, size_, replace_gpu_diff); } -void DevicePair::compute(const vector<int> devices, vector<DevicePair>* pairs) { -#ifndef CPU_ONLY - vector<int> remaining(devices); - - // Depth for reduction tree - int remaining_depth = static_cast<int>(ceil(log2(remaining.size()))); - - // Group GPUs by board - for (int d = 0; d < remaining_depth; ++d) { - for (int i = 0; i < remaining.size(); ++i) { - for (int j = i + 1; j < remaining.size(); ++j) { - cudaDeviceProp a, b; - CUDA_CHECK(cudaGetDeviceProperties(&a, remaining[i])); - CUDA_CHECK(cudaGetDeviceProperties(&b, remaining[j])); - if (a.isMultiGpuBoard && b.isMultiGpuBoard) { - if (a.multiGpuBoardGroupID == b.multiGpuBoardGroupID) { - pairs->push_back(DevicePair(remaining[i], remaining[j])); - DLOG(INFO) << "GPU board: " << remaining[i] << ":" << remaining[j]; - remaining.erase(remaining.begin() + j); - break; - } - } - } - } - } - ostringstream s; - for (int i = 0; i < remaining.size(); ++i) { - s << (i ? ", " : "") << remaining[i]; - } - DLOG(INFO) << "GPUs paired by boards, remaining: " << s.str(); - - // Group by P2P accessibility - remaining_depth = ceil(log2(remaining.size())); - for (int d = 0; d < remaining_depth; ++d) { - for (int i = 0; i < remaining.size(); ++i) { - for (int j = i + 1; j < remaining.size(); ++j) { - int access; - CUDA_CHECK( - cudaDeviceCanAccessPeer(&access, remaining[i], remaining[j])); - if (access) { - pairs->push_back(DevicePair(remaining[i], remaining[j])); - DLOG(INFO) << "P2P pair: " << remaining[i] << ":" << remaining[j]; - remaining.erase(remaining.begin() + j); - break; - } - } - } - } - s.str(""); - for (int i = 0; i < remaining.size(); ++i) { - s << (i ? 
", " : "") << remaining[i]; - } - DLOG(INFO) << "GPUs paired by P2P access, remaining: " << s.str(); - - // Group remaining - remaining_depth = ceil(log2(remaining.size())); - for (int d = 0; d < remaining_depth; ++d) { - for (int i = 0; i < remaining.size(); ++i) { - pairs->push_back(DevicePair(remaining[i], remaining[i + 1])); - DLOG(INFO) << "Remaining pair: " << remaining[i] << ":" - << remaining[i + 1]; - remaining.erase(remaining.begin() + i + 1); - } - } +static int getDevice() { + int device = 0; + CUDA_CHECK(cudaGetDevice(&device)); + return device; +} - // Should only be the parent node remaining - CHECK_EQ(remaining.size(), 1); +template<typename Dtype> +NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver) + : GPUParams<Dtype>(solver, getDevice()), + comm_(), solver_(solver), barrier_() { + this->Configure(solver.get()); + Init(); +} - pairs->insert(pairs->begin(), DevicePair(-1, remaining[0])); +template<typename Dtype> +NCCL<Dtype>::NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid) + : GPUParams<Dtype>(solver, getDevice()), + solver_(solver), barrier_() { + this->Configure(solver.get()); + Caffe::set_multiprocess(true); + ncclUniqueId nccl_uid; + memcpy(&nccl_uid, &uid[0], NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn) + NCCL_CHECK(ncclCommInitRank(&comm_, + Caffe::solver_count(), + nccl_uid, + Caffe::solver_rank())); + Init(); +} - CHECK(pairs->size() == devices.size()); - for (int i = 0; i < pairs->size(); ++i) { - CHECK((*pairs)[i].parent() != (*pairs)[i].device()); - for (int j = i + 1; j < pairs->size(); ++j) { - CHECK((*pairs)[i].device() != (*pairs)[j].device()); - } +template<typename Dtype> +void NCCL<Dtype>::Init() { + if (solver_->param().layer_wise_reduce()) { + CUDA_CHECK(cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking)); } -#else - NO_GPU; -#endif } -// - template<typename Dtype> -P2PSync<Dtype>::P2PSync(shared_ptr<Solver<Dtype> > root_solver, - P2PSync<Dtype>* parent, const SolverParameter& param) - : GPUParams<Dtype>(root_solver, param.device_id()), - parent_(parent), - children_(), - queue_(), - initial_iter_(root_solver->iter()), - solver_() { -#ifndef CPU_ONLY - int initial_device; - CUDA_CHECK(cudaGetDevice(&initial_device)); - const int self = param.device_id(); - CUDA_CHECK(cudaSetDevice(self)); - - if (parent == NULL) { - solver_ = root_solver; - } else { - Caffe::set_root_solver(false); - solver_.reset(new WorkerSolver<Dtype>(param, root_solver.get())); - Caffe::set_root_solver(true); +NCCL<Dtype>::~NCCL() { + if (solver_->param().layer_wise_reduce()) { + CUDA_CHECK(cudaStreamDestroy(stream_)); } - this->configure(solver_.get()); - solver_->add_callback(this); - - if (parent) { - // Enable p2p access between devices - const int peer = parent->solver_->param().device_id(); - int access; - CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer)); - if (access) { - CUDA_CHECK(cudaDeviceEnablePeerAccess(peer, 0)); - } else { - LOG(INFO)<< "GPU " << self << " does not have p2p access to GPU " << peer; - } - // Allocate receiving buffer on parent - CUDA_CHECK(cudaSetDevice(peer)); - CUDA_CHECK(cudaMalloc(&parent_grads_, size_ * sizeof(Dtype))); - CUDA_CHECK(cudaSetDevice(self)); + if (comm_) { + ncclCommDestroy(comm_); } - - CUDA_CHECK(cudaSetDevice(initial_device)); -#else - NO_GPU; -#endif } template<typename Dtype> -P2PSync<Dtype>::~P2PSync() { -#ifndef CPU_ONLY - int initial_device; - CUDA_CHECK(cudaGetDevice(&initial_device)); - const int self = solver_->param().device_id(); - CUDA_CHECK(cudaSetDevice(self)); - - if (parent_) { - 
CUDA_CHECK(cudaFree(parent_grads_)); - const int peer = parent_->solver_->param().device_id(); - int access; - CUDA_CHECK(cudaDeviceCanAccessPeer(&access, self, peer)); - if (access) { - CUDA_CHECK(cudaDeviceDisablePeerAccess(peer)); - } - } - - CUDA_CHECK(cudaSetDevice(initial_device)); -#endif +boost::barrier* NCCL<Dtype>::barrier() { + return barrier_; +} +template<typename Dtype> +void NCCL<Dtype>::set_barrier(boost::barrier* value) { + barrier_ = value; } template<typename Dtype> -void P2PSync<Dtype>::InternalThreadEntry() { - Caffe::SetDevice(solver_->param().device_id()); - CHECK(Caffe::root_solver()); - Caffe::set_root_solver(false); - // See if there is a defined seed and reset random state if so - if (solver_->param().random_seed() >= 0) { - // Fetch random seed and modulate by device ID to make sure - // everyone doesn't have the same seed. We seem to have some - // solver instability if we have everyone with the same seed - Caffe::set_random_seed( - solver_->param().random_seed() + solver_->param().device_id()); +void NCCL<Dtype>::InitSingleProcess(vector<NCCL<Dtype>*>* nccls) { + ncclComm_t* comms = new ncclComm_t[nccls->size()]; + int* gpu_list = new int[nccls->size()]; + for (int i = 0; i < nccls->size(); ++i) { + gpu_list[i] = (*nccls)[i]->solver_->param().device_id(); + } + NCCL_CHECK(ncclCommInitAll(comms, static_cast<int>(nccls->size()), gpu_list)); + for (int i = 0; i < nccls->size(); ++i) { + (*nccls)[i]->comm_ = comms[i]; } - solver_->Step(solver_->param().max_iter() - initial_iter_); } template<typename Dtype> -void P2PSync<Dtype>::on_start() { -#ifndef CPU_ONLY -#ifdef DEBUG - int device; - CUDA_CHECK(cudaGetDevice(&device)); - CHECK(device == solver_->param().device_id()); -#else -// CHECK(false); -#endif +string NCCL<Dtype>::new_uid() { + string uid; + uid.resize(NCCL_UNIQUE_ID_BYTES); + ncclUniqueId nccl_uid; + NCCL_CHECK(ncclGetUniqueId(&nccl_uid)); + memcpy(&uid[0], &nccl_uid, NCCL_UNIQUE_ID_BYTES); // NOLINT(caffe/alt_fn) + return uid; +} - // Wait for update from parent - if (parent_) { - P2PSync<Dtype> *parent = queue_.pop(); - CHECK(parent == parent_); +template<typename Dtype> +void NCCL<Dtype>::Broadcast() { + if (barrier_) { // NULL in multi process case + barrier_->wait(); } - - // Update children - for (int i = children_.size() - 1; i >= 0; i--) { - Dtype* src = data_; - Dtype* dst = children_[i]->data_; - -#ifdef DEBUG - cudaPointerAttributes attributes; - CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); - CHECK(attributes.device == device); - CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); - CHECK(attributes.device == children_[i]->solver_->param().device_id()); -#endif - - CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), - cudaMemcpyDeviceToDevice, cudaStreamDefault)); - CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); - children_[i]->queue_.push(this); + NCCL_CHECK(ncclBcast(data_, static_cast<int>(size_), + nccl::dataType<Dtype>::type, 0, + comm_, cudaStreamDefault)); + if (barrier_) { + barrier_->wait(); } -#endif } template<typename Dtype> -void P2PSync<Dtype>::on_gradients_ready() { -#ifndef CPU_ONLY +void NCCL<Dtype>::run(int layer) { + CHECK(solver_->param().layer_wise_reduce()); + vector<shared_ptr<Blob<Dtype> > >& blobs = + solver_->net()->layers()[layer]->blobs(); #ifdef DEBUG - int device; - CUDA_CHECK(cudaGetDevice(&device)); - CHECK(device == solver_->param().device_id()); + // Assert blobs are contiguous to reduce in one step (e.g. 
bias often small) + for (int i = 1; i < blobs.size(); ++i) { + CHECK_EQ(blobs[i - 1]->gpu_diff() + blobs[i - 1]->count(), + blobs[i + 0]->gpu_diff()); + } #endif + if (blobs.size() > 0) { + // Make sure default stream is done computing gradients. Could be + // replaced by cudaEventRecord+cudaStreamWaitEvent to avoid + // blocking the default stream, but it's actually slower. + CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); - // Sum children gradients as they appear in the queue - for (int i = 0; i < children_.size(); ++i) { - P2PSync<Dtype> *child = queue_.pop(); - Dtype* src = child->parent_grads_; - Dtype* dst = diff_; - -#ifdef DEBUG - bool ok = false; - for (int j = 0; j < children_.size(); ++j) { - if (child == children_[j]) { - ok = true; - } + // Reduce asynchronously + int size = 0; + for (int i = 0; i < blobs.size(); ++i) { + size += blobs[i]->count(); } - CHECK(ok); - cudaPointerAttributes attributes; - CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); - CHECK(attributes.device == device); - CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); - CHECK(attributes.device == device); -#endif - - caffe_gpu_add(size_, src, dst, dst); + if (barrier_) { // NULL in multi process case + barrier_->wait(); + } + NCCL_CHECK(ncclAllReduce(blobs[0]->mutable_gpu_diff(), + blobs[0]->mutable_gpu_diff(), + size, + nccl::dataType<Dtype>::type, + ncclSum, comm_, stream_)); + caffe_gpu_scal(size, (Dtype) 1.0 / Caffe::solver_count(), + blobs[0]->mutable_gpu_diff(), stream_); } +} - // Send gradients to parent - if (parent_) { - Dtype* src = diff_; - Dtype* dst = parent_grads_; - -#ifdef DEBUG - cudaPointerAttributes attributes; - CUDA_CHECK(cudaPointerGetAttributes(&attributes, src)); - CHECK(attributes.device == device); - CUDA_CHECK(cudaPointerGetAttributes(&attributes, dst)); - CHECK(attributes.device == parent_->solver_->param().device_id()); -#endif - - CUDA_CHECK(cudaMemcpyAsync(dst, src, size_ * sizeof(Dtype), // - cudaMemcpyDeviceToDevice, cudaStreamDefault)); - CUDA_CHECK(cudaStreamSynchronize(cudaStreamDefault)); - parent_->queue_.push(this); +template<typename Dtype> +void NCCL<Dtype>::on_gradients_ready() { + if (solver_->param().layer_wise_reduce()) { + CHECK_EQ(solver_->net()->params().size(), + solver_->net()->learnable_params().size()) + << "Layer-wise reduce is not supported for nets with shared weights."; + + // Make sure reduction is done before applying gradients + CUDA_CHECK(cudaStreamSynchronize(stream_)); } else { - // Loss functions divide gradients by the batch size, so to compensate - // for split batch, the root solver divides by number of solvers. - caffe_gpu_scal(size_, Dtype(1.0 / Caffe::solver_count()), diff_); + if (barrier_) { // NULL in multi process case + barrier_->wait(); + } + NCCL_CHECK(ncclAllReduce(diff_, diff_, static_cast<int>(size_), + nccl::dataType<Dtype>::type, ncclSum, comm_, + cudaStreamDefault)); + caffe_gpu_scal(static_cast<int>(size_), + (Dtype) 1.0 / Caffe::solver_count(), diff_); } -#endif } template<typename Dtype> -void P2PSync<Dtype>::Prepare(const vector<int>& gpus, - vector<shared_ptr<P2PSync<Dtype> > >* syncs) { - // Pair devices for map-reduce synchronization - vector<DevicePair> pairs; - DevicePair::compute(gpus, &pairs); - ostringstream s; - for (int i = 1; i < pairs.size(); ++i) { - s << (i == 1 ? 
"" : ", ") << pairs[i].parent() << ":" << pairs[i].device(); +class Worker : public InternalThread { + public: + explicit Worker(shared_ptr<Solver<Dtype> > rank0, int device, + boost::barrier* barrier, vector<NCCL<Dtype>*>* nccls, + const char* restore) + : rank0_(rank0), device_(device), barrier_(barrier), + nccls_(nccls), restore_(restore) { } - LOG(INFO)<< "GPUs pairs " << s.str(); - - SolverParameter param(solver_->param()); - - // Build the GPU tree by finding the parent for each solver - for (int attempts = 0; attempts < pairs.size(); ++attempts) { - for (int i = 1; i < pairs.size(); ++i) { - if (!syncs->at(i).get()) { - P2PSync<Dtype>* parent = NULL; - for (int j = 0; j < syncs->size(); ++j) { - P2PSync<Dtype>* sync = j == 0 ? this : syncs->at(j).get(); - if (sync) { - const SolverParameter& p = sync->solver()->param(); - if (p.device_id() == pairs[i].parent()) { - parent = sync; - } - } - } - if (parent) { - param.set_device_id(pairs[i].device()); - syncs->at(i).reset(new P2PSync<Dtype>(solver_, parent, param)); - parent->children_.push_back((P2PSync<Dtype>*) syncs->at(i).get()); - } + virtual ~Worker() {} + + protected: + void InternalThreadEntry() { + // Create solver and install callbacks + SolverParameter param(rank0_->param()); + param.set_device_id(device_); +#ifdef DEBUG + int device; + CUDA_CHECK(cudaGetDevice(&device)); + CHECK_EQ(device, device_); +#endif + param.set_type(rank0_->type()); + shared_ptr<Solver<Dtype> > s(SolverRegistry<Dtype>::CreateSolver(param)); + CHECK_EQ(s->type(), rank0_->type()); + if (restore_) { + // Could not make NCCL broadcast solver state, it seems to crash + // if called in a tight loop, regardless of barriers etc. so + // restore all solvers from file. + s->Restore(restore_); + } + NCCL<Dtype> nccl(s); + nccl.set_barrier(barrier_); + s->add_callback(&nccl); + if (s->param().layer_wise_reduce()) { + s->net()->add_after_backward(&nccl); + } + (*nccls_)[Caffe::solver_rank()] = &nccl; + // Wait for other threads + barrier_->wait(); + // Wait for NCCL init + barrier_->wait(); + // Broadcast rank 0 state + nccl.Broadcast(); + // Solve + s->Step(param.max_iter() - s->iter()); + barrier_->wait(); +#ifdef DEBUG + // Check all solvers have same state + SGDSolver<Dtype>* sa = static_cast<SGDSolver<Dtype>*>(rank0_.get()); + SGDSolver<Dtype>* sb = static_cast<SGDSolver<Dtype>*>(s.get()); + for (int h = 0; h < sa->history().size(); ++h) { + CUDA_CHECK(cudaSetDevice(sa->param().device_id())); + const Dtype* a = sa->history()[h]->cpu_data(); + CUDA_CHECK(cudaSetDevice(sb->param().device_id())); + const Dtype* b = sb->history()[h]->cpu_data(); + for (int v = 0; v < sa->history()[h]->count(); ++v) { + CHECK_DOUBLE_EQ(a[v], b[v]); } } +#endif } -} - -template<typename Dtype> -void P2PSync<Dtype>::Run(const vector<int>& gpus) { - vector<shared_ptr<P2PSync<Dtype> > > syncs(gpus.size()); - Prepare(gpus, &syncs); - LOG(INFO)<< "Starting Optimization"; + shared_ptr<Solver<Dtype> > rank0_; + int device_; + boost::barrier* barrier_; + vector<NCCL<Dtype>*>* nccls_; + const char* restore_; +}; - for (int i = 1; i < syncs.size(); ++i) { - syncs[i]->StartInternalThread(); +template<typename Dtype> +void NCCL<Dtype>::Run(const vector<int>& gpus, const char* restore) { + boost::barrier barrier(static_cast<int>(gpus.size())); + vector<NCCL<Dtype>*> nccls(gpus.size()); + // Create workers + vector<shared_ptr<Worker<Dtype> > > workers(gpus.size()); + for (int i = 1; i < gpus.size(); ++i) { + CUDA_CHECK(cudaSetDevice(gpus[i])); + Caffe::set_solver_rank(i); + Worker<Dtype>* w = 
new Worker<Dtype>(solver_, gpus[i], &barrier, + &nccls, restore); + w->StartInternalThread(); + workers[i].reset(w); } - - // Run root solver on current thread + CUDA_CHECK(cudaSetDevice(gpus[0])); + Caffe::set_solver_rank(0); + barrier_ = &barrier; + solver_->add_callback(this); + if (solver_->param().layer_wise_reduce()) { + solver_->net()->add_after_backward(this); + } + nccls[0] = this; + // Wait for workers + barrier.wait(); + // Init NCCL + InitSingleProcess(&nccls); + barrier.wait(); + // Run first solver on current thread + Broadcast(); solver_->Solve(); - - for (int i = 1; i < syncs.size(); ++i) { - syncs[i]->StopInternalThread(); + barrier.wait(); // Hangs without it when running tests + // Wait for shutdown + for (int i = 1; i < gpus.size(); ++i) { + workers[i]->StopInternalThread(); } } INSTANTIATE_CLASS(Params); INSTANTIATE_CLASS(GPUParams); -INSTANTIATE_CLASS(P2PSync); +INSTANTIATE_CLASS(Worker); +INSTANTIATE_CLASS(NCCL); } // namespace caffe + +#endif // USE_NCCL diff --git a/src/caffe/proto/caffe.proto b/src/caffe/proto/caffe.proto index 430a0dea..1c85f696 100644 --- a/src/caffe/proto/caffe.proto +++ b/src/caffe/proto/caffe.proto @@ -98,7 +98,7 @@ message NetParameter { // NOTE // Update the next available ID when you add a new SolverParameter field. // -// SolverParameter next available ID: 41 (last added: type) +// SolverParameter next available ID: 42 (last added: layer_wise_reduce) message SolverParameter { ////////////////////////////////////////////////////////////////////////////// // Specifying the train and test networks @@ -239,6 +239,9 @@ message SolverParameter { } // DEPRECATED: use type instead of solver_type optional SolverType solver_type = 30 [default = SGD]; + + // Overlap compute and communication for data parallel training + optional bool layer_wise_reduce = 41 [default = true]; } // A message that stores the solver snapshots @@ -655,8 +658,8 @@ message DataParameter { optional bool mirror = 6 [default = false]; // Force the encoded image to have 3 color channels optional bool force_encoded_color = 9 [default = false]; - // Prefetch queue (Number of batches to prefetch to host memory, increase if - // data access bandwidth varies). 
+  // Prefetch queue (Increase if data feeding bandwidth varies, within the
+  // limit of device memory for GPU training)
   optional uint32 prefetch = 10 [default = 4];
 }
diff --git a/src/caffe/solver.cpp b/src/caffe/solver.cpp
index ece3913e..1c1a9e59 100644
--- a/src/caffe/solver.cpp
+++ b/src/caffe/solver.cpp
@@ -26,16 +26,14 @@ SolverAction::Enum Solver<Dtype>::GetRequestedAction() {
 }

 template <typename Dtype>
-Solver<Dtype>::Solver(const SolverParameter& param, const Solver* root_solver)
-    : net_(), callbacks_(), root_solver_(root_solver),
-      requested_early_exit_(false) {
+Solver<Dtype>::Solver(const SolverParameter& param)
+    : net_(), callbacks_(), requested_early_exit_(false) {
   Init(param);
 }

 template <typename Dtype>
-Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)
-    : net_(), callbacks_(), root_solver_(root_solver),
-      requested_early_exit_(false) {
+Solver<Dtype>::Solver(const string& param_file)
+    : net_(), callbacks_(), requested_early_exit_(false) {
   SolverParameter param;
   ReadSolverParamsFromTextFileOrDie(param_file, &param);
   Init(param);
@@ -43,15 +41,13 @@ Solver<Dtype>::Solver(const string& param_file, const Solver* root_solver)

 template <typename Dtype>
 void Solver<Dtype>::Init(const SolverParameter& param) {
-  CHECK(Caffe::root_solver() || root_solver_)
-      << "root_solver_ needs to be set for all non-root solvers";
   LOG_IF(INFO, Caffe::root_solver()) << "Initializing solver from parameters: "
     << std::endl << param.DebugString();
   param_ = param;
   CHECK_GE(param_.average_loss(), 1) << "average_loss should be non-negative.";
   CheckSnapshotWritePermissions();
-  if (Caffe::root_solver() && param_.random_seed() >= 0) {
-    Caffe::set_random_seed(param_.random_seed());
+  if (param_.random_seed() >= 0) {
+    Caffe::set_random_seed(param_.random_seed() + Caffe::solver_rank());
   }
   // Scaffolding code
   InitTrainNet();
@@ -101,11 +97,7 @@ void Solver<Dtype>::InitTrainNet() {
   net_state.MergeFrom(net_param.state());
   net_state.MergeFrom(param_.train_state());
   net_param.mutable_state()->CopyFrom(net_state);
-  if (Caffe::root_solver()) {
-    net_.reset(new Net<Dtype>(net_param));
-  } else {
-    net_.reset(new Net<Dtype>(net_param, root_solver_->net_.get()));
-  }
+  net_.reset(new Net<Dtype>(net_param));
 }

 template <typename Dtype>
@@ -180,12 +172,7 @@ void Solver<Dtype>::InitTestNets() {
     net_params[i].mutable_state()->CopyFrom(net_state);
     LOG(INFO)
         << "Creating test net (#" << i << ") specified by " << sources[i];
-    if (Caffe::root_solver()) {
-      test_nets_[i].reset(new Net<Dtype>(net_params[i]));
-    } else {
-      test_nets_[i].reset(new Net<Dtype>(net_params[i],
-          root_solver_->test_nets_[i].get()));
-    }
+    test_nets_[i].reset(new Net<Dtype>(net_params[i]));
     test_nets_[i]->set_debug_info(param_.debug_info());
   }
 }
@@ -197,14 +184,16 @@ void Solver<Dtype>::Step(int iters) {
   int average_loss = this->param_.average_loss();
   losses_.clear();
   smoothed_loss_ = 0;
+  iteration_timer_.Start();

   while (iter_ < stop_iter) {
     // zero-init the params
     net_->ClearParamDiffs();
     if (param_.test_interval() && iter_ % param_.test_interval() == 0
-        && (iter_ > 0 || param_.test_initialization())
-        && Caffe::root_solver()) {
-      TestAll();
+        && (iter_ > 0 || param_.test_initialization())) {
+      if (Caffe::root_solver()) {
+        TestAll();
+      }
       if (requested_early_exit_) {
         // Break out of the while loop because stop was requested while testing.
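// [Editorial note, not part of the patch] Besides moving TestAll() behind a
// root-solver check, Step() (continued just below) now times each display
// interval, so the log line gains a throughput estimate. An illustrative,
// hypothetical example of the new output with display: 20:
//
//   Iteration 120 (8.2 iter/s, 2.44s/20 iters), loss = 0.87
//
// where 8.2 = (120 - 100) / 2.44.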
break; @@ -225,8 +214,13 @@ void Solver<Dtype>::Step(int iters) { // average the loss across iterations for smoothed reporting UpdateSmoothedLoss(loss, start_iter, average_loss); if (display) { + float lapse = iteration_timer_.Seconds(); + float per_s = (iter_ - iterations_last_) / (lapse ? lapse : 1); LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << iter_ - << ", loss = " << smoothed_loss_; + << " (" << per_s << " iter/s, " << lapse << "s/" + << param_.display() << " iters), loss = " << smoothed_loss_; + iteration_timer_.Start(); + iterations_last_ = iter_; const vector<Blob<Dtype>*>& result = net_->output_blobs(); int score_index = 0; for (int j = 0; j < result.size(); ++j) { diff --git a/src/caffe/solvers/adagrad_solver.cpp b/src/caffe/solvers/adagrad_solver.cpp index e78eadca..d8107e1e 100644 --- a/src/caffe/solvers/adagrad_solver.cpp +++ b/src/caffe/solvers/adagrad_solver.cpp @@ -12,7 +12,6 @@ void adagrad_update_gpu(int N, Dtype* g, Dtype* h, Dtype delta, template <typename Dtype> void AdaGradSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) { - CHECK(Caffe::root_solver()); const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params(); const vector<float>& net_params_lr = this->net_->params_lr(); Dtype delta = this->param_.delta(); diff --git a/src/caffe/solvers/nesterov_solver.cpp b/src/caffe/solvers/nesterov_solver.cpp index 23ab2d43..7c1fac1f 100644 --- a/src/caffe/solvers/nesterov_solver.cpp +++ b/src/caffe/solvers/nesterov_solver.cpp @@ -12,7 +12,6 @@ void nesterov_update_gpu(int N, Dtype* g, Dtype* h, Dtype momentum, template <typename Dtype> void NesterovSolver<Dtype>::ComputeUpdateValue(int param_id, Dtype rate) { - CHECK(Caffe::root_solver()); const vector<Blob<Dtype>*>& net_params = this->net_->learnable_params(); const vector<float>& net_params_lr = this->net_->params_lr(); Dtype momentum = this->param_.momentum(); diff --git a/src/caffe/solvers/sgd_solver.cpp b/src/caffe/solvers/sgd_solver.cpp index f30f316d..ad6abe54 100644 --- a/src/caffe/solvers/sgd_solver.cpp +++ b/src/caffe/solvers/sgd_solver.cpp @@ -100,10 +100,10 @@ void SGDSolver<Dtype>::ClipGradients() { template <typename Dtype> void SGDSolver<Dtype>::ApplyUpdate() { - CHECK(Caffe::root_solver()); Dtype rate = GetLearningRate(); if (this->param_.display() && this->iter_ % this->param_.display() == 0) { - LOG(INFO) << "Iteration " << this->iter_ << ", lr = " << rate; + LOG_IF(INFO, Caffe::root_solver()) << "Iteration " << this->iter_ + << ", lr = " << rate; } ClipGradients(); for (int param_id = 0; param_id < this->net_->learnable_params().size(); diff --git a/src/caffe/syncedmem.cpp b/src/caffe/syncedmem.cpp index 4d356417..88d9b785 100644 --- a/src/caffe/syncedmem.cpp +++ b/src/caffe/syncedmem.cpp @@ -3,26 +3,41 @@ #include "caffe/util/math_functions.hpp" namespace caffe { +SyncedMemory::SyncedMemory() + : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(0), head_(UNINITIALIZED), + own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) { +#ifndef CPU_ONLY +#ifdef DEBUG + CUDA_CHECK(cudaGetDevice(&device_)); +#endif +#endif +} + +SyncedMemory::SyncedMemory(size_t size) + : cpu_ptr_(NULL), gpu_ptr_(NULL), size_(size), head_(UNINITIALIZED), + own_cpu_data_(false), cpu_malloc_use_cuda_(false), own_gpu_data_(false) { +#ifndef CPU_ONLY +#ifdef DEBUG + CUDA_CHECK(cudaGetDevice(&device_)); +#endif +#endif +} SyncedMemory::~SyncedMemory() { + check_device(); if (cpu_ptr_ && own_cpu_data_) { CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_); } #ifndef CPU_ONLY if (gpu_ptr_ && own_gpu_data_) { - 
int initial_device; - cudaGetDevice(&initial_device); - if (gpu_device_ != -1) { - CUDA_CHECK(cudaSetDevice(gpu_device_)); - } CUDA_CHECK(cudaFree(gpu_ptr_)); - cudaSetDevice(initial_device); } #endif // CPU_ONLY } inline void SyncedMemory::to_cpu() { + check_device(); switch (head_) { case UNINITIALIZED: CaffeMallocHost(&cpu_ptr_, size_, &cpu_malloc_use_cuda_); @@ -49,10 +64,10 @@ inline void SyncedMemory::to_cpu() { } inline void SyncedMemory::to_gpu() { + check_device(); #ifndef CPU_ONLY switch (head_) { case UNINITIALIZED: - CUDA_CHECK(cudaGetDevice(&gpu_device_)); CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_)); caffe_gpu_memset(size_, 0, gpu_ptr_); head_ = HEAD_AT_GPU; @@ -60,7 +75,6 @@ inline void SyncedMemory::to_gpu() { break; case HEAD_AT_CPU: if (gpu_ptr_ == NULL) { - CUDA_CHECK(cudaGetDevice(&gpu_device_)); CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_)); own_gpu_data_ = true; } @@ -77,11 +91,13 @@ inline void SyncedMemory::to_gpu() { } const void* SyncedMemory::cpu_data() { + check_device(); to_cpu(); return (const void*)cpu_ptr_; } void SyncedMemory::set_cpu_data(void* data) { + check_device(); CHECK(data); if (own_cpu_data_) { CaffeFreeHost(cpu_ptr_, cpu_malloc_use_cuda_); @@ -92,6 +108,7 @@ void SyncedMemory::set_cpu_data(void* data) { } const void* SyncedMemory::gpu_data() { + check_device(); #ifndef CPU_ONLY to_gpu(); return (const void*)gpu_ptr_; @@ -102,16 +119,11 @@ const void* SyncedMemory::gpu_data() { } void SyncedMemory::set_gpu_data(void* data) { + check_device(); #ifndef CPU_ONLY CHECK(data); if (own_gpu_data_) { - int initial_device; - cudaGetDevice(&initial_device); - if (gpu_device_ != -1) { - CUDA_CHECK(cudaSetDevice(gpu_device_)); - } CUDA_CHECK(cudaFree(gpu_ptr_)); - cudaSetDevice(initial_device); } gpu_ptr_ = data; head_ = HEAD_AT_GPU; @@ -122,12 +134,14 @@ void SyncedMemory::set_gpu_data(void* data) { } void* SyncedMemory::mutable_cpu_data() { + check_device(); to_cpu(); head_ = HEAD_AT_CPU; return cpu_ptr_; } void* SyncedMemory::mutable_gpu_data() { + check_device(); #ifndef CPU_ONLY to_gpu(); head_ = HEAD_AT_GPU; @@ -140,9 +154,9 @@ void* SyncedMemory::mutable_gpu_data() { #ifndef CPU_ONLY void SyncedMemory::async_gpu_push(const cudaStream_t& stream) { + check_device(); CHECK(head_ == HEAD_AT_CPU); if (gpu_ptr_ == NULL) { - CUDA_CHECK(cudaGetDevice(&gpu_device_)); CUDA_CHECK(cudaMalloc(&gpu_ptr_, size_)); own_gpu_data_ = true; } @@ -153,5 +167,20 @@ void SyncedMemory::async_gpu_push(const cudaStream_t& stream) { } #endif +void SyncedMemory::check_device() { +#ifndef CPU_ONLY +#ifdef DEBUG + int device; + cudaGetDevice(&device); + CHECK(device == device_); + if (gpu_ptr_ && own_gpu_data_) { + cudaPointerAttributes attributes; + CUDA_CHECK(cudaPointerGetAttributes(&attributes, gpu_ptr_)); + CHECK(attributes.device == device_); + } +#endif +#endif +} + } // namespace caffe diff --git a/src/caffe/test/test_data_layer.cpp b/src/caffe/test/test_data_layer.cpp index 3e8d113d..3835af1f 100644 --- a/src/caffe/test/test_data_layer.cpp +++ b/src/caffe/test/test_data_layer.cpp @@ -105,6 +105,32 @@ class DataLayerTest : public MultiDeviceTest<TypeParam> { } } + void TestSkip() { + LayerParameter param; + param.set_phase(TRAIN); + DataParameter* data_param = param.mutable_data_param(); + int batch_size = 5; + data_param->set_batch_size(batch_size); + data_param->set_source(filename_->c_str()); + data_param->set_backend(backend_); + Caffe::set_solver_count(8); + for (int dev = 0; dev < Caffe::solver_count(); ++dev) { + Caffe::set_solver_rank(dev); + DataLayer<Dtype> layer(param); + 
layer.SetUp(blob_bottom_vec_, blob_top_vec_); + int label = dev; + for (int iter = 0; iter < 10; ++iter) { + layer.Forward(blob_bottom_vec_, blob_top_vec_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_EQ(label % batch_size, blob_top_label_->cpu_data()[i]); + label += Caffe::solver_count(); + } + } + } + Caffe::set_solver_count(1); + Caffe::set_solver_rank(0); + } + void TestReshape(DataParameter_DB backend) { const int num_inputs = 5; // Save data of varying shapes. @@ -356,6 +382,11 @@ TYPED_TEST(DataLayerTest, TestReadLevelDB) { this->TestRead(); } +TYPED_TEST(DataLayerTest, TestSkipLevelDB) { + this->Fill(false, DataParameter_DB_LEVELDB); + this->TestSkip(); +} + TYPED_TEST(DataLayerTest, TestReshapeLevelDB) { this->TestReshape(DataParameter_DB_LEVELDB); } @@ -396,6 +427,11 @@ TYPED_TEST(DataLayerTest, TestReadLMDB) { this->TestRead(); } +TYPED_TEST(DataLayerTest, TestSkipLMDB) { + this->Fill(false, DataParameter_DB_LMDB); + this->TestSkip(); +} + TYPED_TEST(DataLayerTest, TestReshapeLMDB) { this->TestReshape(DataParameter_DB_LMDB); } diff --git a/src/caffe/test/test_gradient_based_solver.cpp b/src/caffe/test/test_gradient_based_solver.cpp index 975a8f0f..6ad0d8f6 100644 --- a/src/caffe/test/test_gradient_based_solver.cpp +++ b/src/caffe/test/test_gradient_based_solver.cpp @@ -36,7 +36,9 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> { string snapshot_prefix_; shared_ptr<SGDSolver<Dtype> > solver_; - shared_ptr<P2PSync<Dtype> > sync_; +#ifdef USE_NCCL + shared_ptr<NCCL<Dtype> > nccl_; +#endif int seed_; // Dimensions are determined by generate_sample_data.py // TODO this is brittle and the hdf5 file should be checked instead. @@ -85,6 +87,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> { "lr_policy: 'fixed' " "iter_size: " << iter_size << " " "device_id: " << device_id << " " + "layer_wise_reduce: " << (!share_) << " " "net_param { " " name: 'TestNetwork' " " layer { " @@ -183,7 +186,7 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> { } Caffe::set_random_seed(this->seed_); this->InitSolverFromProtoString(proto.str()); - if (from_snapshot != NULL) { + if (from_snapshot) { this->solver_->Restore(from_snapshot); for (int i = 0; i < this->solver_->iter(); ++i) { this->solver_->net()->Forward(); @@ -202,9 +205,10 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> { gpus.push_back(i); } Caffe::set_solver_count(gpus.size()); - this->sync_.reset(new P2PSync<Dtype>( - this->solver_, NULL, this->solver_->param())); - this->sync_->Run(gpus); +#ifdef USE_NCCL + this->nccl_.reset(new NCCL<Dtype>(this->solver_)); + this->nccl_->Run(gpus, from_snapshot); +#endif Caffe::set_solver_count(1); } if (snapshot) { @@ -457,12 +461,28 @@ class GradientBasedSolverTest : public MultiDeviceTest<TypeParam> { const int kIterSize = 1; // Test over all numbers of devices. 
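// [Editorial sketch, not part of the patch] TestSkip above pins down the
// round-robin contract shared by DataLayer and HDF5DataLayer::Skip(): with
// solver_count == 8, rank r must see exactly samples r, r+8, r+16, ... of
// the stream (the TEST phase is exempt, since only rank 0 runs tests). A
// self-contained model of the rule:
//
//   bool skip(int offset, int rank, int size) {  // size == solver_count
//     return (offset % size) != rank;            // keep every size-th item
//   }
//   // e.g. rank 2 of 8 keeps offsets 2, 10, 18, ...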
int available_devices = 1; -#ifndef CPU_ONLY +#ifdef USE_NCCL if (Caffe::mode() == Caffe::GPU) { CUDA_CHECK(cudaGetDeviceCount(&available_devices)); } #endif - for (int devices = 1; devices <= available_devices; ++devices) { + // Takes a while to test all sizes for each test so sparse + vector<int> sizes; + sizes.push_back(1); + if (available_devices >= 2) { + sizes.push_back(2); + } + if (available_devices >= 3) { + sizes.push_back(3); + } + if (available_devices >= 8) { + sizes.push_back(8); + } + if (available_devices >= 16) { + sizes.push_back(16); + } + for (int i = 0; i < sizes.size(); ++i) { + int devices = sizes[i]; // Configure batch size for single / multi device equivalence. // Constant data is needed for multi device as for accumulation. num_ = kNum * devices; diff --git a/src/caffe/test/test_hdf5data_layer.cpp b/src/caffe/test/test_hdf5data_layer.cpp index 8884ce95..68e10286 100644 --- a/src/caffe/test/test_hdf5data_layer.cpp +++ b/src/caffe/test/test_hdf5data_layer.cpp @@ -133,4 +133,34 @@ TYPED_TEST(HDF5DataLayerTest, TestRead) { } } +TYPED_TEST(HDF5DataLayerTest, TestSkip) { + typedef typename TypeParam::Dtype Dtype; + LayerParameter param; + param.add_top("data"); + param.add_top("label"); + + HDF5DataParameter* hdf5_data_param = param.mutable_hdf5_data_param(); + int batch_size = 5; + hdf5_data_param->set_batch_size(batch_size); + hdf5_data_param->set_source(*(this->filename)); + + Caffe::set_solver_count(8); + for (int dev = 0; dev < Caffe::solver_count(); ++dev) { + Caffe::set_solver_rank(dev); + + HDF5DataLayer<Dtype> layer(param); + layer.SetUp(this->blob_bottom_vec_, this->blob_top_vec_); + int label = dev; + for (int iter = 0; iter < 1; ++iter) { + layer.Forward(this->blob_bottom_vec_, this->blob_top_vec_); + for (int i = 0; i < batch_size; ++i) { + EXPECT_EQ(1 + label, this->blob_top_label_->cpu_data()[i]); + label = (label + Caffe::solver_count()) % (batch_size * 2); + } + } + } + Caffe::set_solver_count(1); + Caffe::set_solver_rank(0); +} + } // namespace caffe diff --git a/src/caffe/util/blocking_queue.cpp b/src/caffe/util/blocking_queue.cpp index 058668fe..f69d2104 100644 --- a/src/caffe/util/blocking_queue.cpp +++ b/src/caffe/util/blocking_queue.cpp @@ -1,7 +1,6 @@ #include <boost/thread.hpp> #include <string> -#include "caffe/data_reader.hpp" #include "caffe/layers/base_data_layer.hpp" #include "caffe/parallel.hpp" #include "caffe/util/blocking_queue.hpp" @@ -88,9 +87,5 @@ size_t BlockingQueue<T>::size() const { template class BlockingQueue<Batch<float>*>; template class BlockingQueue<Batch<double>*>; -template class BlockingQueue<Datum*>; -template class BlockingQueue<shared_ptr<DataReader::QueuePair> >; -template class BlockingQueue<P2PSync<float>*>; -template class BlockingQueue<P2PSync<double>*>; } // namespace caffe diff --git a/src/caffe/util/db_lmdb.cpp b/src/caffe/util/db_lmdb.cpp index fb1d4956..491a9bd0 100644 --- a/src/caffe/util/db_lmdb.cpp +++ b/src/caffe/util/db_lmdb.cpp @@ -32,7 +32,7 @@ void LMDB::Open(const string& source, Mode mode) { MDB_CHECK(rc); } #endif - LOG(INFO) << "Opened lmdb " << source; + LOG_IF(INFO, Caffe::root_solver()) << "Opened lmdb " << source; } LMDBCursor* LMDB::NewCursor() { diff --git a/src/caffe/util/math_functions.cu b/src/caffe/util/math_functions.cu index 4c587537..6d001026 100644 --- a/src/caffe/util/math_functions.cu +++ b/src/caffe/util/math_functions.cu @@ -91,6 +91,26 @@ void caffe_gpu_scal<double>(const int N, const double alpha, double *X) { } template <> +void caffe_gpu_scal<float>(const int N, const float 
alpha, float* X,
+                           cudaStream_t str) {
+  cudaStream_t initial_stream;
+  CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+  CUBLAS_CHECK(cublasSscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
+void caffe_gpu_scal<double>(const int N, const double alpha, double* X,
+                            cudaStream_t str) {
+  cudaStream_t initial_stream;
+  CUBLAS_CHECK(cublasGetStream(Caffe::cublas_handle(), &initial_stream));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), str));
+  CUBLAS_CHECK(cublasDscal(Caffe::cublas_handle(), N, &alpha, X, 1));
+  CUBLAS_CHECK(cublasSetStream(Caffe::cublas_handle(), initial_stream));
+}
+
+template <>
 void caffe_gpu_axpby<float>(const int N, const float alpha, const float* X,
     const float beta, float* Y) {
   caffe_gpu_scal<float>(N, beta, Y);
diff --git a/tools/caffe.cpp b/tools/caffe.cpp
index 9bf4214a..3587d8aa 100644
--- a/tools/caffe.cpp
+++ b/tools/caffe.cpp
@@ -195,6 +195,7 @@ int train() {
   // If the gpus flag is not provided, allow the mode and device to be set
   // in the solver prototxt.
   if (FLAGS_gpu.size() == 0
+      && solver_param.has_solver_mode()
       && solver_param.solver_mode() == caffe::SolverParameter_SolverMode_GPU) {
     if (solver_param.has_device_id()) {
       FLAGS_gpu = "" +
@@ -244,11 +245,15 @@ int train() {
     CopyLayers(solver.get(), FLAGS_weights);
   }

+  LOG(INFO) << "Starting Optimization";
   if (gpus.size() > 1) {
-    caffe::P2PSync<float> sync(solver, NULL, solver->param());
-    sync.Run(gpus);
+#ifdef USE_NCCL
+    caffe::NCCL<float> nccl(solver);
+    nccl.Run(gpus, FLAGS_snapshot.size() > 0 ? FLAGS_snapshot.c_str() : NULL);
+#else
+    LOG(FATAL) << "Multi-GPU execution not available - rebuild with USE_NCCL";
+#endif
   } else {
-    LOG(INFO) << "Starting Optimization";
     solver->Solve();
   }
   LOG(INFO) << "Optimization Done.";
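[Editorial note] As a usage sketch, not part of the patch: after rebuilding with USE_NCCL, multi-GPU training keeps the same front end, e.g. "caffe train --solver=solver.prototxt --gpu=0,1,2,3", which now drives the NCCL all-reduce path above instead of the removed P2PSync tree. The new layer_wise_reduce solver setting (default true) overlaps gradient reduction with the remaining backward pass; set it to false to reduce once per iteration instead. A rough C++ equivalent of what tools/caffe.cpp does, assuming solver_param and the GPU list are already populated:

    #include "caffe/caffe.hpp"
    #include "caffe/parallel.hpp"  // NCCL<Dtype>; only compiled under USE_NCCL

    std::vector<int> gpus;  // device ids, e.g. 0 and 1
    gpus.push_back(0);
    gpus.push_back(1);
    caffe::Caffe::set_solver_count(gpus.size());
    boost::shared_ptr<caffe::Solver<float> > solver(
        caffe::SolverRegistry<float>::CreateSolver(solver_param));
    caffe::NCCL<float> nccl(solver);  // rank 0; Run() spawns worker threads
    nccl.Run(gpus, NULL);             // NULL: no snapshot to restore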