Skip to content

Commit

Permalink
Don't use hazelcast executor in task scheduling #10122
Browse files Browse the repository at this point in the history
  • Loading branch information
vbradnitski committed Apr 21, 2023
1 parent 5824082 commit 6479f15
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
public interface ScheduleCalendar
extends Serializable
{
Optional<Duration> timeToNextExecution();

Optional<ZonedDateTime> nextExecution( Instant instant );

Optional<Duration> nextExecution();

ScheduleCalendarType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public Optional<ZonedDateTime> nextExecution( final Instant instant )
}

@Override
public Optional<Duration> timeToNextExecution()
public Optional<Duration> nextExecution()
{
return this.executionTime.timeToNextExecution( ZonedDateTime.now( timeZone.toZoneId() ) );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,16 @@ public Instant getValue()
}

@Override
public Optional<Duration> timeToNextExecution()
public Optional<Duration> nextExecution()
{
return Optional.of( Duration.between( Instant.now(), value ) );
}

@Override
public Optional<ZonedDateTime> nextExecution( final Instant instant )
{
return Optional.of( value.atZone( ZoneId.systemDefault() ) ); //TODO:discuss
final ZonedDateTime timeToRun = value.atZone( ZoneId.systemDefault() );
return Optional.ofNullable( instant.atZone( ZoneId.systemDefault() ).isBefore( timeToRun ) ? timeToRun : null );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package com.enonic.xp.impl.scheduler.distributed;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import com.enonic.xp.context.Context;
import com.enonic.xp.context.ContextAccessor;
import com.enonic.xp.context.ContextBuilder;
Expand Down Expand Up @@ -54,36 +56,36 @@ public String getName()
return NAME;
}

private static void fetchJobsToSchedule( final Map<ScheduledJobName, ScheduledJob> jobs )
private static void fillJobsToSchedule( final Map<ScheduledJobName, ScheduledJob> jobs )
{
final Instant now = Instant.now();
final Set<ScheduledJobName> scheduledJobNames = QUEUE.stream().map( entity -> entity.name ).collect( Collectors.toSet() );

scheduleCronJobs( jobs, now, scheduledJobNames );
scheduleOneTimeJobs( jobs, scheduledJobNames );
final Predicate<ScheduledJob> filterAlreadyScheduled =
job -> !QUEUE.stream().map( entity -> entity.name ).collect( Collectors.toSet() ).contains( job.getName() );

scheduleCronJobs( jobs, now, filterAlreadyScheduled );
scheduleOneTimeJobs( jobs, now, filterAlreadyScheduled );
}

private static void scheduleOneTimeJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final Set<ScheduledJobName> scheduledJobNames )
private static void scheduleOneTimeJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final Instant now,
final Predicate<ScheduledJob> filterAlreadyScheduled )
{
jobs.values()
.stream()
.filter( ScheduledJob::isEnabled )
.filter( job -> !scheduledJobNames.contains( job.getName() ) )
.filter( filterAlreadyScheduled )
.filter( job -> ScheduleCalendarType.ONE_TIME.equals( job.getCalendar().getType() ) && job.getLastRun() == null )
.forEach( ( job ) -> {
job.getCalendar()
.timeToNextExecution()
.ifPresent( duration -> QUEUE.offer( new JobToRun( job.getName(), Instant.now().plus( duration ) ) ) );
.nextExecution()
.ifPresent( duration -> QUEUE.offer( new JobToRun( job.getName(), now.plus( duration ) ) ) );
} );
}

private static void scheduleCronJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final Instant now,
final Set<ScheduledJobName> scheduledJobNames )
final Predicate<ScheduledJob> filterAlreadyScheduled )
{
jobs.values()
.stream()
.filter( ScheduledJob::isEnabled )
.filter( job -> !scheduledJobNames.contains( job.getName() ) )
jobs.values().stream().filter( ScheduledJob::isEnabled ).filter( filterAlreadyScheduled )
.filter( job -> ScheduleCalendarType.CRON.equals( job.getCalendar().getType() ) )
.forEach( job -> {
final Instant actualLastRun = job.getLastRun();
Expand All @@ -101,9 +103,9 @@ private static void scheduleCronJobs( final Map<ScheduledJobName, ScheduledJob>
else
{
job.getCalendar()
.timeToNextExecution()
.ifPresent( nextExecutionDuration -> QUEUE.offer(
new JobToRun( job.getName(), Instant.now().plus( nextExecutionDuration ) ) ) );
.nextExecution()
.ifPresent(
nextExecutionDuration -> QUEUE.offer( new JobToRun( job.getName(), now.plus( nextExecutionDuration ) ) ) );
}
} );
}
Expand Down Expand Up @@ -141,17 +143,17 @@ private void doRun()
.stream()
.collect( Collectors.toMap( ScheduledJob::getName, job -> job ) );

fetchJobsToSchedule( jobs );

final List<JobToRun> failedJobs = new ArrayList<>();
fillJobsToSchedule( jobs );

scheduleJobs( jobs, failedJobs );
final List<JobToRun> failedJobs = scheduleJobs( jobs );

retryFailedJobs( failedJobs );
}

private void scheduleJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final List<JobToRun> failedJobs )
private List<JobToRun> scheduleJobs( final Map<ScheduledJobName, ScheduledJob> jobs )
{
final ImmutableList.Builder<JobToRun> failedJobs = ImmutableList.builder();

while ( !QUEUE.isEmpty() )
{
final JobToRun peek = QUEUE.peek();
Expand All @@ -169,18 +171,17 @@ private void scheduleJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final

final ScheduledJob job = jobs.get( peek.name );

final Function<TaskService, TaskId> submitTask = taskService -> taskService.submitTask( SubmitTaskParams.create()
.name(
job.getName().getValue() )
.descriptorKey(
job.getDescriptor() )
.data( job.getConfig() )
.build() );
try
{
final TaskId taskId = taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class,
taskService -> taskService.submitTask(
SubmitTaskParams.create()
.name( job.getName()
.getValue() ) //TODO: check
.descriptorKey(
job.getDescriptor() )
.data(
job.getConfig() )
.build() ) ) );
final TaskId taskId =
taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class, submitTask ) );

adminContext().runWith( () -> OsgiSupport.withService( NodeService.class, nodeService -> UpdateLastRunCommand.create()
.nodeService( nodeService )
Expand All @@ -200,6 +201,8 @@ private void scheduleJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final
}
}
}

return failedJobs.build();
}

private void retryFailedJobs( final List<JobToRun> failedJobs )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void cron()
{
final CronCalendar calendar = calendarService.cron( "* * * * *", TimeZone.getTimeZone( "GMT+5:30" ) );

assertTrue( calendar.timeToNextExecution().get().get( ChronoUnit.SECONDS ) <= 60 );
assertTrue( calendar.nextExecution().get().get( ChronoUnit.SECONDS ) <= 60 );
assertEquals( TimeZone.getTimeZone( "GMT+5:30" ), calendar.getTimeZone() );
assertEquals( "* * * * *", calendar.getCronValue() );
}
Expand All @@ -53,7 +53,7 @@ public void oneTime()
{
final OneTimeCalendar calendar = calendarService.oneTime( Instant.parse( "2014-09-25T10:00:00.00Z" ) );

assertTrue( calendar.timeToNextExecution().get().isNegative() );
assertTrue( calendar.nextExecution().get().isNegative() );
assertEquals( Instant.parse( "2014-09-25T10:00:00.00Z" ), calendar.getValue() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void create()
timeZone( TimeZone.getDefault() ).
build();

assertTrue( calendar.timeToNextExecution().get().toSeconds() <= 60 );
assertTrue( calendar.nextExecution().get().toSeconds() <= 60 );
assertEquals( ScheduleCalendarType.CRON, calendar.getType() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ public void createWrongValue()
@Test
public void create()
{
OneTimeCalendarImpl calendar = OneTimeCalendarImpl.create().
value( Instant.now().plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ).
build();
final Instant now = Instant.now();

OneTimeCalendarImpl calendar = OneTimeCalendarImpl.create().value( now.plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ).build();

assertFalse( calendar.nextExecution().get().isNegative() );
assertTrue( calendar.nextExecution( now.plus( Duration.of( 2, ChronoUnit.MINUTES ) ) ).isEmpty() );

assertFalse( calendar.timeToNextExecution().get().isNegative() );
assertEquals( ScheduleCalendarType.ONE_TIME, calendar.getType() );

calendar = OneTimeCalendarImpl.create().
value( Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ).
build();
calendar = OneTimeCalendarImpl.create().value( now.minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ).build();

assertTrue( calendar.timeToNextExecution().get().isNegative() );
assertTrue( calendar.nextExecution().get().isNegative() );
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public TimeZone getTimeZone()
return TimeZone.getTimeZone( "GMT+3:00" );
}

public Optional<Duration> timeToNextExecution()
public Optional<Duration> nextExecution()
{
return Optional.of( Duration.ofSeconds( 50 ) );
}
Expand All @@ -84,7 +84,7 @@ public Instant getValue()
}

@Override
public Optional<Duration> timeToNextExecution()
public Optional<Duration> nextExecution()
{
return Optional.of( Duration.ofSeconds( 50 ) );
}
Expand Down

0 comments on commit 6479f15

Please sign in to comment.