Skip to content

Commit

Permalink
Don't use hazelcast executor in task scheduling #10122
Browse files Browse the repository at this point in the history
  • Loading branch information
vbradnitski committed Apr 20, 2023
1 parent c470ea4 commit 1a6a2a6
Show file tree
Hide file tree
Showing 9 changed files with 479 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Optional;

import com.enonic.xp.annotation.PublicApi;
Expand All @@ -10,7 +12,9 @@
/**
 * Schedule of a job: reports when the job is next due and which kind of calendar ({@link ScheduleCalendarType}) it is.
 */
public interface ScheduleCalendar
extends Serializable
{
// NOTE(review): this no-argument signature looks like the pre-rename form of timeToNextExecution() that the
// diff view shows next to its replacement - confirm against the actual file before relying on it.
Optional<Duration> nextExecution();
/**
 * Returns the time remaining, measured from the moment of the call, until the next execution.
 * The one-time implementation visible in this change returns a negative duration once its instant has passed.
 *
 * @return the remaining time, or an empty Optional when the calendar yields no further execution
 */
Optional<Duration> timeToNextExecution();

/**
 * Returns the point in time of the next execution relative to the given instant.
 * NOTE(review): the one-time implementation visible in this change ignores {@code instant} and always returns
 * its fixed value - confirm that callers expect this.
 *
 * @param instant the reference moment the next execution is computed from
 * @return the next execution time, or an empty Optional when there is none
 */
Optional<ZonedDateTime> nextExecution( Instant instant );

/**
 * Returns the kind of this calendar.
 *
 * @return the calendar type
 */
ScheduleCalendarType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.Serializable;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.TimeZone;
Expand Down Expand Up @@ -58,7 +59,13 @@ public static Builder create()
}

@Override
public Optional<Duration> nextExecution()
public Optional<ZonedDateTime> nextExecution( final Instant instant )
{
    // Express the reference instant in this calendar's own time zone, then ask the cron expression
    // for the first execution that follows it.
    final ZonedDateTime reference = instant.atZone( timeZone.toZoneId() );
    return this.executionTime.nextExecution( reference );
}

/**
 * Computes how long it takes, counted from the current moment in this calendar's time zone,
 * until the cron expression fires next.
 *
 * @return the remaining time as reported by the underlying execution-time object
 */
@Override
public Optional<Duration> timeToNextExecution()
{
    final ZonedDateTime currentTime = ZonedDateTime.now( timeZone.toZoneId() );
    return this.executionTime.timeToNextExecution( currentTime );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Optional;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -33,11 +35,17 @@ public Instant getValue()
}

@Override
public Optional<Duration> nextExecution()
public Optional<Duration> timeToNextExecution()
{
    // Distance from "now" to the configured instant; it is negative once that instant lies in the past.
    final Duration remaining = Duration.between( Instant.now(), value );
    return Optional.of( remaining );
}

/**
 * Returns the configured one-time instant, expressed in the system default time zone.
 * The {@code instant} argument is not consulted: the result is the same for every reference moment.
 *
 * @param instant reference moment (unused by this implementation)
 * @return the configured instant as a zoned date-time, never empty
 */
@Override
public Optional<ZonedDateTime> nextExecution( final Instant instant )
{
    final ZonedDateTime scheduledAt = ZonedDateTime.ofInstant( value, ZoneId.systemDefault() );
    return Optional.of( scheduledAt );
}

@Override
public ScheduleCalendarType getType()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package com.enonic.xp.impl.scheduler.distributed;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,14 +17,22 @@
import com.enonic.xp.context.ContextAccessor;
import com.enonic.xp.context.ContextBuilder;
import com.enonic.xp.core.internal.osgi.OsgiSupport;
import com.enonic.xp.impl.scheduler.SchedulerExecutorService;
import com.enonic.xp.impl.scheduler.UpdateLastRunCommand;
import com.enonic.xp.node.NodeService;
import com.enonic.xp.scheduler.CronCalendar;
import com.enonic.xp.scheduler.ScheduleCalendarType;
import com.enonic.xp.scheduler.ScheduledJob;
import com.enonic.xp.scheduler.ScheduledJobName;
import com.enonic.xp.scheduler.SchedulerService;
import com.enonic.xp.security.PrincipalKey;
import com.enonic.xp.security.RoleKeys;
import com.enonic.xp.security.SecurityService;
import com.enonic.xp.security.User;
import com.enonic.xp.security.auth.AuthenticationInfo;
import com.enonic.xp.security.auth.VerifiedUsernameAuthToken;
import com.enonic.xp.task.SubmitTaskParams;
import com.enonic.xp.task.TaskId;
import com.enonic.xp.task.TaskService;

public class RescheduleTask
implements SchedulableTask
Expand All @@ -32,12 +45,69 @@ public class RescheduleTask

private static final AtomicInteger FAILED_COUNT = new AtomicInteger( 0 );

// Jobs waiting to be submitted, ordered by their timeToRun so that the earliest one is at the head.
// NOTE(review): PriorityQueue is not thread-safe and this queue is static - presumably run() is only ever
// invoked from a single thread; confirm against the scheduling setup.
private static final PriorityQueue<JobToRun> QUEUE = new PriorityQueue<>( Comparator.comparing( a -> a.timeToRun ) );


/**
 * Returns the name of this task, as held in the NAME constant.
 *
 * @return the task name
 */
@Override
public String getName()
{
return NAME;
}

/**
 * Puts the jobs that are not yet waiting in the queue into it: cron jobs first, then one-time jobs.
 *
 * @param jobs all known jobs, keyed by name
 */
private static void fetchJobsToSchedule( final Map<ScheduledJobName, ScheduledJob> jobs )
{
    final Instant referenceTime = Instant.now();

    // Names that already sit in the queue; those jobs must not be queued a second time.
    final Set<ScheduledJobName> queuedNames = QUEUE.stream().map( pending -> pending.name ).collect( Collectors.toSet() );

    scheduleCronJobs( jobs, referenceTime, queuedNames );
    scheduleOneTimeJobs( jobs, queuedNames );
}

/**
 * Queues every enabled one-time job that has never run and is not already waiting in the queue.
 *
 * @param jobs              all known jobs, keyed by name
 * @param scheduledJobNames names of the jobs that were already queued when the snapshot was taken
 */
private static void scheduleOneTimeJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final Set<ScheduledJobName> scheduledJobNames )
{
    for ( final ScheduledJob candidate : jobs.values() )
    {
        if ( !candidate.isEnabled() || scheduledJobNames.contains( candidate.getName() ) )
        {
            continue;
        }

        final boolean pendingOneTime =
            ScheduleCalendarType.ONE_TIME.equals( candidate.getCalendar().getType() ) && candidate.getLastRun() == null;
        if ( !pendingOneTime )
        {
            continue;
        }

        // The run time is derived from the remaining duration, counted from the moment the job is queued.
        candidate.getCalendar()
            .timeToNextExecution()
            .ifPresent( delay -> QUEUE.offer( new JobToRun( candidate.getName(), Instant.now().plus( delay ) ) ) );
    }
}

/**
 * Queues enabled cron jobs that are not already waiting in the queue.
 * A job that has never run is queued for its next cron slot. A job that has run before is queued
 * for immediate execution, but only when the slot following its last run already lies before {@code now}.
 *
 * @param jobs              all known jobs, keyed by name
 * @param now               reference moment used to decide whether a slot has been missed
 * @param scheduledJobNames names of the jobs that were already queued when the snapshot was taken
 */
private static void scheduleCronJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final Instant now,
                                      final Set<ScheduledJobName> scheduledJobNames )
{
    for ( final ScheduledJob candidate : jobs.values() )
    {
        final boolean eligible = candidate.isEnabled() && !scheduledJobNames.contains( candidate.getName() ) &&
            ScheduleCalendarType.CRON.equals( candidate.getCalendar().getType() );
        if ( !eligible )
        {
            continue;
        }

        final Instant lastRun = candidate.getLastRun();
        final CronCalendar cron = (CronCalendar) candidate.getCalendar();

        if ( lastRun == null )
        {
            // Never executed so far: wait for the next regular slot of the cron expression.
            candidate.getCalendar()
                .timeToNextExecution()
                .ifPresent( delay -> QUEUE.offer( new JobToRun( candidate.getName(), Instant.now().plus( delay ) ) ) );
            continue;
        }

        // Executed before: only a slot that is already overdue gets queued, and it is due right away.
        cron.nextExecution( lastRun )
            .filter( slot -> slot.isBefore( now.atZone( cron.getTimeZone().toZoneId() ) ) )
            .ifPresent( overdueSlot -> QUEUE.offer( new JobToRun( candidate.getName(), now ) ) );
    }
}

@Override
public void run()
{
Expand All @@ -46,13 +116,15 @@ public void run()
this.doRun();
FAILED_COUNT.set( 0 );
}
catch ( IllegalStateException e )
catch ( Exception e )
{
if ( FAILED_COUNT.addAndGet( 1 ) >= 10 )
{
FAILED_COUNT.set( 0 );
LOG.warn( "Problem during tasks scheduling", e );
} else {
}
else
{
LOG.debug( "Problem during tasks scheduling", e );
}
}
Expand All @@ -64,39 +136,87 @@ public void run()

private void doRun()
{
final List<ScheduledJob> jobs =
OsgiSupport.withService( SchedulerService.class, schedulerService -> adminContext().callWith( schedulerService::list ) );
final Map<ScheduledJobName, ScheduledJob> jobs =
OsgiSupport.withService( SchedulerService.class, schedulerService -> adminContext().callWith( schedulerService::list ) )
.stream()
.collect( Collectors.toMap( ScheduledJob::getName, job -> job ) );

OsgiSupport.withService( SchedulerExecutorService.class, schedulerExecutorService -> {
schedulerExecutorService.disposeAllDone();
return null;
} );
fetchJobsToSchedule( jobs );

final Set<String> scheduledJobs =
OsgiSupport.withService( SchedulerExecutorService.class, SchedulerExecutorService::getAllFutures );
final List<JobToRun> failedJobs = new ArrayList<>();

LOG.debug( "Currently scheduled jobs {}", scheduledJobs );
scheduleJobs( jobs, failedJobs );

jobs.stream()
.filter( ScheduledJob::isEnabled )
.filter( job -> !scheduledJobs.contains( job.getName().getValue() ) )
.filter( job -> !ScheduleCalendarType.ONE_TIME.equals( job.getCalendar().getType() ) || job.getLastRun() == null )
.forEach( job -> job.getCalendar().nextExecution().ifPresent( duration -> {
try
retryFailedJobs( failedJobs );
}

private void scheduleJobs( final Map<ScheduledJobName, ScheduledJob> jobs, final List<JobToRun> failedJobs )
{
while ( !QUEUE.isEmpty() )
{
final JobToRun peek = QUEUE.peek();

if ( peek.timeToRun.isAfter( Instant.now() ) )
{
LOG.debug( "Rescheduling a job {}", job.getName() );
OsgiSupport.withService( SchedulerExecutorService.class,
scheduler -> scheduler.schedule( SchedulableTaskImpl.create().job( job ).build(),
duration.isNegative() ? 0 : duration.toMillis(),
TimeUnit.MILLISECONDS ) );
break;
}
catch ( Exception e )

QUEUE.remove();

if ( jobs.containsKey( peek.name ) ) //there is a job to run
{
LOG.warn( "{} job rescheduling failed", job.getName(), e );
//submit task

final ScheduledJob job = jobs.get( peek.name );

try
{
final TaskId taskId = taskContext( job.getUser() ).callWith( () -> OsgiSupport.withService( TaskService.class,
taskService -> taskService.submitTask(
SubmitTaskParams.create()
.name( job.getName()
.getValue() ) //TODO: check
.descriptorKey(
job.getDescriptor() )
.data(
job.getConfig() )
.build() ) ) );

adminContext().runWith( () -> OsgiSupport.withService( NodeService.class, nodeService -> UpdateLastRunCommand.create()
.nodeService( nodeService )
.name( job.getName() )
.lastRun( Instant.now() )
.lastTaskId( taskId )
.build()
.execute() ) );
}
catch ( Exception e )
{
failedJobs.add( peek );
}
catch ( Throwable t )
{
LOG.error( "Error while running job [{}], no further attempts will be made", job.getName(), t );
}
}
} ) );
}
}

/**
 * Puts failed jobs back into the queue with an incremented attempt counter.
 * A job whose counter has reached 10 is not queued again; an error is logged for it instead.
 *
 * @param failedJobs queue entries whose submission failed during the current run
 */
private void retryFailedJobs( final List<JobToRun> failedJobs )
{
    for ( final JobToRun failed : failedJobs )
    {
        if ( failed.attempts >= 10 )
        {
            LOG.error( "Error while running job [{}], no further attempts will be made", failed.name );
            continue;
        }

        // Same run time as before, so the entry is due again on the next pass.
        QUEUE.offer( new JobToRun( failed.name, failed.timeToRun, failed.attempts + 1 ) );
    }
}


private static Context adminContext()
{
return ContextBuilder.from( ContextAccessor.current() )
Expand All @@ -106,4 +226,45 @@ private static Context adminContext()
.build() )
.build();
}

/**
 * Builds the context a job's task is submitted in, derived from the current context.
 * Without a user the context is unauthenticated; otherwise the user is authenticated through the
 * security service with a verified-username token.
 *
 * @param user principal the job runs as, may be {@code null}
 * @return a copy of the current context carrying the resolved authentication info
 */
private Context taskContext( final PrincipalKey user )
{
    final AuthenticationInfo resolvedAuth;

    if ( user != null )
    {
        resolvedAuth = OsgiSupport.withService( SecurityService.class, securityService -> {
            final VerifiedUsernameAuthToken authToken = new VerifiedUsernameAuthToken();
            authToken.setIdProvider( user.getIdProviderKey() );
            authToken.setUsername( user.getId() );

            return securityService.authenticate( authToken );
        } );
    }
    else
    {
        resolvedAuth = AuthenticationInfo.unAuthenticated();
    }

    return ContextBuilder.from( ContextAccessor.current() ).authInfo( resolvedAuth ).build();
}

/**
 * Immutable queue entry: which job to submit, when it is due, and how many attempts were already made.
 */
private static class JobToRun
{
    private final ScheduledJobName name;

    private final Instant timeToRun;

    private final int attempts;

    /**
     * Creates an entry for a job that has not been attempted yet.
     */
    public JobToRun( final ScheduledJobName name, final Instant timeToRun )
    {
        this( name, timeToRun, 0 );
    }

    /**
     * Creates an entry with an explicit number of attempts already made.
     */
    public JobToRun( final ScheduledJobName name, final Instant timeToRun, final int attempts )
    {
        this.name = name;
        this.timeToRun = timeToRun;
        this.attempts = attempts;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void cron()
{
final CronCalendar calendar = calendarService.cron( "* * * * *", TimeZone.getTimeZone( "GMT+5:30" ) );

assertTrue( calendar.nextExecution().get().get( ChronoUnit.SECONDS ) <= 60 );
assertTrue( calendar.timeToNextExecution().get().get( ChronoUnit.SECONDS ) <= 60 );
assertEquals( TimeZone.getTimeZone( "GMT+5:30" ), calendar.getTimeZone() );
assertEquals( "* * * * *", calendar.getCronValue() );
}
Expand All @@ -53,7 +53,7 @@ public void oneTime()
{
final OneTimeCalendar calendar = calendarService.oneTime( Instant.parse( "2014-09-25T10:00:00.00Z" ) );

assertTrue( calendar.nextExecution().get().isNegative() );
assertTrue( calendar.timeToNextExecution().get().isNegative() );
assertEquals( Instant.parse( "2014-09-25T10:00:00.00Z" ), calendar.getValue() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void create()
timeZone( TimeZone.getDefault() ).
build();

assertTrue( calendar.nextExecution().get().toSeconds() <= 60 );
assertTrue( calendar.timeToNextExecution().get().toSeconds() <= 60 );
assertEquals( ScheduleCalendarType.CRON, calendar.getType() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public void create()
value( Instant.now().plus( Duration.of( 1, ChronoUnit.MINUTES ) ) ).
build();

assertFalse( calendar.nextExecution().get().isNegative() );
assertFalse( calendar.timeToNextExecution().get().isNegative() );
assertEquals( ScheduleCalendarType.ONE_TIME, calendar.getType() );

calendar = OneTimeCalendarImpl.create().
value( Instant.now().minus( Duration.of( 1, ChronoUnit.SECONDS ) ) ).
build();

assertTrue( calendar.nextExecution().get().isNegative() );
assertTrue( calendar.timeToNextExecution().get().isNegative() );
}

@Test
Expand Down
Loading

0 comments on commit 1a6a2a6

Please sign in to comment.