Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CancellationToken to TextReader.ReadXAsync #61898

Merged
merged 10 commits into from
Jan 25, 2022
11 changes: 11 additions & 0 deletions src/libraries/System.Console/src/System/IO/SyncTextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
Expand Down Expand Up @@ -95,11 +96,21 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
return cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled<string?>(cancellationToken) : new ValueTask<string?>(ReadLine());
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
}

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ? Task.FromCanceled<string>(cancellationToken) : Task.FromResult(ReadToEnd());
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
}

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
68 changes: 68 additions & 0 deletions src/libraries/System.IO/tests/StreamReader/StreamReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,53 @@ public async Task ReadToEndAsync()
Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCancellationToken()
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
using var sw = new StreamReader(GetLargeStream());
var result = await sw.ReadToEndAsync(default);

Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
using var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await sw.ReadToEndAsync(cts.Token));
}

[Fact]
public async Task ReadToEndAsync_WithCancellation()
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
var path = Path.GetTempFileName();
try
{
// create large (~100MB) file
using (var writer = new StreamWriter(path))
{
for (var i = 0; i < 1_000_000; i++)
writer.WriteLine("A very large file used for testing StreamReader cancellation. 0123456789012345678901234567890123456789.");
}

using var reader = File.OpenText(path);
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await reader.ReadToEndAsync(cts.Token));
}
finally
{
try
{
File.Delete(path);
}
catch (Exception)
{
}
}
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void GetBaseStream()
{
Expand Down Expand Up @@ -301,6 +348,27 @@ public void VanillaReadLines2()
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}

[Fact]
public async Task VanillaReadLineAsync()
{
var baseInfo = GetCharArrayStream();
var sr = baseInfo.Item2;

string valueString = new string(baseInfo.Item1);

var data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);

data = await sr.ReadLineAsync(default);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);

data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);

data = await sr.ReadLineAsync(default);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}

[Fact]
public async Task ContinuousNewLinesAndTabsAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public static void ReadLine()
}
}

[Fact]
public static async Task ReadLineAsync()
{
string str1 = "Hello\0\t\v \\ World";
string str2 = str1 + Environment.NewLine + str1;

using (StringReader sr = new StringReader(str1))
{
Assert.Equal(str1, await sr.ReadLineAsync());
}
using (StringReader sr = new StringReader(str2))
{
Assert.Equal(str1, await sr.ReadLineAsync(default));
Assert.Equal(str1, await sr.ReadLineAsync(default));
}
}

[Fact]
public static void ReadPseudoRandomString()
{
Expand Down Expand Up @@ -155,6 +172,14 @@ public static void ReadToEndPseudoRandom() {
Assert.Equal(str1, sr.ReadToEnd());
}

[Fact]
public static async Task ReadToEndAsyncString()
{
string str1 = "Hello\0\t\v \\ World";
StringReader sr = new StringReader(str1);
Assert.Equal(str1, await sr.ReadToEndAsync(default));
}

[Fact]
public static void Closed_DisposedExceptions()
{
Expand Down Expand Up @@ -278,6 +303,8 @@ public async Task Precanceled_ThrowsException()

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadLineAsync(new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadToEndAsync(new CancellationToken(true)));
}

private static void ValidateDisposedExceptions(StringReader sr)
Expand Down
18 changes: 18 additions & 0 deletions src/libraries/System.IO/tests/TextReader/TextReaderTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -54,6 +55,23 @@ public async Task ReadToEndAsync()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
var result = await tr.ReadToEndAsync(default);
Assert.Equal(5000, result.Length);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
using var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await tr.ReadToEndAsync(cts.Token));
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void TestRead()
{
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private static async Task<string[]> InternalReadAllLinesAsync(string path, Encod
cancellationToken.ThrowIfCancellationRequested();
string? line;
List<string> lines = new List<string>();
while ((line = await sr.ReadLineAsync().ConfigureAwait(false)) != null)
while ((line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
Expand Down
32 changes: 19 additions & 13 deletions src/libraries/System.Private.CoreLib/src/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -845,29 +845,32 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
return sb.ToString();
}

public override Task<string?> ReadLineAsync()
public override Task<string?> ReadLineAsync() =>
ReadLineAsync(default).AsTask();

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadLineAsync();
return base.ReadLineAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string?> task = ReadLineAsyncInternal();
Task<string?> task = ReadLineAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
return new ValueTask<string?>(task);
}

private async Task<string?> ReadLineAsyncInternal()
private async Task<string?> ReadLineAsyncInternal(CancellationToken cancellationToken)
adamsitnik marked this conversation as resolved.
Show resolved Hide resolved
{
if (_charPos == _charLen && (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) == 0)
if (_charPos == _charLen && (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) == 0)
{
return null;
}
Expand Down Expand Up @@ -903,7 +906,7 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)

_charPos = tmpCharPos = i + 1;

if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) > 0))
if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) > 0))
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
{
tmpCharPos = _charPos;
if (_charBuffer[tmpCharPos] == '\n')
Expand All @@ -921,32 +924,35 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
i = tmpCharLen - tmpCharPos;
sb ??= new StringBuilder(i + 80);
sb.Append(tmpCharBuffer, tmpCharPos, i);
} while (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false) > 0);
} while (await ReadBufferAsync(cancellationToken).ConfigureAwait(false) > 0);

return sb.ToString();
}

public override Task<string> ReadToEndAsync()
public override Task<string> ReadToEndAsync() =>
ReadToEndAsync(default);
bgrainger marked this conversation as resolved.
Show resolved Hide resolved

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadToEndAsync();
return base.ReadToEndAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string> task = ReadToEndAsyncInternal();
Task<string> task = ReadToEndAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
}

private async Task<string> ReadToEndAsyncInternal()
private async Task<string> ReadToEndAsyncInternal(CancellationToken cancellationToken)
{
// Call ReadBuffer, then pull data out of charBuffer.
StringBuilder sb = new StringBuilder(_charLen - _charPos);
Expand All @@ -955,7 +961,7 @@ private async Task<string> ReadToEndAsyncInternal()
int tmpCharPos = _charPos;
sb.Append(_charBuffer, tmpCharPos, _charLen - tmpCharPos);
_charPos = _charLen; // We consumed these characters
await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false);
await ReadBufferAsync(cancellationToken).ConfigureAwait(false);
} while (_charLen > 0);

return sb.ToString();
Expand Down
10 changes: 10 additions & 0 deletions src/libraries/System.Private.CoreLib/src/System/IO/StringReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,21 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? ValueTask.FromCanceled<string?>(cancellationToken)
: new ValueTask<string?>(ReadLine());

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? Task.FromCanceled<string>(cancellationToken)
: Task.FromResult(ReadToEnd());

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
22 changes: 18 additions & 4 deletions src/libraries/System.Private.CoreLib/src/System/IO/TextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,26 @@ public virtual int ReadBlock(Span<char> buffer)
}

#region Task based Async APIs
public virtual Task<string?> ReadLineAsync() =>
public virtual Task<string?> ReadLineAsync() => ReadLineCoreAsync(default);

public virtual ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
bgrainger marked this conversation as resolved.
Show resolved Hide resolved
new ValueTask<string?>(ReadLineCoreAsync(cancellationToken));

private Task<string?> ReadLineCoreAsync(CancellationToken cancellationToken) =>
Task<string?>.Factory.StartNew(static state => ((TextReader)state!).ReadLine(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

public virtual async Task<string> ReadToEndAsync()
public virtual Task<string> ReadToEndAsync() =>
ReadToEndAsync(default);
bgrainger marked this conversation as resolved.
Show resolved Hide resolved

public virtual async Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
var sb = new StringBuilder(4096);
char[] chars = ArrayPool<char>.Shared.Rent(4096);
try
{
int len;
while ((len = await ReadAsyncInternal(chars, default).ConfigureAwait(false)) != 0)
while ((len = await ReadAsyncInternal(chars, cancellationToken).ConfigureAwait(false)) != 0)
{
sb.Append(chars, 0, len);
}
Expand Down Expand Up @@ -368,9 +376,15 @@ protected override void Dispose(bool disposing)
[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string?> ReadLineAsync() => Task.FromResult(ReadLine());

[MethodImpl(MethodImplOptions.Synchronized)]
public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled<string?>(cancellationToken) : new ValueTask<string?>(ReadLine());
bgrainger marked this conversation as resolved.
Show resolved Hide resolved

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync() => Task.FromResult(ReadToEnd());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? Task.FromCanceled<string>(cancellationToken) : Task.FromResult(ReadToEnd());
bgrainger marked this conversation as resolved.
Show resolved Hide resolved

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
Expand Down
Loading