Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancellationToken to TextReader.ReadXAsync #61898

Merged
merged 10 commits into from
Jan 25, 2022
15 changes: 15 additions & 0 deletions src/libraries/System.Console/src/System/IO/SyncTextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
Expand Down Expand Up @@ -95,11 +96,25 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
return cancellationToken.IsCancellationRequested ?
ValueTask.FromCanceled<string?>(cancellationToken) :
new ValueTask<string?>(ReadLine());
}

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ?
Task.FromCanceled<string>(cancellationToken) :
Task.FromResult(ReadToEnd());
}

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
58 changes: 58 additions & 0 deletions src/libraries/System.IO/tests/StreamReader/StreamReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,43 @@ public async Task ReadToEndAsync()
Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCancellationToken()
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
using var sw = new StreamReader(GetLargeStream());
var result = await sw.ReadToEndAsync(default);

Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await sw.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public async Task ReadToEndAsync_WithCancellation()
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
string path = GetTestFilePath();

// create large (~100MB) file
File.WriteAllLines(path, Enumerable.Repeat("A very large file used for testing StreamReader cancellation. 0123456789012345678901234567890123456789.", 1_000_000));

using StreamReader reader = File.OpenText(path);
using CancellationTokenSource cts = new (TimeSpan.FromMilliseconds(50));
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await reader.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void GetBaseStream()
{
Expand Down Expand Up @@ -301,6 +338,27 @@ public void VanillaReadLines2()
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}

[Fact]
public async Task VanillaReadLineAsync()
{
var baseInfo = GetCharArrayStream();
var sr = baseInfo.Item2;

string valueString = new string(baseInfo.Item1);

var data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);

data = await sr.ReadLineAsync(default);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);

data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);

data = await sr.ReadLineAsync(default);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}

[Fact]
public async Task ContinuousNewLinesAndTabsAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public static void ReadLine()
}
}

[Fact]
public static async Task ReadLineAsync()
{
string str1 = "Hello\0\t\v \\ World";
string str2 = str1 + Environment.NewLine + str1;

using (StringReader sr = new StringReader(str1))
{
Assert.Equal(str1, await sr.ReadLineAsync());
}
using (StringReader sr = new StringReader(str2))
{
Assert.Equal(str1, await sr.ReadLineAsync(default));
Assert.Equal(str1, await sr.ReadLineAsync(default));
}
}

[Fact]
public static void ReadPseudoRandomString()
{
Expand Down Expand Up @@ -155,6 +172,14 @@ public static void ReadToEndPseudoRandom() {
Assert.Equal(str1, sr.ReadToEnd());
}

[Fact]
public static async Task ReadToEndAsyncString()
{
string str1 = "Hello\0\t\v \\ World";
StringReader sr = new StringReader(str1);
Assert.Equal(str1, await sr.ReadToEndAsync(default));
}

[Fact]
public static void Closed_DisposedExceptions()
{
Expand Down Expand Up @@ -278,6 +303,8 @@ public async Task Precanceled_ThrowsException()

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadLineAsync(new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadToEndAsync(new CancellationToken(true)));
}

private static void ValidateDisposedExceptions(StringReader sr)
Expand Down
21 changes: 21 additions & 0 deletions src/libraries/System.IO/tests/TextReader/TextReaderTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -54,6 +55,26 @@ public async Task ReadToEndAsync()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
var result = await tr.ReadToEndAsync(default);
Assert.Equal(5000, result.Length);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
using var cts = new CancellationTokenSource();
cts.Cancel();
var token = cts.Token;

var ex = await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await tr.ReadToEndAsync(token));
Assert.Equal(token, ex.CancellationToken);
}

[Fact]
public void TestRead()
{
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private static async Task<string[]> InternalReadAllLinesAsync(string path, Encod
cancellationToken.ThrowIfCancellationRequested();
string? line;
List<string> lines = new List<string>();
while ((line = await sr.ReadLineAsync().ConfigureAwait(false)) != null)
while ((line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
Expand Down
81 changes: 68 additions & 13 deletions src/libraries/System.Private.CoreLib/src/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -845,29 +845,57 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
return sb.ToString();
}

public override Task<string?> ReadLineAsync()
public override Task<string?> ReadLineAsync() =>
ReadLineAsync(default).AsTask();

/// <summary>
/// Reads a line of characters asynchronously from the current stream and returns the data as a string.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A value task that represents the asynchronous read operation. The value of the <c>TResult</c>
/// parameter contains the next line from the stream, or is <c>null</c> if all of the characters have been read.</returns>
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
/// <exception cref="ArgumentOutOfRangeException">The number of characters in the next line is larger than <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream reader has been disposed.</exception>
/// <exception cref="InvalidOperationException">The reader is currently in use by a previous read operation.</exception>
/// <example>
/// The following example shows how to read the first line of a file by using the <see cref="ReadLineAsync(CancellationToken)"/> method.
/// <code lang="C#">
/// using var tokenSource = new CancellationTokenSource();
/// using var reader = File.OpenText("existingfile.txt");
/// Console.WriteLine("Opened file.");
///
/// var result = await reader.ReadLineAsync(tokenSource.Token);
/// Console.WriteLine("First line contains: " + result);
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
/// </code>
/// </example>
/// <remarks>
/// <para>If this method is canceled via <paramref name="cancellationToken"/>, some data
/// that has been read from the current <see cref="Stream"/> but not stored (by the
/// <see cref="StreamReader"/>) or returned (to the caller) may be lost.</para>
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadLineAsync();
return base.ReadLineAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string?> task = ReadLineAsyncInternal();
Task<string?> task = ReadLineAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
return new ValueTask<string?>(task);
}

private async Task<string?> ReadLineAsyncInternal()
private async Task<string?> ReadLineAsyncInternal(CancellationToken cancellationToken)
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
if (_charPos == _charLen && (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) == 0)
if (_charPos == _charLen && (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) == 0)
{
return null;
}
Expand Down Expand Up @@ -903,7 +931,7 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)

_charPos = tmpCharPos = i + 1;

if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) > 0))
if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) > 0))
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
tmpCharPos = _charPos;
if (_charBuffer[tmpCharPos] == '\n')
Expand All @@ -921,32 +949,59 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
i = tmpCharLen - tmpCharPos;
sb ??= new StringBuilder(i + 80);
sb.Append(tmpCharBuffer, tmpCharPos, i);
} while (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false) > 0);
} while (await ReadBufferAsync(cancellationToken).ConfigureAwait(false) > 0);

return sb.ToString();
}

public override Task<string> ReadToEndAsync()
public override Task<string> ReadToEndAsync() => ReadToEndAsync(default);

/// <summary>
/// Reads all characters from the current position to the end of the stream asynchronously and returns them as one string.
/// </summary>
/// <param name="cancellationToken">The token to monitor for cancellation requests.</param>
/// <returns>A task that represents the asynchronous read operation. The value of the <c>TResult</c> parameter contains
/// a string with the characters from the current position to the end of the stream.</returns>
/// <exception cref="ArgumentOutOfRangeException">The number of characters is larger than <see cref="int.MaxValue"/>.</exception>
/// <exception cref="ObjectDisposedException">The stream reader has been disposed.</exception>
/// <exception cref="InvalidOperationException">The reader is currently in use by a previous read operation.</exception>
/// <example>
/// The following example shows how to read the contents of a file by using the <see cref="ReadToEndAsync(CancellationToken)"/> method.
/// <code lang="C#">
/// using var tokenSource = new CancellationTokenSource();
/// using var reader = File.OpenText("existingfile.txt");
/// Console.WriteLine("Opened file.");
///
/// var result = await reader.ReadToEndAsync(tokenSource.Token);
/// Console.WriteLine("Contains: " + result);
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
/// </code>
/// </example>
/// <remarks>
/// <para>If this method is canceled via <paramref name="cancellationToken"/>, some data
/// that has been read from the current <see cref="Stream"/> but not stored (by the
/// <see cref="StreamReader"/>) or returned (to the caller) may be lost.</para>
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
/// </remarks>
public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadToEndAsync();
return base.ReadToEndAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string> task = ReadToEndAsyncInternal();
Task<string> task = ReadToEndAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
}

private async Task<string> ReadToEndAsyncInternal()
private async Task<string> ReadToEndAsyncInternal(CancellationToken cancellationToken)
{
// Call ReadBuffer, then pull data out of charBuffer.
StringBuilder sb = new StringBuilder(_charLen - _charPos);
Expand All @@ -955,7 +1010,7 @@ private async Task<string> ReadToEndAsyncInternal()
int tmpCharPos = _charPos;
sb.Append(_charBuffer, tmpCharPos, _charLen - tmpCharPos);
_charPos = _charLen; // We consumed these characters
await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false);
await ReadBufferAsync(cancellationToken).ConfigureAwait(false);
} while (_charLen > 0);

return sb.ToString();
Expand Down
Loading