Skip to content

Commit

Permalink
Merge pull request #760 from dolittle/eventhorizon-mongodb
Browse files Browse the repository at this point in the history
Eventhorizon bugfix & optimizations
  • Loading branch information
mhelleborg authored Nov 27, 2023
2 parents 0fb7cea + 431738d commit 7fdc799
Show file tree
Hide file tree
Showing 17 changed files with 72 additions and 37 deletions.
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dotnet_naming_style.private_fields.required_prefix
dotnet_naming_rule.public_members_must_be_capitalized.style = first_word_upper_case_style
dotnet_naming_style.first_word_upper_case_style.capitalization = first_word_upper
dotnet_naming_rule.public_members_must_be_capitalized.symbols = public_symbols
dotnet_naming_rule.public_members_must_be_capitalized.severity = error
dotnet_naming_symbols.public_symbols.applicable_kinds = property,method,field,event,delegate
dotnet_naming_symbols.public_symbols.applicable_accessibilities = public
dotnet_naming_symbols.public_symbols.required_modifiers = readonly
Expand Down Expand Up @@ -116,6 +117,8 @@ dotnet_diagnostic.RCS1018.severity = none
dotnet_diagnostic.IDE0130.severity = none
# IDE0058: Remove unnecessary expression value
dotnet_diagnostic.IDE0058.severity = none
# IDE0290: Use primary constructor
dotnet_diagnostic.IDE0290.severity = none

# IDE0160: Convert to file-scoped namespace
csharp_style_namespace_declarations = file_scoped:warning
Expand Down
2 changes: 1 addition & 1 deletion Configurations/producer/.dolittle/runtime.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ tenants:
consents:
- consumerTenant: "445f8ea8-1a6f-40d7-b2fc-796dba92dc44"
stream: "2c087657-b318-40b1-ae92-a400de44e507"
partition: "Dolittle Tacos"
partition: "00000000-0000-0000-0000-000000000000"
consent: "ad57aa2b-e641-4251-b800-dd171e175d1f"
4 changes: 2 additions & 2 deletions Integration/Benchmarks/Events.Store/CommitEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public Task CommitEventsInBatch()
[Benchmark]
[Arguments(10)]
[Arguments(100)]
public async Task CommitEventsInParallel(int ParallelCommits)
public async Task CommitEventsInParallel(int parallelCommits)
{
var tasks = new List<Task>();
foreach (var i in Enumerable.Range(0, ParallelCommits))
for (var i = 0; i < parallelCommits; i++)
{
tasks.Add(_eventStore.Commit(_eventsToCommit, _executionContext));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ public static class AggregateMetadataExtensions
/// </summary>
/// <param name="committedEvent">The <see cref="CommittedEvent" />.</param>
/// <returns>The <see cref="AggregateMetadata"/>.</returns>
public static AggregateMetadata GetAggregateMetadata(this CommittedEvent committedEvent) =>
public static AggregateMetadata? GetAggregateMetadata(this CommittedEvent committedEvent) =>
committedEvent is CommittedAggregateEvent aggregateEvent ?
new AggregateMetadata(true, aggregateEvent.AggregateRoot.Id, aggregateEvent.AggregateRoot.Generation, aggregateEvent.AggregateRootVersion)
: new AggregateMetadata();
}
: null;
}
6 changes: 3 additions & 3 deletions Source/Events.Store.MongoDB/Events/CommittedEventExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public static EventHorizonMetadata GetEventHorizonMetadata(this CommittedExterna
/// </summary>
/// <param name="committedEvent">The <see cref="CommittedEvent"/>.</param>
/// <returns>The converted <see cref="EventHorizonMetadata" />.</returns>
public static EventHorizonMetadata GetEventHorizonMetadata(this CommittedEvent committedEvent) =>
public static EventHorizonMetadata? GetEventHorizonMetadata(this CommittedEvent committedEvent) =>
committedEvent is CommittedExternalEvent externalEvent ?
externalEvent.GetEventHorizonMetadata()
: new EventHorizonMetadata();
}
: null;
}
2 changes: 1 addition & 1 deletion Source/Events.Store.MongoDB/Events/EventConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public mongoDB.Event ToEventLogEvent(CommittedExternalEvent committedEvent) =>
committedEvent.EventLogSequenceNumber,
committedEvent.ExecutionContext.ToStoreRepresentation(),
committedEvent.GetEventMetadata(),
new AggregateMetadata(),
null,
committedEvent.GetEventHorizonMetadata(),
_contentConverter.ToBson(committedEvent.Content));

Expand Down
10 changes: 5 additions & 5 deletions Source/Events.Store.MongoDB/Events/EventHorizonMetadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ public EventHorizonMetadata(
/// <summary>
/// Gets or sets a value indicating whether this event came from EventHorizon.
/// </summary>
public bool FromEventHorizon { get; set; }
public bool FromEventHorizon { get; init; }

/// <summary>
/// Gets or sets the origin event log sequence number of the event if it came from EventHorizon.
/// </summary>
[BsonRepresentation(BsonType.Decimal128)]
public ulong ExternalEventLogSequenceNumber { get; set; }
public ulong ExternalEventLogSequenceNumber { get; init; }

/// <summary>
/// Gets or sets the <see cref="DateTime"/> of when the Event was received.
/// </summary>
[BsonDateTimeOptions(Kind = DateTimeKind.Utc)]
public DateTime Received { get; set; }
public DateTime Received { get; init; }

/// <summary>
/// Gets or sets the consent id.
/// </summary>
public Guid Consent { get; set; }
}
public Guid Consent { get; init; }
}
12 changes: 10 additions & 2 deletions Source/Events.Store.MongoDB/Migrations/StreamIdMatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,19 @@ namespace Dolittle.Runtime.Events.Store.MongoDB.Migrations;

public static partial class StreamIdMatcher
{
public static bool IsStreamOrEventLog(string input) => input.Equals("event-log") || IsStream(input) || IsScopedEventLog(input) || IsScopedStream(input);
public static bool IsStreamOrEventLog(string input) => input.Equals("event-log") || IsStream(input) || IsScopedEventLog(input);

public static bool IsStream(string input) => IsNormalStream(input) || IsScopedStream(input);
public static bool IsStream(string input) => IsNormalStream(input) || IsScopedStream(input) || IsPublicStream(input);

public static bool IsNormalStream(string input)
{
return StreamIdRegex().IsMatch(input);
}

public static bool IsPublicStream(string input)
{
return PublicStreamIdRegex().IsMatch(input);
}

public static bool IsScopedStream(string input)
{
Expand All @@ -28,6 +33,9 @@ public static bool IsScopedEventLog(string input)

[GeneratedRegex("^stream-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", RegexOptions.IgnoreCase | RegexOptions.Compiled)]
private static partial Regex StreamIdRegex();

[GeneratedRegex("^public-stream-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", RegexOptions.IgnoreCase | RegexOptions.Compiled)]
private static partial Regex PublicStreamIdRegex();

[GeneratedRegex("^x-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}-stream-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
RegexOptions.IgnoreCase | RegexOptions.Compiled)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public class SubscriptionState
/// <param name="producerTenantId">The producer <see cref="TenantId" />.</param>
/// <param name="streamId">The public <see cref="Store.Streams.StreamId" /> to subscribe to.</param>
/// <param name="partitionId">The <see cref="Store.Streams.PartitionId" /> in the stream to subscribe to.</param>
/// <param name="position">The position.</param>
/// <param name="position">The public stream position.</param>
/// <param name="eventLogSequence">The event log position</param>
/// <param name="retryTime">The time to retry processing.</param>
/// <param name="failureReason">The reason for failing.</param>
/// <param name="processingAttempts">The number of times the event at <see cref="Position" /> has been processed.</param>
Expand All @@ -33,6 +34,7 @@ public SubscriptionState(
Guid streamId,
string partitionId,
ulong position,
ulong eventLogSequence,
DateTime retryTime,
string failureReason,
uint processingAttempts,
Expand All @@ -44,6 +46,7 @@ public SubscriptionState(
Stream = streamId;
Partition = partitionId;
Position = position;
EventLogSequence = eventLogSequence;
LastSuccessfullyProcessed = lastSuccessfullyProcessed;
RetryTime = retryTime;
FailureReason = failureReason;
Expand Down Expand Up @@ -76,6 +79,12 @@ public SubscriptionState(
/// </summary>
[BsonRepresentation(BsonType.Decimal128)]
public ulong Position { get; set; }

/// <summary>
/// Gets or sets the EventLogSequence.
/// </summary>
[BsonRepresentation(BsonType.Decimal128)]
public ulong EventLogSequence { get; set; }

/// <summary>
/// Gets or sets the timestamp when the StreamProcessor has processed the stream with Kind of UTC.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public static class SubscriptionStateExtensions
/// <returns>The converted <see cref="runtime.Partitioned.StreamProcessorState" />.</returns>
public static runtime.StreamProcessorState ToRuntimeRepresentation(this SubscriptionState state) =>
new(
new ProcessingPosition(state.Position, state.Position),
new ProcessingPosition(state.Position, state.EventLogSequence),
state.FailureReason,
state.RetryTime,
state.ProcessingAttempts,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected override MongoSubscriptionState CreateDocument(SubscriptionId id, UnPa
id.ProducerTenantId,
id.StreamId,
id.PartitionId,
state.Position.StreamPosition,
state.Position.EventLogPosition,
state.RetryTime.UtcDateTime,
state.FailureReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ public void VerifyMatchesScopedStream(string input)
StreamIdMatcher.IsScopedStream(input).Should().BeTrue();
}

[Theory]
[InlineData("public-stream-2c087657-b318-40b1-ae92-a400de44e507")]

public void VerifyMatchesPublicStream(string input)
{
StreamIdMatcher.IsPublicStream(input).Should().BeTrue();
}

[Theory]
[InlineData("x-06fd3dcf-a457-4e76-917e-5049ef49bfd3-event-log")]
[InlineData("x-06fd3dcf-a457-4e76-917e-5049ef49bfd4-event-log")]
Expand All @@ -28,6 +36,7 @@ public void VerifyMatchesScopedEventLog(string input)
[Theory]
[InlineData("event-log")]
[InlineData("stream-6a080414-d493-4ce1-a11b-bd60208b9d7a")]
[InlineData("public-stream-6a080414-d493-4ce1-a11b-bd60208b9d7a")]
[InlineData("x-06fd3dcf-a457-4e76-917e-5049ef49bfd3-event-log")]
[InlineData("x-16fd3dcf-a457-4e76-917e-5049ef49bfd3-stream-6a080414-d493-4ce1-a11b-bd60208b9d7b")]
public void VerifyMatches(string input)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,5 @@ public class from_committed_event

Because of = () => result = committed_event.GetEventHorizonMetadata();

It should_have_empty_consent = () => result.Consent.ShouldEqual(Guid.Empty);
It should_have_the_default_external_event_log_sequence_number = () => result.ExternalEventLogSequenceNumber.ShouldEqual(default);
It should_not_be_from_event_horizon = () => result.FromEventHorizon.ShouldBeFalse();
It should_have_the_correct_received_value = () => result.Received.ShouldEqual(DateTime.MinValue);
private It should_have_empty_consent = () => result.ShouldBeNull();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class an_external_event : given.an_event_content_converter
Because of = () => result = event_converter.ToEventLogEvent(committed_event);

It should_represent_the_same_event = () => result.ShouldBeTheSameAs(committed_event);
It should_not_be_applied_by_aggregate = () => result.Aggregate.WasAppliedByAggregate.ShouldBeFalse();
private It should_not_be_applied_by_aggregate = () => result.Aggregate.ShouldBeNull();
It should_come_from_event_horizon = () => result.IsFromEventHorizon.ShouldBeTrue();
It should_have_the_same_consent = () => result.EventHorizon.Consent.ShouldEqual(committed_event.Consent.Value);
It should_have_the_same_external_event_log_sequence_number = () => result.EventHorizon.ExternalEventLogSequenceNumber.ShouldEqual(committed_event.ExternalEventLogSequenceNumber.Value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class an_external_event : given.an_event_content_converter

It should_represent_the_same_event = () => result.ShouldBeTheSameAs(committed_event);
It should_have_the_correct_stream_position = () => result.StreamPosition.ShouldEqual(stream_position.Value);
It should_not_be_applied_by_aggregate = () => result.Aggregate.WasAppliedByAggregate.ShouldBeFalse();
private It should_not_be_applied_by_aggregate = () => result.Aggregate.ShouldBeNull();
It should_come_from_event_horizon = () => result.IsFromEventHorizon.ShouldBeTrue();
It should_have_the_same_consent = () => result.EventHorizon.Consent.ShouldEqual(committed_event.Consent.Value);
It should_have_the_same_external_event_log_sequence_number = () => result.EventHorizon.ExternalEventLogSequenceNumber.ShouldEqual(committed_event.ExternalEventLogSequenceNumber.Value);
Expand Down
15 changes: 9 additions & 6 deletions Specifications/Events.Store.MongoDB/event_builder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public event_builder(EventLogSequenceNumber event_log_sequence_number) =>
execution_contexts.create_store(),
metadata.random_event_metadata,
metadata.aggregate_metadata_from_non_aggregate_event,
new EventHorizonMetadata(),
null,
events.some_event_content_bson_document);

public event_builder(EventLogSequenceNumber event_log_sequence_number, AggregateRootVersion aggregate_version) =>
Expand All @@ -27,7 +27,7 @@ public event_builder(EventLogSequenceNumber event_log_sequence_number, Aggregate
execution_contexts.create_store(),
metadata.random_event_metadata,
metadata.random_aggregate_metadata_from_aggregate_event_with_version(aggregate_version),
new EventHorizonMetadata(),
null,
events.some_event_content_bson_document);

public MongoDB.Events.Event build() => _instance;
Expand All @@ -54,10 +54,13 @@ public event_builder with_content(BsonDocument document)

public event_builder from_event_horizon()
{
_instance.EventHorizon.FromEventHorizon = true;
_instance.EventHorizon.Consent = Guid.Parse("df838974-100b-4a07-9e44-08e2c7d7e99a");
_instance.EventHorizon.ExternalEventLogSequenceNumber = 71883084;
_instance.EventHorizon.Received = new DateTime(2944480155, DateTimeKind.Utc);
_instance.EventHorizon = new EventHorizonMetadata
{
FromEventHorizon = true,
ExternalEventLogSequenceNumber = 71883084,
Received = new DateTime(2944480155, DateTimeKind.Utc),
Consent = Guid.Parse("df838974-100b-4a07-9e44-08e2c7d7e99a")
};
return this;
}
}
17 changes: 11 additions & 6 deletions Specifications/Events.Store.MongoDB/stream_event_builder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public stream_event_builder(StreamPosition stream_position, PartitionId partitio
execution_contexts.create_store(),
metadata.random_stream_event_metadata,
metadata.aggregate_metadata_from_non_aggregate_event,
new EventHorizonMetadata(),
null,
events.some_event_content_bson_document);

public stream_event_builder(StreamPosition stream_position, PartitionId partition, AggregateRootVersion aggregate_version) =>
Expand All @@ -30,7 +30,7 @@ public stream_event_builder(StreamPosition stream_position, PartitionId partitio
execution_contexts.create_store(),
metadata.random_stream_event_metadata,
metadata.random_aggregate_metadata_from_aggregate_event_with_version(aggregate_version),
new EventHorizonMetadata(),
null,
events.some_event_content_bson_document);

public Events.StreamEvent build() => _instance;
Expand Down Expand Up @@ -69,10 +69,15 @@ public stream_event_builder with_event_log_sequence_number(EventLogSequenceNumbe

public stream_event_builder from_event_horizon()
{
_instance.EventHorizon.FromEventHorizon = true;
_instance.EventHorizon.Consent = Guid.Parse("e1af7d82-b11a-4766-bcfa-f5405ac0b133");
_instance.EventHorizon.ExternalEventLogSequenceNumber = 205;
_instance.EventHorizon.Received = new DateTime(226397148, DateTimeKind.Utc);
_instance.EventHorizon = new EventHorizonMetadata
{
FromEventHorizon = true,
ExternalEventLogSequenceNumber = 205,
Received = new DateTime(226397148, DateTimeKind.Utc),
Consent = Guid.Parse("e1af7d82-b11a-4766-bcfa-f5405ac0b133")
};


return this;
}
}

0 comments on commit 7fdc799

Please sign in to comment.