diff options
author | Pieter Noordhuis <pietern@fb.com> | 2016-11-22 12:13:43 -0800 |
---|---|---|
committer | Bram Wasti <bwasti@dev11999.prn1.facebook.com> | 2016-11-29 15:18:36 -0800 |
commit | 122a89e3c564e69cdb43e4cfd16a6244ab89feeb (patch) | |
tree | 301eeb8942ff0bde57f736dcbcf148b35b3812b4 /caffe2/distributed | |
parent | 2790043421b621a7a7c80a1c25a8e124043324ad (diff) | |
download | pytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.tar.gz pytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.tar.bz2 pytorch-122a89e3c564e69cdb43e4cfd16a6244ab89feeb.zip |
Add FileStoreHandler
Summary:
The FileStoreHandler subclasses the abstract StoreHandler
class.
Operators expecting to work with a StoreHandler can now use the
filesystem as their backing store.
Reviewed By: Yangqing
Differential Revision: D4217711
fbshipit-source-id: fce60c99c4c505201dfee33ca0a4e8a35db00338
Diffstat (limited to 'caffe2/distributed')
-rw-r--r-- | caffe2/distributed/file_store_handler.cc | 95 | ||||
-rw-r--r-- | caffe2/distributed/file_store_handler.h | 30 | ||||
-rw-r--r-- | caffe2/distributed/file_store_handler_op.cc | 37 | ||||
-rw-r--r-- | caffe2/distributed/file_store_handler_op.h | 21 |
4 files changed, 183 insertions, 0 deletions
diff --git a/caffe2/distributed/file_store_handler.cc b/caffe2/distributed/file_store_handler.cc new file mode 100644 index 0000000000..4e1068c8b5 --- /dev/null +++ b/caffe2/distributed/file_store_handler.cc @@ -0,0 +1,95 @@ +#include "file_store_handler_op.h" + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#include <stdlib.h> + +#include <chrono> +#include <thread> + +#include "caffe2/utils/proto_utils.h" + +namespace caffe2 { + +FileStoreHandler::FileStoreHandler(std::string& path) { + basePath_ = realPath(path); +} + +FileStoreHandler::~FileStoreHandler() {} + +std::string FileStoreHandler::realPath(const std::string& path) { + std::array<char, PATH_MAX> buf; + CHECK_EQ(buf.data(), realpath(path.c_str(), buf.data())) << "realpath: " + << strerror(errno); + return std::string(buf.data()); +} + +std::string FileStoreHandler::objectPath(const std::string& name) { + std::string encoded; + for (const auto& c : name) { + // Convert non-alphabetic characters to octal. + // Means argument cannot collide with encoding. + // Don't want to take a dependency on SSL for SHA1 here. + if (!isalpha(c)) { + // 0-prefix, max 3 numbers, 0-terminator + std::array<char, 5> buf; + snprintf(buf.data(), buf.size(), "%#03o", c); + encoded.append(buf.data()); + } else { + encoded.append(&c, 1); + } + } + return basePath_ + "/" + encoded; +} + +void FileStoreHandler::set(const std::string& name, const std::string& data) { + auto path = objectPath(name); + WriteStringToFile(data, path.c_str()); +} + +std::string FileStoreHandler::get(const std::string& name) { + std::string result; + auto path = objectPath(name); + ReadStringFromFile(path.c_str(), &result); + return result; +} + +int64_t FileStoreHandler::add( + const std::string& /* unused */, + int64_t /* unused */) { + CHECK(false) << "add not implemented for FileStoreHandler"; +} + +bool FileStoreHandler::check(const std::vector<std::string>& names) { + std::vector<std::string> paths; + for (const auto& name : names) { + paths.push_back(objectPath(name)); + } + + for (const auto& path : paths) { + int fd = open(path.c_str(), O_RDONLY); + if (fd == -1) { + // Only deal with files that don't exist. + // Anything else is a problem. + CHECK_EQ(errno, ENOENT); + + // One of the paths doesn't exist; return early + return false; + } + + close(fd); + } + + return true; +} + +void FileStoreHandler::wait(const std::vector<std::string>& names) { + // Not using inotify because it doesn't work on many + // shared filesystems (such as NFS). + while (!check(names)) { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} +} diff --git a/caffe2/distributed/file_store_handler.h b/caffe2/distributed/file_store_handler.h new file mode 100644 index 0000000000..06174d2531 --- /dev/null +++ b/caffe2/distributed/file_store_handler.h @@ -0,0 +1,30 @@ +#pragma once + +#include <caffe2/distributed/store_handler.h> + +namespace caffe2 { + +class FileStoreHandler : public StoreHandler { + public: + explicit FileStoreHandler(std::string& path); + virtual ~FileStoreHandler(); + + virtual void set(const std::string& name, const std::string& data) override; + + virtual std::string get(const std::string& name) override; + + virtual int64_t add(const std::string& name, int64_t value) override; + + virtual bool check(const std::vector<std::string>& names) override; + + virtual void wait(const std::vector<std::string>& names) override; + + protected: + std::string basePath_; + + std::string realPath(const std::string& path); + + std::string objectPath(const std::string& name); +}; + +} // namespace caffe2 diff --git a/caffe2/distributed/file_store_handler_op.cc b/caffe2/distributed/file_store_handler_op.cc new file mode 100644 index 0000000000..5595918990 --- /dev/null +++ b/caffe2/distributed/file_store_handler_op.cc @@ -0,0 +1,37 @@ +#include "file_store_handler_op.h" + +namespace caffe2 { + +FileStoreHandlerCreateOp::FileStoreHandlerCreateOp( + const OperatorDef& operator_def, + Workspace* ws) + : Operator(operator_def, ws), + basePath_(GetSingleArgument<std::string>("path", "")) { + CHECK_NE(basePath_, "") << "path is a required argument"; +} + +bool FileStoreHandlerCreateOp::RunOnDevice() { + auto ptr = std::unique_ptr<StoreHandler>(new FileStoreHandler(basePath_)); + *OperatorBase::Output<std::unique_ptr<StoreHandler>>(HANDLER) = + std::move(ptr); + return true; +} + +REGISTER_CPU_OPERATOR(FileStoreHandlerCreate, FileStoreHandlerCreateOp); +OPERATOR_SCHEMA(FileStoreHandlerCreate) + .NumInputs(0) + .NumOutputs(1) + .SetDoc(R"DOC( +Creates a unique_ptr<StoreHandler> that uses the filesystem as backing +store (typically a filesystem shared between many nodes, such as NFS). +This store handler is not built to be fast. Its recommended use is for +integration tests and prototypes where extra dependencies are +cumbersome. Use an ephemeral path to ensure multiple processes or runs +don't interfere. +)DOC") + .Arg("path", "base path used by the FileStoreHandler") + .Output(0, "handler", "unique_ptr<StoreHandler>"); + +NO_GRADIENT(FileStoreHandlerCreateOp); + +} // namespace caffe2 diff --git a/caffe2/distributed/file_store_handler_op.h b/caffe2/distributed/file_store_handler_op.h new file mode 100644 index 0000000000..42c77588f6 --- /dev/null +++ b/caffe2/distributed/file_store_handler_op.h @@ -0,0 +1,21 @@ +#pragma once + +#include "file_store_handler.h" + +#include <caffe2/core/operator.h> + +namespace caffe2 { + +class FileStoreHandlerCreateOp final : public Operator<CPUContext> { + public: + FileStoreHandlerCreateOp(const OperatorDef& operator_def, Workspace* ws); + + bool RunOnDevice() override; + + private: + std::string basePath_; + + OUTPUT_TAGS(HANDLER); +}; + +} // namespace caffe2 |