diff --git a/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java b/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java index 2f6a974aef1..72a55dbaa2d 100644 --- a/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java +++ b/modules/core/core-api/src/main/java/com/enonic/xp/scheduler/ScheduleCalendar.java @@ -2,6 +2,8 @@ import java.io.Serializable; import java.time.Duration; +import java.time.Instant; +import java.time.ZonedDateTime; import java.util.Optional; import com.enonic.xp.annotation.PublicApi; @@ -10,7 +12,9 @@ public interface ScheduleCalendar extends Serializable { - Optional nextExecution(); + Optional timeToNextExecution(); + + Optional nextExecution( Instant instant ); ScheduleCalendarType getType(); } diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java index d3b09902887..50dcc1a7765 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarImpl.java @@ -2,6 +2,7 @@ import java.io.Serializable; import java.time.Duration; +import java.time.Instant; import java.time.ZonedDateTime; import java.util.Optional; import java.util.TimeZone; @@ -58,7 +59,13 @@ public static Builder create() } @Override - public Optional nextExecution() + public Optional nextExecution( final Instant instant ) + { + return this.executionTime.nextExecution( ZonedDateTime.ofInstant( instant, timeZone.toZoneId() ) ); + } + + @Override + public Optional timeToNextExecution() { return this.executionTime.timeToNextExecution( ZonedDateTime.now( timeZone.toZoneId() ) ); } diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java index 7d9e4f02c5f..6385de5508e 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarImpl.java @@ -2,6 +2,8 @@ import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.util.Optional; import com.google.common.base.Preconditions; @@ -33,11 +35,17 @@ public Instant getValue() } @Override - public Optional nextExecution() + public Optional timeToNextExecution() { return Optional.of( Duration.between( Instant.now(), value ) ); } + @Override + public Optional nextExecution( final Instant instant ) + { + return Optional.of( value.atZone( ZoneId.systemDefault() ) ); + } + @Override public ScheduleCalendarType getType() { diff --git a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java index 6a9f7a96f9e..9f5ac7b43ef 100644 --- a/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java +++ b/modules/core/core-scheduler/src/main/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTask.java @@ -1,9 +1,14 @@ package com.enonic.xp.impl.scheduler.distributed; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -12,14 +17,22 @@ import com.enonic.xp.context.ContextAccessor; import com.enonic.xp.context.ContextBuilder; import com.enonic.xp.core.internal.osgi.OsgiSupport; -import com.enonic.xp.impl.scheduler.SchedulerExecutorService; +import com.enonic.xp.impl.scheduler.UpdateLastRunCommand; +import com.enonic.xp.node.NodeService; +import com.enonic.xp.scheduler.CronCalendar; import com.enonic.xp.scheduler.ScheduleCalendarType; import com.enonic.xp.scheduler.ScheduledJob; +import com.enonic.xp.scheduler.ScheduledJobName; import com.enonic.xp.scheduler.SchedulerService; import com.enonic.xp.security.PrincipalKey; import com.enonic.xp.security.RoleKeys; +import com.enonic.xp.security.SecurityService; import com.enonic.xp.security.User; import com.enonic.xp.security.auth.AuthenticationInfo; +import com.enonic.xp.security.auth.VerifiedUsernameAuthToken; +import com.enonic.xp.task.SubmitTaskParams; +import com.enonic.xp.task.TaskId; +import com.enonic.xp.task.TaskService; public class RescheduleTask implements SchedulableTask @@ -32,12 +45,69 @@ public class RescheduleTask private static final AtomicInteger FAILED_COUNT = new AtomicInteger( 0 ); + private static final PriorityQueue QUEUE = new PriorityQueue<>( Comparator.comparing( a -> a.timeToRun ) ); + + @Override public String getName() { return NAME; } + private static void fetchJobsToSchedule( final Map jobs ) + { + final Instant now = Instant.now(); + final Set scheduledJobNames = QUEUE.stream().map( entity -> entity.name ).collect( Collectors.toSet() ); + + scheduleCronJobs( jobs, now, scheduledJobNames ); + scheduleOneTimeJobs( jobs, scheduledJobNames ); + } + + private static void scheduleOneTimeJobs( final Map jobs, final Set scheduledJobNames ) + { + jobs.values() + .stream() + .filter( ScheduledJob::isEnabled ) + .filter( job -> !scheduledJobNames.contains( job.getName() ) ) + .filter( job -> ScheduleCalendarType.ONE_TIME.equals( job.getCalendar().getType() ) && job.getLastRun() == null ) + .forEach( ( job ) -> { + job.getCalendar() + .timeToNextExecution() + .ifPresent( duration -> QUEUE.offer( new JobToRun( job.getName(), Instant.now().plus( duration ) ) ) ); + } ); + } + + private static void scheduleCronJobs( final Map jobs, final Instant now, + final Set scheduledJobNames ) + { + jobs.values() + .stream() + .filter( ScheduledJob::isEnabled ) + .filter( job -> !scheduledJobNames.contains( job.getName() ) ) + .filter( job -> ScheduleCalendarType.CRON.equals( job.getCalendar().getType() ) ) + .forEach( job -> { + final Instant actualLastRun = job.getLastRun(); + final CronCalendar calendar = (CronCalendar) job.getCalendar(); + + if ( actualLastRun != null ) + { + calendar.nextExecution( actualLastRun ).ifPresent( nextExecution -> { + if ( nextExecution.isBefore( now.atZone( calendar.getTimeZone().toZoneId() ) ) ) + { + QUEUE.offer( new JobToRun( job.getName(), now ) ); + } + } ); + } + else + { + job.getCalendar() + .timeToNextExecution() + .ifPresent( nextExecutionDuration -> QUEUE.offer( + new JobToRun( job.getName(), Instant.now().plus( nextExecutionDuration ) ) ) ); + } + } ); + } + @Override public void run() { @@ -46,13 +116,15 @@ public void run() this.doRun(); FAILED_COUNT.set( 0 ); } - catch ( IllegalStateException e ) + catch ( Exception e ) { if ( FAILED_COUNT.addAndGet( 1 ) >= 10 ) { FAILED_COUNT.set( 0 ); LOG.warn( "Problem during tasks scheduling", e ); - } else { + } + else + { LOG.debug( "Problem during tasks scheduling", e ); } } @@ -64,39 +136,87 @@ public void run() private void doRun() { - final List jobs = - OsgiSupport.withService( SchedulerService.class, schedulerService -> adminContext().callWith( schedulerService::list ) ); + final Map jobs = + OsgiSupport.withService( SchedulerService.class, schedulerService -> adminContext().callWith( schedulerService::list ) ) + .stream() + .collect( Collectors.toMap( ScheduledJob::getName, job -> job ) ); - OsgiSupport.withService( SchedulerExecutorService.class, schedulerExecutorService -> { - schedulerExecutorService.disposeAllDone(); - return null; - } ); + fetchJobsToSchedule( jobs ); - final Set scheduledJobs = - OsgiSupport.withService( SchedulerExecutorService.class, SchedulerExecutorService::getAllFutures ); + final List failedJobs = new ArrayList<>(); - LOG.debug( "Currently scheduled jobs {}", scheduledJobs ); + scheduleJobs( jobs, failedJobs ); - jobs.stream() - .filter( ScheduledJob::isEnabled ) - .filter( job -> !scheduledJobs.contains( job.getName().getValue() ) ) - .filter( job -> !ScheduleCalendarType.ONE_TIME.equals( job.getCalendar().getType() ) || job.getLastRun() == null ) - .forEach( job -> job.getCalendar().nextExecution().ifPresent( duration -> { - try + retryFailedJobs( failedJobs ); + } + + private void scheduleJobs( final Map jobs, final List failedJobs ) + { + while ( !QUEUE.isEmpty() ) + { + final JobToRun peek = QUEUE.peek(); + + if ( peek.timeToRun.isAfter( Instant.now() ) ) { - LOG.debug( "Rescheduling a job {}", job.getName() ); - OsgiSupport.withService( SchedulerExecutorService.class, - scheduler -> scheduler.schedule( SchedulableTaskImpl.create().job( job ).build(), - duration.isNegative() ? 0 : duration.toMillis(), - TimeUnit.MILLISECONDS ) ); + break; } - catch ( Exception e ) + + QUEUE.remove(); + + if ( jobs.containsKey( peek.name ) ) //there is a job to run { - LOG.warn( "{} job rescheduling failed", job.getName(), e ); + //submit task + + final ScheduledJob job = jobs.get( peek.name ); + + try + { + final TaskId taskId = taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class, + taskService -> taskService.submitTask( + SubmitTaskParams.create() + .name( job.getName() + .getValue() ) //TODO: check + .descriptorKey( + job.getDescriptor() ) + .data( + job.getConfig() ) + .build() ) ) ); + + adminContext().runWith( () -> OsgiSupport.withService( NodeService.class, nodeService -> UpdateLastRunCommand.create() + .nodeService( nodeService ) + .name( job.getName() ) + .lastRun( Instant.now() ) + .lastTaskId( taskId ) + .build() + .execute() ) ); + } + catch ( Exception e ) + { + failedJobs.add( peek ); + } + catch ( Throwable t ) + { + LOG.error( "Error while running job [{}], no further attempts will be made", job.getName(), t ); + } } - } ) ); + } } + private void retryFailedJobs( final List failedJobs ) + { + failedJobs.forEach( entity -> { + if ( entity.attempts < 10 ) + { + QUEUE.offer( new JobToRun( entity.name, entity.timeToRun, entity.attempts + 1 ) ); + } + else + { + LOG.error( "Error while running job [{}], no further attempts will be made", entity.name ); + } + } ); + } + + private static Context adminContext() { return ContextBuilder.from( ContextAccessor.current() ) @@ -106,4 +226,45 @@ private static Context adminContext() .build() ) .build(); } + + private Context taskContext( final PrincipalKey user ) + { + if ( user == null ) + { + return ContextBuilder.from( ContextAccessor.current() ).authInfo( AuthenticationInfo.unAuthenticated() ).build(); + } + + final AuthenticationInfo authInfo = OsgiSupport.withService( SecurityService.class, securityService -> { + final VerifiedUsernameAuthToken token = new VerifiedUsernameAuthToken(); + token.setIdProvider( user.getIdProviderKey() ); + token.setUsername( user.getId() ); + + return securityService.authenticate( token ); + } ); + + return ContextBuilder.from( ContextAccessor.current() ).authInfo( authInfo ).build(); + } + + private static class JobToRun + { + private final ScheduledJobName name; + + private final Instant timeToRun; + + private final int attempts; + + public JobToRun( final ScheduledJobName name, final Instant timeToRun ) + { + this.name = name; + this.timeToRun = timeToRun; + this.attempts = 0; + } + + public JobToRun( final ScheduledJobName name, final Instant timeToRun, final int attempts ) + { + this.name = name; + this.timeToRun = timeToRun; + this.attempts = attempts; + } + } } diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java index c9126286044..859df6e4ffa 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/CalendarServiceImplTest.java @@ -30,7 +30,7 @@ public void cron() { final CronCalendar calendar = calendarService.cron( "* * * * *", TimeZone.getTimeZone( "GMT+5:30" ) ); - assertTrue( calendar.nextExecution().get().get( ChronoUnit.SECONDS ) <= 60 ); + assertTrue( calendar.timeToNextExecution().get().get( ChronoUnit.SECONDS ) <= 60 ); assertEquals( TimeZone.getTimeZone( "GMT+5:30" ), calendar.getTimeZone() ); assertEquals( "* * * * *", calendar.getCronValue() ); } @@ -53,7 +53,7 @@ public void oneTime() { final OneTimeCalendar calendar = calendarService.oneTime( Instant.parse( "2014-09-25T10:00:00.00Z" ) ); - assertTrue( calendar.nextExecution().get().isNegative() ); + assertTrue( calendar.timeToNextExecution().get().isNegative() ); assertEquals( Instant.parse( "2014-09-25T10:00:00.00Z" ), calendar.getValue() ); } diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java index 9ad03481387..700900f27d6 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/CronCalendarTest.java @@ -49,7 +49,7 @@ public void create() timeZone( TimeZone.getDefault() ). build(); - assertTrue( calendar.nextExecution().get().toSeconds() <= 60 ); + assertTrue( calendar.timeToNextExecution().get().toSeconds() <= 60 ); assertEquals( ScheduleCalendarType.CRON, calendar.getType() ); } diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java index 95d4777e167..28c77cc9168 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/OneTimeCalendarTest.java @@ -35,14 +35,14 @@ public void create() value( Instant.now().plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ). build(); - assertFalse( calendar.nextExecution().get().isNegative() ); + assertFalse( calendar.timeToNextExecution().get().isNegative() ); assertEquals( ScheduleCalendarType.ONE_TIME, calendar.getType() ); calendar = OneTimeCalendarImpl.create(). value( Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ). build(); - assertTrue( calendar.nextExecution().get().isNegative() ); + assertTrue( calendar.timeToNextExecution().get().isNegative() ); } @Test diff --git a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTaskTest.java b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTaskTest.java index ce019cf36d7..d2581bac350 100644 --- a/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTaskTest.java +++ b/modules/core/core-scheduler/src/test/java/com/enonic/xp/impl/scheduler/distributed/RescheduleTaskTest.java @@ -3,14 +3,8 @@ import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.TimeZone; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -26,23 +20,31 @@ import org.osgi.framework.BundleContext; import org.osgi.framework.ServiceReference; -import com.hazelcast.scheduledexecutor.IScheduledFuture; -import com.hazelcast.scheduledexecutor.ScheduledTaskHandler; - import com.enonic.xp.app.ApplicationKey; import com.enonic.xp.core.internal.osgi.OsgiSupportMock; +import com.enonic.xp.data.PropertySet; import com.enonic.xp.data.PropertyTree; -import com.enonic.xp.impl.scheduler.SchedulerExecutorService; +import com.enonic.xp.impl.scheduler.ScheduledJobPropertyNames; +import com.enonic.xp.node.Node; +import com.enonic.xp.node.NodeId; +import com.enonic.xp.node.NodePath; +import com.enonic.xp.node.NodeService; +import com.enonic.xp.node.UpdateNodeParams; import com.enonic.xp.page.DescriptorKey; import com.enonic.xp.scheduler.ScheduledJob; import com.enonic.xp.scheduler.ScheduledJobName; import com.enonic.xp.scheduler.SchedulerService; +import com.enonic.xp.security.IdProviderKey; import com.enonic.xp.security.PrincipalKey; +import com.enonic.xp.security.SecurityService; +import com.enonic.xp.security.auth.AuthenticationInfo; +import com.enonic.xp.security.auth.AuthenticationToken; +import com.enonic.xp.task.SubmitTaskParams; +import com.enonic.xp.task.TaskId; +import com.enonic.xp.task.TaskService; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.isA; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -53,19 +55,34 @@ public class RescheduleTaskTest { @Mock(stubOnly = true) - ServiceReference serviceReference; + ServiceReference taskReference; + + @Mock(stubOnly = true) + ServiceReference nodeReference; @Mock(stubOnly = true) - ServiceReference executorReference; + ServiceReference schedulerReference; + + @Mock(stubOnly = true) + ServiceReference securityReference; + + @Captor + ArgumentCaptor taskCaptor; @Captor - ArgumentCaptor taskCaptor; + ArgumentCaptor tokenCaptor; @Mock private SchedulerService schedulerService; @Mock - private SchedulerExecutorService schedulerExecutorService; + private TaskService taskService; + + @Mock + private NodeService nodeService; + + @Mock + private SecurityService securityService; @Mock private BundleContext bundleContext; @@ -78,11 +95,15 @@ public void setUp() when( bundle.getBundleContext() ).thenReturn( bundleContext ); - when( bundleContext.getServiceReferences( SchedulerService.class, null ) ).thenReturn( List.of( serviceReference ) ); - when( bundleContext.getServiceReferences( SchedulerExecutorService.class, null ) ).thenReturn( List.of( executorReference ) ); + when( bundleContext.getServiceReferences( SchedulerService.class, null ) ).thenReturn( List.of( schedulerReference ) ); + when( bundleContext.getServiceReferences( TaskService.class, null ) ).thenReturn( List.of( taskReference ) ); + when( bundleContext.getServiceReferences( NodeService.class, null ) ).thenReturn( List.of( nodeReference ) ); + when( bundleContext.getServiceReferences( SecurityService.class, null ) ).thenReturn( List.of( securityReference ) ); - when( bundleContext.getService( serviceReference ) ).thenReturn( schedulerService ); - when( bundleContext.getService( executorReference ) ).thenReturn( schedulerExecutorService ); + when( bundleContext.getService( nodeReference ) ).thenReturn( nodeService ); + when( bundleContext.getService( taskReference ) ).thenReturn( taskService ); + when( bundleContext.getService( schedulerReference ) ).thenReturn( schedulerService ); + when( bundleContext.getService( securityReference ) ).thenReturn( securityService ); } @AfterEach @@ -92,38 +113,156 @@ void tearDown() } @Test - public void rescheduleWithDoneAndDisabledTasks() + public void submitOldOneTimeTask() { - mockFutures(); mockJobs(); + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenReturn( TaskId.from( "123" ) ); - final RescheduleTask task = createAndRunTask(); - assertEquals( "rescheduleTask", task.getName() ); + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); - verify( schedulerExecutorService, times( 2 ) ).schedule( taskCaptor.capture(), anyLong(), isA( TimeUnit.class ) ); + createAndRunTask(); - assertEquals( 2, taskCaptor.getAllValues().size() ); - assertEquals( "task2", taskCaptor.getAllValues().get( 0 ).getName() ); - assertEquals( "task3", taskCaptor.getAllValues().get( 1 ).getName() ); + verify( taskService, times( 1 ) ).submitTask( taskCaptor.capture() ); + assertEquals( "task3", taskCaptor.getValue().getDescriptorKey().getName() ); } + @Test - public void firstJobRescheduleFailedButSecondIsOk() + public void submitInOrder() { - mockFutures(); - mockJobs(); + final Instant now = Instant.now(); + final ScheduledJob job1 = mockOneTimeJob( "job1", now.minus( 2, ChronoUnit.SECONDS ) ); + final ScheduledJob job2 = mockOneTimeJob( "job2", now ); + final ScheduledJob job3 = mockOneTimeJob( "job3", now.minus( 1, ChronoUnit.SECONDS ) ); + + when( schedulerService.list() ).thenReturn( List.of( job1, job2, job3 ) ); - when( schedulerExecutorService.schedule( isA( SchedulableTask.class ), anyLong(), isA( TimeUnit.class ) ) ). - thenThrow( RuntimeException.class ). - then( a -> null ); + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenReturn( TaskId.from( "1" ) ) + .thenReturn( TaskId.from( "2" ) ) + .thenReturn( TaskId.from( "3" ) ); + + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); createAndRunTask(); - verify( schedulerExecutorService, times( 2 ) ).schedule( taskCaptor.capture(), anyLong(), isA( TimeUnit.class ) ); + verify( taskService, times( 3 ) ).submitTask( taskCaptor.capture() ); - assertEquals( 2, taskCaptor.getAllValues().size() ); - assertEquals( "task2", taskCaptor.getAllValues().get( 0 ).getName() ); - assertEquals( "task3", taskCaptor.getAllValues().get( 1 ).getName() ); + assertEquals( "job1", taskCaptor.getAllValues().get( 0 ).getName() ); + assertEquals( "job3", taskCaptor.getAllValues().get( 1 ).getName() ); + assertEquals( "job2", taskCaptor.getAllValues().get( 2 ).getName() ); + } + + @Test + public void jobSubmitFailedButRetried() + { + final Instant now = Instant.now(); + ScheduledJob job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ) ); + ScheduledJob job2 = mockOneTimeJob( "job2", now ); + + when( schedulerService.list() ).thenReturn( List.of( job1, job2 ) ); + + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); + + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenThrow( RuntimeException.class ) + .thenReturn( TaskId.from( "1" ) ); + + createAndRunTask(); + + verify( taskService, times( 2 ) ).submitTask( taskCaptor.capture() ); + + assertEquals( "job1", taskCaptor.getAllValues().get( 0 ).getName() ); + assertEquals( "job2", taskCaptor.getAllValues().get( 1 ).getName() ); + + job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ), now ); + job2 = mockOneTimeJob( "job2", now, now ); + + when( schedulerService.list() ).thenReturn( List.of( job1, job2 ) ); + + createAndRunTask(); + + verify( taskService, times( 3 ) ).submitTask( taskCaptor.capture() ); + + assertEquals( "job1", taskCaptor.getAllValues().get( 2 ).getName() ); + + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenThrow( new Error() ); + + createAndRunTask(); + verify( taskService, times( 3 ) ).submitTask( taskCaptor.capture() ); + } + + @Test + public void jobSubmitFailedWithError() + { + final Instant now = Instant.now(); + ScheduledJob job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ) ); + + when( schedulerService.list() ).thenReturn( List.of( job1 ) ); + + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); + + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenThrow( new Error() ).thenReturn( TaskId.from( "1" ) ); + + createAndRunTask(); + + verify( taskService, times( 1 ) ).submitTask( taskCaptor.capture() ); + + job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ), now ); + + when( schedulerService.list() ).thenReturn( List.of( job1 ) ); + createAndRunTask(); + verify( taskService, times( 1 ) ).submitTask( taskCaptor.capture() ); + } + + @Test + public void retryFailedMultipleTimes() + { + final Instant now = Instant.now(); + ScheduledJob job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ) ); + + when( schedulerService.list() ).thenReturn( List.of( job1 ) ); + + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); + + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenThrow( new RuntimeException() ); + + for ( int i = 0; i <= 10; i++ ) + { + createAndRunTask(); + } + verify( taskService, times( 11 ) ).submitTask( taskCaptor.capture() ); + } + + @Test + public void submitJobAsUser() + { + final Instant now = Instant.now(); + final PrincipalKey user = PrincipalKey.ofUser( IdProviderKey.createDefault(), "my-user" ); + + ScheduledJob job1 = mockOneTimeJob( "job1", now.minus( 1, ChronoUnit.SECONDS ), user ); + + when( schedulerService.list() ).thenReturn( List.of( job1 ) ); + + final Node node = mockNode(); + when( nodeService.update( isA( UpdateNodeParams.class ) ) ).thenReturn( node ); + + when( taskService.submitTask( isA( SubmitTaskParams.class ) ) ).thenReturn( TaskId.from( "1" ) ); + when( securityService.authenticate( tokenCaptor.capture() ) ).thenReturn( mock( AuthenticationInfo.class ) ); + + createAndRunTask(); + + assertEquals( "default", tokenCaptor.getValue().getIdProvider().toString() ); + verify( taskService, times( 1 ) ).submitTask( taskCaptor.capture() ); + } + + @Test + public void testName() + { + assertEquals( "rescheduleTask", new RescheduleTask().getName() ); } @Test @@ -132,7 +271,6 @@ public void serviceIsDown() { when( bundleContext.getServiceReferences( SchedulerService.class, null ) ).thenReturn( List.of() ); - mockFutures(); mockJobs(); for ( int i = 0; i < 10; i++ ) @@ -151,121 +289,101 @@ private RescheduleTask createAndRunTask() return task; } - private void mockFutures() + private void mockJobs() { - final ScheduledTaskHandler handler1 = mock( ScheduledTaskHandler.class ); - final ScheduledTaskHandler handler2 = mock( ScheduledTaskHandler.class ); - final ScheduledTaskHandler handler3 = mock( ScheduledTaskHandler.class ); - final ScheduledTaskHandler handler4 = mock( ScheduledTaskHandler.class ); - - final IScheduledFuture future1 = mock( IScheduledFuture.class ); - final IScheduledFuture future2 = mock( IScheduledFuture.class ); - final IScheduledFuture future3 = mock( IScheduledFuture.class ); - final IScheduledFuture future4 = mock( IScheduledFuture.class ); - - when( future1.getHandler() ).thenReturn( handler1 ); - when( future2.getHandler() ).thenReturn( handler2 ); - when( future3.getHandler() ).thenReturn( handler3 ); - when( future4.getHandler() ).thenReturn( handler4 ); - - when( handler1.getTaskName() ).thenReturn( "task1" ); - when( handler2.getTaskName() ).thenReturn( "task2" ); - when( handler3.getTaskName() ).thenReturn( "task3" ); - when( handler4.getTaskName() ).thenReturn( "task4" ); - - when( future1.isDone() ).thenReturn( true ); - when( future2.isDone() ).thenReturn( true ); - when( future3.isDone() ).thenReturn( true ); + final ScheduledJob job1 = ScheduledJob.create() + .name( ScheduledJobName.from( "task1" ) ) + .calendar( CronCalendarImpl.create().value( "* * * * *" ).timeZone( TimeZone.getDefault() ).build() ) + .descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task1" ) ) + .config( new PropertyTree() ) + .enabled( false ) + .creator( PrincipalKey.from( "user:system:creator" ) ) + .modifier( PrincipalKey.from( "user:system:creator" ) ) + .createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ) + .modifiedTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ) + .build(); + + final ScheduledJob job2 = ScheduledJob.create() + .name( ScheduledJobName.from( "task2" ) ) + .calendar( CronCalendarImpl.create().value( "* * * * *" ).timeZone( TimeZone.getDefault() ).build() ) + .descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task2" ) ) + .config( new PropertyTree() ) + .enabled( true ) + .creator( PrincipalKey.from( "user:system:creator" ) ) + .modifier( PrincipalKey.from( "user:system:modifier" ) ) + .createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ) + .modifiedTime( Instant.parse( "2021-02-25T10:44:53.170079900Z" ) ) + .build(); + + final ScheduledJob job3 = mockOneTimeJob( "task3", Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ); + + final ScheduledJob job4 = ScheduledJob.create() + .name( ScheduledJobName.from( "task4" ) ) + .calendar( CronCalendarImpl.create().value( "* * * * *" ).timeZone( TimeZone.getDefault() ).build() ) + .descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task4" ) ) + .config( new PropertyTree() ) + .creator( PrincipalKey.from( "user:system:creator" ) ) + .modifier( PrincipalKey.from( "user:system:modifier" ) ) + .createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ) + .modifiedTime( Instant.parse( "2021-02-25T11:44:33.170079900Z" ) ) + .enabled( true ) + .build(); + + when( schedulerService.list() ).thenReturn( List.of( job1, job2, job3, job4 ) ); + } - Map> futures = new HashMap<>( - Map.of( handler1.getTaskName(), future1, handler2.getTaskName(), future2, handler3.getTaskName(), future3, - handler4.getTaskName(), future4 ) ); + private ScheduledJob mockOneTimeJob( final String scheduledJobName, final Instant instant ) + { + return mockOneTimeJob( scheduledJobName, instant, null, null ); + } - when( schedulerExecutorService.getAllFutures() ). - thenAnswer( invocation -> futures.keySet() ); + private ScheduledJob mockOneTimeJob( final String scheduledJobName, final Instant instant, final Instant lastRun ) + { + return mockOneTimeJob( scheduledJobName, instant, lastRun, null ); + } - doAnswer( invocation -> { - final Set doneTasks = futures.entrySet().stream(). - filter( entry -> entry.getValue().isDone() ). - map( Map.Entry::getKey ). - collect( Collectors.toSet() ); + private ScheduledJob mockOneTimeJob( final String scheduledJobName, final Instant instant, final PrincipalKey user ) + { + return mockOneTimeJob( scheduledJobName, instant, null, user ); + } - doneTasks.forEach( futures::remove ); + private ScheduledJob mockOneTimeJob( final String scheduledJobName, final Instant instant, final Instant lastRun, + final PrincipalKey user ) + { + return ScheduledJob.create() + .name( ScheduledJobName.from( scheduledJobName ) ) + .calendar( OneTimeCalendarImpl.create().value( instant ).build() ) + .descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task3" ) ) + .config( new PropertyTree() ) + .enabled( true ) + .creator( PrincipalKey.from( "user:system:creator" ) ) + .modifier( PrincipalKey.from( "user:system:creator" ) ) + .createdTime( Instant.parse( "2021-02-26T10:44:33.170079900Z" ) ) + .modifiedTime( Instant.parse( "2021-02-26T10:44:33.170079900Z" ) ) + .lastRun( lastRun ) + .user( user ) + .build(); + } - return doneTasks; - } ).when( schedulerExecutorService ).disposeAllDone(); + private Node mockNode() + { + final PropertyTree jobData = new PropertyTree(); - doAnswer( invocation -> { + final PropertySet calendar = new PropertySet(); + calendar.addString( ScheduledJobPropertyNames.CALENDAR_TYPE, "ONE_TIME" ); + calendar.addString( ScheduledJobPropertyNames.CALENDAR_VALUE, "2021-02-25T10:44:33.170079900Z" ); - final ScheduledFuture future = futures.remove( (String) invocation.getArgument( 0 ) ); - return future != null; - } ).when( schedulerExecutorService ).dispose( isA( String.class ) ); + jobData.addString( ScheduledJobPropertyNames.DESCRIPTOR, "app:key" ); + jobData.addBoolean( ScheduledJobPropertyNames.ENABLED, true ); + jobData.addSet( ScheduledJobPropertyNames.CALENDAR, calendar ); + jobData.addSet( ScheduledJobPropertyNames.CONFIG, new PropertySet() ); + jobData.setString( ScheduledJobPropertyNames.CREATOR, "user:system:creator" ); + jobData.setString( ScheduledJobPropertyNames.MODIFIER, "user:system:modifier" ); + jobData.setString( ScheduledJobPropertyNames.CREATED_TIME, "2021-02-26T10:44:33.170079900Z" ); + jobData.setString( ScheduledJobPropertyNames.MODIFIED_TIME, "2021-03-26T10:44:33.170079900Z" ); - } + return Node.create().id( NodeId.from( "abc" ) ).name( "test" ).parentPath( NodePath.ROOT ).data( jobData ).build(); - private void mockJobs() - { - final ScheduledJob job1 = ScheduledJob.create(). - name( ScheduledJobName.from( "task1" ) ). - calendar( CronCalendarImpl.create(). - value( "* * * * *" ). - timeZone( TimeZone.getDefault() ). - build() ). - descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task1" ) ). - config( new PropertyTree() ). - enabled( false ). - creator( PrincipalKey.from( "user:system:creator" ) ). - modifier( PrincipalKey.from( "user:system:creator" ) ). - createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ). - modifiedTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ). - build(); - - final ScheduledJob job2 = ScheduledJob.create(). - name( ScheduledJobName.from( "task2" ) ). - calendar( CronCalendarImpl.create(). - value( "* * * * *" ). - timeZone( TimeZone.getDefault() ). - build() ). - descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task2" ) ). - config( new PropertyTree() ). - enabled( true ). - creator( PrincipalKey.from( "user:system:creator" ) ). - modifier( PrincipalKey.from( "user:system:modifier" ) ). - createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ). - modifiedTime( Instant.parse( "2021-02-25T10:44:53.170079900Z" ) ). - build(); - - final ScheduledJob job3 = ScheduledJob.create(). - name( ScheduledJobName.from( "task3" ) ). - calendar( OneTimeCalendarImpl.create(). - value( Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ). - build() ). - descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task3" ) ). - config( new PropertyTree() ). - enabled( true ). - creator( PrincipalKey.from( "user:system:creator" ) ). - modifier( PrincipalKey.from( "user:system:creator" ) ). - createdTime( Instant.parse( "2021-02-26T10:44:33.170079900Z" ) ). - modifiedTime( Instant.parse( "2021-02-26T10:44:33.170079900Z" ) ). - build(); - - final ScheduledJob job4 = ScheduledJob.create(). - name( ScheduledJobName.from( "task4" ) ). - calendar( CronCalendarImpl.create(). - value( "* * * * *" ). - timeZone( TimeZone.getDefault() ). - build() ). - descriptor( DescriptorKey.from( ApplicationKey.from( "com.enonic.app.test" ), "task4" ) ). - config( new PropertyTree() ). - creator( PrincipalKey.from( "user:system:creator" ) ). - modifier( PrincipalKey.from( "user:system:modifier" ) ). - createdTime( Instant.parse( "2021-02-25T10:44:33.170079900Z" ) ). - modifiedTime( Instant.parse( "2021-02-25T11:44:33.170079900Z" ) ). - enabled( true ). - build(); - - when( schedulerService.list() ). - thenReturn( List.of( job1, job2, job3, job4 ) ); } } diff --git a/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java b/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java index d5e4cda32fe..52a931c6cea 100644 --- a/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java +++ b/modules/server/server-rest/src/test/java/com/enonic/xp/impl/server/rest/SchedulerResourceTest.java @@ -56,7 +56,7 @@ public TimeZone getTimeZone() return TimeZone.getTimeZone( "GMT+3:00" ); } - public Optional nextExecution() + public Optional timeToNextExecution() { return Optional.of( Duration.ofSeconds( 50 ) ); } @@ -77,7 +77,7 @@ public Instant getValue() } @Override - public Optional nextExecution() + public Optional timeToNextExecution() { return Optional.of( Duration.ofSeconds( 50 ) ); }