Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#3422 SQL persistence providers: Data reader SequentialAccess #3429

Merged
merged 2 commits into from
May 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public interface IJournalQueryExecutor
Task<ImmutableArray<string>> SelectAllPersistenceIdsAsync(DbConnection connection, CancellationToken cancellationToken);

/// <summary>
/// Asynchronously replays a <paramref name="callback"/> on all selected events for provided
/// <paramref name="persistenceId"/>, within boundaries of <paramref name="fromSequenceNr"/>
/// Asynchronously replays a <paramref name="callback"/> on all selected events for provided
/// <paramref name="persistenceId"/>, within boundaries of <paramref name="fromSequenceNr"/>
/// and <paramref name="toSequenceNr"/> up to <paramref name="max"/> number of events.
/// </summary>
/// <param name="connection">TBD</param>
Expand All @@ -53,8 +53,8 @@ public interface IJournalQueryExecutor
Task SelectByPersistenceIdAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> callback);

/// <summary>
/// Asynchronously replays <paramref name="callback"/> on all selected events, which have been tagged using
/// provided <paramref name="tag"/>, within boundaries of <paramref name="fromOffset"/> and
/// Asynchronously replays <paramref name="callback"/> on all selected events, which have been tagged using
/// provided <paramref name="tag"/>, within boundaries of <paramref name="fromOffset"/> and
/// <paramref name="toOffset"/>, up to <paramref name="max"/> number of elements.
/// Returns highest sequence number from selected events.
/// </summary>
Expand Down Expand Up @@ -171,6 +171,11 @@ public class QueryConfiguration
/// </summary>
public string DefaultSerializer { get; }

/// <summary>
/// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.
/// </summary>
public bool UseSequentialAccess { get; }

/// <summary>
/// TBD
/// </summary>
Expand All @@ -188,6 +193,7 @@ public class QueryConfiguration
/// <param name="serializerIdColumnName">TBD</param>
/// <param name="timeout">TBD</param>
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param>
/// <param name="useSequentialAccess">Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.</param>
public QueryConfiguration(
string schemaName,
string journalEventsTableName,
Expand All @@ -202,7 +208,8 @@ public QueryConfiguration(
string orderingColumnName,
string serializerIdColumnName,
TimeSpan timeout,
string defaultSerializer)
string defaultSerializer,
bool useSequentialAccess)
{
SchemaName = schemaName;
JournalEventsTableName = journalEventsTableName;
Expand All @@ -218,6 +225,7 @@ public QueryConfiguration(
OrderingColumnName = orderingColumnName;
DefaultSerializer = defaultSerializer;
SerializerIdColumnName = serializerIdColumnName;
UseSequentialAccess = useSequentialAccess;
}

/// <summary>
Expand All @@ -235,7 +243,7 @@ public QueryConfiguration(
/// </summary>
public abstract class AbstractQueryExecutor : IJournalQueryExecutor
{
// indexes of particular fields returned from all events queries
// indexes of particular fields returned from all events queries
// they must match `allEventColumnNames` order
/// <summary>
/// TBD
Expand Down Expand Up @@ -461,7 +469,18 @@ public virtual async Task SelectByPersistenceIdAsync(DbConnection connection, Ca
AddParameter(command, "@FromSequenceNr", DbType.Int64, fromSequenceNr);
AddParameter(command, "@ToSequenceNr", DbType.Int64, toSequenceNr);

using (var reader = await command.ExecuteReaderAsync(cancellationToken))
CommandBehavior commandBehavior;

if (Configuration.UseSequentialAccess)
{
commandBehavior = CommandBehavior.SequentialAccess;
}
else
{
commandBehavior = CommandBehavior.Default;
}

using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken))
{
var i = 0L;
while ((i++) < max && await reader.ReadAsync(cancellationToken))
Expand Down Expand Up @@ -494,7 +513,18 @@ public virtual async Task<long> SelectByTagAsync(DbConnection connection, Cancel
AddParameter(command, "@Ordering", DbType.Int64, fromOffset);
AddParameter(command, "@Take", DbType.Int64, take);

using (var reader = await command.ExecuteReaderAsync(cancellationToken))
CommandBehavior commandBehavior;

if (Configuration.UseSequentialAccess)
{
commandBehavior = CommandBehavior.SequentialAccess;
}
else
{
commandBehavior = CommandBehavior.Default;
}

using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken))
{
var maxSequenceNr = 0L;
while (await reader.ReadAsync(cancellationToken))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace Akka.Persistence.Sql.Common.Snapshot
{
/// <summary>
/// Flattened and serialized snapshot object used as intermediate representation
/// Flattened and serialized snapshot object used as intermediate representation
/// before saving snapshot with metadata inside SQL Server database.
/// </summary>
public class SnapshotEntry
Expand Down Expand Up @@ -112,6 +112,11 @@ public class QueryConfiguration
/// </summary>
public readonly string DefaultSerializer;

/// <summary>
/// Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.
/// </summary>
public bool UseSequentialAccess { get; }

/// <summary>
/// TBD
/// </summary>
Expand All @@ -125,6 +130,7 @@ public class QueryConfiguration
/// <param name="serializerIdColumnName">TBD</param>
/// <param name="timeout">TBD</param>
/// <param name="defaultSerializer">The default serializer used when not type override matching is found</param>
/// <param name="useSequentialAccess">Uses the CommandBehavior.SequentialAccess when creating the command, providing a performance improvement for reading large BLOBS.</param>
public QueryConfiguration(
string schemaName,
string snapshotTableName,
Expand All @@ -134,8 +140,9 @@ public QueryConfiguration(
string manifestColumnName,
string timestampColumnName,
string serializerIdColumnName,
TimeSpan timeout,
string defaultSerializer)
TimeSpan timeout,
string defaultSerializer,
bool useSequentialAccess)
{
SchemaName = schemaName;
SnapshotTableName = snapshotTableName;
Expand All @@ -147,6 +154,7 @@ public QueryConfiguration(
SerializerIdColumnName = serializerIdColumnName;
Timeout = timeout;
DefaultSerializer = defaultSerializer;
UseSequentialAccess = useSequentialAccess;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the default value of this? Is it left up to the plugin implementation? Should we add it to some of the built-in HOCON files?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response.
Default value is false = current behavior
As the configuration is read by the plugin implementations it is basically up to the plugins. Therefore I'm not sure if it makes sense to put into some HOCO file on Akka level.
So I'm not sure if further changes are needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response.
Default value is false = current behavior
As the configuration is read by the plugin implementations it is basically up to the plugins. Therefore I'm not sure if it makes sense to put into some HOCO file on Akka level.
So I'm not sure if further changes are needed.

}

/// <summary>
Expand All @@ -166,7 +174,7 @@ public interface ISnapshotQueryExecutor
QueryConfiguration Configuration { get; }

/// <summary>
/// Deletes a single snapshot identified by it's persistent actor's <paramref name="persistenceId"/>,
/// Deletes a single snapshot identified by it's persistent actor's <paramref name="persistenceId"/>,
/// <paramref name="sequenceNr"/> and <paramref name="timestamp"/>.
/// </summary>
/// <param name="connection">TBD</param>
Expand All @@ -178,7 +186,7 @@ public interface ISnapshotQueryExecutor
Task DeleteAsync(DbConnection connection, CancellationToken cancellationToken, string persistenceId, long sequenceNr, DateTime? timestamp);

/// <summary>
/// Deletes all snapshot matching persistent actor's <paramref name="persistenceId"/> as well as
/// Deletes all snapshot matching persistent actor's <paramref name="persistenceId"/> as well as
/// upper (inclusive) bounds of the both <paramref name="maxSequenceNr"/> and <paramref name="maxTimestamp"/>.
/// </summary>
/// <param name="connection">TBD</param>
Expand Down Expand Up @@ -466,7 +474,19 @@ public virtual async Task<SelectedSnapshot> SelectSnapshotAsync(DbConnection con
SetPersistenceIdParameter(persistenceId, command);
SetSequenceNrParameter(maxSequenceNr, command);
SetTimestampParameter(maxTimestamp, command);
using (var reader = await command.ExecuteReaderAsync(cancellationToken))

CommandBehavior commandBehavior;

if (Configuration.UseSequentialAccess)
{
commandBehavior = CommandBehavior.SequentialAccess;
}
else
{
commandBehavior = CommandBehavior.Default;
}

using (var reader = await command.ExecuteReaderAsync(commandBehavior, cancellationToken))
{
if (await reader.ReadAsync(cancellationToken))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public sealed class BatchingSqliteJournalSetup : BatchingSqlJournalSetup
orderingColumnName: "ordering",
serializerIdColumnName: "serializer_id",
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")))
defaultSerializer: config.GetString("serializer"),
useSequentialAccess: config.GetBoolean("use-sequential-access")))
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public SqliteJournal(Config journalConfig) : base(journalConfig.WithFallback(Ext
orderingColumnName: "ordering",
serializerIdColumnName: "serializer_id",
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")),
defaultSerializer: config.GetString("serializer"),
useSequentialAccess: config.GetBoolean("use-sequential-access")),
Context.System.Serialization,
GetTimestampProvider(config.GetString("timestamp-provider")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public SqliteSnapshotStore(Config snapshotConfig) : base(snapshotConfig)
timestampColumnName: "created_at",
serializerIdColumnName: "serializer_id",
timeout: config.GetTimeSpan("connection-timeout"),
defaultSerializer: config.GetString("serializer")),
defaultSerializer: config.GetString("serializer"),
useSequentialAccess: config.GetBoolean("use-sequential-access")),
Context.System.Serialization);
}

Expand Down