Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate wire protocol from internal models #6206

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ Target "Protobuf" <| fun _ ->
("Persistence.proto", "/src/core/Akka.Persistence/Serialization/Proto/");
("StreamRefMessages.proto", "/src/core/Akka.Streams/Serialization/Proto/");
("ReplicatorMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/");
("ReplicatedDataMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/"); ]
("ReplicatedDataMessages.proto", "/src/contrib/cluster/Akka.DistributedData/Serialization/Proto/")
("ClusterMetricsMessages.proto", "/src/contrib/cluster/Akka.Cluster.Metrics/Serialization/Proto/") ]

printfn "Using proto.exe: %s" protocPath

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Option<WeightedRoutees> UpdateWeightedRoutees()
///
/// The supervision strategy of the router actor can be configured with
/// [[#withSupervisorStrategy]]. If no strategy is provided, routers default to
/// a strategy of “always escalate”. This means that errors are passed up to the
/// a strategy of [[always escalate]]. This means that errors are passed up to the
/// router's supervisor for handling.
///
/// The router's supervisor will treat the error as an error with the router itself.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ public interface IClusterMetricMessage { }
/// Envelope adding a sender address to the cluster metrics gossip.
/// </summary>
[InternalApi]
public sealed partial class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
public sealed class MetricsGossipEnvelope : IClusterMetricMessage, IDeadLetterSuppression
{
/// <summary>
/// Akka's actor address
/// </summary>
public Actor.Address FromAddress { get; }
public MetricsGossip Gossip { get; }
public bool Reply { get; }

/// <summary>
/// Creates new instance of <see cref="MetricsGossipEnvelope"/>
Expand Down

Large diffs are not rendered by default.

33 changes: 28 additions & 5 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/EWMA.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ public static partial class Types
/// the sampled value resulting from the previous smoothing iteration.
/// This value is always used as the previous EWMA to calculate the new EWMA.
/// </summary>
public sealed partial class EWMA
public sealed class EWMA : IEquatable<EWMA>
{
public double Value { get; }
public double Alpha { get; }

/// <summary>
/// Creates new instance of <see cref="EWMA"/>
/// </summary>
Expand All @@ -47,10 +50,10 @@ public sealed partial class EWMA
public EWMA(double value, double alpha)
{
if (alpha < 0 || alpha > 1)
throw new ArgumentException(nameof(alpha), "alpha must be between 0.0 and 1.0");
throw new ArgumentException("alpha must be between 0.0 and 1.0", nameof(alpha));

value_ = value;
alpha_ = alpha;
Value = value;
Alpha = alpha;
}

/// <summary>
Expand Down Expand Up @@ -83,11 +86,31 @@ public static double GetAlpha(TimeSpan halfLife, TimeSpan collectInterval)

var halfLifeMillis = halfLife.TotalMilliseconds;
if (halfLifeMillis <= 0)
throw new ArgumentException(nameof(halfLife), "halfLife must be > 0 s");
throw new ArgumentException("halfLife must be > 0 s", nameof(halfLife));

var decayRate = logOf2 / halfLifeMillis;
return 1 - Math.Exp(-decayRate * collectInterval.TotalMilliseconds);
}

public bool Equals(EWMA other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Value.Equals(other.Value) && Alpha.Equals(other.Alpha);
}

public override bool Equals(object obj)
{
return obj is EWMA other && Equals(other);
}

public override int GetHashCode()
{
unchecked
{
return (Value.GetHashCode() * 397) ^ Alpha.GetHashCode();
}
}
}
}
}
Expand Down
15 changes: 4 additions & 11 deletions src/contrib/cluster/Akka.Cluster.Metrics/Serialization/Metric.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static partial class Types
///
/// Equality of Metric is based on its name index.
/// </summary>
public sealed partial class Metric
public sealed class Metric: IEquatable<Metric>
{
/// <summary>
/// Metric average value
Expand Down Expand Up @@ -79,7 +79,6 @@ public Metric(string name, AnyNumber value, Option<EWMA> average)
Name = name;
Value = value;
Average = average;
ewma_ = average.HasValue ? average.Value : default(EWMA);
}

/// <summary>
Expand Down Expand Up @@ -163,21 +162,15 @@ public static Either<long, double> ConvertNumber(AnyNumber number)
}
}

/*
* Two methods below, Equals and GetHashCode, should be used instead of generated in ClusterMetrics.Messages.g.cs
* file. Since we do not have an option to not generate those methods for this particular class,
* just stip them from generated code and paste here, with adding Address property check
*/
public override bool Equals(object obj)
=> obj is Metric other && Equals(other);



public bool Equals(Metric other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Name == other.Name;
return Name.Equals(other.Name);
}


public override int GetHashCode()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ namespace Akka.Cluster.Metrics.Serialization
/// Metrics gossip message
/// </summary>
[InternalApi]
public sealed partial class MetricsGossip
public sealed class MetricsGossip
{
public IImmutableSet<NodeMetrics> Nodes { get; private set; } = ImmutableHashSet<NodeMetrics>.Empty;
public IImmutableSet<NodeMetrics> Nodes { get; }

/// <summary>
/// Empty metrics gossip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Collections.Immutable;
using System.Linq;
using Akka.Util;
using Google.Protobuf.Collections;

namespace Akka.Cluster.Metrics.Serialization
{
Expand All @@ -20,9 +19,11 @@ namespace Akka.Cluster.Metrics.Serialization
///
/// Equality of NodeMetrics is based on its address.
/// </summary>
public sealed partial class NodeMetrics
public sealed partial class NodeMetrics : IEquatable<NodeMetrics>
{
public Actor.Address Address { get; private set; }
public Actor.Address Address { get; }
public ImmutableList<Types.Metric> Metrics { get; }
public long Timestamp { get; }

/// <summary>
/// Creates new instance of <see cref="NodeMetrics"/>
Expand All @@ -33,9 +34,8 @@ public sealed partial class NodeMetrics
public NodeMetrics(Actor.Address address, long timestamp, IEnumerable<Types.Metric> metrics)
{
Address = address;
timestamp_ = timestamp;
metrics_ = new RepeatedField<Types.Metric>();
metrics_.AddRange(metrics);
Timestamp = timestamp;
Metrics = metrics.ToImmutableList();
}

/// <summary>
Expand Down Expand Up @@ -93,19 +93,19 @@ public NodeMetrics Update(NodeMetrics that)
* just stip them from generated code and paste here, with adding Address property check
*/


public override bool Equals(object obj)
=> obj is NodeMetrics other && Equals(other);

public bool Equals(NodeMetrics other)
{
if (ReferenceEquals(null, other)) return false;
if (ReferenceEquals(this, other)) return true;
return Equals(Address, other.Address);
return Address.Equals(other.Address);
}


public override int GetHashCode()
{
return (Address != null ? Address.GetHashCode() : 0);
return Address.GetHashCode();
}
}
}
Loading