Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RetryPolicy to keep SignalR connection alive and replace List<RealtimeData> with ImmutableList<RealtimeData> #24

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions server/PathServices/RetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Microsoft.AspNetCore.SignalR.Client;
using Serilog;
using System;

namespace PathApi.Server.PathServices
{
/// <summary>
/// Reconnect policy for SignalR hub connections: every retry is delayed by a
/// randomized number of seconds that grows with the number of previous attempts.
/// </summary>
/// <remarks>
/// <see cref="NextRetryDelay"/> always returns a non-null delay, so this policy
/// itself never asks the client to stop reconnecting.
/// </remarks>
internal sealed class RetryPolicy : IRetryPolicy
{
    /// <summary>Upper bound on the back-off multiplier applied to the random jitter.</summary>
    private const int MaxBackoffMultiplier = 5;

    /// <summary>Inclusive lower bound of the random jitter, in seconds.</summary>
    private const int MinJitterSeconds = 1;

    /// <summary>Exclusive upper bound of the random jitter, in seconds.</summary>
    private const int MaxJitterSecondsExclusive = 4;

    // One generator shared by every policy instance instead of a new Random per call.
    // System.Random is not thread-safe, so access is serialized through JitterLock.
    private static readonly Random JitterSource = new Random();
    private static readonly object JitterLock = new object();

    /// <summary>
    /// Computes how long the client should wait before its next reconnect attempt.
    /// </summary>
    /// <param name="retryContext">
    /// Retry state supplied by the SignalR client; its retry reason and previous retry
    /// count are logged, and the previous retry count drives the back-off.
    /// </param>
    /// <returns>
    /// A delay of (1 to 3 seconds, chosen at random) multiplied by
    /// min(previous retry count + 1, 5), i.e. between 1 and 15 seconds. Never null.
    /// </returns>
    public TimeSpan? NextRetryDelay(RetryContext retryContext)
    {
        Log.Logger.Here().Warning("SignalR connection retrying because of {retryReason}, total retry count {previousRetryCount}", retryContext.RetryReason, retryContext.PreviousRetryCount);

        int jitterSeconds;
        lock (JitterLock)
        {
            jitterSeconds = JitterSource.Next(MinJitterSeconds, MaxJitterSecondsExclusive);
        }

        // Linear back-off, capped so the delay stops growing after the fifth attempt.
        long backoffMultiplier = Math.Min(retryContext.PreviousRetryCount + 1, MaxBackoffMultiplier);

        // TicksPerSecond (10,000,000) replaces the original "(long)10e6" literal: same value, explicit unit.
        return TimeSpan.FromTicks(jitterSeconds * backoffMultiplier * TimeSpan.TicksPerSecond);
    }
}
}
97 changes: 77 additions & 20 deletions server/PathServices/SignalRRealtimeDataRepository.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace PathApi.Server.PathServices
namespace PathApi.Server.PathServices
{
using Microsoft.AspNetCore.SignalR.Client;
using Newtonsoft.Json;
Expand All @@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;

Expand All @@ -16,10 +17,11 @@
/// </summary>
internal sealed class SignalRRealtimeDataRepository : IRealtimeDataRepository, IDisposable
{
private readonly TimeSpan KEEP_ALIVE_INTERVAL = TimeSpan.FromTicks(5 * (long)10e6); // 5s
This conversation was marked as resolved.
Show resolved Hide resolved
private readonly IPathDataRepository pathDataRepository;
private readonly IPathApiClient pathApiClient;
private readonly ConcurrentDictionary<(Station, RouteDirection), HubConnection> hubConnections;
private readonly ConcurrentDictionary<(Station, RouteDirection), List<RealtimeData>> realtimeData;
private readonly ConcurrentDictionary<(Station, RouteDirection), ImmutableList<RealtimeData>> realtimeData;

/// <summary>
/// Constructs a new instance of the <see cref="SignalRRealtimeDataRepository"/>.
Expand All @@ -29,7 +31,7 @@
this.pathDataRepository = pathDataRepository;
this.pathApiClient = pathApiClient;
this.hubConnections = new ConcurrentDictionary<(Station, RouteDirection), HubConnection>();
this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), List<RealtimeData>>();
this.realtimeData = new ConcurrentDictionary<(Station, RouteDirection), ImmutableList<RealtimeData>>();

this.pathDataRepository.OnDataUpdate += this.PathSqlDbUpdated;
}
Expand All @@ -41,12 +43,36 @@
/// <returns>A collection of arriving trains.</returns>
public Task<IEnumerable<RealtimeData>> GetRealtimeData(Station station)
{
return Task.FromResult(this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ)).Where(data => data.DataExpiration > DateTime.UtcNow));
var allData = this.GetRealtimeData(station, RouteDirection.ToNY).Union(this.GetRealtimeData(station, RouteDirection.ToNJ));
var freshData = allData.Where(dataPoint => dataPoint.DataExpiration > DateTime.UtcNow);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I'm worried that this condition will always be met when the headways are really large... it may be fine, but I need to double-check.

if (allData.Count() != freshData.Count())
{
var staledData = allData.Except(freshData);
foreach (var staledDataPoint in staledData)
Log.Logger.Here().Warning("Staled data detected for S:{station} R:{route} with timestamp {updatedDataLastUpdated}, force reconnect maybe needed", station, staledDataPoint.Route.DisplayName, staledDataPoint.LastUpdated);

Log.Logger.Here().Information("Recreating SignalR hubs following staled data detection...");
Task.Run(this.CreateHubConnections).Wait();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make this method async you could await CreateHubConnections and just return freshData (rather than needing to wrap it in a task).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once I deal with the concurrency issue of CreateHubConnections I'll avoid calling .Wait() so this function can return immediately.
Currently there are 2 potential issues:

                    if (!this.hubConnections.ContainsKey((station, direction)))

This check is not reliable: without a lock, another thread can replace the connection with a new one, and ContainsKey will still return true for the key.

        private async Task CloseExistingHubConnections()
        {
            // Materialize the connections so we can clear the dictionary before disconnecting.
            // Otherwise, we will reconnect before reinitializing the connection (potentially
            // causing a loop if the token changes).
            var connections = this.hubConnections.Values.ToArray();
            this.hubConnections.Clear();

            await Task.WhenAll(connections.Select(async (client) => await client.DisposeAsync()));
        }

Another thread might run between .ToArray() and .Clear(), in which case it would try to dispose the same connections again.

The goal is to make sure that once a HubConnection instance is retrieved from the dictionary, it remains exclusive to that thread until it is put back (or not); maybe TryRemove will do.

Once the refactor is done, we should be able to reconnect a single HubConnection for more fine-grained control when stale data is detected for its (Station, RouteDirection) pair.

}
return Task.FromResult(freshData);
}

private IEnumerable<RealtimeData> GetRealtimeData(Station station, RouteDirection direction)
{
return this.realtimeData.GetValueOrDefault((station, direction), new List<RealtimeData>());
Log.Logger.Here().Debug("Getting realtime data for {station}-{direction}...", station, direction);
var startTimestamp = DateTime.UtcNow;
This conversation was marked as resolved.
Show resolved Hide resolved
var emptyRealtimeData = ImmutableList.Create<RealtimeData>();
var realtimeDataResult = this.realtimeData.GetValueOrDefault((station, direction), emptyRealtimeData);
var endTimestamp = DateTime.UtcNow;
if (realtimeDataResult.Count() != 0)
{
Log.Logger.Here().Debug("Got {count} realtime dataPoint(s) for {station}-{direction}", realtimeDataResult.Count(), station, direction);
} else
{
Log.Logger.Here().Information("Got no realtime dataPoint for {station}-{direction}, this might indicate a problem either on the server or the client side", station, direction);
}
Log.Logger.Here().Information("Get realtime data for {station}-{direction} took {timespan:G}", station, direction, endTimestamp - startTimestamp);
return realtimeDataResult;
}

private void PathSqlDbUpdated(object sender, EventArgs args)
Expand All @@ -66,7 +92,7 @@
RouteDirectionMappings.RouteDirectionToDirectionKey.Select(direction => this.CreateHubConnection(tokenBrokerUrl, tokenValue, station.Key, direction.Key))));
}

private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction, int sequentialFailures = 0)
private async Task CreateHubConnection(string tokenBrokerUrl, string tokenValue, Station station, RouteDirection direction)
{
SignalRToken token;

Expand All @@ -77,21 +103,18 @@

var connection = new HubConnectionBuilder()
.WithUrl(token.Url, c => c.AccessTokenProvider = () => Task.FromResult(token.AccessToken))
.WithAutomaticReconnect(new RetryPolicy())
.Build();

connection.KeepAliveInterval = this.KEEP_ALIVE_INTERVAL;

connection.On<string, string>("SendMessage", (_, json) =>
this.ProcessNewMessage(station, direction, json)
.ConfigureAwait(false)
.GetAwaiter()
.GetResult());

async Task RetryConnection()
{
await Task.Delay(new Random().Next(1, 7) * (1000 * Math.Min(sequentialFailures + 1, 5)));
await this.CreateHubConnection(tokenBrokerUrl, tokenValue, station, direction, sequentialFailures + 1);
};

connection.Closed += async (e) =>

Check warning on line 117 in server/PathServices/SignalRRealtimeDataRepository.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 117 in server/PathServices/SignalRRealtimeDataRepository.cs

View workflow job for this annotation

GitHub Actions / build

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
if (!this.hubConnections.ContainsKey((station, direction)))
{
Expand All @@ -103,8 +126,7 @@
Log.Logger.Here().Warning(e, "SignalR connection was closed as a result of an exception");
}

Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction);
await RetryConnection();
// Log.Logger.Here().Information("Recovering SignalR connection to {station}-{direction}...", station, direction);
};

try
Expand All @@ -114,10 +136,13 @@
catch (Exception ex)
{
Log.Logger.Here().Warning(ex, "SignalR connection failed to start for {station}-{direction}...", station, direction);
await RetryConnection();
}

this.hubConnections.AddOrUpdate((station, direction), connection, (_, __) => connection);
this.hubConnections.AddOrUpdate((station, direction), connection, (key, existingConnection) =>
{

return connection;
});
}
catch (Exception ex)
{
Expand All @@ -137,8 +162,9 @@
}
catch (Exception) { /* Ignore. */ }

Log.Logger.Here().Debug("SignalR Hub ProcessNewMessage for {station}-{direction}...", station, direction);

List<RealtimeData> newData = (await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage =>
var newImmtubaleData = ImmutableList.Create((await Task.WhenAll(messageBody.Messages.Select(async realtimeMessage =>
{
var realtimeData = new RealtimeData()
{
Expand All @@ -161,8 +187,39 @@
}
realtimeData.Route = route;
return realtimeData;
}))).ToList();
this.realtimeData.AddOrUpdate((station, direction), newData, (ignored, oldData) => newData[0].LastUpdated > oldData[0].LastUpdated ? newData : oldData);
}))).ToArray());

this.realtimeData.AddOrUpdate((station, direction), newImmtubaleData, (key, oldImmutableData) => {
var latestNewDataPointLastUpdated = DateTimeOffset.FromUnixTimeSeconds(0).DateTime; // 1970 epoch
foreach (var newDataPoint in newImmtubaleData) {
if (newDataPoint.LastUpdated > latestNewDataPointLastUpdated)
{
latestNewDataPointLastUpdated = newDataPoint.LastUpdated;
}
if (newDataPoint.DataExpiration <= DateTime.UtcNow)
{
Log.Logger.Here().Warning("Staled dataPoint received for S:{station} D:{direction} with timestamp {lastUpdated} expires at {expiration}", station, direction, newDataPoint.LastUpdated, newDataPoint.DataExpiration);
}
}

var updatedImmutableData = newImmtubaleData;
var oldDataNewerThanNewDataLastUpdatedCount = oldImmutableData.Where(oldDataPoint => oldDataPoint.LastUpdated > latestNewDataPointLastUpdated).Count();
if (oldDataNewerThanNewDataLastUpdatedCount > 0)
{
Log.Logger.Here().Warning("{count} dataPoint(s) in oldData are newer than newData for S:{station} D:{direction}, keeping oldData instead", oldDataNewerThanNewDataLastUpdatedCount, station, direction);
updatedImmutableData = oldImmutableData;
}
var filteredUpdatedImmutableData = ImmutableList.Create(updatedImmutableData.Where(updatedDataPoint => updatedDataPoint.DataExpiration > DateTime.UtcNow).ToArray());
if (filteredUpdatedImmutableData.Count() != updatedImmutableData.Count())
{
Log.Logger.Here().Warning("{count} dataPoint(s) in updatedData are removed for S:{station} D:{direction} as they are expired", updatedImmutableData.Count() - filteredUpdatedImmutableData.Count(), station, direction);
} else
{
// return existing data will improve performance
filteredUpdatedImmutableData = updatedImmutableData;
}
return filteredUpdatedImmutableData;
});
}
catch (Exception ex)
{
Expand Down Expand Up @@ -233,4 +290,4 @@
public DateTime DepartureTime { get; set; }
}
}
}
}
2 changes: 1 addition & 1 deletion server/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ static void Main(string[] args)
{
// Setup Logging
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Information()
.MinimumLevel.Debug()
.Enrich.FromLogContext()
.WriteTo.Console(outputTemplate: "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] ({FilePath}.{MemberName}:{LineNumber}) {Message}{NewLine}{Exception}")
.CreateLogger();
Expand Down
Loading