From 6cb4ca20eeaa6b1f05901c1901ab2141d9897cd6 Mon Sep 17 00:00:00 2001 From: Parminder Kaur <88398605+Kaur-Parminder@users.noreply.github.com> Date: Tue, 19 Oct 2021 10:42:40 -0700 Subject: [PATCH] Move to Shared SqlSequentialStream (#1345) --- .../src/Microsoft.Data.SqlClient.csproj | 4 +- .../Data/SqlClient/SqlSequentialStream.cs | 317 ------------------ .../netfx/src/Microsoft.Data.SqlClient.csproj | 4 +- .../Data/SqlClient/SqlSequentialStream.cs | 137 ++++---- 4 files changed, 70 insertions(+), 392 deletions(-) delete mode 100644 src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs rename src/Microsoft.Data.SqlClient/{netfx => }/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs (87%) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj index 99336a82bc..86baae9553 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft.Data.SqlClient.csproj @@ -554,6 +554,9 @@ Microsoft\Data\SqlClient\EnclavePackage.cs + + Microsoft\Data\SqlClient\SqlSequentialStream.cs + @@ -581,7 +584,6 @@ - Microsoft\Data\SqlClient\SqlStatistics.cs diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs deleted file mode 100644 index f8a3a086ed..0000000000 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs +++ /dev/null @@ -1,317 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Data.Common; - -namespace Microsoft.Data.SqlClient -{ - sealed internal class SqlSequentialStream : System.IO.Stream - { - private SqlDataReader _reader; // The SqlDataReader that we are reading data from - private int _columnIndex; // The index of out column in the table - private Task _currentTask; // Holds the current task being processed - private int _readTimeout; // Read timeout for this stream in ms (for Stream.ReadTimeout) - private CancellationTokenSource _disposalTokenSource; // Used to indicate that a cancellation is requested due to disposal - - internal SqlSequentialStream(SqlDataReader reader, int columnIndex) - { - Debug.Assert(reader != null, "Null reader when creating sequential stream"); - Debug.Assert(columnIndex >= 0, "Invalid column index when creating sequential stream"); - - _reader = reader; - _columnIndex = columnIndex; - _currentTask = null; - _disposalTokenSource = new CancellationTokenSource(); - - // Safely convert the CommandTimeout from seconds to milliseconds - if ((reader.Command != null) && (reader.Command.CommandTimeout != 0)) - { - _readTimeout = (int)Math.Min((long)reader.Command.CommandTimeout * 1000L, (long)int.MaxValue); - } - else - { - _readTimeout = Timeout.Infinite; - } - } - - public override bool CanRead - { - get { return ((_reader != null) && (!_reader.IsClosed)); } - } - - public override bool CanSeek - { - get { return false; } - } - - public override bool CanTimeout - { - get { return true; } - } - - public override bool CanWrite - { - get { return false; } - } - - public override void Flush() - { } - - public override long Length - { - get { throw ADP.NotSupported(); } - } - - public override long Position - { - get { throw ADP.NotSupported(); } - set { throw ADP.NotSupported(); } - } - - public override int ReadTimeout - { - get { return _readTimeout; } - set - { - if ((value > 0) || (value == Timeout.Infinite)) - { - _readTimeout = value; - } - else - { - throw ADP.ArgumentOutOfRange(nameof(value)); - } - } - } - - internal int ColumnIndex - { - get { return _columnIndex; } - } - - public override int Read(byte[] buffer, int offset, int count) - { - ValidateReadParameters(buffer, offset, count); - if (!CanRead) - { - throw ADP.ObjectDisposed(this); - } - if (_currentTask != null) - { - throw ADP.AsyncOperationPending(); - } - - try - { - return _reader.GetBytesInternalSequential(_columnIndex, buffer, offset, count, _readTimeout); - } - catch (SqlException ex) - { - // Stream.Read() can't throw a SqlException - so wrap it in an IOException - throw ADP.ErrorReadingFromStream(ex); - } - } - - - public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - ValidateReadParameters(buffer, offset, count); - - TaskCompletionSource completion = new TaskCompletionSource(); - if (!CanRead) - { - completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this))); - } - else - { - try - { - Task original = Interlocked.CompareExchange(ref _currentTask, completion.Task, null); - if (original != null) - { - completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending())); - } - else - { - // Set up a combined cancellation token for both the user's and our disposal tokens - CancellationTokenSource combinedTokenSource; - if (!cancellationToken.CanBeCanceled) - { - // Users token is not cancellable - just use ours - combinedTokenSource = _disposalTokenSource; - } - else - { - // Setup registrations from user and disposal token to cancel the combined token - combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalTokenSource.Token); - } - - int bytesRead = 0; - Task getBytesTask = null; - var reader = _reader; - if ((reader != null) && (!cancellationToken.IsCancellationRequested) && (!_disposalTokenSource.Token.IsCancellationRequested)) - { - getBytesTask = reader.GetBytesAsync(_columnIndex, buffer, offset, count, _readTimeout, combinedTokenSource.Token, out bytesRead); - } - - if (getBytesTask == null) - { - _currentTask = null; - if (cancellationToken.IsCancellationRequested) - { - completion.SetCanceled(); - } - else if (!CanRead) - { - completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this))); - } - else - { - completion.SetResult(bytesRead); - } - - if (combinedTokenSource != _disposalTokenSource) - { - combinedTokenSource.Dispose(); - } - } - else - { - getBytesTask.ContinueWith((t) => - { - _currentTask = null; - // If we completed, but _reader is null (i.e. the stream is closed), then report cancellation - if ((t.Status == TaskStatus.RanToCompletion) && (CanRead)) - { - completion.SetResult((int)t.Result); - } - else if (t.Status == TaskStatus.Faulted) - { - if (t.Exception.InnerException is SqlException) - { - // Stream.ReadAsync() can't throw a SqlException - so wrap it in an IOException - completion.SetException(ADP.ExceptionWithStackTrace(ADP.ErrorReadingFromStream(t.Exception.InnerException))); - } - else - { - completion.SetException(t.Exception.InnerException); - } - } - else if (!CanRead) - { - completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this))); - } - else - { - completion.SetCanceled(); - } - - if (combinedTokenSource != _disposalTokenSource) - { - combinedTokenSource.Dispose(); - } - }, TaskScheduler.Default); - } - } - } - catch (Exception ex) - { - // In case of any errors, ensure that the completion is completed and the task is set back to null if we switched it - completion.TrySetException(ex); - Interlocked.CompareExchange(ref _currentTask, null, completion.Task); - throw; - } - } - - return completion.Task; - } - - public override IAsyncResult BeginRead(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState) => - TaskToApm.Begin(ReadAsync(array, offset, count, CancellationToken.None), asyncCallback, asyncState); - - public override int EndRead(IAsyncResult asyncResult) => - TaskToApm.End(asyncResult); - - public override long Seek(long offset, System.IO.SeekOrigin origin) - { - throw ADP.NotSupported(); - } - - public override void SetLength(long value) - { - throw ADP.NotSupported(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw ADP.NotSupported(); - } - - /// - /// Forces the stream to act as if it was closed (i.e. CanRead=false and Read() throws) - /// This does not actually close the stream, read off the rest of the data or dispose this - /// - internal void SetClosed() - { - _disposalTokenSource.Cancel(); - _reader = null; - - // Wait for pending task - var currentTask = _currentTask; - if (currentTask != null) - { - ((IAsyncResult)currentTask).AsyncWaitHandle.WaitOne(); - } - } - - protected override void Dispose(bool disposing) - { - if (disposing) - { - // Set the stream as closed - SetClosed(); - } - - base.Dispose(disposing); - } - - /// - /// Checks the parameters passed into a Read() method are valid - /// - /// - /// - /// - internal static void ValidateReadParameters(byte[] buffer, int offset, int count) - { - if (buffer == null) - { - throw ADP.ArgumentNull(nameof(buffer)); - } - if (offset < 0) - { - throw ADP.ArgumentOutOfRange(nameof(offset)); - } - if (count < 0) - { - throw ADP.ArgumentOutOfRange(nameof(count)); - } - try - { - if (checked(offset + count) > buffer.Length) - { - throw ExceptionBuilder.InvalidOffsetLength(); - } - } - catch (OverflowException) - { - // If we've overflowed when adding offset and count, then they never would have fit into buffer anyway - throw ExceptionBuilder.InvalidOffsetLength(); - } - } - } -} diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj index fba0bf7f9a..040dde902c 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft.Data.SqlClient.csproj @@ -563,6 +563,9 @@ Microsoft\Data\SqlClient\SqlCredential.cs + + Microsoft\Data\SqlClient\SqlSequentialStream.cs + @@ -573,7 +576,6 @@ - diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs similarity index 87% rename from src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs rename to src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs index e2d0876d7f..c97e51f577 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs +++ b/src/Microsoft.Data.SqlClient/src/Microsoft/Data/SqlClient/SqlSequentialStream.cs @@ -1,4 +1,4 @@ -// Licensed to the .NET Foundation under one or more agreements. +// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. // See the LICENSE file in the project root for more information. @@ -13,10 +13,10 @@ namespace Microsoft.Data.SqlClient sealed internal class SqlSequentialStream : System.IO.Stream { private SqlDataReader _reader; // The SqlDataReader that we are reading data from - private int _columnIndex; // The index of out column in the table + private readonly int _columnIndex; // The index of out column in the table private Task _currentTask; // Holds the current task being processed private int _readTimeout; // Read timeout for this stream in ms (for Stream.ReadTimeout) - private CancellationTokenSource _disposalTokenSource; // Used to indicate that a cancellation is requested due to disposal + private readonly CancellationTokenSource _disposalTokenSource; // Used to indicate that a cancellation is requested due to disposal internal SqlSequentialStream(SqlDataReader reader, int columnIndex) { @@ -28,10 +28,10 @@ internal SqlSequentialStream(SqlDataReader reader, int columnIndex) _currentTask = null; _disposalTokenSource = new CancellationTokenSource(); - // Safely safely convert the CommandTimeout from seconds to milliseconds + // Safely convert the CommandTimeout from seconds to milliseconds if ((reader.Command != null) && (reader.Command.CommandTimeout != 0)) { - _readTimeout = (int)Math.Min((long)reader.Command.CommandTimeout * 1000L, (long)Int32.MaxValue); + _readTimeout = (int)Math.Min((long)reader.Command.CommandTimeout * 1000L, (long)int.MaxValue); } else { @@ -39,43 +39,28 @@ internal SqlSequentialStream(SqlDataReader reader, int columnIndex) } } - public override bool CanRead - { - get { return ((_reader != null) && (!_reader.IsClosed)); } - } + public override bool CanRead => (_reader != null) && (!_reader.IsClosed); - public override bool CanSeek - { - get { return false; } - } + public override bool CanSeek => false; - public override bool CanTimeout - { - get { return true; } - } + public override bool CanTimeout => true; - public override bool CanWrite - { - get { return false; } - } + public override bool CanWrite => false; public override void Flush() { } - public override long Length - { - get { throw ADP.NotSupported(); } - } + public override long Length => throw ADP.NotSupported(); public override long Position { - get { throw ADP.NotSupported(); } - set { throw ADP.NotSupported(); } + get => throw ADP.NotSupported(); + set => throw ADP.NotSupported(); } public override int ReadTimeout { - get { return _readTimeout; } + get => _readTimeout; set { if ((value > 0) || (value == Timeout.Infinite)) @@ -84,15 +69,12 @@ public override int ReadTimeout } else { - throw ADP.ArgumentOutOfRange("value"); + throw ADP.ArgumentOutOfRange(nameof(value)); } } } - internal int ColumnIndex - { - get { return _columnIndex; } - } + internal int ColumnIndex => _columnIndex; public override int Read(byte[] buffer, int offset, int count) { @@ -117,48 +99,12 @@ public override int Read(byte[] buffer, int offset, int count) } } - public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - if (!CanRead) - { - // This is checked in ReadAsync - but its a better for the user if it throw here instead of having to wait for EndRead - throw ADP.ObjectDisposed(this); - } - - Task readTask = ReadAsync(buffer, offset, count, CancellationToken.None); - if (callback != null) - { - readTask.ContinueWith((t) => callback(t), TaskScheduler.Default); - } - return readTask; - } - - public override int EndRead(IAsyncResult asyncResult) - { - if (asyncResult == null) - { - throw ADP.ArgumentNull("asyncResult"); - } - - // Wait for the task to complete - this will also cause any exceptions to be thrown - Task readTask = (Task)asyncResult; - try - { - readTask.Wait(); - } - catch (AggregateException ex) - { - throw ex.InnerException; - } - - return readTask.Result; - } public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { ValidateReadParameters(buffer, offset, count); - TaskCompletionSource completion = new TaskCompletionSource(); + TaskCompletionSource completion = new(); if (!CanRead) { completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this))); @@ -189,7 +135,7 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel int bytesRead = 0; Task getBytesTask = null; - var reader = _reader; + SqlDataReader reader = _reader; if ((reader != null) && (!cancellationToken.IsCancellationRequested) && (!_disposalTokenSource.Token.IsCancellationRequested)) { getBytesTask = reader.GetBytesAsync(_columnIndex, buffer, offset, count, _readTimeout, combinedTokenSource.Token, out bytesRead); @@ -292,7 +238,7 @@ internal void SetClosed() _reader = null; // Wait for pending task - var currentTask = _currentTask; + Task currentTask = _currentTask; if (currentTask != null) { ((IAsyncResult)currentTask).AsyncWaitHandle.WaitOne(); @@ -311,7 +257,7 @@ protected override void Dispose(bool disposing) } /// - /// Checks the the parameters passed into a Read() method are valid + /// Checks the parameters passed into a Read() method are valid /// /// /// @@ -343,5 +289,50 @@ internal static void ValidateReadParameters(byte[] buffer, int offset, int count throw ExceptionBuilder.InvalidOffsetLength(); } } + +#if NETFRAMEWORK + public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) + { + if (!CanRead) + { + // This is checked in ReadAsync - but its a better for the user if it throw here instead of having to wait for EndRead + throw ADP.ObjectDisposed(this); + } + + Task readTask = ReadAsync(buffer, offset, count, CancellationToken.None); + if (callback != null) + { + readTask.ContinueWith((t) => callback(t), TaskScheduler.Default); + } + return readTask; + } + + public override int EndRead(IAsyncResult asyncResult) + { + if (asyncResult == null) + { + throw ADP.ArgumentNull("asyncResult"); + } + + // Wait for the task to complete - this will also cause any exceptions to be thrown + Task readTask = (Task)asyncResult; + try + { + readTask.Wait(); + } + catch (AggregateException ex) + { + throw ex.InnerException; + } + + return readTask.Result; + } +#else + public override IAsyncResult BeginRead(byte[] array, int offset, int count, AsyncCallback asyncCallback, object asyncState) => + TaskToApm.Begin(ReadAsync(array, offset, count, CancellationToken.None), asyncCallback, asyncState); + + public override int EndRead(IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); +#endif } }