Skip to content

Commit

Permalink
Merge pull request #775 from dolittle/init-offsets
Browse files Browse the repository at this point in the history
Added initialization of offset metadata on startup for all streams
  • Loading branch information
mhelleborg authored Sep 25, 2024
2 parents abd0ca4 + 9004abd commit bcebd89
Showing 1 changed file with 84 additions and 2 deletions.
86 changes: 84 additions & 2 deletions Source/Events.Store.MongoDB/Persistence/OffsetStore.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Dolittle. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Dolittle.Runtime.DependencyInversion.Lifecycle;
using Dolittle.Runtime.DependencyInversion.Scoping;
using Dolittle.Runtime.Events.Store.MongoDB.Events;
using Dolittle.Runtime.Events.Store.MongoDB.Migrations;
using MongoDB.Bson;
using MongoDB.Driver;

namespace Dolittle.Runtime.Events.Store.MongoDB.Persistence;
Expand All @@ -24,16 +29,86 @@ public OffsetStore(IDatabaseConnection connection) : base(connection)
{
Collection = Database.GetCollection<StreamMetadata>(EventLogMetadataCollectionName);

CreateCollectionIfNotExists();
Init();
}

void CreateCollectionIfNotExists()
void Init()
{
var collectionNames = Database.ListCollectionNames().ToList();
if (!collectionNames.Contains(EventLogMetadataCollectionName))
{
Database.CreateCollection(EventLogMetadataCollectionName);
}
else // Already created. Check if it's initialized
{
var existingDocuments = Collection.CountDocuments(FilterDefinition<StreamMetadata>.Empty);
if (existingDocuments > 0)
{
return; // Already initialized
}
}

SetInitialOffsetsForStreams(collectionNames);
}

void SetInitialOffsetsForStreams(List<string> collectionNames)
{
// Not initialized, create and populate with stream metadata
using var session = Database.Client.StartSession();
try
{
session.StartTransaction();
var streamCollections = collectionNames.Where(StreamIdMatcher.IsStreamOrEventLog).ToList();
foreach (var stream in streamCollections)
{
InitStream(session, stream);
}
session.CommitTransaction();
}
catch (Exception e)
{
session.AbortTransaction();
throw new OffsetStoreInitFailed(e, "Failed to initialize stream metadata");
}
}

void InitStream(IClientSessionHandle session, string stream)
{
var currentOffset = GetCurrentOffsetForStream(session, stream);
if (currentOffset is null)
{
return;
}

var nextOffset = currentOffset.Value + 1;
Collection.InsertOne(session, new StreamMetadata
{
StreamName = stream,
NextEventOffset = nextOffset,
});
}

/// <summary>
/// Get _id of the last event in the stream
/// </summary>
/// <param name="session"></param>
/// <param name="stream"></param>
/// <returns></returns>
private ulong? GetCurrentOffsetForStream(IClientSessionHandle session, string stream)
{
var collection = Database.GetCollection<object>(stream);
var filter = Builders<object>.Filter.Empty;
var sort = Builders<object>.Sort.Descending("_id");
var projection = Builders<object>.Projection.Include("_id");
var cursor = collection.Find(session, filter).Sort(sort).Limit(1).Project(projection).ToCursor()
.FirstOrDefault();
if (cursor is null)
{
return null;
}

var bsonValue = cursor["_id"].ToInt64();
return (ulong)bsonValue;
}

public Task UpdateOffset(string stream, IClientSessionHandle session, ulong nextEventOffset,
Expand Down Expand Up @@ -72,3 +147,10 @@ public Task<ulong> GetNextOffset(string stream, IClientSessionHandle? session, C
static FilterDefinition<StreamMetadata> GetFilter(string streamName) =>
_filterBuilder.Eq(metadata => metadata.StreamName, streamName);
}

public class OffsetStoreInitFailed : Exception
{
public OffsetStoreInitFailed(Exception exception, string message): base(message, exception)
{
}
}

0 comments on commit bcebd89

Please sign in to comment.