Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: rework TdsParserStateObject and SqlDataReader snapshots #198

Merged
merged 4 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ internal class SharedState

private Task _currentTask;
private Snapshot _snapshot;
private Snapshot _cachedSnapshot;
private CancellationTokenSource _cancelAsyncOnCloseTokenSource;
private CancellationToken _cancelAsyncOnCloseToken;

Expand Down Expand Up @@ -803,7 +804,7 @@ private bool TryCleanPartialRead()
}

#if DEBUG
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -936,7 +937,7 @@ private bool TryCloseInternal(bool closeReader)

try
{
if ((!_isClosed) && (parser != null) && (stateObj != null) && (stateObj._pendingData))
if ((!_isClosed) && (parser != null) && (stateObj != null) && (stateObj.HasPendingData))
{
// It is possible for this to be called during connection close on a
// broken connection, so check state first.
Expand Down Expand Up @@ -1118,7 +1119,7 @@ private bool TryConsumeMetaData()
{
// warning: Don't check the MetaData property within this function
// warning: as it will be a reentrant call
while (_parser != null && _stateObj != null && _stateObj._pendingData && !_metaDataConsumed)
while (_parser != null && _stateObj != null && _stateObj.HasPendingData && !_metaDataConsumed)
{
if (_parser.State == TdsParserState.Broken || _parser.State == TdsParserState.Closed)
{
Expand Down Expand Up @@ -3053,7 +3054,7 @@ private bool TryHasMoreResults(out bool moreResults)

Debug.Assert(null != _command, "unexpected null command from the data reader!");

while (_stateObj._pendingData)
while (_stateObj.HasPendingData)
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -3137,7 +3138,7 @@ private bool TryHasMoreRows(out bool moreRows)
moreRows = false;
return true;
}
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
// Consume error's, info's, done's on HasMoreRows, so user obtains error on Read.
byte b;
Expand Down Expand Up @@ -3178,7 +3179,7 @@ private bool TryHasMoreRows(out bool moreRows)
moreRows = false;
return false;
}
if (_stateObj._pendingData)
if (_stateObj.HasPendingData)
{
if (!_stateObj.TryPeekByte(out b))
{
Expand Down Expand Up @@ -3451,7 +3452,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
if (moreRows)
{
// read the row from the backend (unless it's an altrow were the marker is already inside the altrow ...)
while (_stateObj._pendingData)
while (_stateObj.HasPendingData)
{
if (_altRowStatus != ALTROWSTATUS.AltRow)
{
Expand Down Expand Up @@ -3483,7 +3484,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
}
}

if (!_stateObj._pendingData)
if (!_stateObj.HasPendingData)
{
if (!TryCloseInternal(false /*closeReader*/))
{
Expand All @@ -3507,7 +3508,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
{
// if we are in SingleRow mode, and we've read the first row,
// read the rest of the rows, if any
while (_stateObj._pendingData && !_sharedState._dataReady)
while (_stateObj.HasPendingData && !_sharedState._dataReady)
{
if (!_parser.TryRun(RunBehavior.ReturnImmediately, _command, this, null, _stateObj, out _sharedState._dataReady))
{
Expand Down Expand Up @@ -3548,7 +3549,7 @@ private bool TryReadInternal(bool setTimeout, out bool more)
more = false;

#if DEBUG
if ((!_sharedState._dataReady) && (_stateObj._pendingData))
if ((!_sharedState._dataReady) && (_stateObj.HasPendingData))
{
byte token;
if (!_stateObj.TryPeekByte(out token))
Expand Down Expand Up @@ -3770,6 +3771,10 @@ private bool TryReadColumnInternal(int i, bool readHeaderOnly = false)
{
// reset snapshot to save memory use. We can safely do that here because all SqlDataReader values are stable.
// The retry logic can use the current values to get back to the right state.
if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
Expand Down Expand Up @@ -4659,6 +4664,10 @@ public override Task<bool> ReadAsync(CancellationToken cancellationToken)
if (!rowTokenRead)
{
rowTokenRead = true;
if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
Expand Down Expand Up @@ -5112,28 +5121,27 @@ private void PrepareAsyncInvocation(bool useSnapshot)

if (_snapshot == null)
{
_snapshot = new Snapshot
{
_dataReady = _sharedState._dataReady,
_haltRead = _haltRead,
_metaDataConsumed = _metaDataConsumed,
_browseModeInfoConsumed = _browseModeInfoConsumed,
_hasRows = _hasRows,
_altRowStatus = _altRowStatus,
_nextColumnDataToRead = _sharedState._nextColumnDataToRead,
_nextColumnHeaderToRead = _sharedState._nextColumnHeaderToRead,
_columnDataBytesRead = _columnDataBytesRead,
_columnDataBytesRemaining = _sharedState._columnDataBytesRemaining,

// _metadata and _altaMetaDataSetCollection must be Cloned
// before they are updated
_metadata = _metaData,
_altMetaDataSetCollection = _altMetaDataSetCollection,
_tableNames = _tableNames,

_currentStream = _currentStream,
_currentTextReader = _currentTextReader,
};
_snapshot = Interlocked.Exchange(ref _cachedSnapshot, null) ?? new Snapshot();

_snapshot._dataReady = _sharedState._dataReady;
_snapshot._haltRead = _haltRead;
_snapshot._metaDataConsumed = _metaDataConsumed;
_snapshot._browseModeInfoConsumed = _browseModeInfoConsumed;
_snapshot._hasRows = _hasRows;
_snapshot._altRowStatus = _altRowStatus;
_snapshot._nextColumnDataToRead = _sharedState._nextColumnDataToRead;
_snapshot._nextColumnHeaderToRead = _sharedState._nextColumnHeaderToRead;
_snapshot._columnDataBytesRead = _columnDataBytesRead;
_snapshot._columnDataBytesRemaining = _sharedState._columnDataBytesRemaining;

// _metadata and _altaMetaDataSetCollection must be Cloned
// before they are updated
_snapshot._metadata = _metaData;
_snapshot._altMetaDataSetCollection = _altMetaDataSetCollection;
_snapshot._tableNames = _tableNames;

_snapshot._currentStream = _currentStream;
_snapshot._currentTextReader = _currentTextReader;

_stateObj.SetSnapshot();
}
Expand Down Expand Up @@ -5186,6 +5194,10 @@ private void CleanupAfterAsyncInvocationInternal(TdsParserStateObject stateObj,
stateObj._permitReplayStackTraceToDiffer = false;
#endif

if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
// We are setting this to null inside the if-statement because stateObj==null means that the reader hasn't been initialized or has been closed (either way _snapshot should already be null)
_snapshot = null;
}
Expand Down Expand Up @@ -5224,6 +5236,10 @@ private void SwitchToAsyncWithoutSnapshot()
Debug.Assert(_snapshot != null, "Should currently have a snapshot");
Debug.Assert(_stateObj != null && !_stateObj._asyncReadWithoutSnapshot, "Already in async without snapshot");

if (_cachedSnapshot is null)
{
_cachedSnapshot = _snapshot;
}
_snapshot = null;
_stateObj.ResetSnapshot();
_stateObj._asyncReadWithoutSnapshot = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,11 @@ internal override void ValidateConnectionForExecute(SqlCommand command)
// or if MARS is off, then a datareader exists
throw ADP.OpenReaderExists(parser.MARSOn);
}
else if (!parser.MARSOn && parser._physicalStateObj._pendingData)
else if (!parser.MARSOn && parser._physicalStateObj.HasPendingData)
{
parser.DrainData(parser._physicalStateObj);
}
Debug.Assert(!parser._physicalStateObj._pendingData, "Should not have a busy physicalStateObject at this point!");
Debug.Assert(!parser._physicalStateObj.HasPendingData, "Should not have a busy physicalStateObject at this point!");

parser.RollbackOrphanedAPITransactions();
}
Expand Down Expand Up @@ -840,7 +840,7 @@ private void ResetConnection()
// obtains a clone.

Debug.Assert(!HasLocalTransactionFromAPI, "Upon ResetConnection SqlInternalConnectionTds has a currently ongoing local transaction.");
Debug.Assert(!_parser._physicalStateObj._pendingData, "Upon ResetConnection SqlInternalConnectionTds has pending data.");
Debug.Assert(!_parser._physicalStateObj.HasPendingData, "Upon ResetConnection SqlInternalConnectionTds has pending data.");

if (_fResetConnection)
{
Expand Down
Loading