Skip to content

Commit

Permalink
Support merge strategy when updating entities
Browse files Browse the repository at this point in the history
In table storage (not entity or document, which cannot do merge since the entity is stored as a whole), we can perform data merging by submitting only non-null properties for saving, so that when using Merge, we can fetch all columns and avoid overwriting existing values if no new values are provided.

According to the docs on the REST API operations on entities (see https://docs.microsoft.com/en-us/rest/api/storageservices/Merge-Entity?redirectedfrom=MSDN#remarks), null values are already skipped when doing this, so we just need to replicate that ourselves by skipping those properties when persisting.

NOTE: when using Merge, every PutAsync operation will incur an additional REST request for the retrieval of the merged data.

Fixes #46
  • Loading branch information
kzu committed Jul 1, 2021
1 parent f923549 commit 4eae0f4
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 16 deletions.
29 changes: 21 additions & 8 deletions src/TableStorage/TablePartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ static partial class TablePartition
/// <typeparamref name="T"/> <c>Name</c> as the partition key.
/// </summary>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITablePartition{TEntity}"/>.</returns>
public static ITablePartition<TableEntity> Create(CloudStorageAccount storageAccount, string tableName, string partitionKey)
=> new TableEntityPartition(storageAccount, tableName, partitionKey);
public static ITablePartition<TableEntity> Create(CloudStorageAccount storageAccount, string tableName, string partitionKey, UpdateStrategy? updateStrategy = default)
=> new TableEntityPartition(storageAccount, tableName, partitionKey)
{
UpdateStrategy = updateStrategy ?? UpdateStrategy.Replace
};

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
Expand All @@ -40,11 +44,13 @@ public static ITablePartition<TableEntity> Create(CloudStorageAccount storageAcc
/// <typeparam name="T">The type of entity that the repository will manage.</typeparam>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
CloudStorageAccount storageAccount,
Expression<Func<T, string>> rowKey) where T : class
=> Create<T>(storageAccount, DefaultTableName, default, rowKey);
Expression<Func<T, string>> rowKey,
UpdateStrategy? updateStrategy = default) where T : class
=> Create<T>(storageAccount, DefaultTableName, default, rowKey, updateStrategy);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
Expand All @@ -55,12 +61,14 @@ public static ITablePartition<T> Create<T>(
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="tableName">Table name to use.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
CloudStorageAccount storageAccount,
string tableName,
Expression<Func<T, string>> rowKey) where T : class
=> Create<T>(storageAccount, tableName, default, rowKey);
Expression<Func<T, string>> rowKey,
UpdateStrategy? updateStrategy = default) where T : class
=> Create<T>(storageAccount, tableName, default, rowKey, updateStrategy);

/// <summary>
/// Creates an <see cref="ITablePartition{T}"/> for the given entity type
Expand All @@ -74,18 +82,23 @@ public static ITablePartition<T> Create<T>(
/// If not provided, the <typeparamref name="T"/> <c>Name</c> will be used.</param>
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITablePartition{T}"/>.</returns>
public static ITablePartition<T> Create<T>(
CloudStorageAccount storageAccount,
string? tableName = default,
string? partitionKey = null,
Expression<Func<T, string>>? rowKey = null) where T : class
Expression<Func<T, string>>? rowKey = null,
UpdateStrategy? updateStrategy = default) where T : class
{
tableName ??= GetDefaultTableName<T>();
partitionKey ??= GetDefaultPartitionKey<T>();
rowKey ??= RowKeyAttribute.CreateAccessor<T>();

return new TablePartition<T>(storageAccount, tableName, partitionKey, rowKey);
return new TablePartition<T>(storageAccount, tableName, partitionKey, rowKey)
{
UpdateStrategy = updateStrategy ?? UpdateStrategy.Replace
};
}

/// <summary>
Expand Down
9 changes: 9 additions & 0 deletions src/TableStorage/TablePartition`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ protected internal TablePartition(CloudStorageAccount storageAccount, string tab
/// <inheritdoc />
public string PartitionKey { get; }

/// <summary>
/// Gets or sets the strategy applied when saving an entity that already exists.
/// The value is read from, and written to, the underlying repository.
/// </summary>
public UpdateStrategy UpdateStrategy
{
    get { return repository.UpdateStrategy; }
    set { repository.UpdateStrategy = value; }
}

/// <inheritdoc />
public IQueryable<T> CreateQuery()
{
Expand Down
21 changes: 15 additions & 6 deletions src/TableStorage/TableRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ static partial class TableRepository
/// </summary>
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="tableName">Table name to use.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITableRepository{TableEntity}"/>.</returns>
public static ITableRepository<TableEntity> Create(
CloudStorageAccount storageAccount,
string tableName)
=> new TableEntityRepository(storageAccount, tableName);
string tableName,
UpdateStrategy? updateStrategy = default)
=> new TableEntityRepository(storageAccount, tableName) { UpdateStrategy = updateStrategy ?? UpdateStrategy.Replace };

/// <summary>
/// Creates an <see cref="ITableRepository{T}"/> for the given entity type
Expand All @@ -36,12 +38,14 @@ public static ITableRepository<TableEntity> Create(
/// <param name="storageAccount">The storage account to use.</param>
/// <param name="partitionKey">Function to retrieve the partition key for a given entity.</param>
/// <param name="rowKey">Function to retrieve the row key for a given entity.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITableRepository{T}"/>.</returns>
public static ITableRepository<T> Create<T>(
CloudStorageAccount storageAccount,
Expression<Func<T, string>> partitionKey,
Expression<Func<T, string>> rowKey) where T : class
=> Create<T>(storageAccount, typeof(T).Name, partitionKey, rowKey);
Expression<Func<T, string>> rowKey,
UpdateStrategy? updateStrategy = default) where T : class
=> Create<T>(storageAccount, typeof(T).Name, partitionKey, rowKey, updateStrategy);

/// <summary>
/// Creates an <see cref="ITableRepository{T}"/> for the given entity type
Expand All @@ -55,18 +59,23 @@ public static ITableRepository<T> Create<T>(
/// If not provided, the class will need a property annotated with <see cref="PartitionKeyAttribute"/>.</param>
/// <param name="rowKey">Optional function to retrieve the row key for a given entity.
/// If not provided, the class will need a property annotated with <see cref="RowKeyAttribute"/>.</param>
/// <param name="updateStrategy">Strategy to apply when updating an existing entity. Defaults to <see cref="UpdateStrategy.Replace"/>.</param>
/// <returns>The new <see cref="ITableRepository{T}"/>.</returns>
public static ITableRepository<T> Create<T>(
CloudStorageAccount storageAccount,
string? tableName = default,
Expression<Func<T, string>>? partitionKey = null,
Expression<Func<T, string>>? rowKey = null) where T : class
Expression<Func<T, string>>? rowKey = null,
UpdateStrategy? updateStrategy = default) where T : class
{
tableName ??= GetDefaultTableName<T>();
partitionKey ??= PartitionKeyAttribute.CreateAccessor<T>();
rowKey ??= RowKeyAttribute.CreateAccessor<T>();

return new TableRepository<T>(storageAccount, tableName, partitionKey, rowKey);
return new TableRepository<T>(storageAccount, tableName, partitionKey, rowKey)
{
UpdateStrategy = updateStrategy ?? UpdateStrategy.Replace,
};
}

/// <summary>
Expand Down
17 changes: 15 additions & 2 deletions src/TableStorage/TableRepository`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ protected internal TableRepository(CloudStorageAccount storageAccount, string ta
/// <inheritdoc />
public string TableName { get; }

/// <summary>
/// The strategy to use when updating an existing entity. Defaults to
/// <see cref="UpdateStrategy.Replace"/>.
/// </summary>
/// <remarks>
/// When set to <see cref="UpdateStrategy.Merge"/>, <c>PutAsync</c> retrieves the
/// entity again after saving it, which costs one additional request per call.
/// </remarks>
public UpdateStrategy UpdateStrategy { get; set; } = UpdateStrategy.Replace;

/// <inheritdoc />
public IQueryable<T> CreateQuery() => new TableRepositoryQuery<T>(storageAccount, serializer, TableName, partitionKeyProperty, rowKeyProperty);

Expand Down Expand Up @@ -138,12 +143,20 @@ public async Task<T> PutAsync(T entity, CancellationToken cancellation = default

var table = await this.table.ConfigureAwait(false);
var values = properties
.ToDictionary(prop => prop.Name, prop => EntityProperty.CreateEntityPropertyFromObject(prop.GetValue(entity)));
.Select(prop => new { Name = prop.Name, Value = prop.GetValue(entity) })
.Where(pair => pair.Value != null)
.ToDictionary(pair => pair.Name, pair => EntityProperty.CreateEntityPropertyFromObject(pair.Value));

var result = await table.ExecuteAsync(TableOperation.InsertOrReplace(
var result = await table.ExecuteAsync(UpdateStrategy.CreateOperation(
new DynamicTableEntity(partitionKey, rowKey, "*", values)), cancellation)
.ConfigureAwait(false);

// For merging, we need to actually retrieve the entity again, since the previous operation
// will just return the same entity we persisted, we may have fewer properties/values than
// the ones in storage.
if (UpdateStrategy == UpdateStrategy.Merge)
return (await GetAsync(partitionKey, rowKey, cancellation))!;

return ToEntity((DynamicTableEntity)result.Result);
}

Expand Down
35 changes: 35 additions & 0 deletions src/TableStorage/TableStorageExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//<auto-generated/>
#nullable enable
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;

namespace Devlooped
{
/// <summary>
/// Various usability overloads.
/// </summary>
[EditorBrowsable(EditorBrowsableState.Never)]
static partial class TableStorageExtensions
{
    /// <summary>
    /// Queries the repository for items that match the given <paramref name="predicate"/>.
    /// </summary>
    /// <remarks>
    /// Shortcut for <c>CreateQuery().Where(predicate)</c> and returning as <see cref="IAsyncEnumerable{T}"/>
    /// for use with <c>await foreach</c> directly.
    /// </remarks>
    /// <example>
    /// <code>
    /// var books = TableRepository.Create&lt;Book&gt;(storageAccount);
    /// await foreach (var book in books.EnumerateAsync(x => x.IsPublished))
    /// {
    ///     Console.WriteLine(book.ISBN);
    /// }
    /// </code>
    /// </example>
    /// <typeparam name="T">The type of entity managed by the repository.</typeparam>
    /// <param name="repository">The repository to query.</param>
    /// <param name="predicate">Filter applied to the repository query.</param>
    /// <param name="cancellation">Token that cancels the enumeration of results.</param>
    /// <returns>An asynchronous sequence of the matching entities.</returns>
    /// <exception cref="InvalidCastException">The query created by the repository does not
    /// implement <see cref="IAsyncEnumerable{T}"/>.</exception>
    public static IAsyncEnumerable<T> EnumerateAsync<T>(this ITableRepository<T> repository, Expression<Func<T, bool>> predicate, CancellationToken cancellation = default) where T : class
    {
        var query = (IAsyncEnumerable<T>)repository.CreateQuery().Where(predicate);

        // Without a cancelable token there is nothing to flow, so hand back the query
        // itself; otherwise wrap it so the supplied token reaches the enumerator.
        return cancellation.CanBeCanceled
            ? EnumerateWithCancellation(query, cancellation)
            : query;
    }

    /// <summary>
    /// Re-yields <paramref name="source"/>, passing <paramref name="cancellation"/> to its enumerator.
    /// </summary>
    static async IAsyncEnumerable<T> EnumerateWithCancellation<T>(
        IAsyncEnumerable<T> source,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellation)
    {
        var enumerator = source.GetAsyncEnumerator(cancellation);
        try
        {
            while (await enumerator.MoveNextAsync().ConfigureAwait(false))
                yield return enumerator.Current;
        }
        finally
        {
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}
}
40 changes: 40 additions & 0 deletions src/TableStorage/UpdateStrategy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
//<auto-generated/>
#nullable enable
using Microsoft.Azure.Cosmos.Table;

namespace Devlooped
{
/// <summary>
/// Determines how <c>PutAsync</c> saves an entity that already exists in the
/// table: either <see cref="Merge"/> or <see cref="Replace"/>.
/// <see cref="Replace"/> is used when no strategy is provided.
/// </summary>
abstract partial class UpdateStrategy
{
    /// <summary>
    /// Merges the saved entity with the data already in the table, by means of
    /// <see cref="TableOperation.InsertOrMerge(ITableEntity)"/>.
    /// </summary>
    public static UpdateStrategy Merge { get; } = new MergeStrategy();

    /// <summary>
    /// Replaces the data already in the table with the saved entity, by means of
    /// <see cref="TableOperation.InsertOrReplace(ITableEntity)"/>.
    /// </summary>
    public static UpdateStrategy Replace { get; } = new ReplaceStrategy();

    /// <summary>
    /// Builds the table operation that persists <paramref name="entity"/>
    /// according to this strategy.
    /// </summary>
    protected internal abstract TableOperation CreateOperation(ITableEntity entity);

    // Backs the Replace singleton.
    class ReplaceStrategy : UpdateStrategy
    {
        protected internal override TableOperation CreateOperation(ITableEntity entity)
        {
            return TableOperation.InsertOrReplace(entity);
        }
    }

    // Backs the Merge singleton.
    class MergeStrategy : UpdateStrategy
    {
        protected internal override TableOperation CreateOperation(ITableEntity entity)
        {
            return TableOperation.InsertOrMerge(entity);
        }
    }
}
}
2 changes: 2 additions & 0 deletions src/TableStorage/Visibility.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public partial class AttributedDocumentRepository<T> { }
public partial class TablePartition { }
public partial class TablePartition<T> { }
public partial class DocumentPartition { }
public partial class UpdateStrategy { }

public partial class TableStorageExtensions { }
public partial class IAsyncEnumerableExtensions { }
public partial class IQueryableExtensions { }

Expand Down
25 changes: 25 additions & 0 deletions src/Tests/RepositoryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,30 @@ await CloudStorageAccount.DevelopmentStorageAccount.CreateCloudTableClient().Get
}
}

[Fact]
public async Task CanMergeEntity()
{
    var account = CloudStorageAccount.DevelopmentStorageAccount;
    var table = account.CreateCloudTableClient().GetTableReference(nameof(CanMergeEntity));

    // Start from a clean table so leftovers from earlier runs cannot satisfy the assertions.
    await table.DeleteIfExistsAsync();

    var repo = TableRepository.Create<AttributedRecordEntity>(account, nameof(CanMergeEntity), updateStrategy: UpdateStrategy.Merge);

    await repo.PutAsync(new AttributedRecordEntity("Book", "1234") { Status = "OK" });
    var record = await repo.PutAsync(new AttributedRecordEntity("Book", "1234") { Reason = "Done" });

    // A merge must keep the value from the first write AND store the value from the second.
    Assert.Equal("OK", record.Status);
    Assert.Equal("Done", record.Reason);

    // Reset the table before repeating the scenario through the partition API.
    await table.DeleteIfExistsAsync();

    var partition = TablePartition.Create<AttributedRecordEntity>(account, nameof(CanMergeEntity), updateStrategy: UpdateStrategy.Merge);

    await partition.PutAsync(new AttributedRecordEntity("Book", "1234") { Status = "OK" });
    record = await partition.PutAsync(new AttributedRecordEntity("Book", "1234") { Reason = "Done" });

    Assert.Equal("OK", record.Status);
    Assert.Equal("Done", record.Reason);
}

class MyEntity
{
public MyEntity(string id) => Id = id;
Expand Down Expand Up @@ -288,6 +312,7 @@ record RecordEntity(string Kind, string ID)
// Test entity whose partition key (Kind) and row key (ID) are designated through
// attributes on the positional parameters.
record AttributedRecordEntity([PartitionKey] string Kind, [RowKey] string ID)
{
// Optional value; stays null unless explicitly assigned.
public string? Status { get; set; }
// Second optional value, so a test can write one property per save.
public string? Reason { get; set; }
}
}
}

0 comments on commit 4eae0f4

Please sign in to comment.