Skip to content

Commit

Permalink
Make the info class abstract (#323)
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
  • Loading branch information
Gsantomaggio authored Oct 11, 2023
1 parent e37f2a2 commit a515ea9
Show file tree
Hide file tree
Showing 14 changed files with 77 additions and 53 deletions.
1 change: 0 additions & 1 deletion RabbitMQ.Stream.Client/AbstractEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ protected void MaybeCancelToken()

protected Client _client;

public Info Info { get; internal set; }
}
}
6 changes: 2 additions & 4 deletions RabbitMQ.Stream.Client/EntityInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

namespace RabbitMQ.Stream.Client;

public class Info
public abstract class Info
{
public string Reference { get; }
public string Stream { get; }

internal Info(string reference, string stream)
protected Info(string stream)
{
Reference = reference;
Stream = stream;
}
}
11 changes: 10 additions & 1 deletion RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface IConsumer
public Task<ResponseCode> Close();
public void Dispose();

public Info Info { get; }
public ConsumerInfo Info { get; }
}

public record IConsumerConfig : INamedEntity
Expand Down Expand Up @@ -73,3 +73,12 @@ public ushort InitialCredits
// It is not enabled by default because it is could reduce the performance.
public ICrc32 Crc32 { get; set; } = null;
}

public class ConsumerInfo : Info
{
public string Reference { get; }
public ConsumerInfo(string stream, string reference) : base(stream)
{
Reference = reference;
}
}
15 changes: 14 additions & 1 deletion RabbitMQ.Stream.Client/IProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public interface IProducer
/// <summary>
/// Info contains the reference and the stream name.
/// </summary>
public Info Info { get; }
public ProducerInfo Info { get; }
}

public record ProducerFilter
Expand Down Expand Up @@ -104,3 +104,16 @@ public record IProducerConfig : INamedEntity
/// </summary>
public ProducerFilter Filter { get; set; } = null;
}

/// <summary>
/// ProducerInfo contains the reference and the stream name.
/// </summary>
public class ProducerInfo : Info
{
public string Reference { get; }

public ProducerInfo(string stream, string reference) : base(stream)
{
Reference = reference;
}
}
24 changes: 16 additions & 8 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort
const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort
RabbitMQ.Stream.Client.AbstractEntity.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.AbstractEntity.MaybeCancelToken() -> void
RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.CancellationToken
RabbitMQ.Stream.Client.AuthMechanism
Expand All @@ -23,10 +22,13 @@ RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.get -> System.Func<RabbitMQ.Str
RabbitMQ.Stream.Client.ConsumerFilter.PostFilter.set -> void
RabbitMQ.Stream.Client.ConsumerFilter.Values.get -> System.Collections.Generic.List<string>
RabbitMQ.Stream.Client.ConsumerFilter.Values.set -> void
RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.ConsumerInfo.ConsumerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ConsumerInfo.Reference.get -> string
RabbitMQ.Stream.Client.CrcException
RabbitMQ.Stream.Client.CrcException.CrcException(string s) -> void
RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.IConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.IConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.ConsumerFilter.get -> RabbitMQ.Stream.Client.ConsumerFilter
Expand Down Expand Up @@ -57,9 +59,9 @@ RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Info.Reference.get -> string
RabbitMQ.Stream.Client.Info.Info(string stream) -> void
RabbitMQ.Stream.Client.Info.Stream.get -> string
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.IProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.IProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
Expand All @@ -71,6 +73,9 @@ RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOf
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.set -> void
RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.ProducerInfo.ProducerInfo(string stream, string reference) -> void
RabbitMQ.Stream.Client.ProducerInfo.Reference.get -> string
RabbitMQ.Stream.Client.PublishFilter
RabbitMQ.Stream.Client.PublishFilter.Command.get -> ushort
RabbitMQ.Stream.Client.PublishFilter.MaxVersion.get -> ushort
Expand All @@ -79,10 +84,13 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter() -> void
RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Collections.Generic.List<(ulong, RabbitMQ.Stream.Client.Message)> messages, System.Func<RabbitMQ.Stream.Client.Message, string> filterValueExtractor, Microsoft.Extensions.Logging.ILogger logger) -> void
RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int
RabbitMQ.Stream.Client.PublishFilter.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Filter.get -> RabbitMQ.Stream.Client.ConsumerFilter
Expand All @@ -92,15 +100,15 @@ RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.IsOpen() -> bool
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Send(ulong publishing, RabbitMQ.Stream.Client.Message message) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void
RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.Info.get -> RabbitMQ.Stream.Client.Info
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
_initialCredits = config.InitialCredits;
_config = config;
_logger.LogDebug("Creating... {ConsumerInfo}", ConsumerInfo());
Info = new Info(_config.Reference, _config.Stream);
Info = new ConsumerInfo(_config.Stream, _config.Reference);
// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
Expand Down Expand Up @@ -643,5 +643,7 @@ public void Dispose()
GC.SuppressFinalize(this);
}
}

public ConsumerInfo Info { get; }
}
}
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/RawProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu
{
_client = client;
_config = config;
Info = new Info(_config.Reference, _config.Stream);
Info = new ProducerInfo(_config.Stream, _config.Reference);
_messageBuffer = Channel.CreateBounded<OutgoingMsg>(new BoundedChannelOptions(10000)
{
AllowSynchronousContinuations = false,
Expand Down Expand Up @@ -403,5 +403,7 @@ public void Dispose()

GC.SuppressFinalize(this);
}

public ProducerInfo Info { get; }
}
}
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private RawSuperStreamConsumer(
_streamInfos = streamInfos;
_clientParameters = clientParameters;
_logger = logger ?? NullLogger.Instance;
Info = new Info(_config.Reference, _config.SuperStream);
Info = new ConsumerInfo(_config.SuperStream, _config.Reference);

StartConsumers().Wait(CancellationToken.None);
}
Expand Down Expand Up @@ -219,7 +219,7 @@ public void Dispose()
GC.SuppressFinalize(this);
}

public Info Info { get; internal set; }
public ConsumerInfo Info { get; }
}

public record RawSuperStreamConsumerConfig : IConsumerConfig
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/RawSuperStreamProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private RawSuperStreamProducer(
_config = config;
_streamInfos = streamInfos;
_clientParameters = clientParameters;
Info = new Info(config.Reference, config.SuperStream);
Info = new ProducerInfo(config.SuperStream, config.Reference);
_defaultRoutingConfiguration.RoutingStrategy = _config.RoutingStrategyType switch
{
RoutingStrategyType.Key => new KeyRoutingStrategy(_config.Routing,
Expand Down Expand Up @@ -277,7 +277,7 @@ public void Dispose()
public int IncomingFrames => _producers.Sum(x => x.Value.IncomingFrames);
public int PublishCommandsSent => _producers.Sum(x => x.Value.PublishCommandsSent);
public int PendingCount => _producers.Sum(x => x.Value.PendingCount);
public Info Info { get; }
public ProducerInfo Info { get; }
}

public enum RoutingStrategyType
Expand Down
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ internal Consumer(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null
{
_logger = logger ?? NullLogger<Consumer>.Instance;
_consumerConfig = consumerConfig;
Info = new Info(consumerConfig.Reference, consumerConfig.Stream);
Info = new ConsumerInfo(consumerConfig.Stream, consumerConfig.Reference);
}

public static async Task<Consumer> Create(ConsumerConfig consumerConfig, ILogger<Consumer> logger = null)
Expand Down Expand Up @@ -213,4 +213,6 @@ public override string ToString()
{
return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream} ";
}

public ConsumerInfo Info { get; }
}
2 changes: 1 addition & 1 deletion RabbitMQ.Stream.Client/Reliable/DeduplicatingProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ public async Task<ulong> GetLastPublishedId()
return await _producer.GetLastPublishingId().ConfigureAwait(false);
}

public Info Info => _producer.Info;
public ProducerInfo Info => _producer.Info;
}
4 changes: 3 additions & 1 deletion RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private Producer(ProducerConfig producerConfig, ILogger<Producer> logger = null)
producerConfig.TimeoutMessageAfter,
producerConfig.MaxInFlight
);
Info = new Info(producerConfig.Reference, producerConfig.Stream);
Info = new ProducerInfo(producerConfig.Stream, producerConfig.Reference);
_logger = logger ?? NullLogger<Producer>.Instance;
}

Expand Down Expand Up @@ -374,4 +374,6 @@ public async ValueTask Send(List<Message> messages)
SemaphoreSlim.Release();
}
}

public ProducerInfo Info { get; }
}
2 changes: 0 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,4 @@ public bool IsOpen()
{
return _isOpen;
}

public Info Info { get; internal set; }
}
45 changes: 18 additions & 27 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -741,17 +741,15 @@ public async void EntityInfoShouldBeCorrect()
var rawConsumer = await system.CreateRawConsumer(
new RawConsumerConfig(stream) { Reference = "consumer", });

var entityInfo = rawConsumer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
Assert.Equal(stream, rawConsumer.Info.Stream);
Assert.Equal("consumer", rawConsumer.Info.Reference);
await rawConsumer.Close();

var rawProducer = await system.CreateRawProducer(
new RawProducerConfig(stream) { Reference = "producer", });

entityInfo = rawProducer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("producer", entityInfo.Reference);
Assert.Equal(stream, rawProducer.Info.Stream);
Assert.Equal("producer", rawProducer.Info.Reference);
await rawProducer.Close();

var rawSuperStreamProducer = await system.CreateRawSuperStreamProducer(
Expand All @@ -761,31 +759,27 @@ public async void EntityInfoShouldBeCorrect()
RoutingStrategyType = RoutingStrategyType.Hash,
Routing = _ => "OK",
});
entityInfo = rawSuperStreamProducer.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("super_producer", entityInfo.Reference);
Assert.Equal(SystemUtils.InvoicesExchange, rawSuperStreamProducer.Info.Stream);
Assert.Equal("super_producer", rawSuperStreamProducer.Info.Reference);
await rawSuperStreamProducer.Close();

var rawSuperStreamConsumer = await system.CreateSuperStreamConsumer(
new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { Reference = "super_consumer", });

entityInfo = rawSuperStreamConsumer.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("super_consumer", entityInfo.Reference);
Assert.Equal(SystemUtils.InvoicesExchange, rawSuperStreamConsumer.Info.Stream);
Assert.Equal("super_consumer", rawSuperStreamConsumer.Info.Reference);
await rawSuperStreamConsumer.Close();

var producer = await Producer.Create(new ProducerConfig(system, stream));

entityInfo = producer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
Assert.Equal(stream, producer.Info.Stream);
Assert.True(string.IsNullOrWhiteSpace(producer.Info.Reference));
await producer.Close();

var consumer = await Consumer.Create(new ConsumerConfig(system, stream) { Reference = "consumer", });

entityInfo = consumer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
Assert.Equal(stream, consumer.Info.Stream);
Assert.Equal("consumer", consumer.Info.Reference);
await consumer.Close();

var producerSuperStream =
Expand All @@ -794,9 +788,8 @@ await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
SuperStreamConfig = new SuperStreamConfig() { Routing = _ => "OK" }
});

entityInfo = producerSuperStream.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.True(string.IsNullOrWhiteSpace(entityInfo.Reference));
Assert.Equal(SystemUtils.InvoicesExchange, producerSuperStream.Info.Stream);
Assert.True(string.IsNullOrWhiteSpace(producerSuperStream.Info.Reference));

await producerSuperStream.Close();

Expand All @@ -806,17 +799,15 @@ await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
IsSuperStream = true,
});

entityInfo = consumerSuperStream.Info;
Assert.Equal(SystemUtils.InvoicesExchange, entityInfo.Stream);
Assert.Equal("consumer", entityInfo.Reference);
Assert.Equal(SystemUtils.InvoicesExchange, consumerSuperStream.Info.Stream);
Assert.Equal("consumer", consumerSuperStream.Info.Reference);
await consumerSuperStream.Close();

var dedProducer =
await DeduplicatingProducer.Create(new DeduplicatingProducerConfig(system, stream, "dedProducer"));

entityInfo = dedProducer.Info;
Assert.Equal(stream, entityInfo.Stream);
Assert.Equal("dedProducer", entityInfo.Reference);
Assert.Equal(stream, dedProducer.Info.Stream);
Assert.Equal("dedProducer", dedProducer.Info.Reference);
await dedProducer.Close();

await SystemUtils.CleanUpStreamSystem(system, stream);
Expand Down

0 comments on commit a515ea9

Please sign in to comment.