Skip to content

Commit

Permalink
Trigger store update only on change. (#3389)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelstaib authored Mar 29, 2021
1 parent 07f4254 commit 601c5b6
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 31 deletions.
103 changes: 78 additions & 25 deletions src/StrawberryShake/Client/src/Core/OperationExecutor.Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ public OperationExecutorObservable(
public IDisposable Subscribe(
IObserver<IOperationResult<TResult>> observer)
{
if (_strategy == ExecutionStrategy.NetworkOnly ||
_request.Document.Kind == OperationKind.Subscription)
{
var observerSession = new ObserverSession();
BeginExecute(observer, observerSession);
return observerSession;
}

var hasResultInStore = false;

if ((_strategy == ExecutionStrategy.CacheFirst ||
Expand All @@ -41,71 +49,116 @@ public IDisposable Subscribe(
observer.OnNext(result);
}

IDisposable session =
_operationStore.Watch<TResult>(_request).Subscribe(observer);
IDisposable session = _operationStore.Watch<TResult>(_request).Subscribe(observer);

if (_request.Document.Kind == OperationKind.Subscription ||
_strategy != ExecutionStrategy.CacheFirst ||
if (_strategy != ExecutionStrategy.CacheFirst ||
!hasResultInStore)
{
var requestSession = new RequestSession();
BeginExecute(requestSession);
return new ObserverSession(session, requestSession);
var observerSession = new ObserverSession();
observerSession.SetStoreSession(session);
BeginExecute(observer, observerSession);
return observerSession;
}

return session;
}

private void BeginExecute(RequestSession session) =>
Task.Run(() => ExecuteAsync(session));
private void BeginExecute(
IObserver<IOperationResult<TResult>> observer,
ObserverSession session) =>
Task.Run(() => ExecuteAsync(observer, session));

private async Task ExecuteAsync(RequestSession session)
private async Task ExecuteAsync(
IObserver<IOperationResult<TResult>> observer,
ObserverSession session)
{
try
{
CancellationToken token = session.RequestSession.Token;
IOperationResultBuilder<TData, TResult> resultBuilder = _resultBuilder();

await foreach (var response in
_connection.ExecuteAsync(_request, session.Token).ConfigureAwait(false))
_connection.ExecuteAsync(_request, token).ConfigureAwait(false))
{
if (session.Token.IsCancellationRequested)
if (token.IsCancellationRequested)
{
return;
}

_operationStore.Set(_request, resultBuilder.Build(response));
IOperationResult<TResult> result = resultBuilder.Build(response);
_operationStore.Set(_request, result);

if (_request.Document.Kind == OperationKind.Subscription)
if (!session.HasStoreSession)
{
_operationStore.Reset(_request);
observer.OnNext(result);

IDisposable storeSession =
_operationStore
.Watch<TResult>(_request)
.Subscribe(observer);

try
{
session.SetStoreSession(storeSession);
}
catch (ObjectDisposedException)
{
storeSession.Dispose();
throw;
}
}
}
}
catch (Exception ex)
{
observer.OnError(ex);
}
finally
{
session.Dispose();
// after all the transport logic is finished we will dispose
// the request session.
session.RequestSession.Dispose();
}
}

private class ObserverSession : IDisposable
{
private readonly IDisposable _storeSession;
private readonly RequestSession _requestSession;
private readonly object _sync = new();
private IDisposable? _storeSession;
private bool _disposed;

public ObserverSession(IDisposable storeSession, RequestSession requestSession)
public ObserverSession()
{
RequestSession = new RequestSession();
}

public RequestSession RequestSession { get; }

public bool HasStoreSession => _storeSession is not null;

public void SetStoreSession(IDisposable storeSession)
{
_storeSession = storeSession;
_requestSession = requestSession;
lock (_sync)
{
if (_disposed)
{
throw new ObjectDisposedException(typeof(ObserverSession).FullName);
}

_storeSession = storeSession;
}
}

public void Dispose()
{
if (!_disposed)
lock (_sync)
{
_requestSession.Dispose();
_storeSession.Dispose();
_disposed = true;
if (!_disposed)
{
RequestSession.Dispose();
_storeSession?.Dispose();
_disposed = true;
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/StrawberryShake/Client/src/Core/OperationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@ namespace StrawberryShake
/// <summary>
/// The operation executor handles the execution of a specific operation.
/// </summary>
/// <typeparam name="TResultData">
/// <typeparam name="TData">
/// The result data type of this operation executor.
/// </typeparam>
/// <typeparam name="TResult">
/// The runtime result
/// </typeparam>
public partial class OperationExecutor<TData, TResult>
: IOperationExecutor<TResult>
where TResult : class
where TData : class
where TResult : class

{
private readonly IConnection<TData> _connection;
private readonly Func<IOperationResultBuilder<TData, TResult>> _resultBuilder;
Expand Down
8 changes: 4 additions & 4 deletions src/StrawberryShake/Client/src/Core/StoredOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ public void SetResult(
throw new ArgumentNullException(nameof(result));
}

var updated = LastResult is null ||
!ReferenceEquals(result.Data, LastResult?.Data) ||
(result.Data is not null && !result.Data.Equals(LastResult?.Data));
var updated = LastResult is null or { Data: null } ||
result.Data is null ||
!result.Data.Equals(LastResult?.Data);
LastResult = result;
LastModified = DateTime.UtcNow;

// capture current subscriber list
ImmutableList<Subscription> observers = _subscriptions;

if (!updated && observers.IsEmpty)
if (!updated || observers.IsEmpty)
{
// if there are now subscribers we will just return and waste no time.
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using StrawberryShake.CodeGeneration.CSharp.Integration.StarWarsGetHero.State;
using StrawberryShake.Extensions;
using StrawberryShake.Transport.WebSockets;
using StrawberryShake.Persistence.SQLite;
using Xunit;
Expand Down Expand Up @@ -309,5 +310,80 @@ public async Task Watch_Interact_With_Persistence()
}
}
}

[Fact]
public async Task Update_Once()
{
// arrange
CancellationToken ct = new CancellationTokenSource(20_000).Token;
using IWebHost host = TestServerHelper.CreateServer(
_ => { },
out var port);
var serviceCollection = new ServiceCollection();

serviceCollection
.AddStarWarsGetHeroClient()
.ConfigureHttpClient(
c => c.BaseAddress = new Uri("http://localhost:" + port + "/graphql"))
.ConfigureWebSocketClient(
c => c.Uri = new Uri("ws://localhost:" + port + "/graphql"));

IServiceProvider services = serviceCollection.BuildServiceProvider();
StarWarsGetHeroClient client = services.GetRequiredService<StarWarsGetHeroClient>();

await client.GetHero.ExecuteAsync(ct);

// act
var count = 0;
using IDisposable session =
client.GetHero
.Watch(ExecutionStrategy.CacheAndNetwork)
.Subscribe(_ =>
{
count++;
});

await Task.Delay(1000, ct);

// assert
Assert.Equal(1, count);
}

[Fact]
public async Task Update_Once_With_Cache_And_Network()
{
// arrange
CancellationToken ct = new CancellationTokenSource(20_000).Token;
using IWebHost host = TestServerHelper.CreateServer(
_ => { },
out var port);
var serviceCollection = new ServiceCollection();

serviceCollection
.AddStarWarsGetHeroClient()
.ConfigureHttpClient(
c => c.BaseAddress = new Uri("http://localhost:" + port + "/graphql"))
.ConfigureWebSocketClient(
c => c.Uri = new Uri("ws://localhost:" + port + "/graphql"));

IServiceProvider services = serviceCollection.BuildServiceProvider();
StarWarsGetHeroClient client = services.GetRequiredService<StarWarsGetHeroClient>();

// act
var count = 0;
using IDisposable session =
client.GetHero
.Watch(ExecutionStrategy.CacheAndNetwork)
.Subscribe(_ =>
{
count++;
});

await Task.Delay(1000, ct);

// assert
Assert.Equal(1, count);
}

}
}

0 comments on commit 601c5b6

Please sign in to comment.