Skip to content

Commit

Permalink
Improved logic for ObserveLinesWithRetry
Browse files Browse the repository at this point in the history
  • Loading branch information
tintoy committed Jul 13, 2024
1 parent 8cda77d commit 11d9250
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions src/KubeClient/ResourceClients/KubeResourceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reactive;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
Expand All @@ -21,7 +22,6 @@ namespace KubeClient.ResourceClients
using KubeClient.Models.ContractResolvers;
using Models;
using Models.Converters;
using System.Reactive;

/// <summary>
/// The base class for Kubernetes resource API clients.
Expand Down Expand Up @@ -747,27 +747,29 @@ protected IObservable<string> ObserveLinesWithRetry(string operationDescription,
HttpRequest currentRequest = requestFactory();

return ObserveLines(requestFactory, operationDescription, bufferSize)
.RetryWhen(exceptions => Observable.Create((IObserver<Unit> retrySignal, CancellationToken subscriptionCancellation) =>
.RetryWhen(exceptions => Observable.Create((IObserver<Unit> retrySignal) =>
{
exceptions.Subscribe(
onNext: exception =>
return exceptions.Subscribe(
onNext: (Exception sourceError) =>
{
if (!subscriptionCancellation.IsCancellationRequested && shouldRetry(exception))
if (shouldRetry(sourceError))
retrySignal.OnNext(Unit.Default); // Retry (this will seamlessly continue the sequence).
else
retrySignal.OnError(exception); // Bubble up (this will terminate the sequence with an error).
retrySignal.OnError(sourceError); // Bubble up (this will terminate the sequence with an error).
},
onError: (Exception exceptionSourceError) =>
{
// Under normal circumstances this should not be called, and so we never retry from here.
retrySignal.OnError(exceptionSourceError); // Bubble up (this will terminate the sequence with an error).
},
onCompleted: () =>
{
if (!subscriptionCancellation.IsCancellationRequested && shouldRetry(null))
if (shouldRetry(null))
retrySignal.OnNext(Unit.Default); // Retry (this will seamlessly continue the sequence).
else
retrySignal.OnCompleted(); // Bubble up (this will terminate the sequence).
},
subscriptionCancellation // Automatically propagate termination of subscription.
}
);
return Task.CompletedTask;
}));
}

Expand Down

0 comments on commit 11d9250

Please sign in to comment.