summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--snappy-sinksource.cc33
-rw-r--r--snappy-sinksource.h47
-rw-r--r--snappy.cc249
-rw-r--r--snappy.h20
-rw-r--r--snappy_unittest.cc72
5 files changed, 420 insertions, 1 deletions
diff --git a/snappy-sinksource.cc b/snappy-sinksource.cc
index 5844552..369a132 100644
--- a/snappy-sinksource.cc
+++ b/snappy-sinksource.cc
@@ -40,6 +40,21 @@ char* Sink::GetAppendBuffer(size_t length, char* scratch) {
return scratch;
}
+char* Sink::GetAppendBufferVariable(
+ size_t min_size, size_t desired_size_hint, char* scratch,
+ size_t scratch_size, size_t* allocated_size) {
+ *allocated_size = scratch_size;
+ return scratch;
+}
+
+void Sink::AppendAndTakeOwnership(
+ char* bytes, size_t n,
+ void (*deleter)(void*, const char*, size_t),
+ void *deleter_arg) {
+ Append(bytes, n);
+ (*deleter)(deleter_arg, bytes, n);
+}
+
ByteArraySource::~ByteArraySource() { }
size_t ByteArraySource::Available() const { return left_; }
@@ -68,4 +83,22 @@ char* UncheckedByteArraySink::GetAppendBuffer(size_t len, char* scratch) {
return dest_;
}
+void UncheckedByteArraySink::AppendAndTakeOwnership(
+ char* data, size_t n,
+ void (*deleter)(void*, const char*, size_t),
+ void *deleter_arg) {
+ if (data != dest_) {
+ memcpy(dest_, data, n);
+ (*deleter)(deleter_arg, data, n);
+ }
+ dest_ += n;
+}
+
+char* UncheckedByteArraySink::GetAppendBufferVariable(
+ size_t min_size, size_t desired_size_hint, char* scratch,
+ size_t scratch_size, size_t* allocated_size) {
+ *allocated_size = desired_size_hint;
+ return dest_;
}
+
+} // namespace snappy
diff --git a/snappy-sinksource.h b/snappy-sinksource.h
index bc4ac52..8afcdaa 100644
--- a/snappy-sinksource.h
+++ b/snappy-sinksource.h
@@ -59,6 +59,47 @@ class Sink {
// The default implementation always returns the scratch buffer.
virtual char* GetAppendBuffer(size_t length, char* scratch);
+ // For higher performance, Sink implementations can provide custom
+ // AppendAndTakeOwnership() and GetAppendBufferVariable() methods.
+ // These methods can reduce the number of copies done during
+ // compression/decompression.
+
+ // Append "bytes[0,n-1] to the sink. Takes ownership of "bytes"
+ // and calls the deleter function as (*deleter)(deleter_arg, bytes, n)
+ // to free the buffer. deleter function must be non NULL.
+ //
+ // The default implementation just calls Append and frees "bytes".
+ // Other implementations may avoid a copy while appending the buffer.
+ virtual void AppendAndTakeOwnership(
+ char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
+ void *deleter_arg);
+
+ // Returns a writable buffer for appending and writes the buffer's capacity to
+ // *allocated_size. Guarantees *allocated_size >= min_size.
+ // May return a pointer to the caller-owned scratch buffer which must have
+ // scratch_size >= min_size.
+ //
+ // The returned buffer is only valid until the next operation
+ // on this ByteSink.
+ //
+ // After writing at most *allocated_size bytes, call Append() with the
+ // pointer returned from this function and the number of bytes written.
+ // Many Append() implementations will avoid copying bytes if this function
+ // returned an internal buffer.
+ //
+ // If the sink implementation allocates or reallocates an internal buffer,
+ // it should use the desired_size_hint if appropriate. If a caller cannot
+ // provide a reasonable guess at the desired capacity, it should set
+ // desired_size_hint = 0.
+ //
+ // If a non-scratch buffer is returned, the caller may only pass
+ // a prefix to it to Append(). That is, it is not correct to pass an
+ // interior pointer to Append().
+ //
+ // The default implementation always returns the scratch buffer.
+ virtual char* GetAppendBufferVariable(
+ size_t min_size, size_t desired_size_hint, char* scratch,
+ size_t scratch_size, size_t* allocated_size);
private:
// No copying
@@ -121,6 +162,12 @@ class UncheckedByteArraySink : public Sink {
virtual ~UncheckedByteArraySink();
virtual void Append(const char* data, size_t n);
virtual char* GetAppendBuffer(size_t len, char* scratch);
+ virtual char* GetAppendBufferVariable(
+ size_t min_size, size_t desired_size_hint, char* scratch,
+ size_t scratch_size, size_t* allocated_size);
+ virtual void AppendAndTakeOwnership(
+ char* bytes, size_t n, void (*deleter)(void*, const char*, size_t),
+ void *deleter_arg);
// Return the current output pointer so that a caller can see how
// many bytes were produced.
diff --git a/snappy.cc b/snappy.cc
index 971adc2..b973d63 100644
--- a/snappy.cc
+++ b/snappy.cc
@@ -863,6 +863,7 @@ static bool InternalUncompressAllTags(SnappyDecompressor* decompressor,
// Process the entire input
decompressor->DecompressAllTags(writer);
+ writer->Flush();
return (decompressor->eof() && writer->CheckLength());
}
@@ -1115,6 +1116,7 @@ class SnappyIOVecWriter {
return true;
}
+ inline void Flush() {}
};
bool RawUncompressToIOVec(const char* compressed, size_t compressed_length,
@@ -1215,6 +1217,10 @@ class SnappyArrayWriter {
op_ = op + len;
return true;
}
+ inline size_t Produced() const {
+ return op_ - base_;
+ }
+ inline void Flush() {}
};
bool RawUncompress(const char* compressed, size_t n, char* uncompressed) {
@@ -1269,6 +1275,7 @@ class SnappyDecompressionValidator {
produced_ += len;
return produced_ <= expected_;
}
+ inline void Flush() {}
};
bool IsValidCompressedBuffer(const char* compressed, size_t n) {
@@ -1277,6 +1284,11 @@ bool IsValidCompressedBuffer(const char* compressed, size_t n) {
return InternalUncompress(&reader, &writer);
}
+bool IsValidCompressed(Source* compressed) {
+ SnappyDecompressionValidator writer;
+ return InternalUncompress(compressed, &writer);
+}
+
void RawCompress(const char* input,
size_t input_length,
char* compressed,
@@ -1300,6 +1312,241 @@ size_t Compress(const char* input, size_t input_length, string* compressed) {
return compressed_length;
}
+// -----------------------------------------------------------------------
+// Sink interface
+// -----------------------------------------------------------------------
-} // end namespace snappy
+// A type that decompresses into a Sink. The template parameter
+// Allocator must export one method "char* Allocate(int size);", which
+// allocates a buffer of "size" and appends that to the destination.
+template <typename Allocator>
+class SnappyScatteredWriter {
+ Allocator allocator_;
+
+ // We need random access into the data generated so far. Therefore
+ // we keep track of all of the generated data as an array of blocks.
+ // All of the blocks except the last have length kBlockSize.
+ vector<char*> blocks_;
+ size_t expected_;
+
+ // Total size of all fully generated blocks so far
+ size_t full_size_;
+
+ // Pointer into current output block
+ char* op_base_; // Base of output block
+ char* op_ptr_; // Pointer to next unfilled byte in block
+ char* op_limit_; // Pointer just past block
+
+ inline size_t Size() const {
+ return full_size_ + (op_ptr_ - op_base_);
+ }
+
+ bool SlowAppend(const char* ip, size_t len);
+ bool SlowAppendFromSelf(size_t offset, size_t len);
+
+ public:
+ inline explicit SnappyScatteredWriter(const Allocator& allocator)
+ : allocator_(allocator),
+ full_size_(0),
+ op_base_(NULL),
+ op_ptr_(NULL),
+ op_limit_(NULL) {
+ }
+
+ inline void SetExpectedLength(size_t len) {
+ assert(blocks_.empty());
+ expected_ = len;
+ }
+
+ inline bool CheckLength() const {
+ return Size() == expected_;
+ }
+
+ // Return the number of bytes actually uncompressed so far
+ inline size_t Produced() const {
+ return Size();
+ }
+
+ inline bool Append(const char* ip, size_t len) {
+ size_t avail = op_limit_ - op_ptr_;
+ if (len <= avail) {
+ // Fast path
+ memcpy(op_ptr_, ip, len);
+ op_ptr_ += len;
+ return true;
+ } else {
+ return SlowAppend(ip, len);
+ }
+ }
+
+ inline bool TryFastAppend(const char* ip, size_t available, size_t length) {
+ char* op = op_ptr_;
+ const int space_left = op_limit_ - op;
+ if (length <= 16 && available >= 16 + kMaximumTagLength &&
+ space_left >= 16) {
+ // Fast path, used for the majority (about 95%) of invocations.
+ UNALIGNED_STORE64(op, UNALIGNED_LOAD64(ip));
+ UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(ip + 8));
+ op_ptr_ = op + length;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ inline bool AppendFromSelf(size_t offset, size_t len) {
+ // See SnappyArrayWriter::AppendFromSelf for an explanation of
+ // the "offset - 1u" trick.
+ if (offset - 1u < op_ptr_ - op_base_) {
+ const size_t space_left = op_limit_ - op_ptr_;
+ if (space_left >= len + kMaxIncrementCopyOverflow) {
+ // Fast path: src and dst in current block.
+ IncrementalCopyFastPath(op_ptr_ - offset, op_ptr_, len);
+ op_ptr_ += len;
+ return true;
+ }
+ }
+ return SlowAppendFromSelf(offset, len);
+ }
+
+ // Called at the end of the decompress. We ask the allocator
+ // write all blocks to the sink.
+ inline void Flush() { allocator_.Flush(Produced()); }
+};
+
+template<typename Allocator>
+bool SnappyScatteredWriter<Allocator>::SlowAppend(const char* ip, size_t len) {
+ size_t avail = op_limit_ - op_ptr_;
+ while (len > avail) {
+ // Completely fill this block
+ memcpy(op_ptr_, ip, avail);
+ op_ptr_ += avail;
+ assert(op_limit_ - op_ptr_ == 0);
+ full_size_ += (op_ptr_ - op_base_);
+ len -= avail;
+ ip += avail;
+
+ // Bounds check
+ if (full_size_ + len > expected_) {
+ return false;
+ }
+
+ // Make new block
+ size_t bsize = min<size_t>(kBlockSize, expected_ - full_size_);
+ op_base_ = allocator_.Allocate(bsize);
+ op_ptr_ = op_base_;
+ op_limit_ = op_base_ + bsize;
+ blocks_.push_back(op_base_);
+ avail = bsize;
+ }
+
+ memcpy(op_ptr_, ip, len);
+ op_ptr_ += len;
+ return true;
+}
+
+template<typename Allocator>
+bool SnappyScatteredWriter<Allocator>::SlowAppendFromSelf(size_t offset,
+ size_t len) {
+ // Overflow check
+ // See SnappyArrayWriter::AppendFromSelf for an explanation of
+ // the "offset - 1u" trick.
+ const size_t cur = Size();
+ if (offset - 1u >= cur) return false;
+ if (expected_ - cur < len) return false;
+
+ // Currently we shouldn't ever hit this path because Compress() chops the
+ // input into blocks and does not create cross-block copies. However, it is
+ // nice if we do not rely on that, since we can get better compression if we
+ // allow cross-block copies and thus might want to change the compressor in
+ // the future.
+ size_t src = cur - offset;
+ while (len-- > 0) {
+ char c = blocks_[src >> kBlockLog][src & (kBlockSize-1)];
+ Append(&c, 1);
+ src++;
+ }
+ return true;
+}
+
+class SnappySinkAllocator {
+ public:
+ explicit SnappySinkAllocator(Sink* dest): dest_(dest) {}
+ ~SnappySinkAllocator() {}
+
+ char* Allocate(int size) {
+ Datablock block(new char[size], size);
+ blocks_.push_back(block);
+ return block.data;
+ }
+ // We flush only at the end, because the writer wants
+ // random access to the blocks and once we hand the
+ // block over to the sink, we can't access it anymore.
+ // Also we don't write more than has been actually written
+ // to the blocks.
+ void Flush(size_t size) {
+ size_t size_written = 0;
+ size_t block_size;
+ for (int i = 0; i < blocks_.size(); ++i) {
+ block_size = min<size_t>(blocks_[i].size, size - size_written);
+ dest_->AppendAndTakeOwnership(blocks_[i].data, block_size,
+ &SnappySinkAllocator::Deleter, NULL);
+ size_written += block_size;
+ }
+ blocks_.clear();
+ }
+
+ private:
+ struct Datablock {
+ char* data;
+ size_t size;
+ Datablock(char* p, size_t s) : data(p), size(s) {}
+ };
+
+ static void Deleter(void* arg, const char* bytes, size_t size) {
+ delete[] bytes;
+ }
+
+ Sink* dest_;
+ vector<Datablock> blocks_;
+
+ // Note: copying this object is allowed
+};
+
+size_t UncompressAsMuchAsPossible(Source* compressed, Sink* uncompressed) {
+ SnappySinkAllocator allocator(uncompressed);
+ SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
+ InternalUncompress(compressed, &writer);
+ return writer.Produced();
+}
+
+bool Uncompress(Source* compressed, Sink* uncompressed) {
+ // Read the uncompressed length from the front of the compressed input
+ SnappyDecompressor decompressor(compressed);
+ uint32 uncompressed_len = 0;
+ if (!decompressor.ReadUncompressedLength(&uncompressed_len)) {
+ return false;
+ }
+
+ char c;
+ size_t allocated_size;
+ char* buf = uncompressed->GetAppendBufferVariable(
+ 1, uncompressed_len, &c, 1, &allocated_size);
+
+ // If we can get a flat buffer, then use it, otherwise do block by block
+ // uncompression
+ if (allocated_size >= uncompressed_len) {
+ SnappyArrayWriter writer(buf);
+ bool result = InternalUncompressAllTags(
+ &decompressor, &writer, uncompressed_len);
+ uncompressed->Append(buf, writer.Produced());
+ return result;
+ } else {
+ SnappySinkAllocator allocator(uncompressed);
+ SnappyScatteredWriter<SnappySinkAllocator> writer(allocator);
+ return InternalUncompressAllTags(&decompressor, &writer, uncompressed_len);
+ }
+}
+
+} // end namespace snappy
diff --git a/snappy.h b/snappy.h
index 4ac572f..4568db8 100644
--- a/snappy.h
+++ b/snappy.h
@@ -84,6 +84,18 @@ namespace snappy {
bool Uncompress(const char* compressed, size_t compressed_length,
string* uncompressed);
+ // Decompresses "compressed" to "*uncompressed".
+ //
+ // returns false if the message is corrupted and could not be decompressed
+ bool Uncompress(Source* compressed, Sink* uncompressed);
+
+ // This routine uncompresses as much of the "compressed" as possible
+ // into sink. It returns the number of valid bytes added to sink
+ // (extra invalid bytes may have been added due to errors; the caller
+ // should ignore those). The emitted data typically has length
+ // GetUncompressedLength(), but may be shorter if an error is
+ // encountered.
+ size_t UncompressAsMuchAsPossible(Source* compressed, Sink* uncompressed);
// ------------------------------------------------------------------------
// Lower-level character array based routines. May be useful for
@@ -164,6 +176,14 @@ namespace snappy {
bool IsValidCompressedBuffer(const char* compressed,
size_t compressed_length);
+ // Returns true iff the contents of "compressed" can be uncompressed
+ // successfully. Does not return the uncompressed data. Takes
+ // time proportional to *compressed length, but is usually at least
+ // a factor of four faster than actual decompression.
+ // On success, consumes all of *compressed. On failure, consumes an
+ // unspecified prefix of *compressed.
+ bool IsValidCompressed(Source* compressed);
+
// The size of a compression block. Note that many parts of the compression
// code assumes that kBlockSize <= 65536; in particular, the hash table
// can only store 16-bit offsets, and EmitCopy() also assumes the offset
diff --git a/snappy_unittest.cc b/snappy_unittest.cc
index 5653a2c..4a80f2a 100644
--- a/snappy_unittest.cc
+++ b/snappy_unittest.cc
@@ -488,6 +488,23 @@ static int VerifyString(const string& input) {
return uncompressed.size();
}
+static void VerifyStringSink(const string& input) {
+ string compressed;
+ DataEndingAtUnreadablePage i(input);
+ const size_t written = snappy::Compress(i.data(), i.size(), &compressed);
+ CHECK_EQ(written, compressed.size());
+ CHECK_LE(compressed.size(),
+ snappy::MaxCompressedLength(input.size()));
+ CHECK(snappy::IsValidCompressedBuffer(compressed.data(), compressed.size()));
+
+ string uncompressed;
+ uncompressed.resize(input.size());
+ snappy::UncheckedByteArraySink sink(string_as_array(&uncompressed));
+ DataEndingAtUnreadablePage c(compressed);
+ snappy::ByteArraySource source(c.data(), c.size());
+ CHECK(snappy::Uncompress(&source, &sink));
+ CHECK_EQ(uncompressed, input);
+}
static void VerifyIOVec(const string& input) {
string compressed;
@@ -559,6 +576,28 @@ static void VerifyNonBlockedCompression(const string& input) {
CHECK(snappy::Uncompress(compressed.data(), compressed.size(), &uncomp_str));
CHECK_EQ(uncomp_str, input);
+ // Uncompress using source/sink
+ string uncomp_str2;
+ uncomp_str2.resize(input.size());
+ snappy::UncheckedByteArraySink sink(string_as_array(&uncomp_str2));
+ snappy::ByteArraySource source(compressed.data(), compressed.size());
+ CHECK(snappy::Uncompress(&source, &sink));
+ CHECK_EQ(uncomp_str2, input);
+
+ // Uncompress into iovec
+ {
+ static const int kNumBlocks = 10;
+ struct iovec vec[kNumBlocks];
+ const int block_size = 1 + input.size() / kNumBlocks;
+ string iovec_data(block_size * kNumBlocks, 'x');
+ for (int i = 0; i < kNumBlocks; i++) {
+ vec[i].iov_base = string_as_array(&iovec_data) + i * block_size;
+ vec[i].iov_len = block_size;
+ }
+ CHECK(snappy::RawUncompressToIOVec(compressed.data(), compressed.size(),
+ vec, kNumBlocks));
+ CHECK_EQ(string(iovec_data.data(), input.size()), input);
+ }
}
// Expand the input so that it is at least K times as big as block size
@@ -577,6 +616,8 @@ static int Verify(const string& input) {
// Compress using string based routines
const int result = VerifyString(input);
+ // Verify using sink based routines
+ VerifyStringSink(input);
VerifyNonBlockedCompression(input);
VerifyIOVec(input);
@@ -1291,6 +1332,37 @@ static void BM_UIOVec(int iters, int arg) {
}
BENCHMARK(BM_UIOVec)->DenseRange(0, 4);
+static void BM_UFlatSink(int iters, int arg) {
+ StopBenchmarkTiming();
+
+ // Pick file to process based on "arg"
+ CHECK_GE(arg, 0);
+ CHECK_LT(arg, ARRAYSIZE(files));
+ string contents = ReadTestDataFile(files[arg].filename,
+ files[arg].size_limit);
+
+ string zcontents;
+ snappy::Compress(contents.data(), contents.size(), &zcontents);
+ char* dst = new char[contents.size()];
+
+ SetBenchmarkBytesProcessed(static_cast<int64>(iters) *
+ static_cast<int64>(contents.size()));
+ SetBenchmarkLabel(files[arg].label);
+ StartBenchmarkTiming();
+ while (iters-- > 0) {
+ snappy::ByteArraySource source(zcontents.data(), zcontents.size());
+ snappy::UncheckedByteArraySink sink(dst);
+ CHECK(snappy::Uncompress(&source, &sink));
+ }
+ StopBenchmarkTiming();
+
+ string s(dst, contents.size());
+ CHECK_EQ(contents, s);
+
+ delete[] dst;
+}
+
+BENCHMARK(BM_UFlatSink)->DenseRange(0, ARRAYSIZE(files) - 1);
static void BM_ZFlat(int iters, int arg) {
StopBenchmarkTiming();