Skip to content
This repository has been archived by the owner on Jan 23, 2023. It is now read-only.
/ corefx Public archive

SqlClient managed networking improvements #35363

Merged
merged 10 commits into from
Jun 4, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ internal abstract class SNIHandle
/// </summary>
public abstract Guid ConnectionId { get; }

public virtual int ReserveHeaderSize => 0;

#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
/// <returns>SNI error code</returns>
public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
packet = null;
}
lock (this)
{
return _lowerHandle.ReceiveAsync(ref packet);
Expand Down Expand Up @@ -133,7 +138,7 @@ public void HandleReceiveError(SNIPacket packet)
{
handle.HandleReceiveError(packet);
}
packet?.Dispose();
packet?.Release();
}

/// <summary>
Expand Down Expand Up @@ -183,8 +188,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (bytesTaken == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand All @@ -199,7 +202,7 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

_currentHeader.Read(_headerBytes);
_dataBytesLeft = (int)_currentHeader.length;
_currentPacket = new SNIPacket((int)_currentHeader.length);
_currentPacket = new SNIPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
}

currentHeader = _currentHeader;
Expand All @@ -214,8 +217,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (_dataBytesLeft > 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -271,8 +272,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
if (packet.DataLeft == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// MARS handle
/// </summary>
internal class SNIMarsHandle : SNIHandle
internal sealed class SNIMarsHandle : SNIHandle
{
private const uint ACK_THRESHOLD = 2;

Expand All @@ -33,27 +33,11 @@ internal class SNIMarsHandle : SNIHandle
private uint _sequenceNumber;
private SNIError _connectionError;

/// <summary>
/// Connection ID
/// </summary>
public override Guid ConnectionId
{
get
{
return _connectionId;
}
}
public override Guid ConnectionId => _connectionId;

/// <summary>
/// Handle status
/// </summary>
public override uint Status
{
get
{
return _status;
}
}
public override uint Status => _status;

public override int ReserveHeaderSize => SNISMUXHeader.HEADER_LENGTH;

/// <summary>
/// Dispose object
Expand Down Expand Up @@ -93,48 +77,41 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
/// <param name="flags">SMUX header flags</param>
private void SendControlPacket(SNISMUXFlags flags)
{
Span<byte> headerBytes = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
SNIPacket packet = new SNIPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);
lock (this)
{
GetSMUXHeaderBytes(0, flags, headerBytes);
SetupSMUXHeader(0, flags);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
}

SNIPacket packet = new SNIPacket(SNISMUXHeader.HEADER_LENGTH);
packet.AppendData(headerBytes);

_connection.Send(packet);
}

private void GetSMUXHeaderBytes(int length, SNISMUXFlags flags, Span<byte> bytes)
private void SetupSMUXHeader(int length, SNISMUXFlags flags)
{
Debug.Assert(Monitor.IsEntered(this), "must take lock on self before updating mux header");
_currentHeader.SMID = 83;
_currentHeader.flags = (byte)flags;
_currentHeader.sessionId = _sessionId;
_currentHeader.length = (uint)SNISMUXHeader.HEADER_LENGTH + (uint)length;
_currentHeader.sequenceNumber = ((flags == SNISMUXFlags.SMUX_FIN) || (flags == SNISMUXFlags.SMUX_ACK)) ? _sequenceNumber - 1 : _sequenceNumber++;
_currentHeader.highwater = _receiveHighwater;
_receiveHighwaterLastAck = _currentHeader.highwater;

_currentHeader.Write(bytes);
}

/// <summary>
/// Generate a packet with SMUX header
/// </summary>
/// <param name="packet">SNI packet</param>
/// <returns>Encapsulated SNI packet</returns>
private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>The packet with the SMUx header set.</returns>
private SNIPacket SetPacketSMUXHeader(SNIPacket packet)
{
uint xSequenceNumber = _sequenceNumber;
Span<byte> header = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
GetSMUXHeaderBytes(packet.Length, SNISMUXFlags.SMUX_DATA, header);
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to mux packet without mux reservation");


SNIPacket smuxPacket = new SNIPacket(SNISMUXHeader.HEADER_LENGTH + packet.Length);
smuxPacket.AppendData(header);
smuxPacket.AppendPacket(packet);
packet.Dispose();
return smuxPacket;
SetupSMUXHeader(packet.Length, SNISMUXFlags.SMUX_DATA);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
return packet;
}

/// <summary>
Expand All @@ -144,6 +121,8 @@ private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>SNI error code</returns>
public override uint Send(SNIPacket packet)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in Send");

while (true)
{
lock (this)
Expand All @@ -161,9 +140,13 @@ public override uint Send(SNIPacket packet)
_ackEvent.Reset();
}
}
SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

return _connection.Send(encapsulatedPacket);
SNIPacket muxedPacket = null;
lock (this)
{
muxedPacket = SetPacketSMUXHeader(packet);
}
return _connection.Send(muxedPacket);
}

/// <summary>
Expand All @@ -174,25 +157,17 @@ public override uint Send(SNIPacket packet)
/// <returns>SNI error code</returns>
private uint InternalSendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in InternalSendAsync");
lock (this)
{
if (_sequenceNumber >= _sendHighwater)
{
return TdsEnums.SNI_QUEUE_FULL;
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

if (callback != null)
{
encapsulatedPacket.SetCompletionCallback(callback);
}
else
{
encapsulatedPacket.SetCompletionCallback(HandleSendComplete);
}

return _connection.SendAsync(encapsulatedPacket, callback);
SNIPacket muxedPacket = SetPacketSMUXHeader(packet);
muxedPacket.SetCompletionCallback(callback ?? HandleSendComplete);
return _connection.SendAsync(muxedPacket, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;

private readonly string _targetServer;
private readonly object _callbackObject;

private Stream _stream;
private NamedPipeClientStream _pipeStream;
private SslOverTdsStream _sslOverTdsStream;
Expand Down Expand Up @@ -61,13 +61,13 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object
_pipeStream.Connect((int)ts.TotalMilliseconds);
}
}
catch(TimeoutException te)
catch (TimeoutException te)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, te);
_status = TdsEnums.SNI_ERROR;
return;
}
catch(IOException ioe)
catch (IOException ioe)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, ioe);
_status = TdsEnums.SNI_ERROR;
Expand Down Expand Up @@ -150,7 +150,7 @@ public override uint Receive(out SNIPacket packet, int timeout)
packet = null;
try
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
Expand All @@ -174,8 +174,8 @@ public override uint Receive(out SNIPacket packet, int timeout)

public override uint ReceiveAsync(ref SNIPacket packet)
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);

try
{
packet.ReadFromStreamAsync(_stream, _receiveCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await valueTask.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, _headerLength, _dataCapacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
_dataLength = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, _headerLength, _dataLength), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await task.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);
Task<int> t = stream.ReadAsync(_data, _headerLength, _dataCapacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
_dataLength = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);
Task t = stream.WriteAsync(_data, _headerLength, _dataLength, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Loading