Skip to content

Commit

Permalink
fix: Adding retry support
Browse files Browse the repository at this point in the history
Adding options to create an AsyncDbSession with retry support
for high latency connection usage
  • Loading branch information
Farenheith committed Jul 23, 2024
1 parent f01fc4b commit a0dce6d
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 114 deletions.
19 changes: 13 additions & 6 deletions src/Codibre.MSSqlSession/Impl/AsyncDbSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ internal sealed class AsyncDbSession : IAsyncDbSession
private static readonly Task<IDisposable?> _nullTask = Task.FromResult((IDisposable?)null);
private static readonly object _updateToken = new();
private static DateTime _omitLogDeadline = DateTime.Now;
private readonly bool _customPool;
private readonly AsyncLocal<AsyncDbStorage?> _asyncStorage = new();
private readonly ILogger<IAsyncDbSession> _logger;
private readonly string _connectionString;
private readonly AsyncSqlOptions _options;
public DbTransaction? Transaction
{
get => _asyncStorage.Value?.Transaction;
Expand Down Expand Up @@ -64,11 +63,20 @@ public AsyncDbSession(
string connectionString,
bool customPool,
ILogger<AsyncDbSession> logger
) : this(logger, new AsyncSqlOptions
{
ConnectionString = connectionString,
CustomPool = customPool
})
{ }

public AsyncDbSession(
ILogger<AsyncDbSession> logger,
AsyncSqlOptions options
)
{
_logger = logger;
_customPool = customPool;
_connectionString = connectionString;
_options = options;
}

public async ValueTask Clear()
Expand Down Expand Up @@ -142,10 +150,9 @@ private static void UpdateOmitLogDeadline(DateTime now)
{
var disposer = new CrossedDisposer(this);
var connection = new AsyncSqlConnection(
_connectionString,
_options,
disposer,
_asyncStorage,
_customPool,
_logger
);
_asyncStorage.Value = new AsyncDbStorage(connection);
Expand Down
25 changes: 12 additions & 13 deletions src/Codibre.MSSqlSession/Impl/AsyncSqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,26 @@ namespace Codibre.MSSqlSession.Impl;
// Threads
internal class AsyncSqlConnection : DbConnection
{
private string _connectionString;
private readonly AsyncSqlOptions _options;
private string _dataSource;
private bool _opened = false;
private SqlConnection? _connection;
private object? _pooledConnection;
private readonly CrossedDisposer _disposer;
private readonly bool _customPool;
private readonly AsyncLocal<AsyncDbStorage?> _asyncDbStorage;
private readonly ILogger _logger;
public override string ConnectionString
{
get => _connectionString;
get => _options.ConnectionString;
set
{
if (_connectionString != value)
if (_options.ConnectionString != value)
{
_connection = null;
ReleaseConnection();
}
_connectionString = value;
_dataSource = GetDataSource(_connectionString);
_options.ConnectionString = value;
_dataSource = GetDataSource(_options.ConnectionString);
}
}

Expand All @@ -58,18 +57,16 @@ private static string GetDataSource(string connectionString)
public override ConnectionState State => _opened && _connection is not null ? _connection.State : ConnectionState.Closed;

internal AsyncSqlConnection(
string connectionString,
AsyncSqlOptions options,
CrossedDisposer disposer,
AsyncLocal<AsyncDbStorage?> currentTransaction,
bool customPool,
ILogger logger
)
{
_connectionString = connectionString;
_options = options;
_disposer = disposer;
_asyncDbStorage = currentTransaction;
_customPool = customPool;
_dataSource = GetDataSource(connectionString);
_dataSource = GetDataSource(_options.ConnectionString);
_logger = logger;
}

Expand All @@ -79,7 +76,7 @@ private SqlConnection Connection
{
if (_connection is null)
{
if (_customPool) (_connection, _pooledConnection) = SqlConnectionFactory.GetConnection(ConnectionString, _logger);
if (_options.CustomPool) (_connection, _pooledConnection) = SqlConnectionFactory.GetConnection(ConnectionString, _logger);
else _connection = new SqlConnection(ConnectionString);
}
return _connection;
Expand Down Expand Up @@ -145,7 +142,9 @@ protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLeve

protected override DbCommand CreateDbCommand()
{
var command = new AsyncDbCommand(Connection.CreateCommand(), this);
var sqlCommand = Connection.CreateCommand();
if (_options.RetryLogicBaseProvider is not null) sqlCommand.RetryLogicProvider = _options.RetryLogicBaseProvider;
var command = new AsyncDbCommand(sqlCommand, this);
var value = _asyncDbStorage.Value;
if (value != null)
{
Expand Down
10 changes: 10 additions & 0 deletions src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Microsoft.Data.SqlClient;

namespace Codibre.MSSqlSession.Impl;

public class AsyncSqlOptions
{
public string ConnectionString { get; set; }

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / build

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / test

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 7 in src/Codibre.MSSqlSession/Impl/AsyncSqlOptions.cs

View workflow job for this annotation

GitHub Actions / test

Non-nullable property 'ConnectionString' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public bool CustomPool { get; set; }
public SqlRetryLogicBaseProvider? RetryLogicBaseProvider { get; set; }
}
70 changes: 70 additions & 0 deletions src/Codibre.MSSqlSession/Impl/BatchQuery.Query.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Codibre.MSSqlSession.Extensions;
using Codibre.MSSqlSession.Impl.Utils;

namespace Codibre.MSSqlSession.Impl;

internal partial class BatchQuery : IBatchQuery
{
private readonly IAsyncDbSession _session;
private readonly IScriptBuilder _builder;
private readonly HookStorage _hookStorage;
internal BatchQuery(IAsyncDbSession session)
{
_session = session;
_builder = new ScriptBuilder(session.Connection);
_hookStorage = new(
new(),
_builder
);
}

public string Sql => _builder.Sql;
public int QueryCount => _builder.QueryCount;
public int ParamCount => _builder.ParamCount;

public void AddNoResultScript(FormattableString builtScript) => _builder.Add(builtScript);

public IResultHook<T> QueryFirstHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddRequiredHook<T>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadFirstAsync<T>())
);
}
public IResultHook<T?> QueryFirstOrDefaultHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddNullableHook<T?>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadFirstOrDefaultAsync<T>())
);
}

public IResultHook<IEnumerable<T>> QueryHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddRequiredHook<IEnumerable<T>>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadAsync<T>())
);
}

public Task RunQueries(TimeSpan? customTimeout = null)
=> _hookStorage.RunQueries(customTimeout);

public Task Execute(TimeSpan? customTimeout = null)
=> _hookStorage.Execute(customTimeout);

public void Clear() => _hookStorage.Clear();

private IAsyncEnumerable<IList<KeyValuePair<TInput, TOutput>>> InternalPrepareEnumerable<TInput, TOutput>(
IEnumerable<TInput> enumerable,
Func<TInput, IBatchQuery, ValueTask<TOutput>> PreRunQuery
) => _hookStorage
.PrepareEnumerable(enumerable, (current) => PreRunQuery(current, this))
.OnStartAsync(async () => _session.ConnectionAcquired ? null : await _session.StartSession());

public IAsyncEnumerable<KeyValuePair<TInput, TOutput>> PrepareEnumerable<TInput, TOutput>(
IEnumerable<TInput> enumerable,
Func<TInput, IBatchQuery, ValueTask<TOutput>> PreRunQuery
) => InternalPrepareEnumerable(enumerable, PreRunQuery)
.SelectMany(x => x.ToAsyncEnumerable());
}
Original file line number Diff line number Diff line change
@@ -1,76 +1,12 @@
using Codibre.MSSqlSession.Extensions;
using Codibre.MSSqlSession.Impl.Utils;
using Codibre.MSSqlSession.Impl.Utils;

namespace Codibre.MSSqlSession.Impl;

internal class BatchQuery : IBatchQuery
internal partial class BatchQuery
{
private static readonly FormattableString _beginTran = $"BEGIN TRAN;";
private static readonly FormattableString _commitTran = $"COMMIT TRAN";
private readonly IAsyncDbSession _session;
private readonly IScriptBuilder _builder;
private readonly TransactionControl _transactionControl = new();
private readonly HookStorage _hookStorage;
internal BatchQuery(IAsyncDbSession session)
{
_session = session;
_builder = new ScriptBuilder(session.Connection);
_hookStorage = new(
new(),
_builder
);
}

public string Sql => _builder.Sql;
public int QueryCount => _builder.QueryCount;
public int ParamCount => _builder.ParamCount;

public void AddNoResultScript(FormattableString builtScript) => _builder.Add(builtScript);

public IResultHook<T> QueryFirstHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddRequiredHook<T>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadFirstAsync<T>())
);
}
public IResultHook<T?> QueryFirstOrDefaultHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddNullableHook<T?>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadFirstOrDefaultAsync<T>())
);
}

public IResultHook<IEnumerable<T>> QueryHook<T>(FormattableString builtScript)
{
var token = new object();
return _hookStorage.AddRequiredHook<IEnumerable<T>>(builtScript, token,
async (reader, setResult) => setResult(token, await reader.ReadAsync<T>())
);
}

public Task RunQueries(TimeSpan? customTimeout = null)
=> _hookStorage.RunQueries(customTimeout);

public Task Execute(TimeSpan? customTimeout = null)
=> _hookStorage.Execute(customTimeout);

public void Clear() => _hookStorage.Clear();

private IAsyncEnumerable<IList<KeyValuePair<TInput, TOutput>>> InternalPrepareEnumerable<TInput, TOutput>(
IEnumerable<TInput> enumerable,
Func<TInput, IBatchQuery, ValueTask<TOutput>> PreRunQuery
) => _hookStorage
.PrepareEnumerable(enumerable, (current) => PreRunQuery(current, this))
.OnStartAsync(async () => _session.ConnectionAcquired ? null : await _session.StartSession());

public IAsyncEnumerable<KeyValuePair<TInput, TOutput>> PrepareEnumerable<TInput, TOutput>(
IEnumerable<TInput> enumerable,
Func<TInput, IBatchQuery, ValueTask<TOutput>> PreRunQuery
) => InternalPrepareEnumerable(enumerable, PreRunQuery)
.SelectMany(x => x.ToAsyncEnumerable());

private async ValueTask InternalFlushTransaction()
{
if (!_transactionControl.Open)
Expand Down
Loading

0 comments on commit a0dce6d

Please sign in to comment.