Skip to content

Commit

Permalink
Use using for critical section management everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
roji committed Jun 29, 2024
1 parent 3ada916 commit 8032232
Show file tree
Hide file tree
Showing 12 changed files with 543 additions and 712 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,85 +148,78 @@ private async Task<bool> MoveNextCore()

try
{
_concurrencyDetector?.EnterCriticalSection();
using var _ = _concurrencyDetector?.EnterCriticalSection();

try
if (_hasExecuted)
{
if (_hasExecuted)
{
return false;
}
return false;
}

_hasExecuted = true;
_hasExecuted = true;

var maxItemCount = (int)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName];
var continuationToken =
(string)_cosmosQueryContext.ParameterValues[_queryingEnumerable._continuationTokenParameterName];
var responseContinuationTokenLimitInKb = (int?)
_cosmosQueryContext.ParameterValues[_queryingEnumerable._responseContinuationTokenLimitInKbParameterName];
var maxItemCount = (int)_cosmosQueryContext.ParameterValues[_queryingEnumerable._maxItemCountParameterName];
var continuationToken =
(string)_cosmosQueryContext.ParameterValues[_queryingEnumerable._continuationTokenParameterName];
var responseContinuationTokenLimitInKb = (int?)
_cosmosQueryContext.ParameterValues[_queryingEnumerable._responseContinuationTokenLimitInKbParameterName];

var sqlQuery = _queryingEnumerable.GenerateQuery();
var sqlQuery = _queryingEnumerable.GenerateQuery();

EntityFrameworkMetricsData.ReportQueryExecuting();
EntityFrameworkMetricsData.ReportQueryExecuting();

var queryRequestOptions = new QueryRequestOptions
{
ResponseContinuationTokenLimitInKb = responseContinuationTokenLimitInKb
};
var queryRequestOptions = new QueryRequestOptions
{
ResponseContinuationTokenLimitInKb = responseContinuationTokenLimitInKb
};

if (_cosmosPartitionKeyValue != PartitionKey.None)
{
queryRequestOptions.PartitionKey = _cosmosPartitionKeyValue;
}
if (_cosmosPartitionKeyValue != PartitionKey.None)
{
queryRequestOptions.PartitionKey = _cosmosPartitionKeyValue;
}

var cosmosClient = _cosmosQueryContext.CosmosClient;
_commandLogger.ExecutingSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery);
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);

var cosmosClient = _cosmosQueryContext.CosmosClient;
_commandLogger.ExecutingSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery);
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);
var results = new List<T>(maxItemCount);

var results = new List<T>(maxItemCount);
while (maxItemCount > 0)
{
queryRequestOptions.MaxItemCount = maxItemCount;
using var feedIterator = cosmosClient.CreateQuery(
_cosmosContainer, sqlQuery, continuationToken, queryRequestOptions);

using var responseMessage = await feedIterator.ReadNextAsync(_cancellationToken).ConfigureAwait(false);

_commandLogger.ExecutedReadNext(
responseMessage.Diagnostics.GetClientElapsedTime(),
responseMessage.Headers.RequestCharge,
responseMessage.Headers.ActivityId,
_cosmosContainer,
_cosmosPartitionKeyValue,
sqlQuery);

while (maxItemCount > 0)
responseMessage.EnsureSuccessStatusCode();

var responseMessageEnumerable = cosmosClient.GetResponseMessageEnumerable(responseMessage);
foreach (var resultObject in responseMessageEnumerable)
{
queryRequestOptions.MaxItemCount = maxItemCount;
using var feedIterator = cosmosClient.CreateQuery(
_cosmosContainer, sqlQuery, continuationToken, queryRequestOptions);

using var responseMessage = await feedIterator.ReadNextAsync(_cancellationToken).ConfigureAwait(false);

_commandLogger.ExecutedReadNext(
responseMessage.Diagnostics.GetClientElapsedTime(),
responseMessage.Headers.RequestCharge,
responseMessage.Headers.ActivityId,
_cosmosContainer,
_cosmosPartitionKeyValue,
sqlQuery);

responseMessage.EnsureSuccessStatusCode();

var responseMessageEnumerable = cosmosClient.GetResponseMessageEnumerable(responseMessage);
foreach (var resultObject in responseMessageEnumerable)
{
results.Add(_shaper(_cosmosQueryContext, resultObject));
maxItemCount--;
}

continuationToken = responseMessage.ContinuationToken;

if (responseMessage.ContinuationToken is null)
{
break;
}
results.Add(_shaper(_cosmosQueryContext, resultObject));
maxItemCount--;
}

Current = new CosmosPage<T>(results, continuationToken);
continuationToken = responseMessage.ContinuationToken;

_hasExecuted = true;
return true;
}
finally
{
_concurrencyDetector?.ExitCriticalSection();
if (responseMessage.ContinuationToken is null)
{
break;
}
}

Current = new CosmosPage<T>(results, continuationToken);

_hasExecuted = true;
return true;
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,35 +147,28 @@ public bool MoveNext()
{
try
{
_concurrencyDetector?.EnterCriticalSection();
using var _ = _concurrencyDetector?.EnterCriticalSection();

try
if (_enumerator == null)
{
if (_enumerator == null)
{
var sqlQuery = _queryingEnumerable.GenerateQuery();
var sqlQuery = _queryingEnumerable.GenerateQuery();

EntityFrameworkMetricsData.ReportQueryExecuting();
EntityFrameworkMetricsData.ReportQueryExecuting();

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery)
.GetEnumerator();
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);
}
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQuery(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery)
.GetEnumerator();
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);
}

var hasNext = _enumerator.MoveNext();
var hasNext = _enumerator.MoveNext();

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
}
finally
{
_concurrencyDetector?.ExitCriticalSection();
}
return hasNext;
}
catch (Exception exception)
{
Expand Down Expand Up @@ -242,35 +235,28 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
_concurrencyDetector?.EnterCriticalSection();
using var _ = _concurrencyDetector?.EnterCriticalSection();

try
if (_enumerator == null)
{
if (_enumerator == null)
{
var sqlQuery = _queryingEnumerable.GenerateQuery();
var sqlQuery = _queryingEnumerable.GenerateQuery();

EntityFrameworkMetricsData.ReportQueryExecuting();
EntityFrameworkMetricsData.ReportQueryExecuting();

_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery)
.GetAsyncEnumerator(_cancellationToken);
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);
}
_enumerator = _cosmosQueryContext.CosmosClient
.ExecuteSqlQueryAsync(_cosmosContainer, _cosmosPartitionKeyValue, sqlQuery)
.GetAsyncEnumerator(_cancellationToken);
_cosmosQueryContext.InitializeStateManager(_standAloneStateManager);
}

var hasNext = await _enumerator.MoveNextAsync().ConfigureAwait(false);
var hasNext = await _enumerator.MoveNextAsync().ConfigureAwait(false);

Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;
Current
= hasNext
? _shaper(_cosmosQueryContext, _enumerator.Current)
: default;

return hasNext;
}
finally
{
_concurrencyDetector?.ExitCriticalSection();
}
return hasNext;
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,38 +177,31 @@ public bool MoveNext()
{
try
{
_concurrencyDetector?.EnterCriticalSection();
using var _ = _concurrencyDetector?.EnterCriticalSection();

try
if (_hasExecuted)
{
if (!_hasExecuted)
{
if (!_readItemEnumerable.TryGetResourceId(out var resourceId))
{
throw new InvalidOperationException(CosmosStrings.ResourceIdMissing);
}
return false;
}

if (!_readItemEnumerable.TryGetPartitionKey(out var partitionKeyValue))
{
throw new InvalidOperationException(CosmosStrings.PartitionKeyMissing);
}
if (!_readItemEnumerable.TryGetResourceId(out var resourceId))
{
throw new InvalidOperationException(CosmosStrings.ResourceIdMissing);
}

EntityFrameworkMetricsData.ReportQueryExecuting();
if (!_readItemEnumerable.TryGetPartitionKey(out var partitionKeyValue))
{
throw new InvalidOperationException(CosmosStrings.PartitionKeyMissing);
}

_item = _cosmosQueryContext.CosmosClient.ExecuteReadItem(
_cosmosContainer,
partitionKeyValue,
resourceId);
EntityFrameworkMetricsData.ReportQueryExecuting();

return ShapeResult();
}
_item = _cosmosQueryContext.CosmosClient.ExecuteReadItem(
_cosmosContainer,
partitionKeyValue,
resourceId);

return false;
}
finally
{
_concurrencyDetector?.ExitCriticalSection();
}
return ShapeResult();
}
catch (Exception exception)
{
Expand All @@ -229,40 +222,33 @@ public async ValueTask<bool> MoveNextAsync()
{
try
{
_concurrencyDetector?.EnterCriticalSection();
using var _ = _concurrencyDetector?.EnterCriticalSection();

try
if (_hasExecuted)
{
if (!_hasExecuted)
{
if (!_readItemEnumerable.TryGetResourceId(out var resourceId))
{
throw new InvalidOperationException(CosmosStrings.ResourceIdMissing);
}

if (!_readItemEnumerable.TryGetPartitionKey(out var partitionKeyValue))
{
throw new InvalidOperationException(CosmosStrings.PartitionKeyMissing);
}

EntityFrameworkMetricsData.ReportQueryExecuting();

_item = await _cosmosQueryContext.CosmosClient.ExecuteReadItemAsync(
_cosmosContainer,
partitionKeyValue,
resourceId,
_cancellationToken)
.ConfigureAwait(false);

return ShapeResult();
}

return false;
}
finally

if (!_readItemEnumerable.TryGetResourceId(out var resourceId))
{
_concurrencyDetector?.ExitCriticalSection();
throw new InvalidOperationException(CosmosStrings.ResourceIdMissing);
}

if (!_readItemEnumerable.TryGetPartitionKey(out var partitionKeyValue))
{
throw new InvalidOperationException(CosmosStrings.PartitionKeyMissing);
}

EntityFrameworkMetricsData.ReportQueryExecuting();

_item = await _cosmosQueryContext.CosmosClient.ExecuteReadItemAsync(
_cosmosContainer,
partitionKeyValue,
resourceId,
_cancellationToken)
.ConfigureAwait(false);

return ShapeResult();
}
catch (Exception exception)
{
Expand Down
Loading

0 comments on commit 8032232

Please sign in to comment.