Skip to content

Commit

Permalink
Merge pull request #769 from dolittle/eventhandler-lag
Browse files Browse the repository at this point in the history
Metrics and stream processor improvements
  • Loading branch information
mhelleborg authored Jun 13, 2024
2 parents 1de1ac1 + 53b9c85 commit dd04a09
Show file tree
Hide file tree
Showing 51 changed files with 1,623 additions and 266 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ indent_size = 2
# we're putting the :severity options on everything where it can be put as it's bugged on net5
# check https://github.com/dotnet/roslyn/issues/50785#issuecomment-768606882
# default to all on warning
dotnet_analyzer_diagnostic.severity = warning
# dotnet_analyzer_diagnostic.severity = warning
file_header_template = Copyright (c) Dolittle. All rights reserved.\nLicensed under the MIT license. See LICENSE file in the project root for full license information.

#### .NET Conventions ####
Expand Down
2 changes: 2 additions & 0 deletions Integration/Shared/EventStoreExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public static FetchForAggregateResponse Combine(this FetchForAggregateResponse[]
AggregateRootId = responses.First().Events.AggregateRootId,
EventSourceId = responses.First().Events.EventSourceId,
CurrentAggregateRootVersion = responses.First().Events.CurrentAggregateRootVersion,
#pragma warning disable CS0612 // Type or member is obsolete
AggregateRootVersion = responses.First().Events.AggregateRootVersion
#pragma warning restore CS0612 // Type or member is obsolete
}
};

Expand Down
36 changes: 36 additions & 0 deletions Integration/Shared/Portable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Net;
using System.Net.Sockets;

namespace Integration.Shared;

public static class Portable
{
public static (int,int,int) FindFreePorts()
{
var listener = new TcpListener(IPAddress.Loopback, 0);
var listener2 = new TcpListener(IPAddress.Loopback, 0);
var listener3 = new TcpListener(IPAddress.Loopback, 0);
try
{
// Start the listener to obtain the port.
listener.Start();
listener2.Start();
listener3.Start();
// Get the port number assigned by the system.
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
var port2 = ((IPEndPoint)listener2.LocalEndpoint).Port;
var port3 = ((IPEndPoint)listener3.LocalEndpoint).Port;
return (port, port2, port3);
}
finally
{
// Stop the listener.
listener.Stop();
listener2.Stop();
listener3.Stop();
}
}
}
13 changes: 10 additions & 3 deletions Integration/Shared/Runtime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
using Dolittle.Runtime.Domain.Platform;
using Dolittle.Runtime.Domain.Tenancy;
using Dolittle.Runtime.Execution;
using Dolittle.Runtime.Metrics.Configuration;
using Dolittle.Runtime.Metrics.Hosting;
using Dolittle.Runtime.Server.Web;
using Dolittle.Runtime.Services;
Expand Down Expand Up @@ -52,6 +53,8 @@ public static RunningRuntime CreateAndStart(int numberOfTenants)
var configuration = new Dictionary<string, string?>();
var (databases, tenants) = CreateRuntimeConfiguration(configuration, numberOfTenants);

var (manPort, freePort, pubPort) = Portable.FindFreePorts();

var runtimeHost = Host.CreateDefaultBuilder()
.UseDolittleServices()
.ConfigureOpenTelemetry(cfg)
Expand All @@ -70,9 +73,13 @@ public static RunningRuntime CreateAndStart(int numberOfTenants)
coll.AddLogging(builder => builder.ClearProviders());
coll.AddOptions<EndpointsConfiguration>().Configure(builder =>
{
builder.Management = new EndpointConfiguration { Port = 0 };
// builder.Private = new EndpointConfiguration { Port = 0 };
builder.Public = new EndpointConfiguration { Port = 0 };
builder.Management = new EndpointConfiguration { Port = manPort };
builder.Private = new EndpointConfiguration { Port = freePort };
builder.Public = new EndpointConfiguration { Port = pubPort };
});
coll.AddOptions<MetricsServerConfiguration>().Configure(builder =>
{
builder.Port = 0;
});
})
.AddActorSystem()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ protected static IEnumerable<StreamEvent> get_partitioned_events_in_stream(IEven
while (!cts.IsCancellationRequested)
{
var evt = Task.Run(async () => await reader.ReadAsync(CancellationToken.None), cts.Token).GetAwaiter().GetResult();
events.Add(evt);
if (evt.IsEvent)
{
events.Add(evt.StreamEvent);
}
}
}
catch (Exception)
Expand Down
5 changes: 3 additions & 2 deletions Source/Actors/Hosting/ActorSystemClusterHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ public ActorSystemClusterHostedService(ActorSystem actorSystem, IStreamProcessor
}

/// <inheritdoc />
public Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
Log.SetLoggerFactory(_loggerFactory);
return _actorSystem.Cluster().StartMemberAsync();
await _actorSystem.Cluster().StartMemberAsync();
_logger.LogInformation("Actor system started");
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ public object Parse(string argName, string value, CultureInfo culture)
=> Guid.TryParse(value, out var aggregateRootId)
? new AggregateRootIdOrAlias(aggregateRootId)
: new AggregateRootIdOrAlias(new AggregateRootAlias(value));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ public class EventHandlerIdOrAliasParser : IValueParser
public Type TargetType => typeof(EventHandlerIdOrAlias);

/// <inheritdoc />
public object Parse(string argName, string value, CultureInfo culture)
public object Parse(string? argName, string? value, CultureInfo culture)
{
var scope = ScopeId.Default;
var segments = value.Split(":");
var segments = value?.Split(":") ?? Array.Empty<string>();
ThrowIfInvalidFormat(value, segments);
if (segments.Length > 1)
{
Expand All @@ -35,7 +35,7 @@ public object Parse(string argName, string value, CultureInfo culture)
: new EventHandlerIdOrAlias(segments[0], scope);
}

static void ThrowIfInvalidFormat(string value, string[] segments)
static void ThrowIfInvalidFormat(string? value, string[] segments)
{
switch (segments.Length)
{
Expand All @@ -45,4 +45,4 @@ static void ThrowIfInvalidFormat(string value, string[] segments)
throw new InvalidEventHandlerIdOrAlias(value);
}
}
}
}
11 changes: 9 additions & 2 deletions Source/CLI/Options/Parsers/Versioning/VersionParser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ public VersionParser(IVersionConverter converter)
public Type TargetType => typeof(Version);

/// <inheritdoc/>
public object Parse(string argName, string value, CultureInfo culture)
=> _converter.FromString(value);
public object Parse(string? argName, string? value, CultureInfo culture)
{
if (value == null)
{
throw new InvalidVersionString("");
}

return _converter.FromString(value);
}
}
2 changes: 1 addition & 1 deletion Source/CLI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Dolittle.Runtime.CLI;
/// </summary>
[Command("dolittle", Description = "The Dolittle CLI tool")]
[Subcommand(typeof(Runtime.Command))]
class Program
sealed class Program
{
static int Main(string[] args)
{
Expand Down
2 changes: 1 addition & 1 deletion Source/CLI/Runtime/EventHandlers/IManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ public interface IManagementClient
/// <param name="eventHandler">The Event Handler identifier.</param>
/// <param name="tenant">The Tenant to get Stream Processor states for, or null to get all.</param>
/// <returns>A <see cref="Task"/> that, when resolved, returns the <see cref="Try"/> containing the <see cref="EventHandlerStatus"/>-</returns>
Task<Try<EventHandlerStatus>> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId tenant = null);
Task<Try<EventHandlerStatus>> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId? tenant);

}
2 changes: 1 addition & 1 deletion Source/CLI/Runtime/EventHandlers/ManagementClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public async Task<IEnumerable<EventHandlerStatus>> GetAll(MicroserviceAddress ru
}

/// <inheritdoc />
public async Task<Try<EventHandlerStatus>> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId tenant = null)
public async Task<Try<EventHandlerStatus>> Get(MicroserviceAddress runtime, EventHandlerId eventHandler, TenantId? tenant = null)
{
var client = _clients.CreateClientFor<EventHandlersClient>(runtime);
var request = new GetOneRequest
Expand Down
2 changes: 1 addition & 1 deletion Source/CLI/Runtime/EventHandlers/Replay/FromCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public FromCommand(ICanLocateRuntimes runtimes, IManagementClient client, IResol
StreamPosition Position { get; init; }

[Option("--tenant", CommandOptionType.SingleValue, Description = "The tenant to replay events for. Defaults to the development tenant.")]
TenantId Tenant { get; init; }
TenantId? Tenant { get; init; }

/// <summary>
/// The entrypoint for the "dolittle runtime eventhandlers replay from" command.
Expand Down
12 changes: 12 additions & 0 deletions Source/Diagnostics/OpenTelemetry/Metrics/RuntimeMetrics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Diagnostics.Metrics;

namespace Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics;

public class RuntimeMetrics
{
public const string SourceName = "Dolittle.Runtime";
public static readonly Meter Meter = new(SourceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ public class OpenTelemetryConfiguration

public bool Logging { get; set; } = true;
public bool Tracing { get; set; } = true;
public bool Metrics { get; set; } = true;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using Dolittle.Runtime.Diagnostics.OpenTelemetry.Metrics;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Logs;
using OpenTelemetry.Metrics;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Proto.OpenTelemetry;
Expand All @@ -26,7 +28,8 @@ public static IHostBuilder ConfigureOpenTelemetry(this IHostBuilder builder, ICo

if (!Uri.TryCreate(configuration.Endpoint, UriKind.RelativeOrAbsolute, out var otlpEndpoint))
{
var logger = LoggerFactory.Create(opt => opt.AddConfiguration(cfg)).CreateLogger(typeof(OpenTelemetryConfigurationExtensions));
var logger = LoggerFactory.Create(opt => opt.AddConfiguration(cfg))
.CreateLogger(typeof(OpenTelemetryConfigurationExtensions));
#pragma warning disable CA1848
logger.LogWarning("Unable to parse otlp endpoint {Input}", configuration.Endpoint);
#pragma warning restore CA1848
Expand All @@ -50,6 +53,11 @@ public static IHostBuilder ConfigureOpenTelemetry(this IHostBuilder builder, ICo
builder.AddOpenTelemetryTracing(resourceBuilder, otlpEndpoint);
}

if (configuration.Metrics)
{
builder.AddOpenTelemetryMetrics(resourceBuilder, otlpEndpoint);
}

return builder;
}

Expand All @@ -74,9 +82,9 @@ static void AddOpenTelemetryTracing(this IHostBuilder builder, ResourceBuilder r
{
builder.ConfigureServices(services =>
services.AddOpenTelemetry()
.WithTracing(_ =>
.WithTracing(providerBuilder =>
{
_.SetResourceBuilder(resourceBuilder)
providerBuilder.SetResourceBuilder(resourceBuilder)
.AddSource(RuntimeActivity.SourceName)
.AddHttpClientInstrumentation()
.AddAspNetCoreInstrumentation()
Expand All @@ -86,4 +94,22 @@ static void AddOpenTelemetryTracing(this IHostBuilder builder, ResourceBuilder r
.AddOtlpExporter(options => { options.Endpoint = otlpEndpoint; });
}));
}

static void AddOpenTelemetryMetrics(this IHostBuilder builder, ResourceBuilder resourceBuilder, Uri otlpEndpoint)
{
builder.ConfigureServices(services =>
services.AddOpenTelemetry()
.WithMetrics(providerBuilder =>
{
providerBuilder.SetResourceBuilder(resourceBuilder)
.AddMeter(RuntimeMetrics.SourceName)
.AddAspNetCoreInstrumentation()
.AddProtoActorInstrumentation()
.AddOtlpExporter((exporterOptions, readerOptions) =>
{
exporterOptions.Endpoint = otlpEndpoint;
readerOptions.PeriodicExportingMetricReaderOptions.ExportIntervalMilliseconds = 5000;
});
}));
}
}
Loading

0 comments on commit dd04a09

Please sign in to comment.