summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCyprien Noel <cyprien.noel@gmail.com>2015-04-28 14:46:20 -0700
committerEvan Shelhamer <shelhamer@imaginarynumber.net>2015-08-09 15:13:10 -0700
commit73b3d13b68bedad9d19f70755b0ee4ef376e2a30 (patch)
tree25d714f6976a2009deb3945e7a104e193182f84a
parent45d792e8b1e44acb467ab9be3debdd9e819c11d1 (diff)
downloadcaffeonacl-73b3d13b68bedad9d19f70755b0ee4ef376e2a30.tar.gz
caffeonacl-73b3d13b68bedad9d19f70755b0ee4ef376e2a30.tar.bz2
caffeonacl-73b3d13b68bedad9d19f70755b0ee4ef376e2a30.zip
Change the way threads are started and stopped
- Interrupt the thread before waiting on join - Provide a method for looping threads to exit on demand - CHECK if start and stop succeed instead of returning an error
-rw-r--r--include/caffe/internal_thread.hpp8
-rw-r--r--src/caffe/internal_thread.cpp33
-rw-r--r--src/caffe/layers/base_data_layer.cpp4
-rw-r--r--src/caffe/test/test_internal_thread.cpp16
4 files changed, 32 insertions, 29 deletions
diff --git a/include/caffe/internal_thread.hpp b/include/caffe/internal_thread.hpp
index bcff318e..be6ff7fb 100644
--- a/include/caffe/internal_thread.hpp
+++ b/include/caffe/internal_thread.hpp
@@ -25,12 +25,11 @@ class InternalThread {
* Caffe's thread local state will be initialized using the current
* thread values, e.g. device id, solver index etc. The random seed
* is initialized using caffe_rng_rand.
- * Will not return until the internal thread has exited.
*/
- bool StartInternalThread();
+ void StartInternalThread();
/** Will not return until the internal thread has exited. */
- bool WaitForInternalThreadToExit();
+ void StopInternalThread();
bool is_started() const;
@@ -39,6 +38,9 @@ class InternalThread {
with the code you want your thread to run. */
virtual void InternalThreadEntry() {}
+ /* Should be tested when running loops to exit when requested. */
+ bool must_stop();
+
private:
void entry(int device, Caffe::Brew mode, int rand_seed);
diff --git a/src/caffe/internal_thread.cpp b/src/caffe/internal_thread.cpp
index 2be88b31..d6c26559 100644
--- a/src/caffe/internal_thread.cpp
+++ b/src/caffe/internal_thread.cpp
@@ -1,4 +1,5 @@
#include <boost/thread.hpp>
+#include <exception>
#include "caffe/internal_thread.hpp"
#include "caffe/util/math_functions.hpp"
@@ -9,18 +10,19 @@ InternalThread::~InternalThread() {
StopInternalThread();
}
-InternalThread::~InternalThread() {
- WaitForInternalThreadToExit();
+bool InternalThread::is_started() const {
+ return thread_ && thread_->joinable();
}
-bool InternalThread::is_started() const {
- return thread_.get() != NULL && thread_->joinable();
+bool InternalThread::must_stop() {
+ return thread_ && thread_->interruption_requested();
}
-bool InternalThread::StartInternalThread() {
- if (!WaitForInternalThreadToExit()) {
- return false;
- }
+void InternalThread::StartInternalThread() {
+ // TODO switch to failing once Caffe prefetch thread is persistent.
+ // Threads should not be started and stopped repeatedly.
+ // CHECK(!is_started());
+ StopInternalThread();
int device = 0;
#ifndef CPU_ONLY
@@ -32,10 +34,9 @@ bool InternalThread::StartInternalThread() {
try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
rand_seed));
- } catch (...) {
- return false;
+ } catch (std::exception& e) {
+ LOG(FATAL) << "Thread exception: " << e.what();
}
- return true;
}
void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
@@ -48,16 +49,16 @@ void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed) {
InternalThreadEntry();
}
-/** Will not return until the internal thread has exited. */
-bool InternalThread::WaitForInternalThreadToExit() {
+void InternalThread::StopInternalThread() {
if (is_started()) {
+ thread_->interrupt();
try {
thread_->join();
- } catch (...) {
- return false;
+ } catch (boost::thread_interrupted&) {
+ } catch (std::exception& e) {
+ LOG(FATAL) << "Thread exception: " << e.what();
}
}
- return true;
}
} // namespace caffe
diff --git a/src/caffe/layers/base_data_layer.cpp b/src/caffe/layers/base_data_layer.cpp
index 26a11182..facaed7f 100644
--- a/src/caffe/layers/base_data_layer.cpp
+++ b/src/caffe/layers/base_data_layer.cpp
@@ -47,12 +47,12 @@ void BasePrefetchingDataLayer<Dtype>::LayerSetUp(
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::CreatePrefetchThread() {
this->data_transformer_->InitRand();
- CHECK(StartInternalThread()) << "Thread execution failed";
+ StartInternalThread();
}
template <typename Dtype>
void BasePrefetchingDataLayer<Dtype>::JoinPrefetchThread() {
- CHECK(WaitForInternalThreadToExit()) << "Thread joining failed";
+ StopInternalThread();
}
template <typename Dtype>
diff --git a/src/caffe/test/test_internal_thread.cpp b/src/caffe/test/test_internal_thread.cpp
index 390c8eda..93f1cc54 100644
--- a/src/caffe/test/test_internal_thread.cpp
+++ b/src/caffe/test/test_internal_thread.cpp
@@ -14,9 +14,9 @@ class InternalThreadTest : public ::testing::Test {};
TEST_F(InternalThreadTest, TestStartAndExit) {
InternalThread thread;
EXPECT_FALSE(thread.is_started());
- EXPECT_TRUE(thread.StartInternalThread());
+ thread.StartInternalThread();
EXPECT_TRUE(thread.is_started());
- EXPECT_TRUE(thread.WaitForInternalThreadToExit());
+ thread.StopInternalThread();
EXPECT_FALSE(thread.is_started());
}
@@ -35,18 +35,18 @@ class TestThreadB : public InternalThread {
TEST_F(InternalThreadTest, TestRandomSeed) {
TestThreadA t1;
Caffe::set_random_seed(9658361);
- EXPECT_TRUE(t1.StartInternalThread());
- EXPECT_TRUE(t1.WaitForInternalThreadToExit());
+ t1.StartInternalThread();
+ t1.StopInternalThread();
TestThreadA t2;
Caffe::set_random_seed(9658361);
- EXPECT_TRUE(t2.StartInternalThread());
- EXPECT_TRUE(t2.WaitForInternalThreadToExit());
+ t2.StartInternalThread();
+ t2.StopInternalThread();
TestThreadB t3;
Caffe::set_random_seed(3435563);
- EXPECT_TRUE(t3.StartInternalThread());
- EXPECT_TRUE(t3.WaitForInternalThreadToExit());
+ t3.StartInternalThread();
+ t3.StopInternalThread();
}
} // namespace caffe