Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/129/quartz jobs miss fired problem #131

Merged
merged 15 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ jobs:
- name: Start Kafka
uses: 280780363/kafka-action@v1.0
with:
kafka version: "latest" # Optional, kafka version
zookeeper version: "latest" # Optional, zookeeper version
kafka version: "3.4.0-debian-11-r15" # Optional, kafka version
zookeeper version: "3.8.1-debian-11-r18" # Optional, zookeeper version
kafka port: 9092 # Optional, kafka port. Connect using localhost:9092
zookeeper port: 2181 # Optional, zookeeper port
auto create topic: "true" # Optional, auto create kafka topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,4 @@ private void SetupServices(HostBuilderContext context, IServiceCollection servic
services.AddSingleton<RetryDurableGuaranteeOrderedConsumptionPhysicalStorageAssert>();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.5" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="OpenCover" Version="4.7.1221" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using global::KafkaFlow.Retry.Durable.Definitions.Polling;
using global::KafkaFlow.Retry.Durable.Polling;
using Quartz;

internal class JobDataProviderSurrogate : IJobDataProvider
{
public JobDataProviderSurrogate(string schedulerId, PollingDefinition pollingDefinition, ITrigger trigger, List<IJobExecutionContext> jobExecutionContexts)
{
this.PollingDefinition = pollingDefinition;

this.Trigger = trigger;
this.TriggerName = this.GetTriggerName(schedulerId);

this.JobExecutionContexts = jobExecutionContexts;
this.JobDetail = this.CreateJobDetail();
}

public IJobDetail JobDetail { get; }

public List<IJobExecutionContext> JobExecutionContexts { get; }

public PollingDefinition PollingDefinition { get; }

public ITrigger Trigger { get; }

public string TriggerName { get; }

private IJobDetail CreateJobDetail()
{
var dataMap = new JobDataMap { { "JobExecution", this.JobExecutionContexts } };

return JobBuilder
.Create<JobSurrogate>()
.SetJobData(dataMap)
.Build();
}

private string GetTriggerName(string schedulerId)
{
return $"pollingJobTrigger_{schedulerId}_{this.PollingDefinition.PollingJobType}";
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow.Retry.IntegrationTests/PollingTests/JobSurrogate.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using System.Threading.Tasks;
using Quartz;

internal class JobSurrogate : IJob
{
public Task Execute(IJobExecutionContext context)
{
var jobExecutionContexts = context.JobDetail.JobDataMap["JobExecution"] as List<IJobExecutionContext>;
jobExecutionContexts.Add(context);

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
namespace KafkaFlow.Retry.IntegrationTests.PollingTests
{
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using global::KafkaFlow.Retry.Durable.Definitions.Polling;
using global::KafkaFlow.Retry.Durable.Polling;
using Moq;
using Quartz;
using Xunit;

public class QueueTrackerCoordinatorTests
{
private readonly Mock<IJobDataProvidersFactory> mockJobDataProvidersFactory;
private readonly ITriggerProvider triggerProvider;

public QueueTrackerCoordinatorTests()
{
this.triggerProvider = new TriggerProvider();

this.mockJobDataProvidersFactory = new Mock<IJobDataProvidersFactory>();
}

[Fact]
public async Task QueueTrackerCoordinator_ForceMisfireJob_SuccessWithCorrectScheduledFiredTimes()
{
// arrange
var schedulerId = "MisfiredJobsDoesNothing";
var jobExecutionContexts = new List<IJobExecutionContext>();

var waitForScheduleInSeconds = 5;
var jobActiveTimeInSeconds = 8;
var pollingInSeconds = 2;

var cronExpression = $"*/{pollingInSeconds} * * ? * * *";

var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, cronExpression, jobExecutionContexts);

this.mockJobDataProvidersFactory
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new[] { retryDurableJobDataProvider });

var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId);

// act

Thread.Sleep(waitForScheduleInSeconds * 1000);

await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>());

Thread.Sleep(jobActiveTimeInSeconds * 1000);

await queueTrackerCoordinator.UnscheduleJobsAsync();

// assert
var scheduledFiredTimes = jobExecutionContexts
.Where(ctx => ctx.ScheduledFireTimeUtc.HasValue)
.Select(ctx => ctx.ScheduledFireTimeUtc.Value)
.OrderBy(x => x)
.ToList();

var currentScheduledFiredTime = scheduledFiredTimes.First();
var otherScheduledFiredTimes = scheduledFiredTimes.Skip(1).ToList();

foreach (var scheduledFiredTime in otherScheduledFiredTimes)
{
currentScheduledFiredTime.AddSeconds(pollingInSeconds).Should().Be(scheduledFiredTime);

currentScheduledFiredTime = scheduledFiredTime;
}
}

[Fact]
public async Task QueueTrackerCoordinator_ScheduleAndUnscheduleDifferentJobs_Success()
{
// arrange
var schedulerId = "twoJobsSchedulerId";
var jobExecutionContexts = new List<IJobExecutionContext>();

var timePollingActiveInSeconds = 4;

var retryDurableCronExpression = "*/2 * * ? * * *";
var cleanupCronExpression = "*/4 * * ? * * *";

var retryDurableMinExpectedJobsFired = 2;
var retryDurableMaxExpectedJobsFired = 3;
var cleanupMinExpectedJobsFired = 1;
var cleanupMaxExpectedJobsFired = 2;

var retryDurableJobDataProvider = this.CreateRetryDurableJobDataProvider(schedulerId, retryDurableCronExpression, jobExecutionContexts);
var cleanupJobDataProvider = this.CreateCleanupJobDataProvider(schedulerId, cleanupCronExpression, jobExecutionContexts);

this.mockJobDataProvidersFactory
.Setup(m => m.Create(It.IsAny<IMessageProducer>(), It.IsAny<ILogHandler>()))
.Returns(new[] { retryDurableJobDataProvider, cleanupJobDataProvider });

var queueTrackerCoordinator = this.CreateQueueTrackerCoordinator(schedulerId);

// act
await queueTrackerCoordinator.ScheduleJobsAsync(Mock.Of<IMessageProducer>(), Mock.Of<ILogHandler>());

Thread.Sleep(timePollingActiveInSeconds * 1000);

await queueTrackerCoordinator.UnscheduleJobsAsync();

// assert
jobExecutionContexts.Where(ctx => !ctx.PreviousFireTimeUtc.HasValue).Should().HaveCount(2);

var retryDurableFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == retryDurableJobDataProvider.TriggerName);
var cleanupFiresContexts = jobExecutionContexts.Where(ctx => ctx.Trigger.Key.Name == cleanupJobDataProvider.TriggerName);

retryDurableFiresContexts
.Should()
.HaveCountGreaterThanOrEqualTo(retryDurableMinExpectedJobsFired)
.And
.HaveCountLessThanOrEqualTo(retryDurableMaxExpectedJobsFired);

retryDurableFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue);

cleanupFiresContexts
.Should()
.HaveCountGreaterThanOrEqualTo(cleanupMinExpectedJobsFired)
.And
.HaveCountLessThanOrEqualTo(cleanupMaxExpectedJobsFired);

cleanupFiresContexts.Should().ContainSingle(ctx => !ctx.PreviousFireTimeUtc.HasValue);
}

private JobDataProviderSurrogate CreateCleanupJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts)
{
var cleanupPollingDefinition =
new CleanupPollingDefinition(
enabled: true,
cronExpression: cronExpression,
timeToLiveInDays: 1,
rowsPerRequest: 10
);

return this.CreateJobDataProvider(schedulerId, cleanupPollingDefinition, jobExecutionContexts);
}

private JobDataProviderSurrogate CreateJobDataProvider(string schedulerId, PollingDefinition pollingDefinition, List<IJobExecutionContext> jobExecutionContexts)
{
var trigger = this.triggerProvider.GetPollingTrigger(schedulerId, pollingDefinition);

return new JobDataProviderSurrogate(schedulerId, pollingDefinition, trigger, jobExecutionContexts);
}

private IQueueTrackerCoordinator CreateQueueTrackerCoordinator(string schedulerId)
{
var queueTrackerFactory = new QueueTrackerFactory(schedulerId, this.mockJobDataProvidersFactory.Object);

return new QueueTrackerCoordinator(queueTrackerFactory);
}

private JobDataProviderSurrogate CreateRetryDurableJobDataProvider(string schedulerId, string cronExpression, List<IJobExecutionContext> jobExecutionContexts)
{
var retryDurablePollingDefinition =
new RetryDurablePollingDefinition(
enabled: true,
cronExpression: cronExpression,
fetchSize: 100,
expirationIntervalFactor: 1
);

return this.CreateJobDataProvider(schedulerId, retryDurablePollingDefinition, jobExecutionContexts);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Xunit;

[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
[assembly: ExcludeFromCodeCoverage]
[assembly: ExcludeFromCodeCoverage]
[assembly: CollectionBehavior(DisableTestParallelization = true)]
2 changes: 1 addition & 1 deletion src/KafkaFlow.Retry.IntegrationTests/RetryDurableTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ internal async Task RetryDurableTest(
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class QueueTrackerCoordinatorTests
{
private readonly Mock<IJobDataProvider> mockJobDataProvider;
private readonly Mock<IQueueTrackerFactory> mockQueueTrackerFactory;
private readonly QueueTrackerCoordinator queueTrackerCoordinator;
private readonly IQueueTrackerCoordinator queueTrackerCoordinator;

public QueueTrackerCoordinatorTests()
{
Expand Down Expand Up @@ -90,4 +90,4 @@ public async Task QueueTrackerCoordinator_UnscheduleJobs_Success()
this.mockJobDataProvider.Verify(m => m.Trigger, Times.Exactly(2));
}
}
}
}
Loading