Skip to content

Commit

Permalink
Use UnixFileStream's ReadAsync implementation on Windows as well (#56682
Browse files Browse the repository at this point in the history
)

UnixFileStream's ReadAsync implementation uses a reusable IValueTaskSource implementation to avoid allocating a new work item on every read.  We can push that implementation down to OSFileStreamStrategy, and then use it for the Windows implementation of ReadAsync as well when IsAsync==false, rather than delegating to the base Stream implementation.

This PR almost entirely just moves code around.  The only change to logic is in RandomAccess.Windows.cs, to only set an offset into the NativeOverlapped if the SafeFileHandle is seekable; otherwise, it fails when used with pipes.
  • Loading branch information
stephentoub authored Aug 2, 2021
1 parent 95ef79a commit 79c4144
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 217 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,8 +695,11 @@ private static NativeOverlapped GetNativeOverlappedForSyncHandle(SafeFileHandle
Debug.Assert(!handle.IsAsync);

NativeOverlapped result = default;
result.OffsetLow = unchecked((int)fileOffset);
result.OffsetHigh = (int)(fileOffset >> 32);
if (handle.CanSeek)
{
result.OffsetLow = unchecked((int)fileOffset);
result.OffsetHigh = (int)(fileOffset >> 32);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,7 @@ internal AsyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess a

internal override bool IsAsync => true;

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
=> ReadAsyncInternal(destination, cancellationToken);

private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanSeek)
{
Expand All @@ -52,17 +46,11 @@ private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationT
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
=> WriteAsyncInternal(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> WriteAsyncInternal(buffer, cancellationToken);

private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, buffer.Length) - buffer.Length : -1;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, buffer, writeOffset, cancellationToken);
return vts != null
? new ValueTask(vts, vts.Version)
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
Expand Down Expand Up @@ -120,15 +108,5 @@ await FileStreamHelpers
}
}
}

public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public override int EndRead(IAsyncResult asyncResult) => TaskToApm.End<int>(asyncResult);

public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public override void EndWrite(IAsyncResult asyncResult) => TaskToApm.End(asyncResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.Win32.SafeHandles;

namespace System.IO.Strategies
Expand All @@ -13,6 +14,7 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy
{
protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws
private readonly FileAccess _access; // What file was opened for.
private ReadAsyncTaskSource? _readAsyncTaskSource; // Cached IValueTaskSource used for async-over-sync reads

protected long _filePosition;
protected long _length = -1; // negative means that hasn't been fetched.
Expand Down Expand Up @@ -69,6 +71,8 @@ internal OSFileStreamStrategy(string path, FileMode mode, FileAccess access, Fil
}
}

internal override bool IsAsync => _fileHandle.IsAsync;

public sealed override bool CanSeek => _fileHandle.CanSeek;

public sealed override bool CanRead => !_fileHandle.IsClosed && (_access & FileAccess.Read) != 0;
Expand Down Expand Up @@ -278,5 +282,145 @@ public sealed override void Write(ReadOnlySpan<byte> buffer)
RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
_filePosition += buffer.Length;
}

public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(WriteAsync(buffer, offset, count), callback, state);

public sealed override void EndWrite(IAsyncResult asyncResult) =>
TaskToApm.End(asyncResult);

public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
WriteAsync(new ReadOnlyMemory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken);
}

public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) =>
TaskToApm.Begin(ReadAsync(buffer, offset, count), callback, state);

public sealed override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
ReadAsync(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanSeek)
{
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}

// This implementation updates the file position before the operation starts and updates it after incomplete read.
// Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations.
long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this);
return rats.QueueRead(destination, readOffset, cancellationToken);
}

/// <summary>Provides a reusable ValueTask-backing object for implementing ReadAsync.</summary>
private sealed class ReadAsyncTaskSource : IValueTaskSource<int>, IThreadPoolWorkItem
{
private readonly OSFileStreamStrategy _stream;
private ManualResetValueTaskSourceCore<int> _source;

private Memory<byte> _destination;
private long _readOffset;
private ExecutionContext? _context;
private CancellationToken _cancellationToken;

public ReadAsyncTaskSource(OSFileStreamStrategy stream) => _stream = stream;

public ValueTask<int> QueueRead(Memory<byte> destination, long readOffset, CancellationToken cancellationToken)
{
_destination = destination;
_readOffset = readOffset;
_cancellationToken = cancellationToken;
_context = ExecutionContext.Capture();

ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: true);
return new ValueTask<int>(this, _source.Version);
}

void IThreadPoolWorkItem.Execute()
{
if (_context is null || _context.IsDefault)
{
Read();
}
else
{
ExecutionContext.RunForThreadPoolUnsafe(_context, static x => x.Read(), this);
}
}

private void Read()
{
Exception? error = null;
int result = 0;

try
{
if (_cancellationToken.IsCancellationRequested)
{
error = new OperationCanceledException(_cancellationToken);
}
else
{
result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset);
}
}
catch (Exception e)
{
error = e;
}
finally
{
// if the read was incomplete, we need to update the file position:
if (result != _destination.Length)
{
_stream.OnIncompleteRead(_destination.Length, result);
}

_destination = default;
_readOffset = -1;
_cancellationToken = default;
_context = null;
}

if (error is not null)
{
_source.SetException(error);
}
else
{
_source.SetResult(result);
}
}

int IValueTaskSource<int>.GetResult(short token)
{
try
{
return _source.GetResult(token);
}
finally
{
_source.Reset();
#pragma warning disable CS0197
Volatile.Write(ref _stream._readAsyncTaskSource, this);
#pragma warning restore CS0197
}
}

ValueTaskSourceStatus IValueTaskSource<int>.GetStatus(short token) =>
_source.GetStatus(token);

void IValueTaskSource<int>.OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags) =>
_source.OnCompleted(continuation, state, token, flags);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Win32.SafeHandles;

namespace System.IO.Strategies
Expand All @@ -20,45 +17,5 @@ internal SyncWindowsFileStreamStrategy(string path, FileMode mode, FileAccess ac
}

internal override bool IsAsync => false;

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Read is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginRead, since we already know this is FileStream rather
// than something derived from it and what our BeginRead implementation is going to do.
return BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}

public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Read is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's
// internal helper that bypasses delegating to BeginRead, since we already know this is FileStream
// rather than something derived from it and what our BeginRead implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask<int>(BeginReadInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
base.ReadAsync(buffer, cancellationToken);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Write is invoked asynchronously. But we can do so using the base Stream's internal helper
// that bypasses delegating to BeginWrite, since we already know this is FileStream rather
// than something derived from it and what our BeginWrite implementation is going to do.
return BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
}

public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
// If we weren't opened for asynchronous I/O, we still call to the base implementation so that
// Write is invoked asynchronously. But if we have a byte[], we can do so using the base Stream's
// internal helper that bypasses delegating to BeginWrite, since we already know this is FileStream
// rather than something derived from it and what our BeginWrite implementation is going to do.
return MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment) ?
new ValueTask(BeginWriteInternal(segment.Array!, segment.Offset, segment.Count, null, null, serializeAsynchronously: true, apm: false)) :
base.WriteAsync(buffer, cancellationToken);
}
}
}
Loading

0 comments on commit 79c4144

Please sign in to comment.