summaryrefslogtreecommitdiff
path: root/caffe2/distributed
diff options
context:
space:
mode:
authorPieter Noordhuis <pietern@fb.com>2016-11-22 12:13:43 -0800
committerBram Wasti <bwasti@dev11999.prn1.facebook.com>2016-11-29 15:18:36 -0800
commit122a89e3c564e69cdb43e4cfd16a6244ab89feeb (patch)
tree301eeb8942ff0bde57f736dcbcf148b35b3812b4 /caffe2/distributed
parent2790043421b621a7a7c80a1c25a8e124043324ad (diff)
downloadpytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.tar.gz
pytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.tar.bz2
pytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.zip
Add FileStoreHandler
Summary: The FileStoreHandler subclasses the abstract StoreHandler class. Operators expecting to work with a StoreHandler can now use the filesystem as their backing store. Reviewed By: Yangqing Differential Revision: D4217711 fbshipit-source-id: fce60c99c4c505201dfee33ca0a4e8a35db00338
Diffstat (limited to 'caffe2/distributed')
-rw-r--r--caffe2/distributed/file_store_handler.cc95
-rw-r--r--caffe2/distributed/file_store_handler.h30
-rw-r--r--caffe2/distributed/file_store_handler_op.cc37
-rw-r--r--caffe2/distributed/file_store_handler_op.h21
4 files changed, 183 insertions, 0 deletions
diff --git a/caffe2/distributed/file_store_handler.cc b/caffe2/distributed/file_store_handler.cc
new file mode 100644
index 0000000000..4e1068c8b5
--- /dev/null
+++ b/caffe2/distributed/file_store_handler.cc
@@ -0,0 +1,95 @@
+#include "file_store_handler_op.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#include <stdlib.h>
+
+#include <chrono>
+#include <thread>
+
+#include "caffe2/utils/proto_utils.h"
+
+namespace caffe2 {
+
+FileStoreHandler::FileStoreHandler(std::string& path) {
+ basePath_ = realPath(path);
+}
+
+FileStoreHandler::~FileStoreHandler() {}
+
+std::string FileStoreHandler::realPath(const std::string& path) {
+ std::array<char, PATH_MAX> buf;
+ CHECK_EQ(buf.data(), realpath(path.c_str(), buf.data())) << "realpath: "
+ << strerror(errno);
+ return std::string(buf.data());
+}
+
+std::string FileStoreHandler::objectPath(const std::string& name) {
+ std::string encoded;
+ for (const auto& c : name) {
+ // Convert non-alphabetic characters to octal.
+ // Means argument cannot collide with encoding.
+ // Don't want to take a dependency on SSL for SHA1 here.
+ if (!isalpha(c)) {
+ // 0-prefix, max 3 numbers, 0-terminator
+ std::array<char, 5> buf;
+ snprintf(buf.data(), buf.size(), "%#03o", c);
+ encoded.append(buf.data());
+ } else {
+ encoded.append(&c, 1);
+ }
+ }
+ return basePath_ + "/" + encoded;
+}
+
+void FileStoreHandler::set(const std::string& name, const std::string& data) {
+ auto path = objectPath(name);
+ WriteStringToFile(data, path.c_str());
+}
+
+std::string FileStoreHandler::get(const std::string& name) {
+ std::string result;
+ auto path = objectPath(name);
+ ReadStringFromFile(path.c_str(), &result);
+ return result;
+}
+
+int64_t FileStoreHandler::add(
+ const std::string& /* unused */,
+ int64_t /* unused */) {
+ CHECK(false) << "add not implemented for FileStoreHandler";
+}
+
+bool FileStoreHandler::check(const std::vector<std::string>& names) {
+ std::vector<std::string> paths;
+ for (const auto& name : names) {
+ paths.push_back(objectPath(name));
+ }
+
+ for (const auto& path : paths) {
+ int fd = open(path.c_str(), O_RDONLY);
+ if (fd == -1) {
+ // Only deal with files that don't exist.
+ // Anything else is a problem.
+ CHECK_EQ(errno, ENOENT);
+
+ // One of the paths doesn't exist; return early
+ return false;
+ }
+
+ close(fd);
+ }
+
+ return true;
+}
+
+void FileStoreHandler::wait(const std::vector<std::string>& names) {
+ // Not using inotify because it doesn't work on many
+ // shared filesystems (such as NFS).
+ while (!check(names)) {
+ /* sleep override */
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+}
+}
diff --git a/caffe2/distributed/file_store_handler.h b/caffe2/distributed/file_store_handler.h
new file mode 100644
index 0000000000..06174d2531
--- /dev/null
+++ b/caffe2/distributed/file_store_handler.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <caffe2/distributed/store_handler.h>
+
+namespace caffe2 {
+
+class FileStoreHandler : public StoreHandler {
+ public:
+ explicit FileStoreHandler(std::string& path);
+ virtual ~FileStoreHandler();
+
+ virtual void set(const std::string& name, const std::string& data) override;
+
+ virtual std::string get(const std::string& name) override;
+
+ virtual int64_t add(const std::string& name, int64_t value) override;
+
+ virtual bool check(const std::vector<std::string>& names) override;
+
+ virtual void wait(const std::vector<std::string>& names) override;
+
+ protected:
+ std::string basePath_;
+
+ std::string realPath(const std::string& path);
+
+ std::string objectPath(const std::string& name);
+};
+
+} // namespace caffe2
diff --git a/caffe2/distributed/file_store_handler_op.cc b/caffe2/distributed/file_store_handler_op.cc
new file mode 100644
index 0000000000..5595918990
--- /dev/null
+++ b/caffe2/distributed/file_store_handler_op.cc
@@ -0,0 +1,37 @@
+#include "file_store_handler_op.h"
+
+namespace caffe2 {
+
+FileStoreHandlerCreateOp::FileStoreHandlerCreateOp(
+ const OperatorDef& operator_def,
+ Workspace* ws)
+ : Operator(operator_def, ws),
+ basePath_(GetSingleArgument<std::string>("path", "")) {
+ CHECK_NE(basePath_, "") << "path is a required argument";
+}
+
+bool FileStoreHandlerCreateOp::RunOnDevice() {
+ auto ptr = std::unique_ptr<StoreHandler>(new FileStoreHandler(basePath_));
+ *OperatorBase::Output<std::unique_ptr<StoreHandler>>(HANDLER) =
+ std::move(ptr);
+ return true;
+}
+
+REGISTER_CPU_OPERATOR(FileStoreHandlerCreate, FileStoreHandlerCreateOp);
+OPERATOR_SCHEMA(FileStoreHandlerCreate)
+ .NumInputs(0)
+ .NumOutputs(1)
+ .SetDoc(R"DOC(
+Creates a unique_ptr<StoreHandler> that uses the filesystem as backing
+store (typically a filesystem shared between many nodes, such as NFS).
+This store handler is not built to be fast. Its recommended use is for
+integration tests and prototypes where extra dependencies are
+cumbersome. Use an ephemeral path to ensure multiple processes or runs
+don't interfere.
+)DOC")
+ .Arg("path", "base path used by the FileStoreHandler")
+ .Output(0, "handler", "unique_ptr<StoreHandler>");
+
+NO_GRADIENT(FileStoreHandlerCreateOp);
+
+} // namespace caffe2
diff --git a/caffe2/distributed/file_store_handler_op.h b/caffe2/distributed/file_store_handler_op.h
new file mode 100644
index 0000000000..42c77588f6
--- /dev/null
+++ b/caffe2/distributed/file_store_handler_op.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include "file_store_handler.h"
+
+#include <caffe2/core/operator.h>
+
+namespace caffe2 {
+
+class FileStoreHandlerCreateOp final : public Operator<CPUContext> {
+ public:
+ FileStoreHandlerCreateOp(const OperatorDef& operator_def, Workspace* ws);
+
+ bool RunOnDevice() override;
+
+ private:
+ std::string basePath_;
+
+ OUTPUT_TAGS(HANDLER);
+};
+
+} // namespace caffe2