diff options
Diffstat (limited to 'src/mscorlib/src/System/IO/BufferedStream.cs')
-rw-r--r-- | src/mscorlib/src/System/IO/BufferedStream.cs | 1319 |
1 files changed, 1319 insertions, 0 deletions
diff --git a/src/mscorlib/src/System/IO/BufferedStream.cs b/src/mscorlib/src/System/IO/BufferedStream.cs new file mode 100644 index 0000000000..ab6e8a5b34 --- /dev/null +++ b/src/mscorlib/src/System/IO/BufferedStream.cs @@ -0,0 +1,1319 @@ +// Copyright (c) Microsoft. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for full license information. + +/*============================================================ +** +** +** +** +** Purpose: A composable Stream that buffers reads & writes to the underlying stream. +** +** +===========================================================*/ +using System; +using System.Runtime.InteropServices; +using System.Globalization; +using System.Diagnostics.Contracts; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Collections.ObjectModel; +using System.Security; +using System.Threading.Tasks; + +namespace System.IO { + +/// <summary> +/// One of the design goals here is to prevent the buffer from getting in the way and slowing +/// down underlying stream accesses when it is not needed. If you always read & write for sizes +/// greater than the internal buffer size, then this class may not even allocate the internal buffer. +/// See a large comment in Write for the details of the write buffer heuristic. +/// +/// This class buffers reads & writes in a shared buffer. +/// (If you maintained two buffers separately, one operation would always trash the other buffer +/// anyways, so we might as well use one buffer.) +/// The assumption here is you will almost always be doing a series of reads or writes, but rarely +/// alternate between the two of them on the same stream. +/// +/// Class Invariants: +/// The class has one buffer, shared for reading & writing. +/// It can only be used for one or the other at any point in time - not both. +/// The following should be true: +/// <![CDATA[ +/// * 0 <= _readPos <= _readLen < _bufferSize +/// * 0 <= _writePos < _bufferSize +/// * _readPos == _readLen && _readPos > 0 implies the read buffer is valid, but we're at the end of the buffer. +/// * _readPos == _readLen == 0 means the read buffer contains garbage. +/// * Either _writePos can be greater than 0, or _readLen & _readPos can be greater than zero, +/// but neither can be greater than zero at the same time. +/// ]]> +/// This class will never cache more bytes than the max specified buffer size. +/// However, it may use a temporary buffer of up to twice the size in order to combine several IO operations on +/// the underlying stream into a single operation. This is because we assume that memory copies are significantly +/// faster than IO operations on the underlying stream (if this was not true, using buffering is never appropriate). +/// The max size of this "shadow" buffer is limited as to not allocate it on the LOH. +/// Shadowing is always transient. Even when using this technique, this class still guarantees that the number of +/// bytes cached (not yet written to the target stream or not yet consumed by the user) is never larger than the +/// actual specified buffer size. +/// </summary> +[ComVisible(true)] +public sealed class BufferedStream : Stream { + + + private const Int32 _DefaultBufferSize = 4096; + + + private Stream _stream; // Underlying stream. Close sets _stream to null. + + private Byte[] _buffer; // Shared read/write buffer. Alloc on first use. + + private readonly Int32 _bufferSize; // Length of internal buffer (not counting the shadow buffer). + + private Int32 _readPos; // Read pointer within shared buffer. + private Int32 _readLen; // Number of bytes read in buffer from _stream. + private Int32 _writePos; // Write pointer within shared buffer. + + private BeginEndAwaitableAdapter _beginEndAwaitable; // Used to be able to await a BeginXxx call and thus to share code + // between the APM and Async pattern implementations + + private Task<Int32> _lastSyncCompletedReadTask; // The last successful Task returned from ReadAsync + // (perf optimization for successive reads of the same size) + + + // Removing a private default constructor is a breaking change for the DataContractSerializer. + // Because this ctor was here previously we need to keep it around. + private BufferedStream() { } + + + public BufferedStream(Stream stream) + + : this(stream, _DefaultBufferSize) { + } + + + public BufferedStream(Stream stream, Int32 bufferSize) { + + if (stream == null) + throw new ArgumentNullException("stream"); + + if (bufferSize <= 0) + throw new ArgumentOutOfRangeException("bufferSize", Environment.GetResourceString("ArgumentOutOfRange_MustBePositive", "bufferSize")); + + Contract.EndContractBlock(); + + BCLDebug.Perf(!(stream is FileStream), "FileStream is buffered - don't wrap it in a BufferedStream"); + BCLDebug.Perf(!(stream is MemoryStream), "MemoryStream shouldn't be wrapped in a BufferedStream!"); + BCLDebug.Perf(!(stream is BufferedStream), "BufferedStream shouldn't be wrapped in another BufferedStream!"); + + _stream = stream; + _bufferSize = bufferSize; + + // Allocate _buffer on its first use - it will not be used if all reads + // & writes are greater than or equal to buffer size. + + if (!_stream.CanRead && !_stream.CanWrite) + __Error.StreamIsClosed(); + } + + + private void EnsureNotClosed() { + + if (_stream == null) + __Error.StreamIsClosed(); + } + + + private void EnsureCanSeek() { + + Contract.Requires(_stream != null); + + if (!_stream.CanSeek) + __Error.SeekNotSupported(); + } + + + private void EnsureCanRead() { + + Contract.Requires(_stream != null); + + if (!_stream.CanRead) + __Error.ReadNotSupported(); + } + + + private void EnsureCanWrite() { + + Contract.Requires(_stream != null); + + if (!_stream.CanWrite) + __Error.WriteNotSupported(); + } + + + private void EnsureBeginEndAwaitableAllocated() { + // We support only a single ongoing async operation and enforce this with a semaphore, + // so singleton is fine and no need to worry about a race condition here. + if (_beginEndAwaitable == null) + _beginEndAwaitable = new BeginEndAwaitableAdapter(); + } + + + /// <summary><code>MaxShadowBufferSize</code> is chosed such that shadow buffers are not allocated on the Large Object Heap. + /// Currently, an object is allocated on the LOH if it is larger than 85000 bytes. See LARGE_OBJECT_SIZE in ndp\clr\src\vm\gc.h + /// We will go with exactly 80 KBytes, although this is somewhat arbitrary.</summary> + private const Int32 MaxShadowBufferSize = 81920; // Make sure not to get to the Large Object Heap. + private void EnsureShadowBufferAllocated() { + + Contract.Assert(_buffer != null); + Contract.Assert(_bufferSize > 0); + + // Already have shadow buffer? + if (_buffer.Length != _bufferSize || _bufferSize >= MaxShadowBufferSize) + return; + + Byte[] shadowBuffer = new Byte[Math.Min(_bufferSize + _bufferSize, MaxShadowBufferSize)]; + Buffer.InternalBlockCopy(_buffer, 0, shadowBuffer, 0, _writePos); + _buffer = shadowBuffer; + } + + + private void EnsureBufferAllocated() { + + Contract.Assert(_bufferSize > 0); + + // BufferedStream is not intended for multi-threaded use, so no worries about the get/set race conditions on _buffer. + if (_buffer == null) + _buffer = new Byte[_bufferSize]; + } + + + internal Stream UnderlyingStream { + [FriendAccessAllowed] + [Pure] + get { return _stream; } + } + + + internal Int32 BufferSize { + [FriendAccessAllowed] + [Pure] + get { return _bufferSize; } + } + + + public override bool CanRead { + [Pure] + get { return _stream != null && _stream.CanRead; } + } + + + public override bool CanWrite { + [Pure] + get { return _stream != null && _stream.CanWrite; } + } + + + public override bool CanSeek { + [Pure] + get { return _stream != null && _stream.CanSeek; } + } + + + public override Int64 Length { + get { + EnsureNotClosed(); + + if (_writePos > 0) + FlushWrite(); + + return _stream.Length; + } + } + + + public override Int64 Position { + get { + EnsureNotClosed(); + EnsureCanSeek(); + + Contract.Assert(! (_writePos > 0 && _readPos != _readLen), "Read and Write buffers cannot both have data in them at the same time."); + return _stream.Position + (_readPos - _readLen + _writePos); + } + set { + if (value < 0) + throw new ArgumentOutOfRangeException("value", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + Contract.EndContractBlock(); + + EnsureNotClosed(); + EnsureCanSeek(); + + if (_writePos > 0) + FlushWrite(); + + _readPos = 0; + _readLen = 0; + _stream.Seek(value, SeekOrigin.Begin); + } + } + + + protected override void Dispose(bool disposing) { + + try { + if (disposing && _stream != null) { + try { + Flush(); + } finally { + _stream.Close(); + } + } + } finally { + _stream = null; + _buffer = null; + _lastSyncCompletedReadTask = null; + + // Call base.Dispose(bool) to cleanup async IO resources + base.Dispose(disposing); + } + } + + + public override void Flush() { + + EnsureNotClosed(); + + // Has WRITE data in the buffer: + if (_writePos > 0) { + + FlushWrite(); + Contract.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0); + return; + } + + // Has READ data in the buffer: + if (_readPos < _readLen) { + + // If the underlying stream is not seekable AND we have something in the read buffer, then FlushRead would throw. + // We can either throw away the buffer resulting in data loss (!) or ignore the Flush. + // (We cannot throw becasue it would be a breaking change.) We opt into ignoring the Flush in that situation. + if (!_stream.CanSeek) + return; + + FlushRead(); + + // User streams may have opted to throw from Flush if CanWrite is false (although the abstract Stream does not do so). + // However, if we do not forward the Flush to the underlying stream, we may have problems when chaining several streams. + // Let us make a best effort attempt: + if (_stream.CanWrite || _stream is BufferedStream) + _stream.Flush(); + + Contract.Assert(_writePos == 0 && _readPos == 0 && _readLen == 0); + return; + } + + // We had no data in the buffer, but we still need to tell the underlying stream to flush. + if (_stream.CanWrite || _stream is BufferedStream) + _stream.Flush(); + + _writePos = _readPos = _readLen = 0; + } + + public override Task FlushAsync(CancellationToken cancellationToken) { + + if (cancellationToken.IsCancellationRequested) + return Task.FromCancellation<Int32>(cancellationToken); + + EnsureNotClosed(); + + return FlushAsyncInternal(cancellationToken, this, _stream, _writePos, _readPos, _readLen); + } + + + private static async Task FlushAsyncInternal(CancellationToken cancellationToken, + BufferedStream _this, Stream stream, Int32 writePos, Int32 readPos, Int32 readLen) { + + // We bring instance fields down as local parameters to this async method becasue BufferedStream is derived from MarshalByRefObject. + // Field access would be from the async state machine i.e., not via the this pointer and would require runtime checking to see + // if we are talking to a remote object, which is currently very slow + + Contract.Assert(stream != null); + + SemaphoreSlim sem = _this.EnsureAsyncActiveSemaphoreInitialized(); + await sem.WaitAsync().ConfigureAwait(false); + try { + + if (writePos > 0) { + + await _this.FlushWriteAsync(cancellationToken).ConfigureAwait(false); + Contract.Assert(_this._writePos == 0 && _this._readPos == 0 && _this._readLen == 0); + return; + } + + if (readPos < readLen) { + + // If the underlying stream is not seekable AND we have something in the read buffer, then FlushRead would throw. + // We can either throw away the buffer resulting in date loss (!) or ignore the Flush. (We cannot throw becasue it + // would be a breaking change.) We opt into ignoring the Flush in that situation. + if (!stream.CanSeek) + return; + + _this.FlushRead(); // not async; it uses Seek, but there's no SeekAsync + + // User streams may have opted to throw from Flush if CanWrite is false (although the abstract Stream does not do so). + // However, if we do not forward the Flush to the underlying stream, we may have problems when chaining several streams. + // Let us make a best effort attempt: + if (stream.CanRead || stream is BufferedStream) + await stream.FlushAsync(cancellationToken).ConfigureAwait(false); + + Contract.Assert(_this._writePos == 0 && _this._readPos == 0 && _this._readLen == 0); + return; + } + + // We had no data in the buffer, but we still need to tell the underlying stream to flush. + if (stream.CanWrite || stream is BufferedStream) + await stream.FlushAsync(cancellationToken).ConfigureAwait(false); + + // There was nothing in the buffer: + Contract.Assert(_this._writePos == 0 && _this._readPos == _this._readLen); + + } finally { + sem.Release(); + } + } + + + // Reading is done in blocks, but someone could read 1 byte from the buffer then write. + // At that point, the underlying stream's pointer is out of sync with this stream's position. + // All write functions should call this function to ensure that the buffered data is not lost. + private void FlushRead() { + + Contract.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!"); + + if (_readPos - _readLen != 0) + _stream.Seek(_readPos - _readLen, SeekOrigin.Current); + + _readPos = 0; + _readLen = 0; + } + + + private void ClearReadBufferBeforeWrite() { + + // This is called by write methods to clear the read buffer. + + Contract.Assert(_readPos <= _readLen, "_readPos <= _readLen [" + _readPos +" <= " + _readLen + "]"); + + // No READ data in the buffer: + if (_readPos == _readLen) { + + _readPos = _readLen = 0; + return; + } + + // Must have READ data. + Contract.Assert(_readPos < _readLen); + + // If the underlying stream cannot seek, FlushRead would end up throwing NotSupported. + // However, since the user did not call a method that is intuitively expected to seek, a better message is in order. + // Ideally, we would throw an InvalidOperation here, but for backward compat we have to stick with NotSupported. + if (!_stream.CanSeek) + throw new NotSupportedException(Environment.GetResourceString("NotSupported_CannotWriteToBufferedStreamIfReadBufferCannotBeFlushed")); + + FlushRead(); + } + + + private void FlushWrite() { + + Contract.Assert(_readPos == 0 && _readLen == 0, + "BufferedStream: Read buffer must be empty in FlushWrite!"); + Contract.Assert(_buffer != null && _bufferSize >= _writePos, + "BufferedStream: Write buffer must be allocated and write position must be in the bounds of the buffer in FlushWrite!"); + + _stream.Write(_buffer, 0, _writePos); + _writePos = 0; + _stream.Flush(); + } + + + private async Task FlushWriteAsync(CancellationToken cancellationToken) { + + Contract.Assert(_readPos == 0 && _readLen == 0, + "BufferedStream: Read buffer must be empty in FlushWrite!"); + Contract.Assert(_buffer != null && _bufferSize >= _writePos, + "BufferedStream: Write buffer must be allocated and write position must be in the bounds of the buffer in FlushWrite!"); + + await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false); + _writePos = 0; + await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); + } + + + private Int32 ReadFromBuffer(Byte[] array, Int32 offset, Int32 count) { + + Int32 readBytes = _readLen - _readPos; + Contract.Assert(readBytes >= 0); + + if (readBytes == 0) + return 0; + + Contract.Assert(readBytes > 0); + + if (readBytes > count) + readBytes = count; + + Buffer.InternalBlockCopy(_buffer, _readPos, array, offset, readBytes); + _readPos += readBytes; + + return readBytes; + } + + + private Int32 ReadFromBuffer(Byte[] array, Int32 offset, Int32 count, out Exception error) { + + try { + + error = null; + return ReadFromBuffer(array, offset, count); + + } catch (Exception ex) { + error = ex; + return 0; + } + } + + + public override int Read([In, Out] Byte[] array, Int32 offset, Int32 count) { + + if (array == null) + throw new ArgumentNullException("array", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (array.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + EnsureNotClosed(); + EnsureCanRead(); + + Int32 bytesFromBuffer = ReadFromBuffer(array, offset, count); + + // We may have read less than the number of bytes the user asked for, but that is part of the Stream contract. + + // Reading again for more data may cause us to block if we're using a device with no clear end of file, + // such as a serial port or pipe. If we blocked here and this code was used with redirected pipes for a + // process's standard output, this can lead to deadlocks involving two processes. + // BUT - this is a breaking change. + // So: If we could not read all bytes the user asked for from the buffer, we will try once from the underlying + // stream thus ensuring the same blocking behaviour as if the underlying stream was not wrapped in this BufferedStream. + if (bytesFromBuffer == count) + return bytesFromBuffer; + + Int32 alreadySatisfied = bytesFromBuffer; + if (bytesFromBuffer > 0) { + count -= bytesFromBuffer; + offset += bytesFromBuffer; + } + + // So the READ buffer is empty. + Contract.Assert(_readLen == _readPos); + _readPos = _readLen = 0; + + // If there was anything in the WRITE buffer, clear it. + if (_writePos > 0) + FlushWrite(); + + // If the requested read is larger than buffer size, avoid the buffer and still use a single read: + if (count >= _bufferSize) { + + return _stream.Read(array, offset, count) + alreadySatisfied; + } + + // Ok. We can fill the buffer: + EnsureBufferAllocated(); + _readLen = _stream.Read(_buffer, 0, _bufferSize); + + bytesFromBuffer = ReadFromBuffer(array, offset, count); + + // We may have read less than the number of bytes the user asked for, but that is part of the Stream contract. + // Reading again for more data may cause us to block if we're using a device with no clear end of stream, + // such as a serial port or pipe. If we blocked here & this code was used with redirected pipes for a process's + // standard output, this can lead to deadlocks involving two processes. Additionally, translating one read on the + // BufferedStream to more than one read on the underlying Stream may defeat the whole purpose of buffering of the + // underlying reads are significantly more expensive. + + return bytesFromBuffer + alreadySatisfied; + } + + + public override IAsyncResult BeginRead(Byte[] buffer, Int32 offset, Int32 count, AsyncCallback callback, Object state) { + + if (buffer == null) + throw new ArgumentNullException("buffer", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (buffer.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + // Previous version incorrectly threw NotSupported instead of ObjectDisposed. We keep that behaviour for back-compat. + // EnsureNotClosed(); + if (_stream == null) __Error.ReadNotSupported(); + EnsureCanRead(); + + Int32 bytesFromBuffer = 0; + // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another + // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this + // an Async operation. + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + Task semaphoreLockTask = sem.WaitAsync(); + if (semaphoreLockTask.Status == TaskStatus.RanToCompletion) { + + bool completeSynchronously = true; + try { + + Exception error; + bytesFromBuffer = ReadFromBuffer(buffer, offset, count, out error); + + // If we satistied enough data from the buffer, we can complete synchronously. + // Reading again for more data may cause us to block if we're using a device with no clear end of file, + // such as a serial port or pipe. If we blocked here and this code was used with redirected pipes for a + // process's standard output, this can lead to deadlocks involving two processes. + // BUT - this is a breaking change. + // So: If we could not read all bytes the user asked for from the buffer, we will try once from the underlying + // stream thus ensuring the same blocking behaviour as if the underlying stream was not wrapped in this BufferedStream. + completeSynchronously = (bytesFromBuffer == count || error != null); + + if (completeSynchronously) { + + SynchronousAsyncResult asyncResult = (error == null) + ? new SynchronousAsyncResult(bytesFromBuffer, state) + : new SynchronousAsyncResult(error, state, isWrite: false); + if (callback != null) + callback(asyncResult); + + return asyncResult; + } + } finally { + if (completeSynchronously) // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there. + sem.Release(); + } + } + + // Delegate to the async implementation. + return BeginReadFromUnderlyingStream(buffer, offset + bytesFromBuffer, count - bytesFromBuffer, callback, state, + bytesFromBuffer, semaphoreLockTask); + } + + + private IAsyncResult BeginReadFromUnderlyingStream(Byte[] buffer, Int32 offset, Int32 count, AsyncCallback callback, Object state, + Int32 bytesAlreadySatisfied, Task semaphoreLockTask) { + + Task<Int32> readOp = ReadFromUnderlyingStreamAsync(buffer, offset, count, CancellationToken.None, + bytesAlreadySatisfied, semaphoreLockTask, useApmPattern: true); + return TaskToApm.Begin(readOp, callback, state); + } + + + public override Int32 EndRead(IAsyncResult asyncResult) { + + if (asyncResult == null) + throw new ArgumentNullException("asyncResult"); + Contract.Ensures(Contract.Result<Int32>() >= 0); + Contract.EndContractBlock(); + + var sAR = asyncResult as SynchronousAsyncResult; + if (sAR != null) + return SynchronousAsyncResult.EndRead(asyncResult); + return TaskToApm.End<Int32>(asyncResult); + } + + + private Task<Int32> LastSyncCompletedReadTask(Int32 val) { + + Task<Int32> t = _lastSyncCompletedReadTask; + Contract.Assert(t == null || t.Status == TaskStatus.RanToCompletion); + + if (t != null && t.Result == val) + return t; + + t = Task.FromResult<Int32>(val); + _lastSyncCompletedReadTask = t; + return t; + } + + + public override Task<Int32> ReadAsync(Byte[] buffer, Int32 offset, Int32 count, CancellationToken cancellationToken) { + + if (buffer == null) + throw new ArgumentNullException("buffer", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (buffer.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + // Fast path check for cancellation already requested + if (cancellationToken.IsCancellationRequested) + return Task.FromCancellation<Int32>(cancellationToken); + + EnsureNotClosed(); + EnsureCanRead(); + + Int32 bytesFromBuffer = 0; + // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another + // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this + // an Async operation. + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + Task semaphoreLockTask = sem.WaitAsync(); + if (semaphoreLockTask.Status == TaskStatus.RanToCompletion) { + + bool completeSynchronously = true; + try { + Exception error; + bytesFromBuffer = ReadFromBuffer(buffer, offset, count, out error); + + // If we satistied enough data from the buffer, we can complete synchronously. + // Reading again for more data may cause us to block if we're using a device with no clear end of file, + // such as a serial port or pipe. If we blocked here and this code was used with redirected pipes for a + // process's standard output, this can lead to deadlocks involving two processes. + // BUT - this is a breaking change. + // So: If we could not read all bytes the user asked for from the buffer, we will try once from the underlying + // stream thus ensuring the same blocking behaviour as if the underlying stream was not wrapped in this BufferedStream. + completeSynchronously = (bytesFromBuffer == count || error != null); + + if (completeSynchronously) { + + return (error == null) + ? LastSyncCompletedReadTask(bytesFromBuffer) + : Task.FromException<Int32>(error); + } + } finally { + if (completeSynchronously) // if this is FALSE, we will be entering ReadFromUnderlyingStreamAsync and releasing there. + sem.Release(); + } + } + + // Delegate to the async implementation. + return ReadFromUnderlyingStreamAsync(buffer, offset + bytesFromBuffer, count - bytesFromBuffer, cancellationToken, + bytesFromBuffer, semaphoreLockTask, useApmPattern: false); + } + + + /// <summary>BufferedStream should be as thin a wrapper as possible. We want that ReadAsync delegates to + /// ReadAsync of the underlying _stream and that BeginRead delegates to BeginRead of the underlying stream, + /// rather than calling the base Stream which implements the one in terms of the other. This allows BufferedStream + /// to affect the semantics of the stream it wraps as little as possible. At the same time, we want to share as + /// much code between the APM and the Async pattern implementations as possible. This method is called by both with + /// a corresponding useApmPattern value. Recall that Task implements IAsyncResult.</summary> + /// <returns>-2 if _bufferSize was set to 0 while waiting on the semaphore; otherwise num of bytes read.</returns> + private async Task<Int32> ReadFromUnderlyingStreamAsync(Byte[] array, Int32 offset, Int32 count, + CancellationToken cancellationToken, + Int32 bytesAlreadySatisfied, + Task semaphoreLockTask, bool useApmPattern) { + + // Same conditions validated with exceptions in ReadAsync: + // (These should be Contract.Requires(..) but that method had some issues in async methods; using Assert(..) for now.) + Contract.Assert(array != null); + Contract.Assert(offset >= 0); + Contract.Assert(count >= 0); + Contract.Assert(array.Length - offset >= count); + Contract.Assert(_stream != null); + Contract.Assert(_stream.CanRead); + Contract.Assert(_bufferSize > 0); + Contract.Assert(semaphoreLockTask != null); + + // Employ async waiting based on the same synchronization used in BeginRead of the abstract Stream. + await semaphoreLockTask.ConfigureAwait(false); + try { + + // The buffer might have been changed by another async task while we were waiting on the semaphore. + // Check it now again. + Int32 bytesFromBuffer = ReadFromBuffer(array, offset, count); + if (bytesFromBuffer == count) + return bytesAlreadySatisfied + bytesFromBuffer; + + if (bytesFromBuffer > 0) { + count -= bytesFromBuffer; + offset += bytesFromBuffer; + bytesAlreadySatisfied += bytesFromBuffer; + } + + Contract.Assert(_readLen == _readPos); + _readPos = _readLen = 0; + + // If there was anything in the WRITE buffer, clear it. + if (_writePos > 0) + await FlushWriteAsync(cancellationToken).ConfigureAwait(false); // no Begin-End read version for Flush. Use Async. + + // If the requested read is larger than buffer size, avoid the buffer and still use a single read: + if (count >= _bufferSize) { + + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginRead(array, offset, count, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + return bytesAlreadySatisfied + _stream.EndRead(await _beginEndAwaitable); + } else { + return bytesAlreadySatisfied + await _stream.ReadAsync(array, offset, count, cancellationToken).ConfigureAwait(false); + } + } + + // Ok. We can fill the buffer: + EnsureBufferAllocated(); + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginRead(_buffer, 0, _bufferSize, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + _readLen = _stream.EndRead(await _beginEndAwaitable); + } else { + _readLen = await _stream.ReadAsync(_buffer, 0, _bufferSize, cancellationToken).ConfigureAwait(false); + } + + bytesFromBuffer = ReadFromBuffer(array, offset, count); + return bytesAlreadySatisfied + bytesFromBuffer; + + } finally { + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + sem.Release(); + } + } + + + public override Int32 ReadByte() { + + EnsureNotClosed(); + EnsureCanRead(); + + if (_readPos == _readLen) { + + if (_writePos > 0) + FlushWrite(); + + EnsureBufferAllocated(); + _readLen = _stream.Read(_buffer, 0, _bufferSize); + _readPos = 0; + } + + if (_readPos == _readLen) + return -1; + + Int32 b = _buffer[_readPos++]; + return b; + } + + + private void WriteToBuffer(Byte[] array, ref Int32 offset, ref Int32 count) { + + Int32 bytesToWrite = Math.Min(_bufferSize - _writePos, count); + + if (bytesToWrite <= 0) + return; + + EnsureBufferAllocated(); + Buffer.InternalBlockCopy(array, offset, _buffer, _writePos, bytesToWrite); + + _writePos += bytesToWrite; + count -= bytesToWrite; + offset += bytesToWrite; + } + + + private void WriteToBuffer(Byte[] array, ref Int32 offset, ref Int32 count, out Exception error) { + + try { + + error = null; + WriteToBuffer(array, ref offset, ref count); + + } catch (Exception ex) { + error = ex; + } + } + + + public override void Write(Byte[] array, Int32 offset, Int32 count) { + + if (array == null) + throw new ArgumentNullException("array", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (array.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + EnsureNotClosed(); + EnsureCanWrite(); + + if (_writePos == 0) + ClearReadBufferBeforeWrite(); + + #region Write algorithm comment + // We need to use the buffer, while avoiding unnecessary buffer usage / memory copies. + // We ASSUME that memory copies are much cheaper than writes to the underlying stream, so if an extra copy is + // guaranteed to reduce the number of writes, we prefer it. + // We pick a simple strategy that makes degenerate cases rare if our assumptions are right. + // + // For ever write, we use a simple heuristic (below) to decide whether to use the buffer. + // The heuristic has the desirable property (*) that if the specified user data can fit into the currently available + // buffer space without filling it up completely, the heuristic will always tell us to use the buffer. It will also + // tell us to use the buffer in cases where the current write would fill the buffer, but the remaining data is small + // enough such that subsequent operations can use the buffer again. + // + // Algorithm: + // Determine whether or not to buffer according to the heuristic (below). + // If we decided to use the buffer: + // Copy as much user data as we can into the buffer. + // If we consumed all data: We are finished. + // Otherwise, write the buffer out. + // Copy the rest of user data into the now cleared buffer (no need to write out the buffer again as the heuristic + // will prevent it from being filled twice). + // If we decided not to use the buffer: + // Can the data already in the buffer and current user data be combines to a single write + // by allocating a "shadow" buffer of up to twice the size of _bufferSize (up to a limit to avoid LOH)? + // Yes, it can: + // Allocate a larger "shadow" buffer and ensure the buffered data is moved there. + // Copy user data to the shadow buffer. + // Write shadow buffer to the underlying stream in a single operation. + // No, it cannot (amount of data is still too large): + // Write out any data possibly in the buffer. + // Write out user data directly. + // + // Heuristic: + // If the subsequent write operation that follows the current write operation will result in a write to the + // underlying stream in case that we use the buffer in the current write, while it would not have if we avoided + // using the buffer in the current write (by writing current user data to the underlying stream directly), then we + // prefer to avoid using the buffer since the corresponding memory copy is wasted (it will not reduce the number + // of writes to the underlying stream, which is what we are optimising for). + // ASSUME that the next write will be for the same amount of bytes as the current write (most common case) and + // determine if it will cause a write to the underlying stream. If the next write is actually larger, our heuristic + // still yields the right behaviour, if the next write is actually smaller, we may making an unnecessary write to + // the underlying stream. However, this can only occur if the current write is larger than half the buffer size and + // we will recover after one iteration. + // We have: + // useBuffer = (_writePos + count + count < _bufferSize + _bufferSize) + // + // Example with _bufferSize = 20, _writePos = 6, count = 10: + // + // +---------------------------------------+---------------------------------------+ + // | current buffer | next iteration's "future" buffer | + // +---------------------------------------+---------------------------------------+ + // |0| | | | | | | | | |1| | | | | | | | | |2| | | | | | | | | |3| | | | | | | | | | + // |0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9|0|1|2|3|4|5|6|7|8|9| + // +-----------+-------------------+-------------------+---------------------------+ + // | _writePos | current count | assumed next count|avail buff after next write| + // +-----------+-------------------+-------------------+---------------------------+ + // + // A nice property (*) of this heuristic is that it will always succeed if the user data completely fits into the + // available buffer, i.e. if count < (_bufferSize - _writePos). + #endregion Write algorithm comment + + Contract.Assert(_writePos < _bufferSize); + + Int32 totalUserBytes; + bool useBuffer; + checked { // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early: + totalUserBytes = _writePos + count; + useBuffer = (totalUserBytes + count < (_bufferSize + _bufferSize)); + } + + if (useBuffer) { + + WriteToBuffer(array, ref offset, ref count); + + if (_writePos < _bufferSize) { + + Contract.Assert(count == 0); + return; + } + + Contract.Assert(count >= 0); + Contract.Assert(_writePos == _bufferSize); + Contract.Assert(_buffer != null); + + _stream.Write(_buffer, 0, _writePos); + _writePos = 0; + + WriteToBuffer(array, ref offset, ref count); + + Contract.Assert(count == 0); + Contract.Assert(_writePos < _bufferSize); + + } else { // if (!useBuffer) + + // Write out the buffer if necessary. + if (_writePos > 0) { + + Contract.Assert(_buffer != null); + Contract.Assert(totalUserBytes >= _bufferSize); + + // Try avoiding extra write to underlying stream by combining previously buffered data with current user data: + if (totalUserBytes <= (_bufferSize + _bufferSize) && totalUserBytes <= MaxShadowBufferSize) { + + EnsureShadowBufferAllocated(); + Buffer.InternalBlockCopy(array, offset, _buffer, _writePos, count); + _stream.Write(_buffer, 0, totalUserBytes); + _writePos = 0; + return; + } + + _stream.Write(_buffer, 0, _writePos); + _writePos = 0; + } + + // Write out user data. + _stream.Write(array, offset, count); + } + } + + + + + public override IAsyncResult BeginWrite(Byte[] buffer, Int32 offset, Int32 count, AsyncCallback callback, Object state) { + + if (buffer == null) + throw new ArgumentNullException("buffer", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (buffer.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + // Previous version incorrectly threw NotSupported instead of ObjectDisposed. We keep that behaviour for back-compat. + // EnsureNotClosed(); + if (_stream == null) __Error.ReadNotSupported(); + EnsureCanWrite(); + + // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another + // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this + // an Async operation. + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + Task semaphoreLockTask = sem.WaitAsync(); + if (semaphoreLockTask.Status == TaskStatus.RanToCompletion) { + + bool completeSynchronously = true; + try { + if (_writePos == 0) + ClearReadBufferBeforeWrite(); + + // If the write completely fits into the buffer, we can complete synchronously. + Contract.Assert(_writePos < _bufferSize); + completeSynchronously = (count < _bufferSize - _writePos); + + if (completeSynchronously) { + + Exception error; + WriteToBuffer(buffer, ref offset, ref count, out error); + Contract.Assert(count == 0); + + SynchronousAsyncResult asyncResult = (error == null) + ? new SynchronousAsyncResult(state) + : new SynchronousAsyncResult(error, state, isWrite: true); + if (callback != null) + callback(asyncResult); + + return asyncResult; + } + } finally { + if (completeSynchronously) // if this is FALSE, we will be entering WriteToUnderlyingStreamAsync and releasing there. + sem.Release(); + } + } + + // Delegate to the async implementation. + return BeginWriteToUnderlyingStream(buffer, offset, count, callback, state, semaphoreLockTask); + } + + + private IAsyncResult BeginWriteToUnderlyingStream(Byte[] buffer, Int32 offset, Int32 count, AsyncCallback callback, Object state, + Task semaphoreLockTask) { + + Task writeOp = WriteToUnderlyingStreamAsync(buffer, offset, count, CancellationToken.None, semaphoreLockTask, useApmPattern: true); + return TaskToApm.Begin(writeOp, callback, state); + } + + + public override void EndWrite(IAsyncResult asyncResult) { + + if (asyncResult == null) + throw new ArgumentNullException("asyncResult"); + Contract.EndContractBlock(); + + var sAR = asyncResult as SynchronousAsyncResult; + if (sAR != null) { + SynchronousAsyncResult.EndWrite(asyncResult); + return; + } + + TaskToApm.End(asyncResult); + } + + + public override Task WriteAsync(Byte[] buffer, Int32 offset, Int32 count, CancellationToken cancellationToken) { + + if (buffer == null) + throw new ArgumentNullException("buffer", Environment.GetResourceString("ArgumentNull_Buffer")); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (count < 0) + throw new ArgumentOutOfRangeException("count", Environment.GetResourceString("ArgumentOutOfRange_NeedNonNegNum")); + if (buffer.Length - offset < count) + throw new ArgumentException(Environment.GetResourceString("Argument_InvalidOffLen")); + Contract.EndContractBlock(); + + // Fast path check for cancellation already requested + if (cancellationToken.IsCancellationRequested) + return Task.FromCancellation<Int32>(cancellationToken); + + EnsureNotClosed(); + EnsureCanWrite(); + + // Try to satisfy the request from the buffer synchronously. But still need a sem-lock in case that another + // Async IO Task accesses the buffer concurrently. If we fail to acquire the lock without waiting, make this + // an Async operation. + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + Task semaphoreLockTask = sem.WaitAsync(); + if (semaphoreLockTask.Status == TaskStatus.RanToCompletion) { + + bool completeSynchronously = true; + try { + + if (_writePos == 0) + ClearReadBufferBeforeWrite(); + + Contract.Assert(_writePos < _bufferSize); + + // If the write completely fits into the buffer, we can complete synchronously: + completeSynchronously = (count < _bufferSize - _writePos); + + if (completeSynchronously) { + + Exception error; + WriteToBuffer(buffer, ref offset, ref count, out error); + Contract.Assert(count == 0); + + return (error == null) + ? Task.CompletedTask + : Task.FromException(error); + } + } finally { + if (completeSynchronously) // if this is FALSE, we will be entering WriteToUnderlyingStreamAsync and releasing there. + sem.Release(); + } + } + + // Delegate to the async implementation. + return WriteToUnderlyingStreamAsync(buffer, offset, count, cancellationToken, semaphoreLockTask, useApmPattern: false); + } + + + /// <summary>BufferedStream should be as thin a wrapper as possible. We want that WriteAsync delegates to + /// WriteAsync of the underlying _stream and that BeginWrite delegates to BeginWrite of the underlying stream, + /// rather than calling the base Stream which implements the one in terms of the other. This allows BufferedStream + /// to affect the semantics of the stream it wraps as little as possible. At the same time, we want to share as + /// much code between the APM and the Async pattern implementations as possible. This method is called by both with + /// a corresponding useApmPattern value. Recall that Task implements IAsyncResult.</summary> + private async Task WriteToUnderlyingStreamAsync(Byte[] array, Int32 offset, Int32 count, + CancellationToken cancellationToken, + Task semaphoreLockTask, bool useApmPattern) { + + // (These should be Contract.Requires(..) but that method had some issues in async methods; using Assert(..) for now.) + Contract.Assert(array != null); + Contract.Assert(offset >= 0); + Contract.Assert(count >= 0); + Contract.Assert(array.Length - offset >= count); + Contract.Assert(_stream != null); + Contract.Assert(_stream.CanWrite); + Contract.Assert(_bufferSize > 0); + Contract.Assert(semaphoreLockTask != null); + + // See the LARGE COMMENT in Write(..) for the explanation of the write buffer algorithm. + + await semaphoreLockTask.ConfigureAwait(false); + try { + + // The buffer might have been changed by another async task while we were waiting on the semaphore. + // However, note that if we recalculate the sync completion condition to TRUE, then useBuffer will also be TRUE. + + if (_writePos == 0) + ClearReadBufferBeforeWrite(); + + Int32 totalUserBytes; + bool useBuffer; + checked { // We do not expect buffer sizes big enough for an overflow, but if it happens, lets fail early: + totalUserBytes = _writePos + count; + useBuffer = (totalUserBytes + count < (_bufferSize + _bufferSize)); + } + + if (useBuffer) { + + WriteToBuffer(array, ref offset, ref count); + + if (_writePos < _bufferSize) { + + Contract.Assert(count == 0); + return; + } + + Contract.Assert(count >= 0); + Contract.Assert(_writePos == _bufferSize); + Contract.Assert(_buffer != null); + + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginWrite(_buffer, 0, _writePos, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + _stream.EndWrite(await _beginEndAwaitable); + } else { + await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false); + } + _writePos = 0; + + WriteToBuffer(array, ref offset, ref count); + + Contract.Assert(count == 0); + Contract.Assert(_writePos < _bufferSize); + + } else { // if (!useBuffer) + + // Write out the buffer if necessary. + if (_writePos > 0) { + + Contract.Assert(_buffer != null); + Contract.Assert(totalUserBytes >= _bufferSize); + + // Try avoiding extra write to underlying stream by combining previously buffered data with current user data: + if (totalUserBytes <= (_bufferSize + _bufferSize) && totalUserBytes <= MaxShadowBufferSize) { + + EnsureShadowBufferAllocated(); + Buffer.InternalBlockCopy(array, offset, _buffer, _writePos, count); + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginWrite(_buffer, 0, totalUserBytes, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + _stream.EndWrite(await _beginEndAwaitable); + } else { + await _stream.WriteAsync(_buffer, 0, totalUserBytes, cancellationToken).ConfigureAwait(false); + } + _writePos = 0; + return; + } + + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginWrite(_buffer, 0, _writePos, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + _stream.EndWrite(await _beginEndAwaitable); + } else { + await _stream.WriteAsync(_buffer, 0, _writePos, cancellationToken).ConfigureAwait(false); + } + _writePos = 0; + } + + // Write out user data. + if (useApmPattern) { + EnsureBeginEndAwaitableAllocated(); + _stream.BeginWrite(array, offset, count, BeginEndAwaitableAdapter.Callback, _beginEndAwaitable); + _stream.EndWrite(await _beginEndAwaitable); + } else { + await _stream.WriteAsync(array, offset, count, cancellationToken).ConfigureAwait(false); + } + } + } finally { + SemaphoreSlim sem = base.EnsureAsyncActiveSemaphoreInitialized(); + sem.Release(); + } + } + + + public override void WriteByte(Byte value) { + + EnsureNotClosed(); + + if (_writePos == 0) { + + EnsureCanWrite(); + ClearReadBufferBeforeWrite(); + EnsureBufferAllocated(); + } + + // We should not be flushing here, but only writing to the underlying stream, but previous version flushed, so we keep this. + if (_writePos >= _bufferSize - 1) + FlushWrite(); + + _buffer[_writePos++] = value; + + Contract.Assert(_writePos < _bufferSize); + } + + + public override Int64 Seek(Int64 offset, SeekOrigin origin) { + + EnsureNotClosed(); + EnsureCanSeek(); + + // If we have bytes in the WRITE buffer, flush them out, seek and be done. + if (_writePos > 0) { + + // We should be only writing the buffer and not flushing, + // but the previous version did flush and we stick to it for back-compat reasons. + FlushWrite(); + return _stream.Seek(offset, origin); + } + + // The buffer is either empty or we have a buffered READ. + + if (_readLen - _readPos > 0 && origin == SeekOrigin.Current) { + + // If we have bytes in the READ buffer, adjust the seek offset to account for the resulting difference + // between this stream's position and the underlying stream's position. + offset -= (_readLen - _readPos); + } + + Int64 oldPos = Position; + Contract.Assert(oldPos == _stream.Position + (_readPos - _readLen)); + + Int64 newPos = _stream.Seek(offset, origin); + + // If the seek destination is still within the data currently in the buffer, we want to keep the buffer data and continue using it. + // Otherwise we will throw away the buffer. This can only happen on READ, as we flushed WRITE data above. + + // The offset of the new/updated seek pointer within _buffer: + _readPos = (Int32) (newPos - (oldPos - _readPos)); + + // If the offset of the updated seek pointer in the buffer is still legal, then we can keep using the buffer: + if (0 <= _readPos && _readPos < _readLen) { + + // Adjust the seek pointer of the underlying stream to reflect the amount of useful bytes in the read buffer: + _stream.Seek(_readLen - _readPos, SeekOrigin.Current); + + } else { // The offset of the updated seek pointer is not a legal offset. Loose the buffer. + + _readPos = _readLen = 0; + } + + Contract.Assert(newPos == Position, "newPos (=" + newPos + ") == Position (=" + Position + ")"); + return newPos; + } + + + public override void SetLength(Int64 value) { + + if (value < 0) + throw new ArgumentOutOfRangeException("value", Environment.GetResourceString("ArgumentOutOfRange_NegFileSize")); + Contract.EndContractBlock(); + + EnsureNotClosed(); + EnsureCanSeek(); + EnsureCanWrite(); + + Flush(); + _stream.SetLength(value); + } + +} // class BufferedStream +} // namespace |