Skip to content

Commit

Permalink
Merge pull request dotnet/corefx#35363 from Wraith2/sqlperf-managedsmux
Browse files Browse the repository at this point in the history
SqlClient managed networking improvements, all test passed

Commit migrated from dotnet/corefx@2469a91
  • Loading branch information
Gary-Zh authored Jun 4, 2019
2 parents 0678c26 + 5c72220 commit 87e7f41
Show file tree
Hide file tree
Showing 13 changed files with 219 additions and 429 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ internal abstract class SNIHandle
/// </summary>
public abstract Guid ConnectionId { get; }

public virtual int ReserveHeaderSize => 0;

#if DEBUG
/// <summary>
/// Test handle for killing underlying connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public uint SendAsync(SNIPacket packet, SNIAsyncCallback callback)
/// <returns>SNI error code</returns>
public uint ReceiveAsync(ref SNIPacket packet)
{
if (packet != null)
{
packet.Release();
packet = null;
}
lock (this)
{
return _lowerHandle.ReceiveAsync(ref packet);
Expand Down Expand Up @@ -133,7 +138,7 @@ public void HandleReceiveError(SNIPacket packet)
{
handle.HandleReceiveError(packet);
}
packet?.Dispose();
packet?.Release();
}

/// <summary>
Expand Down Expand Up @@ -183,8 +188,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (bytesTaken == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand All @@ -199,7 +202,7 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

_currentHeader.Read(_headerBytes);
_dataBytesLeft = (int)_currentHeader.length;
_currentPacket = new SNIPacket((int)_currentHeader.length);
_currentPacket = new SNIPacket(headerSize: 0, dataSize: (int)_currentHeader.length);
}

currentHeader = _currentHeader;
Expand All @@ -214,8 +217,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)

if (_dataBytesLeft > 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down Expand Up @@ -271,8 +272,6 @@ public void HandleReceiveComplete(SNIPacket packet, uint sniErrorCode)
{
if (packet.DataLeft == 0)
{
packet.Dispose();
packet = null;
sniErrorCode = ReceiveAsync(ref packet);

if (sniErrorCode == TdsEnums.SNI_SUCCESS_IO_PENDING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// MARS handle
/// </summary>
internal class SNIMarsHandle : SNIHandle
internal sealed class SNIMarsHandle : SNIHandle
{
private const uint ACK_THRESHOLD = 2;

Expand All @@ -33,27 +33,11 @@ internal class SNIMarsHandle : SNIHandle
private uint _sequenceNumber;
private SNIError _connectionError;

/// <summary>
/// Connection ID
/// </summary>
public override Guid ConnectionId
{
get
{
return _connectionId;
}
}
public override Guid ConnectionId => _connectionId;

/// <summary>
/// Handle status
/// </summary>
public override uint Status
{
get
{
return _status;
}
}
public override uint Status => _status;

public override int ReserveHeaderSize => SNISMUXHeader.HEADER_LENGTH;

/// <summary>
/// Dispose object
Expand Down Expand Up @@ -93,48 +77,41 @@ public SNIMarsHandle(SNIMarsConnection connection, ushort sessionId, object call
/// <param name="flags">SMUX header flags</param>
private void SendControlPacket(SNISMUXFlags flags)
{
Span<byte> headerBytes = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
SNIPacket packet = new SNIPacket(headerSize: SNISMUXHeader.HEADER_LENGTH, dataSize: 0);
lock (this)
{
GetSMUXHeaderBytes(0, flags, headerBytes);
SetupSMUXHeader(0, flags);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
}

SNIPacket packet = new SNIPacket(SNISMUXHeader.HEADER_LENGTH);
packet.AppendData(headerBytes);

_connection.Send(packet);
}

private void GetSMUXHeaderBytes(int length, SNISMUXFlags flags, Span<byte> bytes)
private void SetupSMUXHeader(int length, SNISMUXFlags flags)
{
Debug.Assert(Monitor.IsEntered(this), "must take lock on self before updating mux header");
_currentHeader.SMID = 83;
_currentHeader.flags = (byte)flags;
_currentHeader.sessionId = _sessionId;
_currentHeader.length = (uint)SNISMUXHeader.HEADER_LENGTH + (uint)length;
_currentHeader.sequenceNumber = ((flags == SNISMUXFlags.SMUX_FIN) || (flags == SNISMUXFlags.SMUX_ACK)) ? _sequenceNumber - 1 : _sequenceNumber++;
_currentHeader.highwater = _receiveHighwater;
_receiveHighwaterLastAck = _currentHeader.highwater;

_currentHeader.Write(bytes);
}

/// <summary>
/// Generate a packet with SMUX header
/// </summary>
/// <param name="packet">SNI packet</param>
/// <returns>Encapsulated SNI packet</returns>
private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>The packet with the SMUx header set.</returns>
private SNIPacket SetPacketSMUXHeader(SNIPacket packet)
{
uint xSequenceNumber = _sequenceNumber;
Span<byte> header = stackalloc byte[SNISMUXHeader.HEADER_LENGTH];
GetSMUXHeaderBytes(packet.Length, SNISMUXFlags.SMUX_DATA, header);
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to mux packet without mux reservation");


SNIPacket smuxPacket = new SNIPacket(SNISMUXHeader.HEADER_LENGTH + packet.Length);
smuxPacket.AppendData(header);
smuxPacket.AppendPacket(packet);
packet.Dispose();
return smuxPacket;
SetupSMUXHeader(packet.Length, SNISMUXFlags.SMUX_DATA);
_currentHeader.Write(packet.GetHeaderBuffer(SNISMUXHeader.HEADER_LENGTH));
packet.SetHeaderActive();
return packet;
}

/// <summary>
Expand All @@ -144,6 +121,8 @@ private SNIPacket GetSMUXEncapsulatedPacket(SNIPacket packet)
/// <returns>SNI error code</returns>
public override uint Send(SNIPacket packet)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in Send");

while (true)
{
lock (this)
Expand All @@ -161,9 +140,13 @@ public override uint Send(SNIPacket packet)
_ackEvent.Reset();
}
}
SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

return _connection.Send(encapsulatedPacket);
SNIPacket muxedPacket = null;
lock (this)
{
muxedPacket = SetPacketSMUXHeader(packet);
}
return _connection.Send(muxedPacket);
}

/// <summary>
Expand All @@ -174,25 +157,17 @@ public override uint Send(SNIPacket packet)
/// <returns>SNI error code</returns>
private uint InternalSendAsync(SNIPacket packet, SNIAsyncCallback callback)
{
Debug.Assert(packet.ReservedHeaderSize == SNISMUXHeader.HEADER_LENGTH, "mars handle attempting to send muxed packet without mux reservation in InternalSendAsync");
lock (this)
{
if (_sequenceNumber >= _sendHighwater)
{
return TdsEnums.SNI_QUEUE_FULL;
}

SNIPacket encapsulatedPacket = GetSMUXEncapsulatedPacket(packet);

if (callback != null)
{
encapsulatedPacket.SetCompletionCallback(callback);
}
else
{
encapsulatedPacket.SetCompletionCallback(HandleSendComplete);
}

return _connection.SendAsync(encapsulatedPacket, callback);
SNIPacket muxedPacket = SetPacketSMUXHeader(packet);
muxedPacket.SetCompletionCallback(callback ?? HandleSendComplete);
return _connection.SendAsync(muxedPacket, callback);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ namespace System.Data.SqlClient.SNI
/// <summary>
/// Named Pipe connection handle
/// </summary>
internal class SNINpHandle : SNIHandle
internal sealed class SNINpHandle : SNIHandle
{
internal const string DefaultPipePath = @"sql\query"; // e.g. \\HOSTNAME\pipe\sql\query
private const int MAX_PIPE_INSTANCES = 255;

private readonly string _targetServer;
private readonly object _callbackObject;

private Stream _stream;
private NamedPipeClientStream _pipeStream;
private SslOverTdsStream _sslOverTdsStream;
Expand Down Expand Up @@ -61,13 +61,13 @@ public SNINpHandle(string serverName, string pipeName, long timerExpire, object
_pipeStream.Connect((int)ts.TotalMilliseconds);
}
}
catch(TimeoutException te)
catch (TimeoutException te)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, te);
_status = TdsEnums.SNI_ERROR;
return;
}
catch(IOException ioe)
catch (IOException ioe)
{
SNICommon.ReportSNIError(SNIProviders.NP_PROV, SNICommon.ConnOpenFailedError, ioe);
_status = TdsEnums.SNI_ERROR;
Expand Down Expand Up @@ -150,7 +150,7 @@ public override uint Receive(out SNIPacket packet, int timeout)
packet = null;
try
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);
packet.ReadFromStream(_stream);

if (packet.Length == 0)
Expand All @@ -174,8 +174,8 @@ public override uint Receive(out SNIPacket packet, int timeout)

public override uint ReceiveAsync(ref SNIPacket packet)
{
packet = new SNIPacket(_bufferSize);
packet = new SNIPacket(headerSize: 0, dataSize: _bufferSize);

try
{
packet.ReadFromStreamAsync(_stream, _receiveCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
bool error = false;
try
{
packet._length = await valueTask.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await valueTask.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, ValueTask<
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, 0, _capacity), CancellationToken.None);
ValueTask<int> vt = stream.ReadAsync(new Memory<byte>(_data, _headerLength, _dataCapacity), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
_length = vt.Result;
_dataLength = vt.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, 0, _length), CancellationToken.None);
ValueTask vt = stream.WriteAsync(new Memory<byte>(_data, _headerLength, _dataLength), CancellationToken.None);

if (vt.IsCompletedSuccessfully)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
bool error = false;
try
{
packet._length = await task.ConfigureAwait(false);
if (packet._length == 0)
packet._dataLength = await task.ConfigureAwait(false);
if (packet._dataLength == 0)
{
SNILoadHandle.SingletonInstance.LastError = new SNIError(SNIProviders.TCP_PROV, 0, SNICommon.ConnTerminatedError, string.Empty);
error = true;
Expand All @@ -45,13 +45,13 @@ async Task ReadFromStreamAsync(SNIPacket packet, SNIAsyncCallback cb, Task<int>
cb(packet, error ? TdsEnums.SNI_ERROR : TdsEnums.SNI_SUCCESS);
}

Task<int> t = stream.ReadAsync(_data, 0, _capacity, CancellationToken.None);
Task<int> t = stream.ReadAsync(_data, _headerLength, _dataCapacity, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
_length = t.Result;
_dataLength = t.Result;
// Zero length to go via async local function as is error condition
if (_length > 0)
if (_dataLength > 0)
{
callback(this, TdsEnums.SNI_SUCCESS);

Expand Down Expand Up @@ -88,11 +88,11 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfter)
{
packet.Dispose();
packet.Release();
}
}

Task t = stream.WriteAsync(_data, 0, _length, CancellationToken.None);
Task t = stream.WriteAsync(_data, _headerLength, _dataLength, CancellationToken.None);

if ((t.Status & TaskStatus.RanToCompletion) != 0)
{
Expand All @@ -103,7 +103,7 @@ async Task WriteToStreamAsync(SNIPacket packet, SNIAsyncCallback cb, SNIProvider

if (disposeAfterWriteAsync)
{
Dispose();
Release();
}

// Completed
Expand Down
Loading

0 comments on commit 87e7f41

Please sign in to comment.