Skip to content

Commit

Permalink
Add CancellationToken to TextReader.ReadXAsync (#61898)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Sitnik <adam.sitnik@gmail.com>
Co-authored-by: Stephen Toub <stoub@microsoft.com>
  • Loading branch information
3 people authored Jan 25, 2022
1 parent 4e165d3 commit 7483b24
Show file tree
Hide file tree
Showing 9 changed files with 311 additions and 19 deletions.
15 changes: 15 additions & 0 deletions src/libraries/System.Console/src/System/IO/SyncTextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
Expand Down Expand Up @@ -95,11 +96,25 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ?
ValueTask.FromCanceled<string?>(cancellationToken) :
new ValueTask<string?>(ReadLine());
}

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ?
Task.FromCanceled<string>(cancellationToken) :
Task.FromResult(ReadToEnd());
}

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
59 changes: 59 additions & 0 deletions src/libraries/System.IO/tests/StreamReader/StreamReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,44 @@ public async Task ReadToEndAsync()
Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
var result = await sw.ReadToEndAsync(default);

Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await sw.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
[SkipOnPlatform(TestPlatforms.Browser, "Not supported on Browser.")]
public async Task ReadToEndAsync_WithCancellation()
{
string path = GetTestFilePath();

// create large (~100MB) file
File.WriteAllLines(path, Enumerable.Repeat("A very large file used for testing StreamReader cancellation. 0123456789012345678901234567890123456789.", 1_000_000));

using StreamReader reader = File.OpenText(path);
using CancellationTokenSource cts = new (TimeSpan.FromMilliseconds(50));
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await reader.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void GetBaseStream()
{
Expand Down Expand Up @@ -301,6 +339,27 @@ public void VanillaReadLines2()
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}

[Fact]
public async Task VanillaReadLineAsync()
{
var baseInfo = GetCharArrayStream();
var sr = baseInfo.Item2;

string valueString = new string(baseInfo.Item1);

var data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);

data = await sr.ReadLineAsync(default);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);

data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);

data = await sr.ReadLineAsync(default);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}

[Fact]
public async Task ContinuousNewLinesAndTabsAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public static void ReadLine()
}
}

[Fact]
public static async Task ReadLineAsync()
{
string str1 = "Hello\0\t\v \\ World";
string str2 = str1 + Environment.NewLine + str1;

using (StringReader sr = new StringReader(str1))
{
Assert.Equal(str1, await sr.ReadLineAsync());
}
using (StringReader sr = new StringReader(str2))
{
Assert.Equal(str1, await sr.ReadLineAsync(default));
Assert.Equal(str1, await sr.ReadLineAsync(default));
}
}

[Fact]
public static void ReadPseudoRandomString()
{
Expand Down Expand Up @@ -155,6 +172,14 @@ public static void ReadToEndPseudoRandom() {
Assert.Equal(str1, sr.ReadToEnd());
}

[Fact]
public static async Task ReadToEndAsyncString()
{
string str1 = "Hello\0\t\v \\ World";
StringReader sr = new StringReader(str1);
Assert.Equal(str1, await sr.ReadToEndAsync(default));
}

[Fact]
public static void Closed_DisposedExceptions()
{
Expand Down Expand Up @@ -278,6 +303,8 @@ public async Task Precanceled_ThrowsException()

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadLineAsync(new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadToEndAsync(new CancellationToken(true)));
}

private static void ValidateDisposedExceptions(StringReader sr)
Expand Down
21 changes: 21 additions & 0 deletions src/libraries/System.IO/tests/TextReader/TextReaderTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -54,6 +55,26 @@ public async Task ReadToEndAsync()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
var result = await tr.ReadToEndAsync(default);
Assert.Equal(5000, result.Length);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await tr.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void TestRead()
{
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private static async Task<string[]> InternalReadAllLinesAsync(string path, Encod
cancellationToken.ThrowIfCancellationRequested();
string? line;
List<string> lines = new List<string>();
while ((line = await sr.ReadLineAsync().ConfigureAwait(false)) != null)
while ((line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
Expand Down
81 changes: 68 additions & 13 deletions src/libraries/System.Private.CoreLib/src/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -845,29 +845,59 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
return sb.ToString();
}

public override Task<string?> ReadLineAsync()
public override Task<string?> ReadLineAsync() =>
ReadLineAsync(default).AsTask();

/// <summary>
/// Reads a line of characters asynchronously from the current stream and returns the data as a string.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A value task that represents the asynchronous read operation. The value of the <c>TResult</c>
/// parameter contains the next line from the stream, or is <see langword="null" /> if all of the characters have been read.</returns>
/// <exception cref="ArgumentOutOfRangeException">The number of characters in the next line is larger than <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream reader has been disposed.</exception>
/// <exception cref="InvalidOperationException">The reader is currently in use by a previous read operation.</exception>
/// <example>
/// The following example shows how to read and print all lines from the file until the end of the file is reached or the operation timed out.
/// <code lang="C#">
/// using CancellationTokenSource tokenSource = new (TimeSpan.FromSeconds(1));
/// using StreamReader reader = File.OpenText("existingfile.txt");
///
/// string line;
/// while ((line = await reader.ReadLineAsync(tokenSource.Token)) is not null)
/// {
/// Console.WriteLine(line);
/// }
/// </code>
/// </example>
/// <remarks>
/// If this method is canceled via <paramref name="cancellationToken"/>, some data
/// that has been read from the current <see cref="Stream"/> but not stored (by the
/// <see cref="StreamReader"/>) or returned (to the caller) may be lost.
/// </remarks>
public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadLineAsync();
return base.ReadLineAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string?> task = ReadLineAsyncInternal();
Task<string?> task = ReadLineAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
return new ValueTask<string?>(task);
}

private async Task<string?> ReadLineAsyncInternal()
private async Task<string?> ReadLineAsyncInternal(CancellationToken cancellationToken)
{
if (_charPos == _charLen && (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) == 0)
if (_charPos == _charLen && (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) == 0)
{
return null;
}
Expand Down Expand Up @@ -903,7 +933,7 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)

_charPos = tmpCharPos = i + 1;

if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) > 0))
if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) > 0))
{
tmpCharPos = _charPos;
if (_charBuffer[tmpCharPos] == '\n')
Expand All @@ -921,32 +951,57 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
i = tmpCharLen - tmpCharPos;
sb ??= new StringBuilder(i + 80);
sb.Append(tmpCharBuffer, tmpCharPos, i);
} while (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false) > 0);
} while (await ReadBufferAsync(cancellationToken).ConfigureAwait(false) > 0);

return sb.ToString();
}

public override Task<string> ReadToEndAsync()
public override Task<string> ReadToEndAsync() => ReadToEndAsync(default);

/// <summary>
/// Reads all characters from the current position to the end of the stream asynchronously and returns them as one string.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous read operation. The value of the <c>TResult</c> parameter contains
/// a string with the characters from the current position to the end of the stream.</returns>
/// <exception cref="ArgumentOutOfRangeException">The number of characters is larger than <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream reader has been disposed.</exception>
/// <exception cref="InvalidOperationException">The reader is currently in use by a previous read operation.</exception>
/// <example>
/// The following example shows how to read the contents of a file by using the <see cref="ReadToEndAsync(CancellationToken)"/> method.
/// <code lang="C#">
/// using CancellationTokenSource tokenSource = new (TimeSpan.FromSeconds(1));
/// using StreamReader reader = File.OpenText("existingfile.txt");
///
/// Console.WriteLine(await reader.ReadToEndAsync(tokenSource.Token));
/// </code>
/// </example>
/// <remarks>
/// If this method is canceled via <paramref name="cancellationToken"/>, some data
/// that has been read from the current <see cref="Stream"/> but not stored (by the
/// <see cref="StreamReader"/>) or returned (to the caller) may be lost.
/// </remarks>
public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadToEndAsync();
return base.ReadToEndAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string> task = ReadToEndAsyncInternal();
Task<string> task = ReadToEndAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
}

private async Task<string> ReadToEndAsyncInternal()
private async Task<string> ReadToEndAsyncInternal(CancellationToken cancellationToken)
{
// Call ReadBuffer, then pull data out of charBuffer.
StringBuilder sb = new StringBuilder(_charLen - _charPos);
Expand All @@ -955,7 +1010,7 @@ private async Task<string> ReadToEndAsyncInternal()
int tmpCharPos = _charPos;
sb.Append(_charBuffer, tmpCharPos, _charLen - tmpCharPos);
_charPos = _charLen; // We consumed these characters
await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false);
await ReadBufferAsync(cancellationToken).ConfigureAwait(false);
} while (_charLen > 0);

return sb.ToString();
Expand Down
Loading

0 comments on commit 7483b24

Please sign in to comment.