Skip to content

Commit

Permalink
Load text column sizes to use them in queries
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorFedchenko committed Feb 9, 2021
1 parent 1397850 commit 2e6ad55
Showing 1 changed file with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,9 @@ public RequestChunk(int chunkId, IJournalRequest[] requests)
private readonly Akka.Serialization.Serialization _serialization;
private readonly CircuitBreaker _circuitBreaker;
private int _remainingOperations;
private Option<int> _persistenceIdColumnSize = Option<int>.None;
private Option<int> _tagColumnSize = Option<int>.None;
private Option<int> _manifestColumnSize = Option<int>.None;

/// <summary>
/// Initializes a new instance of the <see cref="BatchingSqlJournal{TConnection, TCommand}" /> class.
Expand Down Expand Up @@ -888,6 +891,9 @@ private async Task<BatchComplete> ExecuteChunk(RequestChunk chunk, IActorContext
using (var connection = CreateConnection(Setup.ConnectionString))
{
await connection.OpenAsync();

// pre-load column sizes to use in queries
LoadColumnSizes(connection, Setup.NamingConventions);

using (var tx = connection.BeginTransaction(Setup.IsolationLevel))
using (var command = (TCommand)connection.CreateCommand())
Expand Down Expand Up @@ -1375,9 +1381,54 @@ protected void AddParameter(TCommand command, string paramName, DbType dbType, o
param.Value = value;
param.ParameterName = paramName;
param.DbType = dbType;

// for known string columns, keep parameter size constant for all queries for query plans optimization
switch (paramName)
{
case "@PersistenceId" when _persistenceIdColumnSize.HasValue:
param.Size = _persistenceIdColumnSize.Value;
break;
case "@Tag" when _tagColumnSize.HasValue:
param.Size = _tagColumnSize.Value;
break;
case "@Manifest" when _manifestColumnSize.HasValue:
param.Size = _manifestColumnSize.Value;
break;
}

command.Parameters.Add(param);
}

private void LoadColumnSizes(DbConnection connection, QueryConfiguration conventions)
{
// load schema/initialize column sizes only once
if (_persistenceIdColumnSize.HasValue || _tagColumnSize.HasValue || _manifestColumnSize.HasValue)
return;

var schema = connection.GetSchema();

// save real persistenceId column size to use in query parameter
if (schema.Columns.Contains(conventions.PersistenceIdColumnName) &&
schema.Columns[conventions.PersistenceIdColumnName].MaxLength > 0)
{
_persistenceIdColumnSize = schema.Columns[conventions.PersistenceIdColumnName].MaxLength;
}

// save real tag column size to use in query parameter
if (schema.Columns.Contains(conventions.TagsColumnName) &&
schema.Columns[conventions.TagsColumnName].MaxLength > 0)
{
_tagColumnSize = schema.Columns[conventions.TagsColumnName].MaxLength;
}

// save real manifest column size to use in query parameter
if (schema.Columns.Contains(conventions.ManifestColumnName) &&
schema.Columns[conventions.ManifestColumnName].MaxLength > 0)
{
_manifestColumnSize = schema.Columns[conventions.ManifestColumnName].MaxLength;
}
}

private RequestChunk DequeueChunk(int chunkId)
{
var operationsCount = Math.Min(Buffer.Count, Setup.MaxBatchSize);
Expand Down

0 comments on commit 2e6ad55

Please sign in to comment.