From 6479f15128863bc2c5e3f528c4a393932d27e6a7 Mon Sep 17 00:00:00 2001 From: Bradnitski Date: Fri, 21 Apr 2023 14:26:55 +0200 Subject: [PATCH] Don't use hazelcast executor in task scheduling #10122 --- .../enonic/xp/scheduler/ScheduleCalendar.java | 4 +- .../distributed/CronCalendarImpl.java | 2 +- .../distributed/OneTimeCalendarImpl.java | 5 +- .../scheduler/distributed/RescheduleTask.java | 69 ++++++++++--------- .../scheduler/CalendarServiceImplTest.java | 4 +- .../distributed/CronCalendarTest.java | 2 +- .../distributed/OneTimeCalendarTest.java | 16 ++--- .../server/rest/SchedulerResourceTest.java | 4 +- 8 files changed, 55 insertions(+), 51 deletions(-) diff --git a/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java b/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java index 72a55dbaa2d..46f1ebdb1ed 100644 --- a/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java +++ b/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java @@ -12,9 +12,9 @@ public interface ScheduleCalendar extends Serializable { - Optional timeToNextExecution(); - Optional nextExecution( Instant instant ); + Optional nextExecution(); + ScheduleCalendarType getType(); } diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java index 50dcc1a7765..ef33ad55ca4 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java @@ -65,7 +65,7 @@ public Optional nextExecution( final Instant instant ) } @Override - public Optional timeToNextExecution() + public Optional nextExecution() { return this.executionTime.timeToNextExecution( ZonedDateTime.now( timeZone.toZoneId() ) ); } diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java index ce5840e8736..7c8d770f8d2 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java @@ -35,7 +35,7 @@ public Instant getValue() } @Override - public Optional timeToNextExecution() + public Optional nextExecution() { return Optional.of( Duration.between( Instant.now(), value ) ); } @@ -43,7 +43,8 @@ public Optional timeToNextExecution() @Override public Optional nextExecution( final Instant instant ) { - return Optional.of( value.atZone( ZoneId.systemDefault() ) ); //TODO:discuss + final ZonedDateTime timeToRun = value.atZone( ZoneId.systemDefault() ); + return Optional.ofNullable( instant.atZone( ZoneId.systemDefault() ).isBefore( timeToRun ) ? timeToRun : null ); } @Override diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java index 0c761d64a0d..9a665bcf0a6 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java @@ -1,18 +1,20 @@ package com.enonic.xp.impl.scheduler.distributed; import java.time.Instant; -import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; -import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableList; + import com.enonic.xp.context.Context; import com.enonic.xp.context.ContextAccessor; import com.enonic.xp.context.ContextBuilder; @@ -54,36 +56,36 @@ public String getName() return NAME; } - private static void fetchJobsToSchedule( final Map jobs ) + private static void fillJobsToSchedule( final Map jobs ) { final Instant now = Instant.now(); - final Set scheduledJobNames = QUEUE.stream().map( entity -> entity.name ).collect( Collectors.toSet() ); - scheduleCronJobs( jobs, now, scheduledJobNames ); - scheduleOneTimeJobs( jobs, scheduledJobNames ); + final Predicate filterAlreadyScheduled = + job -> !QUEUE.stream().map( entity -> entity.name ).collect( Collectors.toSet() ).contains( job.getName() ); + + scheduleCronJobs( jobs, now, filterAlreadyScheduled ); + scheduleOneTimeJobs( jobs, now, filterAlreadyScheduled ); } - private static void scheduleOneTimeJobs( final Map jobs, final Set scheduledJobNames ) + private static void scheduleOneTimeJobs( final Map jobs, final Instant now, + final Predicate filterAlreadyScheduled ) { jobs.values() .stream() .filter( ScheduledJob::isEnabled ) - .filter( job -> !scheduledJobNames.contains( job.getName() ) ) + .filter( filterAlreadyScheduled ) .filter( job -> ScheduleCalendarType.ONE_TIME.equals( job.getCalendar().getType() ) && job.getLastRun() == null ) .forEach( ( job ) -> { job.getCalendar() - .timeToNextExecution() - .ifPresent( duration -> QUEUE.offer( new JobToRun( job.getName(), Instant.now().plus( duration ) ) ) ); + .nextExecution() + .ifPresent( duration -> QUEUE.offer( new JobToRun( job.getName(), now.plus( duration ) ) ) ); } ); } private static void scheduleCronJobs( final Map jobs, final Instant now, - final Set scheduledJobNames ) + final Predicate filterAlreadyScheduled ) { - jobs.values() - .stream() - .filter( ScheduledJob::isEnabled ) - .filter( job -> !scheduledJobNames.contains( job.getName() ) ) + jobs.values().stream().filter( ScheduledJob::isEnabled ).filter( filterAlreadyScheduled ) .filter( job -> ScheduleCalendarType.CRON.equals( job.getCalendar().getType() ) ) .forEach( job -> { final Instant actualLastRun = job.getLastRun(); @@ -101,9 +103,9 @@ private static void scheduleCronJobs( final Map else { job.getCalendar() - .timeToNextExecution() - .ifPresent( nextExecutionDuration -> QUEUE.offer( - new JobToRun( job.getName(), Instant.now().plus( nextExecutionDuration ) ) ) ); + .nextExecution() + .ifPresent( + nextExecutionDuration -> QUEUE.offer( new JobToRun( job.getName(), now.plus( nextExecutionDuration ) ) ) ); } } ); } @@ -141,17 +143,17 @@ private void doRun() .stream() .collect( Collectors.toMap( ScheduledJob::getName, job -> job ) ); - fetchJobsToSchedule( jobs ); - - final List failedJobs = new ArrayList<>(); + fillJobsToSchedule( jobs ); - scheduleJobs( jobs, failedJobs ); + final List failedJobs = scheduleJobs( jobs ); retryFailedJobs( failedJobs ); } - private void scheduleJobs( final Map jobs, final List failedJobs ) + private List scheduleJobs( final Map jobs ) { + final ImmutableList.Builder failedJobs = ImmutableList.builder(); + while ( !QUEUE.isEmpty() ) { final JobToRun peek = QUEUE.peek(); @@ -169,18 +171,17 @@ private void scheduleJobs( final Map jobs, final final ScheduledJob job = jobs.get( peek.name ); + final Function submitTask = taskService -> taskService.submitTask( SubmitTaskParams.create() + .name( + job.getName().getValue() ) + .descriptorKey( + job.getDescriptor() ) + .data( job.getConfig() ) + .build() ); try { - final TaskId taskId = taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class, - taskService -> taskService.submitTask( - SubmitTaskParams.create() - .name( job.getName() - .getValue() ) //TODO: check - .descriptorKey( - job.getDescriptor() ) - .data( - job.getConfig() ) - .build() ) ) ); + final TaskId taskId = + taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class, submitTask ) ); adminContext().runWith( () -> OsgiSupport.withService( NodeService.class, nodeService -> UpdateLastRunCommand.create() .nodeService( nodeService ) @@ -200,6 +201,8 @@ private void scheduleJobs( final Map jobs, final } } } + + return failedJobs.build(); } private void retryFailedJobs( final List failedJobs ) diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java index 859df6e4ffa..c9126286044 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java @@ -30,7 +30,7 @@ public void cron() { final CronCalendar calendar = calendarService.cron( "* * * * *", TimeZone.getTimeZone( "GMT+5:30" ) ); - assertTrue( calendar.timeToNextExecution().get().get( ChronoUnit.SECONDS ) <= 60 ); + assertTrue( calendar.nextExecution().get().get( ChronoUnit.SECONDS ) <= 60 ); assertEquals( TimeZone.getTimeZone( "GMT+5:30" ), calendar.getTimeZone() ); assertEquals( "* * * * *", calendar.getCronValue() ); } @@ -53,7 +53,7 @@ public void oneTime() { final OneTimeCalendar calendar = calendarService.oneTime( Instant.parse( "2014-09-25T10:00:00.00Z" ) ); - assertTrue( calendar.timeToNextExecution().get().isNegative() ); + assertTrue( calendar.nextExecution().get().isNegative() ); assertEquals( Instant.parse( "2014-09-25T10:00:00.00Z" ), calendar.getValue() ); } diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java index 700900f27d6..9ad03481387 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java @@ -49,7 +49,7 @@ public void create() timeZone( TimeZone.getDefault() ). build(); - assertTrue( calendar.timeToNextExecution().get().toSeconds() <= 60 ); + assertTrue( calendar.nextExecution().get().toSeconds() <= 60 ); assertEquals( ScheduleCalendarType.CRON, calendar.getType() ); } diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java index 28c77cc9168..580c8983dc8 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java @@ -31,18 +31,18 @@ public void createWrongValue() @Test public void create() { - OneTimeCalendarImpl calendar = OneTimeCalendarImpl.create(). - value( Instant.now().plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ). - build(); + final Instant now = Instant.now(); + + OneTimeCalendarImpl calendar = OneTimeCalendarImpl.create().value( now.plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ).build(); + + assertFalse( calendar.nextExecution().get().isNegative() ); + assertTrue( calendar.nextExecution( now.plus( Duration.of( 2, ChronoUnit.MINUTES ) ) ).isEmpty() ); - assertFalse( calendar.timeToNextExecution().get().isNegative() ); assertEquals( ScheduleCalendarType.ONE_TIME, calendar.getType() ); - calendar = OneTimeCalendarImpl.create(). - value( Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ). - build(); + calendar = OneTimeCalendarImpl.create().value( now.minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ).build(); - assertTrue( calendar.timeToNextExecution().get().isNegative() ); + assertTrue( calendar.nextExecution().get().isNegative() ); } @Test diff --git a/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java b/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java index 48327fc7692..42c6772270b 100644 --- a/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java +++ b/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java @@ -57,7 +57,7 @@ public TimeZone getTimeZone() return TimeZone.getTimeZone( "GMT+3:00" ); } - public Optional timeToNextExecution() + public Optional nextExecution() { return Optional.of( Duration.ofSeconds( 50 ) ); } @@ -84,7 +84,7 @@ public Instant getValue() } @Override - public Optional timeToNextExecution() + public Optional nextExecution() { return Optional.of( Duration.ofSeconds( 50 ) ); }