Skip to content

Commit

Permalink
ConfigurationOptions: shuffle in prep for no-clone (#2049)
Browse files Browse the repository at this point in the history
Cleaning up a bit before moving to no-clone for adjustable configuration. No functionality changes, just moving and ordering lots of cheese to make things easier, same with NRT diffs later.
  • Loading branch information
NickCraver authored Mar 19, 2022
1 parent cbd6d7e commit 9fb222e
Show file tree
Hide file tree
Showing 20 changed files with 1,572 additions and 1,585 deletions.
165 changes: 57 additions & 108 deletions src/StackExchange.Redis/ConfigurationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public Version DefaultVersion
/// <summary>
/// The endpoints defined for this configuration.
/// </summary>
public EndPointCollection EndPoints { get; } = new EndPointCollection();
public EndPointCollection EndPoints { get; init; } = new EndPointCollection();

/// <summary>
/// Use ThreadPriority.AboveNormal for SocketManager reader and writer threads (true by default).
Expand Down Expand Up @@ -529,50 +529,43 @@ public static ConfigurationOptions Parse(string configuration, bool ignoreUnknow
/// <summary>
/// Create a copy of the configuration.
/// </summary>
public ConfigurationOptions Clone()
{
var options = new ConfigurationOptions
{
defaultOptions = defaultOptions,
ClientName = ClientName,
ServiceName = ServiceName,
keepAlive = keepAlive,
syncTimeout = syncTimeout,
asyncTimeout = asyncTimeout,
allowAdmin = allowAdmin,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
User = User,
Password = Password,
tieBreaker = tieBreaker,
ssl = ssl,
sslHost = sslHost,
highPrioritySocketThreads = highPrioritySocketThreads,
configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns,
proxy = proxy,
commandMap = commandMap,
CertificateValidationCallback = CertificateValidationCallback,
CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
connectRetry = connectRetry,
configCheckSeconds = configCheckSeconds,
responseTimeout = responseTimeout,
DefaultDatabase = DefaultDatabase,
ReconnectRetryPolicy = reconnectRetryPolicy,
BacklogPolicy = backlogPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
BeforeSocketConnect = BeforeSocketConnect,
};
foreach (var item in EndPoints)
{
options.EndPoints.Add(item);
}
return options;
}
public ConfigurationOptions Clone() => new ConfigurationOptions
{
defaultOptions = defaultOptions,
ClientName = ClientName,
ServiceName = ServiceName,
keepAlive = keepAlive,
syncTimeout = syncTimeout,
asyncTimeout = asyncTimeout,
allowAdmin = allowAdmin,
defaultVersion = defaultVersion,
connectTimeout = connectTimeout,
User = User,
Password = Password,
tieBreaker = tieBreaker,
ssl = ssl,
sslHost = sslHost,
highPrioritySocketThreads = highPrioritySocketThreads,
configChannel = configChannel,
abortOnConnectFail = abortOnConnectFail,
resolveDns = resolveDns,
proxy = proxy,
commandMap = commandMap,
CertificateValidationCallback = CertificateValidationCallback,
CertificateSelectionCallback = CertificateSelectionCallback,
ChannelPrefix = ChannelPrefix.Clone(),
SocketManager = SocketManager,
connectRetry = connectRetry,
configCheckSeconds = configCheckSeconds,
responseTimeout = responseTimeout,
DefaultDatabase = DefaultDatabase,
ReconnectRetryPolicy = reconnectRetryPolicy,
BacklogPolicy = backlogPolicy,
SslProtocols = SslProtocols,
checkCertificateRevocation = checkCertificateRevocation,
BeforeSocketConnect = BeforeSocketConnect,
EndPoints = new EndPointCollection(EndPoints),
};

/// <summary>
/// Apply settings to configure this instance of <see cref="ConfigurationOptions"/>, e.g. for a specific scenario.
Expand All @@ -585,23 +578,30 @@ public ConfigurationOptions Apply(Action<ConfigurationOptions> configure)
return this;
}

internal ConfigurationOptions WithDefaults(bool sentinel = false)
{
if (sentinel)
{
// this is required when connecting to sentinel servers
TieBreaker = "";
CommandMap = CommandMap.Sentinel;

// use default sentinel port
EndPoints.SetDefaultPorts(26379);
}
else
{
SetDefaultPorts();
}
return this;
}

/// <summary>
/// Resolve the default port for any endpoints that did not have a port explicitly specified.
/// </summary>
public void SetDefaultPorts() => EndPoints.SetDefaultPorts(Ssl ? 6380 : 6379);

/// <summary>
/// Sets default config settings required for sentinel usage.
/// </summary>
internal void SetSentinelDefaults()
{
// this is required when connecting to sentinel servers
TieBreaker = "";
CommandMap = CommandMap.Sentinel;

// use default sentinel port
EndPoints.SetDefaultPorts(26379);
}
internal bool IsSentinel => !string.IsNullOrEmpty(ServiceName);

/// <summary>
/// Returns the effective configuration string for this configuration, including Redis credentials.
Expand Down Expand Up @@ -652,57 +652,6 @@ public string ToString(bool includePassword)
return sb.ToString();
}

internal bool HasDnsEndPoints()
{
foreach (var endpoint in EndPoints)
{
if (endpoint is DnsEndPoint)
{
return true;
}
}
return false;
}

internal async Task ResolveEndPointsAsync(ConnectionMultiplexer multiplexer, LogProxy log)
{
var cache = new Dictionary<string, IPAddress>(StringComparer.OrdinalIgnoreCase);
for (int i = 0; i < EndPoints.Count; i++)
{
if (EndPoints[i] is DnsEndPoint dns)
{
try
{
if (dns.Host == ".")
{
EndPoints[i] = new IPEndPoint(IPAddress.Loopback, dns.Port);
}
else if (cache.TryGetValue(dns.Host, out IPAddress ip))
{ // use cache
EndPoints[i] = new IPEndPoint(ip, dns.Port);
}
else
{
log?.WriteLine($"Using DNS to resolve '{dns.Host}'...");
var ips = await Dns.GetHostAddressesAsync(dns.Host).ObserveErrors().ForAwait();
if (ips.Length == 1)
{
ip = ips[0];
log?.WriteLine($"'{dns.Host}' => {ip}");
cache[dns.Host] = ip;
EndPoints[i] = new IPEndPoint(ip, dns.Port);
}
}
}
catch (Exception ex)
{
multiplexer.OnInternalError(ex);
log?.WriteLine(ex.Message);
}
}
}
}

private static void Append(StringBuilder sb, object value)
{
if (value == null) return;
Expand Down
19 changes: 19 additions & 0 deletions src/StackExchange.Redis/ConnectionMultiplexer.Compat.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Threading.Tasks;

namespace StackExchange.Redis;

public partial class ConnectionMultiplexer
{
/// <summary>
/// No longer used.
/// </summary>
[Obsolete("No longer used, will be removed in 3.0.")]
public static TaskFactory Factory { get => Task.Factory; set { } }

/// <summary>
/// Gets or sets whether asynchronous operations should be invoked in a way that guarantees their original delivery order.
/// </summary>
[Obsolete("Not supported; if you require ordered pub/sub, please see " + nameof(ChannelMessageQueue) + ", will be removed in 3.0", false)]
public bool PreserveAsyncOrder { get => false; set { } }
}
39 changes: 39 additions & 0 deletions src/StackExchange.Redis/ConnectionMultiplexer.Debug.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Threading;

namespace StackExchange.Redis;

public partial class ConnectionMultiplexer
{
private static int _collectedWithoutDispose;
internal static int CollectedWithoutDispose => Thread.VolatileRead(ref _collectedWithoutDispose);

/// <summary>
/// Invoked by the garbage collector.
/// </summary>
~ConnectionMultiplexer()
{
Interlocked.Increment(ref _collectedWithoutDispose);
}

bool IInternalConnectionMultiplexer.AllowConnect
{
get => AllowConnect;
set => AllowConnect = value;
}

bool IInternalConnectionMultiplexer.IgnoreConnect
{
get => IgnoreConnect;
set => IgnoreConnect = value;
}

/// <summary>
/// For debugging: when not enabled, servers cannot connect.
/// </summary>
internal volatile bool AllowConnect = true;

/// <summary>
/// For debugging: when not enabled, end-connect is silently ignored (to simulate a long-running connect).
/// </summary>
internal volatile bool IgnoreConnect;
}
120 changes: 120 additions & 0 deletions src/StackExchange.Redis/ConnectionMultiplexer.Events.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
using System;
using System.Net;
using System.Runtime.CompilerServices;
using StackExchange.Redis.Maintenance;

namespace StackExchange.Redis;

public partial class ConnectionMultiplexer
{
/// <summary>
/// Raised whenever a physical connection fails.
/// </summary>
public event EventHandler<ConnectionFailedEventArgs> ConnectionFailed;
internal void OnConnectionFailed(EndPoint endpoint, ConnectionType connectionType, ConnectionFailureType failureType, Exception exception, bool reconfigure, string physicalName)
{
if (_isDisposed) return;
var handler = ConnectionFailed;
if (handler != null)
{
CompleteAsWorker(new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, failureType, exception, physicalName));
}
if (reconfigure)
{
ReconfigureIfNeeded(endpoint, false, "connection failed");
}
}

/// <summary>
/// Raised whenever an internal error occurs (this is primarily for debugging).
/// </summary>
public event EventHandler<InternalErrorEventArgs> InternalError;
internal void OnInternalError(Exception exception, EndPoint endpoint = null, ConnectionType connectionType = ConnectionType.None, [CallerMemberName] string origin = null)
{
try
{
if (_isDisposed) return;
Trace("Internal error: " + origin + ", " + exception == null ? "unknown" : exception.Message);
var handler = InternalError;
if (handler != null)
{
CompleteAsWorker(new InternalErrorEventArgs(handler, this, endpoint, connectionType, exception, origin));
}
}
catch
{
// Our internal error event failed...whatcha gonna do, exactly?
}
}

/// <summary>
/// Raised whenever a physical connection is established.
/// </summary>
public event EventHandler<ConnectionFailedEventArgs> ConnectionRestored;
internal void OnConnectionRestored(EndPoint endpoint, ConnectionType connectionType, string physicalName)
{
if (_isDisposed) return;
var handler = ConnectionRestored;
if (handler != null)
{
CompleteAsWorker(new ConnectionFailedEventArgs(handler, this, endpoint, connectionType, ConnectionFailureType.None, null, physicalName));
}
ReconfigureIfNeeded(endpoint, false, "connection restored");
}

/// <summary>
/// Raised when configuration changes are detected.
/// </summary>
public event EventHandler<EndPointEventArgs> ConfigurationChanged;
internal void OnConfigurationChanged(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChanged);

/// <summary>
/// Raised when nodes are explicitly requested to reconfigure via broadcast.
/// This usually means primary/replica changes.
/// </summary>
public event EventHandler<EndPointEventArgs> ConfigurationChangedBroadcast;
internal void OnConfigurationChangedBroadcast(EndPoint endpoint) => OnEndpointChanged(endpoint, ConfigurationChangedBroadcast);

private void OnEndpointChanged(EndPoint endpoint, EventHandler<EndPointEventArgs> handler)
{
if (_isDisposed) return;
if (handler != null)
{
CompleteAsWorker(new EndPointEventArgs(handler, this, endpoint));
}
}

/// <summary>
/// Raised when server indicates a maintenance event is going to happen.
/// </summary>
public event EventHandler<ServerMaintenanceEvent> ServerMaintenanceEvent;
internal void OnServerMaintenanceEvent(ServerMaintenanceEvent e) =>
ServerMaintenanceEvent?.Invoke(this, e);

/// <summary>
/// Raised when a hash-slot has been relocated.
/// </summary>
public event EventHandler<HashSlotMovedEventArgs> HashSlotMoved;
internal void OnHashSlotMoved(int hashSlot, EndPoint old, EndPoint @new)
{
var handler = HashSlotMoved;
if (handler != null)
{
CompleteAsWorker(new HashSlotMovedEventArgs(handler, this, hashSlot, old, @new));
}
}

/// <summary>
/// Raised when a server replied with an error message.
/// </summary>
public event EventHandler<RedisErrorEventArgs> ErrorMessage;
internal void OnErrorMessage(EndPoint endpoint, string message)
{
if (_isDisposed) return;
var handler = ErrorMessage;
if (handler != null)
{
CompleteAsWorker(new RedisErrorEventArgs(handler, this, endpoint, message));
}
}
}
Loading

0 comments on commit 9fb222e

Please sign in to comment.