diff --git a/src/documentation/articles/howitworks.md b/src/documentation/articles/howitworks.md index 187b7fba..aa58ef12 100644 --- a/src/documentation/articles/howitworks.md +++ b/src/documentation/articles/howitworks.md @@ -38,7 +38,11 @@ Using the [topic compaction](https://kafka.apache.org/documentation/#compaction) - Update: a producer storing a new record with a previously stored unique key will discard the old records - Delete: a producer storing a new record with a previously stored unique key, and value set to null, will delete all records with that unique key -All CRUD operations are helped, behind the scene, from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) and/or [`KNetProducer`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Producer/KNetProducer.cs)/[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/). +All CRUD operations are helped, behind the scene, from [`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [`KNetProducer`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Producer/KNetProducer.cs)/[Apache Kafka Streams](https://kafka.apache.org/documentation/streams/). + +### First-level cache + +[`KNetCompactedReplicator`](https://github.com/masesgroup/KNet/blob/master/src/net/KNet/Specific/Replicator/KNetCompactedReplicator.cs) or [Apache Kafka Streams](https://kafka.apache.org/documentation/streams/) act as first-level cache of [Entity Framework Core](https://learn.microsoft.com/it-it/ef/core/): **data coming from the Apache Kafka cluster updates their content while the system is running without a specific request**. ### Data storage diff --git a/src/documentation/articles/kafkadbcontext.md b/src/documentation/articles/kafkadbcontext.md index 3f5dd8ab..d399a4b0 100644 --- a/src/documentation/articles/kafkadbcontext.md +++ b/src/documentation/articles/kafkadbcontext.md @@ -6,9 +6,10 @@ - **KeySerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record key - **ValueSerializationType**: the .NET type to be used to allocate an external serializer for Apache Kafka record value - **ValueContainerType**: the .NET type to be used to allocate an external container class for Apache Kafka record value + - **UseNameMatching**: set to **false** to avoid Entity matching based on Name - **BootstrapServers**: the server hosting the broker of Apache Kafka - **ApplicationId**: the application identifier used to identify the context - - **DbName**: the user defined name which declares the database name, it is used to prepend every Topic which belongs to this database + - **DatabaseName**: the user defined name which declares the database name, it is used to prepend every Topic which belongs to this database - **DefaultNumPartitions**: the default number of partitions used when topics are created for each entity - **DefaultReplicationFactor**: the replication factor to use when data are stored in Apache Kafka - **DefaultConsumerInstances**: the consumer instances to be allocated when UseCompactedReplicator is **true** diff --git a/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs b/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs index cfe7625a..f66804f8 100644 --- a/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs +++ b/src/net/KEFCore/Design/Internal/KafkaCSharpRuntimeAnnotationCodeGenerator.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -32,6 +29,10 @@ namespace MASES.EntityFrameworkCore.KNet.Design.Internal; /// public class KafkaCSharpRuntimeAnnotationCodeGenerator : CSharpRuntimeAnnotationCodeGenerator { + /// + /// Default initializer + /// + /// public KafkaCSharpRuntimeAnnotationCodeGenerator( CSharpRuntimeAnnotationCodeGeneratorDependencies dependencies) : base(dependencies) diff --git a/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs b/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs index a60384b1..b2f01b24 100644 --- a/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs +++ b/src/net/KEFCore/Design/Internal/KafkaDesignTimeServices.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -32,6 +29,7 @@ namespace MASES.EntityFrameworkCore.KNet.Design.Internal; /// public class KafkaDesignTimeServices : IDesignTimeServices { + /// public virtual void ConfigureDesignTimeServices(IServiceCollection serviceCollection) { serviceCollection.AddEntityFrameworkKafkaDatabase(); diff --git a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs index c27cbeb6..329e7076 100644 --- a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs +++ b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggerExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs index 0062a21d..3e4e541d 100644 --- a/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs +++ b/src/net/KEFCore/Diagnostics/Internal/KafkaLoggingDefinitions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Diagnostics/KafkaEventId.cs b/src/net/KEFCore/Diagnostics/KafkaEventId.cs index 09675ee4..8cc86b61 100644 --- a/src/net/KEFCore/Diagnostics/KafkaEventId.cs +++ b/src/net/KEFCore/Diagnostics/KafkaEventId.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs b/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs index 1a2b08b1..8251b436 100644 --- a/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaDatabaseFacadeExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs index 866c9605..85f96e7a 100644 --- a/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaDbContextOptionsExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -125,22 +122,28 @@ var coreOptionsExtension ((IDbContextOptionsBuilderInfrastructure)optionsBuilder).AddOrUpdateExtension(coreOptionsExtension); } - + /// + /// Creates a serializer for keys + /// public static Type SerializerTypeForKey(this IKafkaSingletonOptions options, IEntityType entityType) { var primaryKey = entityType.FindPrimaryKey()!.GetKeyType(); - return options.KeySerializationType.MakeGenericType(primaryKey); + return options.KeySerializationType?.MakeGenericType(primaryKey)!; } - + /// + /// Creates a serialzier for values + /// public static Type SerializerTypeForValue(this IKafkaSingletonOptions options, IEntityType entityType) { var primaryKey = entityType.FindPrimaryKey()!.GetKeyType(); - return options.ValueSerializationType.MakeGenericType(ValueContainerType(options, entityType)); + return options.ValueSerializationType?.MakeGenericType(ValueContainerType(options, entityType))!; } - + /// + /// Create the ValueContainer + /// public static Type ValueContainerType(this IKafkaSingletonOptions options, IEntityType entityType) { var primaryKey = entityType.FindPrimaryKey()!.GetKeyType(); - return options.ValueContainerType.MakeGenericType(primaryKey); + return options.ValueContainerType?.MakeGenericType(primaryKey)!; } } diff --git a/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs b/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs index ba28565f..b0a6e462 100644 --- a/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaEntityTypeBuilderExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs index 9a158759..fdee1d9f 100644 --- a/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaEntityTypeExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -90,34 +87,46 @@ public static void SetKafkaQuery( => entityType.FindAnnotation(CoreAnnotationNames.DefiningQuery)?.GetConfigurationSource(); #pragma warning restore CS0612 // Il tipo o il membro è obsoleto #pragma warning restore EF1001 // Internal EF Core API usage. - + /// + /// Creates the topic name + /// public static string TopicName(this IEntityType entityType, KafkaOptionsExtension options) { return $"{options.DatabaseName}.{entityType.Name}"; } - + /// + /// Creates the storage id + /// public static string StorageIdForTable(this IEntityType entityType, KafkaOptionsExtension options) { return $"Table_{entityType.TopicName(options)}"; } - + /// + /// Creates the application id + /// public static string ApplicationIdForTable(this IEntityType entityType, KafkaOptionsExtension options) { return $"{options.ApplicationId}_{entityType.Name}"; } - + /// + /// Gets replication factor + /// public static short ReplicationFactor(this IEntityType entityType, KafkaOptionsExtension options) { var replicationFactor = options.DefaultReplicationFactor; return replicationFactor; } - + /// + /// Gets number of partitions + /// public static int NumPartitions(this IEntityType entityType, KafkaOptionsExtension options) { var numPartitions = options.DefaultNumPartitions; return numPartitions; } - + /// + /// Gets consumer instances + /// public static int? ConsumerInstances(this IEntityType entityType, KafkaOptionsExtension options) { var consumerInstances = options.DefaultConsumerInstances; diff --git a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs index ef69994f..68aed60e 100644 --- a/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs +++ b/src/net/KEFCore/Extensions/KafkaServiceCollectionExtensions.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * diff --git a/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs b/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs index e1052d33..94d9cb54 100644 --- a/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs +++ b/src/net/KEFCore/Infrastructure/IKafkaDbContextOptionsBuilderInfrastructure.cs @@ -1,7 +1,4 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -/* +/* * Copyright 2023 MASES s.r.l. * * Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs index ab6aecae..058967c3 100644 --- a/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/IKafkaSingletonOptions.cs @@ -32,39 +32,40 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; /// public interface IKafkaSingletonOptions : ISingletonOptions { - Type KeySerializationType { get; } - - Type ValueSerializationType { get; } - - Type ValueContainerType { get; } - + /// + Type? KeySerializationType { get; } + /// + Type? ValueSerializationType { get; } + /// + Type? ValueContainerType { get; } + /// bool UseNameMatching { get; } - + /// string? DatabaseName { get; } - + /// string? ApplicationId { get; } - + /// string? BootstrapServers { get; } - + /// bool UseDeletePolicyForTopic { get; } - + /// bool UseCompactedReplicator { get; } - + /// bool UsePersistentStorage { get; } - + /// int DefaultNumPartitions { get; } - + /// int? DefaultConsumerInstances { get; } - + /// int DefaultReplicationFactor { get; } - + /// ConsumerConfigBuilder? ConsumerConfig { get; } - + /// ProducerConfigBuilder? ProducerConfig { get; } - + /// StreamsConfigBuilder? StreamsConfig { get; } - + /// TopicConfigBuilder? TopicConfig { get; } - - Action OnChangeEvent { get; } + /// + Action? OnChangeEvent { get; } } diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs index 68648e8b..87df64c2 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaModelValidator.cs @@ -25,11 +25,14 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; /// public class KafkaModelValidator : ModelValidator { + /// + /// Initializer + /// public KafkaModelValidator(ModelValidatorDependencies dependencies) : base(dependencies) { } - + /// public override void Validate(IModel model, IDiagnosticsLogger logger) { base.Validate(model, logger); diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs index f3dd184c..83e45595 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaOptionsExtension.cs @@ -25,7 +25,6 @@ using MASES.KNet.Common; using MASES.KNet.Consumer; using MASES.KNet.Producer; -using MASES.KNet.Serialization; using MASES.KNet.Streams; using System.Globalization; @@ -60,11 +59,15 @@ public class KafkaOptionsExtension : IDbContextOptionsExtension static Java.Lang.ClassLoader _loader = Java.Lang.ClassLoader.SystemClassLoader; static Java.Lang.ClassLoader SystemClassLoader => _loader; - + /// + /// Initializer + /// public KafkaOptionsExtension() { } - + /// + /// Initializer + /// protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) { _keySerializationType = copyFrom._keySerializationType; @@ -86,49 +89,51 @@ protected KafkaOptionsExtension(KafkaOptionsExtension copyFrom) _topicConfigBuilder = TopicConfigBuilder.CreateFrom(copyFrom._topicConfigBuilder); _onChangeEvent = copyFrom._onChangeEvent; } - + /// public virtual DbContextOptionsExtensionInfo Info => _info ??= new ExtensionInfo(this); - + /// protected virtual KafkaOptionsExtension Clone() => new(this); - + /// + /// Internal property + /// public virtual string ClusterId => _bootstrapServers!; - + /// public virtual Type KeySerializationType => _keySerializationType; - + /// public virtual Type ValueSerializationType => _valueSerializationType; - + /// public virtual Type ValueContainerType => _valueContainerType; - + /// public virtual bool UseNameMatching => _useNameMatching; - + /// public virtual string DatabaseName => _databaseName!; - + /// public virtual string ApplicationId => _applicationId!; - + /// public virtual string BootstrapServers => _bootstrapServers!; - + /// public virtual bool UseDeletePolicyForTopic => _useDeletePolicyForTopic; - + /// public virtual bool UseCompactedReplicator => _useCompactedReplicator; - + /// public virtual bool UsePersistentStorage => _usePersistentStorage; - + /// public virtual int DefaultNumPartitions => _defaultNumPartitions; - + /// public virtual int? DefaultConsumerInstances => _defaultConsumerInstances; - + /// public virtual short DefaultReplicationFactor => _defaultReplicationFactor; - + /// public virtual ConsumerConfigBuilder ConsumerConfig => _consumerConfigBuilder!; - + /// public virtual ProducerConfigBuilder ProducerConfig => _producerConfigBuilder!; - + /// public virtual StreamsConfigBuilder StreamsConfig => _streamsConfigBuilder!; - + /// public virtual TopicConfigBuilder TopicConfig => _topicConfigBuilder!; - + /// public virtual Action OnChangeEvent => _onChangeEvent!; - + /// public virtual KafkaOptionsExtension WithKeySerializationType(Type serializationType) { if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\""); @@ -139,7 +144,7 @@ public virtual KafkaOptionsExtension WithKeySerializationType(Type serialization return clone; } - + /// public virtual KafkaOptionsExtension WithValueSerializationType(Type serializationType) { if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\""); @@ -150,7 +155,7 @@ public virtual KafkaOptionsExtension WithValueSerializationType(Type serializati return clone; } - + /// public virtual KafkaOptionsExtension WithValueContainerType(Type serializationType) { if (!serializationType.IsGenericTypeDefinition) throw new InvalidOperationException($"{serializationType.Name} shall be a generic type and shall be defined using \"<>\""); @@ -161,7 +166,7 @@ public virtual KafkaOptionsExtension WithValueContainerType(Type serializationTy return clone; } - + /// public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching = true) { var clone = Clone(); @@ -170,7 +175,7 @@ public virtual KafkaOptionsExtension WithUseNameMatching(bool useNameMatching = return clone; } - + /// public virtual KafkaOptionsExtension WithDatabaseName(string databaseName) { var clone = Clone(); @@ -179,7 +184,7 @@ public virtual KafkaOptionsExtension WithDatabaseName(string databaseName) return clone; } - + /// public virtual KafkaOptionsExtension WithApplicationId(string applicationId) { var clone = Clone(); @@ -188,7 +193,7 @@ public virtual KafkaOptionsExtension WithApplicationId(string applicationId) return clone; } - + /// public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServers) { var clone = Clone(); @@ -197,7 +202,7 @@ public virtual KafkaOptionsExtension WithBootstrapServers(string bootstrapServer return clone; } - + /// public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeletePolicyForTopic = false) { var clone = Clone(); @@ -206,7 +211,7 @@ public virtual KafkaOptionsExtension WithUseDeletePolicyForTopic(bool useDeleteP return clone; } - + /// public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedReplicator = true) { var clone = Clone(); @@ -215,7 +220,7 @@ public virtual KafkaOptionsExtension WithCompactedReplicator(bool useCompactedRe return clone; } - + /// public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistentStorage = false) { var clone = Clone(); @@ -224,7 +229,7 @@ public virtual KafkaOptionsExtension WithUsePersistentStorage(bool usePersistent return clone; } - + /// public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPartitions = 1) { var clone = Clone(); @@ -233,7 +238,7 @@ public virtual KafkaOptionsExtension WithDefaultNumPartitions(int defaultNumPart return clone; } - + /// public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultConsumerInstances = null) { var clone = Clone(); @@ -242,7 +247,7 @@ public virtual KafkaOptionsExtension WithDefaultConsumerInstances(int? defaultCo return clone; } - + /// public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultReplicationFactor = 1) { var clone = Clone(); @@ -251,7 +256,7 @@ public virtual KafkaOptionsExtension WithDefaultReplicationFactor(short defaultR return clone; } - + /// public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder consumerConfigBuilder) { var clone = Clone(); @@ -260,7 +265,7 @@ public virtual KafkaOptionsExtension WithConsumerConfig(ConsumerConfigBuilder co return clone; } - + /// public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder producerConfigBuilder) { var clone = Clone(); @@ -269,7 +274,7 @@ public virtual KafkaOptionsExtension WithProducerConfig(ProducerConfigBuilder pr return clone; } - + /// public virtual KafkaOptionsExtension WithStreamsConfig(StreamsConfigBuilder streamsConfigBuilder) { var clone = Clone(); @@ -278,7 +283,7 @@ public virtual KafkaOptionsExtension WithStreamsConfig(StreamsConfigBuilder stre return clone; } - + /// public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicConfigBuilder) { var clone = Clone(); @@ -287,7 +292,7 @@ public virtual KafkaOptionsExtension WithTopicConfig(TopicConfigBuilder topicCon return clone; } - + /// public virtual KafkaOptionsExtension WithOnChangeEvent(Action onChangeEvent) { var clone = Clone(); @@ -296,12 +301,16 @@ public virtual KafkaOptionsExtension WithOnChangeEvent(Action + /// Build for + /// public virtual Properties StreamsOptions(IEntityType entityType) { return StreamsOptions(entityType.ApplicationIdForTable(this)); } - + /// + /// Build for applicationId + /// public virtual Properties StreamsOptions(string applicationId) { Properties props = _streamsConfigBuilder ?? new(); @@ -333,7 +342,9 @@ public virtual Properties StreamsOptions(string applicationId) return props; } - + /// + /// Build for producers + /// public virtual Properties ProducerOptions() { Properties props = _producerConfigBuilder ?? new(); @@ -367,9 +378,9 @@ public virtual Properties ProducerOptions() return props; } - + /// public virtual void ApplyServices(IServiceCollection services) => services.AddEntityFrameworkKafkaDatabase(); - + /// public virtual void Validate(IDbContextOptions options) { var kafkaOptions = options.FindExtension(); diff --git a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs index 48cf7439..26427b91 100644 --- a/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs +++ b/src/net/KEFCore/Infrastructure/Internal/KafkaSingletonOptions.cs @@ -30,6 +30,7 @@ namespace MASES.EntityFrameworkCore.KNet.Infrastructure.Internal; /// public class KafkaSingletonOptions : IKafkaSingletonOptions { + /// public virtual void Initialize(IDbContextOptions options) { var kafkaOptions = options.FindExtension(); @@ -56,7 +57,7 @@ public virtual void Initialize(IDbContextOptions options) OnChangeEvent = kafkaOptions.OnChangeEvent; } } - + /// public virtual void Validate(IDbContextOptions options) { var kafkaOptions = options.FindExtension(); @@ -70,40 +71,40 @@ public virtual void Validate(IDbContextOptions options) nameof(DbContextOptionsBuilder.UseInternalServiceProvider))); } } - - public virtual Type KeySerializationType { get; private set; } - - public virtual Type ValueSerializationType { get; private set; } - - public virtual Type ValueContainerType { get; private set; } - + /// + public virtual Type? KeySerializationType { get; private set; } + /// + public virtual Type? ValueSerializationType { get; private set; } + /// + public virtual Type? ValueContainerType { get; private set; } + /// public virtual bool UseNameMatching { get; private set; } - + /// public virtual string? DatabaseName { get; private set; } - + /// public virtual string? ApplicationId { get; private set; } - + /// public virtual string? BootstrapServers { get; private set; } - + /// public virtual bool UseDeletePolicyForTopic { get; private set; } - + /// public virtual bool UseCompactedReplicator { get; private set; } - + /// public virtual bool UsePersistentStorage { get; private set; } - + /// public virtual int DefaultNumPartitions { get; private set; } - + /// public virtual int? DefaultConsumerInstances { get; private set; } - + /// public virtual int DefaultReplicationFactor { get; private set; } - + /// public virtual ConsumerConfigBuilder? ConsumerConfig { get; private set; } - + /// public virtual ProducerConfigBuilder? ProducerConfig { get; private set; } - + /// public virtual StreamsConfigBuilder? StreamsConfig { get; private set; } - + /// public virtual TopicConfigBuilder? TopicConfig { get; private set; } - - public virtual Action OnChangeEvent { get; private set; } + /// + public virtual Action? OnChangeEvent { get; private set; } } diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs index 18626fb8..b5d488e5 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContext.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContext.cs @@ -168,6 +168,10 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// public virtual Type? ValueContainerType { get; set; } = null; /// + /// Set to to avoid match of s using + /// + public virtual bool UseNameMatching { get; set; } = true; + /// /// The bootstrap servers of the Apache Kafka cluster /// public virtual string? BootstrapServers { get; set; } @@ -176,9 +180,9 @@ public KafkaDbContext(DbContextOptions options) : base(options) /// public virtual string ApplicationId { get; set; } = Guid.NewGuid().ToString(); /// - /// Database name + /// Database name means whe prefix of the topics associated to the instance of /// - public virtual string? DbName { get; set; } + public virtual string? DatabaseName { get; set; } /// /// Default number of partitions associated to each topic /// @@ -229,10 +233,11 @@ public KafkaDbContext(DbContextOptions options) : base(options) protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { if (BootstrapServers == null) throw new ArgumentNullException(nameof(BootstrapServers)); - if (DbName == null) throw new ArgumentNullException(nameof(DbName)); + if (DatabaseName == null) throw new ArgumentNullException(nameof(DatabaseName)); - optionsBuilder.UseKafkaCluster(ApplicationId, DbName, BootstrapServers, (o) => + optionsBuilder.UseKafkaCluster(ApplicationId, DatabaseName, BootstrapServers, (o) => { + o.WithUseNameMatching(UseNameMatching); o.WithConsumerConfig(ConsumerConfig ?? DefaultConsumerConfig); o.WithProducerConfig(ProducerConfig ?? DefaultProducerConfig); o.WithStreamsConfig(StreamsConfig ?? DefaultStreamsConfig).WithDefaultNumPartitions(DefaultNumPartitions); diff --git a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs index 934895b0..8fe2704b 100644 --- a/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs +++ b/src/net/KEFCore/Infrastructure/KafkaDbContextOptionsBuilder.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -26,6 +23,7 @@ using MASES.KNet.Replicator; using MASES.KNet.Streams; using Org.Apache.Kafka.Clients.Producer; +using Org.Apache.Kafka.Common.Config; using Org.Apache.Kafka.Streams; using Org.Apache.Kafka.Streams.Kstream; using System.ComponentModel; diff --git a/src/net/KEFCore/Query/Internal/AnonymousObject.cs b/src/net/KEFCore/Query/Internal/AnonymousObject.cs index 7fff5f8b..77255dfa 100644 --- a/src/net/KEFCore/Query/Internal/AnonymousObject.cs +++ b/src/net/KEFCore/Query/Internal/AnonymousObject.cs @@ -45,11 +45,11 @@ public AnonymousObject(object[] values) public static bool operator !=(AnonymousObject x, AnonymousObject y) => !x.Equals(y); - + /// public override bool Equals(object? obj) => obj is not null && (obj is AnonymousObject anonymousObject && _values.SequenceEqual(anonymousObject._values)); - + /// public override int GetHashCode() { var hash = new HashCode(); diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs index 669a5a46..80d7ddb5 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryContext.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -32,12 +29,18 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal; public class KafkaQueryContext : QueryContext { private readonly IKafkaCluster _cluster; - + /// + /// Retrieve for the specified + /// + /// + /// public virtual IEnumerable GetValueBuffers(IEntityType entityType) { return _cluster.GetValueBuffers(entityType); } - + /// + /// Default initializer + /// public KafkaQueryContext(QueryContextDependencies dependencies, IKafkaCluster cluster) : base(dependencies) { diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs b/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs index 3adfba7a..20ce8d0b 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryContextFactory.cs @@ -1,6 +1,3 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - /* * Copyright 2023 MASES s.r.l. * @@ -31,7 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal; public class KafkaQueryContextFactory : IQueryContextFactory { private readonly IKafkaCluster _cluster; - + /// + /// Default initializer + /// public KafkaQueryContextFactory( QueryContextDependencies dependencies, IKafkaClusterCache clusterCache, @@ -45,6 +44,6 @@ public KafkaQueryContextFactory( /// Dependencies for this service. /// protected virtual QueryContextDependencies Dependencies { get; } - + /// public virtual QueryContext Create() => new KafkaQueryContext(Dependencies, _cluster); } diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs b/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs index 8ff2b23d..a311aa9e 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryExpression.cs @@ -639,7 +639,7 @@ public virtual EntityShaperExpression AddNavigationToWeakEntityType( return entityShaper; } - + /// public virtual ShapedQueryExpression Clone(Expression shaperExpression) { var clonedKafkaQueryExpression = Clone(); @@ -667,10 +667,10 @@ public virtual Expression GetSingleScalarProjection() public virtual void ConvertToSingleResult(MethodInfo methodInfo) => _singleResultMethodInfo = methodInfo; - + /// public override Type Type => typeof(IEnumerable); - + /// public sealed override ExpressionType NodeType => ExpressionType.Extension; diff --git a/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs b/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs index aa78f990..634a3aba 100644 --- a/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs +++ b/src/net/KEFCore/Query/Internal/KafkaQueryableMethodTranslatingExpressionVisitor.cs @@ -32,7 +32,7 @@ public class KafkaQueryableMethodTranslatingExpressionVisitor : QueryableMethodT private readonly SharedTypeEntityExpandingExpressionVisitor _weakEntityExpandingExpressionVisitor; private readonly KafkaProjectionBindingExpressionVisitor _projectionBindingExpressionVisitor; private readonly IModel _model; - + /// public KafkaQueryableMethodTranslatingExpressionVisitor( QueryableMethodTranslatingExpressionVisitorDependencies dependencies, QueryCompilationContext queryCompilationContext) @@ -43,7 +43,7 @@ public KafkaQueryableMethodTranslatingExpressionVisitor( _projectionBindingExpressionVisitor = new KafkaProjectionBindingExpressionVisitor(this, _expressionTranslator); _model = queryCompilationContext.Model; } - + /// protected KafkaQueryableMethodTranslatingExpressionVisitor( KafkaQueryableMethodTranslatingExpressionVisitor parentVisitor) : base(parentVisitor.Dependencies, parentVisitor.QueryCompilationContext, subquery: true) @@ -53,10 +53,10 @@ protected KafkaQueryableMethodTranslatingExpressionVisitor( _projectionBindingExpressionVisitor = new KafkaProjectionBindingExpressionVisitor(this, _expressionTranslator); _model = parentVisitor._model; } - + /// protected override QueryableMethodTranslatingExpressionVisitor CreateSubqueryVisitor() => new KafkaQueryableMethodTranslatingExpressionVisitor(this); - + /// protected override Expression VisitExtension(Expression extensionExpression) { switch (extensionExpression) @@ -75,7 +75,7 @@ protected override Expression VisitExtension(Expression extensionExpression) return base.VisitExtension(extensionExpression); } } - + /// protected override Expression VisitMethodCall(MethodCallExpression methodCallExpression) { if (methodCallExpression.Method.IsGenericMethod @@ -90,11 +90,13 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return base.VisitMethodCall(methodCallExpression); } #if NET6_0 + /// protected override ShapedQueryExpression CreateShapedQueryExpression(Type elementType) { throw new NotImplementedException(); } #endif + /// protected override ShapedQueryExpression CreateShapedQueryExpression(IEntityType entityType) => CreateShapedQueryExpressionStatic(entityType); @@ -112,7 +114,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy typeof(ValueBuffer)), false)); } - + /// protected override ShapedQueryExpression? TranslateAll(ShapedQueryExpression source, LambdaExpression predicate) { predicate = Expression.Lambda(Expression.Not(predicate.Body), predicate.Parameters); @@ -139,7 +141,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool))); } - + /// protected override ShapedQueryExpression? TranslateAny(ShapedQueryExpression source, LambdaExpression? predicate) { if (predicate != null) @@ -167,21 +169,21 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool))); } - + /// protected override ShapedQueryExpression? TranslateAverage( ShapedQueryExpression source, LambdaExpression? selector, Type resultType) => TranslateScalarAggregate(source, selector, nameof(Enumerable.Average), resultType); - + /// protected override ShapedQueryExpression? TranslateCast(ShapedQueryExpression source, Type resultType) => source.ShaperExpression.Type != resultType ? source.UpdateShaperExpression(Expression.Convert(source.ShaperExpression, resultType)) : source; - + /// protected override ShapedQueryExpression? TranslateConcat(ShapedQueryExpression source1, ShapedQueryExpression source2) => TranslateSetOperation(EnumerableMethods.Concat, source1, source2); - + /// protected override ShapedQueryExpression? TranslateContains(ShapedQueryExpression source, Expression item) { var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression; @@ -207,7 +209,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(bool))); } - + /// protected override ShapedQueryExpression? TranslateCount(ShapedQueryExpression source, LambdaExpression? predicate) { if (predicate != null) @@ -235,7 +237,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(int))); } - + /// protected override ShapedQueryExpression? TranslateDefaultIfEmpty(ShapedQueryExpression source, Expression? defaultValue) { if (defaultValue == null) @@ -246,23 +248,23 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy return null; } - + /// protected override ShapedQueryExpression? TranslateDistinct(ShapedQueryExpression source) { ((KafkaQueryExpression)source.QueryExpression).ApplyDistinct(); return source; } - + /// protected override ShapedQueryExpression? TranslateElementAtOrDefault( ShapedQueryExpression source, Expression index, bool returnDefault) => null; - + /// protected override ShapedQueryExpression? TranslateExcept(ShapedQueryExpression source1, ShapedQueryExpression source2) => TranslateSetOperation(EnumerableMethods.Except, source1, source2); - + /// protected override ShapedQueryExpression? TranslateFirstOrDefault( ShapedQueryExpression source, LambdaExpression? predicate, @@ -276,7 +278,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy ? EnumerableMethods.FirstOrDefaultWithoutPredicate : EnumerableMethods.FirstWithoutPredicate); - + /// protected override ShapedQueryExpression? TranslateGroupBy( ShapedQueryExpression source, LambdaExpression keySelector, @@ -376,7 +378,7 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy : Expression.Convert(translation, expression.Type); } } - + /// protected override ShapedQueryExpression? TranslateGroupJoin( ShapedQueryExpression outer, ShapedQueryExpression inner, @@ -384,10 +386,10 @@ private static ShapedQueryExpression CreateShapedQueryExpressionStatic(IEntityTy LambdaExpression innerKeySelector, LambdaExpression resultSelector) => null; - + /// protected override ShapedQueryExpression? TranslateIntersect(ShapedQueryExpression source1, ShapedQueryExpression source2) => TranslateSetOperation(EnumerableMethods.Intersect, source1, source2); - + /// protected override ShapedQueryExpression? TranslateJoin( ShapedQueryExpression outer, ShapedQueryExpression inner, @@ -523,7 +525,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) && !inner.Type.IsNullableType() && outer.Type.UnwrapNullableType() == inner.Type; } - + /// protected override ShapedQueryExpression? TranslateLastOrDefault( ShapedQueryExpression source, LambdaExpression? predicate, @@ -536,7 +538,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) returnDefault ? EnumerableMethods.LastOrDefaultWithoutPredicate : EnumerableMethods.LastWithoutPredicate); - + /// protected override ShapedQueryExpression? TranslateLeftJoin( ShapedQueryExpression outer, ShapedQueryExpression inner, @@ -565,7 +567,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) return TranslateTwoParameterSelector(outer, resultSelector); } - + /// protected override ShapedQueryExpression? TranslateLongCount(ShapedQueryExpression source, LambdaExpression? predicate) { if (predicate != null) @@ -594,16 +596,16 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) return source.UpdateShaperExpression(Expression.Convert(kafkaQueryExpression.GetSingleScalarProjection(), typeof(long))); } - + /// protected override ShapedQueryExpression? TranslateMax( ShapedQueryExpression source, LambdaExpression? selector, Type resultType) => TranslateScalarAggregate(source, selector, nameof(Enumerable.Max), resultType); - + /// protected override ShapedQueryExpression? TranslateMin(ShapedQueryExpression source, LambdaExpression? selector, Type resultType) => TranslateScalarAggregate(source, selector, nameof(Enumerable.Min), resultType); - + /// protected override ShapedQueryExpression? TranslateOfType(ShapedQueryExpression source, Type resultType) { if (source.ShaperExpression is EntityShaperExpression entityShaperExpression) @@ -651,7 +653,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) return null; } - + /// protected override ShapedQueryExpression? TranslateOrderBy( ShapedQueryExpression source, LambdaExpression keySelector, @@ -676,7 +678,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) return source; } - + /// protected override ShapedQueryExpression? TranslateReverse(ShapedQueryExpression source) { var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression; @@ -688,7 +690,7 @@ static bool IsConvertedToNullable(Expression outer, Expression inner) return source; } - + /// protected override ShapedQueryExpression TranslateSelect(ShapedQueryExpression source, LambdaExpression selector) { if (selector.Body == selector.Parameters[0]) @@ -702,7 +704,7 @@ protected override ShapedQueryExpression TranslateSelect(ShapedQueryExpression s return source.UpdateShaperExpression(newShaper); } - + /// protected override ShapedQueryExpression? TranslateSelectMany( ShapedQueryExpression source, LambdaExpression collectionSelector, @@ -748,7 +750,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return base.VisitMethodCall(methodCallExpression); } } - + /// protected override ShapedQueryExpression? TranslateSelectMany(ShapedQueryExpression source, LambdaExpression selector) { var innerParameter = Expression.Parameter(selector.ReturnType.GetSequenceType(), "i"); @@ -757,7 +759,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return TranslateSelectMany(source, selector, resultSelector); } - + /// protected override ShapedQueryExpression? TranslateSingleOrDefault( ShapedQueryExpression source, LambdaExpression? predicate, @@ -770,7 +772,7 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp returnDefault ? EnumerableMethods.SingleOrDefaultWithoutPredicate : EnumerableMethods.SingleWithoutPredicate); - + /// protected override ShapedQueryExpression? TranslateSkip(ShapedQueryExpression source, Expression count) { var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression; @@ -790,13 +792,13 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return source; } - + /// protected override ShapedQueryExpression? TranslateSkipWhile(ShapedQueryExpression source, LambdaExpression predicate) => null; - + /// protected override ShapedQueryExpression? TranslateSum(ShapedQueryExpression source, LambdaExpression? selector, Type resultType) => TranslateScalarAggregate(source, selector, nameof(Enumerable.Sum), resultType); - + /// protected override ShapedQueryExpression? TranslateTake(ShapedQueryExpression source, Expression count) { var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression; @@ -816,10 +818,10 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return source; } - + /// protected override ShapedQueryExpression? TranslateTakeWhile(ShapedQueryExpression source, LambdaExpression predicate) => null; - + /// protected override ShapedQueryExpression? TranslateThenBy( ShapedQueryExpression source, LambdaExpression keySelector, @@ -843,10 +845,10 @@ protected override Expression VisitMethodCall(MethodCallExpression methodCallExp return source; } - + /// protected override ShapedQueryExpression? TranslateUnion(ShapedQueryExpression source1, ShapedQueryExpression source2) => TranslateSetOperation(EnumerableMethods.Union, source1, source2); - + /// protected override ShapedQueryExpression? TranslateWhere(ShapedQueryExpression source, LambdaExpression predicate) { var kafkaQueryExpression = (KafkaQueryExpression)source.QueryExpression; diff --git a/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs b/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs index 1d110592..f546d5a2 100644 --- a/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs +++ b/src/net/KEFCore/Query/Internal/KafkaShapedQueryCompilingExpressionVisitor.cs @@ -30,7 +30,9 @@ public partial class KafkaShapedQueryCompilingExpressionVisitor : ShapedQueryCom { private readonly Type _contextType; private readonly bool _threadSafetyChecksEnabled; - + /// + /// Default initilizer + /// public KafkaShapedQueryCompilingExpressionVisitor( ShapedQueryCompilingExpressionVisitorDependencies dependencies, QueryCompilationContext queryCompilationContext) @@ -39,7 +41,7 @@ public KafkaShapedQueryCompilingExpressionVisitor( _contextType = queryCompilationContext.ContextType; _threadSafetyChecksEnabled = dependencies.CoreSingletonOptions.AreThreadSafetyChecksEnabled; } - + /// protected override Expression VisitExtension(Expression extensionExpression) { switch (extensionExpression) @@ -53,7 +55,7 @@ protected override Expression VisitExtension(Expression extensionExpression) return base.VisitExtension(extensionExpression); } - + /// protected override Expression VisitShapedQuery(ShapedQueryExpression shapedQueryExpression) { var kafkaQueryExpression = (KafkaQueryExpression)shapedQueryExpression.QueryExpression; diff --git a/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs b/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs index e91741a5..7740210c 100644 --- a/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs +++ b/src/net/KEFCore/Query/Internal/KafkaShapedQueryExpressionVisitorFactory.cs @@ -28,6 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal; /// public class KafkaShapedQueryCompilingExpressionVisitorFactory : IShapedQueryCompilingExpressionVisitorFactory { + /// + /// Default initializer + /// public KafkaShapedQueryCompilingExpressionVisitorFactory( ShapedQueryCompilingExpressionVisitorDependencies dependencies) { @@ -38,7 +41,7 @@ public KafkaShapedQueryCompilingExpressionVisitorFactory( /// Dependencies for this service. /// protected virtual ShapedQueryCompilingExpressionVisitorDependencies Dependencies { get; } - + /// public virtual ShapedQueryCompilingExpressionVisitor Create(QueryCompilationContext queryCompilationContext) => new KafkaShapedQueryCompilingExpressionVisitor(Dependencies, queryCompilationContext); } diff --git a/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs b/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs index 7fd627da..18408629 100644 --- a/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs +++ b/src/net/KEFCore/Query/Internal/KafkaTableExpression.cs @@ -28,19 +28,24 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal; /// public class KafkaTableExpression : Expression, IPrintableExpression { + /// + /// Default initializer + /// public KafkaTableExpression(IEntityType entityType) { EntityType = entityType; } - + /// public override Type Type => typeof(IEnumerable); - + /// + /// associated to the + /// public virtual IEntityType EntityType { get; } - + /// public sealed override ExpressionType NodeType => ExpressionType.Extension; - + /// protected override Expression VisitChildren(ExpressionVisitor visitor) => this; diff --git a/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs b/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs index a4ab613d..d2273933 100644 --- a/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs +++ b/src/net/KEFCore/Query/Internal/SingleResultShaperExpression.cs @@ -28,6 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.Query.Internal; /// public class SingleResultShaperExpression : Expression, IPrintableExpression { + /// + /// Default initializer + /// public SingleResultShaperExpression( Expression projection, Expression innerShaper) @@ -36,7 +39,7 @@ public SingleResultShaperExpression( InnerShaper = innerShaper; Type = innerShaper.Type; } - + /// protected override Expression VisitChildren(ExpressionVisitor visitor) { var projection = visitor.Visit(Projection); @@ -49,14 +52,18 @@ public virtual SingleResultShaperExpression Update(Expression projection, Expres => projection != Projection || innerShaper != InnerShaper ? new SingleResultShaperExpression(projection, innerShaper) : this; - + /// public sealed override ExpressionType NodeType => ExpressionType.Extension; - + /// public override Type Type { get; } - + /// + /// Projection + /// public virtual Expression Projection { get; } - + /// + /// Inner shaper + /// public virtual Expression InnerShaper { get; } void IPrintableExpression.Print(ExpressionPrinter expressionPrinter) diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs index f49e03a0..8cf8a1f6 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducer.cs @@ -47,9 +47,9 @@ public class EntityTypeProducer? _kafkaCompactedReplicator; private readonly IKNetProducer? _kafkaProducer; - private readonly IKafkaStreamsBaseRetriever _streamData; - private readonly IKNetSerDes _keySerdes; - private readonly IKNetSerDes _valueSerdes; + private readonly IKafkaStreamsBaseRetriever? _streamData; + private readonly IKNetSerDes? _keySerdes; + private readonly IKNetSerDes? _valueSerdes; private readonly Action? _onChangeEvent; #region KNetCompactedReplicatorEnumerable @@ -67,8 +67,8 @@ class KNetCompactedReplicatorEnumerator : IEnumerator Stopwatch _valueBufferSw = new Stopwatch(); #endif readonly IEntityType _entityType; - IKNetCompactedReplicator? _kafkaCompactedReplicator; - readonly IEnumerator> _enumerator; + readonly IKNetCompactedReplicator? _kafkaCompactedReplicator; + readonly IEnumerator>? _enumerator; public KNetCompactedReplicatorEnumerator(IEntityType entityType, IKNetCompactedReplicator? kafkaCompactedReplicator) { _entityType = entityType; @@ -95,7 +95,7 @@ public ValueBuffer Current { _currentSw.Start(); #endif - return _current.HasValue ? _current.Value : default; + return _current ?? default; #if DEBUG_PERFORMANCE } finally @@ -127,13 +127,13 @@ public bool MoveNext() { _moveNextSw.Start(); #endif - if (_enumerator.MoveNext()) + if (_enumerator != null && _enumerator.MoveNext()) { #if DEBUG_PERFORMANCE _cycles++; _valueBufferSw.Start(); #endif - object[] array = null; + object[] array = null!; _enumerator.Current.Value.GetData(_entityType, ref array); #if DEBUG_PERFORMANCE _valueBufferSw.Stop(); @@ -180,7 +180,9 @@ IEnumerator IEnumerable.GetEnumerator() } } #endregion - + /// + /// Default initializer + /// public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) { #if DEBUG_PERFORMANCE @@ -197,6 +199,9 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) _keySerdes = new TKeySerializer() as IKNetSerDes; _valueSerdes = new TValueSerializer() as IKNetSerDes; + if (_keySerdes == null) throw new InvalidOperationException($"{typeof(TKeySerializer)} is not a {typeof(IKNetSerDes)}"); + if (_valueSerdes == null) throw new InvalidOperationException($"{typeof(TValueSerializer)} is not a {typeof(IKNetSerDes)}"); + if (_useCompactedReplicator) { _kafkaCompactedReplicator = new KNetCompactedReplicator() @@ -230,12 +235,12 @@ public EntityTypeProducer(IEntityType entityType, IKafkaCluster cluster) else { _kafkaProducer = new KNetProducer(_cluster.Options.ProducerOptions(), _keySerdes, _valueSerdes); - _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes, _valueSerdes); + _streamData = new KafkaStreamsTableRetriever(cluster, entityType, _keySerdes!, _valueSerdes!); } } - + /// public virtual IEntityType EntityType => _entityType; - + /// public IEnumerable> Commit(IEnumerable records) { if (_useCompactedReplicator) @@ -246,7 +251,7 @@ public IEnumerable> Commit(IEnumerable reco if (_kafkaCompactedReplicator != null) _kafkaCompactedReplicator[record.Key] = value!; } - return null; + return null!; } else { @@ -254,7 +259,7 @@ public IEnumerable> Commit(IEnumerable reco foreach (KafkaRowBag record in records) { var future = _kafkaProducer?.Send(new KNetProducerRecord(record.AssociatedTopicName, 0, record.Key, record.Value(TValueContainerConstructor)!)); - futures.Add(future); + futures.Add(future!); } _kafkaProducer?.Flush(); @@ -262,7 +267,7 @@ public IEnumerable> Commit(IEnumerable reco return futures; } } - + /// public void Dispose() { if (_kafkaCompactedReplicator != null) @@ -280,7 +285,7 @@ public void Dispose() _streamData?.Dispose(); } } - + /// public IEnumerable ValueBuffers { get diff --git a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs index 8ebfa800..b309948c 100644 --- a/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs +++ b/src/net/KEFCore/Storage/Internal/EntityTypeProducers.cs @@ -30,9 +30,10 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public class EntityTypeProducers { - static IEntityTypeProducer? _globalProducer = null; static readonly ConcurrentDictionary _producers = new(); - + /// + /// Allocates a new + /// public static IEntityTypeProducer Create(IEntityType entityType, IKafkaCluster cluster) where TKey : notnull where TValueContainer : class, IValueContainer @@ -41,7 +42,9 @@ public static IEntityTypeProducer Create CreateProducerLocal(entityType, cluster)); } - + /// + /// Dispose a previously allocated + /// public static void Dispose(IEntityTypeProducer producer) { if (!_producers.TryRemove(new KeyValuePair(producer.EntityType, producer))) diff --git a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs index 57d1a720..b88487cc 100644 --- a/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs +++ b/src/net/KEFCore/Storage/Internal/IEntityTypeProducer.cs @@ -30,9 +30,18 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IEntityTypeProducer : IDisposable { + /// + /// Associated + /// IEntityType EntityType { get; } - + /// + /// Stores an + /// + /// The to be stored + /// IEnumerable> Commit(IEnumerable records); - + /// + /// The current s + /// IEnumerable ValueBuffers { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs index f2108593..eb173296 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaCluster.cs @@ -28,21 +28,40 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaCluster : IDisposable { + /// + /// Execute the + /// bool EnsureDeleted(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - + /// + /// Execute the + /// bool EnsureCreated(IUpdateAdapterFactory updateAdapterFactory, IModel designModel, IDiagnosticsLogger updateLogger); - + /// + /// Execute the + /// bool EnsureConnected(IModel designModel, IDiagnosticsLogger updateLogger); - + /// + /// Creates a table for on Apache Kafka cluster + /// string CreateTable(IEntityType entityType); - + /// + /// Retrieve the + /// IEnumerable GetValueBuffers(IEntityType entityType); - + /// + /// Gets the + /// KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property); - + /// + /// Executes a transaction + /// int ExecuteTransaction(IList entries, IDiagnosticsLogger updateLogger); - + /// + /// The Apche Kafka cluster identifier + /// string ClusterId { get; } - + /// + /// The + /// KafkaOptionsExtension Options { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs b/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs index c46d9858..1b788841 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaClusterCache.cs @@ -27,7 +27,12 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaClusterCache { + /// + /// Gets an + /// IKafkaCluster GetCluster(KafkaOptionsExtension options); - + /// + /// Dispose an + /// void Dispose(IKafkaCluster cluster); } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs index 1e00226d..d3a056fc 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaDatabase.cs @@ -25,11 +25,20 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaDatabase : IDatabase, IDisposable { + /// + /// The referring + /// IKafkaCluster Cluster { get; } - + /// + /// Execute the + /// bool EnsureDatabaseDeleted(); - + /// + /// Execute the + /// bool EnsureDatabaseCreated(); - + /// + /// Execute the + /// bool EnsureDatabaseConnected(); } \ No newline at end of file diff --git a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs index cf3650b5..2c5945e4 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaRowBag.cs @@ -27,7 +27,12 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaRowBag { + /// + /// The with changes + /// IUpdateEntry UpdateEntry { get; } - + /// + /// The topic data will be stored + /// string AssociatedTopicName { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs index 55cb0b97..2c65ab4c 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTable.cs @@ -29,21 +29,36 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaTable : IEntityTypeProducer { + /// + /// Create snapshot + /// IReadOnlyList SnapshotRows(); - + /// + /// Current rows + /// IEnumerable Rows { get; } - + /// + /// Creates a new row + /// IKafkaRowBag Create(IUpdateEntry entry); - + /// + /// Deletes a row + /// IKafkaRowBag Delete(IUpdateEntry entry); - + /// + /// Updates a row + /// IKafkaRowBag Update(IUpdateEntry entry); - + /// + /// Get an + /// KafkaIntegerValueGenerator GetIntegerValueGenerator(IProperty property, IReadOnlyList tables); - + /// + /// Bumps values + /// void BumpValueGenerators(object?[] row); - + /// + /// The referring + /// IKafkaCluster Cluster { get; } - - IEntityType EntityType { get; } } diff --git a/src/net/KEFCore/Storage/Internal/IKafkaTableFactory.cs b/src/net/KEFCore/Storage/Internal/IKafkaTableFactory.cs index 5d57e5f6..31123554 100644 --- a/src/net/KEFCore/Storage/Internal/IKafkaTableFactory.cs +++ b/src/net/KEFCore/Storage/Internal/IKafkaTableFactory.cs @@ -25,7 +25,15 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public interface IKafkaTableFactory { + /// + /// Allocate a new + /// + /// + /// + /// IKafkaTable Create(IKafkaCluster cluster, IEntityType entityType); - + /// + /// Dispose a previously allocated + /// void Dispose(IKafkaTable table); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs index ff2f9254..d91b8cbd 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaCluster.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaCluster.cs @@ -45,14 +45,16 @@ public class KafkaCluster : IKafkaCluster private readonly object _lock = new(); private System.Collections.Generic.Dictionary? _tables; - + /// + /// Dfault initializer + /// public KafkaCluster(KafkaOptionsExtension options, IKafkaTableFactory tableFactory) { _options = options; _tableFactory = tableFactory; _useNameMatching = options.UseNameMatching; } - + /// public virtual void Dispose() { #if DEBUG_PERFORMANCE @@ -67,11 +69,11 @@ public virtual void Dispose() } _tables?.Clear(); } - + /// public virtual string ClusterId => _options.ClusterId; - + /// public virtual KafkaOptionsExtension Options => _options; - + /// public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator( IProperty property) { @@ -84,7 +86,7 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator EnsureTable(type)).ToArray()); } } - + /// public virtual bool EnsureDeleted( IUpdateAdapterFactory updateAdapterFactory, IModel designModel, @@ -139,7 +141,7 @@ public virtual bool EnsureDeleted( return true; } - + /// public virtual bool EnsureCreated( IUpdateAdapterFactory updateAdapterFactory, IModel designModel, @@ -174,14 +176,14 @@ public virtual bool EnsureCreated( return valuesSeeded; } } - + /// public virtual bool EnsureConnected( IModel designModel, IDiagnosticsLogger updateLogger) { return true; } - + /// public virtual string CreateTable(IEntityType entityType) { return CreateTable(entityType, 0); @@ -227,7 +229,7 @@ private string CreateTable(IEntityType entityType, int cycle) } private static System.Collections.Generic.Dictionary CreateTables() => new(); - + /// public virtual IEnumerable GetValueBuffers(IEntityType entityType) { lock (_lock) @@ -259,7 +261,7 @@ public virtual IEnumerable GetValueBuffers(IEntityType entityType) #endif } } - + /// public virtual int ExecuteTransaction( System.Collections.Generic.IList entries, IDiagnosticsLogger updateLogger) diff --git a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs index aa1024a1..eb54d0ff 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaClusterCache.cs @@ -30,7 +30,9 @@ public class KafkaClusterCache : IKafkaClusterCache { private readonly IKafkaTableFactory _tableFactory; private readonly ConcurrentDictionary _namedClusters; - + /// + /// Default initializer + /// public KafkaClusterCache( IKafkaTableFactory tableFactory, IKafkaSingletonOptions? options) @@ -38,10 +40,10 @@ public KafkaClusterCache( _tableFactory = tableFactory; _namedClusters = new ConcurrentDictionary(); } - + /// public virtual IKafkaCluster GetCluster(KafkaOptionsExtension options) => _namedClusters.GetOrAdd(options.ClusterId, _ => new KafkaCluster(options, _tableFactory)); - + /// public virtual void Dispose(IKafkaCluster cluster) { if (cluster != null) diff --git a/src/net/KEFCore/Storage/Internal/KafkaClusterCacheExtensions.cs b/src/net/KEFCore/Storage/Internal/KafkaClusterCacheExtensions.cs index f63e927a..f8df6a8e 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaClusterCacheExtensions.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaClusterCacheExtensions.cs @@ -27,6 +27,9 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public static class KafkaClusterCacheExtensions { + /// + /// Gets the + /// public static IKafkaCluster GetCluster(this IKafkaClusterCache storeCache, IDbContextOptions options) => storeCache.GetCluster(options.Extensions.OfType().First()); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs b/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs index 7a0f6c41..bda4f38e 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaDatabase.cs @@ -30,7 +30,9 @@ public class KafkaDatabase : Database, IKafkaDatabase private readonly IUpdateAdapterFactory _updateAdapterFactory; private readonly IDiagnosticsLogger _updateLogger; private readonly IDesignTimeModel _designTimeModel; - + /// + /// Default initializer + /// public KafkaDatabase( DatabaseDependencies dependencies, IKafkaClusterCache clusterCache, @@ -46,29 +48,29 @@ public KafkaDatabase( _updateAdapterFactory = updateAdapterFactory; _updateLogger = updateLogger; } - + /// public void Dispose() { _clusterCache.Dispose(_cluster); } - + /// public virtual IKafkaCluster Cluster => _cluster; - + /// public override int SaveChanges(IList entries) => _cluster.ExecuteTransaction(entries, _updateLogger); - + /// public override Task SaveChangesAsync( IList entries, CancellationToken cancellationToken = default) => cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : Task.FromResult(_cluster.ExecuteTransaction(entries, _updateLogger)); - + /// public virtual bool EnsureDatabaseDeleted() => _cluster.EnsureDeleted(_updateAdapterFactory, _designTimeModel.Model, _updateLogger); - + /// public virtual bool EnsureDatabaseCreated() => _cluster.EnsureCreated(_updateAdapterFactory, _designTimeModel.Model, _updateLogger); - + /// public virtual bool EnsureDatabaseConnected() => _cluster.EnsureConnected(_designTimeModel.Model, _updateLogger); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaDatabaseCreator.cs b/src/net/KEFCore/Storage/Internal/KafkaDatabaseCreator.cs index b4b30b69..1b2bb4d2 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaDatabaseCreator.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaDatabaseCreator.cs @@ -26,30 +26,35 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; public class KafkaDatabaseCreator : IDatabaseCreator { private readonly IDatabase _database; - + /// + /// Default initializer + /// + /// public KafkaDatabaseCreator(IDatabase database) { _database = database; } - + /// + /// The + /// protected virtual IKafkaDatabase Database => (IKafkaDatabase)_database; - + /// public virtual bool EnsureDeleted() => Database.EnsureDatabaseDeleted(); - + /// public virtual Task EnsureDeletedAsync(CancellationToken cancellationToken = default) => Task.FromResult(Database.EnsureDatabaseDeleted()); - + /// public virtual bool EnsureCreated() => Database.EnsureDatabaseCreated(); - + /// public virtual Task EnsureCreatedAsync(CancellationToken cancellationToken = default) => Task.FromResult(Database.EnsureDatabaseCreated()); - + /// public virtual bool CanConnect() => Database.EnsureDatabaseConnected(); - + /// public virtual Task CanConnectAsync(CancellationToken cancellationToken = default) => Task.FromResult(Database.EnsureDatabaseConnected()); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs index 4e37b436..c7f54c8b 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaRowBag.cs @@ -31,6 +31,9 @@ public class KafkaRowBag : IKafkaRowBag where TKey : notnull where TValueContainer : IValueContainer { + /// + /// Default initializer + /// public KafkaRowBag(IUpdateEntry entry, string topicName, TKey key, object?[]? row) { UpdateEntry = entry; @@ -38,14 +41,20 @@ public KafkaRowBag(IUpdateEntry entry, string topicName, TKey key, object?[]? ro Key = key; ValueBuffer = row; } - + /// public IUpdateEntry UpdateEntry { get; private set; } - + /// public string AssociatedTopicName { get; private set; } - + /// + /// The Key + /// public TKey Key { get; private set; } - + /// + /// The Value + /// public TValueContainer? Value(ConstructorInfo ci) => UpdateEntry.EntityState == EntityState.Deleted ? default : (TValueContainer)ci.Invoke(new object[] { UpdateEntry.EntityType, ValueBuffer! }); - + /// + /// The content + /// public object?[]? ValueBuffer { get; private set; } } diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs index ac5b2b37..d6f446f5 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsBaseRetriever.cs @@ -70,7 +70,9 @@ public class KafkaStreamsBaseRetriever : IKafkaStreamsBaseRe private Exception? _resultException = null; private KafkaStreams.State _currentState = KafkaStreams.State.NOT_RUNNING; private ReadOnlyKeyValueStore? keyValueStore; - + /// + /// Default initializer + /// public KafkaStreamsBaseRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes valueSerdes, string storageId, StreamsBuilder builder, KStream root) { _kafkaCluster = kafkaCluster; @@ -165,7 +167,7 @@ private void StartTopology(StreamsBuilder builder, KStream root) keyValueStore ??= _streams?.Store(StoreQueryParameters>.FromNameAndType(_storageId, QueryableStoreTypes.KeyValueStore())); } - + /// public IEnumerator GetEnumerator() { if (_resultException != null) throw _resultException; @@ -179,7 +181,7 @@ System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); } - + /// public void Dispose() { _streams?.Close(); @@ -248,7 +250,7 @@ public ValueBuffer Current _valueSerdesSw.Stop(); _valueBufferSw.Start(); #endif - object[] array = null; + object[] array = null!; entityTypeData.GetData(_entityType, ref array); #if DEBUG_PERFORMANCE _valueBufferSw.Stop(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs index de8f95aa..a24714d5 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaStreamsTableRetriever.cs @@ -31,13 +31,19 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// doing so can result in application failures when updating to a new Entity Framework Core release. /// public sealed class KafkaStreamsTableRetriever : KafkaStreamsBaseRetriever + where TKey :notnull where TValueContainer : IValueContainer { + /// + /// Initializer + /// public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes valueSerdes) : this(kafkaCluster, entityType, keySerdes, valueSerdes, new StreamsBuilder()) { } - + /// + /// Initializer + /// public KafkaStreamsTableRetriever(IKafkaCluster kafkaCluster, IEntityType entityType, IKNetSerDes keySerdes, IKNetSerDes valueSerdes, StreamsBuilder builder) : base(kafkaCluster, entityType, keySerdes, valueSerdes, entityType.StorageIdForTable(kafkaCluster.Options), builder, builder.Stream(entityType.TopicName(kafkaCluster.Options))) { diff --git a/src/net/KEFCore/Storage/Internal/KafkaTable.cs b/src/net/KEFCore/Storage/Internal/KafkaTable.cs index 5dc97f6b..01169a12 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTable.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTable.cs @@ -48,7 +48,9 @@ public class KafkaTable private Dictionary? _integerGenerators; readonly IEntityTypeProducer _producer; private readonly string _tableAssociatedTopicName; - + /// + /// Default initializer + /// public KafkaTable( IKafkaCluster cluster, IEntityType entityType, @@ -84,7 +86,7 @@ public KafkaTable( } } } - + /// public virtual void Dispose() { #if DEBUG_PERFORMANCE @@ -92,11 +94,11 @@ public virtual void Dispose() #endif EntityTypeProducers.Dispose(_producer!); } - + /// public virtual IKafkaCluster Cluster { get; } - + /// public virtual IEntityType EntityType { get; } - + /// public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator( IProperty property, IReadOnlyList tables) @@ -120,13 +122,13 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator)generator; } - + /// public virtual IEnumerable> Commit(IEnumerable records) => _producer.Commit(records); - + /// public virtual IEnumerable ValueBuffers => _producer.ValueBuffers; - + /// public virtual IEnumerable Rows => RowsInTable(); - + /// public virtual IReadOnlyList SnapshotRows() { var rows = Rows.ToList(); @@ -164,7 +166,7 @@ public virtual KafkaIntegerValueGenerator GetIntegerValueGenerator RowsInTable() => _rows.Values; private static List GetKeyComparers(IEnumerable properties) => properties.Select(p => p.GetKeyValueComparer()).ToList(); - + /// public virtual IKafkaRowBag Create(IUpdateEntry entry) { var properties = entry.EntityType.GetProperties().ToArray(); @@ -191,7 +193,7 @@ public virtual IKafkaRowBag Create(IUpdateEntry entry) return new KafkaRowBag(entry, _tableAssociatedTopicName, key, row); } - + /// public virtual IKafkaRowBag Delete(IUpdateEntry entry) { var key = CreateKey(entry); @@ -251,7 +253,7 @@ private static bool IsConcurrencyConflict( return false; } - + /// public virtual IKafkaRowBag Update(IUpdateEntry entry) { var key = CreateKey(entry); @@ -302,7 +304,7 @@ public virtual IKafkaRowBag Update(IUpdateEntry entry) throw new DbUpdateConcurrencyException(KafkaStrings.UpdateConcurrencyException, new[] { entry }); } } - + /// public virtual void BumpValueGenerators(object?[] row) { if (_integerGenerators != null) @@ -314,7 +316,7 @@ public virtual void BumpValueGenerators(object?[] row) } } - private TKey CreateKey(IUpdateEntry entry) => _keyValueFactory.CreateFromCurrentValues(entry); + private TKey CreateKey(IUpdateEntry entry) => _keyValueFactory.CreateFromCurrentValues(entry)!; private static object? SnapshotValue(IProperty property, ValueComparer? comparer, IUpdateEntry entry) { diff --git a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs index 280cd4a1..92033979 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTableFactory.cs @@ -33,7 +33,9 @@ public class KafkaTableFactory : IKafkaTableFactory private readonly bool _sensitiveLoggingEnabled; private readonly ConcurrentDictionary<(IKafkaCluster Cluster, IEntityType EntityType), Func> _factories = new(); - + /// + /// Default initializer + /// public KafkaTableFactory( ILoggingOptions loggingOptions, IKafkaSingletonOptions options) @@ -41,10 +43,10 @@ public KafkaTableFactory( _options = options; _sensitiveLoggingEnabled = loggingOptions.IsSensitiveDataLoggingEnabled; } - + /// public virtual IKafkaTable Create(IKafkaCluster cluster, IEntityType entityType) => _factories.GetOrAdd((cluster, entityType), e => CreateTable(e.Cluster, e.EntityType))(); - + /// public void Dispose(IKafkaTable table) { table.Dispose(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaTableSnapshot.cs b/src/net/KEFCore/Storage/Internal/KafkaTableSnapshot.cs deleted file mode 100644 index a4167a2d..00000000 --- a/src/net/KEFCore/Storage/Internal/KafkaTableSnapshot.cs +++ /dev/null @@ -1,39 +0,0 @@ -/* -* Copyright 2023 MASES s.r.l. -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -* -* Refer to LICENSE for more information. -*/ - -namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; -/// -/// This is an internal API that supports the Entity Framework Core infrastructure and not subject to -/// the same compatibility standards as public APIs. It may be changed or removed without notice in -/// any release. You should only use it directly in your code with extreme caution and knowing that -/// doing so can result in application failures when updating to a new Entity Framework Core release. -/// -public class KafkaTableSnapshot -{ - public KafkaTableSnapshot( - IEntityType entityType, - IReadOnlyList rows) - { - EntityType = entityType; - Rows = rows; - } - - public virtual IEntityType EntityType { get; } - - public virtual IReadOnlyList Rows { get; } -} diff --git a/src/net/KEFCore/Storage/Internal/KafkaTransaction.cs b/src/net/KEFCore/Storage/Internal/KafkaTransaction.cs index 0a654a4e..9480e3b9 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTransaction.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTransaction.cs @@ -25,26 +25,27 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public class KafkaTransaction : IDbContextTransaction { + /// public virtual Guid TransactionId { get; } = Guid.NewGuid(); - + /// public virtual void Commit() { } - + /// public virtual Task CommitAsync(CancellationToken cancellationToken = default) => Task.CompletedTask; - + /// public virtual void Rollback() { } - + /// public virtual Task RollbackAsync(CancellationToken cancellationToken = default) => Task.CompletedTask; - + /// public virtual void Dispose() { } - + /// public virtual ValueTask DisposeAsync() => default; } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTransactionManager.cs b/src/net/KEFCore/Storage/Internal/KafkaTransactionManager.cs index 463c37c2..51621aa9 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTransactionManager.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTransactionManager.cs @@ -32,20 +32,22 @@ public class KafkaTransactionManager : IDbContextTransactionManager, ITransactio private static readonly KafkaTransaction StubTransaction = new(); private readonly IDiagnosticsLogger _logger; - + /// + /// Default initializer + /// public KafkaTransactionManager( IDiagnosticsLogger logger) { _logger = logger; } - + /// public virtual IDbContextTransaction BeginTransaction() { _logger.TransactionIgnoredWarning(); return StubTransaction; } - + /// public virtual Task BeginTransactionAsync( CancellationToken cancellationToken = default) { @@ -53,38 +55,38 @@ public virtual Task BeginTransactionAsync( return Task.FromResult(StubTransaction); } - + /// public virtual void CommitTransaction() => _logger.TransactionIgnoredWarning(); - + /// public virtual Task CommitTransactionAsync(CancellationToken cancellationToken = default) { _logger.TransactionIgnoredWarning(); return Task.CompletedTask; } - + /// public virtual void RollbackTransaction() => _logger.TransactionIgnoredWarning(); - + /// public virtual Task RollbackTransactionAsync(CancellationToken cancellationToken = default) { _logger.TransactionIgnoredWarning(); return Task.CompletedTask; } - + /// public virtual IDbContextTransaction? CurrentTransaction => null; - + /// public virtual Transaction? EnlistedTransaction => null; - + /// public virtual void EnlistTransaction(Transaction? transaction) => _logger.TransactionIgnoredWarning(); - + /// public virtual void ResetState() { } - + /// public virtual Task ResetStateAsync(CancellationToken cancellationToken = default) { ResetState(); diff --git a/src/net/KEFCore/Storage/Internal/KafkaTypeMapping.cs b/src/net/KEFCore/Storage/Internal/KafkaTypeMapping.cs index 7a4aa453..27f8f3be 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTypeMapping.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTypeMapping.cs @@ -25,6 +25,9 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public class KafkaTypeMapping : CoreTypeMapping { + /// + /// Default initializer + /// public KafkaTypeMapping( Type clrType, ValueComparer? comparer = null, @@ -42,7 +45,7 @@ private KafkaTypeMapping(CoreTypeMappingParameters parameters) : base(parameters) { } - + /// public override CoreTypeMapping Clone(ValueConverter? converter) => new KafkaTypeMapping(Parameters.WithComposedConverter(converter)); } diff --git a/src/net/KEFCore/Storage/Internal/KafkaTypeMappingSource.cs b/src/net/KEFCore/Storage/Internal/KafkaTypeMappingSource.cs index 9f5473cd..3d8e2a50 100644 --- a/src/net/KEFCore/Storage/Internal/KafkaTypeMappingSource.cs +++ b/src/net/KEFCore/Storage/Internal/KafkaTypeMappingSource.cs @@ -25,11 +25,14 @@ namespace MASES.EntityFrameworkCore.KNet.Storage.Internal; /// public class KafkaTypeMappingSource : TypeMappingSource { + /// + /// Default initializer + /// public KafkaTypeMappingSource(TypeMappingSourceDependencies dependencies) : base(dependencies) { } - + /// protected override CoreTypeMapping? FindMapping(in TypeMappingInfo mappingInfo) { var clrType = mappingInfo.ClrType; diff --git a/src/net/KEFCore/ValueGeneration/Internal/IKafkaIntegerValueGenerator.cs b/src/net/KEFCore/ValueGeneration/Internal/IKafkaIntegerValueGenerator.cs index 18a4ebc1..798747c8 100644 --- a/src/net/KEFCore/ValueGeneration/Internal/IKafkaIntegerValueGenerator.cs +++ b/src/net/KEFCore/ValueGeneration/Internal/IKafkaIntegerValueGenerator.cs @@ -25,5 +25,8 @@ namespace MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; /// public interface IKafkaIntegerValueGenerator { + /// + /// Execute a bump on data + /// void Bump(object?[] row); } diff --git a/src/net/KEFCore/ValueGeneration/Internal/KafkaIntegerValueGenerator.cs b/src/net/KEFCore/ValueGeneration/Internal/KafkaIntegerValueGenerator.cs index a2bda1e7..71c9d359 100644 --- a/src/net/KEFCore/ValueGeneration/Internal/KafkaIntegerValueGenerator.cs +++ b/src/net/KEFCore/ValueGeneration/Internal/KafkaIntegerValueGenerator.cs @@ -29,12 +29,14 @@ public class KafkaIntegerValueGenerator : ValueGenerator, IKafka { private readonly int _propertyIndex; private long _current; - + /// + /// Default initializer + /// public KafkaIntegerValueGenerator(int propertyIndex) { _propertyIndex = propertyIndex; } - + /// public virtual void Bump(object?[] row) { var newValue = (long)Convert.ChangeType(row[_propertyIndex]!, typeof(long)); @@ -44,10 +46,10 @@ public virtual void Bump(object?[] row) Interlocked.Exchange(ref _current, newValue); } } - + /// public override TValue Next(EntityEntry entry) => (TValue)Convert.ChangeType(Interlocked.Increment(ref _current), typeof(TValue), CultureInfo.InvariantCulture); - + /// public override bool GeneratesTemporaryValues => false; } diff --git a/src/net/KEFCore/ValueGeneration/Internal/KafkaValueGeneratorSelector.cs b/src/net/KEFCore/ValueGeneration/Internal/KafkaValueGeneratorSelector.cs index 4d57e3dc..d4546c56 100644 --- a/src/net/KEFCore/ValueGeneration/Internal/KafkaValueGeneratorSelector.cs +++ b/src/net/KEFCore/ValueGeneration/Internal/KafkaValueGeneratorSelector.cs @@ -28,7 +28,9 @@ namespace MASES.EntityFrameworkCore.KNet.ValueGeneration.Internal; public class KafkaValueGeneratorSelector : ValueGeneratorSelector { private readonly IKafkaCluster _kafkaCluster; - + /// + /// Default initializer + /// public KafkaValueGeneratorSelector( ValueGeneratorSelectorDependencies dependencies, IKafkaDatabase kafkaDatabase) @@ -36,7 +38,7 @@ public KafkaValueGeneratorSelector( { _kafkaCluster = kafkaDatabase.Cluster; } - + /// public override ValueGenerator Select(IProperty property, IEntityType entityType) => property.GetValueGeneratorFactory() == null && property.ClrType.IsInteger() diff --git a/src/net/templates/templates/kefcoreApp/Program.cs b/src/net/templates/templates/kefcoreApp/Program.cs index c147c51a..1ca45f77 100644 --- a/src/net/templates/templates/kefcoreApp/Program.cs +++ b/src/net/templates/templates/kefcoreApp/Program.cs @@ -17,7 +17,7 @@ static void Main(string[] args) { BootstrapServers = "KAFKA-BROKER:9092", ApplicationId = "MyApplicationId", - DbName = "MyDB", + DatabaseName = "MyDB", }; // cleanup topics on Broker context.Database.EnsureDeleted(); diff --git a/src/net/templates/templates/kefcoreAppWithEvents/Program.cs b/src/net/templates/templates/kefcoreAppWithEvents/Program.cs index 28bf2363..e3179966 100644 --- a/src/net/templates/templates/kefcoreAppWithEvents/Program.cs +++ b/src/net/templates/templates/kefcoreAppWithEvents/Program.cs @@ -23,7 +23,7 @@ static void Main(string[] args) { BootstrapServers = "KAFKA-BROKER:9092", ApplicationId = "MyApplicationId", - DbName = "MyDB", + DatabaseName = "MyDB", OnChangeEvent = OnEvent }; // cleanup topics on Broker diff --git a/test/KEFCore.Benchmark.Avro.Test/Program.cs b/test/KEFCore.Benchmark.Avro.Test/Program.cs index 6b513583..0329433c 100644 --- a/test/KEFCore.Benchmark.Avro.Test/Program.cs +++ b/test/KEFCore.Benchmark.Avro.Test/Program.cs @@ -86,7 +86,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, }) { @@ -147,7 +147,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, }) { diff --git a/test/KEFCore.Benchmark.Test/Program.cs b/test/KEFCore.Benchmark.Test/Program.cs index de058443..9882aa3e 100644 --- a/test/KEFCore.Benchmark.Test/Program.cs +++ b/test/KEFCore.Benchmark.Test/Program.cs @@ -85,7 +85,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, }) { @@ -140,7 +140,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, }) { diff --git a/test/KEFCore.Complex.Test/Program.cs b/test/KEFCore.Complex.Test/Program.cs index 7bc6bc40..10eba508 100644 --- a/test/KEFCore.Complex.Test/Program.cs +++ b/test/KEFCore.Complex.Test/Program.cs @@ -82,7 +82,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, }; diff --git a/test/KEFCore.Test/Program.cs b/test/KEFCore.Test/Program.cs index 7700b74b..808fe627 100644 --- a/test/KEFCore.Test/Program.cs +++ b/test/KEFCore.Test/Program.cs @@ -84,7 +84,7 @@ static void Main(string[] args) { BootstrapServers = config.BootstrapServers, ApplicationId = config.ApplicationId, - DbName = databaseName, + DatabaseName = databaseName, StreamsConfig = streamConfig, };