Skip to content

Commit

Permalink
Refactor CancelableCommandExecutionStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeAmputer committed May 19, 2024
1 parent 297398a commit 27c10c0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,70 +8,63 @@ namespace ClickHouse.Facades;
internal class CancelableCommandExecutionStrategy : ICommandExecutionStrategy
{
public Task<int> ExecuteNonQueryAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

command.QueryId = GenerateQueryId();

var result = command.ExecuteNonQueryAsync(cancellationToken);

return Execute(connection, command, result, cancellationToken);
}
CancellationToken cancellationToken) =>
Execute(
connection,
command,
(cmd, ct) => cmd.ExecuteNonQueryAsync(ct),
cancellationToken);

public Task<object> ExecuteScalarAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

command.QueryId = GenerateQueryId();

var result = command.ExecuteScalarAsync(cancellationToken);

return Execute(connection, command, result, cancellationToken);
}
CancellationToken cancellationToken) =>
Execute(
connection,
command,
(cmd, ct) => cmd.ExecuteScalarAsync(ct),
cancellationToken);

public Task<DbDataReader> ExecuteDataReaderAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken) =>
Execute(
connection,
command,
(cmd, ct) => cmd.ExecuteReaderAsync(ct),
cancellationToken);

private static Task<T> Execute<T>(
IClickHouseConnection connection,
ClickHouseCommand command,
Func<ClickHouseCommand, CancellationToken, Task<T>> resultTaskProvider,
CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

command.QueryId = GenerateQueryId();

var result = command.ExecuteReaderAsync(cancellationToken);
var cancelableTask = resultTaskProvider(command, cancellationToken);

return Execute(connection, command, result, cancellationToken);
}

private static Task<T> Execute<T>(
IClickHouseConnection connection,
ClickHouseCommand command,
Task<T> result,
CancellationToken cancellationToken)
{
if (cancellationToken == CancellationToken.None)
{
return result;
return cancelableTask;
}

result
cancelableTask
.ContinueWith(_ =>
{
Console.WriteLine(command.QueryId);
KillQuery(connection, command.QueryId);
},
CancellationToken.None,
TaskContinuationOptions.OnlyOnCanceled,
TaskScheduler.Current)
.ConfigureAwait(false);

return result;
return cancelableTask;
}

private static string GenerateQueryId() => Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
using System.Data.Common;
using ClickHouse.Client;
using ClickHouse.Client.ADO;

namespace ClickHouse.Facades;

internal class DefaultCommandExecutionStrategy : ICommandExecutionStrategy
{
public Task<int> ExecuteNonQueryAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken)
{
Expand All @@ -16,7 +17,7 @@ public Task<int> ExecuteNonQueryAsync(
}

public Task<object> ExecuteScalarAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken)
{
Expand All @@ -26,7 +27,7 @@ public Task<object> ExecuteScalarAsync(
}

public Task<DbDataReader> ExecuteDataReaderAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Data.Common;
using ClickHouse.Client;
using ClickHouse.Client.ADO;

namespace ClickHouse.Facades;
Expand All @@ -13,17 +14,17 @@ internal interface ICommandExecutionStrategy
};

Task<int> ExecuteNonQueryAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken);

Task<object> ExecuteScalarAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken);

Task<DbDataReader> ExecuteDataReaderAsync(
ClickHouseConnection connection,
IClickHouseConnection connection,
ClickHouseCommand command,
CancellationToken cancellationToken);
}

0 comments on commit 27c10c0

Please sign in to comment.