Skip to content

Commit

Permalink
[GOBBLIN-2108] fix quartz not able to create non static inner class, …
Browse files Browse the repository at this point in the history
…create a separa… (apache#3998)

* Fix Quartz not being able to instantiate a non-static inner class; create a separate scheduler for reminder jobs
  • Loading branch information
arjun4084346 authored Jul 15, 2024
1 parent aabb8b9 commit 4f9dd3c
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.impl.StdSchedulerFactory;

import com.google.common.base.Optional;
Expand All @@ -44,52 +43,35 @@
@Singleton
public class SchedulerService extends AbstractIdleService {

private SchedulerFactory schedulerFactory;
// Refers to traditional job scheduler
@Getter
private Scheduler scheduler;
private final boolean waitForJobCompletion;
private final Optional<Properties> quartzProps;

public SchedulerService(boolean waitForJobCompletion, Optional<Properties> quartzConfig,
StdSchedulerFactory schedulerFactory) {
public SchedulerService(boolean waitForJobCompletion, Optional<Properties> quartzConfig) {
this.waitForJobCompletion = waitForJobCompletion;
this.quartzProps = quartzConfig;
if (this.schedulerFactory == null) {
this.schedulerFactory = new StdSchedulerFactory();
}
else {
this.schedulerFactory = schedulerFactory;
}
}

public SchedulerService(Properties props, StdSchedulerFactory schedulerFactory) {
public SchedulerService(Properties props) {
this(Boolean.parseBoolean(
props.getProperty(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, Optional.of("org.quartz."))), schedulerFactory);
}

public SchedulerService(Properties props) {
this(props, null);
}

public SchedulerService(Config cfg) {
this(cfg, null);
ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION)),
Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, Optional.of("org.quartz."))));
}

@Inject
public SchedulerService(Config cfg, StdSchedulerFactory schedulerFactory) {
public SchedulerService(Config cfg) {
this(cfg.hasPath(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) ?
cfg.getBoolean(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) :
Boolean.parseBoolean(ConfigurationKeys.DEFAULT_SCHEDULER_WAIT_FOR_JOB_COMPLETION),
Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")), schedulerFactory);
Optional.of(ConfigUtils.configToProperties(cfg, "org.quartz.")));
}

@Override protected void startUp() throws SchedulerException {
if (this.quartzProps.isPresent() && this.quartzProps.get().size() > 0) {
// Cast to StdSchedulerFactory to reference initialization method that generic interface does not provide
((StdSchedulerFactory) schedulerFactory).initialize(this.quartzProps.get());
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
if (this.quartzProps.isPresent() && !this.quartzProps.get().isEmpty()) {
schedulerFactory.initialize(this.quartzProps.get());
}
this.scheduler = schedulerFactory.getScheduler();
this.scheduler.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.Objects;

import org.apache.helix.HelixManager;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,8 +76,8 @@
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
import org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
Expand Down Expand Up @@ -200,9 +199,6 @@ binding time (optionally bound classes cannot have names associated with them),
binder.bind(FlowLaunchHandler.class);
}

// Note: only one SchedulerFactory instance should exist per JVM
binder.bind(StdSchedulerFactory.class).in(Singleton.class);

OptionalBinder.newOptionalBinder(binder, DagManagement.class);
OptionalBinder.newOptionalBinder(binder, DagTaskStream.class);
OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Date;
import java.util.Properties;
import java.util.function.Supplier;

import org.quartz.Job;
Expand All @@ -33,9 +34,14 @@
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.JobFactory;
import org.quartz.spi.TriggerFiredBundle;

import com.google.common.annotations.VisibleForTesting;

import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
Expand All @@ -56,19 +62,29 @@
@Slf4j
@Singleton
public class DagActionReminderScheduler {
public static final String DAG_ACTION_REMINDER_SCHEDULER_NAME = "DagActionReminderScheduler";
public static final String RetryReminderKeyGroup = "RetryReminder";
public static final String DeadlineReminderKeyGroup = "DeadlineReminder";
private final Scheduler quartzScheduler;
@VisibleForTesting
final Scheduler quartzScheduler;
private final DagManagement dagManagement;

@Inject
public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, DagManagement dagManagement)
throws SchedulerException {
public DagActionReminderScheduler(DagManagement dagManagement) throws SchedulerException {
// Creates a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler = schedulerFactory.getScheduler();
this.quartzScheduler = createScheduler();
this.quartzScheduler.start();
this.quartzScheduler.setJobFactory(new ReminderJobFactory());
this.dagManagement = dagManagement;
}

/**
 * Builds the Quartz {@link Scheduler} used for reminder jobs.
 *
 * <p>The scheduler is obtained from a fresh {@link StdSchedulerFactory} configured with the instance name
 * {@link #DAG_ACTION_REMINDER_SCHEDULER_NAME} and with {@code org.quartz.threadPool.threadCount} set to 10.
 *
 * @return the scheduler produced by the factory (not yet started by this method)
 * @throws SchedulerException if the factory cannot be initialized or the scheduler cannot be obtained
 */
private Scheduler createScheduler() throws SchedulerException {
  final Properties quartzConfig = new Properties();
  quartzConfig.setProperty("org.quartz.threadPool.threadCount", "10");
  quartzConfig.setProperty("org.quartz.scheduler.instanceName", DAG_ACTION_REMINDER_SCHEDULER_NAME);

  final StdSchedulerFactory reminderSchedulerFactory = new StdSchedulerFactory(quartzConfig);
  return reminderSchedulerFactory.getScheduler();
}

/**
* Uses a dagAction & reminder duration in milliseconds to create a reminder job that will fire
* `reminderDurationMillis` after the current time
Expand All @@ -94,46 +110,13 @@ public void unscheduleReminderJob(DagActionStore.LeaseParams leaseParams, boolea
}
}

/**
* Static class used to store information regarding a pending dagAction that needs to be revisited at a later time
* by {@link DagManagement} interface to re-attempt a lease on if it has not been completed by the previous owner.
* These jobs are scheduled and used by the {@link DagActionReminderScheduler}.
*/
public class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";

@Override
public void execute(JobExecutionContext context) {
// Get properties from the trigger to create a dagAction
JobDataMap jobDataMap = context.getMergedJobDataMap();
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
long flowExecutionId = jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);

DagActionStore.LeaseParams reminderLeaseParams = new DagActionStore.LeaseParams(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType),
true, eventTimeMillis);
log.info("DagProc reminder triggered for dagAction event: {}", reminderLeaseParams);

try {
dagManagement.addDagAction(reminderLeaseParams);
} catch (IOException e) {
log.error("Failed to add DagAction event to DagManagement. dagAction event: {}", reminderLeaseParams);
}
}
}

/**
* Creates a key for the reminder job by concatenating all dagAction fields and the eventTime of the dagAction.
*
* <p>
* This ensures unique keys for multiple instances of the same action on the same flow execution that originate more
* than 'epsilon' apart. {@link MultiActiveLeaseArbiter} uses the eventTime to distinguish these distinct occurrences
* of the same action. This is necessary to prevent insertion failures due to previous reminders.
*
* <p>
* Applicable only for KILL and RESUME actions; duplication for other actions is an error.
*/
public static String createDagActionReminderKey(DagActionStore.LeaseParams leaseParams) {
Expand Down Expand Up @@ -192,4 +175,46 @@ public static Trigger createReminderJobTrigger(DagActionStore.LeaseParams leaseP
.startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
.build();
}

/**
 * {@link JobFactory} that hands Quartz a new {@link ReminderJob} wired with the enclosing scheduler's
 * {@link DagManagement}. The trigger bundle and scheduler arguments are not consulted.
 */
public class ReminderJobFactory implements JobFactory {
  @Override
  public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) {
    final Job reminderJob = new ReminderJob(DagActionReminderScheduler.this.dagManagement);
    return reminderJob;
  }
}

/**
* These jobs are scheduled and used by the {@link DagActionReminderScheduler}.
* When the reminder deadline is completed, these jobs are invoked by Quartz scheduler.
* They create a {@link DagActionStore.LeaseParams} and forward them to {@link DagManagement} for further processing.
*/
@RequiredArgsConstructor
public static class ReminderJob implements Job {
public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
public static final String FLOW_ACTION_EVENT_TIME_KEY = "flow.eventTime";
private final DagManagement dagManagement;

@Override
public void execute(JobExecutionContext context) {
// Get properties from the trigger to create a dagAction
JobDataMap jobDataMap = context.getMergedJobDataMap();
String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
long flowExecutionId = jobDataMap.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
DagActionStore.DagActionType dagActionType = (DagActionStore.DagActionType) jobDataMap.get(FLOW_ACTION_TYPE_KEY);
long eventTimeMillis = jobDataMap.getLong(FLOW_ACTION_EVENT_TIME_KEY);

DagActionStore.LeaseParams reminderLeaseParams = new DagActionStore.LeaseParams(
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, dagActionType),
true, eventTimeMillis);
log.info("DagProc reminder triggered for dagAction event: {}", reminderLeaseParams);

try {
dagManagement.addDagAction(reminderLeaseParams);
} catch (IOException e) {
log.error("Failed to add DagAction event to DagManagement. dagAction event: {}", reminderLeaseParams);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package org.apache.gobblin.service.modules.orchestration;

import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.function.Supplier;

import org.mockito.Mockito;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerUtils;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.spi.OperableTrigger;
Expand Down Expand Up @@ -57,14 +64,13 @@ public class DagActionReminderSchedulerTest {
DagActionStore.LeaseParams launchLeaseParams = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis);
DagActionStore.LeaseParams launchLeaseParams2 = new DagActionStore.LeaseParams(launchDagAction, eventTimeMillis2);
DagActionReminderScheduler dagActionReminderScheduler;
DagManagement dagManagement = mock(DagManagement.class);
private static boolean testJobRan = false;

@BeforeClass
private void setup() throws Exception {
StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
schedulerFactory.getScheduler();
DagManagement dagManagement = mock(DagManagement.class);
doNothing().when(dagManagement).addDagAction(any());
this.dagActionReminderScheduler = new DagActionReminderScheduler(schedulerFactory, dagManagement);
this.dagActionReminderScheduler = new DagActionReminderScheduler(this.dagManagement);
}

@Test
Expand Down Expand Up @@ -109,4 +115,53 @@ public void testRemindersForMultipleFlowExecutions() throws SchedulerException {
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams, true);
this.dagActionReminderScheduler.unscheduleReminderJob(launchLeaseParams2, true);
}

// Test multiple schedulers can co-exist and run their jobs of different types
@Test
public void testMultipleSchedules() throws SchedulerException, InterruptedException, IOException {
  JobDetail jobDetail = DagActionReminderScheduler.createReminderJobDetail(launchLeaseParams, false);
  // scheduler1 is the dedicated reminder scheduler; scheduler2 comes from a default-configured factory
  Scheduler scheduler1 = this.dagActionReminderScheduler.quartzScheduler;
  Scheduler scheduler2 = new StdSchedulerFactory().getScheduler();

  Assert.assertNotSame(scheduler1, scheduler2);

  this.dagActionReminderScheduler.scheduleReminder(launchLeaseParams, 100L, false);

  Assert.assertTrue(scheduler1.checkExists(jobDetail.getKey()));

  // Reminder fires 100ms after scheduling; wait long enough for it to run
  Thread.sleep(200L);

  // verify that the quartz job ran
  Mockito.verify(this.dagManagement, Mockito.times(1)).addDagAction(any());
  // verify that the quartz job cleaned itself without throwing any exception
  Assert.assertFalse(scheduler1.checkExists(jobDetail.getKey()));

  scheduler2.start();

  JobDetail job = JobBuilder.newJob(TestJob.class)
      .withIdentity("myJob", "group1")
      .build();

  Trigger trigger = TriggerBuilder.newTrigger()
      .withIdentity("myTrigger", "group1")
      .startAt(new Date(System.currentTimeMillis() + 100L))
      .build();

  scheduler2.scheduleJob(job, trigger);

  Assert.assertTrue(scheduler2.checkExists(job.getKey()));
  Thread.sleep(200L);

  // verify that the quartz job ran
  Assert.assertTrue(DagActionReminderSchedulerTest.testJobRan);
  // verify that the quartz job cleaned itself without throwing any exception.
  // This must check the key of the TestJob scheduled on scheduler2; the reminder job's key was never
  // registered on scheduler2, so asserting on it would pass regardless of what happened to TestJob.
  Assert.assertFalse(scheduler2.checkExists(job.getKey()));
}

/**
 * Minimal Quartz {@link Job} used by the test: running it only flips the
 * {@code testJobRan} flag on {@link DagActionReminderSchedulerTest}.
 */
public static class TestJob implements Job {
  @Override
  public void execute(final JobExecutionContext context) {
    // Record that Quartz actually invoked this job
    DagActionReminderSchedulerTest.testJobRan = true;
  }
}
}

0 comments on commit 4f9dd3c

Please sign in to comment.